Compiles well under MinGW and VC++ alike. Last but not least, runs afterwards.
[swift-upb.git] / sendrecv.cpp
1 /*
2  *  sendrecv.cpp
3  *  most of the swift's state machine
4  *
5  *  Created by Victor Grishchenko on 3/6/09.
6  *  Copyright 2009 Delft University of Technology. All rights reserved.
7  *
8  */
9 #include "swift.h"
10 #include <algorithm>  // kill it
11
12 using namespace swift;
13 using namespace std;
14
15 /*
16  TODO  25 Oct 18:55
17  - range: ALL
18  - randomized testing of advanced ops (new testcase)
19  */
20
21 void    Channel::AddPeakHashes (Datagram& dgram) {
22     for(int i=0; i<file().peak_count(); i++) {
23         bin64_t peak = file().peak(i);
24         dgram.Push8(SWIFT_HASH);
25         dgram.Push32((uint32_t)peak);
26         dgram.PushHash(file().peak_hash(i));
27         //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
28         dprintf("%s #%u +phash %s\n",tintstr(),id_,peak.str());
29     }
30 }
31
32
33 void    Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
34     bin64_t peak = file().peak_for(pos);
35     while (pos!=peak && ((NOW&3)==3 || !data_out_cap_.within(pos.parent())) &&
36             ack_in_.get(pos.parent())==binmap_t::EMPTY  ) {
37         bin64_t uncle = pos.sibling();
38         dgram.Push8(SWIFT_HASH);
39         dgram.Push32((uint32_t)uncle);
40         dgram.PushHash( file().hash(uncle) );
41         //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
42         dprintf("%s #%u +hash %s\n",tintstr(),id_,uncle.str());
43         pos = pos.parent();
44     }
45 }
46
47
48 bin64_t        Channel::DequeueHint () {
49     if (hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {    
50         uint64_t twist = peer_channel_id_;  // got no hints, send something randomly
51         twist &= file().peak(0); // may make it semi-seq here
52         file().ack_out().twist(twist);
53         ack_in_.twist(twist);
54         bin64_t my_pick = 
55             file().ack_out().find_filtered(ack_in_,bin64_t::ALL,binmap_t::FILLED);
56         while (my_pick.width()>max(1,(int)cwnd_))
57             my_pick = my_pick.left();
58         file().ack_out().twist(0);
59         ack_in_.twist(0);
60         if (my_pick!=bin64_t::NONE) {
61             my_pick = my_pick.twisted(twist);
62             hint_in_.push_back(my_pick);
63             dprintf("%s #%u *hint %s\n",tintstr(),id_,my_pick.str());
64         }
65     }
66     bin64_t send = bin64_t::NONE;
67     while (!hint_in_.empty() && send==bin64_t::NONE) {
68         bin64_t hint = hint_in_.front().bin;
69         tint time = hint_in_.front().time;
70         hint_in_.pop_front();
71         while (!hint.is_base()) { // FIXME optimize; possible attack
72             hint_in_.push_front(tintbin(time,hint.right()));
73             hint = hint.left();
74         }
75         //if (time < NOW-TINT_SEC*3/2 )
76         //    continue;  bad idea
77         if (ack_in_.get(hint)!=binmap_t::FILLED) 
78             send = hint;
79     }
80     uint64_t mass = 0;
81     for(int i=0; i<hint_in_.size(); i++)
82         mass += hint_in_[i].bin.width();
83     dprintf("%s #%u dequeued %s [%lli]\n",tintstr(),id_,send.str(),mass);
84     return send;
85 }
86
87
88 void    Channel::AddHandshake (Datagram& dgram) {
89     if (!peer_channel_id_) { // initiating
90         dgram.Push8(SWIFT_HASH);
91         dgram.Push32(bin64_t::ALL32);
92         dgram.PushHash(file().root_hash());
93         dprintf("%s #%u +hash ALL %s\n",
94                 tintstr(),id_,file().root_hash().hex().c_str());
95     }
96     dgram.Push8(SWIFT_HANDSHAKE);
97     int encoded = EncodeID(id_);
98     dgram.Push32(encoded);
99     dprintf("%s #%u +hs %x\n",tintstr(),id_,encoded);
100     ack_out_.clear();
101     AddAck(dgram);
102 }
103
104
105 void    Channel::Send () {
106     Datagram dgram(socket_,peer());
107     dgram.Push32(peer_channel_id_);
108     bin64_t data = bin64_t::NONE;
109     if ( is_established() ) {
110         // FIXME: seeder check
111         AddAck(dgram);
112         if (!file().is_complete())
113             AddHint(dgram);
114         AddPex(dgram);
115         CleanDataOut();
116         data = AddData(dgram);
117     } else {
118         AddHandshake(dgram);
119         AddAck(dgram);
120     }
121     dprintf("%s #%u sent %ib %s:%x\n",
122             tintstr(),id_,dgram.size(),peer().str(),peer_channel_id_);
123     if (dgram.size()==4) {// only the channel id; bare keep-alive
124         data = bin64_t::ALL;
125         //dprintf("%s #%u considering keepalive %i %f %s\n",
126         //        tintstr(),id_,(int)data_out_.size(),cwnd_,SEND_CONTROL_MODES[send_control_]);
127         //if (data_out_.size()<cwnd_ && send_control_!=KEEP_ALIVE_CONTROL) {
128             //if ( cwnd_ < 1 )
129             //    SwitchSendControl(KEEP_ALIVE_CONTROL);
130             //else
131             //    cwnd_ = cwnd_/2.0;
132         //}
133         //if (data_out_.empty() && send_control_!=KEEP_ALIVE_CONTROL)
134         //     SwitchSendControl(KEEP_ALIVE_CONTROL);// we did our best
135         //if (NOW<last_send_time_+MAX_SEND_INTERVAL) // no need for keepalive
136         //    return; // don't send empty dgram
137     }
138     if (dgram.Send()==-1)
139         print_error("can't send datagram");
140     last_send_time_ = NOW;
141     sent_since_recv_++;
142     dgrams_sent_++;
143 }
144
145
146 void    Channel::AddHint (Datagram& dgram) {
147
148     tint plan_for = max(TINT_SEC,rtt_avg_*4);
149     
150     tint timed_out = NOW - plan_for*2;
151     while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
152         hint_out_size_ -= hint_out_.front().bin.width();
153         hint_out_.pop_front();
154     }
155     
156     /*int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
157     if (!peer_cwnd)
158         peer_cwnd = 1;*/
159     int plan_pck = max ( (tint)1, plan_for / dip_avg_ );
160     
161     if ( hint_out_size_ < plan_pck ) {
162             
163         int diff = plan_pck - hint_out_size_; // TODO: aggregate
164         bin64_t hint = transfer().picker().Pick(ack_in_,diff,NOW+plan_for*2);
165         
166         if (hint!=bin64_t::NONE) {
167             dgram.Push8(SWIFT_HINT);
168             dgram.Push32(hint);
169             dprintf("%s #%u +hint %s [%lli]\n",tintstr(),id_,hint.str(),hint_out_size_);
170             hint_out_.push_back(hint);
171             hint_out_size_ += hint.width();
172         } else
173             dprintf("%s #%u Xhint\n",tintstr(),id_);
174         
175     }
176 }
177
178
179 bin64_t        Channel::AddData (Datagram& dgram) {
180     
181     if (!file().size()) // know nothing
182         return bin64_t::NONE;
183     
184     bin64_t tosend = bin64_t::NONE;
185     tint luft = send_interval_>>4; // may wake up a bit earlier
186     if (data_out_.size()<cwnd_ &&
187             last_data_out_time_+send_interval_<=NOW+luft) {
188         tosend = DequeueHint();
189         if (tosend==bin64_t::NONE) {
190             dprintf("%s #%u sendctrl no idea what to send\n",tintstr(),id_);
191             if (send_control_!=KEEP_ALIVE_CONTROL)
192                 SwitchSendControl(KEEP_ALIVE_CONTROL);
193         }
194     } else
195         dprintf("%s #%u sendctrl wait cwnd %f data_out %i next %s\n",
196                 tintstr(),id_,cwnd_,data_out_.size(),tintstr(last_data_out_time_+NOW-send_interval_));
197     
198     if (tosend==bin64_t::NONE)// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty())) 
199         return bin64_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
200     
201     if (ack_in_.is_empty() && file().size())
202         AddPeakHashes(dgram);
203     AddUncleHashes(dgram,tosend);
204     if (!ack_in_.is_empty()) // TODO: cwnd_>1
205         data_out_cap_ = tosend;
206
207     if (dgram.size()>254) {
208         dgram.Send(); // kind of fragmentation
209         dgram.Push32(peer_channel_id_);
210     }
211     
212     dgram.Push8(SWIFT_DATA);
213     dgram.Push32(tosend.to32());
214     
215     uint8_t buf[1024];
216     size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); 
217     // TODO: corrupted data, retries, caching
218     if (r<0) {
219         print_error("error on reading");
220         return bin64_t::NONE;
221     }
222     assert(dgram.space()>=r+4+1);
223     dgram.Push(buf,r);
224     
225     last_data_out_time_ = NOW;
226     data_out_.push_back(tosend);
227     dprintf("%s #%u +data %s\n",tintstr(),id_,tosend.str());
228     
229     return tosend;
230 }
231
232
233 void    Channel::AddTs (Datagram& dgram) {
234     dgram.Push8(SWIFT_TS);
235     dgram.Push64(data_in_.time);
236     dprintf("%s #%u +ts %s\n",tintstr(),id_,tintstr(data_in_.time));
237 }
238
239
240 void    Channel::AddAck (Datagram& dgram) {
241     if (data_in_dbl_!=bin64_t::NONE) { // TODO: do redundancy better
242         dgram.Push8(SWIFT_ACK);
243         dgram.Push32(data_in_dbl_.to32());
244         data_in_dbl_=bin64_t::NONE;
245     }
246     if (data_in_.time!=TINT_NEVER) { // TODO: ACK NONE for corrupted data
247         AddTs(dgram);
248         bin64_t pos = data_in_.bin; // be precise file().ack_out().cover(data_in_.bin);
249         dgram.Push8(SWIFT_ACK);
250         dgram.Push32(pos.to32());
251         //dgram.Push64(data_in_.time);
252         ack_out_.set(pos);
253         dprintf("%s #%u +ack %s %s\n",tintstr(),id_,pos.str(),tintstr(data_in_.time));
254         data_in_ = tintbin(TINT_NEVER,bin64_t::NONE);
255         if (pos.layer()>2)
256             data_in_dbl_ = pos;
257     }
258     for(int count=0; count<4; count++) {
259         bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, binmap_t::FILLED);
260         if (ack==bin64_t::NONE)
261             break;
262         ack = file().ack_out().cover(ack);
263         ack_out_.set(ack);
264         dgram.Push8(SWIFT_ACK);
265         dgram.Push32(ack.to32());
266         dprintf("%s #%u +ack %s\n",tintstr(),id_,ack.str());
267     }
268 }
269
270
271 void    Channel::Recv (Datagram& dgram) {
272     dprintf("%s #%u recvd %i\n",tintstr(),id_,dgram.size()+4);
273     peer_send_time_ = 0; // has scope of 1 datagram
274     dgrams_rcvd_++;
275     if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
276         rtt_avg_ = NOW - last_send_time_;
277         dev_avg_ = rtt_avg_;
278         dip_avg_ = rtt_avg_;
279         dprintf("%s #%u sendctrl rtt init %lli\n",tintstr(),id_,rtt_avg_);
280     }
281     bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
282     while (dgram.size()) {
283         uint8_t type = dgram.Pull8();
284         switch (type) {
285             case SWIFT_HANDSHAKE: OnHandshake(dgram); break;
286             case SWIFT_DATA:      data=OnData(dgram); break;
287             case SWIFT_TS:        OnTs(dgram); break;
288             case SWIFT_ACK:       OnAck(dgram); break;
289             case SWIFT_HASH:      OnHash(dgram); break;
290             case SWIFT_HINT:      OnHint(dgram); break;
291             case SWIFT_PEX_ADD:   OnPex(dgram); break;
292             default:
293                 eprintf("%s #%u ?msg id unknown %i\n",tintstr(),id_,(int)type);
294                 return;
295         }
296     }
297     last_recv_time_ = NOW;
298     sent_since_recv_ = 0;
299 }
300
301
302 void    Channel::OnHash (Datagram& dgram) {
303     bin64_t pos = dgram.Pull32();
304     Sha1Hash hash = dgram.PullHash();
305     file().OfferHash(pos,hash);
306     //DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
307     dprintf("%s #%u -hash %s\n",tintstr(),id_,pos.str());
308 }
309
310
311 void    Channel::CleanHintOut (bin64_t pos) {
312     int hi = 0;
313     while (hi<hint_out_.size() && !pos.within(hint_out_[hi].bin))
314         hi++;
315     if (hi==hint_out_.size())
316         return; // something not hinted or hinted in far past
317     while (hi--) { // removing likely snubbed hints
318         hint_out_size_ -= hint_out_.front().bin.width();
319         hint_out_.pop_front();
320     }
321     while (hint_out_.front().bin!=pos) {
322         tintbin f = hint_out_.front();
323         f.bin = f.bin.towards(pos);
324         hint_out_.front().bin = f.bin.sibling();
325         hint_out_.push_front(f);
326     }
327     hint_out_.pop_front();
328     hint_out_size_--;
329 }
330
331
332 bin64_t Channel::OnData (Datagram& dgram) {
333     bin64_t pos = dgram.Pull32();
334     uint8_t *data;
335     int length = dgram.Pull(&data,1024);
336     bool ok = (pos==bin64_t::NONE) || file().OfferData(pos, (char*)data, length) ;
337     dprintf("%s #%u %cdata %s\n",tintstr(),id_,ok?'-':'!',pos.str());
338     data_in_ = tintbin(NOW,bin64_t::NONE);
339     if (!ok) 
340         return bin64_t::NONE;
341     data_in_.bin = pos;
342     if (pos!=bin64_t::NONE) {
343         if (last_data_in_time_) {
344             tint dip = NOW - last_data_in_time_;
345             dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
346         }
347         last_data_in_time_ = NOW;
348     }
349     CleanHintOut(pos);    
350     return pos;
351 }
352
353
354 void    Channel::CleanDataOut (bin64_t ackd_pos) { // TODO: isn't it too long?
355     
356     int max_ack_off = 0;
357     
358     if (ackd_pos!=bin64_t::NONE) {
359         for (int i=0; i<data_out_.size(); i++) {
360             if (data_out_[i]!=tintbin() && data_out_[i].bin.within(ackd_pos)) {
361                 if (peer_send_time_)
362                     for(tbqueue::iterator j=data_out_tmo_.begin(); j!=data_out_tmo_.end(); j++)
363                         if (j->bin==data_out_[i].bin)
364                             peer_send_time_=0; // possibly retransmit
365                 if (peer_send_time_) { // well, it is sorta ACK ACK
366                     tint rtt = NOW-data_out_[i].time;
367                     rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
368                     dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
369                     assert(data_out_[i].time!=TINT_NEVER);
370                     tint owd = peer_send_time_ - data_out_[i].time;
371                     owd_cur_bin_ = (owd_cur_bin_+1) & 3;
372                     owd_current_[owd_cur_bin_] = owd;
373                     if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
374                         owd_min_bin_start_ = NOW;
375                         owd_min_bin_ = (owd_min_bin_+1) & 3;
376                         owd_min_bins_[owd_min_bin_] = TINT_NEVER;
377                     }
378                     if (owd_min_bins_[owd_min_bin_]>owd)
379                         owd_min_bins_[owd_min_bin_] = owd;
380                     peer_send_time_ = 0;
381                 }
382                 dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n",
383                         tintstr(),id_,rtt_avg_,dev_avg_,data_out_[i].bin.str());
384                 bin64_t pos = data_out_[i].bin;
385                 ack_rcvd_recent_++;
386                 data_out_[i]=tintbin();
387                 max_ack_off = i;
388                 if (ackd_pos==pos)
389                     break;
390             }
391         }
392         while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE) {
393             data_out_.pop_front();
394             max_ack_off--;
395         }
396         static const int MAX_REORDERING = 2;  // the triple-ACK principle
397         if (max_ack_off>MAX_REORDERING) {
398             while (max_ack_off && (data_out_.front().bin==bin64_t::NONE
399                                    || ack_in_.is_filled(data_out_.front().bin)) ) {
400                 data_out_.pop_front();
401                 max_ack_off--;
402             }
403             while (max_ack_off>MAX_REORDERING) {
404                 ack_not_rcvd_recent_++;
405                 data_out_tmo_.push_back(data_out_.front().bin);
406                 dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str());
407                 data_out_.pop_front();
408                 max_ack_off--;
409                 data_out_cap_ = bin64_t::ALL;
410             }
411         }
412         peer_send_time_ = 0;
413     }
414     tint timeout = NOW - rtt_avg_ - 4*max(dev_avg_,TINT_MSEC*50);
415     while (!data_out_.empty() && data_out_.front().time<timeout) {
416         if (data_out_.front().bin!=bin64_t::NONE && ack_in_.is_empty(data_out_.front().bin)) {
417             ack_not_rcvd_recent_++;
418             data_out_cap_ = bin64_t::ALL;
419             data_out_tmo_.push_back(data_out_.front().bin);
420             dprintf("%s #%u Tdata %s\n",tintstr(),id_,data_out_.front().bin.str());
421         }
422         data_out_.pop_front();
423     }
424     while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE)
425         data_out_.pop_front();
426     assert(data_out_.empty() || data_out_.front().time!=TINT_NEVER);
427     while (!data_out_tmo_.empty() && data_out_tmo_.front().time<NOW-MAX_POSSIBLE_RTT)
428         data_out_tmo_.pop_front();
429
430 }
431
432
433 void    Channel::OnAck (Datagram& dgram) {
434     bin64_t ackd_pos = dgram.Pull32();
435     if (ackd_pos==bin64_t::NONE)
436         return; // likely, brocken packet / insufficient hashes
437     if (file().size() && ackd_pos.base_offset()>=file().packet_size()) {
438         eprintf("invalid ack: %s\n",ackd_pos.str());
439         return;
440     }
441     dprintf("%s #%u -ack %s\n",tintstr(),id_,ackd_pos.str());
442     ack_in_.set(ackd_pos);
443     CleanDataOut(ackd_pos); // FIXME do AFTER all ACKs
444 }
445
446
447 void Channel::OnTs (Datagram& dgram) {
448     peer_send_time_ = dgram.Pull64();
449     dprintf("%s #%u -ts %lli\n",tintstr(),id_,peer_send_time_);
450 }
451
452
453 void    Channel::OnHint (Datagram& dgram) {
454     bin64_t hint = dgram.Pull32();
455     hint_in_.push_back(hint);
456     //ack_in_.set(hint,binmap_t::EMPTY);
457     //RequeueSend(cc_->OnHintRecvd(hint));
458     dprintf("%s #%u -hint %s\n",tintstr(),id_,hint.str());
459 }
460
461
462 void Channel::OnHandshake (Datagram& dgram) {
463     peer_channel_id_ = dgram.Pull32();
464     dprintf("%s #%u -hs %x\n",tintstr(),id_,peer_channel_id_);
465     // self-connection check
466     if (!SELF_CONN_OK) {
467         uint32_t try_id = DecodeID(peer_channel_id_);
468         if (channel(try_id) && !channel(try_id)->peer_channel_id_) {
469             peer_channel_id_ = 0;
470             Close();
471             return; // this is a self-connection
472         }
473     }
474     // FUTURE: channel forking
475 }
476
477
478 void Channel::OnPex (Datagram& dgram) {
479     uint32_t ipv4 = dgram.Pull32();
480     uint16_t port = dgram.Pull16();
481     Address addr(ipv4,port);
482     dprintf("%s #%u -pex %s\n",tintstr(),id_,addr.str());
483     transfer().OnPexIn(addr);
484 }
485
486
487 void    Channel::AddPex (Datagram& dgram) {
488     int chid = transfer().RevealChannel(pex_out_);
489     if (chid==-1 || chid==id_)
490         return;
491     Address a = channels[chid]->peer();
492     dgram.Push8(SWIFT_PEX_ADD);
493     dgram.Push32(a.ipv4());
494     dgram.Push16(a.port());
495     dprintf("%s #%u +pex %s\n",tintstr(),id_,a.str());
496 }
497
498
499 Channel*    Channel::RecvDatagram (int socket) {
500     Datagram data(socket);
501     data.Recv();
502     const Address& addr = data.address();
503 #define return_log(...) { printf(__VA_ARGS__); return NULL; }
504     if (data.size()<4) 
505         return_log("datagram shorter than 4 bytes %s\n",addr.str());
506     uint32_t mych = data.Pull32();
507     Sha1Hash hash;
508     Channel* channel = NULL;
509     if (!mych) { // handshake initiated
510         if (data.size()<1+4+1+4+Sha1Hash::SIZE) 
511             return_log ("%s #0 incorrect size %i initial handshake packet %s\n",
512                         tintstr(),data.size(),addr.str());
513         uint8_t hashid = data.Pull8();
514         if (hashid!=SWIFT_HASH) 
515             return_log ("%s #0 no hash in the initial handshake %s\n",
516                         tintstr(),addr.str());
517         bin64_t pos = data.Pull32();
518         if (pos!=bin64_t::ALL) 
519             return_log ("%s #0 that is not the root hash %s\n",tintstr(),addr.str());
520         hash = data.PullHash();
521         FileTransfer* file = FileTransfer::Find(hash);
522         if (!file) 
523             return_log ("%s #0 hash %s unknown, no such file %s\n",tintstr(),hash.hex().c_str(),addr.str());
524         dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
525         for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
526             if (channels[*i] && channels[*i]->peer_==data.address() && 
527                 channels[*i]->last_recv_time_>NOW-TINT_SEC*2)
528                 return_log("%s #0 have a channel already to %s\n",tintstr(),addr.str());
529         channel = new Channel(file, socket, data.address());
530     } else {
531         mych = DecodeID(mych);
532         if (mych>=channels.size()) 
533             return_log("%s invalid channel #%u, %s\n",tintstr(),mych,addr.str());
534         channel = channels[mych];
535         if (!channel) 
536             return_log ("%s #%u is already closed\n",tintstr(),mych,addr.str());
537         if (channel->peer() != addr) 
538             return_log ("%s #%u invalid peer address %s!=%s\n",
539                         tintstr(),mych,channel->peer().str(),addr.str());
540         channel->own_id_mentioned_ = true;
541     }
542     //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
543     channel->Recv(data);
544     return channel;
545 }
546
547
548 void    Channel::Loop (tint howlong) {  
549     
550     tint limit = Datagram::Time() + howlong;
551     
552     do {
553
554         tint send_time(TINT_NEVER);
555         Channel* sender(NULL);
556         while (!sender && !send_queue.is_empty()) { // dequeue
557             tintbin next = send_queue.pop();
558             sender = channel((int)next.bin);
559             send_time = next.time;
560             if (sender && sender->next_send_time_!=send_time &&
561                      sender->next_send_time_!=TINT_NEVER )
562                 sender = NULL; // it was a stale entry
563         }
564         
565         if ( sender!=NULL && send_time<=NOW ) { // it's time
566             
567             dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(),
568                     tintstr(send_time));
569             sender->Send();
570             sender->Reschedule();
571             
572         } else {  // it's too early, wait
573             
574             tint towait = min(limit,send_time) - NOW;
575             dprintf("%s #0 waiting %lliusec\n",tintstr(),towait);
576             int rd = Datagram::Wait(socket_count,sockets,towait);
577             if (rd!=INVALID_SOCKET) { // in meantime, received something
578                 Channel* receiver = RecvDatagram(rd);
579                 if (receiver) // receiver's state may have changed
580                     receiver->Reschedule();
581             }
582             if (sender)  // get back to that later
583                 send_queue.push(tintbin(send_time,sender->id()));
584             
585         }
586         
587     } while (NOW<limit);
588             
589 }
590
591  
592 void Channel::Close () {
593     this->SwitchSendControl(CLOSE_CONTROL);
594 }
595
596
597 void Channel::Reschedule () {
598     next_send_time_ = NextSendTime();
599     if (next_send_time_!=TINT_NEVER) {
600         assert(next_send_time_<NOW+TINT_MIN);
601         send_queue.push(tintbin(next_send_time_,id_));
602         dprintf("%s #%u requeue for %s\n",tintstr(),id_,tintstr(next_send_time_));
603     } else {
604         dprintf("%s #%u closed\n",tintstr(),id_);
605         delete this;
606     }
607 }