3 * most of the swift's state machine
5 * Created by Victor Grishchenko on 3/6/09.
6 * Copyright 2009 Delft University of Technology. All rights reserved.
10 #include <algorithm> // kill it
12 using namespace swift;
18 - randomized testing of advanced ops (new testcase)
// Channel::AddPeakHashes - appends the peak hashes of the file to dgram.
// For every index i below file().peak_count() it pushes a SWIFT_HASH type
// byte, the peak bin cast to uint32_t, and file().peak_hash(i), then traces
// the addition through dprintf.
// NOTE(review): the braces closing the loop and the function are not part of
// this listing - confirm against the full source.
21 void Channel::AddPeakHashes (Datagram& dgram) {
22 for(int i=0; i<file().peak_count(); i++) {
23 bin64_t peak = file().peak(i);
24 dgram.Push8(SWIFT_HASH);
// the bin is written as a 32-bit value
25 dgram.Push32((uint32_t)peak);
26 dgram.PushHash(file().peak_hash(i));
27 //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
28 dprintf("%s #%u +phash %s\n",tintstr(),id_,peak.str());
// Channel::AddUncleHashes - appends sibling ("uncle") hashes for pos to dgram.
// While pos differs from the peak returned by file().peak_for(pos), the
// parent of pos reads as binmap_t::EMPTY in ack_in_, and either (NOW&3)==3
// or data_out_cap_.within(pos.parent()) is false, a SWIFT_HASH message with
// the sibling bin (cast to uint32_t) and file().hash(sibling) is pushed and
// traced.
// NOTE(review): no statement that changes pos is visible in this listing and
// the closing braces are absent; presumably the loop moves pos to its parent
// on each pass - confirm against the full source.
33 void Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
34 bin64_t peak = file().peak_for(pos);
// the (NOW&3)==3 term lets the cap test be bypassed depending on the low
// two bits of NOW
35 while (pos!=peak && ((NOW&3)==3 || !data_out_cap_.within(pos.parent())) &&
36 ack_in_.get(pos.parent())==binmap_t::EMPTY ) {
37 bin64_t uncle = pos.sibling();
38 dgram.Push8(SWIFT_HASH);
39 dgram.Push32((uint32_t)uncle);
40 dgram.PushHash( file().hash(uncle) );
41 //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
42 dprintf("%s #%u +hash %s\n",tintstr(),id_,uncle.str());
// Channel::DequeueHint - selects the bin to send next (returns bin64_t).
// Visible behaviour:
//  1. If hint_in_ is empty and last_recv_time_ is later than
//     NOW-rtt_avg_-TINT_SEC, the channel makes up a hint of its own: a twist
//     value is derived from peer_channel_id_ masked with file().peak(0), set
//     on file().ack_out(), find_filtered() is run against ack_in_, the pick
//     is narrowed with left() until its width is at most max(1,(int)cwnd_),
//     the twist is reset to 0, and a pick other than NONE is twisted and
//     appended to hint_in_.
//  2. While hint_in_ is non-empty and nothing has been chosen, the front
//     hint is examined; a hint that is not a base bin has its right half
//     pushed back onto the front of the queue, and the hint is tested
//     against ack_in_ for not being FILLED.
//  3. The summed width of the remaining hints is traced with the result.
// NOTE(review): lines are missing from this listing - the declarations of
// my_pick and mass, any assignment to send, the removal of entries from
// hint_in_, the return statement and all closing braces are not visible.
48 bin64_t Channel::DequeueHint () {
49 if (hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {
50 uint64_t twist = peer_channel_id_; // got no hints, send something randomly
51 twist &= file().peak(0); // may make it semi-seq here
52 file().ack_out().twist(twist);
// NOTE(review): the start of this statement (presumably the declaration of
// my_pick) is not in this listing
55 file().ack_out().find_filtered(ack_in_,bin64_t::ALL,binmap_t::FILLED);
// shrink the pick towards its left child until it fits max(1,cwnd_)
56 while (my_pick.width()>max(1,(int)cwnd_))
57 my_pick = my_pick.left();
58 file().ack_out().twist(0);
60 if (my_pick!=bin64_t::NONE) {
61 my_pick = my_pick.twisted(twist);
62 hint_in_.push_back(my_pick);
63 dprintf("%s #%u *hint %s\n",tintstr(),id_,my_pick.str());
66 bin64_t send = bin64_t::NONE;
67 while (!hint_in_.empty() && send==bin64_t::NONE) {
68 bin64_t hint = hint_in_.front().bin;
69 tint time = hint_in_.front().time;
// non-base hints are split; the right half is re-queued with the same time
71 while (!hint.is_base()) { // FIXME optimize; possible attack
72 hint_in_.push_front(tintbin(time,hint.right()));
75 //if (time < NOW-TINT_SEC*3/2 )
77 if (ack_in_.get(hint)!=binmap_t::FILLED)
// mass accumulates the total width of the hints still queued (log only)
81 for(int i=0; i<hint_in_.size(); i++)
82 mass += hint_in_[i].bin.width();
83 dprintf("%s #%u dequeued %s [%lli]\n",tintstr(),id_,send.str(),mass);
// Channel::AddHandshake - appends the handshake messages to dgram.
// When peer_channel_id_ is zero (this side is initiating, per the inline
// comment) a SWIFT_HASH message carrying bin64_t::ALL32 and
// file().root_hash() is pushed and traced. A SWIFT_HANDSHAKE message with
// EncodeID(id_) is pushed and traced as well.
// NOTE(review): the brace closing the if-block and the end of the function
// are not in this listing; the SWIFT_HANDSHAKE part presumably sits outside
// the if-block - confirm against the full source.
88 void Channel::AddHandshake (Datagram& dgram) {
89 if (!peer_channel_id_) { // initiating
90 dgram.Push8(SWIFT_HASH);
91 dgram.Push32(bin64_t::ALL32);
92 dgram.PushHash(file().root_hash());
93 dprintf("%s #%u +hash ALL %s\n",
94 tintstr(),id_,file().root_hash().hex().c_str());
96 dgram.Push8(SWIFT_HANDSHAKE);
// the own channel id goes out in encoded form
97 int encoded = EncodeID(id_);
98 dgram.Push32(encoded);
99 dprintf("%s #%u +hs %x\n",tintstr(),id_,encoded);
// Channel::Send - builds one datagram for the peer and sends it.
// The datagram is created on socket_ for peer() and starts with
// peer_channel_id_. For an established channel further messages are added;
// of those only the !file().is_complete() test and the AddData(dgram) call
// are visible here. The size is traced, the datagram is sent, a return value
// of -1 from Send() is reported through print_error, and last_send_time_ is
// set to NOW.
// NOTE(review): lines are missing from this listing (the statements guarded
// by the visible conditions, the non-established branch, closing braces), so
// the exact message order cannot be read from here.
105 void Channel::Send () {
106 Datagram dgram(socket_,peer());
107 dgram.Push32(peer_channel_id_);
108 bin64_t data = bin64_t::NONE;
109 if ( is_established() ) {
110 // FIXME: seeder check
112 if (!file().is_complete())
116 data = AddData(dgram);
121 dprintf("%s #%u sent %ib %s:%x\n",
122 tintstr(),id_,dgram.size(),peer().str(),peer_channel_id_);
// a size of 4 means nothing was added after the 32-bit channel id
123 if (dgram.size()==4) {// only the channel id; bare keep-alive
125 //dprintf("%s #%u considering keepalive %i %f %s\n",
126 // tintstr(),id_,(int)data_out_.size(),cwnd_,SEND_CONTROL_MODES[send_control_]);
127 //if (data_out_.size()<cwnd_ && send_control_!=KEEP_ALIVE_CONTROL) {
129 // SwitchSendControl(KEEP_ALIVE_CONTROL);
131 // cwnd_ = cwnd_/2.0;
133 //if (data_out_.empty() && send_control_!=KEEP_ALIVE_CONTROL)
134 // SwitchSendControl(KEEP_ALIVE_CONTROL);// we did our best
135 //if (NOW<last_send_time_+MAX_SEND_INTERVAL) // no need for keepalive
136 // return; // don't send empty dgram
138 if (dgram.Send()==-1)
139 print_error("can't send datagram");
140 last_send_time_ = NOW;
// Channel::AddHint - requests more data from the peer by appending a
// SWIFT_HINT message to dgram when the outstanding hints fall short.
// Visible steps:
//  - plan_for is max(TINT_SEC, rtt_avg_*4);
//  - entries at the front of hint_out_ older than NOW - plan_for*2 are
//    dropped and their width is subtracted from hint_out_size_;
//  - plan_pck is plan_for / dip_avg_, but at least 1;
//  - if hint_out_size_ is below plan_pck, the picker is asked for a bin
//    given ack_in_, the shortfall and NOW+plan_for*2; a result other than
//    NONE is announced with SWIFT_HINT, appended to hint_out_ and added to
//    hint_out_size_;
//  - an "Xhint" trace exists for a branch whose condition is not visible
//    here.
// NOTE(review): lines are missing from this listing, including the push of
// the hint value itself, closing braces, and the terminator of the block
// comment opened further down - confirm against the full source.
146 void Channel::AddHint (Datagram& dgram) {
148 tint plan_for = max(TINT_SEC,rtt_avg_*4);
150 tint timed_out = NOW - plan_for*2;
151 while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
152 hint_out_size_ -= hint_out_.front().bin.width();
153 hint_out_.pop_front();
// NOTE(review): the next line opens a block comment whose end is not in this
// listing
156 /*int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
159 int plan_pck = max ( (tint)1, plan_for / dip_avg_ );
161 if ( hint_out_size_ < plan_pck ) {
163 int diff = plan_pck - hint_out_size_; // TODO: aggregate
164 bin64_t hint = transfer().picker().Pick(ack_in_,diff,NOW+plan_for*2);
166 if (hint!=bin64_t::NONE) {
167 dgram.Push8(SWIFT_HINT);
169 dprintf("%s #%u +hint %s [%lli]\n",tintstr(),id_,hint.str(),hint_out_size_);
170 hint_out_.push_back(hint);
171 hint_out_size_ += hint.width();
173 dprintf("%s #%u Xhint\n",tintstr(),id_);
// Channel::AddData - appends one SWIFT_DATA message to dgram, if allowed.
// Returns bin64_t::NONE when the file size is unknown (zero), when nothing
// was selected for sending, or on the read-error path; the value returned on
// success is not visible in this listing (presumably the bin sent).
// Visible steps:
//  - a bin is requested from DequeueHint() only if data_out_.size() is below
//    cwnd_ and last_data_out_time_+send_interval_ is not later than NOW plus
//    a slack of send_interval_>>4;
//  - if nothing can be dequeued the channel switches to KEEP_ALIVE_CONTROL;
//  - peak hashes are added while ack_in_ is empty, uncle hashes always;
//  - a datagram already larger than 254 bytes is sent off and restarted with
//    peer_channel_id_;
//  - the data message is the SWIFT_DATA byte, the bin as 32 bits and up to
//    1024 bytes read with pread at byte offset base_offset()<<10;
//  - last_data_out_time_ is set to NOW and the bin is appended to data_out_.
// NOTE(review): lines are missing from this listing (the declaration of buf,
// the test guarding print_error, the push of the payload, else-branches,
// closing braces and the final return).
179 bin64_t Channel::AddData (Datagram& dgram) {
181 if (!file().size()) // know nothing
182 return bin64_t::NONE;
184 bin64_t tosend = bin64_t::NONE;
185 tint luft = send_interval_>>4; // may wake up a bit earlier
186 if (data_out_.size()<cwnd_ &&
187 last_data_out_time_+send_interval_<=NOW+luft) {
188 tosend = DequeueHint();
189 if (tosend==bin64_t::NONE) {
190 dprintf("%s #%u sendctrl no idea what to send\n",tintstr(),id_);
191 if (send_control_!=KEEP_ALIVE_CONTROL)
192 SwitchSendControl(KEEP_ALIVE_CONTROL);
// NOTE(review): the time logged as "next" is
// last_data_out_time_+NOW-send_interval_, while the condition above uses
// last_data_out_time_+send_interval_; looks inconsistent - confirm
195 dprintf("%s #%u sendctrl wait cwnd %f data_out %i next %s\n",
196 tintstr(),id_,cwnd_,data_out_.size(),tintstr(last_data_out_time_+NOW-send_interval_));
198 if (tosend==bin64_t::NONE)// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
199 return bin64_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
201 if (ack_in_.is_empty() && file().size())
202 AddPeakHashes(dgram);
203 AddUncleHashes(dgram,tosend);
204 if (!ack_in_.is_empty()) // TODO: cwnd_>1
205 data_out_cap_ = tosend;
// hashes already filled much of the datagram: flush it and start a new one
207 if (dgram.size()>254) {
208 dgram.Send(); // kind of fragmentation
209 dgram.Push32(peer_channel_id_);
212 dgram.Push8(SWIFT_DATA);
213 dgram.Push32(tosend.to32());
// NOTE(review): r is size_t (unsigned) while pread reports failure as -1; if
// the missing test before print_error is r<0 it can never be true - confirm
216 size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10);
217 // TODO: corrupted data, retries, caching
219 print_error("error on reading");
220 return bin64_t::NONE;
// r payload bytes plus 4+1 bytes (bin and type byte, presumably) must fit
222 assert(dgram.space()>=r+4+1);
225 last_data_out_time_ = NOW;
226 data_out_.push_back(tosend);
227 dprintf("%s #%u +data %s\n",tintstr(),id_,tosend.str());
// Channel::AddTs - appends a SWIFT_TS message to dgram: the type byte
// followed by data_in_.time as a 64-bit value; the addition is traced.
// NOTE(review): the closing brace is not part of this listing.
233 void Channel::AddTs (Datagram& dgram) {
234 dgram.Push8(SWIFT_TS);
235 dgram.Push64(data_in_.time);
236 dprintf("%s #%u +ts %s\n",tintstr(),id_,tintstr(data_in_.time));
// Channel::AddAck - appends SWIFT_ACK messages to dgram.
// Visible steps:
//  - if data_in_dbl_ is set, it is acknowledged once more and reset to NONE;
//  - if data_in_.time is not TINT_NEVER, data_in_.bin is acknowledged,
//    traced, and data_in_ is reset to (TINT_NEVER, NONE);
//  - up to four further acks are produced from
//    file().ack_out().find_filtered(ack_out_, ALL, FILLED), each widened
//    with cover() before it is pushed and traced.
// NOTE(review): lines are missing from this listing (closing braces, the
// statement taken when find_filtered returns NONE, and any bookkeeping done
// around the pushes) - confirm against the full source.
240 void Channel::AddAck (Datagram& dgram) {
241 if (data_in_dbl_!=bin64_t::NONE) { // TODO: do redundancy better
242 dgram.Push8(SWIFT_ACK);
243 dgram.Push32(data_in_dbl_.to32());
244 data_in_dbl_=bin64_t::NONE;
246 if (data_in_.time!=TINT_NEVER) { // TODO: ACK NONE for corrupted data
248 bin64_t pos = data_in_.bin; // be precise file().ack_out().cover(data_in_.bin);
249 dgram.Push8(SWIFT_ACK);
250 dgram.Push32(pos.to32());
251 //dgram.Push64(data_in_.time);
253 dprintf("%s #%u +ack %s %s\n",tintstr(),id_,pos.str(),tintstr(data_in_.time));
// clear the pending entry so that it is acknowledged only once here
254 data_in_ = tintbin(TINT_NEVER,bin64_t::NONE);
// at most four additional acks per call
258 for(int count=0; count<4; count++) {
259 bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, binmap_t::FILLED);
260 if (ack==bin64_t::NONE)
262 ack = file().ack_out().cover(ack);
264 dgram.Push8(SWIFT_ACK);
265 dgram.Push32(ack.to32());
266 dprintf("%s #%u +ack %s\n",tintstr(),id_,ack.str());
// Channel::Recv - processes one received datagram for this channel.
// Visible steps:
//  - traces the size as dgram.size()+4 (presumably the 4 accounts for a
//    channel id consumed before this call - confirm with the caller);
//  - resets peer_send_time_ to 0, it is valid for one datagram only;
//  - takes a first RTT sample NOW - last_send_time_ when something has been
//    sent and rtt_avg_ still equals TINT_SEC with dev_avg_ at 0;
//  - pulls one type byte at a time while bytes remain and hands the rest to
//    the matching On* handler; an unknown type is reported with eprintf;
//  - records last_recv_time_ = NOW and clears sent_since_recv_.
// NOTE(review): lines are missing from this listing (the switch header and
// its default label, what follows an unknown message type, closing braces
// and whatever is done with the local variable data).
271 void Channel::Recv (Datagram& dgram) {
272 dprintf("%s #%u recvd %i\n",tintstr(),id_,dgram.size()+4);
273 peer_send_time_ = 0; // has scope of 1 datagram
275 if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
276 rtt_avg_ = NOW - last_send_time_;
279 dprintf("%s #%u sendctrl rtt init %lli\n",tintstr(),id_,rtt_avg_);
// an empty datagram yields ALL, otherwise NONE until OnData reports a bin
281 bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
282 while (dgram.size()) {
283 uint8_t type = dgram.Pull8();
285 case SWIFT_HANDSHAKE: OnHandshake(dgram); break;
286 case SWIFT_DATA: data=OnData(dgram); break;
287 case SWIFT_TS: OnTs(dgram); break;
288 case SWIFT_ACK: OnAck(dgram); break;
289 case SWIFT_HASH: OnHash(dgram); break;
290 case SWIFT_HINT: OnHint(dgram); break;
291 case SWIFT_PEX_ADD: OnPex(dgram); break;
293 eprintf("%s #%u ?msg id unknown %i\n",tintstr(),id_,(int)type);
297 last_recv_time_ = NOW;
298 sent_since_recv_ = 0;
// Channel::OnHash - handles a SWIFT_HASH message: pulls a 32-bit bin and a
// hash from dgram, offers the pair to the file via OfferHash and traces it.
// The result of OfferHash is not examined in the visible code.
// NOTE(review): the closing brace is not part of this listing.
302 void Channel::OnHash (Datagram& dgram) {
303 bin64_t pos = dgram.Pull32();
304 Sha1Hash hash = dgram.PullHash();
305 file().OfferHash(pos,hash);
306 //DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
307 dprintf("%s #%u -hash %s\n",tintstr(),id_,pos.str());
// Channel::CleanHintOut - removes pos from the queue of outstanding hints.
// Visible steps:
//  - hi scans hint_out_ for the first entry whose bin contains pos
//    (pos.within(entry)); if there is none the function returns;
//  - the hi entries in front of that one are dropped, with their width
//    subtracted from hint_out_size_;
//  - while the front entry is not exactly pos it is split: a copy narrowed
//    with towards(pos) is pushed in front and the old front keeps the sibling
//    of that narrowed bin;
//  - finally the front entry is popped.
// NOTE(review): lines are missing from this listing (the declaration and
// increment of hi, closing braces, and any adjustment of hint_out_size_ for
// the entry removed last) - confirm against the full source.
311 void Channel::CleanHintOut (bin64_t pos) {
313 while (hi<hint_out_.size() && !pos.within(hint_out_[hi].bin))
315 if (hi==hint_out_.size())
316 return; // something not hinted or hinted in far past
317 while (hi--) { // removing likely snubbed hints
318 hint_out_size_ -= hint_out_.front().bin.width();
319 hint_out_.pop_front();
// split the front hint step by step until its first part equals pos
321 while (hint_out_.front().bin!=pos) {
322 tintbin f = hint_out_.front();
323 f.bin = f.bin.towards(pos);
324 hint_out_.front().bin = f.bin.sibling();
325 hint_out_.push_front(f);
327 hint_out_.pop_front();
// Channel::OnData - handles a SWIFT_DATA message.
// Pulls a 32-bit bin and up to 1024 payload bytes from dgram. The message
// counts as ok when the bin is NONE or file().OfferData accepts the payload;
// the outcome is traced ('-' for ok, '!' otherwise). data_in_ is set to
// (NOW, NONE), and one visible path returns bin64_t::NONE. For a bin other
// than NONE the inter-arrival time is folded into dip_avg_ as
// (dip_avg_*3 + dip) >> 2 once a previous arrival time exists, and
// last_data_in_time_ is set to NOW.
// NOTE(review): lines are missing from this listing (the declaration of
// data, the condition guarding the early return, later updates of data_in_,
// closing braces and the final return).
332 bin64_t Channel::OnData (Datagram& dgram) {
333 bin64_t pos = dgram.Pull32();
335 int length = dgram.Pull(&data,1024);
336 bool ok = (pos==bin64_t::NONE) || file().OfferData(pos, (char*)data, length) ;
337 dprintf("%s #%u %cdata %s\n",tintstr(),id_,ok?'-':'!',pos.str());
338 data_in_ = tintbin(NOW,bin64_t::NONE);
340 return bin64_t::NONE;
342 if (pos!=bin64_t::NONE) {
343 if (last_data_in_time_) {
344 tint dip = NOW - last_data_in_time_;
// moving average with weight 1/4 for the newest sample
345 dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
347 last_data_in_time_ = NOW;
// Channel::CleanDataOut - updates the queue of in-flight data (data_out_)
// after an acknowledgement and detects lost packets.
// Visible steps, for an ackd_pos other than NONE:
//  - every non-default entry of data_out_ whose bin lies within ackd_pos is
//    processed: if the same bin is also listed in data_out_tmo_,
//    peer_send_time_ is zeroed so that no timing sample is taken; otherwise,
//    with a non-zero peer_send_time_, rtt_avg_ and dev_avg_ are updated as
//    moving averages (weights 1/8 and 1/4) and the delay value
//    peer_send_time_ - send time is stored in the 4-slot ring owd_current_
//    and folded into owd_min_bins_, whose active slot advances every
//    TINT_SEC*30; the entry is then reset to tintbin();
//  - entries with bin NONE at the front of data_out_ are dropped;
//  - when max_ack_off exceeds MAX_REORDERING (2), front entries are retired
//    to data_out_tmo_ as lost ("Rdata"), ack_not_rcvd_recent_ is incremented
//    and data_out_cap_ is reset to ALL.
// Visible steps regardless of ackd_pos (presumably; the closing brace of the
// first part is not shown):
//  - front entries sent before NOW - rtt_avg_ - 4*max(dev_avg_,TINT_MSEC*50)
//    are popped; those with a real bin still empty in ack_in_ are recorded in
//    data_out_tmo_ as timed out ("Tdata");
//  - entries of data_out_tmo_ older than NOW-MAX_POSSIBLE_RTT are dropped.
// NOTE(review): lines are missing from this listing (the declaration and
// updates of max_ack_off, closing braces, and statements between the visible
// ones), so the loop bounds of the reordering part cannot be verified here.
354 void Channel::CleanDataOut (bin64_t ackd_pos) { // TODO: isn't it too long?
358 if (ackd_pos!=bin64_t::NONE) {
359 for (int i=0; i<data_out_.size(); i++) {
360 if (data_out_[i]!=tintbin() && data_out_[i].bin.within(ackd_pos)) {
// a bin that already timed out gives an ambiguous sample: skip the timing
362 for(tbqueue::iterator j=data_out_tmo_.begin(); j!=data_out_tmo_.end(); j++)
363 if (j->bin==data_out_[i].bin)
364 peer_send_time_=0; // possibly retransmit
365 if (peer_send_time_) { // well, it is sorta ACK ACK
366 tint rtt = NOW-data_out_[i].time;
367 rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
368 dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
369 assert(data_out_[i].time!=TINT_NEVER);
370 tint owd = peer_send_time_ - data_out_[i].time;
// the & 3 keeps the slot index within 0..3
371 owd_cur_bin_ = (owd_cur_bin_+1) & 3;
372 owd_current_[owd_cur_bin_] = owd;
373 if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
374 owd_min_bin_start_ = NOW;
375 owd_min_bin_ = (owd_min_bin_+1) & 3;
376 owd_min_bins_[owd_min_bin_] = TINT_NEVER;
378 if (owd_min_bins_[owd_min_bin_]>owd)
379 owd_min_bins_[owd_min_bin_] = owd;
382 dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n",
383 tintstr(),id_,rtt_avg_,dev_avg_,data_out_[i].bin.str());
384 bin64_t pos = data_out_[i].bin;
// mark the slot as acknowledged; it is popped once it reaches the front
386 data_out_[i]=tintbin();
392 while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE) {
393 data_out_.pop_front();
396 static const int MAX_REORDERING = 2; // the triple-ACK principle
397 if (max_ack_off>MAX_REORDERING) {
398 while (max_ack_off && (data_out_.front().bin==bin64_t::NONE
399 || ack_in_.is_filled(data_out_.front().bin)) ) {
400 data_out_.pop_front();
403 while (max_ack_off>MAX_REORDERING) {
404 ack_not_rcvd_recent_++;
405 data_out_tmo_.push_back(data_out_.front().bin);
406 dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str());
407 data_out_.pop_front();
409 data_out_cap_ = bin64_t::ALL;
// anything sent before this instant is considered timed out
414 tint timeout = NOW - rtt_avg_ - 4*max(dev_avg_,TINT_MSEC*50);
415 while (!data_out_.empty() && data_out_.front().time<timeout) {
416 if (data_out_.front().bin!=bin64_t::NONE && ack_in_.is_empty(data_out_.front().bin)) {
417 ack_not_rcvd_recent_++;
418 data_out_cap_ = bin64_t::ALL;
419 data_out_tmo_.push_back(data_out_.front().bin);
420 dprintf("%s #%u Tdata %s\n",tintstr(),id_,data_out_.front().bin.str());
422 data_out_.pop_front();
424 while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE)
425 data_out_.pop_front();
426 assert(data_out_.empty() || data_out_.front().time!=TINT_NEVER);
427 while (!data_out_tmo_.empty() && data_out_tmo_.front().time<NOW-MAX_POSSIBLE_RTT)
428 data_out_tmo_.pop_front();
// Channel::OnAck - handles a SWIFT_ACK message.
// Pulls a 32-bit bin from dgram; NONE is ignored. When the file size is
// known and the base offset of the bin is not below file().packet_size(),
// the ack is reported as invalid with eprintf. Otherwise the ack is traced,
// the bin is set in ack_in_ and CleanDataOut is run for it.
// NOTE(review): the statement following the eprintf (presumably a return)
// and the closing braces are not in this listing - confirm.
433 void Channel::OnAck (Datagram& dgram) {
434 bin64_t ackd_pos = dgram.Pull32();
435 if (ackd_pos==bin64_t::NONE)
436 return; // likely, broken packet / insufficient hashes
437 if (file().size() && ackd_pos.base_offset()>=file().packet_size()) {
438 eprintf("invalid ack: %s\n",ackd_pos.str());
441 dprintf("%s #%u -ack %s\n",tintstr(),id_,ackd_pos.str());
442 ack_in_.set(ackd_pos);
443 CleanDataOut(ackd_pos); // FIXME do AFTER all ACKs
// Channel::OnTs - handles a SWIFT_TS message: stores the 64-bit value pulled
// from dgram in peer_send_time_ and traces it.
// NOTE(review): the closing brace is not part of this listing.
447 void Channel::OnTs (Datagram& dgram) {
448 peer_send_time_ = dgram.Pull64();
449 dprintf("%s #%u -ts %lli\n",tintstr(),id_,peer_send_time_);
// Channel::OnHint - handles a SWIFT_HINT message: pulls a 32-bit bin from
// dgram, appends it to hint_in_ and traces it. No validation of the bin is
// visible here.
// NOTE(review): the closing brace is not part of this listing.
453 void Channel::OnHint (Datagram& dgram) {
454 bin64_t hint = dgram.Pull32();
455 hint_in_.push_back(hint);
456 //ack_in_.set(hint,binmap_t::EMPTY);
457 //RequeueSend(cc_->OnHintRecvd(hint));
458 dprintf("%s #%u -hint %s\n",tintstr(),id_,hint.str());
// Channel::OnHandshake - handles a SWIFT_HANDSHAKE message.
// Stores the 32-bit value pulled from dgram as peer_channel_id_ and traces
// it. The id is then decoded; if a local channel with the decoded id exists
// and has no peer_channel_id_ of its own, the handshake is treated as a
// connection to ourselves: peer_channel_id_ is cleared and the function
// returns.
// NOTE(review): lines are missing from this listing (a statement between
// clearing the id and the return, whatever follows the check, and the
// closing braces) - confirm against the full source.
462 void Channel::OnHandshake (Datagram& dgram) {
463 peer_channel_id_ = dgram.Pull32();
464 dprintf("%s #%u -hs %x\n",tintstr(),id_,peer_channel_id_);
465 // self-connection check
467 uint32_t try_id = DecodeID(peer_channel_id_);
468 if (channel(try_id) && !channel(try_id)->peer_channel_id_) {
469 peer_channel_id_ = 0;
471 return; // this is a self-connection
474 // FUTURE: channel forking
// Channel::OnPex - handles a SWIFT_PEX_ADD message: pulls a 32-bit IPv4
// value and a 16-bit port from dgram, builds an Address from them, traces it
// and passes it to transfer().OnPexIn.
// NOTE(review): the closing brace is not part of this listing.
478 void Channel::OnPex (Datagram& dgram) {
479 uint32_t ipv4 = dgram.Pull32();
480 uint16_t port = dgram.Pull16();
481 Address addr(ipv4,port);
482 dprintf("%s #%u -pex %s\n",tintstr(),id_,addr.str());
483 transfer().OnPexIn(addr);
// Channel::AddPex - appends a SWIFT_PEX_ADD message to dgram.
// A channel id is obtained from transfer().RevealChannel(pex_out_); the
// values -1 and the own id_ are tested for first. The message carries the
// IPv4 address and port of that channel's peer and is traced.
// NOTE(review): the statement executed when the test succeeds (presumably a
// return) and the closing brace are not in this listing - confirm.
487 void Channel::AddPex (Datagram& dgram) {
488 int chid = transfer().RevealChannel(pex_out_);
489 if (chid==-1 || chid==id_)
491 Address a = channels[chid]->peer();
492 dgram.Push8(SWIFT_PEX_ADD);
493 dgram.Push32(a.ipv4());
494 dgram.Push16(a.port());
495 dprintf("%s #%u +pex %s\n",tintstr(),id_,a.str());
// Channel::RecvDatagram - reads one datagram from socket and finds (or
// creates) the channel it belongs to. Every rejection goes through the local
// return_log macro, which prints a message and returns NULL.
// Visible steps:
//  - the first 32 bits are the destination channel id (mych);
//  - mych == 0 marks an initial handshake: the datagram must hold at least
//    1+4+1+4+Sha1Hash::SIZE more bytes and start with a SWIFT_HASH message
//    for bin64_t::ALL; the hash is looked up with FileTransfer::Find; a
//    channel from the file's hs_in_ list with the same peer address that
//    received something within the last TINT_SEC*2 causes a rejection;
//    otherwise a new Channel is created;
//  - a non-zero mych is decoded, range-checked against channels.size(),
//    and the sender address must equal the channel's peer(); on success
//    own_id_mentioned_ is set.
// NOTE(review): lines are missing from this listing (the receive call and
// the size test before the first return_log, the declaration of hash, the
// null tests before two of the return_log calls, the else keyword, and the
// tail that presumably passes the datagram to the channel and returns it).
499 Channel* Channel::RecvDatagram (int socket) {
500 Datagram data(socket);
502 const Address& addr = data.address();
// the macro leaves the enclosing function: it logs with printf, returns NULL
503 #define return_log(...) { printf(__VA_ARGS__); return NULL; }
505 return_log("datagram shorter than 4 bytes %s\n",addr.str());
506 uint32_t mych = data.Pull32();
508 Channel* channel = NULL;
509 if (!mych) { // handshake initiated
510 if (data.size()<1+4+1+4+Sha1Hash::SIZE)
511 return_log ("%s #0 incorrect size %i initial handshake packet %s\n",
512 tintstr(),data.size(),addr.str());
513 uint8_t hashid = data.Pull8();
514 if (hashid!=SWIFT_HASH)
515 return_log ("%s #0 no hash in the initial handshake %s\n",
516 tintstr(),addr.str());
517 bin64_t pos = data.Pull32();
518 if (pos!=bin64_t::ALL)
519 return_log ("%s #0 that is not the root hash %s\n",tintstr(),addr.str());
520 hash = data.PullHash();
521 FileTransfer* file = FileTransfer::Find(hash);
523 return_log ("%s #0 hash %s unknown, no such file %s\n",tintstr(),hash.hex().c_str(),addr.str());
524 dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
// refuse a second handshake from a peer that has a recently active channel
525 for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
526 if (channels[*i] && channels[*i]->peer_==data.address() &&
527 channels[*i]->last_recv_time_>NOW-TINT_SEC*2)
528 return_log("%s #0 have a channel already to %s\n",tintstr(),addr.str());
529 channel = new Channel(file, socket, data.address());
531 mych = DecodeID(mych);
532 if (mych>=channels.size())
533 return_log("%s invalid channel #%u, %s\n",tintstr(),mych,addr.str());
534 channel = channels[mych];
// NOTE(review): the format string below has two conversions but three
// arguments are passed; addr.str() is not printed
536 return_log ("%s #%u is already closed\n",tintstr(),mych,addr.str());
537 if (channel->peer() != addr)
538 return_log ("%s #%u invalid peer address %s!=%s\n",
539 tintstr(),mych,channel->peer().str(),addr.str());
540 channel->own_id_mentioned_ = true;
542 //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
// Channel::Loop - scheduler step(s) that send from due channels and receive
// in between; howlong is added to Datagram::Time() to form the time limit.
// Visible steps:
//  - entries are popped from send_queue until one maps to an existing
//    channel; an entry whose time differs from the channel's
//    next_send_time_ (and that time is not TINT_NEVER) is stale and skipped;
//  - if a sender was found and its time has come, the send is traced and the
//    sender is rescheduled;
//  - otherwise the code waits on the sockets for min(limit,send_time) - NOW,
//    hands a readable socket to RecvDatagram, reschedules the receiving
//    channel, and puts the not-yet-due sender back into send_queue.
// NOTE(review): lines are missing from this listing (the outer loop that
// presumably runs until limit, the actual Send call and the rest of the
// sch_send trace, closing braces) - confirm against the full source.
548 void Channel::Loop (tint howlong) {
550 tint limit = Datagram::Time() + howlong;
554 tint send_time(TINT_NEVER);
555 Channel* sender(NULL);
556 while (!sender && !send_queue.is_empty()) { // dequeue
557 tintbin next = send_queue.pop();
// the bin field of a queue entry carries the channel id
558 sender = channel((int)next.bin);
559 send_time = next.time;
560 if (sender && sender->next_send_time_!=send_time &&
561 sender->next_send_time_!=TINT_NEVER )
562 sender = NULL; // it was a stale entry
565 if ( sender!=NULL && send_time<=NOW ) { // it's time
567 dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(),
570 sender->Reschedule();
572 } else { // it's too early, wait
574 tint towait = min(limit,send_time) - NOW;
575 dprintf("%s #0 waiting %lliusec\n",tintstr(),towait);
576 int rd = Datagram::Wait(socket_count,sockets,towait);
577 if (rd!=INVALID_SOCKET) { // in meantime, received something
578 Channel* receiver = RecvDatagram(rd);
579 if (receiver) // receiver's state may have changed
580 receiver->Reschedule();
582 if (sender) // get back to that later
583 send_queue.push(tintbin(send_time,sender->id()));
// Channel::Close - switches the send control of this channel to
// CLOSE_CONTROL; nothing else is done in the visible code.
// NOTE(review): the closing brace is not part of this listing.
592 void Channel::Close () {
593 this->SwitchSendControl(CLOSE_CONTROL);
// Channel::Reschedule - recomputes when this channel should send next.
// next_send_time_ is taken from NextSendTime(). Unless it is TINT_NEVER, it
// is asserted to lie less than TINT_MIN ahead of NOW, and an entry
// (time, id_) is pushed onto send_queue and traced. A "closed" trace exists
// for a branch whose header is not visible here (presumably the else case).
// NOTE(review): the else line, anything else done on that branch, and the
// closing braces are not in this listing - confirm against the full source.
597 void Channel::Reschedule () {
598 next_send_time_ = NextSendTime();
599 if (next_send_time_!=TINT_NEVER) {
600 assert(next_send_time_<NOW+TINT_MIN);
601 send_queue.push(tintbin(next_send_time_,id_));
602 dprintf("%s #%u requeue for %s\n",tintstr(),id_,tintstr(next_send_time_));
604 dprintf("%s #%u closed\n",tintstr(),id_);