3 * most of the swift's state machine
5 * Created by Victor Grishchenko on 3/6/09.
6 * Copyright 2009-2012 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved.
11 #include <algorithm> // kill it
17 using namespace swift;
// Class-wide (static) Channel state: the libevent base, the MessageQueue that
// Send() and AddData() hand finished datagrams to, and the receive event that
// LibeventReceiveCallback() re-arms after each read.
20 struct event_base *Channel::evbase;
21 MessageQueue Channel::messageQueue;
22 struct event Channel::evrecv;
// Debug switch for extra traffic logging. NOTE(review): the conditional
// blocks that test it are not visible in this excerpt — confirm usage.
24 #define DEBUGTRAFFIC 0
26 /** Arno: Victor's design allows a sender to choose some data to push to
27 * a receiver, if that receiver is not HINTing at data. Should be disabled
28 * when the receiver has a download rate limit.
30 #define ENABLE_SENDERSIZE_PUSH 0
// ENABLE_SENDERSIZE_PUSH is tested in DequeueHint() before it calls ImposeHint().
33 /** Arno, 2011-11-24: When rate limit is on and the download is in progress
34 * we send HINTs for 2 chunks at the moment. This constant can be used to
35 * get greater granularity. Set to 0 for original behaviour.
37 #define HINT_GRANULARITY 16 // chunks
// HINT_GRANULARITY is read by AddHint(): while hints are outstanding, a new
// hint is only sent when the allowance exceeds this many chunks.
42 - randomized testing of advanced ops (new testcase)
// Appends one SWIFT_HASH message per peak of the file's hash tree. Each
// message is the type byte, the peak bin as a 32-bit big-endian value, and
// then the peak's hash.
45 void Channel::AddPeakHashes (struct evbuffer *evb) {
46 for(int i=0; i<file().peak_count(); i++) {
47 bin_t peak = file().peak(i);
48 evbuffer_add_8(evb, SWIFT_HASH);
49 evbuffer_add_32be(evb, bin_toUInt32(peak));
50 evbuffer_add_hash(evb, file().peak_hash(i));
51 char bin_name_buf[32];
52 dprintf("%s #%u +phash %s\n",tintstr(),id_,peak.str(bin_name_buf));
// Appends SWIFT_HASH messages (type byte + 32-bit bin + hash) for the uncle
// (sibling) hashes between pos and the peak covering it. AddData() calls this
// before a DATA message when file().get_check_netwvshash() is set.
// The loop continues while all of the following hold:
//  - pos has not reached its peak;
//  - the peer has nothing under pos.parent() (ack_in_ is empty there);
//  - pos.parent() does not contain data_out_cap_. AddData() sets that field to
//    the bin it sends, and loss handling resets it to bin_t::ALL.
// The data_out_cap_ test is bypassed whenever (NOW&3)==3, which forces the
// hashes out periodically.
// NOTE(review): the step that moves pos upward is not visible in this excerpt.
57 void Channel::AddUncleHashes (struct evbuffer *evb, bin_t pos) {
59 char bin_name_buf2[32];
60 dprintf("%s #%u +uncle hash for %s\n",tintstr(),id_,pos.str(bin_name_buf2));
62 bin_t peak = file().peak_for(pos);
63 while (pos!=peak && ((NOW&3)==3 || !pos.parent().contains(data_out_cap_)) &&
64 ack_in_.is_empty(pos.parent()) ) {
65 bin_t uncle = pos.sibling();
66 evbuffer_add_8(evb, SWIFT_HASH);
67 evbuffer_add_32be(evb, bin_toUInt32(uncle));
68 evbuffer_add_hash(evb, file().hash(uncle) );
69 char bin_name_buf[32];
70 dprintf("%s #%u +hash %s\n",tintstr(),id_,uncle.str(bin_name_buf));
// Chooses a bin to push to the peer unrequested. DequeueHint() uses it when
// ENABLE_SENDERSIZE_PUSH is on and the peer has sent no hints.
// A "twist" value is taken from peer_channel_id_, masked with the first peak,
// and passed to binmap_t::find_complement() over ack_in_ and file().ack_out().
// NOTE(review): presumably this finds data we have that the peer lacks, with
// the twist varying the choice per peer — confirm find_complement semantics.
// The pick is narrowed to its left half until its base length is at most
// max(1, cwnd_), then converted back with twisted(twist) and returned.
76 bin_t Channel::ImposeHint () {
77 uint64_t twist = peer_channel_id_; // got no hints, send something randomly
79 twist &= file().peak(0).toUInt(); // FIXME may make it semi-seq here
81 bin_t my_pick = binmap_t::find_complement(ack_in_, file().ack_out(), twist);
83 my_pick.to_twisted(twist);
84 while (my_pick.base_length()>max(1,(int)cwnd_))
85 my_pick = my_pick.left();
87 return my_pick.twisted(twist);
// Picks the next bin to send as DATA, or bin_t::NONE. *retransmitptr reports
// whether the bin comes from the retransmit queue. Visible order of preference:
//  - If nothing was received from the peer in the last 3 s, no DATA is sent.
//    (This follows the comment; the guarded statement is not visible here.)
//  - The oldest entry of data_out_tmo_ (bins detected as lost) comes first.
//    The assignment to `send` is not visible in this excerpt.
//  - With ENABLE_SENDERSIZE_PUSH and no peer hints pending, a hint chosen by
//    ImposeHint() is queued on hint_in_.
//  - Otherwise peer hints are consumed from hint_in_. Each hint is split down
//    to base bins, with right halves pushed back to the front of the queue, and
//    a bin the peer does not already have (per ack_in_) is presumably chosen.
// NOTE(review): if the 3 s guard returns early, *retransmitptr is left
// unwritten; AddData() initialises it to false before calling.
91 bin_t Channel::DequeueHint (bool *retransmitptr) {
92 bin_t send = bin_t::NONE;
94 // Arno, 2012-01-23: Extra protection against channel loss, don't send DATA
95 if (last_recv_time_ < NOW-(3*TINT_SEC))
98 // Arno, 2012-01-18: Reenable Victor's retransmit
99 if (!data_out_tmo_.empty())
101 tintbin tb = data_out_tmo_.front();
103 data_out_tmo_.pop_front();
104 *retransmitptr = true;
107 *retransmitptr = false;
109 if (ENABLE_SENDERSIZE_PUSH && send.is_none() && hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {
110 bin_t my_pick = ImposeHint(); // FIXME move to the loop
111 if (!my_pick.is_none()) {
112 hint_in_.push_back(my_pick);
113 char bin_name_buf[32];
114 dprintf("%s #%u *hint %s\n",tintstr(),id_,my_pick.str(bin_name_buf));
118 while (!hint_in_.empty() && send.is_none()) {
119 bin_t hint = hint_in_.front().bin;
120 tint time = hint_in_.front().time;
121 hint_in_.pop_front();
122 while (!hint.is_base()) { // FIXME optimize; possible attack
123 hint_in_.push_front(tintbin(time,hint.right()));
126 //if (time < NOW-TINT_SEC*3/2 )
127 // continue; bad idea
128 if (!ack_in_.is_filled(hint))
132 // Arno, 2012-03-09: Is mucho expensive on busy server.
133 //for(int i=0; i<hint_in_.size(); i++)
134 // mass += hint_in_[i].bin.base_length();
135 char bin_name_buf[32];
136 dprintf("%s #%u dequeued %s [%lli]\n",tintstr(),id_,send.str(bin_name_buf),mass);
// Appends the handshake to evb.
// When peer_channel_id_ is 0 (we are initiating), a SWIFT_HASH message for
// bin_t::ALL carrying the root hash goes first. RecvDatagram() on the other
// side uses it to find the transfer.
// Then comes SWIFT_HANDSHAKE followed by our 32-bit channel id as encoded by
// EncodeID(id_).
// NOTE(review): the CLOSE_CONTROL branch body is not visible here. Presumably
// it encodes 0, which OnHandshake() treats as an explicit close.
141 void Channel::AddHandshake (struct evbuffer *evb) {
142 if (!peer_channel_id_) { // initiating
143 evbuffer_add_8(evb, SWIFT_HASH);
144 evbuffer_add_32be(evb, bin_toUInt32(bin_t::ALL));
145 evbuffer_add_hash(evb, file().root_hash());
146 dprintf("%s #%u +hash ALL %s\n",
147 tintstr(),id_,file().root_hash().hex().c_str());
149 evbuffer_add_8(evb, SWIFT_HANDSHAKE);
151 if (send_control_==CLOSE_CONTROL) {
155 encoded = EncodeID(id_);
156 evbuffer_add_32be(evb, encoded);
157 dprintf("%s #%u +hs %x\n",tintstr(),id_,encoded);
// Assembles one datagram for this channel and queues it on messageQueue.
// The datagram always starts with the peer's 32-bit channel id.
// For an established channel that is not in CLOSE_CONTROL, the visible calls
// include AddData(&evb). AddData() may flush evb and replace it with a new
// buffer, which is why its address is passed.
// In CLOSE_CONTROL an explicit close is sent (those lines are not visible).
// AddHave(evb) is also called.
// A datagram that is still only the 4-byte channel id is a bare keep-alive;
// lastsendwaskeepalive_ records this.
// The buffer is handed to messageQueue.AddBuffer() with `this`. NOTE(review):
// presumably the queue later reports completion through Sent().
162 void Channel::Send () {
164 dprintf("%s #%u Send called \n",tintstr(),id_);
166 struct evbuffer *evb = evbuffer_new();
167 evbuffer_add_32be(evb, peer_channel_id_);
168 bin_t data = bin_t::NONE;
169 int evbnonadplen = 0;
170 if ( is_established() ) {
171 if (send_control_!=CLOSE_CONTROL) {
172 // FIXME: seeder check
175 if (!file().is_complete()) {
177 /* Gertjan fix: 7aeea65f3efbb9013f601b22a57ee4a423f1a94d
178 "Only call Reschedule for 'reverse PEX' if the channel is in keep-alive mode"
184 data = AddData(&evb);
186 // Arno: send explicit close
191 AddHave(evb); // Arno, 2011-10-28: from AddHandShake. Why double?
196 lastsendwaskeepalive_ = (evbuffer_get_length(evb) == 4);
198 if (evbuffer_get_length(evb)==4) {// only the channel id; bare keep-alive
201 dprintf("%s #%u sent %ib %s:%x\n",
202 tintstr(),id_,(int)evbuffer_get_length(evb),peer().str(),
205 messageQueue.AddBuffer(socket_, evb, peer(), this);
// Send-completion bookkeeping for a datagram of `bytes` bytes. Adds them to
// raw_bytes_up_ and stamps last_send_time_ with NOW.
// NOTE(review): the handling of evb/tofree is not visible in this excerpt.
208 void Channel::Sent(int bytes, evbuffer *evb, bool tofree)
210 raw_bytes_up_ += bytes;
212 last_send_time_ = NOW;
// Appends at most one SWIFT_HINT (a request for data) to evb. The request is
// the smaller of two allowances:
//  1. Queue allowance: plan_for / dip_avg_ chunks, where plan_for is
//     max(1 s, 4*RTT). This is roughly how many chunks arrive in that time at
//     the observed inter-arrival time. It is reduced by what this channel
//     already has outstanding in hint_out_; entries older than 2*plan_for are
//     expired first.
//  2. Rate allowance, only when a download limit is set: MaxSpeed/chunk_size
//     chunks, minus the outstanding hints of all channels of this transfer.
// No hint is sent while the current download speed exceeds the limit. A hint
// is only sent when nothing is outstanding or the allowance exceeds
// HINT_GRANULARITY, so requests go out in batches. The picked bin is recorded
// in hint_out_ and hint_out_size_.
220 void Channel::AddHint (struct evbuffer *evb) {
223 // Policy is to not send hints when we are above speed limit
224 if (transfer().GetCurrentSpeed(DDIR_DOWNLOAD) > transfer().GetMaxSpeed(DDIR_DOWNLOAD)) {
226 fprintf(stderr,"hint: forbidden#");
231 // 1. Calc max of what we are allowed to request, uncongested bandwidth wise
232 tint plan_for = max(TINT_SEC,rtt_avg_*4);
234 tint timed_out = NOW - plan_for*2;
235 while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
236 hint_out_size_ -= hint_out_.front().bin.base_length();
237 hint_out_.pop_front();
// Guard the division: UpdateDIP()'s integer average (dip_avg_*3+dip)>>2 can
// decay to 0 when DATA arrives with zero spacing.
240 int first_plan_pck = max ( (tint)1, plan_for / (dip_avg_ != 0 ? dip_avg_ : 1) );
242 // Riccardo, 2012-04-04: Actually allowed is max minus what we already asked for
243 int queue_allowed_hints = max(0,first_plan_pck-(int)hint_out_size_);
247 // 2. Calc max of what is allowed by the rate limiter
// "No rate limit" sentinel. It must fit in an int: LONG_MAX wraps (to -1 on
// LP64), which made plan_pck below a huge uint64_t and bypassed the queue limit.
248 int rate_allowed_hints = INT_MAX;
249 if (transfer().GetMaxSpeed(DDIR_DOWNLOAD) < DBL_MAX)
251 uint64_t rough_global_hint_out_size = 0; // rough estimate, as hint_out_ clean up is not done for all channels
252 std::set<Channel *>::iterator iter;
253 for (iter=transfer().mychannels_.begin(); iter!=transfer().mychannels_.end(); iter++)
257 rough_global_hint_out_size += c->hint_out_size_;
260 // Policy: this channel is allowed to hint at the limit - global_hinted_at
261 // Handle MaxSpeed = unlimited
// Clamp in double to a value that an int can hold before converting; a
// double above INT_MAX has no defined conversion to int.
264 int rate_hints_limit = (int)min((double)INT_MAX,rate_hints_limit_float);
266 // Actually allowed is max minus what we already asked for, globally (=all channels)
267 rate_allowed_hints = max(0,rate_hints_limit-(int)rough_global_hint_out_size);
270 // 3. Take the smallest allowance from rate and queue limit
271 uint64_t plan_pck = (uint64_t)min(rate_allowed_hints,queue_allowed_hints);
273 // 4. Ask allowance in blocks of chunks to get pipelining going from serving peer.
274 if (hint_out_size_ == 0 || plan_pck > HINT_GRANULARITY)
276 bin_t hint = transfer().picker().Pick(ack_in_,plan_pck,NOW+plan_for*2);
277 if (!hint.is_none()) {
281 fprintf(stderr,"hint c%d: ask %s\n", id(), hint.str(binstr) );
283 evbuffer_add_8(evb, SWIFT_HINT);
284 evbuffer_add_32be(evb, bin_toUInt32(hint));
285 char bin_name_buf[32];
286 dprintf("%s #%u +hint %s [%lli]\n",tintstr(),id_,hint.str(bin_name_buf),hint_out_size_);
287 hint_out_.push_back(hint);
288 hint_out_size_ += hint.base_length();
289 //fprintf(stderr,"send c%d: HINTLEN %i\n", id(), hint.base_length());
290 //fprintf(stderr,"HL %i ", hint.base_length());
293 dprintf("%s #%u Xhint\n",tintstr(),id_);
// Appends at most one DATA message, plus any hashes it needs, to *evb.
// Returns bin_t::NONE when no chunk is selected. Visible steps:
//  - If the upload speed limit is exceeded, call OnSendNoData(). Presumably
//    the function returns right after; that line is not visible.
//  - Only pick a bin (via DequeueHint()) when data_out_ is below cwnd_ and
//    send_interval_ has elapsed since last_data_out_time_. The check allows
//    waking up to send_interval_/16 early. With nothing to send, the channel
//    switches to KEEP_ALIVE_CONTROL unless it is already keep-alive or closing.
//  - Add uncle hashes when file().get_check_netwvshash() is set.
//  - With the default chunk size, if the hashes alone exceed
//    SWIFT_MAX_NONDATA_DGRAM_SIZE, they are flushed as their own datagram and
//    a fresh *evb is started with the channel id. This is why evb is passed by
//    pointer.
//  - For non-default chunk sizes, a retransmit is prefixed with SWIFT_RANDOMIZE
//    to work around lost UDP fragments.
//  - The chunk is read with pread() directly into space reserved in *evb.
//    The send is recorded in data_out_ and the transfer's statistics.
299 bin_t Channel::AddData (struct evbuffer **evb) {
301 if (transfer().GetCurrentSpeed(DDIR_UPLOAD) > transfer().GetMaxSpeed(DDIR_UPLOAD)) {
302 transfer().OnSendNoData();
306 if (!file().size()) // know nothing
309 bin_t tosend = bin_t::NONE;
310 bool isretransmit = false;
311 tint luft = send_interval_>>4; // may wake up a bit earlier
312 if (data_out_.size()<cwnd_ &&
313 last_data_out_time_+send_interval_<=NOW+luft) {
314 tosend = DequeueHint(&isretransmit);
315 if (tosend.is_none()) {
316 dprintf("%s #%u sendctrl no idea what data to send\n",tintstr(),id_);
317 if (send_control_!=KEEP_ALIVE_CONTROL && send_control_!=CLOSE_CONTROL)
318 SwitchSendControl(KEEP_ALIVE_CONTROL);
// Log the earliest time the send-interval check above lets the next DATA out:
// last_data_out_time_ + send_interval_. The previous expression
// (last + NOW - interval) had no meaning as a send time.
321 dprintf("%s #%u sendctrl wait cwnd %f data_out %i next %s\n",
322 tintstr(),id_,cwnd_,(int)data_out_.size(),tintstr(last_data_out_time_+send_interval_));
324 if (tosend.is_none())// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
325 return bin_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
327 if (ack_in_.is_empty() && file().size())
331 if (file().get_check_netwvshash())
332 AddUncleHashes(*evb,tosend);
334 if (!ack_in_.is_empty()) // TODO: cwnd_>1
335 data_out_cap_ = tosend;
337 // Arno, 2011-11-03: May happen when first data packet is sent to empty
338 // leech, then peak + uncle hashes may be so big that they don't fit in eth
339 // frame with DATA. Send 2 datagrams then, one with peaks so they have
340 // a better chance of arriving. Optimistic violation of atomic datagram
342 if (file().chunk_size() == SWIFT_DEFAULT_CHUNK_SIZE && evbuffer_get_length(*evb) > SWIFT_MAX_NONDATA_DGRAM_SIZE) {
343 dprintf("%s #%u fsent %ib %s:%x\n",
344 tintstr(),id_,(int)evbuffer_get_length(*evb),peer().str(),
346 messageQueue.AddBuffer(socket_, *evb, peer(), this, false);
347 *evb = evbuffer_new();
348 evbuffer_add_32be(*evb, peer_channel_id_);
351 if (file().chunk_size() != SWIFT_DEFAULT_CHUNK_SIZE && isretransmit) {
353 * Arno, 2012-01-17: We observe strange behaviour when using
354 * fragmented UDP packets. When ULANC sends a specific datagram ("995"),
355 * the 2nd IP packet carrying it gets lost structurally. When
356 * downloading from the same asset hosted on a Linux 32-bit machine
357 * using a Win7 32-bit client (behind a NAT), one specific full
358 * datagram never gets delivered (6970 one before do). A workaround
359 * is to add some random data to the datagram. Hence we introduce
360 * the SWIFT_RANDOMIZE message, that is added to the datagram carrying
361 * the DATA on a retransmit.
364 fprintf(stderr,"AddData: retransmit of randomized chunk %s\n",tosend.str(binstr) );
365 evbuffer_add_8(*evb, SWIFT_RANDOMIZE);
366 evbuffer_add_32be(*evb, (int)rand() );
369 evbuffer_add_8(*evb, SWIFT_DATA);
370 evbuffer_add_32be(*evb, bin_toUInt32(tosend));
372 struct evbuffer_iovec vec;
373 if (evbuffer_reserve_space(*evb, file().chunk_size(), &vec, 1) < 0) {
374 print_error("error on evbuffer_reserve_space");
// NOTE(review): POSIX pread() returns ssize_t (-1 on error). Stored in size_t,
// an error becomes a huge positive value, so a signed `r<0` check can never
// fire — confirm the error test that follows and the platform's pread type.
377 size_t r = pread(file().file_descriptor(),(char *)vec.iov_base,
378 file().chunk_size(),tosend.base_offset()*file().chunk_size());
379 // TODO: corrupted data, retries, caching
381 print_error("error on reading");
383 evbuffer_commit_space(*evb, &vec, 1);
386 // assert(dgram.space()>=r+4+1);
388 if (evbuffer_commit_space(*evb, &vec, 1) < 0) {
389 print_error("error on evbuffer_commit_space");
393 last_data_out_time_ = NOW;
394 data_out_.push_back(tosend);
396 global_bytes_up += r;
398 char bin_name_buf[32];
399 dprintf("%s #%u +data %s\n",tintstr(),id_,tosend.str(bin_name_buf));
402 // ARNOSMPTODO: count overhead bytes too? Move to Send() then.
403 transfer_->OnSendData(file().chunk_size());
// Acknowledges the most recent DATA, held in data_in_. Nothing is appended
// when data_in_ is empty (tintbin()); the early return is not visible here.
// If data_in_.time is TINT_NEVER, which OnData() sets for duplicates, a
// SWIFT_HAVE is sent without a timestamp. Otherwise a SWIFT_ACK is sent with
// the 64-bit receive time.
// The bin is recorded in have_out_. Bins above layer 2 are also kept in
// data_in_dbl_ so that AddHave() repeats them. data_in_ is then cleared.
409 void Channel::AddAck (struct evbuffer *evb) {
410 if (data_in_==tintbin())
411 //if (data_in_.bin==bin64_t::NONE)
413 // sometimes, we send a HAVE (e.g. in case the peer did repetitive send)
414 evbuffer_add_8(evb, data_in_.time==TINT_NEVER?SWIFT_HAVE:SWIFT_ACK);
415 evbuffer_add_32be(evb, bin_toUInt32(data_in_.bin));
416 if (data_in_.time!=TINT_NEVER)
417 evbuffer_add_64be(evb, data_in_.time);
421 fprintf(stderr,"send c%d: ACK %i\n", id(), bin_toUInt32(data_in_.bin));
423 have_out_.set(data_in_.bin);
424 char bin_name_buf[32];
425 dprintf("%s #%u +ack %s %s\n",
426 tintstr(),id_,data_in_.bin.str(bin_name_buf),tintstr(data_in_.time));
427 if (data_in_.bin.layer()>2)
428 data_in_dbl_ = data_in_.bin;
430 //fprintf(stderr,"data_in_ c%d\n", id() );
431 data_in_ = tintbin();
432 //data_in_ = tintbin(NOW,bin64_t::NONE);
// Appends SWIFT_HAVE messages (type byte + 32-bit bin).
// First, the redundant bin data_in_dbl_ (set by AddAck()) is sent once and
// then cleared.
// Then up to 4 bins are sent: each is found with find_complement() over
// have_out_ and file().ack_out(), then widened with ack_out().cover().
// NOTE(review): the loop exit and the have_out_ update are not visible here.
436 void Channel::AddHave (struct evbuffer *evb) {
437 if (!data_in_dbl_.is_none()) { // TODO: do redundancy better
438 evbuffer_add_8(evb, SWIFT_HAVE);
439 evbuffer_add_32be(evb, bin_toUInt32(data_in_dbl_));
440 data_in_dbl_=bin_t::NONE;
443 fprintf(stderr,"send c%d: HAVE ",id() );
444 for(int count=0; count<4; count++) {
445 bin_t ack = binmap_t::find_complement(have_out_, file().ack_out(), 0); // FIXME: do rotating queue
448 ack = file().ack_out().cover(ack);
450 evbuffer_add_8(evb, SWIFT_HAVE);
451 evbuffer_add_32be(evb, bin_toUInt32(ack));
454 fprintf(stderr," %i", bin_toUInt32(ack));
456 char bin_name_buf[32];
457 dprintf("%s #%u +have %s\n",tintstr(),id_,ack.str(bin_name_buf));
460 fprintf(stderr,"\n");
// Processes the payload of one received datagram. The logged size adds 4,
// presumably for the channel id that RecvDatagram() already removed.
// An empty payload is a keep-alive. It still calls OnRecvData(0) so that the
// speed estimates decay when the download stops.
// If rtt_avg_ is still at its initial value (TINT_SEC with dev_avg_ 0) and we
// have sent before, rtt_avg_ is seeded with NOW - last_send_time_.
// Messages are then dispatched on their type byte until the buffer is empty.
// Unknown types are logged; any further handling is not visible here.
// Finally, last_recv_time_ is set to NOW and sent_since_recv_ is reset.
465 void Channel::Recv (struct evbuffer *evb) {
466 dprintf("%s #%u recvd %ib\n",tintstr(),id_,(int)evbuffer_get_length(evb)+4);
469 lastrecvwaskeepalive_ = (evbuffer_get_length(evb) == 0);
470 if (lastrecvwaskeepalive_)
471 // Update speed measurements such that they decrease when DL stops
472 transfer().OnRecvData(0);
474 if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
475 rtt_avg_ = NOW - last_send_time_;
478 dprintf("%s #%u sendctrl rtt init %lli\n",tintstr(),id_,rtt_avg_);
481 bin_t data = evbuffer_get_length(evb) ? bin_t::NONE : bin_t::ALL;
484 fprintf(stderr,"recv c%d: size %d ", id(), evbuffer_get_length(evb));
486 while (evbuffer_get_length(evb)) {
487 uint8_t type = evbuffer_remove_8(evb);
490 fprintf(stderr," %d\n", type);
493 case SWIFT_HANDSHAKE: OnHandshake(evb); break;
494 case SWIFT_DATA: data=OnData(evb); break;
495 case SWIFT_HAVE: OnHave(evb); break;
496 case SWIFT_ACK: OnAck(evb); break;
497 case SWIFT_HASH: OnHash(evb); break;
498 case SWIFT_HINT: OnHint(evb); break;
499 case SWIFT_PEX_ADD: OnPex(evb); break;
500 case SWIFT_PEX_REQ: OnPexReq(); break;
501 case SWIFT_RANDOMIZE: OnRandomize(evb); break; //FRAGRAND
503 dprintf("%s #%u ?msg id unknown %i\n",tintstr(),id_,(int)type);
509 fprintf(stderr,"\n");
513 last_recv_time_ = NOW;
514 sent_since_recv_ = 0;
519 * Arno: FAXME: HASH+DATA should be handled as a transaction: only when the
520 * hashes check out should they be stored in the hashtree, otherwise revert.
// OnHash: reads a SWIFT_HASH message (a 32-bit bin followed by the hash) and
// offers it to the file's hash tree via file().OfferHash(pos, hash).
522 void Channel::OnHash (struct evbuffer *evb) {
523 bin_t pos = bin_fromUInt32(evbuffer_remove_32be(evb));
524 Sha1Hash hash = evbuffer_remove_hash(evb);
525 file().OfferHash(pos,hash);
526 char bin_name_buf[32];
527 dprintf("%s #%u -hash %s\n",tintstr(),id_,pos.str(bin_name_buf));
529 //fprintf(stderr,"HASH %lli hex %s\n",pos.toUInt(), hash.hex().c_str() );
// Prunes our outstanding requests (hint_out_) after bin pos has arrived.
// It finds the first outstanding hint that contains pos; if there is none,
// nothing changes.
// Hints queued before that one are treated as snubbed: they are dropped and
// their sizes are subtracted from hint_out_size_.
// The matching hint is then split until the front of the queue is exactly
// pos, and that entry is popped. Part of the split logic is not visible here.
533 void Channel::CleanHintOut (bin_t pos) {
535 while (hi<hint_out_.size() && !hint_out_[hi].bin.contains(pos))
537 if (hi==hint_out_.size())
538 return; // something not hinted or hinted in far past
539 while (hi--) { // removing likely snubbed hints
540 hint_out_size_ -= hint_out_.front().bin.base_length();
541 hint_out_.pop_front();
543 while (hint_out_.front().bin!=pos) {
544 tintbin f = hint_out_.front();
546 assert (f.bin.contains(pos));
554 hint_out_.front().bin = f.bin.sibling();
555 hint_out_.push_front(f);
557 hint_out_.pop_front();
// Handles a DATA message: a 32-bit bin followed by the chunk payload, which
// is assumed to be the last message in the datagram.
// A payload longer than chunk_size is warned about; at most chunk_size bytes
// are consumed.
// Duplicate (we already have pos): the bytes are drained, and data_in_ is set
// to a TINT_NEVER entry for the covering bin so that AddAck() answers with a
// HAVE instead of an ACK.
// Otherwise the bytes are passed to file().OfferData(). On rejection they are
// drained and logged. On acceptance:
//  - the covering bin is reported to each installed callback whose
//    aggregation layer it reaches;
//  - once the cover reaches layer 5 (32 chunks), that many bytes are fed to
//    the transfer's receive-speed estimate.
// At the end, length is added to bytes_down_ and global_bytes_down.
562 bin_t Channel::OnData (struct evbuffer *evb) { // TODO: HAVE NONE for corrupted data
564 char bin_name_buf[32];
565 bin_t pos = bin_fromUInt32(evbuffer_remove_32be(evb));
567 // Arno: Assuming DATA last message in datagram
568 if (evbuffer_get_length(evb) > file().chunk_size()) {
569 dprintf("%s #%u !data chunk size mismatch %s: exp %lu got " PRISIZET "\n",tintstr(),id_,pos.str(bin_name_buf), file().chunk_size(), evbuffer_get_length(evb));
570 fprintf(stderr,"WARNING: chunk size mismatch: exp %lu got " PRISIZET "\n",file().chunk_size(), evbuffer_get_length(evb));
573 int length = (evbuffer_get_length(evb) < file().chunk_size()) ? evbuffer_get_length(evb) : file().chunk_size();
574 if (!file().ack_out().is_empty(pos)) {
575 // Arno, 2012-01-24: print message for duplicate
576 dprintf("%s #%u Ddata %s\n",tintstr(),id_,pos.str(bin_name_buf));
577 evbuffer_drain(evb, length);
578 data_in_ = tintbin(TINT_NEVER,transfer().ack_out().cover(pos));
580 // Arno, 2012-01-24: Make sure data interarrival periods don't get
581 // screwed up because of these (ignored) duplicates.
585 uint8_t *data = evbuffer_pullup(evb, length);
586 data_in_ = tintbin(NOW,bin_t::NONE);
587 if (!file().OfferData(pos, (char*)data, length)) {
588 evbuffer_drain(evb, length);
589 char bin_name_buf[32];
590 dprintf("%s #%u !data %s\n",tintstr(),id_,pos.str(bin_name_buf));
593 evbuffer_drain(evb, length);
594 dprintf("%s #%u -data %s\n",tintstr(),id_,pos.str(bin_name_buf));
597 fprintf(stderr,"$ ");
599 bin_t cover = transfer().ack_out().cover(pos);
600 for(int i=0; i<transfer().cb_installed; i++)
601 if (cover.layer()>=transfer().cb_agg[i])
602 transfer().callbacks[i](transfer().fd(),cover); // FIXME
603 if (cover.layer() >= 5) // Arno: tested with 32K, presently = 2 ** 5 * chunk_size CHUNKSIZE
604 transfer().OnRecvData( pow((double)2,(double)5)*((double)file().chunk_size()) );
609 bytes_down_ += length;
610 global_bytes_down += length;
// Updates dip_avg_, the moving average of the time between consecutive DATA
// arrivals: new = (3*old + sample) / 4. AddHint() divides by it to size its
// requests.
// Only a non-NONE pos counts. The first arrival just initialises
// last_data_in_time_. Each counted arrival sets last_data_in_time_ to NOW.
615 void Channel::UpdateDIP(bin_t pos)
617 if (!pos.is_none()) {
618 if (last_data_in_time_) {
619 tint dip = NOW - last_data_in_time_;
620 dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
622 last_data_in_time_ = NOW;
// Handles an ACK: the peer confirms receipt of ackd_pos and echoes its own
// 64-bit receive timestamp (peer_time).
// A NONE bin is ignored. A bin beyond the file's chunk count is logged as
// invalid (and presumably dropped). Otherwise ackd_pos is recorded in ack_in_.
// If the ack matches a data_out_ entry that is not a retransmission (no match
// in data_out_tmo_), the following are updated:
//  - rtt_avg_ (new sample weighted 1/8) and dev_avg_ (weighted 1/4);
//  - the one-way-delay minimum bins, rotated every 30 s.
// Entries sent more than MAX_REORDERING before the acked one are presumed
// lost. They are counted in ack_not_rcvd_recent_, queued in data_out_tmo_ for
// retransmission, and cleared. Cleared or acked entries are then popped from
// the front of data_out_.
627 void Channel::OnAck (struct evbuffer *evb) {
628 bin_t ackd_pos = bin_fromUInt32(evbuffer_remove_32be(evb));
629 tint peer_time = evbuffer_remove_64be(evb); // FIXME 32
630 // FIXME FIXME: wrap around here
631 if (ackd_pos.is_none())
632 return; // likely, broken chunk/ insufficient hashes
633 if (file().size() && ackd_pos.base_offset()>=file().size_in_chunks()) {
634 char bin_name_buf[32];
635 eprintf("invalid ack: %s\n",ackd_pos.str(bin_name_buf));
638 ack_in_.set(ackd_pos);
640 //fprintf(stderr,"OnAck: got bin %s is_complete %d\n", ackd_pos.str(), (int)ack_in_.is_complete_arno( file().ack_out().get_height() ));
643 // find an entry for the send (data out) event
644 while ( di<data_out_.size() && ( data_out_[di]==tintbin() ||
645 !ackd_pos.contains(data_out_[di].bin) ) )
647 // FUTURE: delayed acks
648 // rule out retransmits
649 while ( ri<data_out_tmo_.size() && !ackd_pos.contains(data_out_tmo_[ri].bin) )
651 char bin_name_buf[32];
652 dprintf("%s #%u %cack %s %lli\n",tintstr(),id_,
653 di==data_out_.size()?'?':'-',ackd_pos.str(bin_name_buf),peer_time);
654 if (di!=data_out_.size() && ri==data_out_tmo_.size()) { // not a retransmit
655 // round trip time calculations
656 tint rtt = NOW-data_out_[di].time;
657 rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
658 dev_avg_ = ( dev_avg_*3 + tintabs(rtt-rtt_avg_) ) >> 2;
659 assert(data_out_[di].time!=TINT_NEVER);
660 // one-way delay calculations
661 tint owd = peer_time - data_out_[di].time;
662 owd_cur_bin_ = 0;//(owd_cur_bin_+1) & 3;
663 owd_current_[owd_cur_bin_] = owd;
664 if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
665 owd_min_bin_start_ = NOW;
666 owd_min_bin_ = (owd_min_bin_+1) & 3;
667 owd_min_bins_[owd_min_bin_] = TINT_NEVER;
669 if (owd_min_bins_[owd_min_bin_]>owd)
670 owd_min_bins_[owd_min_bin_] = owd;
671 dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n",
672 tintstr(),id_,rtt_avg_,dev_avg_,data_out_[di].bin.str(bin_name_buf));
674 // early loss detection by packet reordering
// NOTE(review): this relies on di being signed (its declaration is not
// visible). With an unsigned di, di-MAX_REORDERING would wrap whenever
// di < MAX_REORDERING.
675 for (int re=0; re<di-MAX_REORDERING; re++) {
676 if (data_out_[re]==tintbin())
678 ack_not_rcvd_recent_++;
679 data_out_tmo_.push_back(data_out_[re].bin);
// Log the entry actually queued for retransmission (data_out_[re]), not the
// front of the queue.
680 dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_[re].bin.str(bin_name_buf));
681 data_out_cap_ = bin_t::ALL;
682 data_out_[re] = tintbin();
685 if (di!=data_out_.size())
686 data_out_[di]=tintbin();
687 // clear zeroed items
688 while (!data_out_.empty() && ( data_out_.front()==tintbin() ||
689 ack_in_.is_filled(data_out_.front().bin) ) )
690 data_out_.pop_front();
691 assert(data_out_.empty() || data_out_.front().time!=TINT_NEVER);
// Detects loss by timeout. Entries at the front of data_out_ that are older
// than NOW - ack_timeout(), or already cleared, are popped.
// A popped entry the peer has still not acknowledged (empty in ack_in_):
//  - is counted in ack_not_rcvd_recent_;
//  - resets data_out_cap_ to bin_t::ALL;
//  - is queued on data_out_tmo_ for retransmission.
// Afterwards, retransmit entries older than MAX_POSSIBLE_RTT are discarded.
695 void Channel::TimeoutDataOut ( ) {
696 // losses: timeouted packets
697 tint timeout = NOW - ack_timeout();
698 while (!data_out_.empty() &&
699 ( data_out_.front().time<timeout || data_out_.front()==tintbin() ) ) {
700 if (data_out_.front()!=tintbin() && ack_in_.is_empty(data_out_.front().bin)) {
701 ack_not_rcvd_recent_++;
702 data_out_cap_ = bin_t::ALL;
703 data_out_tmo_.push_back(data_out_.front().bin);
704 char bin_name_buf[32];
705 dprintf("%s #%u Tdata %s\n",tintstr(),id_,data_out_.front().bin.str(bin_name_buf));
707 data_out_.pop_front();
709 // clear retransmit queue of older items
710 while (!data_out_tmo_.empty() && data_out_tmo_.front().time<NOW-MAX_POSSIBLE_RTT)
711 data_out_tmo_.pop_front();
// Handles a HAVE message: the peer announces that it has ackd_pos. A NONE bin
// is ignored.
// With ENABLE_VOD_PIECEPICKER, the transfer's availability map is updated
// first. If the map is still unsized and the file size is known, it is sized
// to the file's chunk count before use. The update happens before ack_in_
// changes, so it receives the previous ack_in_.
// ackd_pos is then recorded in ack_in_.
715 void Channel::OnHave (struct evbuffer *evb) {
716 bin_t ackd_pos = bin_fromUInt32(evbuffer_remove_32be(evb));
717 if (ackd_pos.is_none())
718 return; // wow, peer has hashes
721 if (ENABLE_VOD_PIECEPICKER) {
722 // Ric: check if we should set the size in the file transfer
723 if (transfer().availability().size() <= 0 && file().size() > 0)
725 transfer().availability().setSize(file().size_in_chunks());
727 // Ric: update the availability if needed
728 transfer().availability().set(id_, ack_in_, ackd_pos);
731 ack_in_.set(ackd_pos);
732 char bin_name_buf[32];
733 dprintf("%s #%u -have %s\n",tintstr(),id_,ackd_pos.str(bin_name_buf));
735 //fprintf(stderr,"OnHave: got bin %s is_complete %d\n", ackd_pos.str(), IsComplete() );
// Handles a HINT message: the peer requests the given bin. The bin is queued
// on hint_in_, which DequeueHint() serves from.
740 void Channel::OnHint (struct evbuffer *evb) {
741 bin_t hint = bin_fromUInt32(evbuffer_remove_32be(evb));
742 // FIXME: wake up here
743 hint_in_.push_back(hint);
744 char bin_name_buf[32];
745 dprintf("%s #%u -hint %s\n",tintstr(),id_,hint.str(bin_name_buf));
// Handles a HANDSHAKE message carrying the peer's 32-bit channel id (pcid).
// A pcid of 0 on an established channel is an explicit close.
// peer_channel_id_ is reset so is_established() turns false, and
// RecvDatagram() then schedules the channel for closing.
// Otherwise pcid is stored. If it decodes to one of our own channels that has
// no peer id yet, this is a self-connection and peer_channel_id_ is reset.
749 void Channel::OnHandshake (struct evbuffer *evb) {
751 uint32_t pcid = evbuffer_remove_32be(evb);
752 dprintf("%s #%u -hs %x\n",tintstr(),id_,pcid);
754 if (is_established() && pcid == 0) {
755 // Arno: received explicit close
756 peer_channel_id_ = 0; // == established -> false
761 peer_channel_id_ = pcid;
762 // self-connection check
764 uint32_t try_id = DecodeID(peer_channel_id_);
765 if (channel(try_id) && !channel(try_id)->peer_channel_id_) {
766 peer_channel_id_ = 0;
768 return; // this is a self-connection
772 // FUTURE: channel forking
773 if (is_established())
774 dprintf("%s #%u established %s\n", tintstr(), id_, peer().str());
// Handles a PEX_ADD message: another peer's address as a 32-bit IPv4 address
// and a 16-bit port. The address is passed to transfer().OnPexIn().
// If that returns true, useless_pex_count_ is reset. Otherwise (already
// connected, per the log line) useless_pex_count_ is incremented.
// Any PEX_ADD counts as the answer to our outstanding PEX request.
778 void Channel::OnPex (struct evbuffer *evb) {
779 uint32_t ipv4 = evbuffer_remove_32be(evb);
780 uint16_t port = evbuffer_remove_16be(evb);
781 Address addr(ipv4,port);
782 dprintf("%s #%u -pex %s\n",tintstr(),id_,addr.str());
783 if (transfer().OnPexIn(addr))
784 useless_pex_count_ = 0;
787 dprintf("%s #%u already channel to %s\n", tintstr(),id_,addr.str());
788 useless_pex_count_++;
790 pex_request_outstanding_ = false;
// Handles SWIFT_RANDOMIZE, the padding AddData() adds to retransmits. It
// consumes the 4-byte random payload; r is not used in the visible lines.
795 void Channel::OnRandomize (struct evbuffer *evb) {
796 dprintf("%s #%u -rand\n",tintstr(),id_ );
797 // Payload is 4 random bytes
798 uint32_t r = evbuffer_remove_32be(evb);
// Appends PEX_ADD messages. Each is 7 bytes: type byte + 32-bit IPv4 + 16-bit
// port.
// Queued reverse-PEX entries (reverse_pex_out_, filled by other channels, see
// the end of this function) have priority. They are drained while the
// datagram still has room for another 7-byte message. Entries whose channel
// slot is NULL are presumably skipped. Private addresses are only sent to
// peers that are themselves private.
// Otherwise, when a PEX answer is due, an address is chosen with
// transfer().RandomChannel(id_); that selection loop is only partly visible.
// This channel's id is then queued, at most once, on the chosen channel's
// reverse_pex_out_ for NOW + 2 s, which helps NAT/firewall puncturing. The
// chosen channel is rescheduled if it is in keep-alive mode and would not
// send before then.
802 void Channel::AddPex (struct evbuffer *evb) {
803 // Gertjan fix: Reverse PEX
804 // PEX messages sent to facilitate NAT/FW puncturing get priority
805 if (!reverse_pex_out_.empty()) {
807 tintbin pex_peer = reverse_pex_out_.front();
808 reverse_pex_out_.pop_front();
809 if (channels[(int) pex_peer.bin.toUInt()] == NULL)
811 Address a = channels[(int) pex_peer.bin.toUInt()]->peer();
812 // Arno, 2012-02-28: Don't send private addresses to non-private peers.
813 if (!a.is_private() || (a.is_private() && peer().is_private()))
815 evbuffer_add_8(evb, SWIFT_PEX_ADD);
816 evbuffer_add_32be(evb, a.ipv4());
817 evbuffer_add_16be(evb, a.port());
818 dprintf("%s #%u +pex (reverse) %s\n",tintstr(),id_,a.str());
// Room check for one more 7-byte PEX_ADD, written as an addition. The old
// form MAX - length is computed in size_t, so it wrapped to a huge value
// (and kept the loop going) once evb was already larger than the limit.
820 } while (!reverse_pex_out_.empty() && evbuffer_get_length(evb)+7 <= (size_t)SWIFT_MAX_NONDATA_DGRAM_SIZE);
822 // Arno: 2012-02-23: Don't think this is right. Bit of DoS thing,
823 // that you only get back the addr of people that got your addr.
831 // Arno, 2012-02-28: Don't send private addresses to non-private peers.
832 int chid = 0, tries=0;
836 // Arno, 2011-10-03: Choosing Gertjan's RandomChannel over RevealChannel here.
837 chid = transfer().RandomChannel(id_);
838 if (chid==-1 || chid==id_ || tries > 5) {
839 pex_requested_ = false;
842 a = channels[chid]->peer();
843 if (!a.is_private() || (a.is_private() && peer().is_private()))
848 evbuffer_add_8(evb, SWIFT_PEX_ADD);
849 evbuffer_add_32be(evb, a.ipv4());
850 evbuffer_add_16be(evb, a.port());
851 dprintf("%s #%u +pex %s\n",tintstr(),id_,a.str());
853 pex_requested_ = false;
854 /* Ensure that we don't add the same id to the reverse_pex_out_ queue
856 for (tbqueue::iterator i = channels[chid]->reverse_pex_out_.begin();
857 i != channels[chid]->reverse_pex_out_.end(); i++)
858 if ((int) (i->bin.toUInt()) == id_)
861 dprintf("%s #%u adding pex for channel %u at time %s\n", tintstr(), chid,
862 id_, tintstr(NOW + 2 * TINT_SEC));
863 // Arno, 2011-10-03: should really be a queue of (tint,channel id(= uint32_t)) pairs.
864 channels[chid]->reverse_pex_out_.push_back(tintbin(NOW + 2 * TINT_SEC, bin_t(id_)));
865 if (channels[chid]->send_control_ == KEEP_ALIVE_CONTROL &&
866 channels[chid]->next_send_time_ > NOW + 2 * TINT_SEC)
867 channels[chid]->Reschedule();
// Handles a PEX_REQ message. It sets pex_requested_ so that AddPex() answers,
// unless MIN_PEX_REQUEST_INTERVAL has not yet passed since
// last_pex_request_time_.
870 void Channel::OnPexReq(void) {
871 dprintf("%s #%u -pex req\n", tintstr(), id_);
872 if (NOW > MIN_PEX_REQUEST_INTERVAL + last_pex_request_time_)
873 pex_requested_ = true;
// Possibly appends a PEX_REQ message, rate-limited by next_pex_request_time_.
// A previous request that was never answered counts as useless.
// The next request is deferred 30 s instead of sent, and useless_pex_count_
// reset, when either holds:
//  - transfer().hs_in_ has reached SWIFT_MAX_CONNECTIONS entries;
//  - this peer has given more than 2 useless answers.
// Otherwise the request is sent and marked outstanding. The next one is
// allowed after 1.1x MIN_PEX_REQUEST_INTERVAL so the peer's own rate limit
// (OnPexReq) does not drop it.
876 void Channel::AddPexReq(struct evbuffer *evb) {
877 // Rate limit the number of PEX requests
878 if (NOW < next_pex_request_time_)
881 // If no answer has been received from a previous request, count it as useless
882 if (pex_request_outstanding_)
883 useless_pex_count_++;
885 pex_request_outstanding_ = false;
887 // Initiate at most SWIFT_MAX_CONNECTIONS connections
888 if (transfer().hs_in_.size() >= SWIFT_MAX_CONNECTIONS ||
889 // Check whether this channel has been providing useful peer information
890 useless_pex_count_ > 2)
892 // Arno, 2012-02-23: Fix: Code doesn't recover from useless_pex_count_ > 2,
893 // let's just try again in 30s
894 useless_pex_count_ = 0;
895 next_pex_request_time_ = NOW + 30 * TINT_SEC;
900 dprintf("%s #%u +pex req\n", tintstr(), id_);
901 evbuffer_add_8(evb, SWIFT_PEX_REQ);
902 /* Add a little more than the minimum interval, such that the other party is
903 less likely to drop it due to too high rate */
904 next_pex_request_time_ = NOW + MIN_PEX_REQUEST_INTERVAL * 1.1;
905 pex_request_outstanding_ = true;
911 * Channel class methods
// libevent read callback for the static receive event evrecv. After handling
// the readable socket, it re-arms evrecv with no timeout so the next datagram
// triggers it again. The handling line is not visible in this excerpt;
// presumably it calls RecvDatagram(fd).
914 void Channel::LibeventReceiveCallback(evutil_socket_t fd, short event, void *arg) {
915 // Called by libevent when a datagram is received on the socket
918 event_add(&evrecv, NULL);
// Maximum number of datagrams fetched by one RecvFrom() call in RecvDatagram().
921 #define NUM_DATAGRAMS 10
// Receives a batch of datagrams from `socket`, one evbuffer each, and routes
// every datagram to its Channel.
// Before RecvFrom(), addr.addr->count is set to NUM_DATAGRAMS. After the
// call, count is presumably the number received, and dests[i] holds each
// datagram's source address.
// Each datagram is handled as follows:
//  - Shorter than the 4-byte channel id: dropped.
//  - Channel id 0: a new handshake. It must begin with a SWIFT_HASH for the
//    root bin carrying the hash of a known FileTransfer. Then:
//      - an established channel to the same address means the datagram is
//        dropped;
//      - a non-established channel to that address is reused;
//      - otherwise a new Channel is created.
//  - Otherwise: the id is decoded with DecodeID() and checked against
//    channels[].
// If the channel was established before processing and is not afterwards
// (explicit close), it is scheduled for closing.
// At the end, the evbuffers beyond the received count are freed.
923 void Channel::RecvDatagram (evutil_socket_t socket) {
924 struct evbuffer *pevb[NUM_DATAGRAMS];
925 for (int i=0; i<NUM_DATAGRAMS; ++i)
926 pevb[i] = evbuffer_new();
928 //FIXME: make this more readable
930 addr.addr = (struct sockaddr_mptp *) calloc(1, sizeof(struct sockaddr_mptp) + NUM_DATAGRAMS * sizeof(struct mptp_dest));
931 addr.addr->count = NUM_DATAGRAMS;
932 RecvFrom(socket, addr, pevb);
934 for (; i<addr.addr->count; ++i) {
935 struct evbuffer *evb = pevb[i];
937 fromi.addr->dests[0].addr = addr.addr->dests[i].addr;
938 fromi.addr->dests[0].port = addr.addr->dests[i].port;
939 size_t evboriglen = evbuffer_get_length(evb);
// NOTE(review): return_log returns from RecvDatagram itself after freeing only
// the current evb. The rest of the batch is not processed, and its evbuffers
// (plus the calloc'd addr.addr, whose release is not visible here) appear to
// leak on that path — confirm.
940 #define return_log(...) { fprintf(stderr,__VA_ARGS__); evbuffer_free(evb); return; }
941 if (evbuffer_get_length(evb)<4)
942 return_log("socket layer weird: datagram shorter than 4 bytes from %s (prob ICMP unreach)\n",fromi.str());
943 uint32_t mych = evbuffer_remove_32be(evb);
945 Channel* channel = NULL;
946 if (mych==0) { // peer initiates handshake
// Minimum initial handshake, matching AddHandshake(): HASH type (1) + bin (4)
// + HANDSHAKE type (1) + channel id (4) + root hash (Sha1Hash::SIZE).
947 if (evbuffer_get_length(evb)<1+4+1+4+Sha1Hash::SIZE)
948 return_log ("%s #0 incorrect size %i initial handshake packet %s\n",
949 tintstr(),(int)evbuffer_get_length(evb),fromi.str());
950 uint8_t hashid = evbuffer_remove_8(evb);
951 if (hashid!=SWIFT_HASH)
952 return_log ("%s #0 no hash in the initial handshake %s\n",
953 tintstr(),fromi.str());
954 bin_t pos = bin_fromUInt32(evbuffer_remove_32be(evb));
956 return_log ("%s #0 that is not the root hash %s\n",tintstr(),fromi.str());
957 hash = evbuffer_remove_hash(evb);
958 FileTransfer* ft = FileTransfer::Find(hash);
960 return_log ("%s #0 hash %s unknown, requested by %s\n",tintstr(),hash.hex().c_str(),fromi.str());
961 dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
963 // Arno, 2012-02-27: Check for duplicate channel
964 Channel* existchannel = ft->FindChannel(fromi,NULL);
967 // Arno: 2011-10-13: Ignore if established, otherwise consider
968 // it a concurrent connection attempt.
969 if (existchannel->is_established()) {
970 // ARNOTODO: Read complete handshake here so we know whether
971 // attempt is to new channel or to existing. Currently read
974 return_log("%s #0 have a channel already to %s\n",tintstr(),fromi.str());
976 channel = existchannel;
977 //fprintf(stderr,"Channel::RecvDatagram: HANDSHAKE: reuse channel %s\n", channel->peer_.str() );
980 if (channel == NULL) {
981 //fprintf(stderr,"Channel::RecvDatagram: HANDSHAKE: create new channel %s\n", addr.str() );
982 channel = new Channel(ft, socket, fromi);
984 //fprintf(stderr,"CHANNEL INCOMING DEF hass %s is id %d\n",hash.hex().c_str(),channel->id());
986 } else { // peer responds to my handshake (and other messages)
987 mych = DecodeID(mych);
988 if (mych>=channels.size())
989 return_log("%s invalid channel #%u, %s\n",tintstr(),mych,fromi.str());
990 channel = channels[mych];
992 return_log ("%s #%u is already closed\n",tintstr(),mych);
993 if (channel->IsDiffSenderOrDuplicate(fromi,mych)) {
994 channel->Schedule4Close();
997 channel->own_id_mentioned_ = true;
999 channel->raw_bytes_down_ += evboriglen;
1000 //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
1001 bool wasestablished = channel->is_established();
1003 dprintf("%s #%u peer %s recv_peer %s addr %s\n", tintstr(),mych, channel->peer().str(), channel->recv_peer().str(), fromi.str() );
1009 if (wasestablished && !channel->is_established()) {
1010 // Arno, 2012-01-26: Received an explicit close, clean up channel, safely.
1011 channel->Schedule4Close();
1014 for (; i<NUM_DATAGRAMS; ++i)
1015 evbuffer_free(pevb[i]);
1021 * Channel instance methods
// Closes every channel whose peer_ equals addr. Each match is marked not
// established (peer_channel_id_ = 0), which stops further sending, and is
// scheduled for closing.
1024 void Channel::CloseChannelByAddress(const Address &addr)
1026 // fprintf(stderr,"CloseChannelByAddress: address is %s\n", addr.str() );
1027 std::vector<Channel *>::iterator iter;
1028 for (iter = channels.begin(); iter != channels.end(); iter++)
1031 if (c != NULL && c->peer_ == addr)
1033 // ARNOSMPTODO: will do another send attempt before not being
1035 c->peer_channel_id_ = 0; // established->false, do no more sending
1036 c->Schedule4Close();
// Closes this channel:
//  - switches to CLOSE_CONTROL;
//  - sends an explicit close if still established;
//  - with ENABLE_VOD_PIECEPICKER, removes this channel's binmap from the
//    availability map.
// The remaining cleanup lines are not visible here; per the comment, they make
// sure LibeventSendCallback is no longer invoked for this channel.
1043 void Channel::Close () {
1045 this->SwitchSendControl(CLOSE_CONTROL);
1047 if (is_established())
1048 this->Send(); // Arno: send explicit close
1050 if (ENABLE_VOD_PIECEPICKER) {
1051 // Ric: remove its binmap from the availability
1052 transfer().availability().remove(id_, ack_in_);
1056 // Arno: ensure LibeventSendCallback is no longer called with ptr to this Channel
// Recomputes next_send_time_ with NextSendTime() and arranges the next send:
//  - TINT_NEVER: the channel is scheduled for closing.
//  - Otherwise duein = next_send_time_ - NOW is computed. In the direct case,
//    LibeventSendCallback() is called synchronously instead of through a
//    libevent timer, because short timers fire late. The exact condition is
//    not visible here; per the comments, it is a due time of 0 right after a
//    receive.
//  - In the other case a timer is armed on evsend_ptr_ for duein. If
//    evsend_ptr_ is NULL (channel closed), only a log line is written.
1061 void Channel::Reschedule () {
1063 // Arno: CAREFUL: direct send depends on diff between next_send_time_ and
1064 // NOW to be 0, so any calls to Time in between may put things off. Sigh.
1066 next_send_time_ = NextSendTime();
1067 if (next_send_time_!=TINT_NEVER) {
1069 assert(next_send_time_<NOW+TINT_MIN);
1070 tint duein = next_send_time_-NOW;
1072 // Arno, 2011-10-18: libevent's timer implementation appears to be
1073 // really slow, i.e., timers set for 100 usec from now get called
1074 // at least two times later :-( Hence, for sends after receives
1075 // perform them directly.
1076 dprintf("%s #%u requeue direct send\n",tintstr(),id_);
1077 LibeventSendCallback(-1,EV_TIMEOUT,this);
1080 if (evsend_ptr_ != NULL) {
1081 struct timeval duetv = *tint2tv(duein);
1082 evtimer_add(evsend_ptr_,&duetv);
1083 dprintf("%s #%u requeue for %s in %lli\n",tintstr(),id_,tintstr(next_send_time_), duein);
1086 dprintf("%s #%u cannot requeue for %s, closed\n",tintstr(),id_,tintstr(next_send_time_));
1090 dprintf("%s #%u resched, will close\n",tintstr(),id_);
1091 this->Schedule4Close();
1097 * Channel class methods
1099 void Channel::LibeventSendCallback(int fd, short event, void *arg) {
1101 // Called by libevent when it is the requested send time.
1103 Channel * sender = (Channel*) arg;
1104 if (NOW<sender->next_send_time_-TINT_MSEC)
1105 dprintf("%s #%u suspicious send %s<%s\n",tintstr(),
1106 sender->id(),tintstr(NOW),tintstr(sender->next_send_time_));
1107 if (sender->next_send_time_ != TINT_NEVER)