Fix endianess error introduced earlier in RecvDatagram.
[swifty.git] / src / libswift / sendrecv.cpp
1 /*
2  *  sendrecv.cpp
3  *  most of the swift's state machine
4  *
5  *  Created by Victor Grishchenko on 3/6/09.
6  *  Copyright 2009-2012 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved.
7  *
8  */
9 #include "bin_utils.h"
10 #include "swift.h"
11 #include <algorithm>  // kill it
12 #include <cassert>
13 #include <math.h>
14 #include <cfloat>
15 #include "compat.h"
16
17 using namespace swift;
18 using namespace std;
19
20 struct event_base *Channel::evbase;
21 struct event Channel::evrecv;
22
23 #define DEBUGTRAFFIC    0
24
25 /** Arno: Victor's design allows a sender to choose some data to push to
26  * a receiver, if that receiver is not HINTing at data. Should be disabled
27  * when the receiver has a download rate limit.
28  */
29 #define ENABLE_SENDERSIZE_PUSH 0
30
31
32 /** Arno, 2011-11-24: When rate limit is on and the download is in progress
33  * we send HINTs for 2 chunks at the moment. This constant can be used to
34  * get greater granularity. Set to 0 for original behaviour.
35  */
36 #define HINT_GRANULARITY        16 // chunks
37
38 /*
39  TODO  25 Oct 18:55
40  - range: ALL
41  - randomized testing of advanced ops (new testcase)
42  */
43
44 void    Channel::AddPeakHashes (struct evbuffer *evb) {
45     for(int i=0; i<file().peak_count(); i++) {
46         bin_t peak = file().peak(i);
47         evbuffer_add_8(evb, SWIFT_HASH);
48         evbuffer_add_32be(evb, bin_toUInt32(peak));
49         evbuffer_add_hash(evb, file().peak_hash(i));
50         char bin_name_buf[32];
51         dprintf("%s #%u +phash %s\n",tintstr(),id_,peak.str(bin_name_buf));
52     }
53 }
54
55
56 void    Channel::AddUncleHashes (struct evbuffer *evb, bin_t pos) {
57
58     char bin_name_buf2[32];
59     dprintf("%s #%u +uncle hash for %s\n",tintstr(),id_,pos.str(bin_name_buf2));
60
61     bin_t peak = file().peak_for(pos);
62     while (pos!=peak && ((NOW&3)==3 || !pos.parent().contains(data_out_cap_)) &&
63             ack_in_.is_empty(pos.parent()) ) {
64         bin_t uncle = pos.sibling();
65         evbuffer_add_8(evb, SWIFT_HASH);
66         evbuffer_add_32be(evb, bin_toUInt32(uncle));
67         evbuffer_add_hash(evb,  file().hash(uncle) );
68         char bin_name_buf[32];
69         dprintf("%s #%u +hash %s\n",tintstr(),id_,uncle.str(bin_name_buf));
70         pos = pos.parent();
71     }
72 }
73
74
75 bin_t           Channel::ImposeHint () {
76     uint64_t twist = peer_channel_id_;  // got no hints, send something randomly
77
78     twist &= file().peak(0).toUInt(); // FIXME may make it semi-seq here
79
80     bin_t my_pick = binmap_t::find_complement(ack_in_, file().ack_out(), twist);
81
82     my_pick.to_twisted(twist);
83     while (my_pick.base_length()>max(1,(int)cwnd_))
84         my_pick = my_pick.left();
85
86     return my_pick.twisted(twist);
87 }
88
89
90 bin_t        Channel::DequeueHint (bool *retransmitptr) {
91     bin_t send = bin_t::NONE;
92
93     // Arno, 2012-01-23: Extra protection against channel loss, don't send DATA
94     if (last_recv_time_ < NOW-(3*TINT_SEC))
95         return bin_t::NONE;
96
97     // Arno, 2012-01-18: Reenable Victor's retransmit
98     if (!data_out_tmo_.empty())
99     {
100         tintbin tb = data_out_tmo_.front();
101         send = tb.bin;
102         data_out_tmo_.pop_front();  
103         *retransmitptr = true;
104     }
105     else
106         *retransmitptr = false;
107
108     if (ENABLE_SENDERSIZE_PUSH && send.is_none() && hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {
109         bin_t my_pick = ImposeHint(); // FIXME move to the loop
110         if (!my_pick.is_none()) {
111             hint_in_.push_back(my_pick);
112             char bin_name_buf[32];
113             dprintf("%s #%u *hint %s\n",tintstr(),id_,my_pick.str(bin_name_buf));
114         }
115     }
116     
117     while (!hint_in_.empty() && send.is_none()) {
118         bin_t hint = hint_in_.front().bin;
119         tint time = hint_in_.front().time;
120         hint_in_.pop_front();
121         while (!hint.is_base()) { // FIXME optimize; possible attack
122             hint_in_.push_front(tintbin(time,hint.right()));
123             hint = hint.left();
124         }
125         //if (time < NOW-TINT_SEC*3/2 )
126         //    continue;  bad idea
127         if (!ack_in_.is_filled(hint))
128             send = hint;
129     }
130     uint64_t mass = 0;
131     // Arno, 2012-03-09: Is mucho expensive on busy server.
132     //for(int i=0; i<hint_in_.size(); i++)
133     //    mass += hint_in_[i].bin.base_length();
134     char bin_name_buf[32];
135     dprintf("%s #%u dequeued %s [%lli]\n",tintstr(),id_,send.str(bin_name_buf),mass);
136     return send;
137 }
138
139
140 void    Channel::AddHandshake (struct evbuffer *evb) {
141     if (!peer_channel_id_) { // initiating
142         evbuffer_add_8(evb, SWIFT_HASH);
143         evbuffer_add_32be(evb, bin_toUInt32(bin_t::ALL));
144         evbuffer_add_hash(evb, file().root_hash());
145         dprintf("%s #%u +hash ALL %s\n",
146                 tintstr(),id_,file().root_hash().hex().c_str());
147     }
148     evbuffer_add_8(evb, SWIFT_HANDSHAKE);
149     int encoded = -1;
150     if (send_control_==CLOSE_CONTROL) {
151         encoded = 0;
152     }
153     else
154         encoded = EncodeID(id_);
155     evbuffer_add_32be(evb, encoded);
156     dprintf("%s #%u +hs %x\n",tintstr(),id_,encoded);
157     have_out_.clear();
158 }
159
160
161 void    Channel::Send () {
162
163         dprintf("%s #%u Send called \n",tintstr(),id_);
164
165     struct evbuffer *evb = evbuffer_new();
166     evbuffer_add_32be(evb, peer_channel_id_);
167     bin_t data = bin_t::NONE;
168     int evbnonadplen = 0;
169     if ( is_established() ) {
170         if (send_control_!=CLOSE_CONTROL) {
171                         // FIXME: seeder check
172                         AddHave(evb);
173                         AddAck(evb);
174                         if (!file().is_complete()) {
175                                 AddHint(evb);
176                                 /* Gertjan fix: 7aeea65f3efbb9013f601b22a57ee4a423f1a94d
177                                 "Only call Reschedule for 'reverse PEX' if the channel is in keep-alive mode"
178                                  */
179                                 AddPexReq(evb);
180                         }
181                         AddPex(evb);
182                         TimeoutDataOut();
183                         data = AddData(evb);
184         } else {
185                 // Arno: send explicit close
186                 AddHandshake(evb);
187         }
188     } else {
189         AddHandshake(evb);
190         AddHave(evb); // Arno, 2011-10-28: from AddHandShake. Why double?
191         AddHave(evb);
192         AddAck(evb);
193     }
194
195     lastsendwaskeepalive_ = (evbuffer_get_length(evb) == 4);
196
197     if (evbuffer_get_length(evb)==4) {// only the channel id; bare keep-alive
198         data = bin_t::ALL;
199     }
200     dprintf("%s #%u sent %ib %s:%x\n",
201             tintstr(),id_,(int)evbuffer_get_length(evb),peer().str(),
202             peer_channel_id_);
203     int r = SendTo(socket_,peer(),evb);
204     if (r==-1)
205         print_error("can't send datagram");
206     else
207         raw_bytes_up_ += r;
208     last_send_time_ = NOW;
209     sent_since_recv_++;
210     dgrams_sent_++;
211     evbuffer_free(evb);
212     Reschedule();
213 }
214
215 void    Channel::AddHint (struct evbuffer *evb) {
216
217         // RATELIMIT
218         // Policy is to not send hints when we are above speed limit
219         if (transfer().GetCurrentSpeed(DDIR_DOWNLOAD) > transfer().GetMaxSpeed(DDIR_DOWNLOAD)) {
220                 if (DEBUGTRAFFIC)
221                         fprintf(stderr,"hint: forbidden#");
222                 return;
223         }
224
225
226         // 1. Calc max of what we are allowed to request, uncongested bandwidth wise
227     tint plan_for = max(TINT_SEC,rtt_avg_*4);
228
229     tint timed_out = NOW - plan_for*2;
230     while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
231         hint_out_size_ -= hint_out_.front().bin.base_length();
232         hint_out_.pop_front();
233     }
234
235     int first_plan_pck = max ( (tint)1, plan_for / dip_avg_ );
236
237     // Riccardo, 2012-04-04: Actually allowed is max minus what we already asked for
238     int queue_allowed_hints = max(0,first_plan_pck-(int)hint_out_size_);
239
240
241     // RATELIMIT
242     // 2. Calc max of what is allowed by the rate limiter
243     int rate_allowed_hints = LONG_MAX;
244     if (transfer().GetMaxSpeed(DDIR_DOWNLOAD) < DBL_MAX)
245     {
246                 uint64_t rough_global_hint_out_size = 0; // rough estimate, as hint_out_ clean up is not done for all channels
247                 std::set<Channel *>::iterator iter;
248                 for (iter=transfer().mychannels_.begin(); iter!=transfer().mychannels_.end(); iter++)
249                 {
250                         Channel *c = *iter;
251                         if (c != NULL)
252                                 rough_global_hint_out_size += c->hint_out_size_;
253                 }
254
255                 // Policy: this channel is allowed to hint at the limit - global_hinted_at
256                 // Handle MaxSpeed = unlimited
257                 double rate_hints_limit_float = transfer().GetMaxSpeed(DDIR_DOWNLOAD)/((double)file().chunk_size());
258
259                 int rate_hints_limit = (int)min((double)LONG_MAX,rate_hints_limit_float);
260
261                 // Actually allowed is max minus what we already asked for, globally (=all channels)
262                 rate_allowed_hints = max(0,rate_hints_limit-(int)rough_global_hint_out_size);
263     }
264
265     // 3. Take the smallest allowance from rate and queue limit
266     uint64_t plan_pck = (uint64_t)min(rate_allowed_hints,queue_allowed_hints);
267
268     // 4. Ask allowance in blocks of chunks to get pipelining going from serving peer.
269     if (hint_out_size_ == 0 || plan_pck > HINT_GRANULARITY)
270     {
271         bin_t hint = transfer().picker().Pick(ack_in_,plan_pck,NOW+plan_for*2);
272         if (!hint.is_none()) {
273                 if (DEBUGTRAFFIC)
274                 {
275                         char binstr[32];
276                         fprintf(stderr,"hint c%d: ask %s\n", id(), hint.str(binstr) );
277                 }
278             evbuffer_add_8(evb, SWIFT_HINT);
279             evbuffer_add_32be(evb, bin_toUInt32(hint));
280             char bin_name_buf[32];
281             dprintf("%s #%u +hint %s [%lli]\n",tintstr(),id_,hint.str(bin_name_buf),hint_out_size_);
282             hint_out_.push_back(hint);
283             hint_out_size_ += hint.base_length();
284             //fprintf(stderr,"send c%d: HINTLEN %i\n", id(), hint.base_length());
285             //fprintf(stderr,"HL %i ", hint.base_length());
286         }
287         else
288             dprintf("%s #%u Xhint\n",tintstr(),id_);
289
290     }
291 }
292
293
294 bin_t        Channel::AddData (struct evbuffer *evb) {
295         // RATELIMIT
296         if (transfer().GetCurrentSpeed(DDIR_UPLOAD) > transfer().GetMaxSpeed(DDIR_UPLOAD)) {
297                 transfer().OnSendNoData();
298                 return bin_t::NONE;
299         }
300
301     if (!file().size()) // know nothing
302         return bin_t::NONE;
303
304     bin_t tosend = bin_t::NONE;
305     bool isretransmit = false;
306     tint luft = send_interval_>>4; // may wake up a bit earlier
307     if (data_out_.size()<cwnd_ &&
308             last_data_out_time_+send_interval_<=NOW+luft) {
309         tosend = DequeueHint(&isretransmit);
310         if (tosend.is_none()) {
311             dprintf("%s #%u sendctrl no idea what data to send\n",tintstr(),id_);
312             if (send_control_!=KEEP_ALIVE_CONTROL && send_control_!=CLOSE_CONTROL)
313                 SwitchSendControl(KEEP_ALIVE_CONTROL);
314         }
315     } else
316         dprintf("%s #%u sendctrl wait cwnd %f data_out %i next %s\n",
317                 tintstr(),id_,cwnd_,(int)data_out_.size(),tintstr(last_data_out_time_+NOW-send_interval_));
318
319     if (tosend.is_none())// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
320         return bin_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
321
322     if (ack_in_.is_empty() && file().size())
323         AddPeakHashes(evb);
324
325     //NETWVSHASH
326     if (file().get_check_netwvshash())
327         AddUncleHashes(evb,tosend);
328
329     if (!ack_in_.is_empty()) // TODO: cwnd_>1
330         data_out_cap_ = tosend;
331
332     // Arno, 2011-11-03: May happen when first data packet is sent to empty
333     // leech, then peak + uncle hashes may be so big that they don't fit in eth
334     // frame with DATA. Send 2 datagrams then, one with peaks so they have
335     // a better chance of arriving. Optimistic violation of atomic datagram
336     // principle.
337     if (file().chunk_size() == SWIFT_DEFAULT_CHUNK_SIZE && evbuffer_get_length(evb) > SWIFT_MAX_NONDATA_DGRAM_SIZE) {
338         dprintf("%s #%u fsent %ib %s:%x\n",
339                 tintstr(),id_,(int)evbuffer_get_length(evb),peer().str(),
340                 peer_channel_id_);
341         int ret = Channel::SendTo(socket_,peer(),evb); // kind of fragmentation
342         if (ret > 0)
343                 raw_bytes_up_ += ret;
344         evbuffer_add_32be(evb, peer_channel_id_);
345     }
346
347     if (file().chunk_size() != SWIFT_DEFAULT_CHUNK_SIZE && isretransmit) {
348         /* FRAGRAND
349          * Arno, 2012-01-17: We observe strange behaviour when using
350          * fragmented UDP packets. When ULANC sends a specific datagram ("995"),
351          * the 2nd IP packet carrying it gets lost structurally. When
352          * downloading from the same asset hosted on a Linux 32-bit machine
353          * using a Win7 32-bit client (behind a NAT), one specific full
354          * datagram never gets delivered (6970 one before do). A workaround
355          * is to add some random data to the datagram. Hence we introduce
356          * the SWIFT_RANDOMIZE message, that is added to the datagram carrying
357          * the DATA on a retransmit.
358          */
359              char binstr[32];
360          fprintf(stderr,"AddData: retransmit of randomized chunk %s\n",tosend.str(binstr) );
361          evbuffer_add_8(evb, SWIFT_RANDOMIZE);
362          evbuffer_add_32be(evb, (int)rand() );
363     }
364
365     evbuffer_add_8(evb, SWIFT_DATA);
366     evbuffer_add_32be(evb, bin_toUInt32(tosend));
367
368     struct evbuffer_iovec vec;
369     if (evbuffer_reserve_space(evb, file().chunk_size(), &vec, 1) < 0) {
370         print_error("error on evbuffer_reserve_space");
371         return bin_t::NONE;
372     }
373     size_t r = pread(file().file_descriptor(),(char *)vec.iov_base,
374                      file().chunk_size(),tosend.base_offset()*file().chunk_size());
375     // TODO: corrupted data, retries, caching
376     if (r<0) {
377         print_error("error on reading");
378         vec.iov_len = 0;
379         evbuffer_commit_space(evb, &vec, 1);
380         return bin_t::NONE;
381     }
382     // assert(dgram.space()>=r+4+1);
383     vec.iov_len = r;
384     if (evbuffer_commit_space(evb, &vec, 1) < 0) {
385         print_error("error on evbuffer_commit_space");
386         return bin_t::NONE;
387     }
388
389     last_data_out_time_ = NOW;
390     data_out_.push_back(tosend);
391     bytes_up_ += r;
392     global_bytes_up += r;
393
394     char bin_name_buf[32];
395     dprintf("%s #%u +data %s\n",tintstr(),id_,tosend.str(bin_name_buf));
396
397     // RATELIMIT
398     // ARNOSMPTODO: count overhead bytes too? Move to Send() then.
399         transfer_->OnSendData(file().chunk_size());
400
401     return tosend;
402 }
403
404
405 void    Channel::AddAck (struct evbuffer *evb) {
406     if (data_in_==tintbin())
407         //if (data_in_.bin==bin64_t::NONE)
408         return;
409     // sometimes, we send a HAVE (e.g. in case the peer did repetitive send)
410     evbuffer_add_8(evb, data_in_.time==TINT_NEVER?SWIFT_HAVE:SWIFT_ACK);
411     evbuffer_add_32be(evb, bin_toUInt32(data_in_.bin));
412     if (data_in_.time!=TINT_NEVER)
413         evbuffer_add_64be(evb, data_in_.time);
414
415
416         if (DEBUGTRAFFIC)
417                 fprintf(stderr,"send c%d: ACK %i\n", id(), bin_toUInt32(data_in_.bin));
418
419     have_out_.set(data_in_.bin);
420     char bin_name_buf[32];
421     dprintf("%s #%u +ack %s %s\n",
422         tintstr(),id_,data_in_.bin.str(bin_name_buf),tintstr(data_in_.time));
423     if (data_in_.bin.layer()>2)
424         data_in_dbl_ = data_in_.bin;
425
426     //fprintf(stderr,"data_in_ c%d\n", id() );
427     data_in_ = tintbin();
428     //data_in_ = tintbin(NOW,bin64_t::NONE);
429 }
430
431
432 void    Channel::AddHave (struct evbuffer *evb) {
433     if (!data_in_dbl_.is_none()) { // TODO: do redundancy better
434         evbuffer_add_8(evb, SWIFT_HAVE);
435         evbuffer_add_32be(evb, bin_toUInt32(data_in_dbl_));
436         data_in_dbl_=bin_t::NONE;
437     }
438     if (DEBUGTRAFFIC)
439                 fprintf(stderr,"send c%d: HAVE ",id() );
440     for(int count=0; count<4; count++) {
441         bin_t ack = binmap_t::find_complement(have_out_, file().ack_out(), 0); // FIXME: do rotating queue
442         if (ack.is_none())
443             break;
444         ack = file().ack_out().cover(ack);
445         have_out_.set(ack);
446         evbuffer_add_8(evb, SWIFT_HAVE);
447         evbuffer_add_32be(evb, bin_toUInt32(ack));
448
449         if (DEBUGTRAFFIC)
450                 fprintf(stderr," %i", bin_toUInt32(ack));
451
452         char bin_name_buf[32];
453         dprintf("%s #%u +have %s\n",tintstr(),id_,ack.str(bin_name_buf));
454     }
455         if (DEBUGTRAFFIC)
456                 fprintf(stderr,"\n");
457
458 }
459
460
461 void    Channel::Recv (struct evbuffer *evb) {
462     dprintf("%s #%u recvd %ib\n",tintstr(),id_,(int)evbuffer_get_length(evb)+4);
463     dgrams_rcvd_++;
464
465     lastrecvwaskeepalive_ = (evbuffer_get_length(evb) == 0);
466     if (lastrecvwaskeepalive_)
467         // Update speed measurements such that they decrease when DL stops
468         transfer().OnRecvData(0);
469
470     if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
471         rtt_avg_ = NOW - last_send_time_;
472         dev_avg_ = rtt_avg_;
473         dip_avg_ = rtt_avg_;
474         dprintf("%s #%u sendctrl rtt init %lli\n",tintstr(),id_,rtt_avg_);
475     }
476
477     bin_t data = evbuffer_get_length(evb) ? bin_t::NONE : bin_t::ALL;
478
479         if (DEBUGTRAFFIC)
480                 fprintf(stderr,"recv c%d: size %d ", id(), evbuffer_get_length(evb));
481
482         while (evbuffer_get_length(evb)) {
483         uint8_t type = evbuffer_remove_8(evb);
484
485         if (DEBUGTRAFFIC)
486                 fprintf(stderr," %d", type);
487
488         switch (type) {
489             case SWIFT_HANDSHAKE: OnHandshake(evb); break;
490             case SWIFT_DATA:      data=OnData(evb); break;
491             case SWIFT_HAVE:      OnHave(evb); break;
492             case SWIFT_ACK:       OnAck(evb); break;
493             case SWIFT_HASH:      OnHash(evb); break;
494             case SWIFT_HINT:      OnHint(evb); break;
495             case SWIFT_PEX_ADD:   OnPex(evb); break;
496             case SWIFT_PEX_REQ:   OnPexReq(); break;
497             case SWIFT_RANDOMIZE: OnRandomize(evb); break; //FRAGRAND
498             default:
499                 dprintf("%s #%u ?msg id unknown %i\n",tintstr(),id_,(int)type);
500                 return;
501         }
502     }
503     if (DEBUGTRAFFIC)
504     {
505         fprintf(stderr,"\n");
506     }
507
508
509     last_recv_time_ = NOW;
510     sent_since_recv_ = 0;
511     Reschedule();
512 }
513
514 /*
515  * Arno: FAXME: HASH+DATA should be handled as a transaction: only when the
516  * hashes check out should they be stored in the hashtree, otherwise revert.
517  */
518 void    Channel::OnHash (struct evbuffer *evb) {
519         bin_t pos = bin_fromUInt32(evbuffer_remove_32be(evb));
520     Sha1Hash hash = evbuffer_remove_hash(evb);
521     file().OfferHash(pos,hash);
522     char bin_name_buf[32];
523     dprintf("%s #%u -hash %s\n",tintstr(),id_,pos.str(bin_name_buf));
524
525     //fprintf(stderr,"HASH %lli hex %s\n",pos.toUInt(), hash.hex().c_str() );
526 }
527
528
529 void    Channel::CleanHintOut (bin_t pos) {
530     int hi = 0;
531     while (hi<hint_out_.size() && !hint_out_[hi].bin.contains(pos))
532         hi++;
533     if (hi==hint_out_.size())
534         return; // something not hinted or hinted in far past
535     while (hi--) { // removing likely snubbed hints
536         hint_out_size_ -= hint_out_.front().bin.base_length();
537         hint_out_.pop_front();
538     }
539     while (hint_out_.front().bin!=pos) {
540         tintbin f = hint_out_.front();
541
542         assert (f.bin.contains(pos));
543
544         if (pos < f.bin) {
545             f.bin.to_left();
546         } else {
547             f.bin.to_right();
548         }
549
550         hint_out_.front().bin = f.bin.sibling();
551         hint_out_.push_front(f);
552     }
553     hint_out_.pop_front();
554     hint_out_size_--;
555 }
556
557
558 bin_t Channel::OnData (struct evbuffer *evb) {  // TODO: HAVE NONE for corrupted data
559
560         char bin_name_buf[32];
561         bin_t pos = bin_fromUInt32(evbuffer_remove_32be(evb));
562
563     // Arno: Assuming DATA last message in datagram
564     if (evbuffer_get_length(evb) > file().chunk_size()) {
565         dprintf("%s #%u !data chunk size mismatch %s: exp %lu got " PRISIZET "\n",tintstr(),id_,pos.str(bin_name_buf), file().chunk_size(), evbuffer_get_length(evb));
566         fprintf(stderr,"WARNING: chunk size mismatch: exp %lu got " PRISIZET "\n",file().chunk_size(), evbuffer_get_length(evb));
567     }
568
569     int length = (evbuffer_get_length(evb) < file().chunk_size()) ? evbuffer_get_length(evb) : file().chunk_size();
570     if (!file().ack_out().is_empty(pos)) {
571         // Arno, 2012-01-24: print message for duplicate
572         dprintf("%s #%u Ddata %s\n",tintstr(),id_,pos.str(bin_name_buf));
573         evbuffer_drain(evb, length);
574         data_in_ = tintbin(TINT_NEVER,transfer().ack_out().cover(pos));
575
576         // Arno, 2012-01-24: Make sure data interarrival periods don't get
577         // screwed up because of these (ignored) duplicates.
578         UpdateDIP(pos);
579         return bin_t::NONE;
580     }
581     uint8_t *data = evbuffer_pullup(evb, length);
582     data_in_ = tintbin(NOW,bin_t::NONE);
583     if (!file().OfferData(pos, (char*)data, length)) {
584         evbuffer_drain(evb, length);
585         char bin_name_buf[32];
586         dprintf("%s #%u !data %s\n",tintstr(),id_,pos.str(bin_name_buf));
587         return bin_t::NONE;
588     }
589     evbuffer_drain(evb, length);
590     dprintf("%s #%u -data %s\n",tintstr(),id_,pos.str(bin_name_buf));
591
592     if (DEBUGTRAFFIC)
593         fprintf(stderr,"$ ");
594
595     bin_t cover = transfer().ack_out().cover(pos);
596     for(int i=0; i<transfer().cb_installed; i++)
597         if (cover.layer()>=transfer().cb_agg[i])
598             transfer().callbacks[i](transfer().fd(),cover);  // FIXME
599     if (cover.layer() >= 5) // Arno: tested with 32K, presently = 2 ** 5 * chunk_size CHUNKSIZE
600         transfer().OnRecvData( pow((double)2,(double)5)*((double)file().chunk_size()) );
601     data_in_.bin = pos;
602
603     UpdateDIP(pos);
604     CleanHintOut(pos);
605     bytes_down_ += length;
606     global_bytes_down += length;
607     return pos;
608 }
609
610
611 void Channel::UpdateDIP(bin_t pos)
612 {
613         if (!pos.is_none()) {
614                 if (last_data_in_time_) {
615                         tint dip = NOW - last_data_in_time_;
616                         dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
617                 }
618                 last_data_in_time_ = NOW;
619         }
620 }
621
622
623 void    Channel::OnAck (struct evbuffer *evb) {
624     bin_t ackd_pos = bin_fromUInt32(evbuffer_remove_32be(evb));
625     tint peer_time = evbuffer_remove_64be(evb); // FIXME 32
626     // FIXME FIXME: wrap around here
627     if (ackd_pos.is_none())
628         return; // likely, broken chunk/ insufficient hashes
629     if (file().size() && ackd_pos.base_offset()>=file().size_in_chunks()) {
630         char bin_name_buf[32];
631         eprintf("invalid ack: %s\n",ackd_pos.str(bin_name_buf));
632         return;
633     }
634     ack_in_.set(ackd_pos);
635
636     //fprintf(stderr,"OnAck: got bin %s is_complete %d\n", ackd_pos.str(), (int)ack_in_.is_complete_arno( file().ack_out().get_height() ));
637
638     int di = 0, ri = 0;
639     // find an entry for the send (data out) event
640     while (  di<data_out_.size() && ( data_out_[di]==tintbin() ||
641            !ackd_pos.contains(data_out_[di].bin) )  )
642         di++;
643     // FUTURE: delayed acks
644     // rule out retransmits
645     while (  ri<data_out_tmo_.size() && !ackd_pos.contains(data_out_tmo_[ri].bin) )
646         ri++;
647     char bin_name_buf[32];
648     dprintf("%s #%u %cack %s %lli\n",tintstr(),id_,
649             di==data_out_.size()?'?':'-',ackd_pos.str(bin_name_buf),peer_time);
650     if (di!=data_out_.size() && ri==data_out_tmo_.size()) { // not a retransmit
651             // round trip time calculations
652         tint rtt = NOW-data_out_[di].time;
653         rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
654         dev_avg_ = ( dev_avg_*3 + tintabs(rtt-rtt_avg_) ) >> 2;
655         assert(data_out_[di].time!=TINT_NEVER);
656             // one-way delay calculations
657         tint owd = peer_time - data_out_[di].time;
658         owd_cur_bin_ = 0;//(owd_cur_bin_+1) & 3;
659         owd_current_[owd_cur_bin_] = owd;
660         if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
661             owd_min_bin_start_ = NOW;
662             owd_min_bin_ = (owd_min_bin_+1) & 3;
663             owd_min_bins_[owd_min_bin_] = TINT_NEVER;
664         }
665         if (owd_min_bins_[owd_min_bin_]>owd)
666             owd_min_bins_[owd_min_bin_] = owd;
667         dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n",
668                 tintstr(),id_,rtt_avg_,dev_avg_,data_out_[di].bin.str(bin_name_buf));
669         ack_rcvd_recent_++;
670         // early loss detection by packet reordering
671         for (int re=0; re<di-MAX_REORDERING; re++) {
672             if (data_out_[re]==tintbin())
673                 continue;
674             ack_not_rcvd_recent_++;
675             data_out_tmo_.push_back(data_out_[re].bin);
676             dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str(bin_name_buf));
677             data_out_cap_ = bin_t::ALL;
678             data_out_[re] = tintbin();
679         }
680     }
681     if (di!=data_out_.size())
682         data_out_[di]=tintbin();
683     // clear zeroed items
684     while (!data_out_.empty() && ( data_out_.front()==tintbin() ||
685             ack_in_.is_filled(data_out_.front().bin) ) )
686         data_out_.pop_front();
687     assert(data_out_.empty() || data_out_.front().time!=TINT_NEVER);
688 }
689
690
691 void Channel::TimeoutDataOut ( ) {
692     // losses: timeouted packets
693     tint timeout = NOW - ack_timeout();
694     while (!data_out_.empty() &&
695         ( data_out_.front().time<timeout || data_out_.front()==tintbin() ) ) {
696         if (data_out_.front()!=tintbin() && ack_in_.is_empty(data_out_.front().bin)) {
697             ack_not_rcvd_recent_++;
698             data_out_cap_ = bin_t::ALL;
699             data_out_tmo_.push_back(data_out_.front().bin);
700             char bin_name_buf[32];
701             dprintf("%s #%u Tdata %s\n",tintstr(),id_,data_out_.front().bin.str(bin_name_buf));
702         }
703         data_out_.pop_front();
704     }
705     // clear retransmit queue of older items
706     while (!data_out_tmo_.empty() && data_out_tmo_.front().time<NOW-MAX_POSSIBLE_RTT)
707         data_out_tmo_.pop_front();
708 }
709
710
711 void Channel::OnHave (struct evbuffer *evb) {
712     bin_t ackd_pos = bin_fromUInt32(evbuffer_remove_32be(evb));
713     if (ackd_pos.is_none())
714         return; // wow, peer has hashes
715
716     // PPPLUG
717     if (ENABLE_VOD_PIECEPICKER) {
718                 // Ric: check if we should set the size in the file transfer
719                 if (transfer().availability().size() <= 0 && file().size() > 0)
720                 {
721                         transfer().availability().setSize(file().size_in_chunks());
722                 }
723                 // Ric: update the availability if needed
724                 transfer().availability().set(id_, ack_in_, ackd_pos);
725     }
726
727     ack_in_.set(ackd_pos);
728     char bin_name_buf[32];
729     dprintf("%s #%u -have %s\n",tintstr(),id_,ackd_pos.str(bin_name_buf));
730
731     //fprintf(stderr,"OnHave: got bin %s is_complete %d\n", ackd_pos.str(), IsComplete() );
732
733 }
734
735
736 void    Channel::OnHint (struct evbuffer *evb) {
737     bin_t hint = bin_fromUInt32(evbuffer_remove_32be(evb));
738     // FIXME: wake up here
739     hint_in_.push_back(hint);
740     char bin_name_buf[32];
741     dprintf("%s #%u -hint %s\n",tintstr(),id_,hint.str(bin_name_buf));
742 }
743
744
745 void Channel::OnHandshake (struct evbuffer *evb) {
746
747         uint32_t pcid = evbuffer_remove_32be(evb);
748     dprintf("%s #%u -hs %x\n",tintstr(),id_,pcid);
749
750     if (is_established() && pcid == 0) {
751         // Arno: received explicit close
752         peer_channel_id_ = 0; // == established -> false
753         Close();
754         return;
755     }
756
757     peer_channel_id_ = pcid;
758     // self-connection check
759     if (!SELF_CONN_OK) {
760         uint32_t try_id = DecodeID(peer_channel_id_);
761         if (channel(try_id) && !channel(try_id)->peer_channel_id_) {
762             peer_channel_id_ = 0;
763             Close();
764             return; // this is a self-connection
765         }
766     }
767
768     // FUTURE: channel forking
769     if (is_established())
770         dprintf("%s #%u established %s\n", tintstr(), id_, peer().str());
771 }
772
773
774 void Channel::OnPex (struct evbuffer *evb) {
775     uint32_t ipv4 = evbuffer_remove_32be(evb);
776     uint16_t port = evbuffer_remove_16be(evb);
777     Address addr(ipv4,port);
778     dprintf("%s #%u -pex %s\n",tintstr(),id_,addr.str());
779     if (transfer().OnPexIn(addr))
780         useless_pex_count_ = 0;
781     else
782     {
783                 dprintf("%s #%u already channel to %s\n", tintstr(),id_,addr.str());
784         useless_pex_count_++;
785     }
786     pex_request_outstanding_ = false;
787 }
788
789
790 //FRAGRAND
791 void Channel::OnRandomize (struct evbuffer *evb) {
792     dprintf("%s #%u -rand\n",tintstr(),id_ );
793         // Payload is 4 random bytes
794     uint32_t r = evbuffer_remove_32be(evb);
795 }
796
797
798 void    Channel::AddPex (struct evbuffer *evb) {
799         // Gertjan fix: Reverse PEX
800     // PEX messages sent to facilitate NAT/FW puncturing get priority
801     if (!reverse_pex_out_.empty()) {
802         do {
803             tintbin pex_peer = reverse_pex_out_.front();
804             reverse_pex_out_.pop_front();
805             if (channels[(int) pex_peer.bin.toUInt()] == NULL)
806                 continue;
807             Address a = channels[(int) pex_peer.bin.toUInt()]->peer();
808             // Arno, 2012-02-28: Don't send private addresses to non-private peers.
809             if (!a.is_private() || (a.is_private() && peer().is_private()))
810             {
811                 evbuffer_add_8(evb, SWIFT_PEX_ADD);
812                 evbuffer_add_32be(evb, a.ipv4());
813                 evbuffer_add_16be(evb, a.port());
814                 dprintf("%s #%u +pex (reverse) %s\n",tintstr(),id_,a.str());
815             }
816         } while (!reverse_pex_out_.empty() && (SWIFT_MAX_NONDATA_DGRAM_SIZE-evbuffer_get_length(evb)) >= 7);
817
818         // Arno: 2012-02-23: Don't think this is right. Bit of DoS thing,
819         // that you only get back the addr of people that got your addr.
820         // Disable for now.
821         //return;
822     }
823
824     if (!pex_requested_)
825         return;
826
827     // Arno, 2012-02-28: Don't send private addresses to non-private peers.
828     int chid = 0, tries=0;
829     Address a;
830     while (true)
831     {
832         // Arno, 2011-10-03: Choosing Gertjan's RandomChannel over RevealChannel here.
833         chid = transfer().RandomChannel(id_);
834         if (chid==-1 || chid==id_ || tries > 5) {
835                 pex_requested_ = false;
836                 return;
837         }
838         a = channels[chid]->peer();
839         if (!a.is_private() || (a.is_private() && peer().is_private()))
840                 break;
841         tries++;
842     }
843
844     evbuffer_add_8(evb, SWIFT_PEX_ADD);
845     evbuffer_add_32be(evb, a.ipv4());
846     evbuffer_add_16be(evb, a.port());
847     dprintf("%s #%u +pex %s\n",tintstr(),id_,a.str());
848
849     pex_requested_ = false;
850     /* Ensure that we don't add the same id to the reverse_pex_out_ queue
851        more than once. */
852     for (tbqueue::iterator i = channels[chid]->reverse_pex_out_.begin();
853             i != channels[chid]->reverse_pex_out_.end(); i++)
854         if ((int) (i->bin.toUInt()) == id_)
855             return;
856
857     dprintf("%s #%u adding pex for channel %u at time %s\n", tintstr(), chid,
858         id_, tintstr(NOW + 2 * TINT_SEC));
859     // Arno, 2011-10-03: should really be a queue of (tint,channel id(= uint32_t)) pairs.
860     channels[chid]->reverse_pex_out_.push_back(tintbin(NOW + 2 * TINT_SEC, bin_t(id_)));
861     if (channels[chid]->send_control_ == KEEP_ALIVE_CONTROL &&
862             channels[chid]->next_send_time_ > NOW + 2 * TINT_SEC)
863         channels[chid]->Reschedule();
864 }
865
866 void Channel::OnPexReq(void) {
867     dprintf("%s #%u -pex req\n", tintstr(), id_);
868     if (NOW > MIN_PEX_REQUEST_INTERVAL + last_pex_request_time_)
869         pex_requested_ = true;
870 }
871
872 void Channel::AddPexReq(struct evbuffer *evb) {
873     // Rate limit the number of PEX requests
874     if (NOW < next_pex_request_time_)
875         return;
876
877     // If no answer has been received from a previous request, count it as useless
878     if (pex_request_outstanding_)
879         useless_pex_count_++;
880
881     pex_request_outstanding_ = false;
882
883     // Initiate at most SWIFT_MAX_CONNECTIONS connections
884     if (transfer().hs_in_.size() >= SWIFT_MAX_CONNECTIONS ||
885             // Check whether this channel has been providing useful peer information
886             useless_pex_count_ > 2)
887     {
888         // Arno, 2012-02-23: Fix: Code doesn't recover from useless_pex_count_ > 2,
889         // let's just try again in 30s
890                 useless_pex_count_ = 0;
891                 next_pex_request_time_ = NOW + 30 * TINT_SEC;
892
893         return;
894     }
895
896     dprintf("%s #%u +pex req\n", tintstr(), id_);
897     evbuffer_add_8(evb, SWIFT_PEX_REQ);
898     /* Add a little more than the minimum interval, such that the other party is
899        less likely to drop it due to too high rate */
900     next_pex_request_time_ = NOW + MIN_PEX_REQUEST_INTERVAL * 1.1;
901     pex_request_outstanding_ = true;
902 }
903
904
905
906 /*
907  * Channel class methods
908  */
909
910 void Channel::LibeventReceiveCallback(evutil_socket_t fd, short event, void *arg) {
911         // Called by libevent when a datagram is received on the socket
912     Time();
913     RecvDatagram(fd);
914     event_add(&evrecv, NULL);
915 }
916
917 #define NUM_DATAGRAMS 10
918
919 void    Channel::RecvDatagram (evutil_socket_t socket) {
920     struct evbuffer *pevb[NUM_DATAGRAMS];
921         for (int i=0; i<NUM_DATAGRAMS; ++i)
922                 pevb[i] = evbuffer_new();
923     Address addr;
924         //FIXME: make this more readable
925         free(addr.addr);
926         addr.addr = (struct sockaddr_mptp *) calloc(1, sizeof(struct sockaddr_mptp) + NUM_DATAGRAMS * sizeof(struct mptp_dest));
927         addr.addr->count = NUM_DATAGRAMS;
928     RecvFrom(socket, addr, pevb);
929         int i = 0;
930         for (; i<addr.addr->count; ++i) {
931                 struct evbuffer *evb = pevb[i];
932                 Address fromi;
933                 fromi.addr->dests[0].addr = addr.addr->dests[i].addr;
934                 fromi.addr->dests[0].port = addr.addr->dests[i].port;
935                 size_t evboriglen = evbuffer_get_length(evb);
936 #define return_log(...) { fprintf(stderr,__VA_ARGS__); evbuffer_free(evb); return; }
937                 if (evbuffer_get_length(evb)<4)
938                         return_log("socket layer weird: datagram shorter than 4 bytes from %s (prob ICMP unreach)\n",fromi.str());
939                 uint32_t mych = evbuffer_remove_32be(evb);
940                 Sha1Hash hash;
941                 Channel* channel = NULL;
942                 if (mych==0) { // peer initiates handshake
943                         if (evbuffer_get_length(evb)<1+4+1+4+Sha1Hash::SIZE)
944                                 return_log ("%s #0 incorrect size %i initial handshake packet %s\n",
945                                                 tintstr(),(int)evbuffer_get_length(evb),fromi.str());
946                         uint8_t hashid = evbuffer_remove_8(evb);
947                         if (hashid!=SWIFT_HASH)
948                                 return_log ("%s #0 no hash in the initial handshake %s\n",
949                                                 tintstr(),fromi.str());
950                         bin_t pos = bin_fromUInt32(evbuffer_remove_32be(evb));
951                         if (!pos.is_all())
952                                 return_log ("%s #0 that is not the root hash %s\n",tintstr(),fromi.str());
953                         hash = evbuffer_remove_hash(evb);
954                         FileTransfer* ft = FileTransfer::Find(hash);
955                         if (!ft)
956                                 return_log ("%s #0 hash %s unknown, requested by %s\n",tintstr(),hash.hex().c_str(),fromi.str());
957                         dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
958
959                         // Arno, 2012-02-27: Check for duplicate channel
960                         Channel* existchannel = ft->FindChannel(fromi,NULL);
961                         if (existchannel)
962                         {
963                                 // Arno: 2011-10-13: Ignore if established, otherwise consider
964                                 // it a concurrent connection attempt.
965                                 if (existchannel->is_established()) {
966                                         // ARNOTODO: Read complete handshake here so we know whether
967                                         // attempt is to new channel or to existing. Currently read
968                                         // in OnHandshake()
969                                         //
970                                         return_log("%s #0 have a channel already to %s\n",tintstr(),fromi.str());
971                                 } else {
972                                         channel = existchannel;
973                                         //fprintf(stderr,"Channel::RecvDatagram: HANDSHAKE: reuse channel %s\n", channel->peer_.str() );
974                                 }
975                         }
976                         if (channel == NULL) {
977                                 //fprintf(stderr,"Channel::RecvDatagram: HANDSHAKE: create new channel %s\n", addr.str() );
978                                 channel = new Channel(ft, socket, fromi);
979                         }
980                         //fprintf(stderr,"CHANNEL INCOMING DEF hass %s is id %d\n",hash.hex().c_str(),channel->id());
981
982                 } else { // peer responds to my handshake (and other messages)
983                         mych = DecodeID(mych);
984                         if (mych>=channels.size())
985                                 return_log("%s invalid channel #%u, %s\n",tintstr(),mych,fromi.str());
986                         channel = channels[mych];
987                         if (!channel)
988                                 return_log ("%s #%u is already closed\n",tintstr(),mych);
989                         if (channel->IsDiffSenderOrDuplicate(fromi,mych)) {
990                                 channel->Schedule4Close();
991                                 return;
992                         }
993                         channel->own_id_mentioned_ = true;
994                 }
995                 channel->raw_bytes_down_ += evboriglen;
996                 //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
997                 bool wasestablished = channel->is_established();
998
999                 dprintf("%s #%u peer %s recv_peer %s addr %s\n", tintstr(),mych, channel->peer().str(), channel->recv_peer().str(), fromi.str() );
1000
1001                 channel->Recv(evb);
1002
1003                 evbuffer_free(evb);
1004                 //SAFECLOSE
1005                 if (wasestablished && !channel->is_established()) {
1006                         // Arno, 2012-01-26: Received an explict close, clean up channel, safely.
1007                         channel->Schedule4Close();
1008                 }
1009         }
1010         for (; i<NUM_DATAGRAMS; ++i)
1011                 evbuffer_free(pevb[i]);
1012 }
1013
1014
1015
1016 /*
1017  * Channel instance methods
1018  */
1019
1020 void Channel::CloseChannelByAddress(const Address &addr)
1021 {
1022         // fprintf(stderr,"CloseChannelByAddress: address is %s\n", addr.str() );
1023     std::vector<Channel *>::iterator iter;
1024     for (iter = channels.begin(); iter != channels.end(); iter++)
1025     {
1026                 Channel *c = *iter;
1027                 if (c != NULL && c->peer_ == addr)
1028                 {
1029                         // ARNOSMPTODO: will do another send attempt before not being
1030                         // Rescheduled.
1031                         c->peer_channel_id_ = 0; // established->false, do no more sending
1032                         c->Schedule4Close();
1033                         break;
1034                 }
1035     }
1036 }
1037
1038
1039 void Channel::Close () {
1040
1041         this->SwitchSendControl(CLOSE_CONTROL);
1042
1043     if (is_established())
1044         this->Send(); // Arno: send explicit close
1045
1046     if (ENABLE_VOD_PIECEPICKER) {
1047                 // Ric: remove it's binmap from the availability
1048                 transfer().availability().remove(id_, ack_in_);
1049     }
1050
1051     // SAFECLOSE
1052     // Arno: ensure LibeventSendCallback is no longer called with ptr to this Channel
1053     ClearEvents();
1054 }
1055
1056
1057 void Channel::Reschedule () {
1058
1059         // Arno: CAREFUL: direct send depends on diff between next_send_time_ and
1060         // NOW to be 0, so any calls to Time in between may put things off. Sigh.
1061         Time();
1062     next_send_time_ = NextSendTime();
1063     if (next_send_time_!=TINT_NEVER) {
1064
1065         assert(next_send_time_<NOW+TINT_MIN);
1066         tint duein = next_send_time_-NOW;
1067         if (duein <= 0) {
1068                 // Arno, 2011-10-18: libevent's timer implementation appears to be
1069                 // really slow, i.e., timers set for 100 usec from now get called
1070                 // at least two times later :-( Hence, for sends after receives
1071                 // perform them directly.
1072                 dprintf("%s #%u requeue direct send\n",tintstr(),id_);
1073                 LibeventSendCallback(-1,EV_TIMEOUT,this);
1074         }
1075         else {
1076                 if (evsend_ptr_ != NULL) {
1077                         struct timeval duetv = *tint2tv(duein);
1078                         evtimer_add(evsend_ptr_,&duetv);
1079                         dprintf("%s #%u requeue for %s in %lli\n",tintstr(),id_,tintstr(next_send_time_), duein);
1080                 }
1081                 else
1082                         dprintf("%s #%u cannot requeue for %s, closed\n",tintstr(),id_,tintstr(next_send_time_));
1083         }
1084     } else {
1085         // SAFECLOSE
1086         dprintf("%s #%u resched, will close\n",tintstr(),id_);
1087                 this->Schedule4Close();
1088     }
1089 }
1090
1091
1092 /*
1093  * Channel class methods
1094  */
1095 void Channel::LibeventSendCallback(int fd, short event, void *arg) {
1096
1097         // Called by libevent when it is the requested send time.
1098     Time();
1099     Channel * sender = (Channel*) arg;
1100     if (NOW<sender->next_send_time_-TINT_MSEC)
1101         dprintf("%s #%u suspicious send %s<%s\n",tintstr(),
1102                 sender->id(),tintstr(NOW),tintstr(sender->next_send_time_));
1103     if (sender->next_send_time_ != TINT_NEVER)
1104         sender->Send();
1105 }
1106