cured several connection stall bugs
[swift-upb.git] / sendrecv.cpp
1 /*
2  *  sendrecv.cpp
3  *  serp++
4  *
5  *  Created by Victor Grishchenko on 3/6/09.
6  *  Copyright 2009 Delft University of Technology. All rights reserved.
7  *
8  */
9 #include "p2tp.h"
10 #include "compat/util.h"
11
12
13 using namespace p2tp;
14 using namespace std;
15
16 /*
17  TODO  25 Oct 18:55
18  - range: ALL
19  - randomized testing of advanced ops (new testcase)
20  */
21
22 void    Channel::AddPeakHashes (Datagram& dgram) {
23     for(int i=0; i<file().peak_count(); i++) {
24         bin64_t peak = file().peak(i);
25         dgram.Push8(P2TP_HASH);
26         dgram.Push32((uint32_t)peak);
27         dgram.PushHash(file().peak_hash(i));
28         //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
29         dprintf("%s #%u +phash %s\n",tintstr(),id,peak.str());
30     }
31 }
32
33
34 void    Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
35     bin64_t peak = file().peak_for(pos);
36     while (pos!=peak && /*((NOW&3)==3 || !data_out_cap_.within(pos.parent())) &&*/
37             ack_in_.get(pos.parent())==bins::EMPTY) {
38         bin64_t uncle = pos.sibling();
39         dgram.Push8(P2TP_HASH);
40         dgram.Push32((uint32_t)uncle);
41         dgram.PushHash( file().hash(uncle) );
42         //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
43         dprintf("%s #%u +hash %s\n",tintstr(),id,uncle.str());
44         pos = pos.parent();
45     }
46 }
47
48
49 bin64_t        Channel::DequeueHint () {
50     if (hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {    
51         uint64_t twist = peer_channel_id_;  // got no hints, send something randomly
52         twist &= file().peak(0); // may make it semi-seq here
53         file().ack_out().twist(twist);
54         ack_in_.twist(twist);
55         bin64_t my_pick = 
56             file().ack_out().find_filtered(ack_in_,bin64_t::ALL,bins::FILLED);
57         while (my_pick.width()>max(1,(int)cwnd_))
58             my_pick = my_pick.left();
59         file().ack_out().twist(0);
60         ack_in_.twist(0);
61         if (my_pick!=bin64_t::NONE) {
62             my_pick = my_pick.twisted(twist);
63             hint_in_.push_back(my_pick);
64             dprintf("%s #%u *hint %s\n",tintstr(),id,my_pick.str());
65         }
66     }
67     bin64_t send = bin64_t::NONE;
68     while (!hint_in_.empty() && send==bin64_t::NONE) {
69         bin64_t hint = hint_in_.front().bin;
70         tint time = hint_in_.front().time;
71         hint_in_.pop_front();
72         while (!hint.is_base()) { // FIXME optimize; possible attack
73             hint_in_.push_front(tintbin(time,hint.right()));
74             hint = hint.left();
75         }
76         //if (time < NOW-TINT_SEC*3/2 )
77         //    continue;  bad idea
78         if (ack_in_.get(hint)!=bins::FILLED) 
79             send = hint;
80     }
81     uint64_t mass = 0;
82     for(int i=0; i<hint_in_.size(); i++)
83         mass += hint_in_[i].bin.width();
84     dprintf("%s #%u dequeued %s [%lli]\n",tintstr(),id,send.str(),mass);
85     return send;
86 }
87
88
89 void    Channel::AddHandshake (Datagram& dgram) {
90     if (!peer_channel_id_) { // initiating
91         dgram.Push8(P2TP_HASH);
92         dgram.Push32(bin64_t::ALL32);
93         dgram.PushHash(file().root_hash());
94         dprintf("%s #%u +hash ALL %s\n",
95                 tintstr(),id,file().root_hash().hex().c_str());
96     }
97     dgram.Push8(P2TP_HANDSHAKE);
98     int encoded = EncodeID(id);
99     dgram.Push32(encoded);
100     dprintf("%s #%u +hs %x\n",tintstr(),id,encoded);
101     ack_out_.clear();
102     AddAck(dgram);
103 }
104
105
106 void    Channel::Send () {
107     Datagram dgram(socket_,peer());
108     dgram.Push32(peer_channel_id_);
109     bin64_t data = bin64_t::NONE;
110     if ( is_established() ) {
111         // FIXME: seeder check
112         AddAck(dgram);
113         if (!file().is_complete())
114             AddHint(dgram);
115         AddPex(dgram);
116         CleanDataOut();
117         data = AddData(dgram);
118     } else {
119         AddHandshake(dgram);
120         AddAck(dgram);
121     }
122     dprintf("%s #%u sent %ib %s:%x\n",
123             tintstr(),id,dgram.size(),peer().str(),peer_channel_id_);
124     if (dgram.size()==4) {// only the channel id; bare keep-alive
125         data = bin64_t::ALL;
126         if (data_out_.size()<cwnd_ && send_control_!=KEEP_ALIVE_CONTROL) {
127             if ( cwnd_ < 1 )
128                 SwitchSendControl(KEEP_ALIVE_CONTROL);
129             else
130                 cwnd_ /= 2;
131         }
132         //if (data_out_.empty() && send_control_!=KEEP_ALIVE_CONTROL)
133         //     SwitchSendControl(KEEP_ALIVE_CONTROL);// we did our best
134         //if (NOW<last_send_time_+MAX_SEND_INTERVAL) // no need for keepalive
135         //    return; // don't send empty dgram
136     }
137     if (dgram.Send()==-1)
138         print_error("can't send datagram");
139     last_send_time_ = NOW;
140     sent_since_recv_++;
141     dgrams_sent_++;
142 }
143
144
145 void    Channel::AddHint (Datagram& dgram) {
146
147     tint plan_for = max(TINT_SEC,rtt_avg_*4);
148     
149     tint timed_out = NOW - plan_for*2;
150     while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
151         hint_out_size_ -= hint_out_.front().bin.width();
152         hint_out_.pop_front();
153     }
154     
155     /*int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
156     if (!peer_cwnd)
157         peer_cwnd = 1;*/
158     int plan_pck = std::max ( 1LL, plan_for / dip_avg_ );
159     
160     if ( hint_out_size_ < plan_pck ) {
161             
162         int diff = plan_pck - hint_out_size_; // TODO: aggregate
163         bin64_t hint = transfer().picker().Pick(ack_in_,diff,NOW+plan_for*2);
164         
165         if (hint!=bin64_t::NONE) {
166             dgram.Push8(P2TP_HINT);
167             dgram.Push32(hint);
168             dprintf("%s #%u +hint %s [%lli]\n",tintstr(),id,hint.str(),hint_out_size_);
169             hint_out_.push_back(hint);
170             hint_out_size_ += hint.width();
171         } else
172             dprintf("%s #%u Xhint\n",tintstr(),id);
173         
174     }
175 }
176
177
178 bin64_t        Channel::AddData (Datagram& dgram) {
179     
180     if (!file().size()) // know nothing
181         return bin64_t::NONE;
182     
183     bin64_t tosend = bin64_t::NONE;
184     if (data_out_.size()<cwnd_ && last_data_out_time_<=NOW-send_interval_) {
185         tosend = DequeueHint();
186         if (tosend==bin64_t::NONE)
187             dprintf("%s #%u out of hints #sendctrl\n",tintstr(),id);
188     } else
189         dprintf("%s #%u no cwnd #sendctrl\n",tintstr(),id);
190     
191     if (tosend==bin64_t::NONE)// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty())) 
192         return bin64_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
193     
194     if (ack_in_.is_empty() && file().size())
195         AddPeakHashes(dgram);
196     AddUncleHashes(dgram,tosend);
197     if (!ack_in_.is_empty()) // TODO: cwnd_>1
198         data_out_cap_ = tosend;
199
200     if (dgram.size()>254) {
201         dgram.Send(); // kind of fragmentation
202         dgram.Push32(peer_channel_id_);
203     }
204     
205     dgram.Push8(P2TP_DATA);
206     dgram.Push32(tosend.to32());
207     
208     uint8_t buf[1024];
209     size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); 
210     // TODO: corrupted data, retries, caching
211     if (r<0) {
212         print_error("error on reading");
213         return bin64_t::NONE;
214     }
215     assert(dgram.space()>=r+4+1);
216     dgram.Push(buf,r);
217     
218     last_data_out_time_ = NOW;
219     data_out_.push_back(tosend);
220     dprintf("%s #%u +data %s\n",tintstr(),id,tosend.str());
221     
222     return tosend;
223 }
224
225
226 void    Channel::AddTs (Datagram& dgram) {
227     dgram.Push8(P2TP_TS);
228     dgram.Push64(data_in_.time);
229     dprintf("%s #%u +ts %s\n",tintstr(),id,tintstr(data_in_.time));
230 }
231
232
233 void    Channel::AddAck (Datagram& dgram) {
234     if (data_in_dbl_!=bin64_t::NONE) { // TODO: do redundancy better
235         dgram.Push8(P2TP_ACK);
236         dgram.Push32(data_in_dbl_.to32());
237         data_in_dbl_=bin64_t::NONE;
238     }
239     if (data_in_.time!=TINT_NEVER) { // TODO: ACK NONE for corrupted data
240         AddTs(dgram);
241         bin64_t pos = file().ack_out().cover(data_in_.bin);
242         dgram.Push8(P2TP_ACK);
243         dgram.Push32(pos.to32());
244         //dgram.Push64(data_in_.time);
245         ack_out_.set(pos);
246         dprintf("%s #%u +ack %s %s\n",tintstr(),id,pos.str(),tintstr(data_in_.time));
247         data_in_ = tintbin(TINT_NEVER,bin64_t::NONE);
248         if (pos.layer()>2)
249             data_in_dbl_ = pos;
250     }
251     for(int count=0; count<4; count++) {
252         bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, bins::FILLED);
253         if (ack==bin64_t::NONE)
254             break;
255         ack = file().ack_out().cover(ack);
256         ack_out_.set(ack);
257         dgram.Push8(P2TP_ACK);
258         dgram.Push32(ack.to32());
259         dprintf("%s #%u +ack %s\n",tintstr(),id,ack.str());
260     }
261 }
262
263
264 void    Channel::Recv (Datagram& dgram) {
265     dprintf("%s #%u recvd %i\n",tintstr(),id,dgram.size()+4);
266     peer_send_time_ = 0; // has scope of 1 datagram
267     dgrams_rcvd_++;
268     if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
269         rtt_avg_ = NOW - last_send_time_;
270         dev_avg_ = rtt_avg_;
271         dip_avg_ = rtt_avg_;
272         dprintf("%s #%u rtt init %lli\n",tintstr(),id,rtt_avg_);
273     }
274     bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
275     while (dgram.size()) {
276         uint8_t type = dgram.Pull8();
277         switch (type) {
278             case P2TP_HANDSHAKE: OnHandshake(dgram); break;
279             case P2TP_DATA:      data=OnData(dgram); break;
280             case P2TP_TS:        OnTs(dgram); break;
281             case P2TP_ACK:       OnAck(dgram); break;
282             case P2TP_HASH:      OnHash(dgram); break;
283             case P2TP_HINT:      OnHint(dgram); break;
284             case P2TP_PEX_ADD:   OnPex(dgram); break;
285             default:
286                 eprintf("%s #%u ?msg id unknown %i\n",tintstr(),id,(int)type);
287                 return;
288         }
289     }
290     last_recv_time_ = NOW;
291     sent_since_recv_ = 0;
292 }
293
294
295 void    Channel::OnHash (Datagram& dgram) {
296     bin64_t pos = dgram.Pull32();
297     Sha1Hash hash = dgram.PullHash();
298     file().OfferHash(pos,hash);
299     //DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
300     dprintf("%s #%u -hash %s\n",tintstr(),id,pos.str());
301 }
302
303
304 void    Channel::CleanHintOut (bin64_t pos) {
305     int hi = 0;
306     while (hi<hint_out_.size() && !pos.within(hint_out_[hi].bin))
307         hi++;
308     if (hi==hint_out_.size())
309         return; // something not hinted or hinted in far past
310     while (hi--) { // removing likely snubbed hints
311         hint_out_size_ -= hint_out_.front().bin.width();
312         hint_out_.pop_front();
313     }
314     while (hint_out_.front().bin!=pos) {
315         tintbin f = hint_out_.front();
316         f.bin = f.bin.towards(pos);
317         hint_out_.front().bin = f.bin.sibling();
318         hint_out_.push_front(f);
319     }
320     hint_out_.pop_front();
321     hint_out_size_--;
322 }
323
324
325 bin64_t Channel::OnData (Datagram& dgram) {
326     bin64_t pos = dgram.Pull32();
327     uint8_t *data;
328     int length = dgram.Pull(&data,1024);
329     bool ok = (pos==bin64_t::NONE) || file().OfferData(pos, (char*)data, length) ;
330     dprintf("%s #%u %cdata %s\n",tintstr(),id,ok?'-':'!',pos.str());
331     data_in_ = tintbin(NOW,bin64_t::NONE);
332     if (!ok) 
333         return bin64_t::NONE;
334     data_in_.bin = pos;
335     if (pos!=bin64_t::NONE) {
336         if (last_data_in_time_) {
337             tint dip = NOW - last_data_in_time_;
338             dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
339         }
340         last_data_in_time_ = NOW;
341     }
342     CleanHintOut(pos);    
343     return pos;
344 }
345
346
347 void    Channel::CleanDataOut (bin64_t ackd_pos) { // TODO: isn't it too long?
348     
349     int max_ack_off = 0;
350     //FIXME do LEDBAT magic somewhere here
351     
352     if (ackd_pos!=bin64_t::NONE) {
353         for (int i=0; i<8 && i<data_out_.size(); i++) {
354             if (data_out_[i]!=tintbin() && data_out_[i].bin.within(ackd_pos)) {
355                 tint rtt = NOW-data_out_[i].time;
356                 rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
357                 dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
358                 dprintf("%s #%u rtt %lli dev %lli\n",tintstr(),id,rtt_avg_,dev_avg_);
359                 bin64_t pos = data_out_[i].bin;
360                 ack_rcvd_recent_++;
361                 data_out_[i]=tintbin();
362                 max_ack_off = i;
363                 if (ackd_pos==pos)
364                     break;
365             }
366         }
367         while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE) {
368             data_out_.pop_front();
369             max_ack_off--;
370         }
371         static const int MAX_REORDERING = 2;  // the triple-ACK principle
372         if (max_ack_off>MAX_REORDERING) {
373             while (max_ack_off && (data_out_.front().bin==bin64_t::NONE
374                                    || ack_in_.is_filled(data_out_.front().bin)) ) {
375                 data_out_.pop_front();
376                 max_ack_off--;
377             }
378             while (max_ack_off>MAX_REORDERING) {
379                 ack_not_rcvd_recent_++;
380                 dprintf("%s #%u Rdata %s\n",tintstr(),id,data_out_.front().bin.str());
381                 data_out_.pop_front();
382                 max_ack_off--;
383                 data_out_cap_ = bin64_t::ALL;
384             }
385         }
386     }
387     tint timeout = NOW - rtt_avg_ - 4*std::max(dev_avg_,TINT_MSEC*50);
388     while (!data_out_.empty() && data_out_.front().time<timeout) {
389         if (data_out_.front().bin!=bin64_t::NONE && ack_in_.is_empty(data_out_.front().bin)) {
390             ack_not_rcvd_recent_++;
391             data_out_cap_ = bin64_t::ALL;
392             dprintf("%s #%u Tdata %s\n",tintstr(),id,data_out_.front().bin.str());
393         }
394         data_out_.pop_front();
395     }
396     while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE)
397         data_out_.pop_front();
398
399 }
400
401
402 void    Channel::OnAck (Datagram& dgram) {
403     bin64_t ackd_pos = dgram.Pull32();
404     if (ackd_pos==bin64_t::NONE)
405         return; // likely, brocken packet / insufficient hashes
406     if (file().size() && ackd_pos.base_offset()>=file().packet_size()) {
407         eprintf("invalid ack: %s\n",ackd_pos.str());
408         return;
409     }
410     dprintf("%s #%u -ack %s\n",tintstr(),id,ackd_pos.str());
411     ack_in_.set(ackd_pos);
412     CleanDataOut(ackd_pos); // FIXME do AFTER all ACKs
413 }
414
415
416 void Channel::OnTs (Datagram& dgram) {
417     peer_send_time_ = dgram.Pull64();
418     dprintf("%s #%u -ts %lli\n",tintstr(),id,peer_send_time_);
419 }
420
421
422 void    Channel::OnHint (Datagram& dgram) {
423     bin64_t hint = dgram.Pull32();
424     hint_in_.push_back(hint);
425     //ack_in_.set(hint,bins::EMPTY);
426     //RequeueSend(cc_->OnHintRecvd(hint));
427     dprintf("%s #%u -hint %s\n",tintstr(),id,hint.str());
428 }
429
430
431 void Channel::OnHandshake (Datagram& dgram) {
432     peer_channel_id_ = dgram.Pull32();
433     dprintf("%s #%u -hs %x\n",tintstr(),id,peer_channel_id_);
434     // self-connection check
435     if (!SELF_CONN_OK) {
436         uint32_t try_id = DecodeID(peer_channel_id_);
437         if (channel(try_id) && !channel(try_id)->peer_channel_id_) {
438             delete this;
439             return;
440         }
441     }
442     // FUTURE: channel forking
443 }
444
445
446 void Channel::OnPex (Datagram& dgram) {
447     uint32_t ipv4 = dgram.Pull32();
448     uint16_t port = dgram.Pull16();
449     Address addr(ipv4,port);
450     dprintf("%s #%u -pex %s\n",tintstr(),id,addr.str());
451     transfer().OnPexIn(addr);
452 }
453
454
455 void    Channel::AddPex (Datagram& dgram) {
456     int chid = transfer().RevealChannel(pex_out_);
457     if (chid==-1 || chid==id)
458         return;
459     Address a = channels[chid]->peer();
460     dgram.Push8(P2TP_PEX_ADD);
461     dgram.Push32(a.ipv4());
462     dgram.Push16(a.port());
463     dprintf("%s #%u +pex %s\n",tintstr(),id,a.str());
464 }
465
466
467 Channel*    Channel::RecvDatagram (int socket) {
468     Datagram data(socket);
469     data.Recv();
470     Address& addr = data.addr;
471 #define return_log(...) { eprintf(__VA_ARGS__); return NULL; }
472     if (data.size()<4) 
473         return_log("datagram shorter than 4 bytes %s\n",addr.str());
474     uint32_t mych = data.Pull32();
475     Sha1Hash hash;
476     Channel* channel = NULL;
477     if (!mych) { // handshake initiated
478         if (data.size()<1+4+1+4+Sha1Hash::SIZE) 
479             return_log ("incorrect size %i initial handshake packet %s\n",data.size(),addr.str());
480         uint8_t hashid = data.Pull8();
481         if (hashid!=P2TP_HASH) 
482             return_log ("no hash in the initial handshake %s\n",addr.str());
483         bin64_t pos = data.Pull32();
484         if (pos!=bin64_t::ALL) 
485             return_log ("that is not the root hash %s\n",addr.str());
486         hash = data.PullHash();
487         FileTransfer* file = FileTransfer::Find(hash);
488         if (!file) 
489             return_log ("hash %s unknown, no such file %s\n",hash.hex().c_str(),addr.str());
490         dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
491         for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
492             if (channels[*i] && channels[*i]->peer_==data.addr && 
493                 channels[*i]->last_recv_time_>NOW-TINT_SEC*2)
494                 return_log("have a channel already to %s\n",addr.str());
495         channel = new Channel(file, socket, data.address());
496     } else {
497         mych = DecodeID(mych);
498         if (mych>=channels.size()) 
499             return_log("invalid channel #%u, %s\n",mych,addr.str());
500         channel = channels[mych];
501         if (!channel) 
502             return_log ("channel #%u is already closed\n",mych,addr.str());
503         if (channel->peer() != addr) 
504             return_log ("invalid peer address #%u %s!=%s\n",mych,channel->peer().str(),addr.str());
505         channel->own_id_mentioned_ = true;
506     }
507     //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
508     channel->Recv(data);
509     return channel;
510 }
511
512
513 void    Channel::Loop (tint howlong) {  
514     
515     tint limit = Datagram::Time() + howlong;
516     
517     do {
518
519         tint send_time(TINT_NEVER);
520         Channel* sender(NULL);
521         while (!sender && !send_queue.is_empty()) { // dequeue
522             tintbin next = send_queue.pop();
523             send_time = next.time;
524             sender = channel((int)next.bin);
525             if (sender && sender->next_send_time_!=send_time &&
526                      sender->next_send_time_!=TINT_NEVER )
527                 sender = NULL; // it was a stale entry
528         }
529         
530         if ( sender!=NULL && send_time<=NOW ) { // it's time
531             
532             if (sender->next_send_time_<NOW+TINT_MIN) {  // either send
533                 dprintf("%s #%u sch_send %s\n",tintstr(),sender->id,
534                         tintstr(send_time));
535                 sender->Send();
536                 sender->Reschedule();
537             } else { // or close the channel
538                 dprintf("%s #%u closed sendctrl\n",tintstr(),sender->id);
539                 delete sender;
540             }
541             
542         } else {  // it's too early, wait
543             
544             tint towait = min(limit,send_time) - NOW;
545             dprintf("%s waiting %lliusec\n",tintstr(),towait);
546             int rd = Datagram::Wait(socket_count,sockets,towait);
547             if (rd!=INVALID_SOCKET) { // in meantime, received something
548                 Channel* receiver = RecvDatagram(rd);
549                 if (receiver) // receiver's state may have changed
550                     receiver->Reschedule();
551             }
552             if (sender)  // get back to that later
553                 send_queue.push(tintbin(send_time,sender->id));
554             
555         }
556         
557     } while (NOW<limit);
558             
559 }
560
561  
562 void Channel::Reschedule () {
563     next_send_time_ = NextSendTime();
564     if (next_send_time_!=TINT_NEVER) {
565         assert(next_send_time_<NOW+TINT_MIN);
566         send_queue.push(tintbin(next_send_time_,id));
567     } else
568         send_queue.push(tintbin(NOW+TINT_MIN,id));
569     dprintf("%s requeue #%u for %s\n",tintstr(),id,tintstr(next_send_time_));
570 }