e357a02fda17b309d556ca139e3b59cd0c0aead9
[swift-upb.git] / sendrecv.cpp
1 /*
2  *  sendrecv.cpp
3  *  most of swift's state machine
4  *
5  *  Created by Victor Grishchenko on 3/6/09.
6  *  Copyright 2009 Delft University of Technology. All rights reserved.
7  *
8  */
9 #include "swift.h"
10 #include <algorithm>  // kill it
11
12 using namespace swift;
13 using namespace std;
14
15 /*
16  TODO  25 Oct 18:55
17  - range: ALL
18  - randomized testing of advanced ops (new testcase)
19  */
20
21 void    Channel::AddPeakHashes (Datagram& dgram) {
22     for(int i=0; i<file().peak_count(); i++) {
23         bin64_t peak = file().peak(i);
24         dgram.Push8(SWIFT_HASH);
25         dgram.Push32((uint32_t)peak);
26         dgram.PushHash(file().peak_hash(i));
27         //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
28         dprintf("%s #%u +phash %s\n",tintstr(),id_,peak.str());
29     }
30 }
31
32
33 void    Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
34     bin64_t peak = file().peak_for(pos);
35     while (pos!=peak && ((NOW&3)==3 || !data_out_cap_.within(pos.parent())) &&
36             ack_in_.get(pos.parent())==binmap_t::EMPTY  ) {
37         bin64_t uncle = pos.sibling();
38         dgram.Push8(SWIFT_HASH);
39         dgram.Push32((uint32_t)uncle);
40         dgram.PushHash( file().hash(uncle) );
41         //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
42         dprintf("%s #%u +hash %s\n",tintstr(),id_,uncle.str());
43         pos = pos.parent();
44     }
45 }
46
47
48 bin64_t           Channel::ImposeHint () {
49     uint64_t twist = peer_channel_id_;  // got no hints, send something randomly
50     twist &= file().peak(0); // FIXME may make it semi-seq here
51     file().ack_out().twist(twist);
52     ack_in_.twist(twist);
53     bin64_t my_pick =
54         file().ack_out().find_filtered(ack_in_,bin64_t::ALL,binmap_t::FILLED);
55     while (my_pick.width()>max(1,(int)cwnd_))
56         my_pick = my_pick.left();
57     file().ack_out().twist(0);
58     ack_in_.twist(0);
59     return my_pick.twisted(twist);
60 }
61
62
63 bin64_t        Channel::DequeueHint () {
64     if (hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {
65         bin64_t my_pick = ImposeHint(); // FIXME move to the loop
66         if (my_pick!=bin64_t::NONE) {
67             hint_in_.push_back(my_pick);
68             dprintf("%s #%u *hint %s\n",tintstr(),id_,my_pick.str());
69         }
70     }
71     bin64_t send = bin64_t::NONE;
72     while (!hint_in_.empty() && send==bin64_t::NONE) {
73         bin64_t hint = hint_in_.front().bin;
74         tint time = hint_in_.front().time;
75         hint_in_.pop_front();
76         while (!hint.is_base()) { // FIXME optimize; possible attack
77             hint_in_.push_front(tintbin(time,hint.right()));
78             hint = hint.left();
79         }
80         //if (time < NOW-TINT_SEC*3/2 )
81         //    continue;  bad idea
82         if (ack_in_.get(hint)!=binmap_t::FILLED)
83             send = hint;
84     }
85     uint64_t mass = 0;
86     for(int i=0; i<hint_in_.size(); i++)
87         mass += hint_in_[i].bin.width();
88     dprintf("%s #%u dequeued %s [%lli]\n",tintstr(),id_,send.str(),mass);
89     return send;
90 }
91
92
93 void    Channel::AddHandshake (Datagram& dgram) {
94     if (!peer_channel_id_) { // initiating
95         dgram.Push8(SWIFT_HASH);
96         dgram.Push32(bin64_t::ALL32);
97         dgram.PushHash(file().root_hash());
98         dprintf("%s #%u +hash ALL %s\n",
99                 tintstr(),id_,file().root_hash().hex().c_str());
100     }
101     dgram.Push8(SWIFT_HANDSHAKE);
102     int encoded = EncodeID(id_);
103     dgram.Push32(encoded);
104     dprintf("%s #%u +hs %x\n",tintstr(),id_,encoded);
105     ack_out_.clear();
106     AddAck(dgram);
107 }
108
109
110 void    Channel::Send () {
111     Datagram dgram(socket_,peer());
112     dgram.Push32(peer_channel_id_);
113     bin64_t data = bin64_t::NONE;
114     if ( is_established() ) {
115         // FIXME: seeder check
116         AddAck(dgram);
117         if (!file().is_complete())
118             AddHint(dgram);
119         AddPex(dgram);
120         TimeoutDataOut();
121         data = AddData(dgram);
122     } else {
123         AddHandshake(dgram);
124         AddAck(dgram);
125     }
126     dprintf("%s #%u sent %ib %s:%x\n",
127             tintstr(),id_,dgram.size(),peer().str(),peer_channel_id_);
128     if (dgram.size()==4) {// only the channel id; bare keep-alive
129         data = bin64_t::ALL;
130     }
131     if (dgram.Send()==-1)
132         print_error("can't send datagram");
133     last_send_time_ = NOW;
134     sent_since_recv_++;
135     dgrams_sent_++;
136 }
137
138
139 void    Channel::AddHint (Datagram& dgram) {
140
141     tint plan_for = max(TINT_SEC,rtt_avg_*4);
142
143     tint timed_out = NOW - plan_for*2;
144     while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
145         hint_out_size_ -= hint_out_.front().bin.width();
146         hint_out_.pop_front();
147     }
148
149     int plan_pck = max ( (tint)1, plan_for / dip_avg_ );
150
151     if ( hint_out_size_ < plan_pck ) {
152
153         int diff = plan_pck - hint_out_size_; // TODO: aggregate
154         bin64_t hint = transfer().picker().Pick(ack_in_,diff,NOW+plan_for*2);
155
156         if (hint!=bin64_t::NONE) {
157             dgram.Push8(SWIFT_HINT);
158             dgram.Push32(hint);
159             dprintf("%s #%u +hint %s [%lli]\n",tintstr(),id_,hint.str(),hint_out_size_);
160             hint_out_.push_back(hint);
161             hint_out_size_ += hint.width();
162         } else
163             dprintf("%s #%u Xhint\n",tintstr(),id_);
164
165     }
166 }
167
168
169 bin64_t        Channel::AddData (Datagram& dgram) {
170
171     if (!file().size()) // know nothing
172         return bin64_t::NONE;
173
174     bin64_t tosend = bin64_t::NONE;
175     tint luft = send_interval_>>4; // may wake up a bit earlier
176     if (data_out_.size()<cwnd_ &&
177             last_data_out_time_+send_interval_<=NOW+luft) {
178         tosend = DequeueHint();
179         if (tosend==bin64_t::NONE) {
180             dprintf("%s #%u sendctrl no idea what to send\n",tintstr(),id_);
181             if (send_control_!=KEEP_ALIVE_CONTROL)
182                 SwitchSendControl(KEEP_ALIVE_CONTROL);
183         }
184     } else
185         dprintf("%s #%u sendctrl wait cwnd %f data_out %i next %s\n",
186                 tintstr(),id_,cwnd_,data_out_.size(),tintstr(last_data_out_time_+NOW-send_interval_));
187
188     if (tosend==bin64_t::NONE)// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
189         return bin64_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
190
191     if (ack_in_.is_empty() && file().size())
192         AddPeakHashes(dgram);
193     AddUncleHashes(dgram,tosend);
194     if (!ack_in_.is_empty()) // TODO: cwnd_>1
195         data_out_cap_ = tosend;
196
197     if (dgram.size()>254) {
198         dgram.Send(); // kind of fragmentation
199         dgram.Push32(peer_channel_id_);
200     }
201
202     dgram.Push8(SWIFT_DATA);
203     dgram.Push32(tosend.to32());
204
205     uint8_t buf[1024];
206     size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10);
207     // TODO: corrupted data, retries, caching
208     if (r<0) {
209         print_error("error on reading");
210         return bin64_t::NONE;
211     }
212     assert(dgram.space()>=r+4+1);
213     dgram.Push(buf,r);
214
215     last_data_out_time_ = NOW;
216     data_out_.push_back(tosend);
217     dprintf("%s #%u +data %s\n",tintstr(),id_,tosend.str());
218
219     return tosend;
220 }
221
222
223 void    Channel::AddAck (Datagram& dgram) {
224     if (data_in_==tintbin())
225         return;
226     dgram.Push8(SWIFT_ACK);
227     dgram.Push32(data_in_.bin.to32());
228     dgram.Push32(data_in_.time);
229     ack_out_.set(data_in_.bin);
230     dprintf("%s #%u +ack %s %s\n",
231         tintstr(),id_,data_in_.bin.str(),tintstr(data_in_.time));
232     if (data_in_.bin.layer()>2)
233         data_in_dbl_ = data_in_.bin;
234     data_in_ = tintbin();
235 }
236
237
238 void    Channel::AddHave (Datagram& dgram) {
239     if (data_in_dbl_!=bin64_t::NONE) { // TODO: do redundancy better
240         dgram.Push8(SWIFT_HAVE);
241         dgram.Push32(data_in_dbl_.to32());
242         data_in_dbl_=bin64_t::NONE;
243     }
244     for(int count=0; count<4; count++) {
245         bin64_t ack = file().ack_out().find_filtered // FIXME: do rotating queue
246             (ack_out_, bin64_t::ALL, binmap_t::FILLED);
247         if (ack==bin64_t::NONE)
248             break;
249         ack = file().ack_out().cover(ack);
250         ack_out_.set(ack);
251         dgram.Push8(SWIFT_HAVE);
252         dgram.Push32(ack.to32());
253         dprintf("%s #%u +have %s\n",tintstr(),id_,ack.str());
254     }
255 }
256
257
258 void    Channel::Recv (Datagram& dgram) {
259     dprintf("%s #%u recvd %i\n",tintstr(),id_,dgram.size()+4);
260     dgrams_rcvd_++;
261     if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
262         rtt_avg_ = NOW - last_send_time_;
263         dev_avg_ = rtt_avg_;
264         dip_avg_ = rtt_avg_;
265         dprintf("%s #%u sendctrl rtt init %lli\n",tintstr(),id_,rtt_avg_);
266     }
267     bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
268     while (dgram.size()) {
269         uint8_t type = dgram.Pull8();
270         switch (type) {
271             case SWIFT_HANDSHAKE: OnHandshake(dgram); break;
272             case SWIFT_DATA:      data=OnData(dgram); break;
273             case SWIFT_HAVE:      OnHave(dgram); break;
274             case SWIFT_ACK:       OnAck(dgram); break;
275             case SWIFT_HASH:      OnHash(dgram); break;
276             case SWIFT_HINT:      OnHint(dgram); break;
277             case SWIFT_PEX_ADD:   OnPex(dgram); break;
278             default:
279                 eprintf("%s #%u ?msg id unknown %i\n",tintstr(),id_,(int)type);
280                 return;
281         }
282     }
283     last_recv_time_ = NOW;
284     sent_since_recv_ = 0;
285 }
286
287
288 void    Channel::OnHash (Datagram& dgram) {
289     bin64_t pos = dgram.Pull32();
290     Sha1Hash hash = dgram.PullHash();
291     file().OfferHash(pos,hash);
292     dprintf("%s #%u -hash %s\n",tintstr(),id_,pos.str());
293 }
294
295
296 void    Channel::CleanHintOut (bin64_t pos) {
297     int hi = 0;
298     while (hi<hint_out_.size() && !pos.within(hint_out_[hi].bin))
299         hi++;
300     if (hi==hint_out_.size())
301         return; // something not hinted or hinted in far past
302     while (hi--) { // removing likely snubbed hints
303         hint_out_size_ -= hint_out_.front().bin.width();
304         hint_out_.pop_front();
305     }
306     while (hint_out_.front().bin!=pos) {
307         tintbin f = hint_out_.front();
308         f.bin = f.bin.towards(pos);
309         hint_out_.front().bin = f.bin.sibling();
310         hint_out_.push_front(f);
311     }
312     hint_out_.pop_front();
313     hint_out_size_--;
314 }
315
316
317 bin64_t Channel::OnData (Datagram& dgram) {  // TODO: HAVE NONE for corrupted data
318     bin64_t pos = dgram.Pull32();
319     uint8_t *data;
320     int length = dgram.Pull(&data,1024);
321     bool ok = (pos==bin64_t::NONE) || file().OfferData(pos, (char*)data, length) ;
322     dprintf("%s #%u %cdata %s\n",tintstr(),id_,ok?'-':'!',pos.str());
323     data_in_ = tintbin(NOW,bin64_t::NONE);
324     if (!ok)
325         return bin64_t::NONE;
326     data_in_.bin = pos;
327     if (pos!=bin64_t::NONE) {
328         if (last_data_in_time_) {
329             tint dip = NOW - last_data_in_time_;
330             dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
331         }
332         last_data_in_time_ = NOW;
333     }
334     CleanHintOut(pos);
335     return pos;
336 }
337
338
339 void    Channel::OnAck (Datagram& dgram) {
340     bin64_t ackd_pos = dgram.Pull32();
341     tint peer_time_ = dgram.Pull64();
342     if (ackd_pos==bin64_t::NONE)
343         return; // likely, brocken packet / insufficient hashes
344     if (file().size() && ackd_pos.base_offset()>=file().packet_size()) {
345         eprintf("invalid ack: %s\n",ackd_pos.str());
346         return;
347     }
348     ack_in_.set(ackd_pos);
349     int di = 0, ri = 0;
350     // find an entry for the send (data out) event
351     while (  di<data_out_.size() && ( data_out_[di]==tintbin() ||
352            !data_out_[di].bin.within(ackd_pos) )  )
353         di++;
354     // FUTURE: delayed acks
355     // rule out retransmits
356     while (  ri<data_out_tmo_.size() && !data_out_tmo_[ri].bin.within(ackd_pos) )
357         ri++;
358     dprintf("%s #%u %cack %s %s\n",tintstr(),id_,
359             di==data_out_.size()?'?':'-',ackd_pos.str(),tintstr(peer_time_));
360     if (ri==data_out_tmo_.size()) { // not a retransmit
361             // round trip time calculations
362         tint rtt = NOW-data_out_[di].time;
363         rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
364         dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
365         assert(data_out_[di].time!=TINT_NEVER);
366             // one-way delay calculations
367         tint owd = peer_time_ - data_out_[di].time;
368         owd_cur_bin_ = (owd_cur_bin_+1) & 3;
369         owd_current_[owd_cur_bin_] = owd;
370         if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
371             owd_min_bin_start_ = NOW;
372             owd_min_bin_ = (owd_min_bin_+1) & 3;
373             owd_min_bins_[owd_min_bin_] = TINT_NEVER;
374         }
375         if (owd_min_bins_[owd_min_bin_]>owd)
376             owd_min_bins_[owd_min_bin_] = owd;
377         dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n",
378                 tintstr(),id_,rtt_avg_,dev_avg_,data_out_[di].bin.str());
379         ack_rcvd_recent_++;
380         data_out_[di]=tintbin();
381     }
382     // early loss detection by packet reordering
383     for (int re=0; re<di-MAX_REORDERING; re++) {
384         if (data_out_[re]==tintbin())
385             continue;
386         ack_not_rcvd_recent_++;
387         data_out_tmo_.push_back(data_out_.front().bin);
388         dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str());
389         data_out_cap_ = bin64_t::ALL;
390         data_out_[ri] = tintbin();
391     }
392     // clear zeroed items
393     while (!data_out_.empty() && ( data_out_.front()==tintbin() ||
394             ack_in_.is_filled(data_out_.front().bin) ) )
395         data_out_.pop_front();
396     assert(data_out_.empty() || data_out_.front().time!=TINT_NEVER);
397 }
398
399
400 void Channel::TimeoutDataOut ( ) {
401     // losses: timeouted packets
402     tint timeout = NOW - ack_timeout();
403     for (int i=0; i<data_out_.size() && data_out_[i].time<timeout; i++) {
404         if (data_out_.front()!=tintbin() && ack_in_.is_empty(data_out_.front().bin)) {
405             ack_not_rcvd_recent_++;
406             data_out_cap_ = bin64_t::ALL;
407             data_out_tmo_.push_back(data_out_.front().bin);
408             dprintf("%s #%u Tdata %s\n",tintstr(),id_,data_out_.front().bin.str());
409         }
410         data_out_.pop_front();
411     }
412     // clear retransmit queue of older items
413     while (!data_out_tmo_.empty() && data_out_tmo_.front().time<NOW-MAX_POSSIBLE_RTT)
414         data_out_tmo_.pop_front();
415 }
416
417
418 void Channel::OnHave (Datagram& dgram) {
419     bin64_t ackd_pos = dgram.Pull32();
420     if (ackd_pos==bin64_t::NONE)
421         return; // wow, peer has hashes
422     ack_in_.set(ackd_pos);
423     dprintf("%s #%u -have %s\n",tintstr(),id_,ackd_pos.str());
424 }
425
426
427 void    Channel::OnHint (Datagram& dgram) {
428     bin64_t hint = dgram.Pull32();
429     // FIXME: wake up here
430     hint_in_.push_back(hint);
431     dprintf("%s #%u -hint %s\n",tintstr(),id_,hint.str());
432 }
433
434
435 void Channel::OnHandshake (Datagram& dgram) {
436     peer_channel_id_ = dgram.Pull32();
437     dprintf("%s #%u -hs %x\n",tintstr(),id_,peer_channel_id_);
438     // self-connection check
439     if (!SELF_CONN_OK) {
440         uint32_t try_id = DecodeID(peer_channel_id_);
441         if (channel(try_id) && !channel(try_id)->peer_channel_id_) {
442             peer_channel_id_ = 0;
443             Close();
444             return; // this is a self-connection
445         }
446     }
447     // FUTURE: channel forking
448 }
449
450
451 void Channel::OnPex (Datagram& dgram) {
452     uint32_t ipv4 = dgram.Pull32();
453     uint16_t port = dgram.Pull16();
454     Address addr(ipv4,port);
455     dprintf("%s #%u -pex %s\n",tintstr(),id_,addr.str());
456     transfer().OnPexIn(addr);
457 }
458
459
460 void    Channel::AddPex (Datagram& dgram) {
461     int chid = transfer().RevealChannel(pex_out_);
462     if (chid==-1 || chid==id_)
463         return;
464     Address a = channels[chid]->peer();
465     dgram.Push8(SWIFT_PEX_ADD);
466     dgram.Push32(a.ipv4());
467     dgram.Push16(a.port());
468     dprintf("%s #%u +pex %s\n",tintstr(),id_,a.str());
469 }
470
471
472 Channel*    Channel::RecvDatagram (int socket) {
473     Datagram data(socket);
474     data.Recv();
475     const Address& addr = data.address();
476 #define return_log(...) { printf(__VA_ARGS__); return NULL; }
477     if (data.size()<4)
478         return_log("datagram shorter than 4 bytes %s\n",addr.str());
479     uint32_t mych = data.Pull32();
480     Sha1Hash hash;
481     Channel* channel = NULL;
482     if (!mych) { // handshake initiated
483         if (data.size()<1+4+1+4+Sha1Hash::SIZE)
484             return_log ("%s #0 incorrect size %i initial handshake packet %s\n",
485                         tintstr(),data.size(),addr.str());
486         uint8_t hashid = data.Pull8();
487         if (hashid!=SWIFT_HASH)
488             return_log ("%s #0 no hash in the initial handshake %s\n",
489                         tintstr(),addr.str());
490         bin64_t pos = data.Pull32();
491         if (pos!=bin64_t::ALL)
492             return_log ("%s #0 that is not the root hash %s\n",tintstr(),addr.str());
493         hash = data.PullHash();
494         FileTransfer* file = FileTransfer::Find(hash);
495         if (!file)
496             return_log ("%s #0 hash %s unknown, no such file %s\n",tintstr(),hash.hex().c_str(),addr.str());
497         dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
498         for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
499             if (channels[*i] && channels[*i]->peer_==data.address() &&
500                 channels[*i]->last_recv_time_>NOW-TINT_SEC*2)
501                 return_log("%s #0 have a channel already to %s\n",tintstr(),addr.str());
502         channel = new Channel(file, socket, data.address());
503     } else {
504         mych = DecodeID(mych);
505         if (mych>=channels.size())
506             return_log("%s invalid channel #%u, %s\n",tintstr(),mych,addr.str());
507         channel = channels[mych];
508         if (!channel)
509             return_log ("%s #%u is already closed\n",tintstr(),mych,addr.str());
510         if (channel->peer() != addr)
511             return_log ("%s #%u invalid peer address %s!=%s\n",
512                         tintstr(),mych,channel->peer().str(),addr.str());
513         channel->own_id_mentioned_ = true;
514     }
515     //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
516     channel->Recv(data);
517     return channel;
518 }
519
520
521 void    Channel::Loop (tint howlong) {
522
523     tint limit = Datagram::Time() + howlong;
524
525     do {
526
527         tint send_time(TINT_NEVER);
528         Channel* sender(NULL);
529         while (!sender && !send_queue.is_empty()) { // dequeue
530             tintbin next = send_queue.pop();
531             sender = channel((int)next.bin);
532             send_time = next.time;
533             if (sender && sender->next_send_time_!=send_time &&
534                      sender->next_send_time_!=TINT_NEVER )
535                 sender = NULL; // it was a stale entry
536         }
537
538         if ( sender!=NULL && send_time<=NOW ) { // it's time
539
540             dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(),
541                     tintstr(send_time));
542             sender->Send();
543             sender->Reschedule();
544
545         } else {  // it's too early, wait
546
547             tint towait = min(limit,send_time) - NOW;
548             dprintf("%s #0 waiting %lliusec\n",tintstr(),towait);
549             int rd = Datagram::Wait(socket_count,sockets,towait);
550             if (rd!=INVALID_SOCKET) { // in meantime, received something
551                 Channel* receiver = RecvDatagram(rd);
552                 if (receiver) // receiver's state may have changed
553                     receiver->Reschedule();
554             }
555             if (sender)  // get back to that later
556                 send_queue.push(tintbin(send_time,sender->id()));
557
558         }
559
560     } while (NOW<limit);
561
562 }
563
564
565 void Channel::Close () {
566     this->SwitchSendControl(CLOSE_CONTROL);
567 }
568
569
570 void Channel::Reschedule () {
571     TimeoutDataOut(); // precaution to know free cwnd
572     next_send_time_ = NextSendTime();
573     if (next_send_time_!=TINT_NEVER) {
574         assert(next_send_time_<NOW+TINT_MIN);
575         send_queue.push(tintbin(next_send_time_,id_));
576         dprintf("%s #%u requeue for %s\n",tintstr(),id_,tintstr(next_send_time_));
577     } else {
578         dprintf("%s #%u closed\n",tintstr(),id_);
579         delete this;
580     }
581 }