3 * most of swift's state machine
5 * Created by Victor Grishchenko on 3/6/09.
6 * Copyright 2009 Delft University of Technology. All rights reserved.
10 #include <algorithm> // kill it
12 using namespace swift;
18 - randomized testing of advanced ops (new testcase)
// AddPeakHashes: for every peak of the file, appends one SWIFT_HASH message
// made of the peak bin (written as 32 bits) followed by that peak's hash.
// NOTE(review): the loop/function closing braces are not part of this excerpt.
21 void Channel::AddPeakHashes (Datagram& dgram) {
22 for(int i=0; i<file().peak_count(); i++) {
23 bin64_t peak = file().peak(i);
24 dgram.Push8(SWIFT_HASH);
// NOTE(review): other messages in this file serialize bins with
// bin64_t::to32(); this C-style cast may not be equivalent — confirm.
25 dgram.Push32((uint32_t)peak);
26 dgram.PushHash(file().peak_hash(i));
27 dprintf("%s #%u +phash %s\n",tintstr(),id_,peak.str());
// AddUncleHashes: appends SWIFT_HASH messages carrying the sibling ("uncle")
// hashes on the path from `pos` toward its peak. Presumably these let the
// peer verify the data bin `pos` against an already known peak hash.
// The loop keeps going only while all of these hold:
//  - `pos` has not reached its peak;
//  - either (NOW&3)==3, or data_out_cap_ does not lie within pos.parent()
//    (elsewhere in this file a.within(b) appears to mean "a lies inside b");
//  - the peer's ack_in_ reports pos.parent() as EMPTY.
// NOTE(review): the statement that moves `pos` up to its parent, and the
// closing braces, are not part of this excerpt.
32 void Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
33 bin64_t peak = file().peak_for(pos);
34 while (pos!=peak && ((NOW&3)==3 || !data_out_cap_.within(pos.parent())) &&
35 ack_in_.get(pos.parent())==binmap_t::EMPTY ) {
36 bin64_t uncle = pos.sibling();
37 dgram.Push8(SWIFT_HASH);
38 dgram.Push32((uint32_t)uncle);
39 dgram.PushHash( file().hash(uncle) );
40 dprintf("%s #%u +hash %s\n",tintstr(),id_,uncle.str());
// ImposeHint: invents a hint for a peer that has sent none.
// The search over our ack_out() map is "twisted" by a per-peer value:
// peer_channel_id_ masked with the first peak. Presumably this gives each
// peer a different pseudo-random starting point.
// The picked bin is then narrowed, by repeatedly taking its left half,
// until its width is at most max(1, cwnd_). It is returned via
// my_pick.twisted(twist), presumably undoing the twist — confirm.
// NOTE(review): the declaration of `my_pick` (presumably initialised from the
// find_filtered() call) is not part of this excerpt. Whether a NONE result is
// handled before .left()/.twisted() cannot be verified from here.
46 bin64_t Channel::ImposeHint () {
47 uint64_t twist = peer_channel_id_; // got no hints, send something randomly
48 twist &= file().peak(0); // FIXME may make it semi-seq here
49 file().ack_out().twist(twist);
// presumably finds a bin FILLED in ack_out() filtered against the peer's
// ack_in_ (i.e. something we have that the peer lacks) — confirm binmap_t
52 file().ack_out().find_filtered(ack_in_,bin64_t::ALL,binmap_t::FILLED);
// shrink the pick to fit the congestion window (never below width 1)
53 while (my_pick.width()>max(1,(int)cwnd_))
54 my_pick = my_pick.left();
// remove the twist from the shared ack_out() map again
55 file().ack_out().twist(0);
57 return my_pick.twisted(twist);
// DequeueHint: returns the next bin to send to the peer, or bin64_t::NONE.
// - If the peer queued no hints but we heard from it within
//   rtt_avg_ + TINT_SEC, a hint is invented with ImposeHint().
// - Queued hints are split at the front of hint_in_: while the front hint is
//   not a base bin, its right half is pushed to the front with the same
//   timestamp.
// - A bin is only taken if the peer's ack_in_ does not already report it
//   FILLED.
// - The total width ("mass") of the remaining hints is logged.
// NOTE(review): several lines are not part of this excerpt: pop_front,
// advancing `hint` to its left half, the assignment to `send`, the
// declaration of `mass`, and the closing braces. Parts of the description
// above are therefore inferred.
61 bin64_t Channel::DequeueHint () {
62 if (hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {
63 bin64_t my_pick = ImposeHint(); // FIXME move to the loop
64 if (my_pick!=bin64_t::NONE) {
65 hint_in_.push_back(my_pick);
66 dprintf("%s #%u *hint %s\n",tintstr(),id_,my_pick.str());
69 bin64_t send = bin64_t::NONE;
70 while (!hint_in_.empty() && send==bin64_t::NONE) {
71 bin64_t hint = hint_in_.front().bin;
72 tint time = hint_in_.front().time;
// split a wide hint down to a single base bin, re-queueing right halves
74 while (!hint.is_base()) { // FIXME optimize; possible attack
75 hint_in_.push_front(tintbin(time,hint.right()));
// disabled: hint age check
78 //if (time < NOW-TINT_SEC*3/2 )
80 if (ack_in_.get(hint)!=binmap_t::FILLED)
// NOTE(review): `int i` compared with hint_in_.size() (signed/unsigned mix)
84 for(int i=0; i<hint_in_.size(); i++)
85 mass += hint_in_[i].bin.width();
86 dprintf("%s #%u dequeued %s [%lli]\n",tintstr(),id_,send.str(),mass);
// AddHandshake: appends the handshake to an outgoing datagram.
// While peer_channel_id_ is still 0 (we are the initiating side), it first
// sends the file's root hash under bin ALL32. RecvDatagram() in this file
// expects this HASH message for bin ALL so it can look the file up.
// It then appends a SWIFT_HANDSHAKE message carrying our own channel id,
// encoded with EncodeID().
91 void Channel::AddHandshake (Datagram& dgram) {
92 if (!peer_channel_id_) { // initiating
93 dgram.Push8(SWIFT_HASH);
94 dgram.Push32(bin64_t::ALL32);
95 dgram.PushHash(file().root_hash());
96 dprintf("%s #%u +hash ALL %s\n",
97 tintstr(),id_,file().root_hash().hex().c_str());
99 dgram.Push8(SWIFT_HANDSHAKE);
100 int encoded = EncodeID(id_);
101 dgram.Push32(encoded);
102 dprintf("%s #%u +hs %x\n",tintstr(),id_,encoded);
// Send: builds one datagram for the peer and sends it.
// Every datagram starts with the peer's channel id. That id is 0 until
// OnHandshake() stores the peer's id, and RecvDatagram() treats 0 as
// "handshake initiated".
// On an established channel, data is added through AddData() and its result
// is kept in `data`. The other message builders used here are on lines that
// are not part of this excerpt.
// A datagram of exactly 4 bytes holds only the channel id, i.e. a bare
// keep-alive. A send failure (-1) is reported, not retried.
108 void Channel::Send () {
109 Datagram dgram(socket_,peer());
110 dgram.Push32(peer_channel_id_);
111 bin64_t data = bin64_t::NONE;
112 if ( is_established() ) {
113 // FIXME: seeder check
116 if (!file().is_complete())
120 data = AddData(dgram);
126 dprintf("%s #%u sent %ib %s:%x\n",
127 tintstr(),id_,dgram.size(),peer().str(),peer_channel_id_);
128 if (dgram.size()==4) {// only the channel id; bare keep-alive
131 if (dgram.Send()==-1)
132 print_error("can't send datagram");
// read by Recv() to seed the first RTT sample
133 last_send_time_ = NOW;
// AddHint: asks the peer for more data with a SWIFT_HINT message.
// Planning horizon: plan_for = max(TINT_SEC, 4*rtt_avg_).
// Outstanding hints older than 2*plan_for are dropped, and their width is
// subtracted from hint_out_size_.
// The number of packets expected within the horizon is estimated as
// plan_for / dip_avg_, where dip_avg_ is the averaged gap between incoming
// data packets maintained in OnData(). If fewer packets are outstanding, the
// picker is asked for up to the difference, with deadline NOW+2*plan_for.
// NOTE(review): this divides by dip_avg_. OnData() averages arrival gaps, so
// if dip_avg_ can ever reach 0 this is a division by zero — confirm its
// initial value / lower bound.
// NOTE(review): the line writing the hint bin after Push8(SWIFT_HINT) is not
// part of this excerpt.
140 void Channel::AddHint (Datagram& dgram) {
142 tint plan_for = max(TINT_SEC,rtt_avg_*4);
// expire hints the peer apparently ignored
144 tint timed_out = NOW - plan_for*2;
145 while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
146 hint_out_size_ -= hint_out_.front().bin.width();
147 hint_out_.pop_front();
// result narrowed from tint to int
150 int plan_pck = max ( (tint)1, plan_for / dip_avg_ );
152 if ( hint_out_size_ < plan_pck ) {
154 int diff = plan_pck - hint_out_size_; // TODO: aggregate
155 bin64_t hint = transfer().picker().Pick(ack_in_,diff,NOW+plan_for*2);
157 if (hint!=bin64_t::NONE) {
158 dgram.Push8(SWIFT_HINT);
// logs hint_out_size_ before this hint is added
160 dprintf("%s #%u +hint %s [%lli]\n",tintstr(),id_,hint.str(),hint_out_size_);
161 hint_out_.push_back(hint);
162 hint_out_size_ += hint.width();
// presumably the branch taken when the picker returned NONE
164 dprintf("%s #%u Xhint\n",tintstr(),id_);
// AddData: appends one data packet (plus any hashes needed to verify it) and
// returns the bin sent, or bin64_t::NONE if nothing was sent.
// - Nothing is sent while the file size is unknown.
// - A packet is only dequeued when fewer than cwnd_ packets are in flight and
//   send_interval_ has elapsed since the last data packet. A slack of
//   send_interval_/16 ("luft") is allowed.
// - If no hint is available, the channel switches to KEEP_ALIVE_CONTROL.
// - Peak hashes are sent while the peer has acknowledged nothing; uncle
//   hashes are always sent for `tosend`.
// - Once the datagram exceeds 254 bytes, it is sent as is and a new one is
//   started with the channel id ("kind of fragmentation").
// - Payload: up to 1024 bytes read at offset base_offset()<<10 of the file.
// NOTE(review): several lines are not part of this excerpt, including the
// `else` before the "wait" log, the declaration of `buf`, the read-error
// check, the line pushing the payload, and closing braces.
170 bin64_t Channel::AddData (Datagram& dgram) {
172 if (!file().size()) // know nothing
173 return bin64_t::NONE;
175 bin64_t tosend = bin64_t::NONE;
176 tint luft = send_interval_>>4; // may wake up a bit earlier
177 if (data_out_.size()<cwnd_ &&
178 last_data_out_time_+send_interval_<=NOW+luft) {
179 tosend = DequeueHint();
180 if (tosend==bin64_t::NONE) {
181 dprintf("%s #%u sendctrl no idea what to send\n",tintstr(),id_);
182 if (send_control_!=KEEP_ALIVE_CONTROL)
183 SwitchSendControl(KEEP_ALIVE_CONTROL);
// NOTE(review): the "next" time printed below is
// last_data_out_time_+NOW-send_interval_, which looks like it was meant to
// be last_data_out_time_+send_interval_ (when sending becomes allowed).
186 dprintf("%s #%u sendctrl wait cwnd %f data_out %i next %s\n",
187 tintstr(),id_,cwnd_,(int)data_out_.size(),tintstr(last_data_out_time_+NOW-send_interval_));
189 if (tosend==bin64_t::NONE)// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
190 return bin64_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
// the file().size() test is redundant here: the function returned early
// above when it is zero
192 if (ack_in_.is_empty() && file().size())
193 AddPeakHashes(dgram);
194 AddUncleHashes(dgram,tosend);
195 if (!ack_in_.is_empty()) // TODO: cwnd_>1
196 data_out_cap_ = tosend;
198 if (dgram.size()>254) {
199 dgram.Send(); // kind of fragmentation
// presumably Send() resets the buffer so the new datagram starts here
200 dgram.Push32(peer_channel_id_);
203 dgram.Push8(SWIFT_DATA);
204 dgram.Push32(tosend.to32());
// NOTE(review): pread() returns ssize_t and yields -1 on error. Stored in a
// size_t, an error becomes SIZE_MAX, so an `r<0`-style check can never fire
// (the check itself is on a line not in this excerpt). The variable should
// be ssize_t.
207 size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10);
208 // TODO: corrupted data, retries, caching
210 print_error("error on reading");
211 return bin64_t::NONE;
// room for the payload plus 4 + 1 more bytes
213 assert(dgram.space()>=r+4+1);
216 last_data_out_time_ = NOW;
// tracked in flight until OnAck()/TimeoutDataOut() clear it
217 data_out_.push_back(tosend);
218 dprintf("%s #%u +data %s\n",tintstr(),id_,tosend.str());
// AddAck: acknowledges the most recent data packet, which OnData() recorded
// in data_in_.
// The ACK carries the bin (32 bits) and the local arrival time (64 bits).
// OnAck() on the sending side subtracts its send time from this value to get
// a one-way delay sample.
// The bin is also marked in have_out_, which AddHave() filters against.
// Bins above layer 2 are remembered in data_in_dbl_ so AddHave() repeats
// them as a HAVE. data_in_ is then reset.
// NOTE(review): the statement guarded by the data_in_==tintbin() test is on a
// line not in this excerpt (presumably an early return when nothing arrived).
224 void Channel::AddAck (Datagram& dgram) {
225 if (data_in_==tintbin())
227 dgram.Push8(SWIFT_ACK);
228 dgram.Push32(data_in_.bin.to32()); // FIXME not cover
229 dgram.Push64(data_in_.time); // FIXME 32
230 have_out_.set(data_in_.bin);
231 dprintf("%s #%u +ack %s %s\n",
232 tintstr(),id_,data_in_.bin.str(),tintstr(data_in_.time));
233 if (data_in_.bin.layer()>2)
234 data_in_dbl_ = data_in_.bin;
235 data_in_ = tintbin();
// AddHave: announces bins we hold with SWIFT_HAVE messages.
// First it repeats the large bin that AddAck() stored in data_in_dbl_, if
// any. Then, up to 4 times, it looks up a FILLED bin in our ack_out()
// filtered against have_out_ (presumably: held but not yet announced),
// widens it with cover(), and sends it.
// NOTE(review): the lines after the NONE test (presumably `break`) and after
// cover() (presumably recording `ack` in have_out_) are not part of this
// excerpt. Without the latter, every iteration would find the same bin.
239 void Channel::AddHave (Datagram& dgram) {
240 if (data_in_dbl_!=bin64_t::NONE) { // TODO: do redundancy better
241 dgram.Push8(SWIFT_HAVE);
242 dgram.Push32(data_in_dbl_.to32());
243 data_in_dbl_=bin64_t::NONE;
245 for(int count=0; count<4; count++) {
246 bin64_t ack = file().ack_out().find_filtered // FIXME: do rotating queue
247 (have_out_, bin64_t::ALL, binmap_t::FILLED);
248 if (ack==bin64_t::NONE)
250 ack = file().ack_out().cover(ack);
252 dgram.Push8(SWIFT_HAVE);
253 dgram.Push32(ack.to32());
254 dprintf("%s #%u +have %s\n",tintstr(),id_,ack.str());
// Recv: parses the messages in a datagram addressed to this channel and
// dispatches each one to its On* handler by its leading type byte.
// The logged size adds 4, presumably for the channel id that RecvDatagram()
// already pulled.
// RTT seeding: if we have sent something and rtt_avg_/dev_avg_ still hold
// TINT_SEC/0 (presumably their initial values), the time since the last
// send becomes the first RTT sample.
// `data` starts as NONE for a non-empty datagram and ALL for an empty one,
// and is updated by OnData(). Its use is on lines not in this excerpt.
// NOTE(review): what happens after an unknown message type is logged is not
// visible here. Presumably parsing stops, since the rest can't be decoded.
259 void Channel::Recv (Datagram& dgram) {
260 dprintf("%s #%u recvd %ib\n",tintstr(),id_,dgram.size()+4);
262 if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
263 rtt_avg_ = NOW - last_send_time_;
266 dprintf("%s #%u sendctrl rtt init %lli\n",tintstr(),id_,rtt_avg_);
268 bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
269 while (dgram.size()) {
270 uint8_t type = dgram.Pull8();
272 case SWIFT_HANDSHAKE: OnHandshake(dgram); break;
273 case SWIFT_DATA: data=OnData(dgram); break;
274 case SWIFT_HAVE: OnHave(dgram); break;
275 case SWIFT_ACK: OnAck(dgram); break;
276 case SWIFT_HASH: OnHash(dgram); break;
277 case SWIFT_HINT: OnHint(dgram); break;
278 case SWIFT_PEX_ADD: OnPex(dgram); break;
280 eprintf("%s #%u ?msg id unknown %i\n",tintstr(),id_,(int)type);
// DequeueHint() uses last_recv_time_ to decide whether to invent hints
284 last_recv_time_ = NOW;
285 sent_since_recv_ = 0;
// OnHash: reads a SWIFT_HASH message (32-bit bin, then a Sha1Hash) and offers
// the hash to the file via OfferHash().
// Any return value of OfferHash() is not inspected here.
290 void Channel::OnHash (Datagram& dgram) {
291 bin64_t pos = dgram.Pull32();
292 Sha1Hash hash = dgram.PullHash();
293 file().OfferHash(pos,hash);
294 dprintf("%s #%u -hash %s\n",tintstr(),id_,pos.str());
// CleanHintOut: removes the received bin `pos` from our outstanding hints.
// 1. Find the first hint in hint_out_ that contains `pos`. If none does,
//    return.
// 2. Drop every older hint ahead of it (presumably ignored by the peer) and
//    subtract their widths from hint_out_size_.
// 3. Split the front hint toward `pos`. Each step keeps the child on the
//    path to `pos` at the front, puts its sibling right behind it, and
//    reuses the original timestamp. This repeats until the front bin equals
//    `pos`, which is then popped.
// NOTE(review): the declaration/increment of `hi` and any hint_out_size_
// adjustment for `pos` itself are on lines not in this excerpt.
298 void Channel::CleanHintOut (bin64_t pos) {
300 while (hi<hint_out_.size() && !pos.within(hint_out_[hi].bin))
302 if (hi==hint_out_.size())
303 return; // something not hinted or hinted in far past
304 while (hi--) { // removing likely snubbed hints
305 hint_out_size_ -= hint_out_.front().bin.width();
306 hint_out_.pop_front();
308 while (hint_out_.front().bin!=pos) {
309 tintbin f = hint_out_.front();
310 f.bin = f.bin.towards(pos);
311 hint_out_.front().bin = f.bin.sibling();
312 hint_out_.push_front(f);
314 hint_out_.pop_front();
// OnData: reads a SWIFT_DATA message (32-bit bin, then up to 1024 payload
// bytes).
// A NONE bin is accepted without a payload check. Presumably this is the
// "empty data" probe mentioned in AddData().
// Otherwise the data is accepted only if we don't already have the bin and
// OfferData() accepts it.
// The arrival time is recorded in data_in_ for AddAck(). After that:
// - the installed callbacks are notified when the covering bin reaches their
//   aggregation layer;
// - dip_avg_, a 3/4-weighted moving average of the gap between data packets
//   that AddHint() divides by, is updated.
// NOTE(review): several lines are not part of this excerpt: the declaration
// of `data`, the `if (!ok)` before the early return, the assignment of the
// bin into data_in_, and the final return.
319 bin64_t Channel::OnData (Datagram& dgram) { // TODO: HAVE NONE for corrupted data
320 bin64_t pos = dgram.Pull32();
322 int length = dgram.Pull(&data,1024);
323 bool ok = (pos==bin64_t::NONE) ||
324 (!file().ack_out().get(pos) && file().OfferData(pos, (char*)data, length) );
325 dprintf("%s #%u %cdata %s\n",tintstr(),id_,ok?'-':'!',pos.str());
326 data_in_ = tintbin(NOW,bin64_t::NONE);
328 return bin64_t::NONE;
329 bin64_t cover = transfer().ack_out().cover(pos);
330 for(int i=0; i<transfer().cb_installed; i++)
331 if (cover.layer()>=transfer().cb_agg[i])
332 transfer().callbacks[i](transfer().fd(),cover); // FIXME
334 if (pos!=bin64_t::NONE) {
335 if (last_data_in_time_) {
336 tint dip = NOW - last_data_in_time_;
// NOTE(review): zero gaps pull this average toward 0
337 dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
339 last_data_in_time_ = NOW;
// OnAck: handles a SWIFT_ACK (32-bit bin, then the peer's 64-bit arrival
// time).
// - Records the bin in ack_in_.
// - Finds the matching in-flight entry in data_out_ (index di).
// - If the entry exists and the bin is not among the retransmit-tracked bins
//   in data_out_tmo_, it updates three estimates:
//   - rtt_avg_: weight 7/8 on the old value;
//   - dev_avg_: weight 3/4 on the old value;
//   - one-way delay: minimum tracked in 4 rotating 30-second bins.
// - Any entry more than MAX_REORDERING positions before di is treated as
//   lost.
// - Finally, cleared or already-acknowledged entries are popped from the
//   front of data_out_.
// NOTE(review): the declarations of di/ri, their increments, the return
// after "invalid ack", the `continue` for empty entries, and the closing
// braces are on lines not in this excerpt.
346 void Channel::OnAck (Datagram& dgram) {
347 bin64_t ackd_pos = dgram.Pull32();
348 tint peer_time = dgram.Pull64(); // FIXME 32
349 // FIXME FIXME: wrap around here
350 if (ackd_pos==bin64_t::NONE)
351 return; // likely, broken packet / insufficient hashes
352 if (file().size() && ackd_pos.base_offset()>=file().packet_size()) {
353 eprintf("invalid ack: %s\n",ackd_pos.str());
356 ack_in_.set(ackd_pos);
358 // find an entry for the send (data out) event
359 while ( di<data_out_.size() && ( data_out_[di]==tintbin() ||
360 !data_out_[di].bin.within(ackd_pos) ) )
362 // FUTURE: delayed acks
363 // rule out retransmits
364 while ( ri<data_out_tmo_.size() && !data_out_tmo_[ri].bin.within(ackd_pos) )
366 dprintf("%s #%u %cack %s %lli\n",tintstr(),id_,
367 di==data_out_.size()?'?':'-',ackd_pos.str(),peer_time);
368 if (di!=data_out_.size() && ri==data_out_tmo_.size()) { // not a retransmit
369 // round trip time calculations
370 tint rtt = NOW-data_out_[di].time;
371 rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
// NOTE(review): if tint is 64-bit (it is read with Pull64 and printed with
// %lli), ::abs(int) may truncate; a 64-bit abs overload would be safer
372 dev_avg_ = ( dev_avg_*3 + ::abs(rtt-rtt_avg_) ) >> 2;
373 assert(data_out_[di].time!=TINT_NEVER);
374 // one-way delay calculations
// mixes peer and local clocks, so presumably only its minimum/variation is
// meaningful
375 tint owd = peer_time - data_out_[di].time;
376 owd_cur_bin_ = 0;//(owd_cur_bin_+1) & 3;
377 owd_current_[owd_cur_bin_] = owd;
// every 30 s move to the next of 4 minimum bins and reset it
378 if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
379 owd_min_bin_start_ = NOW;
380 owd_min_bin_ = (owd_min_bin_+1) & 3;
381 owd_min_bins_[owd_min_bin_] = TINT_NEVER;
383 if (owd_min_bins_[owd_min_bin_]>owd)
384 owd_min_bins_[owd_min_bin_] = owd;
385 dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n",
386 tintstr(),id_,rtt_avg_,dev_avg_,data_out_[di].bin.str());
388 // early loss detection by packet reordering
// NOTE(review): if di is unsigned, di-MAX_REORDERING wraps when
// di<MAX_REORDERING and this loop would index past data_out_ — confirm di's
// type (its declaration is not in this excerpt)
389 for (int re=0; re<di-MAX_REORDERING; re++) {
390 if (data_out_[re]==tintbin())
392 ack_not_rcvd_recent_++;
393 data_out_tmo_.push_back(data_out_[re].bin);
// NOTE(review): logs data_out_.front() rather than data_out_[re], so the
// wrong bin is reported whenever re>0
394 dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str());
395 data_out_cap_ = bin64_t::ALL;
396 data_out_[re] = tintbin();
399 if (di!=data_out_.size())
400 data_out_[di]=tintbin();
401 // clear zeroed items
402 while (!data_out_.empty() && ( data_out_.front()==tintbin() ||
403 ack_in_.is_filled(data_out_.front().bin) ) )
404 data_out_.pop_front();
405 assert(data_out_.empty() || data_out_.front().time!=TINT_NEVER);
// TimeoutDataOut: expires in-flight data packets.
// Front entries of data_out_ that are older than ack_timeout(), or already
// cleared, are popped. Real entries the peer has not acknowledged are counted
// as losses, reset data_out_cap_ to ALL, and are recorded in data_out_tmo_.
// OnAck() uses data_out_tmo_ to skip RTT samples from retransmits.
// data_out_tmo_ entries older than MAX_POSSIBLE_RTT are then dropped.
// NOTE(review): bins are pushed to data_out_tmo_ without an explicit
// timestamp. Presumably tintbin(bin) stamps NOW — confirm, since the age
// check below relies on .time.
409 void Channel::TimeoutDataOut ( ) {
410 // losses: timed-out packets
411 tint timeout = NOW - ack_timeout();
412 while (!data_out_.empty() &&
413 ( data_out_.front().time<timeout || data_out_.front()==tintbin() ) ) {
414 if (data_out_.front()!=tintbin() && ack_in_.is_empty(data_out_.front().bin)) {
415 ack_not_rcvd_recent_++;
416 data_out_cap_ = bin64_t::ALL;
417 data_out_tmo_.push_back(data_out_.front().bin);
418 dprintf("%s #%u Tdata %s\n",tintstr(),id_,data_out_.front().bin.str());
420 data_out_.pop_front();
422 // clear retransmit queue of older items
423 while (!data_out_tmo_.empty() && data_out_tmo_.front().time<NOW-MAX_POSSIBLE_RTT)
424 data_out_tmo_.pop_front();
// OnHave: reads a SWIFT_HAVE message (32-bit bin) and records the bin in
// ack_in_ as held by the peer. A NONE bin is ignored.
// NOTE(review): unlike OnAck(), there is no check that the bin lies within
// the file's packet range.
428 void Channel::OnHave (Datagram& dgram) {
429 bin64_t ackd_pos = dgram.Pull32();
430 if (ackd_pos==bin64_t::NONE)
431 return; // wow, peer has hashes
432 ack_in_.set(ackd_pos);
433 dprintf("%s #%u -have %s\n",tintstr(),id_,ackd_pos.str());
// OnHint: reads a SWIFT_HINT message (32-bit bin) and appends it to hint_in_.
// DequeueHint() later splits it into base bins.
// NOTE(review): the bin is not validated (NONE, or beyond the file); very
// wide hints are split lazily in DequeueHint() (see its "possible attack"
// FIXME).
437 void Channel::OnHint (Datagram& dgram) {
438 bin64_t hint = dgram.Pull32();
439 // FIXME: wake up here
440 hint_in_.push_back(hint);
441 dprintf("%s #%u -hint %s\n",tintstr(),id_,hint.str());
// OnHandshake: stores the peer's channel id from a SWIFT_HANDSHAKE message.
// Self-connection check: if that id decodes to one of our own channels that
// has not itself received a handshake yet, the id is reset to 0 and the
// function returns.
// NOTE(review): this presumes EncodeID/DecodeID are keyed per process, so a
// foreign peer's id rarely decodes to a valid local channel — confirm.
// Lines 449, 453 and 455-456 of the original are not part of this excerpt.
445 void Channel::OnHandshake (Datagram& dgram) {
446 peer_channel_id_ = dgram.Pull32();
447 dprintf("%s #%u -hs %x\n",tintstr(),id_,peer_channel_id_);
448 // self-connection check
450 uint32_t try_id = DecodeID(peer_channel_id_);
451 if (channel(try_id) && !channel(try_id)->peer_channel_id_) {
452 peer_channel_id_ = 0;
454 return; // this is a self-connection
457 // FUTURE: channel forking
// OnPex: reads a SWIFT_PEX_ADD message (32-bit IPv4 address, 16-bit port)
// and hands the resulting Address to transfer().OnPexIn().
461 void Channel::OnPex (Datagram& dgram) {
462 uint32_t ipv4 = dgram.Pull32();
463 uint16_t port = dgram.Pull16();
464 Address addr(ipv4,port);
465 dprintf("%s #%u -pex %s\n",tintstr(),id_,addr.str());
466 transfer().OnPexIn(addr);
// AddPex: advertises one other peer of this transfer with a SWIFT_PEX_ADD
// message (IPv4 + port), mirroring what OnPex() reads.
// The channel is chosen by transfer().RevealChannel(pex_out_). Nothing is
// advertised when it returns -1 or this channel's own id (the statement
// after that test is on a line not in this excerpt).
470 void Channel::AddPex (Datagram& dgram) {
471 int chid = transfer().RevealChannel(pex_out_);
472 if (chid==-1 || chid==id_)
474 Address a = channels[chid]->peer();
475 dgram.Push8(SWIFT_PEX_ADD);
476 dgram.Push32(a.ipv4());
477 dgram.Push16(a.port());
478 dprintf("%s #%u +pex %s\n",tintstr(),id_,a.str());
// RecvDatagram: receives one datagram from `socket` and routes it to a
// channel.
// The first 4 bytes are our channel id.
// Id 0 means a new handshake. The datagram must then start with a HASH
// message for bin ALL (1+4+Sha1Hash::SIZE bytes) followed by a handshake
// (1+4 bytes). The hash must name a known FileTransfer.
// The handshake is refused when an existing channel in that file's hs_in_,
// to the same address, received something in the last 2 seconds. Otherwise
// a new Channel is created.
// A non-zero id is decoded and must name an existing, open channel whose
// peer address matches the sender.
// Every rejection is logged to stderr through return_log, which also returns.
// NOTE(review): the receive call, the size/NULL checks, the declaration of
// `hash`, and the final dispatch (presumably channel->Recv(data)) are on
// lines not in this excerpt. Ownership of the `new Channel` presumably passes
// to the `channels` registry — confirm.
482 void Channel::RecvDatagram (SOCKET socket) {
483 Datagram data(socket);
485 const Address& addr = data.address();
// log-and-bail helper; not #undef'd within this excerpt
486 #define return_log(...) { fprintf(stderr,__VA_ARGS__); return; }
488 return_log("datagram shorter than 4 bytes %s\n",addr.str());
489 uint32_t mych = data.Pull32();
491 Channel* channel = NULL;
492 if (mych==0) { // handshake initiated
493 if (data.size()<1+4+1+4+Sha1Hash::SIZE)
494 return_log ("%s #0 incorrect size %i initial handshake packet %s\n",
495 tintstr(),data.size(),addr.str());
496 uint8_t hashid = data.Pull8();
497 if (hashid!=SWIFT_HASH)
498 return_log ("%s #0 no hash in the initial handshake %s\n",
499 tintstr(),addr.str());
500 bin64_t pos = data.Pull32();
501 if (pos!=bin64_t::ALL)
502 return_log ("%s #0 that is not the root hash %s\n",tintstr(),addr.str());
503 hash = data.PullHash();
504 FileTransfer* file = FileTransfer::Find(hash);
506 return_log ("%s #0 hash %s unknown, no such file %s\n",tintstr(),hash.hex().c_str(),addr.str());
507 dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
// reject a duplicate handshake from a peer we are already talking to
508 for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
509 if (channels[*i] && channels[*i]->peer_==data.address() &&
510 channels[*i]->last_recv_time_>NOW-TINT_SEC*2)
511 return_log("%s #0 have a channel already to %s\n",tintstr(),addr.str());
512 channel = new Channel(file, socket, data.address());
514 mych = DecodeID(mych);
515 if (mych>=channels.size())
516 return_log("%s invalid channel #%u, %s\n",tintstr(),mych,addr.str());
517 channel = channels[mych];
// NOTE(review): the format string has two conversions but three arguments
// are passed, so addr.str() is never printed
519 return_log ("%s #%u is already closed\n",tintstr(),mych,addr.str());
520 if (channel->peer() != addr)
521 return_log ("%s #%u invalid peer address %s!=%s\n",
522 tintstr(),mych,channel->peer().str(),addr.str());
523 channel->own_id_mentioned_ = true;
525 //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
// Loop: the scheduler step that runs channel sends for up to `howlong`.
// Entries are popped from send_queue until one names a live channel. An
// entry is discarded as stale when the channel's next_send_time_ differs
// from the queued time and is not TINT_NEVER.
// If that channel's time has come, it is sent (the send call is on lines
// not in this excerpt). Otherwise the loop waits until that time or the
// overall limit, whichever is earlier, and requeues the channel.
// NOTE(review): an entry whose channel has next_send_time_==TINT_NEVER is
// NOT treated as stale. Reschedule() logs that state as "closed", so a
// closed channel's old entry can still trigger a send — confirm intended.
// The enclosing loop header (lines 533-535) is not part of this excerpt.
530 void Channel::Loop (tint howlong) {
532 tint limit = Datagram::Time() + howlong;
536 tint send_time(TINT_NEVER);
537 Channel* sender(NULL);
538 while (!sender && !send_queue.is_empty()) { // dequeue
539 tintbin next = send_queue.pop();
540 sender = channel((int)next.bin);
541 send_time = next.time;
542 if (sender && sender->next_send_time_!=send_time &&
543 sender->next_send_time_!=TINT_NEVER )
544 sender = NULL; // it was a stale entry
547 if ( sender!=NULL && send_time<=NOW ) { // it's time
549 dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(),
553 } else { // it's too early, wait
// with an empty queue send_time stays TINT_NEVER, so this waits until limit
555 tint towait = min(limit,send_time) - NOW;
556 dprintf("%s #0 waiting %lliusec\n",tintstr(),towait);
557 Datagram::Wait(towait);
558 if (sender) // get back to that later
559 send_queue.push(tintbin(send_time,sender->id()));
// Close: switches this channel to CLOSE_CONTROL send control.
// Anything else Close() does is on lines not in this excerpt.
568 void Channel::Close () {
569 this->SwitchSendControl(CLOSE_CONTROL);
// Reschedule: asks NextSendTime() when this channel should send next.
// If the answer is not TINT_NEVER, the channel is queued in send_queue for
// that time; it is asserted to be earlier than NOW+TINT_MIN.
// TINT_NEVER is logged as "closed" (the `else` is on a line not in this
// excerpt).
573 void Channel::Reschedule () {
574 next_send_time_ = NextSendTime();
575 if (next_send_time_!=TINT_NEVER) {
576 assert(next_send_time_<NOW+TINT_MIN);
577 send_queue.push(tintbin(next_send_time_,id_));
578 dprintf("%s #%u requeue for %s\n",tintstr(),id_,tintstr(next_send_time_));
580 dprintf("%s #%u closed\n",tintstr(),id_);