v check hints agains ack_out?_
v check data against ack_in
v channel suspend/wake. 3 cong modes state machine - ???
+ * release hints for a dormant channel
* minimize the number of template instantiations
v Channel thinks how much it HINTs a second,
picker thinks which HINTs are snubbed
+ * files <1sec download : how HINTs are sent?
* dead Channels are not killed => cannot open a new one
(have a channel already)
v peers don't cooperate
* RecoverProgress fails sometime
* leecher can't see file is done already
+ * why leecher waits 1sec?
}
tint PingPongController::NextSendTime () {
+ if (unanswered_>=3)
+ return TINT_NEVER;
return ch_->last_send_time_ + ch_->rtt_avg_ + ch_->dev_avg_*4; // remind on timeout
}
void PingPongController::OnDataSent(bin64_t b) {
- if ( (ch_->last_recv_time_ && ch_->last_recv_time_<NOW-TINT_SEC*3) || //no reply
- (b==bin64_t::ALL && MaySendData()) ) // nothing to send
+ unanswered_++;
+ if ( (b==bin64_t::ALL && MaySendData()) ) // nothing to send
Swap(new KeepAliveController(this));
}
void PingPongController::OnDataRecvd(bin64_t b) {
+ unanswered_ = 0;
}
void PingPongController::OnAckRcvd(bin64_t ackd) {
tint KeepAliveController::NextSendTime () {
if (!delay_)
delay_ = ch_->rtt_avg_;
+ if (ch_->last_recv_time_ < ch_->last_send_time_-TINT_MIN)
+ return TINT_NEVER;
return ch_->last_send_time_ + delay_;
}
struct PingPongController : public SendController {
- int fails_;
+ int unanswered_;
- PingPongController (SendController* orig) : SendController(orig), fails_(0) {}
- PingPongController (Channel* ch) : fails_(0), SendController(ch) {}
+ PingPongController (SendController* orig) :
+ SendController(orig), unanswered_(0) {}
+ PingPongController (Channel* ch) :
+ unanswered_(0), SendController(ch) {}
const char* type() const { return "PingPong"; }
bool MaySendData();
tint NextSendTime ();
tint delay_;
- KeepAliveController(SendController* prev) : SendController(prev),
- delay_(0) {}
+ KeepAliveController(SendController* prev, tint delay=0) :
+ SendController(prev), delay_(delay) {}
const char* type() const { return "KeepAlive"; }
bool MaySendData();
tint NextSendTime () ;
Channel::~Channel () {
channels[id] = NULL;
+ delete cc_;
}
if (next_time==next_send_time_)
return;
next_send_time_ = next_time;
- send_queue.push_back(tintbin(next_time,id));
+ send_queue.push_back
+ (tintbin(next_time==TINT_NEVER?NOW+TINT_MIN:next_time,id));
push_heap(send_queue.begin(),send_queue.end(),tblater);
dprintf("%s requeue #%i for %s\n",tintstr(),id,tintstr(next_time));
}
while (!send_queue.empty()) {
send_time = send_queue.front().time;
sender = channel((int)send_queue.front().bin);
- if (sender && sender->next_send_time_==send_time)
+ if (sender)
+ if ( sender->next_send_time_==send_time ||
+ sender->next_send_time_==TINT_NEVER )
break;
sender = NULL; // it was a stale entry
pop_heap(send_queue.begin(), send_queue.end(), tblater);
}
if (send_time>limit)
send_time = limit;
- if ( sender && send_time <= NOW ) {
+ if ( sender && sender->next_send_time_ <= NOW ) {
dprintf("%s #%i sch_send %s\n",tintstr(),sender->id,
tintstr(send_time));
sender->Send();
sender->RequeueSend(sender->cc_->NextSendTime());
pop_heap(send_queue.begin(), send_queue.end(), tblater);
send_queue.pop_back();
- } else {
+ } else if ( send_time > NOW ) {
tint towait = send_time - NOW;
dprintf("%s waiting %lliusec\n",tintstr(),towait);
int rd = Datagram::Wait(socket_count,sockets,towait);
if (rd!=INVALID_SOCKET)
Recv(rd);
+ } else { //if (sender->next_send_time_==TINT_NEVER) {
+ dprintf("%s #%i closed sendctrl\n",tintstr(),sender->id);
+ delete sender;
+ pop_heap(send_queue.begin(), send_queue.end(), tblater);
+ send_queue.pop_back();
}
} while (Datagram::Time()<limit);