trial exec
[swift-upb.git] / sendrecv.cpp
1 /*
2  *  datasendrecv.cpp
3  *  serp++
4  *
5  *  Created by Victor Grishchenko on 3/6/09.
6  *  Copyright 2009 Delft University of Technology. All rights reserved.
7  *
8  */
9 #include <algorithm>
10 //#include <glog/logging.h>
11 #include "p2tp.h"
12 #include "compat/util.h"
13
14
15 using namespace p2tp;
16 using namespace std; // FIXME remove
17
18 /*
19  TODO  25 Oct 18:55
20  - move hint_out_, piece picking to piece picker (needed e.g. for the case of channel drop)
21  - ANY_LAYER
22  - range: ALL
23  - randomized testing of advanced ops (new testcase)
24  - PeerCwnd()
25  - bins hint_out_, tbqueue hint_out_ts_
26  
27  */
28
29 void    Channel::AddPeakHashes (Datagram& dgram) {
30         for(int i=0; i<file().peak_count(); i++) {
31         bin64_t peak = file().peak(i);
32                 dgram.Push8(P2TP_HASH);
33                 dgram.Push32((uint32_t)peak);
34                 dgram.PushHash(file().peak_hash(i));
35         //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
36         dprintf("%s #%i +phash (%i,%lli)\n",tintstr(),id,peak.layer(),peak.offset());
37         }
38 }
39
40
41 void    Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
42     bin64_t peak = file().peak_for(pos);
43     while (pos!=peak && !data_out_cap_.within(pos.parent()) &&
44             ack_in_.get(pos.parent())==bins::EMPTY) {
45         bin64_t uncle = pos.sibling();
46                 dgram.Push8(P2TP_HASH);
47                 dgram.Push32((uint32_t)uncle);
48                 dgram.PushHash( file().hash(uncle) );
49         //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
50         dprintf("%s #%i +hash (%i,%lli)\n",tintstr(),id,uncle.layer(),uncle.offset());
51         pos = pos.parent();
52     }
53 }
54
55
56 bin64_t         Channel::DequeueHint () { // TODO: resilience
57     bin64_t send = bin64_t::NONE;
58     while (!hint_in_.empty() && send==bin64_t::NONE) {
59         bin64_t hint = hint_in_.front().bin;
60         tint time = hint_in_.front().time;
61         hint_in_.pop_front();
62         if (time < NOW-8*rtt_avg_)
63             continue;
64         send = file().ack_out().find_filtered
65             (ack_in_,hint,0,bins::FILLED);
66         dprintf("%s #%i dequeued %lli\n",tintstr(),id,send.base_offset());
67         if (send!=bin64_t::NONE)
68             while (send!=hint) {
69                 hint = hint.towards(send);
70                 hint_in_.push_front(hint.sibling());
71             }
72     }
73     return send;
74 }
75
76
77 /*void  Channel::CleanStaleHints () {
78         while ( !hint_out.empty() && file().ack_out().get(hint_out.front().bin)==bins::FILLED ) 
79                 hint_out.pop_front();  // FIXME must normally clear fulfilled entries
80         tint timed_out = NOW - cc_->RoundTripTime()*8;
81         while ( !hint_out.empty() && hint_out.front().time < timed_out ) {
82         file().picker()->Snubbed(hint_out.front().bin);
83                 hint_out.pop_front();
84         }
85 }*/
86
87
88 void    Channel::AddHandshake (Datagram& dgram) {
89         if (!peer_channel_id_) { // initiating
90                 dgram.Push8(P2TP_HASH);
91                 dgram.Push32(bin64_t::ALL32);
92                 dgram.PushHash(file().root_hash());
93         dprintf("%s #%i +hash ALL %s\n",
94                 tintstr(),id,file().root_hash().hex().c_str());
95         }
96         dgram.Push8(P2TP_HANDSHAKE);
97         dgram.Push32(EncodeID(id));
98     dprintf("%s #%i +hs\n",tintstr(),id);
99     ack_out_.clear();
100     AddAck(dgram);
101 }
102
103
104 void    Channel::ClearStaleDataOut() {
105     int oldsize = data_out_.size();
106     while ( data_out_.size() && data_out_.front().time < 
107            NOW - rtt_avg_ - dev_avg_*4 )
108         data_out_.pop_front();
109     if (data_out_.size()!=oldsize) {
110         cc_->OnAckRcvd(bin64_t::NONE);
111         data_out_cap_ = bin64_t::ALL;
112     }
113     while (data_out_.size() && ack_in_.get(data_out_.front().bin)==bins::FILLED)
114         data_out_.pop_front();
115 }
116
117
118 void    Channel::Send () {
119     Datagram dgram(socket_,peer());
120     dgram.Push32(peer_channel_id_);
121     bin64_t data = bin64_t::NONE;
122     if ( is_established() ) {
123         // FIXME: seeder check
124         AddAck(dgram);
125         if (!file().is_complete())
126             AddHint(dgram);
127         AddPex(dgram);
128         ClearStaleDataOut();
129         if (cc_->MaySendData()) 
130             data = AddData(dgram);
131         else
132             dprintf("%s #%i no cwnd\n",tintstr(),id);
133     } else {
134         AddHandshake(dgram);
135         AddAck(dgram);
136     }
137     dprintf("%s #%i sent %ib %s\n",tintstr(),id,dgram.size(),peer().str().c_str());
138     if (dgram.size()==4) // only the channel id; bare keep-alive
139         data = bin64_t::ALL;
140     cc_->OnDataSent(data);
141     if (dgram.Send()==-1)
142         print_error("can't send datagram");
143 }
144
145
146 void    Channel::AddHint (Datagram& dgram) {
147
148     while (!hint_out_.empty()) {
149         tintbin f = hint_out_.front();
150         if (f.time<NOW-rtt_avg_*8) {
151             hint_out_.pop_front();
152             dprintf("%s #%i !hint (%i,%lli)\n",
153                     tintstr(),id,(int)f.bin.layer(),f.bin.offset());
154             transfer().picker().Expired(f.bin);
155         } else {
156             int status = file().ack_out().get(f.bin);
157             if (status==bins::EMPTY) {
158                 break;
159             } else if (status==bins::FILLED) {
160                 hint_out_.pop_front();
161                 transfer().picker().Expired(f.bin);
162             } else { // mixed
163                 hint_out_.front().bin = f.bin.right();
164                 f.bin = f.bin.left();
165                 hint_out_.push_front(f);
166             } // FIXME: simplify this mess
167         }
168     }
169     /*while (!hint_out_.empty() &&
170             (hint_out_.front().time<NOW-TINT_SEC ||
171             file().ack_out().get(hint_out_.front().bin)==bins::FILLED ) ) {
172         file().picker().Expired(hint_out_.front().bin);
173         hint_out_.pop_front();
174     }*/
175     uint64_t hinted = 0;
176     for(tbqueue::iterator i=hint_out_.begin(); i!=hint_out_.end(); i++)
177         hinted += i->bin.width();
178     //int bps = PeerBPS();
179     //double kbps = max(4,TINT_SEC / dip_avg_);
180     double peer_cwnd = rtt_avg_ / dip_avg_;
181     if (peer_cwnd<1)
182         peer_cwnd = 1;
183     dprintf("%s #%i hinted %lli peer_cwnd %lli/%lli=%f\n",
184             tintstr(),id,hinted,rtt_avg_,dip_avg_,((float)rtt_avg_/dip_avg_));
185
186     if ( 4*peer_cwnd > hinted ) { //hinted*1024 < peer_cwnd*4 ) {
187         
188         uint8_t layer = 2; // actually, enough
189         bin64_t hint = transfer().picker().Pick(ack_in_,layer);
190         // FIXME FIXME FIXME: any layer
191         if (hint==bin64_t::NONE)
192             hint = transfer().picker().Pick(ack_in_,0);
193         
194         if (hint!=bin64_t::NONE) {
195             hint_out_.push_back(hint);
196             dgram.Push8(P2TP_HINT);
197             dgram.Push32(hint);
198             dprintf("%s #%i +hint (%i,%lli)\n",tintstr(),id,hint.layer(),hint.offset());
199         }
200         
201     }
202 }
203
204
205 bin64_t         Channel::AddData (Datagram& dgram) {
206         if (!file().size()) // know nothing
207                 return bin64_t::NONE;
208         bin64_t tosend = DequeueHint();
209     if (tosend==bin64_t::NONE) {
210         dprintf("%s #%i out of hints #sendctrl\n",tintstr(),id);
211         return bin64_t::NONE;
212     }
213     if (ack_in_.is_empty() && file().size())
214         AddPeakHashes(dgram);
215     AddUncleHashes(dgram,tosend);
216     uint8_t buf[1024];
217     size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); 
218     // TODO: ??? corrupted data, retries
219     if (r<0) {
220         print_error("error on reading");
221         return bin64_t::NONE;
222     }
223     assert(dgram.space()>=r+4+1);
224     dgram.Push8(P2TP_DATA);
225     dgram.Push32(tosend);
226     dgram.Push(buf,r);
227     dprintf("%s #%i +data (%lli)\n",tintstr(),id,tosend.base_offset());
228     data_out_.push_back(tosend);
229     data_out_cap_ = tosend;
230     // FIXME BUG this makes data_out_ all stale  ack_in_.set(tosend);
231         return tosend;
232 }
233
234
235 void    Channel::AddTs (Datagram& dgram) {
236     dgram.Push8(P2TP_TS);
237     dgram.Push64(data_in_.time);
238     dprintf("%s #%i +ts %lli\n",tintstr(),id,data_in_.time);
239 }
240
241
242 void    Channel::AddAck (Datagram& dgram) {
243         if (data_in_.bin!=bin64_t::NONE) {
244         AddTs(dgram);
245         bin64_t pos = data_in_.bin;
246                 dgram.Push8(P2TP_ACK);
247                 dgram.Push32(pos);
248                 //dgram.Push64(data_in_.time);
249         ack_out_.set(pos);
250         dprintf("%s #%i +ack (%i,%lli) %s\n",tintstr(),id,
251                 pos.layer(),pos.offset(),tintstr(data_in_.time));
252         data_in_ = tintbin(0,bin64_t::NONE);
253         }
254     for(int count=0; count<4; count++) {
255         bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, 0, bins::FILLED);
256         // TODO bins::ANY_LAYER
257         if (ack==bin64_t::NONE)
258             break;
259         ack = file().ack_out().cover(ack);
260         ack_out_.set(ack);
261         dgram.Push8(P2TP_ACK);
262         dgram.Push32(ack);
263         dprintf("%s #%i +ack (%i,%lli)\n",tintstr(),id,ack.layer(),ack.offset());
264     }
265 }
266
267
268 void    Channel::Recv (Datagram& dgram) {
269     if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
270         rtt_avg_ = NOW - last_send_time_;
271         dev_avg_ = rtt_avg_;
272         dip_avg_ = rtt_avg_;
273         transfer().hs_in_.push_back(id);
274         dprintf("%s #%i rtt init %lli\n",tintstr(),id,rtt_avg_);
275     }
276     bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
277         while (dgram.size()) {
278                 uint8_t type = dgram.Pull8();
279                 switch (type) {
280             case P2TP_HANDSHAKE: OnHandshake(dgram); break;
281                         case P2TP_DATA:         data=OnData(dgram); break;
282                         case P2TP_TS:       OnTs(dgram); break;
283                         case P2TP_ACK:          OnAck(dgram); break;
284                         case P2TP_HASH:         OnHash(dgram); break;
285                         case P2TP_HINT:         OnHint(dgram); break;
286             case P2TP_PEX_ADD:  OnPex(dgram); break;
287                         default:
288                                 //LOG(ERROR) << this->id_string() << " malformed datagram";
289                                 return;
290                 }
291         }
292     cc_->OnDataRecvd(data);
293     last_recv_time_ = NOW;
294     if (data!=bin64_t::ALL && next_send_time_>NOW+TINT_MSEC)
295         Send(); //RequeueSend(NOW);
296 }
297
298
299 void    Channel::OnHash (Datagram& dgram) {
300         bin64_t pos = dgram.Pull32();
301         Sha1Hash hash = dgram.PullHash();
302         file().OfferHash(pos,hash);
303     //DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
304     dprintf("%s #%i -hash (%i,%lli)\n",tintstr(),id,pos.layer(),pos.offset());
305 }
306
307
308 bin64_t Channel::OnData (Datagram& dgram) {
309         bin64_t pos = dgram.Pull32();
310     uint8_t *data;
311     int length = dgram.Pull(&data,1024);
312     bool ok = file().OfferData(pos, (char*)data, length) ;
313     dprintf("%s #%i %cdata (%lli)\n",tintstr(),id,ok?'-':'!',pos.offset());
314     if (ok) {
315         data_in_ = tintbin(NOW,pos);
316         if (last_recv_time_) {
317             tint dip = NOW - last_recv_time_; // FIXME: was it an ACK?
318             dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
319         }
320         transfer().picker().Received(pos); // so dirty; FIXME FIXME FIXME
321         return pos;
322     } else
323         return bin64_t::NONE;
324 }
325
326
327 void    Channel::OnAck (Datagram& dgram) {
328         bin64_t ackd_pos = dgram.Pull32();
329     if (file().size() && ackd_pos.base_offset()>=file().packet_size()) {
330         eprintf("invalid ack: (%i,%lli)\n",ackd_pos.layer(),ackd_pos.offset());
331         return;
332     }
333     dprintf("%s #%i -ack (%i,%lli)\n",tintstr(),id,ackd_pos.layer(),ackd_pos.offset());
334     for (int i=0; i<8 && i<data_out_.size(); i++) 
335         if (data_out_[i].bin.within(ackd_pos)) {
336             tint rtt = NOW-data_out_[i].time;
337             rtt_avg_ = (rtt_avg_*3 + rtt) >> 2;
338             dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
339             dprintf("%s #%i rtt %lli dev %lli\n",
340                     tintstr(),id,rtt_avg_,dev_avg_);
341             cc_->OnAckRcvd(data_out_[i].bin); // may be invoked twice FIXME FIXME FIXME 
342         }
343         ack_in_.set(ackd_pos);
344     while (data_out_.size() && ack_in_.get(data_out_.front().bin)==bins::FILLED)
345         data_out_.pop_front();
346 }
347
348
349 /*void  Channel::OnAckTs (Datagram& dgram) {  // FIXME:   OnTs
350         bin64_t pos = dgram.Pull32();
351     tint ts = dgram.Pull64();
352     // TODO sanity check
353     dprintf("%s #%i -ackts (%i,%lli) %s\n",
354             tintstr(),id,pos.layer(),pos.offset(),tintstr(ts));
355         ack_in_.set(pos);
356         cc_->OnAckRcvd(pos,ts);
357 }*/
358
359 void Channel::OnTs (Datagram& dgram) {
360     peer_send_time_ = dgram.Pull64();
361     dprintf("%s #%i -ts %lli\n",tintstr(),id,peer_send_time_);
362 }
363
364
365 void    Channel::OnHint (Datagram& dgram) {
366         bin64_t hint = dgram.Pull32();
367         hint_in_.push_back(hint);
368     ack_in_.set(hint,bins::EMPTY);
369     //RequeueSend(cc_->OnHintRecvd(hint));
370     dprintf("%s #%i -hint (%i,%lli)\n",tintstr(),id,hint.layer(),hint.offset());
371 }
372
373
374 void Channel::OnHandshake (Datagram& dgram) {
375     peer_channel_id_ = dgram.Pull32();
376     dprintf("%s #%i -hs %i\n",tintstr(),id,peer_channel_id_);
377     // FUTURE: channel forking
378 }
379
380
381 void Channel::OnPex (Datagram& dgram) {
382     uint32_t ipv4 = dgram.Pull32();
383     uint16_t port = dgram.Pull16();
384     Address addr(ipv4,port);
385     dprintf("%s #%i -pex %s\n",tintstr(),id,addr.str().c_str());
386     transfer().OnPexIn(addr);
387 }
388
389
390 void    Channel::AddPex (Datagram& dgram) {
391     int chid = transfer().RevealChannel(pex_out_);
392     if (chid==-1 || chid==id)
393         return;
394     Address a = channels[chid]->peer();
395     dgram.Push8(P2TP_PEX_ADD);
396     dgram.Push32(a.ipv4());
397     dgram.Push16(a.port());
398     dprintf("%s #%i +pex %s\n",tintstr(),id,a.str().c_str());
399 }
400
401
402 void    Channel::Recv (int socket) {
403         Datagram data(socket);
404         data.Recv();
405         if (data.size()<4) 
406                 RETLOG("datagram shorter than 4 bytes");
407         uint32_t mych = data.Pull32();
408         Sha1Hash hash;
409         Channel* channel = NULL;
410         if (!mych) { // handshake initiated
411                 if (data.size()<1+4+1+4+Sha1Hash::SIZE) 
412                         RETLOG ("incorrect size initial handshake packet");
413                 uint8_t hashid = data.Pull8();
414                 if (hashid!=P2TP_HASH) 
415                         RETLOG ("no hash in the initial handshake");
416                 bin64_t pos = data.Pull32();
417                 if (pos!=bin64_t::ALL32) 
418                         RETLOG ("that is not the root hash");
419                 hash = data.PullHash();
420                 FileTransfer* file = FileTransfer::Find(hash);
421                 if (!file) 
422                         RETLOG ("hash unknown, no such file");
423         dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
424         for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
425             if (channels[*i] && channels[*i]->peer_==data.addr) 
426                 RETLOG("have a channel already");
427                 channel = new Channel(file, socket, data.address());
428         } else {
429                 mych = DecodeID(mych);
430                 if (mych>=channels.size()) {
431             eprintf("invalid channel #%i\n",mych);
432             return;
433         }
434                 channel = channels[mych];
435                 if (!channel) 
436                         RETLOG ("channel is closed");
437                 if (channel->peer() != data.address()) 
438                         RETLOG ("invalid peer address");
439         channel->own_id_mentioned_ = true;
440         }
441     dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
442     channel->Recv(data);
443 }
444
445
446 bool tblater (const tintbin& a, const tintbin& b) {
447     return a.time > b.time;
448 }
449
450
451 void    Channel::RequeueSend (tint next_time) {
452     if (next_time==next_send_time_)
453         return;
454     next_send_time_ = next_time;
455     send_queue.push_back
456         (tintbin(next_time==TINT_NEVER?NOW+TINT_MIN:next_time,id));
457     push_heap(send_queue.begin(),send_queue.end(),tblater);
458     dprintf("%s requeue #%i for %s\n",tintstr(),id,tintstr(next_time));
459 }
460
461
462 void    Channel::Loop (tint howlong) {  
463         
464     tint limit = Datagram::Time() + howlong;
465     
466     do {
467
468         tint send_time(TINT_NEVER);
469         Channel* sender(NULL);
470         while (!send_queue.empty()) {
471             send_time = send_queue.front().time;
472             sender = channel((int)send_queue.front().bin);
473             if (sender)
474                 if ( sender->next_send_time_==send_time ||
475                      sender->next_send_time_==TINT_NEVER )
476                 break;
477             sender = NULL; // it was a stale entry
478             pop_heap(send_queue.begin(), send_queue.end(), tblater);
479             send_queue.pop_back();
480         }
481         if (send_time>limit)
482             send_time = limit;
483         if ( sender && sender->next_send_time_ <= NOW ) {
484             dprintf("%s #%i sch_send %s\n",tintstr(),sender->id,
485                     tintstr(send_time));
486             sender->Send();
487             sender->last_send_time_ = NOW;
488             sender->RequeueSend(sender->cc_->NextSendTime());
489             pop_heap(send_queue.begin(), send_queue.end(), tblater);
490             send_queue.pop_back();
491         } else if ( send_time > NOW ) {
492             tint towait = send_time - NOW;
493             dprintf("%s waiting %lliusec\n",tintstr(),towait);
494             int rd = Datagram::Wait(socket_count,sockets,towait);
495             if (rd!=INVALID_SOCKET)
496                 Recv(rd);
497         } else { //if (sender->next_send_time_==TINT_NEVER) { 
498             dprintf("%s #%i closed sendctrl\n",tintstr(),sender->id);
499             delete sender;
500             pop_heap(send_queue.begin(), send_queue.end(), tblater);
501             send_queue.pop_back();
502         }
503         
504     } while (Datagram::Time()<limit);
505         
506 }
507