12811928f8454171aae023c1d9e59942cad3ea8f
[swifty.git] / src / libswift / sendrecv.cpp
1 /*
2  *  sendrecv.cpp
3  *  most of the swift's state machine
4  *
5  *  Created by Victor Grishchenko on 3/6/09.
6  *  Copyright 2009-2012 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved.
7  *
8  */
9 #include "bin_utils.h"
10 #include "swift.h"
11 #include <algorithm>  // kill it
12 #include <cassert>
13 #include <math.h>
14 #include <cfloat>
15 #include "compat.h"
16
17 using namespace swift;
18 using namespace std;
19
20 struct event_base *Channel::evbase;
21 MessageQueue Channel::messageQueue;
22 struct event Channel::evrecv;
23
24 #define DEBUGTRAFFIC    0
25
26 /** Arno: Victor's design allows a sender to choose some data to push to
27  * a receiver, if that receiver is not HINTing at data. Should be disabled
28  * when the receiver has a download rate limit.
29  */
30 #define ENABLE_SENDERSIZE_PUSH 0
31
32
33 /** Arno, 2011-11-24: When rate limit is on and the download is in progress
34  * we send HINTs for 2 chunks at the moment. This constant can be used to
35  * get greater granularity. Set to 0 for original behaviour.
36  */
37 #define HINT_GRANULARITY        16 // chunks
38
39 /*
40  TODO  25 Oct 18:55
41  - range: ALL
42  - randomized testing of advanced ops (new testcase)
43  */
44
45 void    Channel::AddPeakHashes (struct evbuffer *evb) {
46     for(int i=0; i<file().peak_count(); i++) {
47         bin_t peak = file().peak(i);
48         evbuffer_add_8(evb, SWIFT_HASH);
49         evbuffer_add_32be(evb, bin_toUInt32(peak));
50         evbuffer_add_hash(evb, file().peak_hash(i));
51         char bin_name_buf[32];
52         dprintf("%s #%u +phash %s\n",tintstr(),id_,peak.str(bin_name_buf));
53     }
54 }
55
56
57 void    Channel::AddUncleHashes (struct evbuffer *evb, bin_t pos) {
58
59     char bin_name_buf2[32];
60     dprintf("%s #%u +uncle hash for %s\n",tintstr(),id_,pos.str(bin_name_buf2));
61
62     bin_t peak = file().peak_for(pos);
63     while (pos!=peak && ((NOW&3)==3 || !pos.parent().contains(data_out_cap_)) &&
64             ack_in_.is_empty(pos.parent()) ) {
65         bin_t uncle = pos.sibling();
66         evbuffer_add_8(evb, SWIFT_HASH);
67         evbuffer_add_32be(evb, bin_toUInt32(uncle));
68         evbuffer_add_hash(evb,  file().hash(uncle) );
69         char bin_name_buf[32];
70         dprintf("%s #%u +hash %s\n",tintstr(),id_,uncle.str(bin_name_buf));
71         pos = pos.parent();
72     }
73 }
74
75
76 bin_t           Channel::ImposeHint () {
77     uint64_t twist = peer_channel_id_;  // got no hints, send something randomly
78
79     twist &= file().peak(0).toUInt(); // FIXME may make it semi-seq here
80
81     bin_t my_pick = binmap_t::find_complement(ack_in_, file().ack_out(), twist);
82
83     my_pick.to_twisted(twist);
84     while (my_pick.base_length()>max(1,(int)cwnd_))
85         my_pick = my_pick.left();
86
87     return my_pick.twisted(twist);
88 }
89
90
91 bin_t        Channel::DequeueHint (bool *retransmitptr) {
92     bin_t send = bin_t::NONE;
93
94     // Arno, 2012-01-23: Extra protection against channel loss, don't send DATA
95     if (last_recv_time_ < NOW-(3*TINT_SEC))
96         return bin_t::NONE;
97
98     // Arno, 2012-01-18: Reenable Victor's retransmit
99     if (!data_out_tmo_.empty())
100     {
101         tintbin tb = data_out_tmo_.front();
102         send = tb.bin;
103         data_out_tmo_.pop_front();  
104         *retransmitptr = true;
105     }
106     else
107         *retransmitptr = false;
108
109     if (ENABLE_SENDERSIZE_PUSH && send.is_none() && hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {
110         bin_t my_pick = ImposeHint(); // FIXME move to the loop
111         if (!my_pick.is_none()) {
112             hint_in_.push_back(my_pick);
113             char bin_name_buf[32];
114             dprintf("%s #%u *hint %s\n",tintstr(),id_,my_pick.str(bin_name_buf));
115         }
116     }
117     
118     while (!hint_in_.empty() && send.is_none()) {
119         bin_t hint = hint_in_.front().bin;
120         tint time = hint_in_.front().time;
121         hint_in_.pop_front();
122         while (!hint.is_base()) { // FIXME optimize; possible attack
123             hint_in_.push_front(tintbin(time,hint.right()));
124             hint = hint.left();
125         }
126         //if (time < NOW-TINT_SEC*3/2 )
127         //    continue;  bad idea
128         if (!ack_in_.is_filled(hint))
129             send = hint;
130     }
131     uint64_t mass = 0;
132     // Arno, 2012-03-09: Is mucho expensive on busy server.
133     //for(int i=0; i<hint_in_.size(); i++)
134     //    mass += hint_in_[i].bin.base_length();
135     char bin_name_buf[32];
136     dprintf("%s #%u dequeued %s [%lli]\n",tintstr(),id_,send.str(bin_name_buf),mass);
137     return send;
138 }
139
140
141 void    Channel::AddHandshake (struct evbuffer *evb) {
142     if (!peer_channel_id_) { // initiating
143         evbuffer_add_8(evb, SWIFT_HASH);
144         evbuffer_add_32be(evb, bin_toUInt32(bin_t::ALL));
145         evbuffer_add_hash(evb, file().root_hash());
146         dprintf("%s #%u +hash ALL %s\n",
147                 tintstr(),id_,file().root_hash().hex().c_str());
148     }
149     evbuffer_add_8(evb, SWIFT_HANDSHAKE);
150     int encoded = -1;
151     if (send_control_==CLOSE_CONTROL) {
152         encoded = 0;
153     }
154     else
155         encoded = EncodeID(id_);
156     evbuffer_add_32be(evb, encoded);
157     dprintf("%s #%u +hs %x\n",tintstr(),id_,encoded);
158     have_out_.clear();
159 }
160
161
162 void    Channel::Send () {
163
164         dprintf("%s #%u Send called \n",tintstr(),id_);
165
166     struct evbuffer *evb = evbuffer_new();
167     evbuffer_add_32be(evb, peer_channel_id_);
168     bin_t data = bin_t::NONE;
169     int evbnonadplen = 0;
170     if ( is_established() ) {
171         if (send_control_!=CLOSE_CONTROL) {
172                         // FIXME: seeder check
173                         AddHave(evb);
174                         AddAck(evb);
175                         if (!file().is_complete()) {
176                                 AddHint(evb);
177                                 /* Gertjan fix: 7aeea65f3efbb9013f601b22a57ee4a423f1a94d
178                                 "Only call Reschedule for 'reverse PEX' if the channel is in keep-alive mode"
179                                  */
180                                 AddPexReq(evb);
181                         }
182                         AddPex(evb);
183                         TimeoutDataOut();
184                         data = AddData(evb);
185         } else {
186                 // Arno: send explicit close
187                 AddHandshake(evb);
188         }
189     } else {
190         AddHandshake(evb);
191         AddHave(evb); // Arno, 2011-10-28: from AddHandShake. Why double?
192         AddHave(evb);
193         AddAck(evb);
194     }
195
196     lastsendwaskeepalive_ = (evbuffer_get_length(evb) == 4);
197
198     if (evbuffer_get_length(evb)==4) {// only the channel id; bare keep-alive
199         data = bin_t::ALL;
200     }
201     dprintf("%s #%u sent %ib %s:%x\n",
202             tintstr(),id_,(int)evbuffer_get_length(evb),peer().str(),
203             peer_channel_id_);
204
205         messageQueue.AddBuffer(socket_, evb, peer(), this); 
206 }
207
208 void Channel::Sent(int bytes, evbuffer *evb, bool tofree)
209 {
210         raw_bytes_up_ += bytes;
211         if (tofree) {
212                 last_send_time_ = NOW;
213                 sent_since_recv_++;
214                 dgrams_sent_++;
215                 evbuffer_free(evb);
216                 Reschedule();
217         }
218 }
219
220 void    Channel::AddHint (struct evbuffer *evb) {
221
222         // RATELIMIT
223         // Policy is to not send hints when we are above speed limit
224         if (transfer().GetCurrentSpeed(DDIR_DOWNLOAD) > transfer().GetMaxSpeed(DDIR_DOWNLOAD)) {
225                 if (DEBUGTRAFFIC)
226                         fprintf(stderr,"hint: forbidden#");
227                 return;
228         }
229
230
231         // 1. Calc max of what we are allowed to request, uncongested bandwidth wise
232     tint plan_for = max(TINT_SEC,rtt_avg_*4);
233
234     tint timed_out = NOW - plan_for*2;
235     while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
236         hint_out_size_ -= hint_out_.front().bin.base_length();
237         hint_out_.pop_front();
238     }
239
240     int first_plan_pck = max ( (tint)1, plan_for / dip_avg_ );
241
242     // Riccardo, 2012-04-04: Actually allowed is max minus what we already asked for
243     int queue_allowed_hints = max(0,first_plan_pck-(int)hint_out_size_);
244
245
246     // RATELIMIT
247     // 2. Calc max of what is allowed by the rate limiter
248     int rate_allowed_hints = LONG_MAX;
249     if (transfer().GetMaxSpeed(DDIR_DOWNLOAD) < DBL_MAX)
250     {
251                 uint64_t rough_global_hint_out_size = 0; // rough estimate, as hint_out_ clean up is not done for all channels
252                 std::set<Channel *>::iterator iter;
253                 for (iter=transfer().mychannels_.begin(); iter!=transfer().mychannels_.end(); iter++)
254                 {
255                         Channel *c = *iter;
256                         if (c != NULL)
257                                 rough_global_hint_out_size += c->hint_out_size_;
258                 }
259
260                 // Policy: this channel is allowed to hint at the limit - global_hinted_at
261                 // Handle MaxSpeed = unlimited
262                 double rate_hints_limit_float = transfer().GetMaxSpeed(DDIR_DOWNLOAD)/((double)file().chunk_size());
263
264                 int rate_hints_limit = (int)min((double)LONG_MAX,rate_hints_limit_float);
265
266                 // Actually allowed is max minus what we already asked for, globally (=all channels)
267                 rate_allowed_hints = max(0,rate_hints_limit-(int)rough_global_hint_out_size);
268     }
269
270     // 3. Take the smallest allowance from rate and queue limit
271     uint64_t plan_pck = (uint64_t)min(rate_allowed_hints,queue_allowed_hints);
272
273     // 4. Ask allowance in blocks of chunks to get pipelining going from serving peer.
274     if (hint_out_size_ == 0 || plan_pck > HINT_GRANULARITY)
275     {
276         bin_t hint = transfer().picker().Pick(ack_in_,plan_pck,NOW+plan_for*2);
277         if (!hint.is_none()) {
278                 if (DEBUGTRAFFIC)
279                 {
280                         char binstr[32];
281                         fprintf(stderr,"hint c%d: ask %s\n", id(), hint.str(binstr) );
282                 }
283             evbuffer_add_8(evb, SWIFT_HINT);
284             evbuffer_add_32be(evb, bin_toUInt32(hint));
285             char bin_name_buf[32];
286             dprintf("%s #%u +hint %s [%lli]\n",tintstr(),id_,hint.str(bin_name_buf),hint_out_size_);
287             hint_out_.push_back(hint);
288             hint_out_size_ += hint.base_length();
289             //fprintf(stderr,"send c%d: HINTLEN %i\n", id(), hint.base_length());
290             //fprintf(stderr,"HL %i ", hint.base_length());
291         }
292         else
293             dprintf("%s #%u Xhint\n",tintstr(),id_);
294
295     }
296 }
297
298
299 bin_t        Channel::AddData (struct evbuffer *evb) {
300         // RATELIMIT
301         if (transfer().GetCurrentSpeed(DDIR_UPLOAD) > transfer().GetMaxSpeed(DDIR_UPLOAD)) {
302                 transfer().OnSendNoData();
303                 return bin_t::NONE;
304         }
305
306     if (!file().size()) // know nothing
307         return bin_t::NONE;
308
309     bin_t tosend = bin_t::NONE;
310     bool isretransmit = false;
311     tint luft = send_interval_>>4; // may wake up a bit earlier
312     if (data_out_.size()<cwnd_ &&
313             last_data_out_time_+send_interval_<=NOW+luft) {
314         tosend = DequeueHint(&isretransmit);
315         if (tosend.is_none()) {
316             dprintf("%s #%u sendctrl no idea what data to send\n",tintstr(),id_);
317             if (send_control_!=KEEP_ALIVE_CONTROL && send_control_!=CLOSE_CONTROL)
318                 SwitchSendControl(KEEP_ALIVE_CONTROL);
319         }
320     } else
321         dprintf("%s #%u sendctrl wait cwnd %f data_out %i next %s\n",
322                 tintstr(),id_,cwnd_,(int)data_out_.size(),tintstr(last_data_out_time_+NOW-send_interval_));
323
324     if (tosend.is_none())// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
325         return bin_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
326
327     if (ack_in_.is_empty() && file().size())
328         AddPeakHashes(evb);
329
330     //NETWVSHASH
331     if (file().get_check_netwvshash())
332         AddUncleHashes(evb,tosend);
333
334     if (!ack_in_.is_empty()) // TODO: cwnd_>1
335         data_out_cap_ = tosend;
336
337     // Arno, 2011-11-03: May happen when first data packet is sent to empty
338     // leech, then peak + uncle hashes may be so big that they don't fit in eth
339     // frame with DATA. Send 2 datagrams then, one with peaks so they have
340     // a better chance of arriving. Optimistic violation of atomic datagram
341     // principle.
342     if (file().chunk_size() == SWIFT_DEFAULT_CHUNK_SIZE && evbuffer_get_length(evb) > SWIFT_MAX_NONDATA_DGRAM_SIZE) {
343         dprintf("%s #%u fsent %ib %s:%x\n",
344                 tintstr(),id_,(int)evbuffer_get_length(evb),peer().str(),
345                 peer_channel_id_);
346                 messageQueue.AddBuffer(socket_, evb, peer(), this, false);
347         evbuffer_add_32be(evb, peer_channel_id_);
348     }
349
350     if (file().chunk_size() != SWIFT_DEFAULT_CHUNK_SIZE && isretransmit) {
351         /* FRAGRAND
352          * Arno, 2012-01-17: We observe strange behaviour when using
353          * fragmented UDP packets. When ULANC sends a specific datagram ("995"),
354          * the 2nd IP packet carrying it gets lost structurally. When
355          * downloading from the same asset hosted on a Linux 32-bit machine
356          * using a Win7 32-bit client (behind a NAT), one specific full
357          * datagram never gets delivered (6970 one before do). A workaround
358          * is to add some random data to the datagram. Hence we introduce
359          * the SWIFT_RANDOMIZE message, that is added to the datagram carrying
360          * the DATA on a retransmit.
361          */
362              char binstr[32];
363          fprintf(stderr,"AddData: retransmit of randomized chunk %s\n",tosend.str(binstr) );
364          evbuffer_add_8(evb, SWIFT_RANDOMIZE);
365          evbuffer_add_32be(evb, (int)rand() );
366     }
367
368     evbuffer_add_8(evb, SWIFT_DATA);
369     evbuffer_add_32be(evb, bin_toUInt32(tosend));
370
371     struct evbuffer_iovec vec;
372     if (evbuffer_reserve_space(evb, file().chunk_size(), &vec, 1) < 0) {
373         print_error("error on evbuffer_reserve_space");
374         return bin_t::NONE;
375     }
376     size_t r = pread(file().file_descriptor(),(char *)vec.iov_base,
377                      file().chunk_size(),tosend.base_offset()*file().chunk_size());
378     // TODO: corrupted data, retries, caching
379     if (r<0) {
380         print_error("error on reading");
381         vec.iov_len = 0;
382         evbuffer_commit_space(evb, &vec, 1);
383         return bin_t::NONE;
384     }
385     // assert(dgram.space()>=r+4+1);
386     vec.iov_len = r;
387     if (evbuffer_commit_space(evb, &vec, 1) < 0) {
388         print_error("error on evbuffer_commit_space");
389         return bin_t::NONE;
390     }
391
392     last_data_out_time_ = NOW;
393     data_out_.push_back(tosend);
394     bytes_up_ += r;
395     global_bytes_up += r;
396
397     char bin_name_buf[32];
398     dprintf("%s #%u +data %s\n",tintstr(),id_,tosend.str(bin_name_buf));
399
400     // RATELIMIT
401     // ARNOSMPTODO: count overhead bytes too? Move to Send() then.
402         transfer_->OnSendData(file().chunk_size());
403
404     return tosend;
405 }
406
407
408 void    Channel::AddAck (struct evbuffer *evb) {
409     if (data_in_==tintbin())
410         //if (data_in_.bin==bin64_t::NONE)
411         return;
412     // sometimes, we send a HAVE (e.g. in case the peer did repetitive send)
413     evbuffer_add_8(evb, data_in_.time==TINT_NEVER?SWIFT_HAVE:SWIFT_ACK);
414     evbuffer_add_32be(evb, bin_toUInt32(data_in_.bin));
415     if (data_in_.time!=TINT_NEVER)
416         evbuffer_add_64be(evb, data_in_.time);
417
418
419         if (DEBUGTRAFFIC)
420                 fprintf(stderr,"send c%d: ACK %i\n", id(), bin_toUInt32(data_in_.bin));
421
422     have_out_.set(data_in_.bin);
423     char bin_name_buf[32];
424     dprintf("%s #%u +ack %s %s\n",
425         tintstr(),id_,data_in_.bin.str(bin_name_buf),tintstr(data_in_.time));
426     if (data_in_.bin.layer()>2)
427         data_in_dbl_ = data_in_.bin;
428
429     //fprintf(stderr,"data_in_ c%d\n", id() );
430     data_in_ = tintbin();
431     //data_in_ = tintbin(NOW,bin64_t::NONE);
432 }
433
434
435 void    Channel::AddHave (struct evbuffer *evb) {
436     if (!data_in_dbl_.is_none()) { // TODO: do redundancy better
437         evbuffer_add_8(evb, SWIFT_HAVE);
438         evbuffer_add_32be(evb, bin_toUInt32(data_in_dbl_));
439         data_in_dbl_=bin_t::NONE;
440     }
441     if (DEBUGTRAFFIC)
442                 fprintf(stderr,"send c%d: HAVE ",id() );
443     for(int count=0; count<4; count++) {
444         bin_t ack = binmap_t::find_complement(have_out_, file().ack_out(), 0); // FIXME: do rotating queue
445         if (ack.is_none())
446             break;
447         ack = file().ack_out().cover(ack);
448         have_out_.set(ack);
449         evbuffer_add_8(evb, SWIFT_HAVE);
450         evbuffer_add_32be(evb, bin_toUInt32(ack));
451
452         if (DEBUGTRAFFIC)
453                 fprintf(stderr," %i", bin_toUInt32(ack));
454
455         char bin_name_buf[32];
456         dprintf("%s #%u +have %s\n",tintstr(),id_,ack.str(bin_name_buf));
457     }
458         if (DEBUGTRAFFIC)
459                 fprintf(stderr,"\n");
460
461 }
462
463
464 void    Channel::Recv (struct evbuffer *evb) {
465     dprintf("%s #%u recvd %ib\n",tintstr(),id_,(int)evbuffer_get_length(evb)+4);
466     dgrams_rcvd_++;
467
468     lastrecvwaskeepalive_ = (evbuffer_get_length(evb) == 0);
469     if (lastrecvwaskeepalive_)
470         // Update speed measurements such that they decrease when DL stops
471         transfer().OnRecvData(0);
472
473     if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
474         rtt_avg_ = NOW - last_send_time_;
475         dev_avg_ = rtt_avg_;
476         dip_avg_ = rtt_avg_;
477         dprintf("%s #%u sendctrl rtt init %lli\n",tintstr(),id_,rtt_avg_);
478     }
479
480     bin_t data = evbuffer_get_length(evb) ? bin_t::NONE : bin_t::ALL;
481
482         if (DEBUGTRAFFIC)
483                 fprintf(stderr,"recv c%d: size %d ", id(), evbuffer_get_length(evb));
484
485         while (evbuffer_get_length(evb)) {
486         uint8_t type = evbuffer_remove_8(evb);
487
488         if (DEBUGTRAFFIC)
489                 fprintf(stderr," %d", type);
490
491         switch (type) {
492             case SWIFT_HANDSHAKE: OnHandshake(evb); break;
493             case SWIFT_DATA:      data=OnData(evb); break;
494             case SWIFT_HAVE:      OnHave(evb); break;
495             case SWIFT_ACK:       OnAck(evb); break;
496             case SWIFT_HASH:      OnHash(evb); break;
497             case SWIFT_HINT:      OnHint(evb); break;
498             case SWIFT_PEX_ADD:   OnPex(evb); break;
499             case SWIFT_PEX_REQ:   OnPexReq(); break;
500             case SWIFT_RANDOMIZE: OnRandomize(evb); break; //FRAGRAND
501             default:
502                 dprintf("%s #%u ?msg id unknown %i\n",tintstr(),id_,(int)type);
503                 return;
504         }
505     }
506     if (DEBUGTRAFFIC)
507     {
508         fprintf(stderr,"\n");
509     }
510
511
512     last_recv_time_ = NOW;
513     sent_since_recv_ = 0;
514     Reschedule();
515 }
516
517 /*
518  * Arno: FAXME: HASH+DATA should be handled as a transaction: only when the
519  * hashes check out should they be stored in the hashtree, otherwise revert.
520  */
521 void    Channel::OnHash (struct evbuffer *evb) {
522         bin_t pos = bin_fromUInt32(evbuffer_remove_32be(evb));
523     Sha1Hash hash = evbuffer_remove_hash(evb);
524     file().OfferHash(pos,hash);
525     char bin_name_buf[32];
526     dprintf("%s #%u -hash %s\n",tintstr(),id_,pos.str(bin_name_buf));
527
528     //fprintf(stderr,"HASH %lli hex %s\n",pos.toUInt(), hash.hex().c_str() );
529 }
530
531
532 void    Channel::CleanHintOut (bin_t pos) {
533     int hi = 0;
534     while (hi<hint_out_.size() && !hint_out_[hi].bin.contains(pos))
535         hi++;
536     if (hi==hint_out_.size())
537         return; // something not hinted or hinted in far past
538     while (hi--) { // removing likely snubbed hints
539         hint_out_size_ -= hint_out_.front().bin.base_length();
540         hint_out_.pop_front();
541     }
542     while (hint_out_.front().bin!=pos) {
543         tintbin f = hint_out_.front();
544
545         assert (f.bin.contains(pos));
546
547         if (pos < f.bin) {
548             f.bin.to_left();
549         } else {
550             f.bin.to_right();
551         }
552
553         hint_out_.front().bin = f.bin.sibling();
554         hint_out_.push_front(f);
555     }
556     hint_out_.pop_front();
557     hint_out_size_--;
558 }
559
560
561 bin_t Channel::OnData (struct evbuffer *evb) {  // TODO: HAVE NONE for corrupted data
562
563         char bin_name_buf[32];
564         bin_t pos = bin_fromUInt32(evbuffer_remove_32be(evb));
565
566     // Arno: Assuming DATA last message in datagram
567     if (evbuffer_get_length(evb) > file().chunk_size()) {
568         dprintf("%s #%u !data chunk size mismatch %s: exp %lu got " PRISIZET "\n",tintstr(),id_,pos.str(bin_name_buf), file().chunk_size(), evbuffer_get_length(evb));
569         fprintf(stderr,"WARNING: chunk size mismatch: exp %lu got " PRISIZET "\n",file().chunk_size(), evbuffer_get_length(evb));
570     }
571
572     int length = (evbuffer_get_length(evb) < file().chunk_size()) ? evbuffer_get_length(evb) : file().chunk_size();
573     if (!file().ack_out().is_empty(pos)) {
574         // Arno, 2012-01-24: print message for duplicate
575         dprintf("%s #%u Ddata %s\n",tintstr(),id_,pos.str(bin_name_buf));
576         evbuffer_drain(evb, length);
577         data_in_ = tintbin(TINT_NEVER,transfer().ack_out().cover(pos));
578
579         // Arno, 2012-01-24: Make sure data interarrival periods don't get
580         // screwed up because of these (ignored) duplicates.
581         UpdateDIP(pos);
582         return bin_t::NONE;
583     }
584     uint8_t *data = evbuffer_pullup(evb, length);
585     data_in_ = tintbin(NOW,bin_t::NONE);
586     if (!file().OfferData(pos, (char*)data, length)) {
587         evbuffer_drain(evb, length);
588         char bin_name_buf[32];
589         dprintf("%s #%u !data %s\n",tintstr(),id_,pos.str(bin_name_buf));
590         return bin_t::NONE;
591     }
592     evbuffer_drain(evb, length);
593     dprintf("%s #%u -data %s\n",tintstr(),id_,pos.str(bin_name_buf));
594
595     if (DEBUGTRAFFIC)
596         fprintf(stderr,"$ ");
597
598     bin_t cover = transfer().ack_out().cover(pos);
599     for(int i=0; i<transfer().cb_installed; i++)
600         if (cover.layer()>=transfer().cb_agg[i])
601             transfer().callbacks[i](transfer().fd(),cover);  // FIXME
602     if (cover.layer() >= 5) // Arno: tested with 32K, presently = 2 ** 5 * chunk_size CHUNKSIZE
603         transfer().OnRecvData( pow((double)2,(double)5)*((double)file().chunk_size()) );
604     data_in_.bin = pos;
605
606     UpdateDIP(pos);
607     CleanHintOut(pos);
608     bytes_down_ += length;
609     global_bytes_down += length;
610     return pos;
611 }
612
613
614 void Channel::UpdateDIP(bin_t pos)
615 {
616         if (!pos.is_none()) {
617                 if (last_data_in_time_) {
618                         tint dip = NOW - last_data_in_time_;
619                         dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
620                 }
621                 last_data_in_time_ = NOW;
622         }
623 }
624
625
626 void    Channel::OnAck (struct evbuffer *evb) {
627     bin_t ackd_pos = bin_fromUInt32(evbuffer_remove_32be(evb));
628     tint peer_time = evbuffer_remove_64be(evb); // FIXME 32
629     // FIXME FIXME: wrap around here
630     if (ackd_pos.is_none())
631         return; // likely, broken chunk/ insufficient hashes
632     if (file().size() && ackd_pos.base_offset()>=file().size_in_chunks()) {
633         char bin_name_buf[32];
634         eprintf("invalid ack: %s\n",ackd_pos.str(bin_name_buf));
635         return;
636     }
637     ack_in_.set(ackd_pos);
638
639     //fprintf(stderr,"OnAck: got bin %s is_complete %d\n", ackd_pos.str(), (int)ack_in_.is_complete_arno( file().ack_out().get_height() ));
640
641     int di = 0, ri = 0;
642     // find an entry for the send (data out) event
643     while (  di<data_out_.size() && ( data_out_[di]==tintbin() ||
644            !ackd_pos.contains(data_out_[di].bin) )  )
645         di++;
646     // FUTURE: delayed acks
647     // rule out retransmits
648     while (  ri<data_out_tmo_.size() && !ackd_pos.contains(data_out_tmo_[ri].bin) )
649         ri++;
650     char bin_name_buf[32];
651     dprintf("%s #%u %cack %s %lli\n",tintstr(),id_,
652             di==data_out_.size()?'?':'-',ackd_pos.str(bin_name_buf),peer_time);
653     if (di!=data_out_.size() && ri==data_out_tmo_.size()) { // not a retransmit
654             // round trip time calculations
655         tint rtt = NOW-data_out_[di].time;
656         rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
657         dev_avg_ = ( dev_avg_*3 + tintabs(rtt-rtt_avg_) ) >> 2;
658         assert(data_out_[di].time!=TINT_NEVER);
659             // one-way delay calculations
660         tint owd = peer_time - data_out_[di].time;
661         owd_cur_bin_ = 0;//(owd_cur_bin_+1) & 3;
662         owd_current_[owd_cur_bin_] = owd;
663         if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
664             owd_min_bin_start_ = NOW;
665             owd_min_bin_ = (owd_min_bin_+1) & 3;
666             owd_min_bins_[owd_min_bin_] = TINT_NEVER;
667         }
668         if (owd_min_bins_[owd_min_bin_]>owd)
669             owd_min_bins_[owd_min_bin_] = owd;
670         dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n",
671                 tintstr(),id_,rtt_avg_,dev_avg_,data_out_[di].bin.str(bin_name_buf));
672         ack_rcvd_recent_++;
673         // early loss detection by packet reordering
674         for (int re=0; re<di-MAX_REORDERING; re++) {
675             if (data_out_[re]==tintbin())
676                 continue;
677             ack_not_rcvd_recent_++;
678             data_out_tmo_.push_back(data_out_[re].bin);
679             dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str(bin_name_buf));
680             data_out_cap_ = bin_t::ALL;
681             data_out_[re] = tintbin();
682         }
683     }
684     if (di!=data_out_.size())
685         data_out_[di]=tintbin();
686     // clear zeroed items
687     while (!data_out_.empty() && ( data_out_.front()==tintbin() ||
688             ack_in_.is_filled(data_out_.front().bin) ) )
689         data_out_.pop_front();
690     assert(data_out_.empty() || data_out_.front().time!=TINT_NEVER);
691 }
692
693
694 void Channel::TimeoutDataOut ( ) {
695     // losses: timeouted packets
696     tint timeout = NOW - ack_timeout();
697     while (!data_out_.empty() &&
698         ( data_out_.front().time<timeout || data_out_.front()==tintbin() ) ) {
699         if (data_out_.front()!=tintbin() && ack_in_.is_empty(data_out_.front().bin)) {
700             ack_not_rcvd_recent_++;
701             data_out_cap_ = bin_t::ALL;
702             data_out_tmo_.push_back(data_out_.front().bin);
703             char bin_name_buf[32];
704             dprintf("%s #%u Tdata %s\n",tintstr(),id_,data_out_.front().bin.str(bin_name_buf));
705         }
706         data_out_.pop_front();
707     }
708     // clear retransmit queue of older items
709     while (!data_out_tmo_.empty() && data_out_tmo_.front().time<NOW-MAX_POSSIBLE_RTT)
710         data_out_tmo_.pop_front();
711 }
712
713
714 void Channel::OnHave (struct evbuffer *evb) {
715     bin_t ackd_pos = bin_fromUInt32(evbuffer_remove_32be(evb));
716     if (ackd_pos.is_none())
717         return; // wow, peer has hashes
718
719     // PPPLUG
720     if (ENABLE_VOD_PIECEPICKER) {
721                 // Ric: check if we should set the size in the file transfer
722                 if (transfer().availability().size() <= 0 && file().size() > 0)
723                 {
724                         transfer().availability().setSize(file().size_in_chunks());
725                 }
726                 // Ric: update the availability if needed
727                 transfer().availability().set(id_, ack_in_, ackd_pos);
728     }
729
730     ack_in_.set(ackd_pos);
731     char bin_name_buf[32];
732     dprintf("%s #%u -have %s\n",tintstr(),id_,ackd_pos.str(bin_name_buf));
733
734     //fprintf(stderr,"OnHave: got bin %s is_complete %d\n", ackd_pos.str(), IsComplete() );
735
736 }
737
738
739 void    Channel::OnHint (struct evbuffer *evb) {
740     bin_t hint = bin_fromUInt32(evbuffer_remove_32be(evb));
741     // FIXME: wake up here
742     hint_in_.push_back(hint);
743     char bin_name_buf[32];
744     dprintf("%s #%u -hint %s\n",tintstr(),id_,hint.str(bin_name_buf));
745 }
746
747
748 void Channel::OnHandshake (struct evbuffer *evb) {
749
750         uint32_t pcid = evbuffer_remove_32be(evb);
751     dprintf("%s #%u -hs %x\n",tintstr(),id_,pcid);
752
753     if (is_established() && pcid == 0) {
754         // Arno: received explicit close
755         peer_channel_id_ = 0; // == established -> false
756         Close();
757         return;
758     }
759
760     peer_channel_id_ = pcid;
761     // self-connection check
762     if (!SELF_CONN_OK) {
763         uint32_t try_id = DecodeID(peer_channel_id_);
764         if (channel(try_id) && !channel(try_id)->peer_channel_id_) {
765             peer_channel_id_ = 0;
766             Close();
767             return; // this is a self-connection
768         }
769     }
770
771     // FUTURE: channel forking
772     if (is_established())
773         dprintf("%s #%u established %s\n", tintstr(), id_, peer().str());
774 }
775
776
777 void Channel::OnPex (struct evbuffer *evb) {
778     uint32_t ipv4 = evbuffer_remove_32be(evb);
779     uint16_t port = evbuffer_remove_16be(evb);
780     Address addr(ipv4,port);
781     dprintf("%s #%u -pex %s\n",tintstr(),id_,addr.str());
782     if (transfer().OnPexIn(addr))
783         useless_pex_count_ = 0;
784     else
785     {
786                 dprintf("%s #%u already channel to %s\n", tintstr(),id_,addr.str());
787         useless_pex_count_++;
788     }
789     pex_request_outstanding_ = false;
790 }
791
792
793 //FRAGRAND
794 void Channel::OnRandomize (struct evbuffer *evb) {
795     dprintf("%s #%u -rand\n",tintstr(),id_ );
796         // Payload is 4 random bytes
797     uint32_t r = evbuffer_remove_32be(evb);
798 }
799
800
801 void    Channel::AddPex (struct evbuffer *evb) {
802         // Gertjan fix: Reverse PEX
803     // PEX messages sent to facilitate NAT/FW puncturing get priority
804     if (!reverse_pex_out_.empty()) {
805         do {
806             tintbin pex_peer = reverse_pex_out_.front();
807             reverse_pex_out_.pop_front();
808             if (channels[(int) pex_peer.bin.toUInt()] == NULL)
809                 continue;
810             Address a = channels[(int) pex_peer.bin.toUInt()]->peer();
811             // Arno, 2012-02-28: Don't send private addresses to non-private peers.
812             if (!a.is_private() || (a.is_private() && peer().is_private()))
813             {
814                 evbuffer_add_8(evb, SWIFT_PEX_ADD);
815                 evbuffer_add_32be(evb, a.ipv4());
816                 evbuffer_add_16be(evb, a.port());
817                 dprintf("%s #%u +pex (reverse) %s\n",tintstr(),id_,a.str());
818             }
819         } while (!reverse_pex_out_.empty() && (SWIFT_MAX_NONDATA_DGRAM_SIZE-evbuffer_get_length(evb)) >= 7);
820
821         // Arno: 2012-02-23: Don't think this is right. Bit of DoS thing,
822         // that you only get back the addr of people that got your addr.
823         // Disable for now.
824         //return;
825     }
826
827     if (!pex_requested_)
828         return;
829
830     // Arno, 2012-02-28: Don't send private addresses to non-private peers.
831     int chid = 0, tries=0;
832     Address a;
833     while (true)
834     {
835         // Arno, 2011-10-03: Choosing Gertjan's RandomChannel over RevealChannel here.
836         chid = transfer().RandomChannel(id_);
837         if (chid==-1 || chid==id_ || tries > 5) {
838                 pex_requested_ = false;
839                 return;
840         }
841         a = channels[chid]->peer();
842         if (!a.is_private() || (a.is_private() && peer().is_private()))
843                 break;
844         tries++;
845     }
846
847     evbuffer_add_8(evb, SWIFT_PEX_ADD);
848     evbuffer_add_32be(evb, a.ipv4());
849     evbuffer_add_16be(evb, a.port());
850     dprintf("%s #%u +pex %s\n",tintstr(),id_,a.str());
851
852     pex_requested_ = false;
853     /* Ensure that we don't add the same id to the reverse_pex_out_ queue
854        more than once. */
855     for (tbqueue::iterator i = channels[chid]->reverse_pex_out_.begin();
856             i != channels[chid]->reverse_pex_out_.end(); i++)
857         if ((int) (i->bin.toUInt()) == id_)
858             return;
859
860     dprintf("%s #%u adding pex for channel %u at time %s\n", tintstr(), chid,
861         id_, tintstr(NOW + 2 * TINT_SEC));
862     // Arno, 2011-10-03: should really be a queue of (tint,channel id(= uint32_t)) pairs.
863     channels[chid]->reverse_pex_out_.push_back(tintbin(NOW + 2 * TINT_SEC, bin_t(id_)));
864     if (channels[chid]->send_control_ == KEEP_ALIVE_CONTROL &&
865             channels[chid]->next_send_time_ > NOW + 2 * TINT_SEC)
866         channels[chid]->Reschedule();
867 }
868
869 void Channel::OnPexReq(void) {
870     dprintf("%s #%u -pex req\n", tintstr(), id_);
871     if (NOW > MIN_PEX_REQUEST_INTERVAL + last_pex_request_time_)
872         pex_requested_ = true;
873 }
874
875 void Channel::AddPexReq(struct evbuffer *evb) {
876     // Rate limit the number of PEX requests
877     if (NOW < next_pex_request_time_)
878         return;
879
880     // If no answer has been received from a previous request, count it as useless
881     if (pex_request_outstanding_)
882         useless_pex_count_++;
883
884     pex_request_outstanding_ = false;
885
886     // Initiate at most SWIFT_MAX_CONNECTIONS connections
887     if (transfer().hs_in_.size() >= SWIFT_MAX_CONNECTIONS ||
888             // Check whether this channel has been providing useful peer information
889             useless_pex_count_ > 2)
890     {
891         // Arno, 2012-02-23: Fix: Code doesn't recover from useless_pex_count_ > 2,
892         // let's just try again in 30s
893                 useless_pex_count_ = 0;
894                 next_pex_request_time_ = NOW + 30 * TINT_SEC;
895
896         return;
897     }
898
899     dprintf("%s #%u +pex req\n", tintstr(), id_);
900     evbuffer_add_8(evb, SWIFT_PEX_REQ);
901     /* Add a little more than the minimum interval, such that the other party is
902        less likely to drop it due to too high rate */
903     next_pex_request_time_ = NOW + MIN_PEX_REQUEST_INTERVAL * 1.1;
904     pex_request_outstanding_ = true;
905 }
906
907
908
909 /*
910  * Channel class methods
911  */
912
913 void Channel::LibeventReceiveCallback(evutil_socket_t fd, short event, void *arg) {
914         // Called by libevent when a datagram is received on the socket
915     Time();
916     RecvDatagram(fd);
917     event_add(&evrecv, NULL);
918 }
919
920 #define NUM_DATAGRAMS 10
921
922 void    Channel::RecvDatagram (evutil_socket_t socket) {
923     struct evbuffer *pevb[NUM_DATAGRAMS];
924         for (int i=0; i<NUM_DATAGRAMS; ++i)
925                 pevb[i] = evbuffer_new();
926     Address addr;
927         //FIXME: make this more readable
928         free(addr.addr);
929         addr.addr = (struct sockaddr_mptp *) calloc(1, sizeof(struct sockaddr_mptp) + NUM_DATAGRAMS * sizeof(struct mptp_dest));
930         addr.addr->count = NUM_DATAGRAMS;
931     RecvFrom(socket, addr, pevb);
932         int i = 0;
933         for (; i<addr.addr->count; ++i) {
934                 struct evbuffer *evb = pevb[i];
935                 Address fromi;
936                 fromi.addr->dests[0].addr = addr.addr->dests[i].addr;
937                 fromi.addr->dests[0].port = addr.addr->dests[i].port;
938                 size_t evboriglen = evbuffer_get_length(evb);
939 #define return_log(...) { fprintf(stderr,__VA_ARGS__); evbuffer_free(evb); return; }
940                 if (evbuffer_get_length(evb)<4)
941                         return_log("socket layer weird: datagram shorter than 4 bytes from %s (prob ICMP unreach)\n",fromi.str());
942                 uint32_t mych = evbuffer_remove_32be(evb);
943                 Sha1Hash hash;
944                 Channel* channel = NULL;
945                 if (mych==0) { // peer initiates handshake
946                         if (evbuffer_get_length(evb)<1+4+1+4+Sha1Hash::SIZE)
947                                 return_log ("%s #0 incorrect size %i initial handshake packet %s\n",
948                                                 tintstr(),(int)evbuffer_get_length(evb),fromi.str());
949                         uint8_t hashid = evbuffer_remove_8(evb);
950                         if (hashid!=SWIFT_HASH)
951                                 return_log ("%s #0 no hash in the initial handshake %s\n",
952                                                 tintstr(),fromi.str());
953                         bin_t pos = bin_fromUInt32(evbuffer_remove_32be(evb));
954                         if (!pos.is_all())
955                                 return_log ("%s #0 that is not the root hash %s\n",tintstr(),fromi.str());
956                         hash = evbuffer_remove_hash(evb);
957                         FileTransfer* ft = FileTransfer::Find(hash);
958                         if (!ft)
959                                 return_log ("%s #0 hash %s unknown, requested by %s\n",tintstr(),hash.hex().c_str(),fromi.str());
960                         dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
961
962                         // Arno, 2012-02-27: Check for duplicate channel
963                         Channel* existchannel = ft->FindChannel(fromi,NULL);
964                         if (existchannel)
965                         {
966                                 // Arno: 2011-10-13: Ignore if established, otherwise consider
967                                 // it a concurrent connection attempt.
968                                 if (existchannel->is_established()) {
969                                         // ARNOTODO: Read complete handshake here so we know whether
970                                         // attempt is to new channel or to existing. Currently read
971                                         // in OnHandshake()
972                                         //
973                                         return_log("%s #0 have a channel already to %s\n",tintstr(),fromi.str());
974                                 } else {
975                                         channel = existchannel;
976                                         //fprintf(stderr,"Channel::RecvDatagram: HANDSHAKE: reuse channel %s\n", channel->peer_.str() );
977                                 }
978                         }
979                         if (channel == NULL) {
980                                 //fprintf(stderr,"Channel::RecvDatagram: HANDSHAKE: create new channel %s\n", addr.str() );
981                                 channel = new Channel(ft, socket, fromi);
982                         }
983                         //fprintf(stderr,"CHANNEL INCOMING DEF hass %s is id %d\n",hash.hex().c_str(),channel->id());
984
985                 } else { // peer responds to my handshake (and other messages)
986                         mych = DecodeID(mych);
987                         if (mych>=channels.size())
988                                 return_log("%s invalid channel #%u, %s\n",tintstr(),mych,fromi.str());
989                         channel = channels[mych];
990                         if (!channel)
991                                 return_log ("%s #%u is already closed\n",tintstr(),mych);
992                         if (channel->IsDiffSenderOrDuplicate(fromi,mych)) {
993                                 channel->Schedule4Close();
994                                 return;
995                         }
996                         channel->own_id_mentioned_ = true;
997                 }
998                 channel->raw_bytes_down_ += evboriglen;
999                 //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
1000                 bool wasestablished = channel->is_established();
1001
1002                 dprintf("%s #%u peer %s recv_peer %s addr %s\n", tintstr(),mych, channel->peer().str(), channel->recv_peer().str(), fromi.str() );
1003
1004                 channel->Recv(evb);
1005
1006                 evbuffer_free(evb);
1007                 //SAFECLOSE
1008                 if (wasestablished && !channel->is_established()) {
1009                         // Arno, 2012-01-26: Received an explict close, clean up channel, safely.
1010                         channel->Schedule4Close();
1011                 }
1012         }
1013         for (; i<NUM_DATAGRAMS; ++i)
1014                 evbuffer_free(pevb[i]);
1015 }
1016
1017
1018
1019 /*
1020  * Channel instance methods
1021  */
1022
1023 void Channel::CloseChannelByAddress(const Address &addr)
1024 {
1025         // fprintf(stderr,"CloseChannelByAddress: address is %s\n", addr.str() );
1026     std::vector<Channel *>::iterator iter;
1027     for (iter = channels.begin(); iter != channels.end(); iter++)
1028     {
1029                 Channel *c = *iter;
1030                 if (c != NULL && c->peer_ == addr)
1031                 {
1032                         // ARNOSMPTODO: will do another send attempt before not being
1033                         // Rescheduled.
1034                         c->peer_channel_id_ = 0; // established->false, do no more sending
1035                         c->Schedule4Close();
1036                         break;
1037                 }
1038     }
1039 }
1040
1041
1042 void Channel::Close () {
1043
1044         this->SwitchSendControl(CLOSE_CONTROL);
1045
1046     if (is_established())
1047         this->Send(); // Arno: send explicit close
1048
1049     if (ENABLE_VOD_PIECEPICKER) {
1050                 // Ric: remove it's binmap from the availability
1051                 transfer().availability().remove(id_, ack_in_);
1052     }
1053
1054     // SAFECLOSE
1055     // Arno: ensure LibeventSendCallback is no longer called with ptr to this Channel
1056     ClearEvents();
1057 }
1058
1059
1060 void Channel::Reschedule () {
1061
1062         // Arno: CAREFUL: direct send depends on diff between next_send_time_ and
1063         // NOW to be 0, so any calls to Time in between may put things off. Sigh.
1064         Time();
1065     next_send_time_ = NextSendTime();
1066     if (next_send_time_!=TINT_NEVER) {
1067
1068         assert(next_send_time_<NOW+TINT_MIN);
1069         tint duein = next_send_time_-NOW;
1070         if (duein <= 0) {
1071                 // Arno, 2011-10-18: libevent's timer implementation appears to be
1072                 // really slow, i.e., timers set for 100 usec from now get called
1073                 // at least two times later :-( Hence, for sends after receives
1074                 // perform them directly.
1075                 dprintf("%s #%u requeue direct send\n",tintstr(),id_);
1076                 LibeventSendCallback(-1,EV_TIMEOUT,this);
1077         }
1078         else {
1079                 if (evsend_ptr_ != NULL) {
1080                         struct timeval duetv = *tint2tv(duein);
1081                         evtimer_add(evsend_ptr_,&duetv);
1082                         dprintf("%s #%u requeue for %s in %lli\n",tintstr(),id_,tintstr(next_send_time_), duein);
1083                 }
1084                 else
1085                         dprintf("%s #%u cannot requeue for %s, closed\n",tintstr(),id_,tintstr(next_send_time_));
1086         }
1087     } else {
1088         // SAFECLOSE
1089         dprintf("%s #%u resched, will close\n",tintstr(),id_);
1090                 this->Schedule4Close();
1091     }
1092 }
1093
1094
1095 /*
1096  * Channel class methods
1097  */
1098 void Channel::LibeventSendCallback(int fd, short event, void *arg) {
1099
1100         // Called by libevent when it is the requested send time.
1101     Time();
1102     Channel * sender = (Channel*) arg;
1103     if (NOW<sender->next_send_time_-TINT_MSEC)
1104         dprintf("%s #%u suspicious send %s<%s\n",tintstr(),
1105                 sender->id(),tintstr(NOW),tintstr(sender->next_send_time_));
1106     if (sender->next_send_time_ != TINT_NEVER)
1107         sender->Send();
1108 }
1109