hints are optional
[swift-upb.git] / sendrecv.cpp
1 /*
2  *  datasendrecv.cpp
3  *  serp++
4  *
5  *  Created by Victor Grishchenko on 3/6/09.
6  *  Copyright 2009 Delft University of Technology. All rights reserved.
7  *
8  */
9 #include "p2tp.h"
10 #include "compat/util.h"
11
12
13 using namespace p2tp;
14 using namespace std; // FIXME remove
15
16 /*
17  TODO  25 Oct 18:55
18  - range: ALL
19  - randomized testing of advanced ops (new testcase)
20  */
21
22 void    Channel::AddPeakHashes (Datagram& dgram) {
23     for(int i=0; i<file().peak_count(); i++) {
24         bin64_t peak = file().peak(i);
25         dgram.Push8(P2TP_HASH);
26         dgram.Push32((uint32_t)peak);
27         dgram.PushHash(file().peak_hash(i));
28         //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
29         dprintf("%s #%i +phash %s\n",tintstr(),id,peak.str());
30     }
31 }
32
33
34 void    Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
35     bin64_t peak = file().peak_for(pos);
36     while (pos!=peak && ((NOW&3)==3 || !data_out_cap_.within(pos.parent())) &&
37             ack_in_.get(pos.parent())==bins::EMPTY) {
38         bin64_t uncle = pos.sibling();
39         dgram.Push8(P2TP_HASH);
40         dgram.Push32((uint32_t)uncle);
41         dgram.PushHash( file().hash(uncle) );
42         //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
43         dprintf("%s #%i +hash %s\n",tintstr(),id,uncle.str());
44         pos = pos.parent();
45     }
46 }
47
48
49 bin64_t        Channel::DequeueHint () { // TODO: resilience
50     bin64_t send = bin64_t::NONE;
51     while (!hint_in_.empty() && send==bin64_t::NONE) {
52         bin64_t hint = hint_in_.front().bin;
53         tint time = hint_in_.front().time;
54         hint_in_.pop_front();
55         //if (time < NOW-TINT_SEC*3/2 ) //NOW-8*rtt_avg_)
56         //    continue;
57         // Totally flawed:
58         // a. May empty the queue when you least expect
59         // b. May lose parts of partially ACKd HINTs
60         send = file().ack_out().find_filtered(ack_in_,hint,bins::FILLED);
61         send = send.left_foot(); // single packet
62         if (send!=bin64_t::NONE)
63             while (send!=hint) {
64                 hint = hint.towards(send);
65                 hint_in_.push_front(hint.sibling());
66             }
67     }
68     if (send==bin64_t::NONE) {
69         send = file().ack_out().find_filtered(ack_in_,bin64_t::ALL,bins::FILLED);
70         if (send!=bin64_t::NONE)
71             send = send.left_foot();
72     }
73     uint64_t mass = 0;
74     for(int i=0; i<hint_in_.size(); i++)
75         mass += hint_in_[i].bin.width();
76     dprintf("%s #%i dequeued %s [%lli]\n",tintstr(),id,send.str(),mass);
77     return send;
78 }
79
80
81 void    Channel::AddHandshake (Datagram& dgram) {
82     if (!peer_channel_id_) { // initiating
83         dgram.Push8(P2TP_HASH);
84         dgram.Push32(bin64_t::ALL32);
85         dgram.PushHash(file().root_hash());
86         dprintf("%s #%i +hash ALL %s\n",
87                 tintstr(),id,file().root_hash().hex().c_str());
88     }
89     dgram.Push8(P2TP_HANDSHAKE);
90     int encoded = EncodeID(id);
91     dgram.Push32(encoded);
92     dprintf("%s #%i +hs %i\n",tintstr(),id,encoded);
93     ack_out_.clear();
94     AddAck(dgram);
95 }
96
97
98 void    Channel::Send () {
99     Datagram dgram(socket_,peer());
100     dgram.Push32(peer_channel_id_);
101     bin64_t data = bin64_t::NONE;
102     if ( is_established() ) {
103         // FIXME: seeder check
104         AddAck(dgram);
105         if (!file().is_complete())
106             AddHint(dgram);
107         AddPex(dgram);
108         CleanDataOut();
109         data = AddData(dgram);
110     } else {
111         AddHandshake(dgram);
112         AddAck(dgram);
113     }
114     dprintf("%s #%i sent %ib %s\n",tintstr(),id,dgram.size(),peer().str());
115     if (dgram.size()==4) // only the channel id; bare keep-alive
116         data = bin64_t::ALL;
117     cc_->OnDataSent(data);
118     if (dgram.Send()==-1)
119         print_error("can't send datagram");
120 }
121
122
123 void    Channel::AddHint (Datagram& dgram) {
124
125     tint timed_out = NOW - TINT_SEC*3/2;
126     while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
127         hint_out_size_ -= hint_out_.front().bin.width();
128         hint_out_.pop_front();
129     }
130     
131     int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
132     if (!peer_cwnd)
133         peer_cwnd = 1;
134     int peer_pps = TINT_SEC / dip_avg_; // data packets per sec
135     if (!peer_pps)
136         peer_pps = 1;
137     
138     if ( hint_out_size_ < peer_pps ) { //4*peer_cwnd ) {
139             
140         int diff = peer_pps - hint_out_size_;
141         //if (diff>4 && diff>2*peer_cwnd)
142         //    diff >>= 1;
143         bin64_t hint = transfer().picker().Pick(ack_in_,diff,rtt_avg_*8+TINT_MSEC*100);
144         
145         if (hint!=bin64_t::NONE) {
146             dgram.Push8(P2TP_HINT);
147             dgram.Push32(hint);
148             dprintf("%s #%i +hint %s [%lli]\n",tintstr(),id,hint.str(),hint_out_size_);
149             hint_out_.push_back(hint);
150             hint_out_size_ += hint.width();
151         } else
152             dprintf("%s #%i .hint\n",tintstr(),id);
153         
154     }
155 }
156
157
158 bin64_t        Channel::AddData (Datagram& dgram) {
159     
160     if (!file().size()) // know nothing
161         return bin64_t::NONE;
162     
163     bin64_t tosend = bin64_t::NONE;
164     if (cc_->MaySendData()) {
165         tosend = DequeueHint();
166         if (tosend==bin64_t::NONE)
167             dprintf("%s #%i out of hints #sendctrl\n",tintstr(),id);
168     } else
169         dprintf("%s #%i no cwnd #sendctrl\n",tintstr(),id);
170     
171     if (tosend==bin64_t::NONE && (last_send_data_time_>NOW-TINT_SEC || data_out_.empty())) 
172         return bin64_t::NONE; // once in a while, empty data is sent just to check rtt
173     
174     if (tosend!=bin64_t::NONE) { // hashes
175         if (ack_in_.is_empty() && file().size())
176             AddPeakHashes(dgram);
177         AddUncleHashes(dgram,tosend);
178         if (!ack_in_.is_empty()) // TODO: cwnd_>1
179             data_out_cap_ = tosend;
180     }
181
182     if (dgram.size()>254) {
183         dgram.Send(); // kind of fragmentation
184         dgram.Push32(peer_channel_id_);
185     }
186     
187     dgram.Push8(P2TP_DATA);
188     dgram.Push32(tosend.to32());
189     
190     if (tosend!=bin64_t::NONE) { // data
191         uint8_t buf[1024];
192         size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); 
193         // TODO: corrupted data, retries, caching
194         if (r<0) {
195             print_error("error on reading");
196             return bin64_t::NONE;
197         }
198         assert(dgram.space()>=r+4+1);
199         dgram.Push(buf,r);
200     }
201     
202     last_send_data_time_ = NOW;
203     data_out_.push_back(tosend);
204     dprintf("%s #%i +data %s\n",tintstr(),id,tosend.str());
205     
206     return tosend;
207 }
208
209
210 void    Channel::AddTs (Datagram& dgram) {
211     dgram.Push8(P2TP_TS);
212     dgram.Push64(data_in_.time);
213     dprintf("%s #%i +ts %s\n",tintstr(),id,tintstr(data_in_.time));
214 }
215
216
217 void    Channel::AddAck (Datagram& dgram) {
218     if (data_in_dbl_!=bin64_t::NONE) {
219         dgram.Push8(P2TP_ACK);
220         dgram.Push32(data_in_dbl_);
221         data_in_dbl_=bin64_t::NONE;
222     }
223     if (data_in_.bin!=bin64_t::NONE) {
224         AddTs(dgram);
225         bin64_t pos = file().ack_out().cover(data_in_.bin);
226         dgram.Push8(P2TP_ACK);
227         dgram.Push32(pos);
228         //dgram.Push64(data_in_.time);
229         ack_out_.set(pos);
230         dprintf("%s #%i +ack %s %s\n",tintstr(),id,pos.str(),tintstr(data_in_.time));
231         data_in_ = tintbin(0,bin64_t::NONE);
232         if (pos.layer()>2)
233             data_in_dbl_ = pos;
234     }
235     for(int count=0; count<4; count++) {
236         bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, bins::FILLED);
237         if (ack==bin64_t::NONE)
238             break;
239         ack = file().ack_out().cover(ack);
240         ack_out_.set(ack);
241         dgram.Push8(P2TP_ACK);
242         dgram.Push32(ack);
243         dprintf("%s #%i +ack %s\n",tintstr(),id,ack.str());
244     }
245 }
246
247
248 void    Channel::Recv (Datagram& dgram) {
249     dprintf("%s #%i recvd %i\n",tintstr(),id,dgram.size()+4);
250     if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
251         rtt_avg_ = NOW - last_send_time_;
252         dev_avg_ = rtt_avg_;
253         dip_avg_ = rtt_avg_;
254         transfer().hs_in_.push_back(id);
255         dprintf("%s #%i rtt init %lli\n",tintstr(),id,rtt_avg_);
256     }
257     bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
258     while (dgram.size()) {
259         uint8_t type = dgram.Pull8();
260         switch (type) {
261             case P2TP_HANDSHAKE: OnHandshake(dgram); break;
262             case P2TP_DATA:        data=OnData(dgram); break;
263             case P2TP_TS:       OnTs(dgram); break;
264             case P2TP_ACK:        OnAck(dgram); break;
265             case P2TP_HASH:        OnHash(dgram); break;
266             case P2TP_HINT:        OnHint(dgram); break;
267             case P2TP_PEX_ADD:  OnPex(dgram); break;
268             default:
269                 eprintf("%s #%i ?msg id unknown %i\n",tintstr(),id,(int)type);
270                 return;
271         }
272     }
273     cc_->OnDataRecvd(data);
274     last_recv_time_ = NOW;
275 }
276
277
278 void    Channel::OnHash (Datagram& dgram) {
279     bin64_t pos = dgram.Pull32();
280     Sha1Hash hash = dgram.PullHash();
281     file().OfferHash(pos,hash);
282     //DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
283     dprintf("%s #%i -hash %s\n",tintstr(),id,pos.str());
284 }
285
286
287 void    Channel::CleanHintOut (bin64_t pos) {
288     int hi = 0;
289     while (hi<hint_out_.size() && !pos.within(hint_out_[hi].bin))
290         hi++;
291     if (hi==hint_out_.size())
292         return; // something not hinted or hinted in far past
293     while (hi--) { // removing likely snubbed hints
294         hint_out_size_ -= hint_out_.front().bin.width();
295         hint_out_.pop_front();
296     }
297     while (hint_out_.front().bin!=pos) {
298         tintbin f = hint_out_.front();
299         f.bin = f.bin.towards(pos);
300         hint_out_.front().bin = f.bin.sibling();
301         hint_out_.push_front(f);
302     }
303     hint_out_.pop_front();
304     hint_out_size_--;
305 }
306
307
308 bin64_t Channel::OnData (Datagram& dgram) {
309     bin64_t pos = dgram.Pull32();
310     uint8_t *data;
311     int length = dgram.Pull(&data,1024);
312     bool ok = (pos==bin64_t::NONE) || file().OfferData(pos, (char*)data, length) ;
313     dprintf("%s #%i %cdata %s\n",tintstr(),id,ok?'-':'!',pos.str());
314     if (!ok) 
315         return bin64_t::NONE;
316     data_in_ = tintbin(NOW,pos);
317     if (pos!=bin64_t::NONE) {
318         if (last_recv_data_time_) {
319             tint dip = NOW - last_recv_data_time_;
320             dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
321         }
322         last_recv_data_time_ = NOW;
323     }
324     CleanHintOut(pos);    
325     return pos;
326 }
327
328
329 void    Channel::CleanDataOut (bin64_t ackd_pos) {
330     
331     int max_ack_off = 0;
332     
333     if (ackd_pos!=bin64_t::NONE) {
334         for (int i=0; i<8 && i<data_out_.size(); i++) {
335             if (data_out_[i]!=tintbin() && data_out_[i].bin.within(ackd_pos)) {
336                 tint rtt = NOW-data_out_[i].time;
337                 rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
338                 dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
339                 dprintf("%s #%i rtt %lli dev %lli\n",tintstr(),id,rtt_avg_,dev_avg_);
340                 bin64_t pos = data_out_[i].bin;
341                 cc_->OnAckRcvd(pos);
342                 data_out_[i]=tintbin();
343                 max_ack_off = i;
344                 if (ackd_pos==pos)
345                     break;
346             }
347         }
348         while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE) {
349             data_out_.pop_front();
350             max_ack_off--;
351         }
352         static const int MAX_REORDERING = 2;  // the triple-ACK principle
353         if (max_ack_off>MAX_REORDERING) {
354             while (max_ack_off && (data_out_.front().bin==bin64_t::NONE
355                                    || ack_in_.is_filled(data_out_.front().bin)) ) {
356                 data_out_.pop_front();
357                 max_ack_off--;
358             }
359             while (max_ack_off>MAX_REORDERING) {
360                 cc_->OnAckRcvd(bin64_t::NONE);
361                 dprintf("%s #%i Rdata %s\n",tintstr(),id,data_out_.front().bin.str());
362                 data_out_.pop_front();
363                 max_ack_off--;
364                 data_out_cap_ = bin64_t::ALL;
365             }
366         }
367     }
368     tint timeout = NOW - rtt_avg_ - 4*std::max(dev_avg_,TINT_MSEC*50);
369     while (!data_out_.empty() && data_out_.front().time<timeout) {
370         if (data_out_.front().bin!=bin64_t::NONE && ack_in_.is_empty(data_out_.front().bin)) {
371             cc_->OnAckRcvd(bin64_t::NONE);
372             data_out_cap_ = bin64_t::ALL;
373             dprintf("%s #%i Tdata %s\n",tintstr(),id,data_out_.front().bin.str());
374         }
375         data_out_.pop_front();
376     }
377     while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE)
378         data_out_.pop_front();
379
380 }
381
382
383 void    Channel::OnAck (Datagram& dgram) {
384     bin64_t ackd_pos = dgram.Pull32();
385     if (ackd_pos!=bin64_t::NONE && file().size() && ackd_pos.base_offset()>=file().packet_size()) {
386         eprintf("invalid ack: %s\n",ackd_pos.str());
387         return;
388     }
389     dprintf("%s #%i -ack %s\n",tintstr(),id,ackd_pos.str());
390     ack_in_.set(ackd_pos);
391     CleanDataOut(ackd_pos);
392 }
393
394
395 void Channel::OnTs (Datagram& dgram) {
396     peer_send_time_ = dgram.Pull64();
397     dprintf("%s #%i -ts %lli\n",tintstr(),id,peer_send_time_);
398 }
399
400
401 void    Channel::OnHint (Datagram& dgram) {
402     bin64_t hint = dgram.Pull32();
403     hint_in_.push_back(hint);
404     //ack_in_.set(hint,bins::EMPTY);
405     //RequeueSend(cc_->OnHintRecvd(hint));
406     dprintf("%s #%i -hint %s\n",tintstr(),id,hint.str());
407 }
408
409
410 void Channel::OnHandshake (Datagram& dgram) {
411     peer_channel_id_ = dgram.Pull32();
412     dprintf("%s #%i -hs %i\n",tintstr(),id,peer_channel_id_);
413     // FUTURE: channel forking
414 }
415
416
417 void Channel::OnPex (Datagram& dgram) {
418     uint32_t ipv4 = dgram.Pull32();
419     uint16_t port = dgram.Pull16();
420     Address addr(ipv4,port);
421     dprintf("%s #%i -pex %s\n",tintstr(),id,addr.str());
422     transfer().OnPexIn(addr);
423 }
424
425
426 void    Channel::AddPex (Datagram& dgram) {
427     int chid = transfer().RevealChannel(pex_out_);
428     if (chid==-1 || chid==id)
429         return;
430     Address a = channels[chid]->peer();
431     dgram.Push8(P2TP_PEX_ADD);
432     dgram.Push32(a.ipv4());
433     dgram.Push16(a.port());
434     dprintf("%s #%i +pex %s\n",tintstr(),id,a.str());
435 }
436
437
438 void    Channel::RecvDatagram (int socket) {
439     Datagram data(socket);
440     data.Recv();
441     Address& addr = data.addr;
442 #define return_log(...) { eprintf(__VA_ARGS__); return; }
443     if (data.size()<4) 
444         return_log("datagram shorter than 4 bytes %s\n",addr.str());
445     uint32_t mych = data.Pull32();
446     Sha1Hash hash;
447     Channel* channel = NULL;
448     if (!mych) { // handshake initiated
449         if (data.size()<1+4+1+4+Sha1Hash::SIZE) 
450             return_log ("incorrect size %i initial handshake packet %s\n",data.size(),addr.str());
451         uint8_t hashid = data.Pull8();
452         if (hashid!=P2TP_HASH) 
453             return_log ("no hash in the initial handshake %s\n",addr.str());
454         bin64_t pos = data.Pull32();
455         if (pos!=bin64_t::ALL) 
456             return_log ("that is not the root hash %s\n",addr.str());
457         hash = data.PullHash();
458         FileTransfer* file = FileTransfer::Find(hash);
459         if (!file) 
460             return_log ("hash %s unknown, no such file %s\n",hash.hex().c_str(),addr.str());
461         dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
462         for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
463             if (channels[*i] && channels[*i]->peer_==data.addr && 
464                 channels[*i]->last_recv_time_>NOW-TINT_SEC*2) 
465                 return_log("have a channel already to %s\n",addr.str());
466         channel = new Channel(file, socket, data.address());
467     } else {
468         mych = DecodeID(mych);
469         if (mych>=channels.size()) 
470             return_log("invalid channel #%i, %s\n",mych,addr.str());
471         channel = channels[mych];
472         if (!channel) 
473             return_log ("channel #%i is already closed\n",mych,addr.str());
474         if (channel->peer() != addr) 
475             return_log ("invalid peer address #%i %s!=%s\n",mych,channel->peer().str(),addr.str());
476         channel->own_id_mentioned_ = true;
477     }
478     //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
479     channel->Recv(data);
480 }
481
482
483 void    Channel::Loop (tint howlong) {  
484     
485     tint limit = Datagram::Time() + howlong;
486     
487     do {
488
489         tint send_time(TINT_NEVER);
490         Channel* sender(NULL);
491         while (!send_queue.is_empty()) {
492             send_time = send_queue.peek().time;
493             sender = channel((int)send_queue.peek().bin);
494             if (sender)
495                 if ( sender->next_send_time_==send_time ||
496                      sender->next_send_time_==TINT_NEVER )
497                 break;
498             sender = NULL; // it was a stale entry
499             send_queue.pop();
500         }
501         if (send_time>limit)
502             send_time = limit;
503         if ( sender && sender->next_send_time_ <= NOW ) {
504             dprintf("%s #%i sch_send %s\n",tintstr(),sender->id,
505                     tintstr(send_time));
506             sender->Send();
507             sender->last_send_time_ = NOW;
508             // sender->RequeueSend(sender->cc_->NextSendTime()); goes to SendCtrl
509             send_queue.pop();
510         } else if ( send_time > NOW ) {
511             tint towait = send_time - NOW;
512             dprintf("%s waiting %lliusec\n",tintstr(),towait);
513             int rd = Datagram::Wait(socket_count,sockets,towait);
514             if (rd!=INVALID_SOCKET)
515                 RecvDatagram(rd);
516         } else  { // FIXME FIXME FIXME REWRITE!!!  if (sender->next_send_time_==TINT_NEVER) { 
517             if (sender) {
518             dprintf("%s #%i closed sendctrl\n",tintstr(),sender->id);
519             delete sender;
520             }
521             send_queue.pop();
522         }
523         
524     } while (Datagram::Time()<limit);
525         
526 }
527
528  
529 void Channel::Schedule (tint next_time) {
530     next_send_time_ = next_time;
531     if (next_time==TINT_NEVER)
532         next_time = NOW + TINT_MIN; // 1min timeout
533     send_queue.push(tintbin(next_time,id));
534     dprintf("%s requeue #%i for %s\n",tintstr(),id,tintstr(next_time));
535 }