}
-bool PingPongController::MaySendData(){
- return ch_->data_out_.empty();
+void SendController::Schedule (tint next_time) {
+ ch_->Schedule(next_time);
}
-
-tint PingPongController::NextSendTime () {
- if (unanswered_>=3)
- return TINT_NEVER;
- return ch_->last_send_time_ + ch_->rtt_avg_ + ch_->dev_avg_*4; // remind on timeout
-}
-
-void PingPongController::OnDataSent(bin64_t b) {
- unanswered_++;
- if ( (b==bin64_t::ALL && MaySendData()) ) // nothing to send
- Swap(new KeepAliveController(this));
-}
-
-void PingPongController::OnDataRecvd(bin64_t b) {
- unanswered_ = 0;
-}
-
-void PingPongController::OnAckRcvd(bin64_t ackd) {
- //if (ch_->data_out_.empty())
- Swap(new SlowStartController(this));
+
+
+
+KeepAliveController::KeepAliveController (Channel* ch) : SendController(ch), delay_(ch->rtt_avg_) {
}
KeepAliveController::KeepAliveController(SendController* prev, tint delay) :
SendController(prev), delay_(delay) {
- ch_->dev_avg_ = TINT_SEC; // without active measurement, rtt is unreliable
+ ch_->dev_avg_ = TINT_SEC; // without constant active measurement, rtt is unreliable
+ delay_=ch_->rtt_avg_;
}
bool KeepAliveController::MaySendData() {
return true;
}
-tint KeepAliveController::NextSendTime () {
- if (!delay_)
- delay_ = ch_->rtt_avg_;
- if (ch_->last_recv_time_ < ch_->last_send_time_-TINT_MIN)
- return TINT_NEVER;
- return ch_->last_send_time_ + delay_;
-}
-
+
void KeepAliveController::OnDataSent(bin64_t b) {
- delay_ = (NOW - std::max(ch_->last_send_time_,ch_->last_recv_time_)) * 3 / 2;
- if (delay_>TINT_SEC*58)
- delay_ = TINT_SEC*58;
- if (b!=bin64_t::ALL && b!=bin64_t::NONE)
+ if (b==bin64_t::ALL || b==bin64_t::NONE) {
+ delay_ = delay_ * 2; // backing off
+ if (delay_>TINT_SEC*58) // keep NAT mappings alive
+ delay_ = TINT_SEC*58;
+ if (delay_>=4*TINT_SEC && ch_->last_recv_time_ < NOW-TINT_MIN)
+ Schedule(TINT_NEVER); // no response; enter close timeout
+ else
+ Schedule(NOW+delay_); // all right, just keep it alive
+ } else {
+ Schedule(NOW+ch_->rtt_avg_); // cwnd==1 => next send in 1 rtt
Swap(new SlowStartController(this));
+ }
}
void KeepAliveController::OnDataRecvd(bin64_t b) {
+ if (b!=bin64_t::NONE && b!=bin64_t::ALL) { // channel is alive
+ delay_ = ch_->rtt_avg_;
+ Schedule(NOW); // schedule an ACK; TODO: aggregate
+ }
}
void KeepAliveController::OnAckRcvd(bin64_t ackd) {
+ // probably to something sent by CwndControllers before this one got installed
}
}
bool CwndController::MaySendData() {
+ tint spacing = ch_->rtt_avg_ / cwnd_;
dprintf("%s #%i sendctrl may send %i < %f & %s (rtt %lli)\n",tintstr(),
- ch_->id,(int)ch_->data_out_.size(),cwnd_,tintstr(NextSendTime()),
- ch_->rtt_avg_);
+ ch_->id,(int)ch_->data_out_.size(),cwnd_,
+ tintstr(ch_->last_send_data_time_+spacing), ch_->rtt_avg_);
return ch_->data_out_.empty() ||
- (ch_->data_out_.size() < cwnd_ && NOW >= NextSendTime());
-}
-
-tint CwndController::NextSendTime () {
- tint sendtime;
- if (ch_->data_out_.size() < cwnd_)
- sendtime = ch_->last_send_time_ + (ch_->rtt_avg_ / cwnd_);
- else
- sendtime = ch_->last_send_time_ + ch_->rtt_avg_ + ch_->dev_avg_ * 4 ;
- return sendtime;
+ (ch_->data_out_.size() < cwnd_ && NOW-ch_->last_send_data_time_ >= spacing);
}
+
void CwndController::OnDataSent(bin64_t b) {
- if (b==bin64_t::ALL || b==bin64_t::NONE) {
- if (MaySendData())
- Swap(new KeepAliveController(this));
- }
+ if ( (b==bin64_t::ALL || b==bin64_t::NONE) && MaySendData() ) { // no more data
+ Schedule(NOW+ch_->rtt_avg_);
+ Swap(new KeepAliveController(this));
+ } else {
+ tint spacing = ch_->rtt_avg_ / cwnd_;
+ if (ch_->data_out_.size() < cwnd_) { // have cwnd; not the right time yet
+ Schedule(ch_->last_send_data_time_+spacing);
+ } else { // no free cwnd
+ tint timeout = std::max( ch_->rtt_avg_+ch_->dev_avg_*4, 500*TINT_MSEC );
+ assert(!ch_->data_out_.empty());
+ Schedule(ch_->data_out_.front().time+timeout); // wait for ACK or timeout
+ }
+ }
}
-
+
+
void CwndController::OnDataRecvd(bin64_t b) {
+ if (b!=bin64_t::NONE && b!=bin64_t::ALL) {
+ Schedule(NOW); // send ACK; todo: aggregate ACKs
+ }
}
void CwndController::OnAckRcvd(bin64_t ackd) {
else
cwnd_ += 1.0/cwnd_;
dprintf("%s #%i sendctrl cwnd to %f\n",tintstr(),ch_->id,cwnd_);
+ tint spacing = ch_->rtt_avg_ / cwnd_;
+ Schedule(ch_->last_send_time_+spacing);
}
}
cwnd_ /= 2;
}
-
-void AIMDController::OnAckRcvd (bin64_t pos) {
- if (pos==bin64_t::NONE) {
- dprintf("%s #%i sendctrl loss detected\n",tintstr(),ch_->id);
- if (NOW>last_change_+ch_->rtt_avg_) {
- cwnd_ /= 2;
- last_change_ = NOW;
- }
- } else {
- cwnd_ += 1.0/cwnd_;
- dprintf("%s #%i sendctrl cwnd to %f\n",tintstr(),ch_->id,cwnd_);
- }
-}
-
dgram.Push32((uint32_t)peak);
dgram.PushHash(file().peak_hash(i));
//DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
- dprintf("%s #%i +phash (%i,%lli)\n",tintstr(),id,peak.layer(),peak.offset());
+ dprintf("%s #%i +phash %s\n",tintstr(),id,peak.str());
}
}
dgram.Push32((uint32_t)uncle);
dgram.PushHash( file().hash(uncle) );
//DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
- dprintf("%s #%i +hash (%i,%lli)\n",tintstr(),id,uncle.layer(),uncle.offset());
+ dprintf("%s #%i +hash %s\n",tintstr(),id,uncle.str());
pos = pos.parent();
}
}
bin64_t hint = hint_in_.front().bin;
tint time = hint_in_.front().time;
hint_in_.pop_front();
- //if (time < NOW-2*TINT_SEC ) //NOW-8*rtt_avg_)
- // continue;
+ if (time < NOW-TINT_SEC*3/2 ) //NOW-8*rtt_avg_)
+ continue;
send = file().ack_out().find_filtered(ack_in_,hint,bins::FILLED);
send = send.left_foot(); // single packet
dprintf("%s #%i dequeued %lli\n",tintstr(),id,send.base_offset());
}
-/*void Channel::CleanStaleHints () {
- while ( !hint_out.empty() && file().ack_out().get(hint_out.front().bin)==bins::FILLED )
- hint_out.pop_front(); // FIXME must normally clear fulfilled entries
- tint timed_out = NOW - cc_->RoundTripTime()*8;
- while ( !hint_out.empty() && hint_out.front().time < timed_out ) {
- file().picker()->Snubbed(hint_out.front().bin);
- hint_out.pop_front();
- }
-}*/
-
-
void Channel::AddHandshake (Datagram& dgram) {
if (!peer_channel_id_) { // initiating
dgram.Push8(P2TP_HASH);
void Channel::ClearStaleDataOut() {
int oldsize = data_out_.size();
- while ( data_out_.size() && data_out_.front().time <
- NOW - rtt_avg_ - dev_avg_*4 )
+ tint timeout = NOW - max( rtt_avg_-dev_avg_*4, 500*TINT_MSEC );
+ while ( data_out_.size() && data_out_.front().time < timeout ) {
+ dprintf("%s #%i Tdata %s\n",tintstr(),id,data_out_.front().bin.str());
data_out_.pop_front();
+ }
if (data_out_.size()!=oldsize) {
cc_->OnAckRcvd(bin64_t::NONE);
data_out_cap_ = bin64_t::ALL;
}
- while (data_out_.size() && ack_in_.get(data_out_.front().bin)==bins::FILLED)
+ while (data_out_.size() && (data_out_.front()==tintbin() || ack_in_.get(data_out_.front().bin)==bins::FILLED))
data_out_.pop_front();
}
AddHint(dgram);
AddPex(dgram);
ClearStaleDataOut();
- if (cc_->MaySendData())
- data = AddData(dgram);
- else
- dprintf("%s #%i no cwnd\n",tintstr(),id);
+ data = AddData(dgram);
} else {
AddHandshake(dgram);
AddAck(dgram);
}
+void Channel::CleanStaleHintOut () {
+ tint timed_out = NOW - 8*rtt_avg_;
+ while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
+ transfer().picker().Expired(hint_out_.front().bin);
+ hint_out_.pop_front();
+ }
+}
+
+
void Channel::AddHint (Datagram& dgram) {
+ CleanStaleHintOut();
+
+ uint64_t hint_out_mass=0;
+ for(int i=0; i<hint_out_.size(); i++)
+ hint_out_mass += hint_out_[i].bin.width();
+
int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
if (!peer_cwnd)
peer_cwnd = 1;
int peer_pps = TINT_SEC / dip_avg_;
if (!peer_pps)
peer_pps = 1;
- dprintf("%s #%i hint_out_ %lli+%lli mark (%i,%lli) peer_cwnd %lli/%lli=%f\n",
- tintstr(),id,hint_out_,hint_out_am_,(int)hint_out_mark_.bin.layer(),
- hint_out_mark_.bin.offset(),
- rtt_avg_,dip_avg_,((float)rtt_avg_/dip_avg_));
-
- if ( hint_out_mark_.time < NOW - TINT_SEC*2 ) { //NOW-rtt_avg_*8-dev_avg_) {
- hint_out_mark_.bin=bin64_t::NONE;
- //hint_out_ = hint_out_am_;
- //hint_out_am_ = 0;
- }
- if ( peer_pps > hint_out_+hint_out_am_ ) { //4*peer_cwnd
+ if ( hint_out_mass < 4*peer_cwnd ) {
- int diff = peer_pps - hint_out_ - hint_out_am_; // 4*peer_cwnd
+ int diff = 5*peer_cwnd - hint_out_mass;
if (diff>4 && diff>2*peer_cwnd)
diff >>= 1;
- bin64_t hint = transfer().picker().Pick(ack_in_,diff,NOW+TINT_SEC*3/2); //rtt_avg_*8+TINT_MSEC*10
+ bin64_t hint = transfer().picker().Pick(ack_in_,diff,rtt_avg_*8+TINT_MSEC*100);
if (hint!=bin64_t::NONE) {
dgram.Push8(P2TP_HINT);
dgram.Push32(hint);
- dprintf("%s #%i +hint (%i,%lli)\n",tintstr(),id,hint.layer(),hint.offset());
- if (hint_out_mark_.bin==bin64_t::NONE) {
- hint_out_mark_ = tintbin(NOW,hint);
- hint_out_ = hint_out_am_;
- hint_out_am_ = 0;
- }
- hint_out_am_ += hint.width();
- //hint_out_ += hint.width();
- }
+ dprintf("%s #%i +hint %s\n",tintstr(),id,hint.str());
+ hint_out_.push_back(hint);
+ } else
+ printf("%s #%i Xhint\n",tintstr(),id);
}
}
bin64_t Channel::AddData (Datagram& dgram) {
- if (!file().size()) // know nothing
+
+ if (!file().size()) // know nothing
return bin64_t::NONE;
- bin64_t tosend = DequeueHint();
- if (tosend==bin64_t::NONE) {
- dprintf("%s #%i out of hints #sendctrl\n",tintstr(),id);
- return bin64_t::NONE;
- }
- if (ack_in_.is_empty() && file().size())
- AddPeakHashes(dgram);
- AddUncleHashes(dgram,tosend);
- uint8_t buf[1024];
- size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10);
- // TODO: ??? corrupted data, retries
- if (r<0) {
- print_error("error on reading");
- return bin64_t::NONE;
+
+ bin64_t tosend = bin64_t::NONE;
+ if (cc_->MaySendData()) {
+ tosend = DequeueHint();
+ if (tosend==bin64_t::NONE)
+ dprintf("%s #%i out of hints #sendctrl\n",tintstr(),id);
+ } else
+ dprintf("%s #%i no cwnd #sendctrl\n",tintstr(),id);
+
+ if (tosend==bin64_t::NONE && (last_send_data_time_>NOW-TINT_SEC || data_out_.empty()))
+ return bin64_t::NONE; // once in a while, empty data is sent just to check rtt
+
+ if (tosend!=bin64_t::NONE) { // hashes
+ if (ack_in_.is_empty() && file().size())
+ AddPeakHashes(dgram);
+ AddUncleHashes(dgram,tosend);
+ data_out_cap_ = tosend;
}
- assert(dgram.space()>=r+4+1);
+
dgram.Push8(P2TP_DATA);
- dgram.Push32(tosend);
- dgram.Push(buf,r);
- dprintf("%s #%i +data (%lli)\n",tintstr(),id,tosend.base_offset());
+ dgram.Push32(tosend.to32());
+
+ if (tosend!=bin64_t::NONE) { // data
+ uint8_t buf[1024];
+ size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10);
+ // TODO: corrupted data, retries, caching
+ if (r<0) {
+ print_error("error on reading");
+ return bin64_t::NONE;
+ }
+ assert(dgram.space()>=r+4+1);
+ dgram.Push(buf,r);
+ }
+
+ last_send_data_time_ = NOW;
data_out_.push_back(tosend);
- data_out_cap_ = tosend;
- // FIXME BUG this makes data_out_ all stale ack_in_.set(tosend);
+ dprintf("%s #%i +data %s\n",tintstr(),id,tosend.str());
+
return tosend;
}
dgram.Push32(pos);
//dgram.Push64(data_in_.time);
ack_out_.set(pos);
- dprintf("%s #%i +ack (%i,%lli) %s\n",tintstr(),id,
- pos.layer(),pos.offset(),tintstr(data_in_.time));
+ dprintf("%s #%i +ack %s %s\n",tintstr(),id,pos.str(),tintstr(data_in_.time));
data_in_ = tintbin(0,bin64_t::NONE);
}
for(int count=0; count<4; count++) {
ack_out_.set(ack);
dgram.Push8(P2TP_ACK);
dgram.Push32(ack);
- dprintf("%s #%i +ack (%i,%lli)\n",tintstr(),id,ack.layer(),ack.offset());
+ dprintf("%s #%i +ack %s\n",tintstr(),id,ack.str());
}
}
}
cc_->OnDataRecvd(data);
last_recv_time_ = NOW;
- if (data!=bin64_t::ALL && next_send_time_>NOW+TINT_MSEC)
+ if (data!=bin64_t::ALL && next_send_time_>NOW+TINT_MSEC) {
+ Datagram::Time();
Send();
+ }
}
Sha1Hash hash = dgram.PullHash();
file().OfferHash(pos,hash);
//DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
- dprintf("%s #%i -hash (%i,%lli)\n",tintstr(),id,pos.layer(),pos.offset());
+ dprintf("%s #%i -hash %s\n",tintstr(),id,pos.str());
+}
+
+
+void Channel::CleanFulfilledHints (bin64_t pos) {
+ int hi = 0;
+ while (hi<hint_out_.size() && hi<8 && !pos.within(hint_out_[hi].bin))
+ hi++;
+ if (hi<8 && hi<hint_out_.size()) {
+ while (hi--) {
+ transfer().picker().Expired(hint_out_.front().bin);
+ hint_out_.pop_front();
+ }
+ while (hint_out_.front().bin!=pos) {
+ tintbin f = hint_out_.front();
+ f.bin = f.bin.towards(pos);
+ hint_out_.front().bin = f.bin.sibling();
+ hint_out_.push_front(f);
+ }
+ hint_out_.pop_front();
+ }
+ // every HINT ends up as either Expired or Received
+ transfer().picker().Received(pos);
}
bin64_t pos = dgram.Pull32();
uint8_t *data;
int length = dgram.Pull(&data,1024);
- bool ok = file().OfferData(pos, (char*)data, length) ;
+ bool ok = (pos==bin64_t::NONE) || file().OfferData(pos, (char*)data, length) ;
dprintf("%s #%i %cdata (%lli)\n",tintstr(),id,ok?'-':'!',pos.offset());
if (!ok)
return bin64_t::NONE;
data_in_ = tintbin(NOW,pos);
- transfer().picker().Received(pos); // FIXME ugly
- if (last_data_time_) {
- tint dip = NOW - last_data_time_;
- dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
- }
- last_data_time_ = NOW;
- if (pos.within(hint_out_mark_.bin)) {
- hint_out_mark_.bin = bin64_t::NONE;
+ if (pos!=bin64_t::NONE) {
+ if (last_recv_data_time_) {
+ tint dip = NOW - last_recv_data_time_;
+ dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
+ }
+ last_recv_data_time_ = NOW;
}
- if (hint_out_)
- hint_out_--;
- else if (hint_out_am_) // probably, the marking HINT was lost or whatever
- hint_out_am_--;
+ CleanFulfilledHints(pos);
return pos;
}
-void Channel::OnAck (Datagram& dgram) {
- bin64_t ackd_pos = dgram.Pull32();
- if (file().size() && ackd_pos.base_offset()>=file().packet_size()) {
- eprintf("invalid ack: (%i,%lli)\n",ackd_pos.layer(),ackd_pos.offset());
- return;
- }
- dprintf("%s #%i -ack (%i,%lli)\n",tintstr(),id,ackd_pos.layer(),ackd_pos.offset());
+void Channel::CleanFulfilledDataOut (bin64_t ackd_pos) {
for (int i=0; i<8 && i<data_out_.size(); i++)
- if (data_out_[i].bin.within(ackd_pos)) {
+ if (data_out_[i]!=tintbin() && data_out_[i].bin.within(ackd_pos)) {
tint rtt = NOW-data_out_[i].time;
- rtt_avg_ = (rtt_avg_*3 + rtt) >> 2;
+ rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
dprintf("%s #%i rtt %lli dev %lli\n",
tintstr(),id,rtt_avg_,dev_avg_);
- cc_->OnAckRcvd(data_out_[i].bin); // may be invoked twice FIXME FIXME FIXME
+ cc_->OnAckRcvd(data_out_[i].bin);
+ data_out_[i]=tintbin();
}
- ack_in_.set(ackd_pos);
- while (data_out_.size() && ack_in_.get(data_out_.front().bin)==bins::FILLED)
+ while ( data_out_.size() && ( data_out_.front()==tintbin() ||
+ ack_in_.get(data_out_.front().bin)==bins::FILLED ) )
data_out_.pop_front();
}
-/*void Channel::OnAckTs (Datagram& dgram) { // FIXME: OnTs
- bin64_t pos = dgram.Pull32();
- tint ts = dgram.Pull64();
- // TODO sanity check
- dprintf("%s #%i -ackts (%i,%lli) %s\n",
- tintstr(),id,pos.layer(),pos.offset(),tintstr(ts));
- ack_in_.set(pos);
- cc_->OnAckRcvd(pos,ts);
-}*/
+void Channel::OnAck (Datagram& dgram) {
+ bin64_t ackd_pos = dgram.Pull32();
+ if (ackd_pos!=bin64_t::NONE && file().size() && ackd_pos.base_offset()>=file().packet_size()) {
+ eprintf("invalid ack: %s\n",ackd_pos.str());
+ return;
+ }
+ dprintf("%s #%i -ack %s\n",tintstr(),id,ackd_pos.str());
+ ack_in_.set(ackd_pos);
+ CleanFulfilledDataOut(ackd_pos);
+}
+
void Channel::OnTs (Datagram& dgram) {
peer_send_time_ = dgram.Pull64();
hint_in_.push_back(hint);
//ack_in_.set(hint,bins::EMPTY);
//RequeueSend(cc_->OnHintRecvd(hint));
- dprintf("%s #%i -hint (%i,%lli)\n",tintstr(),id,hint.layer(),hint.offset());
+ dprintf("%s #%i -hint %s\n",tintstr(),id,hint.str());
}
if (hashid!=P2TP_HASH)
RETLOG ("no hash in the initial handshake");
bin64_t pos = data.Pull32();
- if (pos!=bin64_t::ALL32)
+ if (pos!=bin64_t::ALL)
RETLOG ("that is not the root hash");
hash = data.PullHash();
FileTransfer* file = FileTransfer::Find(hash);
RETLOG ("invalid peer address");
channel->own_id_mentioned_ = true;
}
- dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
+ //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
channel->Recv(data);
}
-bool tblater (const tintbin& a, const tintbin& b) {
- return a.time > b.time;
-}
-
-
-void Channel::RequeueSend (tint next_time) {
- if (next_time==next_send_time_)
- return;
- next_send_time_ = next_time;
- send_queue.push_back
- (tintbin(next_time==TINT_NEVER?NOW+TINT_MIN:next_time,id));
- push_heap(send_queue.begin(),send_queue.end(),tblater);
- dprintf("%s requeue #%i for %s\n",tintstr(),id,tintstr(next_time));
-}
-
-
void Channel::Loop (tint howlong) {
tint limit = Datagram::Time() + howlong;
tint send_time(TINT_NEVER);
Channel* sender(NULL);
- while (!send_queue.empty()) {
- send_time = send_queue.front().time;
- sender = channel((int)send_queue.front().bin);
+ while (!send_queue.is_empty()) {
+ send_time = send_queue.peek().time;
+ sender = channel((int)send_queue.peek().bin);
if (sender)
if ( sender->next_send_time_==send_time ||
sender->next_send_time_==TINT_NEVER )
break;
sender = NULL; // it was a stale entry
- pop_heap(send_queue.begin(), send_queue.end(), tblater);
- send_queue.pop_back();
+ send_queue.pop();
}
if (send_time>limit)
send_time = limit;
tintstr(send_time));
sender->Send();
sender->last_send_time_ = NOW;
- sender->RequeueSend(sender->cc_->NextSendTime());
- pop_heap(send_queue.begin(), send_queue.end(), tblater);
- send_queue.pop_back();
+ // sender->RequeueSend(sender->cc_->NextSendTime()); goes to SendCtrl
+ send_queue.pop();
} else if ( send_time > NOW ) {
tint towait = send_time - NOW;
dprintf("%s waiting %lliusec\n",tintstr(),towait);
int rd = Datagram::Wait(socket_count,sockets,towait);
if (rd!=INVALID_SOCKET)
RecvDatagram(rd);
- } else { //if (sender->next_send_time_==TINT_NEVER) {
+ } else if (sender) { // FIXME FIXME FIXME REWRITE!!! if (sender->next_send_time_==TINT_NEVER) {
dprintf("%s #%i closed sendctrl\n",tintstr(),sender->id);
delete sender;
- pop_heap(send_queue.begin(), send_queue.end(), tblater);
- send_queue.pop_back();
+ send_queue.pop();
}
} while (Datagram::Time()<limit);
}
+
+void Channel::Schedule (tint next_time) {
+ if (next_time==next_send_time_)
+ return;
+ next_send_time_ = next_time;
+ if (next_time==TINT_NEVER)
+ next_time = NOW + TINT_MIN; // 1min timeout
+ send_queue.push(tintbin(next_time,id));
+ dprintf("%s requeue #%i for %s\n",tintstr(),id,tintstr(next_time));
+}
\ No newline at end of file