logging
[swift-upb.git] / sendrecv.cpp
1 /*
2  *  datasendrecv.cpp
3  *  serp++
4  *
5  *  Created by Victor Grishchenko on 3/6/09.
6  *  Copyright 2009 Delft University of Technology. All rights reserved.
7  *
8  */
9 #include "p2tp.h"
10 #include "compat/util.h"
11
12
13 using namespace p2tp;
14 using namespace std; // FIXME remove
15
16 /*
17  TODO  25 Oct 18:55
18  - move hint_out_, piece picking to piece picker (needed e.g. for the case of channel drop)
19  - ANY_LAYER
20  - range: ALL
21  - randomized testing of advanced ops (new testcase)
22  - PeerCwnd()
23  - bins hint_out_, tbqueue hint_out_ts_
24  
25  */
26
27 void    Channel::AddPeakHashes (Datagram& dgram) {
28         for(int i=0; i<file().peak_count(); i++) {
29         bin64_t peak = file().peak(i);
30                 dgram.Push8(P2TP_HASH);
31                 dgram.Push32((uint32_t)peak);
32                 dgram.PushHash(file().peak_hash(i));
33         //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
34         dprintf("%s #%i +phash %s\n",tintstr(),id,peak.str());
35         }
36 }
37
38
39 void    Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
40     bin64_t peak = file().peak_for(pos);
41     while (pos!=peak && ((NOW&7)==7 || !data_out_cap_.within(pos.parent())) &&
42             ack_in_.get(pos.parent())==bins::EMPTY) {
43         bin64_t uncle = pos.sibling();
44                 dgram.Push8(P2TP_HASH);
45                 dgram.Push32((uint32_t)uncle);
46                 dgram.PushHash( file().hash(uncle) );
47         //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
48         dprintf("%s #%i +hash %s\n",tintstr(),id,uncle.str());
49         pos = pos.parent();
50     }
51 }
52
53
54 bin64_t         Channel::DequeueHint () { // TODO: resilience
55     bin64_t send = bin64_t::NONE;
56     while (!hint_in_.empty() && send==bin64_t::NONE) {
57         bin64_t hint = hint_in_.front().bin;
58         tint time = hint_in_.front().time;
59         hint_in_.pop_front();
60         if (time < NOW-TINT_SEC*3/2 ) //NOW-8*rtt_avg_)
61             continue;
62         send = file().ack_out().find_filtered(ack_in_,hint,bins::FILLED);
63         send = send.left_foot(); // single packet
64         if (send!=bin64_t::NONE)
65             while (send!=hint) {
66                 hint = hint.towards(send);
67                 hint_in_.push_front(hint.sibling());
68             }
69     }
70     uint64_t mass = 0;
71     for(int i=0; i<hint_in_.size(); i++)
72         mass += hint_in_[i].bin.width();
73     dprintf("%s #%i dequeued %s [%lli]\n",tintstr(),id,send.str(),mass);
74     return send;
75 }
76
77
78 void    Channel::AddHandshake (Datagram& dgram) {
79         if (!peer_channel_id_) { // initiating
80                 dgram.Push8(P2TP_HASH);
81                 dgram.Push32(bin64_t::ALL32);
82                 dgram.PushHash(file().root_hash());
83         dprintf("%s #%i +hash ALL %s\n",
84                 tintstr(),id,file().root_hash().hex().c_str());
85         }
86         dgram.Push8(P2TP_HANDSHAKE);
87         dgram.Push32(EncodeID(id));
88     dprintf("%s #%i +hs\n",tintstr(),id);
89     ack_out_.clear();
90     AddAck(dgram);
91 }
92
93
94 void    Channel::ClearStaleDataOut() {
95     int oldsize = data_out_.size();
96     tint timeout = NOW - max( rtt_avg_-dev_avg_*4, 500*TINT_MSEC );
97     while ( data_out_.size() && data_out_.front().time < timeout &&
98             ack_in_.get(data_out_.front().bin)==bins::EMPTY ) {
99         dprintf("%s #%i Tdata %s\n",tintstr(),id,data_out_.front().bin.str());
100         data_out_.pop_front();
101     }
102     if (data_out_.size()!=oldsize) {
103         cc_->OnAckRcvd(bin64_t::NONE);
104         data_out_cap_ = bin64_t::ALL;
105     }
106     while (data_out_.size() && (data_out_.front()==tintbin() || ack_in_.get(data_out_.front().bin)==bins::FILLED))
107         data_out_.pop_front();
108 }
109
110
111 void    Channel::Send () {
112     Datagram dgram(socket_,peer());
113     dgram.Push32(peer_channel_id_);
114     bin64_t data = bin64_t::NONE;
115     if ( is_established() ) {
116         // FIXME: seeder check
117         AddAck(dgram);
118         if (!file().is_complete())
119             AddHint(dgram);
120         AddPex(dgram);
121         ClearStaleDataOut();
122         data = AddData(dgram);
123     } else {
124         AddHandshake(dgram);
125         AddAck(dgram);
126     }
127     dprintf("%s #%i sent %ib %s\n",tintstr(),id,dgram.size(),peer().str().c_str());
128     if (dgram.size()==4) // only the channel id; bare keep-alive
129         data = bin64_t::ALL;
130     cc_->OnDataSent(data);
131     if (dgram.Send()==-1)
132         print_error("can't send datagram");
133 }
134
135
136 void    Channel::CleanStaleHintOut () {
137     tint timed_out = NOW - 8*rtt_avg_; // FIXME BULLSHIT (take rtt=0)
138         while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
139         transfer().picker().Expired(hint_out_.front().bin);
140                 hint_out_.pop_front();
141         }
142 }
143
144
145 void    Channel::AddHint (Datagram& dgram) {
146
147     CleanStaleHintOut();
148     
149     uint64_t hint_out_mass=0;
150     for(int i=0; i<hint_out_.size(); i++)
151         hint_out_mass += hint_out_[i].bin.width();
152     
153     int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
154     if (!peer_cwnd)
155         peer_cwnd = 1;
156     int peer_pps = TINT_SEC / dip_avg_;
157     if (!peer_pps)
158         peer_pps = 1;
159     
160     if ( hint_out_mass < peer_pps ) { //4*peer_cwnd ) {
161             
162         int diff = peer_pps - hint_out_mass;
163         //if (diff>4 && diff>2*peer_cwnd)
164         //    diff >>= 1;
165         bin64_t hint = transfer().picker().Pick(ack_in_,diff,rtt_avg_*8+TINT_MSEC*100);
166         
167         if (hint!=bin64_t::NONE) {
168             dgram.Push8(P2TP_HINT);
169             dgram.Push32(hint);
170             dprintf("%s #%i +hint %s [%lli]\n",tintstr(),id,hint.str(),hint_out_mass);
171             hint_out_.push_back(hint);
172         } else
173             dprintf("%s #%i .hint\n",tintstr(),id);
174         
175     }
176 }
177
178
179 bin64_t         Channel::AddData (Datagram& dgram) {
180         
181     if (!file().size()) // know nothing
182                 return bin64_t::NONE;
183     
184         bin64_t tosend = bin64_t::NONE;
185     if (cc_->MaySendData()) {
186         tosend = DequeueHint();
187         if (tosend==bin64_t::NONE)
188             dprintf("%s #%i out of hints #sendctrl\n",tintstr(),id);
189     } else
190         dprintf("%s #%i no cwnd #sendctrl\n",tintstr(),id);
191     
192     if (tosend==bin64_t::NONE && (last_send_data_time_>NOW-TINT_SEC || data_out_.empty())) 
193         return bin64_t::NONE; // once in a while, empty data is sent just to check rtt
194     
195     if (tosend!=bin64_t::NONE) { // hashes
196         if (ack_in_.is_empty() && file().size())
197             AddPeakHashes(dgram);
198         AddUncleHashes(dgram,tosend);
199         data_out_cap_ = tosend;
200     }
201     
202     dgram.Push8(P2TP_DATA);
203     dgram.Push32(tosend.to32());
204     
205     if (tosend!=bin64_t::NONE) { // data
206         uint8_t buf[1024];
207         size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); 
208         // TODO: corrupted data, retries, caching
209         if (r<0) {
210             print_error("error on reading");
211             return bin64_t::NONE;
212         }
213         assert(dgram.space()>=r+4+1);
214         dgram.Push(buf,r);
215     }
216     
217     last_send_data_time_ = NOW;
218     data_out_.push_back(tosend);
219     dprintf("%s #%i +data %s\n",tintstr(),id,tosend.str());
220     
221         return tosend;
222 }
223
224
225 void    Channel::AddTs (Datagram& dgram) {
226     dgram.Push8(P2TP_TS);
227     dgram.Push64(data_in_.time);
228     dprintf("%s #%i +ts %s\n",tintstr(),id,tintstr(data_in_.time));
229 }
230
231
232 void    Channel::AddAck (Datagram& dgram) {
233     if (data_in_dbl_!=bin64_t::NONE) {
234                 dgram.Push8(P2TP_ACK);
235                 dgram.Push32(data_in_dbl_);
236         data_in_dbl_=bin64_t::NONE;
237     }
238         if (data_in_.bin!=bin64_t::NONE) {
239         AddTs(dgram);
240         bin64_t pos = file().ack_out().cover(data_in_.bin);
241                 dgram.Push8(P2TP_ACK);
242                 dgram.Push32(pos);
243                 //dgram.Push64(data_in_.time);
244         ack_out_.set(pos);
245         dprintf("%s #%i +ack %s %s\n",tintstr(),id,pos.str(),tintstr(data_in_.time));
246         data_in_ = tintbin(0,bin64_t::NONE);
247         if (pos.layer()>2)
248             data_in_dbl_ = pos;
249         }
250     for(int count=0; count<4; count++) {
251         bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, bins::FILLED);
252         if (ack==bin64_t::NONE)
253             break;
254         ack = file().ack_out().cover(ack);
255         ack_out_.set(ack);
256         dgram.Push8(P2TP_ACK);
257         dgram.Push32(ack);
258         dprintf("%s #%i +ack %s\n",tintstr(),id,ack.str());
259     }
260 }
261
262
263 void    Channel::Recv (Datagram& dgram) {
264     dprintf("%s #%i recvd %i\n",tintstr(),id,dgram.size()+4);
265     if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
266         rtt_avg_ = NOW - last_send_time_;
267         dev_avg_ = rtt_avg_;
268         dip_avg_ = rtt_avg_;
269         transfer().hs_in_.push_back(id);
270         dprintf("%s #%i rtt init %lli\n",tintstr(),id,rtt_avg_);
271     }
272     bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
273         while (dgram.size()) {
274                 uint8_t type = dgram.Pull8();
275                 switch (type) {
276             case P2TP_HANDSHAKE: OnHandshake(dgram); break;
277                         case P2TP_DATA:         data=OnData(dgram); break;
278                         case P2TP_TS:       OnTs(dgram); break;
279                         case P2TP_ACK:          OnAck(dgram); break;
280                         case P2TP_HASH:         OnHash(dgram); break;
281                         case P2TP_HINT:         OnHint(dgram); break;
282             case P2TP_PEX_ADD:  OnPex(dgram); break;
283                         default:
284                                 eprintf("%s #%i ?msg id unknown %i\n",tintstr(),id,(int)type);
285                                 return;
286                 }
287         }
288     cc_->OnDataRecvd(data);
289     last_recv_time_ = NOW;
290 }
291
292
293 void    Channel::OnHash (Datagram& dgram) {
294         bin64_t pos = dgram.Pull32();
295         Sha1Hash hash = dgram.PullHash();
296         file().OfferHash(pos,hash);
297     //DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
298     dprintf("%s #%i -hash %s\n",tintstr(),id,pos.str());
299 }
300
301
302 void    Channel::CleanFulfilledHints (bin64_t pos) {
303     int hi = 0;
304     while (hi<hint_out_.size() && hi<8 && !pos.within(hint_out_[hi].bin))
305         hi++;
306     if (hi<8 && hi<hint_out_.size()) {
307         while (hi--) {
308             transfer().picker().Expired(hint_out_.front().bin);
309             hint_out_.pop_front();
310         }
311         while (hint_out_.front().bin!=pos) {
312             tintbin f = hint_out_.front();
313             f.bin = f.bin.towards(pos);
314             hint_out_.front().bin = f.bin.sibling();
315             hint_out_.push_front(f);
316         }
317         hint_out_.pop_front();
318     }
319      // every HINT ends up as either Expired or Received 
320     transfer().picker().Received(pos);
321 }
322
323
324 bin64_t Channel::OnData (Datagram& dgram) {
325         bin64_t pos = dgram.Pull32();
326     uint8_t *data;
327     int length = dgram.Pull(&data,1024);
328     bool ok = (pos==bin64_t::NONE) || file().OfferData(pos, (char*)data, length) ;
329     dprintf("%s #%i %cdata %s\n",tintstr(),id,ok?'-':'!',pos.str());
330     if (!ok) 
331         return bin64_t::NONE;
332     data_in_ = tintbin(NOW,pos);
333     if (pos!=bin64_t::NONE) {
334         if (last_recv_data_time_) {
335             tint dip = NOW - last_recv_data_time_;
336             dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
337         }
338         last_recv_data_time_ = NOW;
339     }
340     CleanFulfilledHints(pos);    
341     return pos;
342 }
343
344
345 void    Channel::CleanFulfilledDataOut (bin64_t ackd_pos) {
346     for (int i=0; i<8 && i<data_out_.size(); i++) 
347         if (data_out_[i]!=tintbin() && data_out_[i].bin.within(ackd_pos)) {
348             tint rtt = NOW-data_out_[i].time;
349             rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
350             dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
351             dprintf("%s #%i rtt %lli dev %lli\n",
352                     tintstr(),id,rtt_avg_,dev_avg_);
353             cc_->OnAckRcvd(data_out_[i].bin);
354             data_out_[i]=tintbin();
355         }
356     while ( data_out_.size() && ( data_out_.front()==tintbin() ||
357                                  ack_in_.get(data_out_.front().bin)==bins::FILLED ) )
358         data_out_.pop_front();
359 }
360
361
362 void    Channel::OnAck (Datagram& dgram) {
363         bin64_t ackd_pos = dgram.Pull32();
364     if (ackd_pos!=bin64_t::NONE && file().size() && ackd_pos.base_offset()>=file().packet_size()) {
365         eprintf("invalid ack: %s\n",ackd_pos.str());
366         return;
367     }
368     dprintf("%s #%i -ack %s\n",tintstr(),id,ackd_pos.str());
369     ack_in_.set(ackd_pos);
370     CleanFulfilledDataOut(ackd_pos);
371 }
372
373
374 void Channel::OnTs (Datagram& dgram) {
375     peer_send_time_ = dgram.Pull64();
376     dprintf("%s #%i -ts %lli\n",tintstr(),id,peer_send_time_);
377 }
378
379
380 void    Channel::OnHint (Datagram& dgram) {
381         bin64_t hint = dgram.Pull32();
382         hint_in_.push_back(hint);
383     //ack_in_.set(hint,bins::EMPTY);
384     //RequeueSend(cc_->OnHintRecvd(hint));
385     dprintf("%s #%i -hint %s\n",tintstr(),id,hint.str());
386 }
387
388
389 void Channel::OnHandshake (Datagram& dgram) {
390     peer_channel_id_ = dgram.Pull32();
391     dprintf("%s #%i -hs %i\n",tintstr(),id,peer_channel_id_);
392     // FUTURE: channel forking
393 }
394
395
396 void Channel::OnPex (Datagram& dgram) {
397     uint32_t ipv4 = dgram.Pull32();
398     uint16_t port = dgram.Pull16();
399     Address addr(ipv4,port);
400     dprintf("%s #%i -pex %s\n",tintstr(),id,addr.str().c_str());
401     transfer().OnPexIn(addr);
402 }
403
404
405 void    Channel::AddPex (Datagram& dgram) {
406     int chid = transfer().RevealChannel(pex_out_);
407     if (chid==-1 || chid==id)
408         return;
409     Address a = channels[chid]->peer();
410     dgram.Push8(P2TP_PEX_ADD);
411     dgram.Push32(a.ipv4());
412     dgram.Push16(a.port());
413     dprintf("%s #%i +pex %s\n",tintstr(),id,a.str().c_str());
414 }
415
416
417 void    Channel::RecvDatagram (int socket) {
418         Datagram data(socket);
419         data.Recv();
420         if (data.size()<4) 
421                 RETLOG("datagram shorter than 4 bytes");
422         uint32_t mych = data.Pull32();
423         Sha1Hash hash;
424         Channel* channel = NULL;
425         if (!mych) { // handshake initiated
426                 if (data.size()<1+4+1+4+Sha1Hash::SIZE) 
427                         RETLOG ("incorrect size initial handshake packet");
428                 uint8_t hashid = data.Pull8();
429                 if (hashid!=P2TP_HASH) 
430                         RETLOG ("no hash in the initial handshake");
431                 bin64_t pos = data.Pull32();
432                 if (pos!=bin64_t::ALL) 
433                         RETLOG ("that is not the root hash");
434                 hash = data.PullHash();
435                 FileTransfer* file = FileTransfer::Find(hash);
436                 if (!file) 
437                         RETLOG ("hash unknown, no such file");
438         dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
439         for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
440             if (channels[*i] && channels[*i]->peer_==data.addr && 
441                 channels[*i]->last_recv_time_>NOW-TINT_SEC*2) 
442                 RETLOG("have a channel already");
443                 channel = new Channel(file, socket, data.address());
444         } else {
445                 mych = DecodeID(mych);
446                 if (mych>=channels.size()) {
447             eprintf("invalid channel #%i\n",mych);
448             return;
449         }
450                 channel = channels[mych];
451                 if (!channel) 
452                         RETLOG ("channel is closed");
453                 if (channel->peer() != data.address()) 
454                         RETLOG ("invalid peer address");
455         channel->own_id_mentioned_ = true;
456         }
457     //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
458     channel->Recv(data);
459 }
460
461
462 void    Channel::Loop (tint howlong) {  
463         
464     tint limit = Datagram::Time() + howlong;
465     
466     do {
467
468         tint send_time(TINT_NEVER);
469         Channel* sender(NULL);
470         while (!send_queue.is_empty()) {
471             send_time = send_queue.peek().time;
472             sender = channel((int)send_queue.peek().bin);
473             if (sender)
474                 if ( sender->next_send_time_==send_time ||
475                      sender->next_send_time_==TINT_NEVER )
476                 break;
477             sender = NULL; // it was a stale entry
478             send_queue.pop();
479         }
480         if (send_time>limit)
481             send_time = limit;
482         if ( sender && sender->next_send_time_ <= NOW ) {
483             dprintf("%s #%i sch_send %s\n",tintstr(),sender->id,
484                     tintstr(send_time));
485             sender->Send();
486             sender->last_send_time_ = NOW;
487             // sender->RequeueSend(sender->cc_->NextSendTime()); goes to SendCtrl
488             send_queue.pop();
489         } else if ( send_time > NOW ) {
490             tint towait = send_time - NOW;
491             dprintf("%s waiting %lliusec\n",tintstr(),towait);
492             int rd = Datagram::Wait(socket_count,sockets,towait);
493             if (rd!=INVALID_SOCKET)
494                 RecvDatagram(rd);
495         } else  { // FIXME FIXME FIXME REWRITE!!!  if (sender->next_send_time_==TINT_NEVER) { 
496             if (sender) {
497             dprintf("%s #%i closed sendctrl\n",tintstr(),sender->id);
498             delete sender;
499             }
500             send_queue.pop();
501         }
502         
503     } while (Datagram::Time()<limit);
504         
505 }
506
507  
508 void Channel::Schedule (tint next_time) {
509     next_send_time_ = next_time;
510     if (next_time==TINT_NEVER)
511         next_time = NOW + TINT_MIN; // 1min timeout
512     send_queue.push(tintbin(next_time,id));
513     dprintf("%s requeue #%i for %s\n",tintstr(),id,tintstr(next_time));
514 }