Fixes for the case of high cwnd;
[swift-upb.git] / sendrecv.cpp
1 /*
2  *  sendrecv.cpp
3  *  most of the swift's state machine
4  *
5  *  Created by Victor Grishchenko on 3/6/09.
6  *  Copyright 2009 Delft University of Technology. All rights reserved.
7  *
8  */
9 #include "swift.h"
10 #include "compat/util.h"
11
12
13 using namespace swift;
14 using namespace std;
15
16 /*
17  TODO  25 Oct 18:55
18  - range: ALL
19  - randomized testing of advanced ops (new testcase)
20  */
21
22 void    Channel::AddPeakHashes (Datagram& dgram) {
23     for(int i=0; i<file().peak_count(); i++) {
24         bin64_t peak = file().peak(i);
25         dgram.Push8(SWIFT_HASH);
26         dgram.Push32((uint32_t)peak);
27         dgram.PushHash(file().peak_hash(i));
28         //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
29         dprintf("%s #%u +phash %s\n",tintstr(),id_,peak.str());
30     }
31 }
32
33
34 void    Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
35     bin64_t peak = file().peak_for(pos);
36     while (pos!=peak && ((NOW&3)==3 || !data_out_cap_.within(pos.parent())) &&
37             ack_in_.get(pos.parent())==binmap_t::EMPTY  ) {
38         bin64_t uncle = pos.sibling();
39         dgram.Push8(SWIFT_HASH);
40         dgram.Push32((uint32_t)uncle);
41         dgram.PushHash( file().hash(uncle) );
42         //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
43         dprintf("%s #%u +hash %s\n",tintstr(),id_,uncle.str());
44         pos = pos.parent();
45     }
46 }
47
48
49 bin64_t        Channel::DequeueHint () {
50     if (hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {    
51         uint64_t twist = peer_channel_id_;  // got no hints, send something randomly
52         twist &= file().peak(0); // may make it semi-seq here
53         file().ack_out().twist(twist);
54         ack_in_.twist(twist);
55         bin64_t my_pick = 
56             file().ack_out().find_filtered(ack_in_,bin64_t::ALL,binmap_t::FILLED);
57         while (my_pick.width()>max(1,(int)cwnd_))
58             my_pick = my_pick.left();
59         file().ack_out().twist(0);
60         ack_in_.twist(0);
61         if (my_pick!=bin64_t::NONE) {
62             my_pick = my_pick.twisted(twist);
63             hint_in_.push_back(my_pick);
64             dprintf("%s #%u *hint %s\n",tintstr(),id_,my_pick.str());
65         }
66     }
67     bin64_t send = bin64_t::NONE;
68     while (!hint_in_.empty() && send==bin64_t::NONE) {
69         bin64_t hint = hint_in_.front().bin;
70         tint time = hint_in_.front().time;
71         hint_in_.pop_front();
72         while (!hint.is_base()) { // FIXME optimize; possible attack
73             hint_in_.push_front(tintbin(time,hint.right()));
74             hint = hint.left();
75         }
76         //if (time < NOW-TINT_SEC*3/2 )
77         //    continue;  bad idea
78         if (ack_in_.get(hint)!=binmap_t::FILLED) 
79             send = hint;
80     }
81     uint64_t mass = 0;
82     for(int i=0; i<hint_in_.size(); i++)
83         mass += hint_in_[i].bin.width();
84     dprintf("%s #%u dequeued %s [%lli]\n",tintstr(),id_,send.str(),mass);
85     return send;
86 }
87
88
89 void    Channel::AddHandshake (Datagram& dgram) {
90     if (!peer_channel_id_) { // initiating
91         dgram.Push8(SWIFT_HASH);
92         dgram.Push32(bin64_t::ALL32);
93         dgram.PushHash(file().root_hash());
94         dprintf("%s #%u +hash ALL %s\n",
95                 tintstr(),id_,file().root_hash().hex().c_str());
96     }
97     dgram.Push8(SWIFT_HANDSHAKE);
98     int encoded = EncodeID(id_);
99     dgram.Push32(encoded);
100     dprintf("%s #%u +hs %x\n",tintstr(),id_,encoded);
101     ack_out_.clear();
102     AddAck(dgram);
103 }
104
105
106 void    Channel::Send () {
107     Datagram dgram(socket_,peer());
108     dgram.Push32(peer_channel_id_);
109     bin64_t data = bin64_t::NONE;
110     if ( is_established() ) {
111         // FIXME: seeder check
112         AddAck(dgram);
113         if (!file().is_complete())
114             AddHint(dgram);
115         AddPex(dgram);
116         CleanDataOut();
117         data = AddData(dgram);
118     } else {
119         AddHandshake(dgram);
120         AddAck(dgram);
121     }
122     dprintf("%s #%u sent %ib %s:%x\n",
123             tintstr(),id_,dgram.size(),peer().str(),peer_channel_id_);
124     if (dgram.size()==4) {// only the channel id; bare keep-alive
125         data = bin64_t::ALL;
126         //dprintf("%s #%u considering keepalive %i %f %s\n",
127         //        tintstr(),id_,(int)data_out_.size(),cwnd_,SEND_CONTROL_MODES[send_control_]);
128         //if (data_out_.size()<cwnd_ && send_control_!=KEEP_ALIVE_CONTROL) {
129             //if ( cwnd_ < 1 )
130             //    SwitchSendControl(KEEP_ALIVE_CONTROL);
131             //else
132             //    cwnd_ = cwnd_/2.0;
133         //}
134         //if (data_out_.empty() && send_control_!=KEEP_ALIVE_CONTROL)
135         //     SwitchSendControl(KEEP_ALIVE_CONTROL);// we did our best
136         //if (NOW<last_send_time_+MAX_SEND_INTERVAL) // no need for keepalive
137         //    return; // don't send empty dgram
138     }
139     if (dgram.Send()==-1)
140         print_error("can't send datagram");
141     last_send_time_ = NOW;
142     sent_since_recv_++;
143     dgrams_sent_++;
144 }
145
146
147 void    Channel::AddHint (Datagram& dgram) {
148
149     tint plan_for = max(TINT_SEC,rtt_avg_*4);
150     
151     tint timed_out = NOW - plan_for*2;
152     while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
153         hint_out_size_ -= hint_out_.front().bin.width();
154         hint_out_.pop_front();
155     }
156     
157     /*int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
158     if (!peer_cwnd)
159         peer_cwnd = 1;*/
160     int plan_pck = std::max ( (tint)1, plan_for / dip_avg_ );
161     
162     if ( hint_out_size_ < plan_pck ) {
163             
164         int diff = plan_pck - hint_out_size_; // TODO: aggregate
165         bin64_t hint = transfer().picker().Pick(ack_in_,diff,NOW+plan_for*2);
166         
167         if (hint!=bin64_t::NONE) {
168             dgram.Push8(SWIFT_HINT);
169             dgram.Push32(hint);
170             dprintf("%s #%u +hint %s [%lli]\n",tintstr(),id_,hint.str(),hint_out_size_);
171             hint_out_.push_back(hint);
172             hint_out_size_ += hint.width();
173         } else
174             dprintf("%s #%u Xhint\n",tintstr(),id_);
175         
176     }
177 }
178
179
180 bin64_t        Channel::AddData (Datagram& dgram) {
181     
182     if (!file().size()) // know nothing
183         return bin64_t::NONE;
184     
185     bin64_t tosend = bin64_t::NONE;
186     if (data_out_.size()<cwnd_ && last_data_out_time_<=NOW-send_interval_) {
187         tosend = DequeueHint();
188         if (tosend==bin64_t::NONE) {
189             dprintf("%s #%u no idea what to send #sendctrl\n",tintstr(),id_);
190             if (send_control_!=KEEP_ALIVE_CONTROL)
191                 SwitchSendControl(KEEP_ALIVE_CONTROL);
192         }
193     } else
194         dprintf("%s #%u no cwnd #sendctrl\n",tintstr(),id_);
195     
196     if (tosend==bin64_t::NONE)// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty())) 
197         return bin64_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
198     
199     if (ack_in_.is_empty() && file().size())
200         AddPeakHashes(dgram);
201     AddUncleHashes(dgram,tosend);
202     if (!ack_in_.is_empty()) // TODO: cwnd_>1
203         data_out_cap_ = tosend;
204
205     if (dgram.size()>254) {
206         dgram.Send(); // kind of fragmentation
207         dgram.Push32(peer_channel_id_);
208     }
209     
210     dgram.Push8(SWIFT_DATA);
211     dgram.Push32(tosend.to32());
212     
213     uint8_t buf[1024];
214     size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); 
215     // TODO: corrupted data, retries, caching
216     if (r<0) {
217         print_error("error on reading");
218         return bin64_t::NONE;
219     }
220     assert(dgram.space()>=r+4+1);
221     dgram.Push(buf,r);
222     
223     last_data_out_time_ = NOW;
224     data_out_.push_back(tosend);
225     dprintf("%s #%u +data %s\n",tintstr(),id_,tosend.str());
226     
227     return tosend;
228 }
229
230
231 void    Channel::AddTs (Datagram& dgram) {
232     dgram.Push8(SWIFT_TS);
233     dgram.Push64(data_in_.time);
234     dprintf("%s #%u +ts %s\n",tintstr(),id_,tintstr(data_in_.time));
235 }
236
237
238 void    Channel::AddAck (Datagram& dgram) {
239     if (data_in_dbl_!=bin64_t::NONE) { // TODO: do redundancy better
240         dgram.Push8(SWIFT_ACK);
241         dgram.Push32(data_in_dbl_.to32());
242         data_in_dbl_=bin64_t::NONE;
243     }
244     if (data_in_.time!=TINT_NEVER) { // TODO: ACK NONE for corrupted data
245         AddTs(dgram);
246         bin64_t pos = file().ack_out().cover(data_in_.bin);
247         dgram.Push8(SWIFT_ACK);
248         dgram.Push32(pos.to32());
249         //dgram.Push64(data_in_.time);
250         ack_out_.set(pos);
251         dprintf("%s #%u +ack %s %s\n",tintstr(),id_,pos.str(),tintstr(data_in_.time));
252         data_in_ = tintbin(TINT_NEVER,bin64_t::NONE);
253         if (pos.layer()>2)
254             data_in_dbl_ = pos;
255     }
256     for(int count=0; count<4; count++) {
257         bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, binmap_t::FILLED);
258         if (ack==bin64_t::NONE)
259             break;
260         ack = file().ack_out().cover(ack);
261         ack_out_.set(ack);
262         dgram.Push8(SWIFT_ACK);
263         dgram.Push32(ack.to32());
264         dprintf("%s #%u +ack %s\n",tintstr(),id_,ack.str());
265     }
266 }
267
268
269 void    Channel::Recv (Datagram& dgram) {
270     dprintf("%s #%u recvd %i\n",tintstr(),id_,dgram.size()+4);
271     peer_send_time_ = 0; // has scope of 1 datagram
272     dgrams_rcvd_++;
273     if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
274         rtt_avg_ = NOW - last_send_time_;
275         dev_avg_ = rtt_avg_;
276         dip_avg_ = rtt_avg_;
277         dprintf("%s #%u rtt init %lli\n",tintstr(),id_,rtt_avg_);
278     }
279     bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
280     while (dgram.size()) {
281         uint8_t type = dgram.Pull8();
282         switch (type) {
283             case SWIFT_HANDSHAKE: OnHandshake(dgram); break;
284             case SWIFT_DATA:      data=OnData(dgram); break;
285             case SWIFT_TS:        OnTs(dgram); break;
286             case SWIFT_ACK:       OnAck(dgram); break;
287             case SWIFT_HASH:      OnHash(dgram); break;
288             case SWIFT_HINT:      OnHint(dgram); break;
289             case SWIFT_PEX_ADD:   OnPex(dgram); break;
290             default:
291                 eprintf("%s #%u ?msg id unknown %i\n",tintstr(),id_,(int)type);
292                 return;
293         }
294     }
295     last_recv_time_ = NOW;
296     sent_since_recv_ = 0;
297 }
298
299
300 void    Channel::OnHash (Datagram& dgram) {
301     bin64_t pos = dgram.Pull32();
302     Sha1Hash hash = dgram.PullHash();
303     file().OfferHash(pos,hash);
304     //DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
305     dprintf("%s #%u -hash %s\n",tintstr(),id_,pos.str());
306 }
307
308
309 void    Channel::CleanHintOut (bin64_t pos) {
310     int hi = 0;
311     while (hi<hint_out_.size() && !pos.within(hint_out_[hi].bin))
312         hi++;
313     if (hi==hint_out_.size())
314         return; // something not hinted or hinted in far past
315     while (hi--) { // removing likely snubbed hints
316         hint_out_size_ -= hint_out_.front().bin.width();
317         hint_out_.pop_front();
318     }
319     while (hint_out_.front().bin!=pos) {
320         tintbin f = hint_out_.front();
321         f.bin = f.bin.towards(pos);
322         hint_out_.front().bin = f.bin.sibling();
323         hint_out_.push_front(f);
324     }
325     hint_out_.pop_front();
326     hint_out_size_--;
327 }
328
329
330 bin64_t Channel::OnData (Datagram& dgram) {
331     bin64_t pos = dgram.Pull32();
332     uint8_t *data;
333     int length = dgram.Pull(&data,1024);
334     bool ok = (pos==bin64_t::NONE) || file().OfferData(pos, (char*)data, length) ;
335     dprintf("%s #%u %cdata %s\n",tintstr(),id_,ok?'-':'!',pos.str());
336     data_in_ = tintbin(NOW,bin64_t::NONE);
337     if (!ok) 
338         return bin64_t::NONE;
339     data_in_.bin = pos;
340     if (pos!=bin64_t::NONE) {
341         if (last_data_in_time_) {
342             tint dip = NOW - last_data_in_time_;
343             dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
344         }
345         last_data_in_time_ = NOW;
346     }
347     CleanHintOut(pos);    
348     return pos;
349 }
350
351
352 void    Channel::CleanDataOut (bin64_t ackd_pos) { // TODO: isn't it too long?
353     
354     int max_ack_off = 0;
355     
356     if (ackd_pos!=bin64_t::NONE) {
357         for (int i=0; i<data_out_.size(); i++) {
358             if (data_out_[i]!=tintbin() && data_out_[i].bin.within(ackd_pos)) {
359                 if (peer_send_time_)
360                     for(tbqueue::iterator j=data_out_tmo_.begin(); j!=data_out_tmo_.end(); j++)
361                         if (j->bin==data_out_[i].bin)
362                             peer_send_time_=0; // possibly retransmit
363                 if (peer_send_time_) { // well, it is sorta ACK ACK
364                     tint rtt = NOW-data_out_[i].time;
365                     rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
366                     dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
367                     assert(data_out_[i].time!=TINT_NEVER);
368                     tint owd = peer_send_time_ - data_out_[i].time;
369                     owd_cur_bin_ = (owd_cur_bin_+1) & 3;
370                     owd_current_[owd_cur_bin_] = owd;
371                     if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
372                         owd_min_bin_start_ = NOW;
373                         owd_min_bin_ = (owd_min_bin_+1) & 3;
374                         owd_min_bins_[owd_min_bin_] = TINT_NEVER;
375                     }
376                     if (owd_min_bins_[owd_min_bin_]>owd)
377                         owd_min_bins_[owd_min_bin_] = owd;
378                     peer_send_time_ = 0;
379                 }
380                 dprintf("%s #%u rtt %lli dev %lli based on %s\n",
381                         tintstr(),id_,rtt_avg_,dev_avg_,data_out_[i].bin.str());
382                 bin64_t pos = data_out_[i].bin;
383                 ack_rcvd_recent_++;
384                 data_out_[i]=tintbin();
385                 max_ack_off = i;
386                 if (ackd_pos==pos)
387                     break;
388             }
389         }
390         while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE) {
391             data_out_.pop_front();
392             max_ack_off--;
393         }
394         static const int MAX_REORDERING = 2;  // the triple-ACK principle
395         if (max_ack_off>MAX_REORDERING) {
396             while (max_ack_off && (data_out_.front().bin==bin64_t::NONE
397                                    || ack_in_.is_filled(data_out_.front().bin)) ) {
398                 data_out_.pop_front();
399                 max_ack_off--;
400             }
401             while (max_ack_off>MAX_REORDERING) {
402                 ack_not_rcvd_recent_++;
403                 data_out_tmo_.push_back(data_out_.front().bin);
404                 dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str());
405                 data_out_.pop_front();
406                 max_ack_off--;
407                 data_out_cap_ = bin64_t::ALL;
408             }
409         }
410     }
411     tint timeout = NOW - rtt_avg_ - 4*std::max(dev_avg_,TINT_MSEC*50);
412     while (!data_out_.empty() && data_out_.front().time<timeout) {
413         if (data_out_.front().bin!=bin64_t::NONE && ack_in_.is_empty(data_out_.front().bin)) {
414             ack_not_rcvd_recent_++;
415             data_out_cap_ = bin64_t::ALL;
416             data_out_tmo_.push_back(data_out_.front().bin);
417             dprintf("%s #%u Tdata %s\n",tintstr(),id_,data_out_.front().bin.str());
418         }
419         data_out_.pop_front();
420     }
421     while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE)
422         data_out_.pop_front();
423     assert(data_out_.empty() || data_out_.front().time!=TINT_NEVER);
424     while (!data_out_tmo_.empty() && data_out_tmo_.front().time<NOW-MAX_POSSIBLE_RTT)
425         data_out_tmo_.pop_front();
426
427 }
428
429
430 void    Channel::OnAck (Datagram& dgram) {
431     bin64_t ackd_pos = dgram.Pull32();
432     if (ackd_pos==bin64_t::NONE)
433         return; // likely, brocken packet / insufficient hashes
434     if (file().size() && ackd_pos.base_offset()>=file().packet_size()) {
435         eprintf("invalid ack: %s\n",ackd_pos.str());
436         return;
437     }
438     dprintf("%s #%u -ack %s\n",tintstr(),id_,ackd_pos.str());
439     ack_in_.set(ackd_pos);
440     CleanDataOut(ackd_pos); // FIXME do AFTER all ACKs
441 }
442
443
444 void Channel::OnTs (Datagram& dgram) {
445     peer_send_time_ = dgram.Pull64();
446     dprintf("%s #%u -ts %lli\n",tintstr(),id_,peer_send_time_);
447 }
448
449
450 void    Channel::OnHint (Datagram& dgram) {
451     bin64_t hint = dgram.Pull32();
452     hint_in_.push_back(hint);
453     //ack_in_.set(hint,binmap_t::EMPTY);
454     //RequeueSend(cc_->OnHintRecvd(hint));
455     dprintf("%s #%u -hint %s\n",tintstr(),id_,hint.str());
456 }
457
458
459 void Channel::OnHandshake (Datagram& dgram) {
460     peer_channel_id_ = dgram.Pull32();
461     dprintf("%s #%u -hs %x\n",tintstr(),id_,peer_channel_id_);
462     // self-connection check
463     if (!SELF_CONN_OK) {
464         uint32_t try_id = DecodeID(peer_channel_id_);
465         if (channel(try_id) && !channel(try_id)->peer_channel_id_) {
466             peer_channel_id_ = 0;
467             Close();
468             return; // this is a self-connection
469         }
470     }
471     // FUTURE: channel forking
472 }
473
474
475 void Channel::OnPex (Datagram& dgram) {
476     uint32_t ipv4 = dgram.Pull32();
477     uint16_t port = dgram.Pull16();
478     Address addr(ipv4,port);
479     dprintf("%s #%u -pex %s\n",tintstr(),id_,addr.str());
480     transfer().OnPexIn(addr);
481 }
482
483
484 void    Channel::AddPex (Datagram& dgram) {
485     int chid = transfer().RevealChannel(pex_out_);
486     if (chid==-1 || chid==id_)
487         return;
488     Address a = channels[chid]->peer();
489     dgram.Push8(SWIFT_PEX_ADD);
490     dgram.Push32(a.ipv4());
491     dgram.Push16(a.port());
492     dprintf("%s #%u +pex %s\n",tintstr(),id_,a.str());
493 }
494
495
496 Channel*    Channel::RecvDatagram (int socket) {
497     Datagram data(socket);
498     data.Recv();
499     const Address& addr = data.address();
500 #define return_log(...) { printf(__VA_ARGS__); return NULL; }
501     if (data.size()<4) 
502         return_log("datagram shorter than 4 bytes %s\n",addr.str());
503     uint32_t mych = data.Pull32();
504     Sha1Hash hash;
505     Channel* channel = NULL;
506     if (!mych) { // handshake initiated
507         if (data.size()<1+4+1+4+Sha1Hash::SIZE) 
508             return_log ("%s #0 incorrect size %i initial handshake packet %s\n",
509                         tintstr(),data.size(),addr.str());
510         uint8_t hashid = data.Pull8();
511         if (hashid!=SWIFT_HASH) 
512             return_log ("%s #0 no hash in the initial handshake %s\n",
513                         tintstr(),addr.str());
514         bin64_t pos = data.Pull32();
515         if (pos!=bin64_t::ALL) 
516             return_log ("%s #0 that is not the root hash %s\n",tintstr(),addr.str());
517         hash = data.PullHash();
518         FileTransfer* file = FileTransfer::Find(hash);
519         if (!file) 
520             return_log ("%s #0 hash %s unknown, no such file %s\n",tintstr(),hash.hex().c_str(),addr.str());
521         dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
522         for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
523             if (channels[*i] && channels[*i]->peer_==data.address() && 
524                 channels[*i]->last_recv_time_>NOW-TINT_SEC*2)
525                 return_log("%s #0 have a channel already to %s\n",tintstr(),addr.str());
526         channel = new Channel(file, socket, data.address());
527     } else {
528         mych = DecodeID(mych);
529         if (mych>=channels.size()) 
530             return_log("%s invalid channel #%u, %s\n",tintstr(),mych,addr.str());
531         channel = channels[mych];
532         if (!channel) 
533             return_log ("%s #%u is already closed\n",tintstr(),mych,addr.str());
534         if (channel->peer() != addr) 
535             return_log ("%s #%u invalid peer address %s!=%s\n",
536                         tintstr(),mych,channel->peer().str(),addr.str());
537         channel->own_id_mentioned_ = true;
538     }
539     //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
540     channel->Recv(data);
541     return channel;
542 }
543
544
545 void    Channel::Loop (tint howlong) {  
546     
547     tint limit = Datagram::Time() + howlong;
548     
549     do {
550
551         tint send_time(TINT_NEVER);
552         Channel* sender(NULL);
553         while (!sender && !send_queue.is_empty()) { // dequeue
554             tintbin next = send_queue.pop();
555             sender = channel((int)next.bin);
556             send_time = next.time;
557             if (sender && sender->next_send_time_!=send_time &&
558                      sender->next_send_time_!=TINT_NEVER )
559                 sender = NULL; // it was a stale entry
560         }
561         
562         if ( sender!=NULL && send_time<=NOW ) { // it's time
563             
564             dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(),
565                     tintstr(send_time));
566             sender->Send();
567             sender->Reschedule();
568             
569         } else {  // it's too early, wait
570             
571             tint towait = min(limit,send_time) - NOW;
572             dprintf("%s waiting %lliusec\n",tintstr(),towait);
573             int rd = Datagram::Wait(socket_count,sockets,towait);
574             if (rd!=INVALID_SOCKET) { // in meantime, received something
575                 Channel* receiver = RecvDatagram(rd);
576                 if (receiver) // receiver's state may have changed
577                     receiver->Reschedule();
578             }
579             if (sender)  // get back to that later
580                 send_queue.push(tintbin(send_time,sender->id()));
581             
582         }
583         
584     } while (NOW<limit);
585             
586 }
587
588  
589 void Channel::Close () {
590     this->SwitchSendControl(CLOSE_CONTROL);
591 }
592
593
594 void Channel::Reschedule () {
595     next_send_time_ = NextSendTime();
596     if (next_send_time_!=TINT_NEVER) {
597         assert(next_send_time_<NOW+TINT_MIN);
598         send_queue.push(tintbin(next_send_time_,id_));
599         dprintf("%s requeue #%u for %s\n",tintstr(),id_,tintstr(next_send_time_));
600     } else {
601         dprintf("%s #%u closed\n",tintstr(),id_);
602         delete this;
603     }
604 }