add .gitignore
[swift-upb.git] / sendrecv.cpp
1 /*
2  *  sendrecv.cpp
3  *  most of the swift's state machine
4  *
5  *  Created by Victor Grishchenko on 3/6/09.
6  *  Copyright 2009 Delft University of Technology. All rights reserved.
7  *
8  */
9 #include "swift.h"
10 #include <algorithm>  // kill it
11
12 using namespace swift;
13 using namespace std;
14
15 /*
16  TODO  25 Oct 18:55
17  - range: ALL
18  - randomized testing of advanced ops (new testcase)
19  */
20
21 void    Channel::AddPeakHashes (Datagram& dgram) {
22     for(int i=0; i<file().peak_count(); i++) {
23         bin64_t peak = file().peak(i);
24         dgram.Push8(SWIFT_HASH);
25         dgram.Push32((uint32_t)peak);
26         dgram.PushHash(file().peak_hash(i));
27         dprintf("%s #%u +phash %s\n",tintstr(),id_,peak.str());
28     }
29 }
30
31
32 void    Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
33     bin64_t peak = file().peak_for(pos);
34     while (pos!=peak && ((NOW&3)==3 || !data_out_cap_.within(pos.parent())) &&
35             ack_in_.get(pos.parent())==binmap_t::EMPTY  ) {
36         bin64_t uncle = pos.sibling();
37         dgram.Push8(SWIFT_HASH);
38         dgram.Push32((uint32_t)uncle);
39         dgram.PushHash( file().hash(uncle) );
40         dprintf("%s #%u +hash %s\n",tintstr(),id_,uncle.str());
41         pos = pos.parent();
42     }
43 }
44
45
46 bin64_t           Channel::ImposeHint () {
47     uint64_t twist = peer_channel_id_;  // got no hints, send something randomly
48     twist &= file().peak(0); // FIXME may make it semi-seq here
49     file().ack_out().twist(twist);
50     ack_in_.twist(twist);
51     bin64_t my_pick =
52         file().ack_out().find_filtered(ack_in_,bin64_t::ALL,binmap_t::FILLED);
53     while (my_pick.width()>max(1,(int)cwnd_))
54         my_pick = my_pick.left();
55     file().ack_out().twist(0);
56     ack_in_.twist(0);
57     return my_pick.twisted(twist);
58 }
59
60
61 bin64_t        Channel::DequeueHint () {
62     if (hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {
63         bin64_t my_pick = ImposeHint(); // FIXME move to the loop
64         if (my_pick!=bin64_t::NONE) {
65             hint_in_.push_back(my_pick);
66             dprintf("%s #%u *hint %s\n",tintstr(),id_,my_pick.str());
67         }
68     }
69     bin64_t send = bin64_t::NONE;
70     while (!hint_in_.empty() && send==bin64_t::NONE) {
71         bin64_t hint = hint_in_.front().bin;
72         tint time = hint_in_.front().time;
73         hint_in_.pop_front();
74         while (!hint.is_base()) { // FIXME optimize; possible attack
75             hint_in_.push_front(tintbin(time,hint.right()));
76             hint = hint.left();
77         }
78         //if (time < NOW-TINT_SEC*3/2 )
79         //    continue;  bad idea
80         if (ack_in_.get(hint)!=binmap_t::FILLED)
81             send = hint;
82     }
83     uint64_t mass = 0;
84     for(int i=0; i<hint_in_.size(); i++)
85         mass += hint_in_[i].bin.width();
86     dprintf("%s #%u dequeued %s [%lli]\n",tintstr(),id_,send.str(),mass);
87     return send;
88 }
89
90
91 void    Channel::AddHandshake (Datagram& dgram) {
92     if (!peer_channel_id_) { // initiating
93         dgram.Push8(SWIFT_HASH);
94         dgram.Push32(bin64_t::ALL32);
95         dgram.PushHash(file().root_hash());
96         dprintf("%s #%u +hash ALL %s\n",
97                 tintstr(),id_,file().root_hash().hex().c_str());
98     }
99     dgram.Push8(SWIFT_HANDSHAKE);
100     int encoded = EncodeID(id_);
101     dgram.Push32(encoded);
102     dprintf("%s #%u +hs %x\n",tintstr(),id_,encoded);
103     have_out_.clear();
104     AddHave(dgram);
105 }
106
107
108 void    Channel::Send () {
109     Datagram dgram(socket_,peer());
110     dgram.Push32(peer_channel_id_);
111     bin64_t data = bin64_t::NONE;
112     if ( is_established() ) {
113         // FIXME: seeder check
114         AddHave(dgram);
115         AddAck(dgram);
116         if (!file().is_complete())
117             AddHint(dgram);
118         AddPex(dgram);
119         TimeoutDataOut();
120         data = AddData(dgram);
121     } else {
122         AddHandshake(dgram);
123         AddHave(dgram);
124         AddAck(dgram);
125     }
126     dprintf("%s #%u sent %ib %s:%x\n",
127             tintstr(),id_,dgram.size(),peer().str(),peer_channel_id_);
128     if (dgram.size()==4) {// only the channel id; bare keep-alive
129         data = bin64_t::ALL;
130     }
131     if (dgram.Send()==-1)
132         print_error("can't send datagram");
133     last_send_time_ = NOW;
134     sent_since_recv_++;
135     dgrams_sent_++;
136     Reschedule();
137 }
138
139
140 void    Channel::AddHint (Datagram& dgram) {
141
142     tint plan_for = max(TINT_SEC,rtt_avg_*4);
143
144     tint timed_out = NOW - plan_for*2;
145     while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
146         hint_out_size_ -= hint_out_.front().bin.width();
147         hint_out_.pop_front();
148     }
149
150     int plan_pck = max ( (tint)1, plan_for / dip_avg_ );
151
152     if ( hint_out_size_ < plan_pck ) {
153
154         int diff = plan_pck - hint_out_size_; // TODO: aggregate
155         bin64_t hint = transfer().picker().Pick(ack_in_,diff,NOW+plan_for*2);
156
157         if (hint!=bin64_t::NONE) {
158             dgram.Push8(SWIFT_HINT);
159             dgram.Push32(hint);
160             dprintf("%s #%u +hint %s [%lli]\n",tintstr(),id_,hint.str(),hint_out_size_);
161             hint_out_.push_back(hint);
162             hint_out_size_ += hint.width();
163         } else
164             dprintf("%s #%u Xhint\n",tintstr(),id_);
165
166     }
167 }
168
169
170 bin64_t        Channel::AddData (Datagram& dgram) {
171
172     if (!file().size()) // know nothing
173         return bin64_t::NONE;
174
175     bin64_t tosend = bin64_t::NONE;
176     tint luft = send_interval_>>4; // may wake up a bit earlier
177     if (data_out_.size()<cwnd_ &&
178             last_data_out_time_+send_interval_<=NOW+luft) {
179         tosend = DequeueHint();
180         if (tosend==bin64_t::NONE) {
181             dprintf("%s #%u sendctrl no idea what to send\n",tintstr(),id_);
182             if (send_control_!=KEEP_ALIVE_CONTROL)
183                 SwitchSendControl(KEEP_ALIVE_CONTROL);
184         }
185     } else
186         dprintf("%s #%u sendctrl wait cwnd %f data_out %i next %s\n",
187                 tintstr(),id_,cwnd_,(int)data_out_.size(),tintstr(last_data_out_time_+NOW-send_interval_));
188
189     if (tosend==bin64_t::NONE)// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
190         return bin64_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
191
192     if (ack_in_.is_empty() && file().size())
193         AddPeakHashes(dgram);
194     AddUncleHashes(dgram,tosend);
195     if (!ack_in_.is_empty()) // TODO: cwnd_>1
196         data_out_cap_ = tosend;
197
198     if (dgram.size()>254) {
199         dgram.Send(); // kind of fragmentation
200         dgram.Push32(peer_channel_id_);
201     }
202
203     dgram.Push8(SWIFT_DATA);
204     dgram.Push32(tosend.to32());
205
206     uint8_t buf[1024];
207     size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10);
208     // TODO: corrupted data, retries, caching
209     if (r<0) {
210         print_error("error on reading");
211         return bin64_t::NONE;
212     }
213     assert(dgram.space()>=r+4+1);
214     dgram.Push(buf,r);
215
216     last_data_out_time_ = NOW;
217     data_out_.push_back(tosend);
218     dprintf("%s #%u +data %s\n",tintstr(),id_,tosend.str());
219
220     return tosend;
221 }
222
223
224 void    Channel::AddAck (Datagram& dgram) {
225     if (data_in_==tintbin())
226         return;
227     dgram.Push8(SWIFT_ACK);
228     dgram.Push32(data_in_.bin.to32()); // FIXME not cover
229     dgram.Push64(data_in_.time); // FIXME 32
230     have_out_.set(data_in_.bin);
231     dprintf("%s #%u +ack %s %s\n",
232         tintstr(),id_,data_in_.bin.str(),tintstr(data_in_.time));
233     if (data_in_.bin.layer()>2)
234         data_in_dbl_ = data_in_.bin;
235     data_in_ = tintbin();
236 }
237
238
239 void    Channel::AddHave (Datagram& dgram) {
240     if (data_in_dbl_!=bin64_t::NONE) { // TODO: do redundancy better
241         dgram.Push8(SWIFT_HAVE);
242         dgram.Push32(data_in_dbl_.to32());
243         data_in_dbl_=bin64_t::NONE;
244     }
245     for(int count=0; count<4; count++) {
246         bin64_t ack = file().ack_out().find_filtered // FIXME: do rotating queue
247             (have_out_, bin64_t::ALL, binmap_t::FILLED);
248         if (ack==bin64_t::NONE)
249             break;
250         ack = file().ack_out().cover(ack);
251         have_out_.set(ack);
252         dgram.Push8(SWIFT_HAVE);
253         dgram.Push32(ack.to32());
254         dprintf("%s #%u +have %s\n",tintstr(),id_,ack.str());
255     }
256 }
257
258
259 void    Channel::Recv (Datagram& dgram) {
260     dprintf("%s #%u recvd %ib\n",tintstr(),id_,dgram.size()+4);
261     dgrams_rcvd_++;
262     if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
263         rtt_avg_ = NOW - last_send_time_;
264         dev_avg_ = rtt_avg_;
265         dip_avg_ = rtt_avg_;
266         dprintf("%s #%u sendctrl rtt init %lli\n",tintstr(),id_,rtt_avg_);
267     }
268     bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
269     while (dgram.size()) {
270         uint8_t type = dgram.Pull8();
271         switch (type) {
272             case SWIFT_HANDSHAKE: OnHandshake(dgram); break;
273             case SWIFT_DATA:      data=OnData(dgram); break;
274             case SWIFT_HAVE:      OnHave(dgram); break;
275             case SWIFT_ACK:       OnAck(dgram); break;
276             case SWIFT_HASH:      OnHash(dgram); break;
277             case SWIFT_HINT:      OnHint(dgram); break;
278             case SWIFT_PEX_ADD:   OnPex(dgram); break;
279             default:
280                 eprintf("%s #%u ?msg id unknown %i\n",tintstr(),id_,(int)type);
281                 return;
282         }
283     }
284     last_recv_time_ = NOW;
285     sent_since_recv_ = 0;
286     Reschedule();
287 }
288
289
290 void    Channel::OnHash (Datagram& dgram) {
291     bin64_t pos = dgram.Pull32();
292     Sha1Hash hash = dgram.PullHash();
293     file().OfferHash(pos,hash);
294     dprintf("%s #%u -hash %s\n",tintstr(),id_,pos.str());
295 }
296
297
298 void    Channel::CleanHintOut (bin64_t pos) {
299     int hi = 0;
300     while (hi<hint_out_.size() && !pos.within(hint_out_[hi].bin))
301         hi++;
302     if (hi==hint_out_.size())
303         return; // something not hinted or hinted in far past
304     while (hi--) { // removing likely snubbed hints
305         hint_out_size_ -= hint_out_.front().bin.width();
306         hint_out_.pop_front();
307     }
308     while (hint_out_.front().bin!=pos) {
309         tintbin f = hint_out_.front();
310         f.bin = f.bin.towards(pos);
311         hint_out_.front().bin = f.bin.sibling();
312         hint_out_.push_front(f);
313     }
314     hint_out_.pop_front();
315     hint_out_size_--;
316 }
317
318
319 bin64_t Channel::OnData (Datagram& dgram) {  // TODO: HAVE NONE for corrupted data
320     bin64_t pos = dgram.Pull32();
321     uint8_t *data;
322     int length = dgram.Pull(&data,1024);
323     bool ok = (pos==bin64_t::NONE) || 
324         (!file().ack_out().get(pos) && file().OfferData(pos, (char*)data, length) );
325     dprintf("%s #%u %cdata %s\n",tintstr(),id_,ok?'-':'!',pos.str());
326     data_in_ = tintbin(NOW,bin64_t::NONE);
327     if (!ok)
328         return bin64_t::NONE;
329     bin64_t cover = transfer().ack_out().cover(pos);
330     for(int i=0; i<transfer().cb_installed; i++)
331         if (cover.layer()>=transfer().cb_agg[i])
332             transfer().callbacks[i](transfer().fd(),cover);  // FIXME
333     data_in_.bin = pos;
334     if (pos!=bin64_t::NONE) {
335         if (last_data_in_time_) {
336             tint dip = NOW - last_data_in_time_;
337             dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
338         }
339         last_data_in_time_ = NOW;
340     }
341     CleanHintOut(pos);
342     return pos;
343 }
344
345
346 void    Channel::OnAck (Datagram& dgram) {
347     bin64_t ackd_pos = dgram.Pull32();
348     tint peer_time = dgram.Pull64(); // FIXME 32
349     // FIXME FIXME: wrap around here
350     if (ackd_pos==bin64_t::NONE)
351         return; // likely, brocken packet / insufficient hashes
352     if (file().size() && ackd_pos.base_offset()>=file().packet_size()) {
353         eprintf("invalid ack: %s\n",ackd_pos.str());
354         return;
355     }
356     ack_in_.set(ackd_pos);
357     int di = 0, ri = 0;
358     // find an entry for the send (data out) event
359     while (  di<data_out_.size() && ( data_out_[di]==tintbin() ||
360            !data_out_[di].bin.within(ackd_pos) )  )
361         di++;
362     // FUTURE: delayed acks
363     // rule out retransmits
364     while (  ri<data_out_tmo_.size() && !data_out_tmo_[ri].bin.within(ackd_pos) )
365         ri++;
366     dprintf("%s #%u %cack %s %lli\n",tintstr(),id_,
367             di==data_out_.size()?'?':'-',ackd_pos.str(),peer_time);
368     if (di!=data_out_.size() && ri==data_out_tmo_.size()) { // not a retransmit
369             // round trip time calculations
370         tint rtt = NOW-data_out_[di].time;
371         rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
372         dev_avg_ = ( dev_avg_*3 + ::abs(rtt-rtt_avg_) ) >> 2;
373         assert(data_out_[di].time!=TINT_NEVER);
374             // one-way delay calculations
375         tint owd = peer_time - data_out_[di].time;
376         owd_cur_bin_ = 0;//(owd_cur_bin_+1) & 3;
377         owd_current_[owd_cur_bin_] = owd;
378         if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
379             owd_min_bin_start_ = NOW;
380             owd_min_bin_ = (owd_min_bin_+1) & 3;
381             owd_min_bins_[owd_min_bin_] = TINT_NEVER;
382         }
383         if (owd_min_bins_[owd_min_bin_]>owd)
384             owd_min_bins_[owd_min_bin_] = owd;
385         dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n",
386                 tintstr(),id_,rtt_avg_,dev_avg_,data_out_[di].bin.str());
387         ack_rcvd_recent_++;
388         // early loss detection by packet reordering
389         for (int re=0; re<di-MAX_REORDERING; re++) {
390             if (data_out_[re]==tintbin())
391                 continue;
392             ack_not_rcvd_recent_++;
393             data_out_tmo_.push_back(data_out_[re].bin);
394             dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str());
395             data_out_cap_ = bin64_t::ALL;
396             data_out_[re] = tintbin();
397         }
398     }
399     if (di!=data_out_.size())
400         data_out_[di]=tintbin();
401     // clear zeroed items
402     while (!data_out_.empty() && ( data_out_.front()==tintbin() ||
403             ack_in_.is_filled(data_out_.front().bin) ) )
404         data_out_.pop_front();
405     assert(data_out_.empty() || data_out_.front().time!=TINT_NEVER);
406 }
407
408
409 void Channel::TimeoutDataOut ( ) {
410     // losses: timeouted packets
411     tint timeout = NOW - ack_timeout();
412     while (!data_out_.empty() && 
413         ( data_out_.front().time<timeout || data_out_.front()==tintbin() ) ) {
414         if (data_out_.front()!=tintbin() && ack_in_.is_empty(data_out_.front().bin)) {
415             ack_not_rcvd_recent_++;
416             data_out_cap_ = bin64_t::ALL;
417             data_out_tmo_.push_back(data_out_.front().bin);
418             dprintf("%s #%u Tdata %s\n",tintstr(),id_,data_out_.front().bin.str());
419         }
420         data_out_.pop_front();
421     }
422     // clear retransmit queue of older items
423     while (!data_out_tmo_.empty() && data_out_tmo_.front().time<NOW-MAX_POSSIBLE_RTT)
424         data_out_tmo_.pop_front();
425 }
426
427
428 void Channel::OnHave (Datagram& dgram) {
429     bin64_t ackd_pos = dgram.Pull32();
430     if (ackd_pos==bin64_t::NONE)
431         return; // wow, peer has hashes
432     ack_in_.set(ackd_pos);
433     dprintf("%s #%u -have %s\n",tintstr(),id_,ackd_pos.str());
434 }
435
436
437 void    Channel::OnHint (Datagram& dgram) {
438     bin64_t hint = dgram.Pull32();
439     // FIXME: wake up here
440     hint_in_.push_back(hint);
441     dprintf("%s #%u -hint %s\n",tintstr(),id_,hint.str());
442 }
443
444
445 void Channel::OnHandshake (Datagram& dgram) {
446     peer_channel_id_ = dgram.Pull32();
447     dprintf("%s #%u -hs %x\n",tintstr(),id_,peer_channel_id_);
448     // self-connection check
449     if (!SELF_CONN_OK) {
450         uint32_t try_id = DecodeID(peer_channel_id_);
451         if (channel(try_id) && !channel(try_id)->peer_channel_id_) {
452             peer_channel_id_ = 0;
453             Close();
454             return; // this is a self-connection
455         }
456     }
457     // FUTURE: channel forking
458 }
459
460
461 void Channel::OnPex (Datagram& dgram) {
462     uint32_t ipv4 = dgram.Pull32();
463     uint16_t port = dgram.Pull16();
464     Address addr(ipv4,port);
465     dprintf("%s #%u -pex %s\n",tintstr(),id_,addr.str());
466     transfer().OnPexIn(addr);
467 }
468
469
470 void    Channel::AddPex (Datagram& dgram) {
471     int chid = transfer().RevealChannel(pex_out_);
472     if (chid==-1 || chid==id_)
473         return;
474     Address a = channels[chid]->peer();
475     dgram.Push8(SWIFT_PEX_ADD);
476     dgram.Push32(a.ipv4());
477     dgram.Push16(a.port());
478     dprintf("%s #%u +pex %s\n",tintstr(),id_,a.str());
479 }
480
481
482 void    Channel::RecvDatagram (SOCKET socket) {
483     Datagram data(socket);
484     data.Recv();
485     const Address& addr = data.address();
486 #define return_log(...) { fprintf(stderr,__VA_ARGS__); return; }
487     if (data.size()<4)
488         return_log("datagram shorter than 4 bytes %s\n",addr.str());
489     uint32_t mych = data.Pull32();
490     Sha1Hash hash;
491     Channel* channel = NULL;
492     if (mych==0) { // handshake initiated
493         if (data.size()<1+4+1+4+Sha1Hash::SIZE)
494             return_log ("%s #0 incorrect size %i initial handshake packet %s\n",
495                         tintstr(),data.size(),addr.str());
496         uint8_t hashid = data.Pull8();
497         if (hashid!=SWIFT_HASH)
498             return_log ("%s #0 no hash in the initial handshake %s\n",
499                         tintstr(),addr.str());
500         bin64_t pos = data.Pull32();
501         if (pos!=bin64_t::ALL)
502             return_log ("%s #0 that is not the root hash %s\n",tintstr(),addr.str());
503         hash = data.PullHash();
504         FileTransfer* file = FileTransfer::Find(hash);
505         if (!file)
506             return_log ("%s #0 hash %s unknown, no such file %s\n",tintstr(),hash.hex().c_str(),addr.str());
507         dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
508         for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
509             if (channels[*i] && channels[*i]->peer_==data.address() &&
510                 channels[*i]->last_recv_time_>NOW-TINT_SEC*2)
511                 return_log("%s #0 have a channel already to %s\n",tintstr(),addr.str());
512         channel = new Channel(file, socket, data.address());
513     } else {
514         mych = DecodeID(mych);
515         if (mych>=channels.size())
516             return_log("%s invalid channel #%u, %s\n",tintstr(),mych,addr.str());
517         channel = channels[mych];
518         if (!channel)
519             return_log ("%s #%u is already closed\n",tintstr(),mych,addr.str());
520         if (channel->peer() != addr)
521             return_log ("%s #%u invalid peer address %s!=%s\n",
522                         tintstr(),mych,channel->peer().str(),addr.str());
523         channel->own_id_mentioned_ = true;
524     }
525     //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
526     channel->Recv(data);
527 }
528
529
530 void    Channel::Loop (tint howlong) {
531
532     tint limit = Datagram::Time() + howlong;
533
534     do {
535
536         tint send_time(TINT_NEVER);
537         Channel* sender(NULL);
538         while (!sender && !send_queue.is_empty()) { // dequeue
539             tintbin next = send_queue.pop();
540             sender = channel((int)next.bin);
541             send_time = next.time;
542             if (sender && sender->next_send_time_!=send_time &&
543                      sender->next_send_time_!=TINT_NEVER )
544                 sender = NULL; // it was a stale entry
545         }
546
547         if ( sender!=NULL && send_time<=NOW ) { // it's time
548
549             dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(),
550                     tintstr(send_time));
551             sender->Send();
552
553         } else {  // it's too early, wait
554
555             tint towait = min(limit,send_time) - NOW;
556             dprintf("%s #0 waiting %lliusec\n",tintstr(),towait);
557             Datagram::Wait(towait);
558             if (sender)  // get back to that later
559                 send_queue.push(tintbin(send_time,sender->id()));
560
561         }
562
563     } while (NOW<limit);
564
565 }
566
567
568 void Channel::Close () {
569     this->SwitchSendControl(CLOSE_CONTROL);
570 }
571
572
573 void Channel::Reschedule () {
574     next_send_time_ = NextSendTime();
575     if (next_send_time_!=TINT_NEVER) {
576         assert(next_send_time_<NOW+TINT_MIN);
577         send_queue.push(tintbin(next_send_time_,id_));
578         dprintf("%s #%u requeue for %s\n",tintstr(),id_,tintstr(next_send_time_));
579     } else {
580         dprintf("%s #%u closed\n",tintstr(),id_);
581         delete this;
582     }
583 }