cbdf264bb56a96b7e777023a85dc0be20f0f1d59
[swifty.git] / src / libswift / sendrecv.cpp
1 /*
2  *  sendrecv.cpp
3  *  most of the swift's state machine
4  *
5  *  Created by Victor Grishchenko on 3/6/09.
6  *  Copyright 2009-2012 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved.
7  *
8  */
9 #include "bin_utils.h"
10 #include "swift.h"
11 #include <algorithm>  // kill it
12 #include <cassert>
13 #include <math.h>
14 #include <cfloat>
15 #include "compat.h"
16
17 using namespace swift;
18 using namespace std;
19
20 struct event_base *Channel::evbase;
21 MessageQueue Channel::messageQueue;
22 struct event Channel::evrecv;
23
24 #define DEBUGTRAFFIC    0
25
26 /** Arno: Victor's design allows a sender to choose some data to push to
27  * a receiver, if that receiver is not HINTing at data. Should be disabled
28  * when the receiver has a download rate limit.
29  */
30 #define ENABLE_SENDERSIZE_PUSH 0
31
32
33 /** Arno, 2011-11-24: When rate limit is on and the download is in progress
34  * we send HINTs for 2 chunks at the moment. This constant can be used to
35  * get greater granularity. Set to 0 for original behaviour.
36  */
37 #define HINT_GRANULARITY        16 // chunks
38
39 /*
40  TODO  25 Oct 18:55
41  - range: ALL
42  - randomized testing of advanced ops (new testcase)
43  */
44
45 void    Channel::AddPeakHashes (struct evbuffer *evb) {
46     for(int i=0; i<file().peak_count(); i++) {
47         bin_t peak = file().peak(i);
48         evbuffer_add_8(evb, SWIFT_HASH);
49         evbuffer_add_32be(evb, bin_toUInt32(peak));
50         evbuffer_add_hash(evb, file().peak_hash(i));
51         char bin_name_buf[32];
52         dprintf("%s #%u +phash %s\n",tintstr(),id_,peak.str(bin_name_buf));
53     }
54 }
55
56
57 void    Channel::AddUncleHashes (struct evbuffer *evb, bin_t pos) {
58
59     char bin_name_buf2[32];
60     dprintf("%s #%u +uncle hash for %s\n",tintstr(),id_,pos.str(bin_name_buf2));
61
62     bin_t peak = file().peak_for(pos);
63     while (pos!=peak && ((NOW&3)==3 || !pos.parent().contains(data_out_cap_)) &&
64             ack_in_.is_empty(pos.parent()) ) {
65         bin_t uncle = pos.sibling();
66         evbuffer_add_8(evb, SWIFT_HASH);
67         evbuffer_add_32be(evb, bin_toUInt32(uncle));
68         evbuffer_add_hash(evb,  file().hash(uncle) );
69         char bin_name_buf[32];
70         dprintf("%s #%u +hash %s\n",tintstr(),id_,uncle.str(bin_name_buf));
71         pos = pos.parent();
72     }
73 }
74
75
76 bin_t           Channel::ImposeHint () {
77     uint64_t twist = peer_channel_id_;  // got no hints, send something randomly
78
79     twist &= file().peak(0).toUInt(); // FIXME may make it semi-seq here
80
81     bin_t my_pick = binmap_t::find_complement(ack_in_, file().ack_out(), twist);
82
83     my_pick.to_twisted(twist);
84     while (my_pick.base_length()>max(1,(int)cwnd_))
85         my_pick = my_pick.left();
86
87     return my_pick.twisted(twist);
88 }
89
90
91 bin_t        Channel::DequeueHint (bool *retransmitptr) {
92     bin_t send = bin_t::NONE;
93
94     // Arno, 2012-01-23: Extra protection against channel loss, don't send DATA
95     if (last_recv_time_ < NOW-(3*TINT_SEC))
96         return bin_t::NONE;
97
98     // Arno, 2012-01-18: Reenable Victor's retransmit
99     if (!data_out_tmo_.empty())
100     {
101         tintbin tb = data_out_tmo_.front();
102         send = tb.bin;
103         data_out_tmo_.pop_front();  
104         *retransmitptr = true;
105     }
106     else
107         *retransmitptr = false;
108
109     if (ENABLE_SENDERSIZE_PUSH && send.is_none() && hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {
110         bin_t my_pick = ImposeHint(); // FIXME move to the loop
111         if (!my_pick.is_none()) {
112             hint_in_.push_back(my_pick);
113             char bin_name_buf[32];
114             dprintf("%s #%u *hint %s\n",tintstr(),id_,my_pick.str(bin_name_buf));
115         }
116     }
117     
118     while (!hint_in_.empty() && send.is_none()) {
119         bin_t hint = hint_in_.front().bin;
120         tint time = hint_in_.front().time;
121         hint_in_.pop_front();
122         while (!hint.is_base()) { // FIXME optimize; possible attack
123             hint_in_.push_front(tintbin(time,hint.right()));
124             hint = hint.left();
125         }
126         //if (time < NOW-TINT_SEC*3/2 )
127         //    continue;  bad idea
128         if (!ack_in_.is_filled(hint))
129             send = hint;
130     }
131     uint64_t mass = 0;
132     // Arno, 2012-03-09: Is mucho expensive on busy server.
133     //for(int i=0; i<hint_in_.size(); i++)
134     //    mass += hint_in_[i].bin.base_length();
135     char bin_name_buf[32];
136     dprintf("%s #%u dequeued %s [%lli]\n",tintstr(),id_,send.str(bin_name_buf),mass);
137     return send;
138 }
139
140
141 void    Channel::AddHandshake (struct evbuffer *evb) {
142     if (!peer_channel_id_) { // initiating
143         evbuffer_add_8(evb, SWIFT_HASH);
144         evbuffer_add_32be(evb, bin_toUInt32(bin_t::ALL));
145         evbuffer_add_hash(evb, file().root_hash());
146         dprintf("%s #%u +hash ALL %s\n",
147                 tintstr(),id_,file().root_hash().hex().c_str());
148     }
149     evbuffer_add_8(evb, SWIFT_HANDSHAKE);
150     int encoded = -1;
151     if (send_control_==CLOSE_CONTROL) {
152         encoded = 0;
153     }
154     else
155         encoded = EncodeID(id_);
156     evbuffer_add_32be(evb, encoded);
157     dprintf("%s #%u +hs %x\n",tintstr(),id_,encoded);
158     have_out_.clear();
159 }
160
161
162 void    Channel::Send () {
163
164         dprintf("%s #%u Send called \n",tintstr(),id_);
165
166     struct evbuffer *evb = evbuffer_new();
167     evbuffer_add_32be(evb, peer_channel_id_);
168     bin_t data = bin_t::NONE;
169     int evbnonadplen = 0;
170     if ( is_established() ) {
171         if (send_control_!=CLOSE_CONTROL) {
172                         // FIXME: seeder check
173                         AddHave(evb);
174                         AddAck(evb);
175                         if (!file().is_complete()) {
176                                 AddHint(evb);
177                                 /* Gertjan fix: 7aeea65f3efbb9013f601b22a57ee4a423f1a94d
178                                 "Only call Reschedule for 'reverse PEX' if the channel is in keep-alive mode"
179                                  */
180                                 AddPexReq(evb);
181                         }
182                         AddPex(evb);
183                         TimeoutDataOut();
184                         data = AddData(&evb);
185         } else {
186                 // Arno: send explicit close
187                 AddHandshake(evb);
188         }
189     } else {
190         AddHandshake(evb);
191         AddHave(evb); // Arno, 2011-10-28: from AddHandShake. Why double?
192         AddHave(evb);
193         AddAck(evb);
194     }
195
196     lastsendwaskeepalive_ = (evbuffer_get_length(evb) == 4);
197
198     if (evbuffer_get_length(evb)==4) {// only the channel id; bare keep-alive
199         data = bin_t::ALL;
200     }
201     dprintf("%s #%u sent %ib %s:%x\n",
202             tintstr(),id_,(int)evbuffer_get_length(evb),peer().str(),
203             peer_channel_id_);
204
205         messageQueue.AddBuffer(socket_, evb, peer(), this); 
206 }
207
208 void Channel::Sent(int bytes, evbuffer *evb, bool tofree)
209 {
210         raw_bytes_up_ += bytes;
211         if (tofree) {
212                 last_send_time_ = NOW;
213                 sent_since_recv_++;
214                 dgrams_sent_++;
215                 evbuffer_free(evb);
216                 Reschedule();
217         }
218 }
219
220 void    Channel::AddHint (struct evbuffer *evb) {
221
222         // RATELIMIT
223         // Policy is to not send hints when we are above speed limit
224         if (transfer().GetCurrentSpeed(DDIR_DOWNLOAD) > transfer().GetMaxSpeed(DDIR_DOWNLOAD)) {
225                 if (DEBUGTRAFFIC)
226                         fprintf(stderr,"hint: forbidden#");
227                 return;
228         }
229
230
231         // 1. Calc max of what we are allowed to request, uncongested bandwidth wise
232     tint plan_for = max(TINT_SEC,rtt_avg_*4);
233
234     tint timed_out = NOW - plan_for*2;
235     while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
236         hint_out_size_ -= hint_out_.front().bin.base_length();
237         hint_out_.pop_front();
238     }
239
240     int first_plan_pck = max ( (tint)1, plan_for / dip_avg_ );
241
242     // Riccardo, 2012-04-04: Actually allowed is max minus what we already asked for
243     int queue_allowed_hints = max(0,first_plan_pck-(int)hint_out_size_);
244
245
246     // RATELIMIT
247     // 2. Calc max of what is allowed by the rate limiter
248     int rate_allowed_hints = LONG_MAX;
249     if (transfer().GetMaxSpeed(DDIR_DOWNLOAD) < DBL_MAX)
250     {
251                 uint64_t rough_global_hint_out_size = 0; // rough estimate, as hint_out_ clean up is not done for all channels
252                 std::set<Channel *>::iterator iter;
253                 for (iter=transfer().mychannels_.begin(); iter!=transfer().mychannels_.end(); iter++)
254                 {
255                         Channel *c = *iter;
256                         if (c != NULL)
257                                 rough_global_hint_out_size += c->hint_out_size_;
258                 }
259
260                 // Policy: this channel is allowed to hint at the limit - global_hinted_at
261                 // Handle MaxSpeed = unlimited
262                 double rate_hints_limit_float = transfer().GetMaxSpeed(DDIR_DOWNLOAD)/((double)file().chunk_size());
263
264                 int rate_hints_limit = (int)min((double)LONG_MAX,rate_hints_limit_float);
265
266                 // Actually allowed is max minus what we already asked for, globally (=all channels)
267                 rate_allowed_hints = max(0,rate_hints_limit-(int)rough_global_hint_out_size);
268     }
269
270     // 3. Take the smallest allowance from rate and queue limit
271     uint64_t plan_pck = (uint64_t)min(rate_allowed_hints,queue_allowed_hints);
272
273     // 4. Ask allowance in blocks of chunks to get pipelining going from serving peer.
274     if (hint_out_size_ == 0 || plan_pck > HINT_GRANULARITY)
275     {
276         bin_t hint = transfer().picker().Pick(ack_in_,plan_pck,NOW+plan_for*2);
277         if (!hint.is_none()) {
278                 if (DEBUGTRAFFIC)
279                 {
280                         char binstr[32];
281                         fprintf(stderr,"hint c%d: ask %s\n", id(), hint.str(binstr) );
282                 }
283             evbuffer_add_8(evb, SWIFT_HINT);
284             evbuffer_add_32be(evb, bin_toUInt32(hint));
285             char bin_name_buf[32];
286             dprintf("%s #%u +hint %s [%lli]\n",tintstr(),id_,hint.str(bin_name_buf),hint_out_size_);
287             hint_out_.push_back(hint);
288             hint_out_size_ += hint.base_length();
289             //fprintf(stderr,"send c%d: HINTLEN %i\n", id(), hint.base_length());
290             //fprintf(stderr,"HL %i ", hint.base_length());
291         }
292         else
293             dprintf("%s #%u Xhint\n",tintstr(),id_);
294
295     }
296 }
297
298
299 bin_t        Channel::AddData (struct evbuffer **evb) {
300         // RATELIMIT
301         if (transfer().GetCurrentSpeed(DDIR_UPLOAD) > transfer().GetMaxSpeed(DDIR_UPLOAD)) {
302                 transfer().OnSendNoData();
303                 return bin_t::NONE;
304         }
305
306     if (!file().size()) // know nothing
307         return bin_t::NONE;
308
309     bin_t tosend = bin_t::NONE;
310     bool isretransmit = false;
311     tint luft = send_interval_>>4; // may wake up a bit earlier
312     if (data_out_.size()<cwnd_ &&
313             last_data_out_time_+send_interval_<=NOW+luft) {
314         tosend = DequeueHint(&isretransmit);
315         if (tosend.is_none()) {
316             dprintf("%s #%u sendctrl no idea what data to send\n",tintstr(),id_);
317             if (send_control_!=KEEP_ALIVE_CONTROL && send_control_!=CLOSE_CONTROL)
318                 SwitchSendControl(KEEP_ALIVE_CONTROL);
319         }
320     } else
321         dprintf("%s #%u sendctrl wait cwnd %f data_out %i next %s\n",
322                 tintstr(),id_,cwnd_,(int)data_out_.size(),tintstr(last_data_out_time_+NOW-send_interval_));
323
324     if (tosend.is_none())// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
325         return bin_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
326
327     if (ack_in_.is_empty() && file().size())
328         AddPeakHashes(*evb);
329
330     //NETWVSHASH
331     if (file().get_check_netwvshash())
332         AddUncleHashes(*evb,tosend);
333
334     if (!ack_in_.is_empty()) // TODO: cwnd_>1
335         data_out_cap_ = tosend;
336
337     // Arno, 2011-11-03: May happen when first data packet is sent to empty
338     // leech, then peak + uncle hashes may be so big that they don't fit in eth
339     // frame with DATA. Send 2 datagrams then, one with peaks so they have
340     // a better chance of arriving. Optimistic violation of atomic datagram
341     // principle.
342     if (file().chunk_size() == SWIFT_DEFAULT_CHUNK_SIZE && evbuffer_get_length(*evb) > SWIFT_MAX_NONDATA_DGRAM_SIZE) {
343         dprintf("%s #%u fsent %ib %s:%x\n",
344                 tintstr(),id_,(int)evbuffer_get_length(*evb),peer().str(),
345                 peer_channel_id_);
346                 messageQueue.AddBuffer(socket_, *evb, peer(), this, false);
347                 *evb = evbuffer_new();
348         evbuffer_add_32be(*evb, peer_channel_id_);
349     }
350
351     if (file().chunk_size() != SWIFT_DEFAULT_CHUNK_SIZE && isretransmit) {
352         /* FRAGRAND
353          * Arno, 2012-01-17: We observe strange behaviour when using
354          * fragmented UDP packets. When ULANC sends a specific datagram ("995"),
355          * the 2nd IP packet carrying it gets lost structurally. When
356          * downloading from the same asset hosted on a Linux 32-bit machine
357          * using a Win7 32-bit client (behind a NAT), one specific full
358          * datagram never gets delivered (6970 one before do). A workaround
359          * is to add some random data to the datagram. Hence we introduce
360          * the SWIFT_RANDOMIZE message, that is added to the datagram carrying
361          * the DATA on a retransmit.
362          */
363              char binstr[32];
364          fprintf(stderr,"AddData: retransmit of randomized chunk %s\n",tosend.str(binstr) );
365          evbuffer_add_8(*evb, SWIFT_RANDOMIZE);
366          evbuffer_add_32be(*evb, (int)rand() );
367     }
368
369     evbuffer_add_8(*evb, SWIFT_DATA);
370     evbuffer_add_32be(*evb, bin_toUInt32(tosend));
371
372     struct evbuffer_iovec vec;
373     if (evbuffer_reserve_space(*evb, file().chunk_size(), &vec, 1) < 0) {
374         print_error("error on evbuffer_reserve_space");
375         return bin_t::NONE;
376     }
377     size_t r = pread(file().file_descriptor(),(char *)vec.iov_base,
378                      file().chunk_size(),tosend.base_offset()*file().chunk_size());
379     // TODO: corrupted data, retries, caching
380     if (r<0) {
381         print_error("error on reading");
382         vec.iov_len = 0;
383         evbuffer_commit_space(*evb, &vec, 1);
384         return bin_t::NONE;
385     }
386     // assert(dgram.space()>=r+4+1);
387     vec.iov_len = r;
388     if (evbuffer_commit_space(*evb, &vec, 1) < 0) {
389         print_error("error on evbuffer_commit_space");
390         return bin_t::NONE;
391     }
392
393     last_data_out_time_ = NOW;
394     data_out_.push_back(tosend);
395     bytes_up_ += r;
396     global_bytes_up += r;
397
398     char bin_name_buf[32];
399     dprintf("%s #%u +data %s\n",tintstr(),id_,tosend.str(bin_name_buf));
400
401     // RATELIMIT
402     // ARNOSMPTODO: count overhead bytes too? Move to Send() then.
403         transfer_->OnSendData(file().chunk_size());
404
405     return tosend;
406 }
407
408
409 void    Channel::AddAck (struct evbuffer *evb) {
410     if (data_in_==tintbin())
411         //if (data_in_.bin==bin64_t::NONE)
412         return;
413     // sometimes, we send a HAVE (e.g. in case the peer did repetitive send)
414     evbuffer_add_8(evb, data_in_.time==TINT_NEVER?SWIFT_HAVE:SWIFT_ACK);
415     evbuffer_add_32be(evb, bin_toUInt32(data_in_.bin));
416     if (data_in_.time!=TINT_NEVER)
417         evbuffer_add_64be(evb, data_in_.time);
418
419
420         if (DEBUGTRAFFIC)
421                 fprintf(stderr,"send c%d: ACK %i\n", id(), bin_toUInt32(data_in_.bin));
422
423     have_out_.set(data_in_.bin);
424     char bin_name_buf[32];
425     dprintf("%s #%u +ack %s %s\n",
426         tintstr(),id_,data_in_.bin.str(bin_name_buf),tintstr(data_in_.time));
427     if (data_in_.bin.layer()>2)
428         data_in_dbl_ = data_in_.bin;
429
430     //fprintf(stderr,"data_in_ c%d\n", id() );
431     data_in_ = tintbin();
432     //data_in_ = tintbin(NOW,bin64_t::NONE);
433 }
434
435
436 void    Channel::AddHave (struct evbuffer *evb) {
437     if (!data_in_dbl_.is_none()) { // TODO: do redundancy better
438         evbuffer_add_8(evb, SWIFT_HAVE);
439         evbuffer_add_32be(evb, bin_toUInt32(data_in_dbl_));
440         data_in_dbl_=bin_t::NONE;
441     }
442     if (DEBUGTRAFFIC)
443                 fprintf(stderr,"send c%d: HAVE ",id() );
444     for(int count=0; count<4; count++) {
445         bin_t ack = binmap_t::find_complement(have_out_, file().ack_out(), 0); // FIXME: do rotating queue
446         if (ack.is_none())
447             break;
448         ack = file().ack_out().cover(ack);
449         have_out_.set(ack);
450         evbuffer_add_8(evb, SWIFT_HAVE);
451         evbuffer_add_32be(evb, bin_toUInt32(ack));
452
453         if (DEBUGTRAFFIC)
454                 fprintf(stderr," %i", bin_toUInt32(ack));
455
456         char bin_name_buf[32];
457         dprintf("%s #%u +have %s\n",tintstr(),id_,ack.str(bin_name_buf));
458     }
459         if (DEBUGTRAFFIC)
460                 fprintf(stderr,"\n");
461
462 }
463
464
465 void    Channel::Recv (struct evbuffer *evb) {
466     dprintf("%s #%u recvd %ib\n",tintstr(),id_,(int)evbuffer_get_length(evb)+4);
467     dgrams_rcvd_++;
468
469     lastrecvwaskeepalive_ = (evbuffer_get_length(evb) == 0);
470     if (lastrecvwaskeepalive_)
471         // Update speed measurements such that they decrease when DL stops
472         transfer().OnRecvData(0);
473
474     if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
475         rtt_avg_ = NOW - last_send_time_;
476         dev_avg_ = rtt_avg_;
477         dip_avg_ = rtt_avg_;
478         dprintf("%s #%u sendctrl rtt init %lli\n",tintstr(),id_,rtt_avg_);
479     }
480
481     bin_t data = evbuffer_get_length(evb) ? bin_t::NONE : bin_t::ALL;
482
483         if (DEBUGTRAFFIC)
484                 fprintf(stderr,"recv c%d: size %d ", id(), evbuffer_get_length(evb));
485
486         while (evbuffer_get_length(evb)) {
487         uint8_t type = evbuffer_remove_8(evb);
488
489         if (DEBUGTRAFFIC)
490                 fprintf(stderr," %d\n", type);
491
492         switch (type) {
493             case SWIFT_HANDSHAKE: OnHandshake(evb); break;
494             case SWIFT_DATA:      data=OnData(evb); break;
495             case SWIFT_HAVE:      OnHave(evb); break;
496             case SWIFT_ACK:       OnAck(evb); break;
497             case SWIFT_HASH:      OnHash(evb); break;
498             case SWIFT_HINT:      OnHint(evb); break;
499             case SWIFT_PEX_ADD:   OnPex(evb); break;
500             case SWIFT_PEX_REQ:   OnPexReq(); break;
501             case SWIFT_RANDOMIZE: OnRandomize(evb); break; //FRAGRAND
502             default:
503                 dprintf("%s #%u ?msg id unknown %i\n",tintstr(),id_,(int)type);
504                 return;
505         }
506     }
507     if (DEBUGTRAFFIC)
508     {
509         fprintf(stderr,"\n");
510     }
511
512
513     last_recv_time_ = NOW;
514     sent_since_recv_ = 0;
515     Reschedule();
516 }
517
518 /*
519  * Arno: FAXME: HASH+DATA should be handled as a transaction: only when the
520  * hashes check out should they be stored in the hashtree, otherwise revert.
521  */
522 void    Channel::OnHash (struct evbuffer *evb) {
523         bin_t pos = bin_fromUInt32(evbuffer_remove_32be(evb));
524     Sha1Hash hash = evbuffer_remove_hash(evb);
525     file().OfferHash(pos,hash);
526     char bin_name_buf[32];
527     dprintf("%s #%u -hash %s\n",tintstr(),id_,pos.str(bin_name_buf));
528
529     //fprintf(stderr,"HASH %lli hex %s\n",pos.toUInt(), hash.hex().c_str() );
530 }
531
532
533 void    Channel::CleanHintOut (bin_t pos) {
534     int hi = 0;
535     while (hi<hint_out_.size() && !hint_out_[hi].bin.contains(pos))
536         hi++;
537     if (hi==hint_out_.size())
538         return; // something not hinted or hinted in far past
539     while (hi--) { // removing likely snubbed hints
540         hint_out_size_ -= hint_out_.front().bin.base_length();
541         hint_out_.pop_front();
542     }
543     while (hint_out_.front().bin!=pos) {
544         tintbin f = hint_out_.front();
545
546         assert (f.bin.contains(pos));
547
548         if (pos < f.bin) {
549             f.bin.to_left();
550         } else {
551             f.bin.to_right();
552         }
553
554         hint_out_.front().bin = f.bin.sibling();
555         hint_out_.push_front(f);
556     }
557     hint_out_.pop_front();
558     hint_out_size_--;
559 }
560
561
562 bin_t Channel::OnData (struct evbuffer *evb) {  // TODO: HAVE NONE for corrupted data
563
564         char bin_name_buf[32];
565         bin_t pos = bin_fromUInt32(evbuffer_remove_32be(evb));
566
567     // Arno: Assuming DATA last message in datagram
568     if (evbuffer_get_length(evb) > file().chunk_size()) {
569         dprintf("%s #%u !data chunk size mismatch %s: exp %lu got " PRISIZET "\n",tintstr(),id_,pos.str(bin_name_buf), file().chunk_size(), evbuffer_get_length(evb));
570         fprintf(stderr,"WARNING: chunk size mismatch: exp %lu got " PRISIZET "\n",file().chunk_size(), evbuffer_get_length(evb));
571     }
572
573     int length = (evbuffer_get_length(evb) < file().chunk_size()) ? evbuffer_get_length(evb) : file().chunk_size();
574     if (!file().ack_out().is_empty(pos)) {
575         // Arno, 2012-01-24: print message for duplicate
576         dprintf("%s #%u Ddata %s\n",tintstr(),id_,pos.str(bin_name_buf));
577         evbuffer_drain(evb, length);
578         data_in_ = tintbin(TINT_NEVER,transfer().ack_out().cover(pos));
579
580         // Arno, 2012-01-24: Make sure data interarrival periods don't get
581         // screwed up because of these (ignored) duplicates.
582         UpdateDIP(pos);
583         return bin_t::NONE;
584     }
585     uint8_t *data = evbuffer_pullup(evb, length);
586     data_in_ = tintbin(NOW,bin_t::NONE);
587     if (!file().OfferData(pos, (char*)data, length)) {
588         evbuffer_drain(evb, length);
589         char bin_name_buf[32];
590         dprintf("%s #%u !data %s\n",tintstr(),id_,pos.str(bin_name_buf));
591         return bin_t::NONE;
592     }
593     evbuffer_drain(evb, length);
594     dprintf("%s #%u -data %s\n",tintstr(),id_,pos.str(bin_name_buf));
595
596     if (DEBUGTRAFFIC)
597         fprintf(stderr,"$ ");
598
599     bin_t cover = transfer().ack_out().cover(pos);
600     for(int i=0; i<transfer().cb_installed; i++)
601         if (cover.layer()>=transfer().cb_agg[i])
602             transfer().callbacks[i](transfer().fd(),cover);  // FIXME
603     if (cover.layer() >= 5) // Arno: tested with 32K, presently = 2 ** 5 * chunk_size CHUNKSIZE
604         transfer().OnRecvData( pow((double)2,(double)5)*((double)file().chunk_size()) );
605     data_in_.bin = pos;
606
607     UpdateDIP(pos);
608     CleanHintOut(pos);
609     bytes_down_ += length;
610     global_bytes_down += length;
611     return pos;
612 }
613
614
615 void Channel::UpdateDIP(bin_t pos)
616 {
617         if (!pos.is_none()) {
618                 if (last_data_in_time_) {
619                         tint dip = NOW - last_data_in_time_;
620                         dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
621                 }
622                 last_data_in_time_ = NOW;
623         }
624 }
625
626
627 void    Channel::OnAck (struct evbuffer *evb) {
628     bin_t ackd_pos = bin_fromUInt32(evbuffer_remove_32be(evb));
629     tint peer_time = evbuffer_remove_64be(evb); // FIXME 32
630     // FIXME FIXME: wrap around here
631     if (ackd_pos.is_none())
632         return; // likely, broken chunk/ insufficient hashes
633     if (file().size() && ackd_pos.base_offset()>=file().size_in_chunks()) {
634         char bin_name_buf[32];
635         eprintf("invalid ack: %s\n",ackd_pos.str(bin_name_buf));
636         return;
637     }
638     ack_in_.set(ackd_pos);
639
640     //fprintf(stderr,"OnAck: got bin %s is_complete %d\n", ackd_pos.str(), (int)ack_in_.is_complete_arno( file().ack_out().get_height() ));
641
642     int di = 0, ri = 0;
643     // find an entry for the send (data out) event
644     while (  di<data_out_.size() && ( data_out_[di]==tintbin() ||
645            !ackd_pos.contains(data_out_[di].bin) )  )
646         di++;
647     // FUTURE: delayed acks
648     // rule out retransmits
649     while (  ri<data_out_tmo_.size() && !ackd_pos.contains(data_out_tmo_[ri].bin) )
650         ri++;
651     char bin_name_buf[32];
652     dprintf("%s #%u %cack %s %lli\n",tintstr(),id_,
653             di==data_out_.size()?'?':'-',ackd_pos.str(bin_name_buf),peer_time);
654     if (di!=data_out_.size() && ri==data_out_tmo_.size()) { // not a retransmit
655             // round trip time calculations
656         tint rtt = NOW-data_out_[di].time;
657         rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
658         dev_avg_ = ( dev_avg_*3 + tintabs(rtt-rtt_avg_) ) >> 2;
659         assert(data_out_[di].time!=TINT_NEVER);
660             // one-way delay calculations
661         tint owd = peer_time - data_out_[di].time;
662         owd_cur_bin_ = 0;//(owd_cur_bin_+1) & 3;
663         owd_current_[owd_cur_bin_] = owd;
664         if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
665             owd_min_bin_start_ = NOW;
666             owd_min_bin_ = (owd_min_bin_+1) & 3;
667             owd_min_bins_[owd_min_bin_] = TINT_NEVER;
668         }
669         if (owd_min_bins_[owd_min_bin_]>owd)
670             owd_min_bins_[owd_min_bin_] = owd;
671         dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n",
672                 tintstr(),id_,rtt_avg_,dev_avg_,data_out_[di].bin.str(bin_name_buf));
673         ack_rcvd_recent_++;
674         // early loss detection by packet reordering
675         for (int re=0; re<di-MAX_REORDERING; re++) {
676             if (data_out_[re]==tintbin())
677                 continue;
678             ack_not_rcvd_recent_++;
679             data_out_tmo_.push_back(data_out_[re].bin);
680             dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str(bin_name_buf));
681             data_out_cap_ = bin_t::ALL;
682             data_out_[re] = tintbin();
683         }
684     }
685     if (di!=data_out_.size())
686         data_out_[di]=tintbin();
687     // clear zeroed items
688     while (!data_out_.empty() && ( data_out_.front()==tintbin() ||
689             ack_in_.is_filled(data_out_.front().bin) ) )
690         data_out_.pop_front();
691     assert(data_out_.empty() || data_out_.front().time!=TINT_NEVER);
692 }
693
694
695 void Channel::TimeoutDataOut ( ) {
696     // losses: timeouted packets
697     tint timeout = NOW - ack_timeout();
698     while (!data_out_.empty() &&
699         ( data_out_.front().time<timeout || data_out_.front()==tintbin() ) ) {
700         if (data_out_.front()!=tintbin() && ack_in_.is_empty(data_out_.front().bin)) {
701             ack_not_rcvd_recent_++;
702             data_out_cap_ = bin_t::ALL;
703             data_out_tmo_.push_back(data_out_.front().bin);
704             char bin_name_buf[32];
705             dprintf("%s #%u Tdata %s\n",tintstr(),id_,data_out_.front().bin.str(bin_name_buf));
706         }
707         data_out_.pop_front();
708     }
709     // clear retransmit queue of older items
710     while (!data_out_tmo_.empty() && data_out_tmo_.front().time<NOW-MAX_POSSIBLE_RTT)
711         data_out_tmo_.pop_front();
712 }
713
714
715 void Channel::OnHave (struct evbuffer *evb) {
716     bin_t ackd_pos = bin_fromUInt32(evbuffer_remove_32be(evb));
717     if (ackd_pos.is_none())
718         return; // wow, peer has hashes
719
720     // PPPLUG
721     if (ENABLE_VOD_PIECEPICKER) {
722                 // Ric: check if we should set the size in the file transfer
723                 if (transfer().availability().size() <= 0 && file().size() > 0)
724                 {
725                         transfer().availability().setSize(file().size_in_chunks());
726                 }
727                 // Ric: update the availability if needed
728                 transfer().availability().set(id_, ack_in_, ackd_pos);
729     }
730
731     ack_in_.set(ackd_pos);
732     char bin_name_buf[32];
733     dprintf("%s #%u -have %s\n",tintstr(),id_,ackd_pos.str(bin_name_buf));
734
735     //fprintf(stderr,"OnHave: got bin %s is_complete %d\n", ackd_pos.str(), IsComplete() );
736
737 }
738
739
740 void    Channel::OnHint (struct evbuffer *evb) {
741     bin_t hint = bin_fromUInt32(evbuffer_remove_32be(evb));
742     // FIXME: wake up here
743     hint_in_.push_back(hint);
744     char bin_name_buf[32];
745     dprintf("%s #%u -hint %s\n",tintstr(),id_,hint.str(bin_name_buf));
746 }
747
748
749 void Channel::OnHandshake (struct evbuffer *evb) {
750
751         uint32_t pcid = evbuffer_remove_32be(evb);
752     dprintf("%s #%u -hs %x\n",tintstr(),id_,pcid);
753
754     if (is_established() && pcid == 0) {
755         // Arno: received explicit close
756         peer_channel_id_ = 0; // == established -> false
757         Close();
758         return;
759     }
760
761     peer_channel_id_ = pcid;
762     // self-connection check
763     if (!SELF_CONN_OK) {
764         uint32_t try_id = DecodeID(peer_channel_id_);
765         if (channel(try_id) && !channel(try_id)->peer_channel_id_) {
766             peer_channel_id_ = 0;
767             Close();
768             return; // this is a self-connection
769         }
770     }
771
772     // FUTURE: channel forking
773     if (is_established())
774         dprintf("%s #%u established %s\n", tintstr(), id_, peer().str());
775 }
776
777
778 void Channel::OnPex (struct evbuffer *evb) {
779     uint32_t ipv4 = evbuffer_remove_32be(evb);
780     uint16_t port = evbuffer_remove_16be(evb);
781     Address addr(ipv4,port);
782     dprintf("%s #%u -pex %s\n",tintstr(),id_,addr.str());
783     if (transfer().OnPexIn(addr))
784         useless_pex_count_ = 0;
785     else
786     {
787                 dprintf("%s #%u already channel to %s\n", tintstr(),id_,addr.str());
788         useless_pex_count_++;
789     }
790     pex_request_outstanding_ = false;
791 }
792
793
794 //FRAGRAND
795 void Channel::OnRandomize (struct evbuffer *evb) {
796     dprintf("%s #%u -rand\n",tintstr(),id_ );
797         // Payload is 4 random bytes
798     uint32_t r = evbuffer_remove_32be(evb);
799 }
800
801
802 void    Channel::AddPex (struct evbuffer *evb) {
803         // Gertjan fix: Reverse PEX
804     // PEX messages sent to facilitate NAT/FW puncturing get priority
805     if (!reverse_pex_out_.empty()) {
806         do {
807             tintbin pex_peer = reverse_pex_out_.front();
808             reverse_pex_out_.pop_front();
809             if (channels[(int) pex_peer.bin.toUInt()] == NULL)
810                 continue;
811             Address a = channels[(int) pex_peer.bin.toUInt()]->peer();
812             // Arno, 2012-02-28: Don't send private addresses to non-private peers.
813             if (!a.is_private() || (a.is_private() && peer().is_private()))
814             {
815                 evbuffer_add_8(evb, SWIFT_PEX_ADD);
816                 evbuffer_add_32be(evb, a.ipv4());
817                 evbuffer_add_16be(evb, a.port());
818                 dprintf("%s #%u +pex (reverse) %s\n",tintstr(),id_,a.str());
819             }
820         } while (!reverse_pex_out_.empty() && (SWIFT_MAX_NONDATA_DGRAM_SIZE-evbuffer_get_length(evb)) >= 7);
821
822         // Arno: 2012-02-23: Don't think this is right. Bit of DoS thing,
823         // that you only get back the addr of people that got your addr.
824         // Disable for now.
825         //return;
826     }
827
828     if (!pex_requested_)
829         return;
830
831     // Arno, 2012-02-28: Don't send private addresses to non-private peers.
832     int chid = 0, tries=0;
833     Address a;
834     while (true)
835     {
836         // Arno, 2011-10-03: Choosing Gertjan's RandomChannel over RevealChannel here.
837         chid = transfer().RandomChannel(id_);
838         if (chid==-1 || chid==id_ || tries > 5) {
839                 pex_requested_ = false;
840                 return;
841         }
842         a = channels[chid]->peer();
843         if (!a.is_private() || (a.is_private() && peer().is_private()))
844                 break;
845         tries++;
846     }
847
848     evbuffer_add_8(evb, SWIFT_PEX_ADD);
849     evbuffer_add_32be(evb, a.ipv4());
850     evbuffer_add_16be(evb, a.port());
851     dprintf("%s #%u +pex %s\n",tintstr(),id_,a.str());
852
853     pex_requested_ = false;
854     /* Ensure that we don't add the same id to the reverse_pex_out_ queue
855        more than once. */
856     for (tbqueue::iterator i = channels[chid]->reverse_pex_out_.begin();
857             i != channels[chid]->reverse_pex_out_.end(); i++)
858         if ((int) (i->bin.toUInt()) == id_)
859             return;
860
861     dprintf("%s #%u adding pex for channel %u at time %s\n", tintstr(), chid,
862         id_, tintstr(NOW + 2 * TINT_SEC));
863     // Arno, 2011-10-03: should really be a queue of (tint,channel id(= uint32_t)) pairs.
864     channels[chid]->reverse_pex_out_.push_back(tintbin(NOW + 2 * TINT_SEC, bin_t(id_)));
865     if (channels[chid]->send_control_ == KEEP_ALIVE_CONTROL &&
866             channels[chid]->next_send_time_ > NOW + 2 * TINT_SEC)
867         channels[chid]->Reschedule();
868 }
869
870 void Channel::OnPexReq(void) {
871     dprintf("%s #%u -pex req\n", tintstr(), id_);
872     if (NOW > MIN_PEX_REQUEST_INTERVAL + last_pex_request_time_)
873         pex_requested_ = true;
874 }
875
876 void Channel::AddPexReq(struct evbuffer *evb) {
877     // Rate limit the number of PEX requests
878     if (NOW < next_pex_request_time_)
879         return;
880
881     // If no answer has been received from a previous request, count it as useless
882     if (pex_request_outstanding_)
883         useless_pex_count_++;
884
885     pex_request_outstanding_ = false;
886
887     // Initiate at most SWIFT_MAX_CONNECTIONS connections
888     if (transfer().hs_in_.size() >= SWIFT_MAX_CONNECTIONS ||
889             // Check whether this channel has been providing useful peer information
890             useless_pex_count_ > 2)
891     {
892         // Arno, 2012-02-23: Fix: Code doesn't recover from useless_pex_count_ > 2,
893         // let's just try again in 30s
894                 useless_pex_count_ = 0;
895                 next_pex_request_time_ = NOW + 30 * TINT_SEC;
896
897         return;
898     }
899
900     dprintf("%s #%u +pex req\n", tintstr(), id_);
901     evbuffer_add_8(evb, SWIFT_PEX_REQ);
902     /* Add a little more than the minimum interval, such that the other party is
903        less likely to drop it due to too high rate */
904     next_pex_request_time_ = NOW + MIN_PEX_REQUEST_INTERVAL * 1.1;
905     pex_request_outstanding_ = true;
906 }
907
908
909
910 /*
911  * Channel class methods
912  */
913
914 void Channel::LibeventReceiveCallback(evutil_socket_t fd, short event, void *arg) {
915         // Called by libevent when a datagram is received on the socket
916     Time();
917     RecvDatagram(fd);
918     event_add(&evrecv, NULL);
919 }
920
921 #define NUM_DATAGRAMS 10
922
923 void    Channel::RecvDatagram (evutil_socket_t socket) {
924     struct evbuffer *pevb[NUM_DATAGRAMS];
925         for (int i=0; i<NUM_DATAGRAMS; ++i)
926                 pevb[i] = evbuffer_new();
927     Address addr;
928         //FIXME: make this more readable
929         free(addr.addr);
930         addr.addr = (struct sockaddr_mptp *) calloc(1, sizeof(struct sockaddr_mptp) + NUM_DATAGRAMS * sizeof(struct mptp_dest));
931         addr.addr->count = NUM_DATAGRAMS;
932     RecvFrom(socket, addr, pevb);
933         int i = 0;
934         for (; i<addr.addr->count; ++i) {
935                 struct evbuffer *evb = pevb[i];
936                 Address fromi;
937                 fromi.addr->dests[0].addr = addr.addr->dests[i].addr;
938                 fromi.addr->dests[0].port = addr.addr->dests[i].port;
939                 size_t evboriglen = evbuffer_get_length(evb);
940 #define return_log(...) { fprintf(stderr,__VA_ARGS__); evbuffer_free(evb); return; }
941                 if (evbuffer_get_length(evb)<4)
942                         return_log("socket layer weird: datagram shorter than 4 bytes from %s (prob ICMP unreach)\n",fromi.str());
943                 uint32_t mych = evbuffer_remove_32be(evb);
944                 Sha1Hash hash;
945                 Channel* channel = NULL;
946                 if (mych==0) { // peer initiates handshake
947                         if (evbuffer_get_length(evb)<1+4+1+4+Sha1Hash::SIZE)
948                                 return_log ("%s #0 incorrect size %i initial handshake packet %s\n",
949                                                 tintstr(),(int)evbuffer_get_length(evb),fromi.str());
950                         uint8_t hashid = evbuffer_remove_8(evb);
951                         if (hashid!=SWIFT_HASH)
952                                 return_log ("%s #0 no hash in the initial handshake %s\n",
953                                                 tintstr(),fromi.str());
954                         bin_t pos = bin_fromUInt32(evbuffer_remove_32be(evb));
955                         if (!pos.is_all())
956                                 return_log ("%s #0 that is not the root hash %s\n",tintstr(),fromi.str());
957                         hash = evbuffer_remove_hash(evb);
958                         FileTransfer* ft = FileTransfer::Find(hash);
959                         if (!ft)
960                                 return_log ("%s #0 hash %s unknown, requested by %s\n",tintstr(),hash.hex().c_str(),fromi.str());
961                         dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
962
963                         // Arno, 2012-02-27: Check for duplicate channel
964                         Channel* existchannel = ft->FindChannel(fromi,NULL);
965                         if (existchannel)
966                         {
967                                 // Arno: 2011-10-13: Ignore if established, otherwise consider
968                                 // it a concurrent connection attempt.
969                                 if (existchannel->is_established()) {
970                                         // ARNOTODO: Read complete handshake here so we know whether
971                                         // attempt is to new channel or to existing. Currently read
972                                         // in OnHandshake()
973                                         //
974                                         return_log("%s #0 have a channel already to %s\n",tintstr(),fromi.str());
975                                 } else {
976                                         channel = existchannel;
977                                         //fprintf(stderr,"Channel::RecvDatagram: HANDSHAKE: reuse channel %s\n", channel->peer_.str() );
978                                 }
979                         }
980                         if (channel == NULL) {
981                                 //fprintf(stderr,"Channel::RecvDatagram: HANDSHAKE: create new channel %s\n", addr.str() );
982                                 channel = new Channel(ft, socket, fromi);
983                         }
984                         //fprintf(stderr,"CHANNEL INCOMING DEF hass %s is id %d\n",hash.hex().c_str(),channel->id());
985
986                 } else { // peer responds to my handshake (and other messages)
987                         mych = DecodeID(mych);
988                         if (mych>=channels.size())
989                                 return_log("%s invalid channel #%u, %s\n",tintstr(),mych,fromi.str());
990                         channel = channels[mych];
991                         if (!channel)
992                                 return_log ("%s #%u is already closed\n",tintstr(),mych);
993                         if (channel->IsDiffSenderOrDuplicate(fromi,mych)) {
994                                 channel->Schedule4Close();
995                                 return;
996                         }
997                         channel->own_id_mentioned_ = true;
998                 }
999                 channel->raw_bytes_down_ += evboriglen;
1000                 //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
1001                 bool wasestablished = channel->is_established();
1002
1003                 dprintf("%s #%u peer %s recv_peer %s addr %s\n", tintstr(),mych, channel->peer().str(), channel->recv_peer().str(), fromi.str() );
1004
1005                 channel->Recv(evb);
1006
1007                 evbuffer_free(evb);
1008                 //SAFECLOSE
1009                 if (wasestablished && !channel->is_established()) {
1010                         // Arno, 2012-01-26: Received an explict close, clean up channel, safely.
1011                         channel->Schedule4Close();
1012                 }
1013         }
1014         for (; i<NUM_DATAGRAMS; ++i)
1015                 evbuffer_free(pevb[i]);
1016 }
1017
1018
1019
1020 /*
1021  * Channel instance methods
1022  */
1023
1024 void Channel::CloseChannelByAddress(const Address &addr)
1025 {
1026         // fprintf(stderr,"CloseChannelByAddress: address is %s\n", addr.str() );
1027     std::vector<Channel *>::iterator iter;
1028     for (iter = channels.begin(); iter != channels.end(); iter++)
1029     {
1030                 Channel *c = *iter;
1031                 if (c != NULL && c->peer_ == addr)
1032                 {
1033                         // ARNOSMPTODO: will do another send attempt before not being
1034                         // Rescheduled.
1035                         c->peer_channel_id_ = 0; // established->false, do no more sending
1036                         c->Schedule4Close();
1037                         break;
1038                 }
1039     }
1040 }
1041
1042
1043 void Channel::Close () {
1044
1045         this->SwitchSendControl(CLOSE_CONTROL);
1046
1047     if (is_established())
1048         this->Send(); // Arno: send explicit close
1049
1050     if (ENABLE_VOD_PIECEPICKER) {
1051                 // Ric: remove it's binmap from the availability
1052                 transfer().availability().remove(id_, ack_in_);
1053     }
1054
1055     // SAFECLOSE
1056     // Arno: ensure LibeventSendCallback is no longer called with ptr to this Channel
1057     ClearEvents();
1058 }
1059
1060
1061 void Channel::Reschedule () {
1062
1063         // Arno: CAREFUL: direct send depends on diff between next_send_time_ and
1064         // NOW to be 0, so any calls to Time in between may put things off. Sigh.
1065         Time();
1066     next_send_time_ = NextSendTime();
1067     if (next_send_time_!=TINT_NEVER) {
1068
1069         assert(next_send_time_<NOW+TINT_MIN);
1070         tint duein = next_send_time_-NOW;
1071         if (duein <= 0) {
1072                 // Arno, 2011-10-18: libevent's timer implementation appears to be
1073                 // really slow, i.e., timers set for 100 usec from now get called
1074                 // at least two times later :-( Hence, for sends after receives
1075                 // perform them directly.
1076                 dprintf("%s #%u requeue direct send\n",tintstr(),id_);
1077                 LibeventSendCallback(-1,EV_TIMEOUT,this);
1078         }
1079         else {
1080                 if (evsend_ptr_ != NULL) {
1081                         struct timeval duetv = *tint2tv(duein);
1082                         evtimer_add(evsend_ptr_,&duetv);
1083                         dprintf("%s #%u requeue for %s in %lli\n",tintstr(),id_,tintstr(next_send_time_), duein);
1084                 }
1085                 else
1086                         dprintf("%s #%u cannot requeue for %s, closed\n",tintstr(),id_,tintstr(next_send_time_));
1087         }
1088     } else {
1089         // SAFECLOSE
1090         dprintf("%s #%u resched, will close\n",tintstr(),id_);
1091                 this->Schedule4Close();
1092     }
1093 }
1094
1095
1096 /*
1097  * Channel class methods
1098  */
1099 void Channel::LibeventSendCallback(int fd, short event, void *arg) {
1100
1101         // Called by libevent when it is the requested send time.
1102     Time();
1103     Channel * sender = (Channel*) arg;
1104     if (NOW<sender->next_send_time_-TINT_MSEC)
1105         dprintf("%s #%u suspicious send %s<%s\n",tintstr(),
1106                 sender->id(),tintstr(NOW),tintstr(sender->next_send_time_));
1107     if (sender->next_send_time_ != TINT_NEVER)
1108         sender->Send();
1109 }
1110