beautification
[swift-upb.git] / sendrecv.cpp
1 /*
2  *  datasendrecv.cpp
3  *  serp++
4  *
5  *  Created by Victor Grishchenko on 3/6/09.
6  *  Copyright 2009 Delft University of Technology. All rights reserved.
7  *
8  */
9 #include <algorithm>
10 //#include <glog/logging.h>
11 #include "p2tp.h"
12 #include "compat/util.h"
13
14
15 using namespace p2tp;
16 using namespace std; // FIXME remove
17
18 /*
19  TODO  25 Oct 18:55
20  - move hint_out_, piece picking to piece picker (needed e.g. for the case of channel drop)
21  - ANY_LAYER
22  - range: ALL
23  - randomized testing of advanced ops (new testcase)
24  - PeerCwnd()
25  - bins hint_out_, tbqueue hint_out_ts_
26  
27  */
28
29 void    Channel::AddPeakHashes (Datagram& dgram) {
30         for(int i=0; i<file().peak_count(); i++) {
31         bin64_t peak = file().peak(i);
32                 dgram.Push8(P2TP_HASH);
33                 dgram.Push32((uint32_t)peak);
34                 dgram.PushHash(file().peak_hash(i));
35         //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
36         dprintf("%s #%i +phash (%i,%lli)\n",tintstr(),id,peak.layer(),peak.offset());
37         }
38 }
39
40
41 void    Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
42     bin64_t peak = file().peak_for(pos);
43     while (pos!=peak && ack_in_.get(pos.parent())==bins::EMPTY) {
44         bin64_t uncle = pos.sibling();
45                 dgram.Push8(P2TP_HASH);
46                 dgram.Push32((uint32_t)uncle);
47                 dgram.PushHash( file().hash(uncle) );
48         //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
49         dprintf("%s #%i +hash (%i,%lli)\n",tintstr(),id,uncle.layer(),uncle.offset());
50         pos = pos.parent();
51     }
52 }
53
54
55 bin64_t         Channel::DequeueHint () { // TODO: resilience
56     bin64_t send = bin64_t::NONE;
57     while (!hint_in_.empty() && send==bin64_t::NONE) {
58         bin64_t hint = hint_in_.front();
59         hint_in_.pop_front();
60         send = file().ack_out().find_filtered
61             (ack_in_,hint,0,bins::FILLED);
62         dprintf("%s #%i may_send %lli\n",tintstr(),id,send.base_offset());
63         if (send!=bin64_t::NONE)
64             while (send!=hint) {
65                 hint = hint.towards(send);
66                 hint_in_.push_front(hint.sibling());
67             }
68     }
69     return send;
70 }
71
72
73 /*void  Channel::CleanStaleHints () {
74         while ( !hint_out.empty() && file().ack_out().get(hint_out.front().bin)==bins::FILLED ) 
75                 hint_out.pop_front();  // FIXME must normally clear fulfilled entries
76         tint timed_out = NOW - cc_->RoundTripTime()*8;
77         while ( !hint_out.empty() && hint_out.front().time < timed_out ) {
78         file().picker()->Snubbed(hint_out.front().bin);
79                 hint_out.pop_front();
80         }
81 }*/
82
83
84 void    Channel::AddHandshake (Datagram& dgram) {
85         if (!peer_channel_id_) { // initiating
86                 dgram.Push8(P2TP_HASH);
87                 dgram.Push32(bin64_t::ALL32);
88                 dgram.PushHash(file().root_hash());
89         dprintf("%s #%i +hash ALL %s\n",
90                 tintstr(),id,file().root_hash().hex().c_str());
91         }
92         dgram.Push8(P2TP_HANDSHAKE);
93         dgram.Push32(EncodeID(id));
94     dprintf("%s #%i +hs\n",tintstr(),id);
95     ack_out_.clear();
96     AddAck(dgram);
97 }
98
99
100 void    Channel::ClearStaleDataOut() {
101     int oldsize = data_out_.size();
102     while ( data_out_.size() && data_out_.front().time < 
103            NOW - rtt_avg_ - dev_avg_*4 )
104         data_out_.pop_front();
105     if (data_out_.size()!=oldsize)
106         cc_->OnAckRcvd(bin64_t::NONE);
107     while (data_out_.size() && ack_in_.get(data_out_.front().bin)==bins::FILLED)
108         data_out_.pop_front();
109 }
110
111
112 void    Channel::Send () {
113     Datagram dgram(socket_,peer());
114     dgram.Push32(peer_channel_id_);
115     bin64_t data = bin64_t::NONE;
116     if ( is_established() ) {
117         // FIXME: seeder check
118         AddAck(dgram);
119         if (!file().is_complete())
120             AddHint(dgram);
121         AddPex(dgram);
122         ClearStaleDataOut();
123         if (cc_->MaySendData()) 
124             data = AddData(dgram);
125         else
126             dprintf("%s #%i no cwnd\n",tintstr(),id);
127     } else {
128         AddHandshake(dgram);
129         AddAck(dgram);
130     }
131     dprintf("%s #%i sent %ib %s\n",tintstr(),id,dgram.size(),peer().str().c_str());
132     if (dgram.size()==4) // only the channel id; bare keep-alive
133         data = bin64_t::ALL;
134     cc_->OnDataSent(data);
135         if (dgram.Send()==-1)
136         print_error("can't send datagram");
137     last_send_time_ = NOW;
138     RequeueSend(cc_->NextSendTime());
139 }
140
141
142 void    Channel::AddHint (Datagram& dgram) {
143
144     while (!hint_out_.empty()) {
145         tintbin f = hint_out_.front();
146         if (f.time<NOW-rtt_avg_*8) {
147             hint_out_.pop_front();
148             transfer().picker().Expired(f.bin);
149         } else {
150             int status = file().ack_out().get(f.bin);
151             if (status==bins::EMPTY) {
152                 transfer().picker().Expired(f.bin);
153                 break;
154             } else if (status==bins::FILLED) {
155                 hint_out_.pop_front();
156                 transfer().picker().Expired(f.bin);
157             } else { // mixed
158                 hint_out_.front().bin = f.bin.right();
159                 f.bin = f.bin.left();
160                 hint_out_.push_front(f);
161             }
162         }
163     }
164     /*while (!hint_out_.empty() &&
165             (hint_out_.front().time<NOW-TINT_SEC ||
166             file().ack_out().get(hint_out_.front().bin)==bins::FILLED ) ) {
167         file().picker().Expired(hint_out_.front().bin);
168         hint_out_.pop_front();
169     }*/
170     uint64_t hinted = 0;
171     for(tbqueue::iterator i=hint_out_.begin(); i!=hint_out_.end(); i++)
172         hinted += i->bin.width();
173     //int bps = PeerBPS();
174     //double kbps = max(4,TINT_SEC / dip_avg_);
175     double peer_cwnd = rtt_avg_ / dip_avg_;
176     if (peer_cwnd<1)
177         peer_cwnd = 1;
178     dprintf("%s #%i hinted %lli peer_cwnd %lli/%lli=%f\n",
179             tintstr(),id,hinted,rtt_avg_,dip_avg_,((float)rtt_avg_/dip_avg_));
180
181     if ( 4*peer_cwnd > hinted ) { //hinted*1024 < peer_cwnd*4 ) {
182         
183         uint8_t layer = 2; // actually, enough
184         bin64_t hint = transfer().picker().Pick(ack_in_,layer);
185         // FIXME FIXME FIXME: any layer
186         if (hint==bin64_t::NONE)
187             hint = transfer().picker().Pick(ack_in_,0);
188         
189         if (hint!=bin64_t::NONE) {
190             hint_out_.push_back(hint);
191             dgram.Push8(P2TP_HINT);
192             dgram.Push32(hint);
193             dprintf("%s #%i +hint (%i,%lli)\n",tintstr(),id,hint.layer(),hint.offset());
194         }
195         
196     }
197 }
198
199
200 bin64_t         Channel::AddData (Datagram& dgram) {
201         if (!file().size()) // know nothing
202                 return bin64_t::NONE;
203         bin64_t tosend = DequeueHint();
204     if (tosend==bin64_t::NONE) {
205         dprintf("%s #%i out of hints\n",tintstr(),id);
206         return bin64_t::NONE;
207     }
208     if (ack_in_.is_empty() && file().size())
209         AddPeakHashes(dgram);
210     AddUncleHashes(dgram,tosend);
211     uint8_t buf[1024];
212     size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); 
213     // TODO: ??? corrupted data, retries
214     if (r<0) {
215         print_error("error on reading");
216         return bin64_t::NONE;
217     }
218     assert(dgram.space()>=r+4+1);
219     dgram.Push8(P2TP_DATA);
220     dgram.Push32(tosend);
221     dgram.Push(buf,r);
222     dprintf("%s #%i +data (%lli)\n",tintstr(),id,tosend.base_offset());
223     data_out_.push_back(tosend);
224     ack_in_.set(tosend);
225         return tosend;
226 }
227
228
229 void    Channel::AddTs (Datagram& dgram) {
230     dgram.Push8(P2TP_TS);
231     dgram.Push64(data_in_.time);
232     dprintf("%s #%i +ts %lli\n",tintstr(),id,data_in_.time);
233 }
234
235
236 void    Channel::AddAck (Datagram& dgram) {
237         if (data_in_.bin!=bin64_t::NONE) {
238         AddTs(dgram);
239         bin64_t pos = data_in_.bin;
240                 dgram.Push8(P2TP_ACK);
241                 dgram.Push32(pos);
242                 //dgram.Push64(data_in_.time);
243         ack_out_.set(pos);
244         dprintf("%s #%i +ack (%i,%lli) %s\n",tintstr(),id,
245                 pos.layer(),pos.offset(),tintstr(data_in_.time));
246         data_in_ = tintbin(0,bin64_t::NONE);
247         }
248     for(int count=0; count<4; count++) {
249         bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, 0, bins::FILLED);
250         // TODO bins::ANY_LAYER
251         if (ack==bin64_t::NONE)
252             break;
253         ack = file().ack_out().cover(ack);
254         ack_out_.set(ack);
255         dgram.Push8(P2TP_ACK);
256         dgram.Push32(ack);
257         dprintf("%s #%i +ack (%i,%lli)\n",tintstr(),id,ack.layer(),ack.offset());
258     }
259 }
260
261
262 void    Channel::Recv (Datagram& dgram) {
263     if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
264         rtt_avg_ = NOW - last_send_time_;
265         dev_avg_ = rtt_avg_;
266         dip_avg_ = rtt_avg_;
267         transfer().hs_in_.push_back(id);
268         dprintf("%s #%i rtt init %lli\n",tintstr(),id,rtt_avg_);
269     }
270     bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
271         while (dgram.size()) {
272                 uint8_t type = dgram.Pull8();
273                 switch (type) {
274             case P2TP_HANDSHAKE: OnHandshake(dgram); break;
275                         case P2TP_DATA:         data=OnData(dgram); break;
276                         case P2TP_TS:       OnTs(dgram); break;
277                         case P2TP_ACK:          OnAck(dgram); break;
278                         case P2TP_HASH:         OnHash(dgram); break;
279                         case P2TP_HINT:         OnHint(dgram); break;
280             case P2TP_PEX_ADD:  OnPex(dgram); break;
281                         default:
282                                 //LOG(ERROR) << this->id_string() << " malformed datagram";
283                                 return;
284                 }
285         }
286     cc_->OnDataRecvd(data);
287     last_recv_time_ = NOW;
288     if (data!=bin64_t::ALL)
289         RequeueSend(NOW);
290 }
291
292
293 void    Channel::OnHash (Datagram& dgram) {
294         bin64_t pos = dgram.Pull32();
295         Sha1Hash hash = dgram.PullHash();
296         file().OfferHash(pos,hash);
297     //DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
298     dprintf("%s #%i -hash (%i,%lli)\n",tintstr(),id,pos.layer(),pos.offset());
299 }
300
301
302 bin64_t Channel::OnData (Datagram& dgram) {
303         bin64_t pos = dgram.Pull32();
304     uint8_t *data;
305     int length = dgram.Pull(&data,1024);
306     bool ok = file().OfferData(pos, (char*)data, length) ;
307     dprintf("%s #%i %cdata (%lli)\n",tintstr(),id,ok?'-':'!',pos.offset());
308     if (ok) {
309         data_in_ = tintbin(NOW,pos);
310         if (last_recv_time_) {
311             tint dip = NOW - last_recv_time_;
312             dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
313         }
314         transfer().picker().Received(pos); // so dirty; FIXME FIXME FIXME
315         return pos;
316     } else
317         return bin64_t::NONE;
318 }
319
320
321 void    Channel::OnAck (Datagram& dgram) {
322         bin64_t ackd_pos = dgram.Pull32();
323     if (file().size() && ackd_pos.base_offset()>=file().packet_size()) {
324         eprintf("invalid ack: (%i,%lli)\n",ackd_pos.layer(),ackd_pos.offset());
325         return;
326     }
327     dprintf("%s #%i -ack (%i,%lli)\n",tintstr(),id,ackd_pos.layer(),ackd_pos.offset());
328     for (int i=0; i<8 && i<data_out_.size(); i++) 
329         if (data_out_[i].bin.within(ackd_pos)) {
330             tint rtt = NOW-data_out_[i].time;
331             rtt_avg_ = (rtt_avg_*3 + rtt) >> 2;
332             dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
333             dprintf("%s #%i rtt %lli dev %lli\n",
334                     tintstr(),id,rtt_avg_,dev_avg_);
335             cc_->OnAckRcvd(data_out_[i].bin);
336         }
337         ack_in_.set(ackd_pos);
338     while (data_out_.size() && ack_in_.get(data_out_.front().bin)==bins::FILLED)
339         data_out_.pop_front();
340 }
341
342
343 /*void  Channel::OnAckTs (Datagram& dgram) {  // FIXME:   OnTs
344         bin64_t pos = dgram.Pull32();
345     tint ts = dgram.Pull64();
346     // TODO sanity check
347     dprintf("%s #%i -ackts (%i,%lli) %s\n",
348             tintstr(),id,pos.layer(),pos.offset(),tintstr(ts));
349         ack_in_.set(pos);
350         cc_->OnAckRcvd(pos,ts);
351 }*/
352
353 void Channel::OnTs (Datagram& dgram) {
354     peer_send_time_ = dgram.Pull64();
355     dprintf("%s #%i -ts %lli\n",tintstr(),id,peer_send_time_);
356 }
357
358
359 void    Channel::OnHint (Datagram& dgram) {
360         bin64_t hint = dgram.Pull32();
361         hint_in_.push_back(hint);
362     ack_in_.set(hint,bins::EMPTY);
363     //RequeueSend(cc_->OnHintRecvd(hint));
364     dprintf("%s #%i -hint (%i,%lli)\n",tintstr(),id,hint.layer(),hint.offset());
365 }
366
367
368 void Channel::OnHandshake (Datagram& dgram) {
369     peer_channel_id_ = dgram.Pull32();
370     dprintf("%s #%i -hs %i\n",tintstr(),id,peer_channel_id_);
371     // FUTURE: channel forking
372 }
373
374
375 void Channel::OnPex (Datagram& dgram) {
376     uint32_t ipv4 = dgram.Pull32();
377     uint16_t port = dgram.Pull16();
378     Address addr(ipv4,port);
379     dprintf("%s #%i -pex %s\n",tintstr(),id,addr.str().c_str());
380     transfer().OnPexIn(addr);
381 }
382
383
384 void    Channel::AddPex (Datagram& dgram) {
385     int chid = transfer().RevealChannel(pex_out_);
386     if (chid==-1 || chid==id)
387         return;
388     Address a = channels[chid]->peer();
389     dgram.Push8(P2TP_PEX_ADD);
390     dgram.Push32(a.ipv4());
391     dgram.Push16(a.port());
392     dprintf("%s #%i +pex %s\n",tintstr(),id,a.str().c_str());
393 }
394
395
396 void    Channel::Recv (int socket) {
397         Datagram data(socket);
398         data.Recv();
399         if (data.size()<4) 
400                 RETLOG("datagram shorter than 4 bytes");
401         uint32_t mych = data.Pull32();
402         Sha1Hash hash;
403         Channel* channel = NULL;
404         if (!mych) { // handshake initiated
405                 if (data.size()<1+4+1+4+Sha1Hash::SIZE) 
406                         RETLOG ("incorrect size initial handshake packet");
407                 uint8_t hashid = data.Pull8();
408                 if (hashid!=P2TP_HASH) 
409                         RETLOG ("no hash in the initial handshake");
410                 bin64_t pos = data.Pull32();
411                 if (pos!=bin64_t::ALL32) 
412                         RETLOG ("that is not the root hash");
413                 hash = data.PullHash();
414                 FileTransfer* file = FileTransfer::Find(hash);
415                 if (!file) 
416                         RETLOG ("hash unknown, no such file");
417         dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
418         for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
419             if (channels[*i] && channels[*i]->peer_==data.addr) 
420                 RETLOG("have a channel already");
421                 channel = new Channel(file, socket, data.address());
422         } else {
423                 mych = DecodeID(mych);
424                 if (mych>=channels.size()) {
425             eprintf("invalid channel #%i\n",mych);
426             return;
427         }
428                 channel = channels[mych];
429                 if (!channel) 
430                         RETLOG ("channel is closed");
431                 if (channel->peer() != data.address()) 
432                         RETLOG ("invalid peer address");
433         channel->own_id_mentioned_ = true;
434         }
435     dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
436     channel->Recv(data);
437 }
438
439
440 bool tblater (const tintbin& a, const tintbin& b) {
441     return a.time > b.time;
442 }
443
444
445 void    Channel::RequeueSend (tint next_time) {
446     if (next_time==next_send_time_)
447         return;
448     next_send_time_ = next_time;
449     send_queue.push_back(tintbin(next_time,id));
450     push_heap(send_queue.begin(),send_queue.end(),tblater);
451     dprintf("%s requeue #%i for %s\n",tintstr(),id,tintstr(next_time));
452 }
453
454
455 void    Channel::Loop (tint howlong) {  
456         
457     tint limit = Datagram::Time() + howlong;
458     
459     do {
460
461         tint send_time(TINT_NEVER);
462         Channel* sender(NULL);
463         while (!send_queue.empty()) {
464             send_time = send_queue.front().time;
465             sender = channel((int)send_queue.front().bin);
466             if (sender && sender->next_send_time_==send_time)
467                 break;
468             sender = NULL; // it was a stale entry
469             pop_heap(send_queue.begin(), send_queue.end(), tblater);
470             send_queue.pop_back();
471         }
472         if (send_time>limit)
473             send_time = limit;
474         if ( sender && send_time <= NOW ) {
475             dprintf("%s #%i sch_send %s\n",tintstr(),sender->id,
476                     tintstr(send_time));
477             sender->Send();
478             pop_heap(send_queue.begin(), send_queue.end(), tblater);
479             send_queue.pop_back();
480         } else {
481             tint towait = send_time - NOW;
482             dprintf("%s waiting %lliusec\n",tintstr(),towait);
483             int rd = Datagram::Wait(socket_count,sockets,towait);
484             if (rd!=INVALID_SOCKET)
485                 Recv(rd);
486         }
487         
488     } while (Datagram::Time()<limit);
489         
490 }
491