From 7d22b720ea726bf2bd1fe6a527d82b57b4fb9ba4 Mon Sep 17 00:00:00 2001 From: victor Date: Tue, 27 Oct 2009 13:35:13 +0000 Subject: [PATCH] fixed: peer cwnd collapsing git-svn-id: https://ttuki.vtt.fi/svn/p2p-next/TUD/p2tp/trunk@484 e16421f0-f15b-0410-abcd-98678b794739 --- ext/dummy_controller.cpp | 96 +++++++++++++++++++++++++--------------- p2tp.cpp | 2 +- p2tp.h | 4 +- sendrecv.cpp | 4 ++ 4 files changed, 68 insertions(+), 38 deletions(-) diff --git a/ext/dummy_controller.cpp b/ext/dummy_controller.cpp index 2b04eb7..dff6c1e 100755 --- a/ext/dummy_controller.cpp +++ b/ext/dummy_controller.cpp @@ -14,12 +14,12 @@ using namespace p2tp; /** Congestion window evolution: always 1. */ struct BasicController : public CongestionController { - bin64_t last_bin_sent; + tbqueue data_out_; tint dev_avg, rtt_avg; tint last_send_time, last_recv_time, last_cwnd_mark; int cwnd, peer_cwnd, in_flight, cwnd_rcvd; - BasicController () : + BasicController (int chann_id) : CongestionController(chann_id), dev_avg(0), rtt_avg(TINT_SEC), last_send_time(0), last_recv_time(0), last_cwnd_mark(0), cwnd(1), peer_cwnd(1), in_flight(0), cwnd_rcvd(0) @@ -30,7 +30,7 @@ struct BasicController : public CongestionController { } tint RoundTripTimeoutTime() { - return rtt_avg + dev_avg * 4; + return rtt_avg + dev_avg * 8 + TINT_MSEC; } int PeerBPS() { @@ -47,10 +47,10 @@ struct BasicController : public CongestionController { if (cwnd) { // cwnd allows => schedule transmit // otherwise => schedule timeout; may schedule transmit later - if (in_flight=last_send_time+RoundTripTimeoutTime()) { + tint timeout = Datagram::now - RoundTripTimeoutTime(); + if (!data_out_.empty() && data_out_.front().time<=timeout) { + data_out_.clear(); cwnd >>= 1; if (!cwnd) cwnd = 1; - in_flight = 0; - last_bin_sent = bin64_t::NONE; // clear history + dprintf("%s #%i loss cwnd:=%i\n",Datagram::TimeStr(),channel_id,cwnd); } - return cwnd - in_flight; + return cwnd - data_out_.size(); } tint OnDataSent(bin64_t b) { - dprintf("%s 1 cwnd %i peer_cwnd %i\n",Datagram::TimeStr(),cwnd,peer_cwnd); last_send_time = Datagram::now; - last_bin_sent = b; - // sync the sending mode with the actual state of data to send - if (b==bin64_t::NONE) { // sent some metadata - cwnd = 1; - } else if (b==bin64_t::ALL) { // nothing to send, absolutely + if (b==bin64_t::ALL) { // nothing to send, absolutely + data_out_.clear(); cwnd = 0; - } else { // have data, use cong window sending - cwnd = 1; // TODO AIMD (1) here - last_bin_sent = b; - in_flight = 1; + } else if (b==bin64_t::NONE) { // sent some metadata + cwnd = 1; // no more data => no need for cwnd + data_out_.push_back(b); + } else { + data_out_.push_back(b); } - dprintf("%s 2 cwnd %i peer_cwnd %i\n",Datagram::TimeStr(),cwnd,peer_cwnd); + dprintf("%s #%i cwnd %i infl %i peer_cwnd %i\n", + Datagram::TimeStr(),channel_id,cwnd,in_flight,peer_cwnd); return get_send_time(); } + + /*tint OnHintRecvd (bin64_t hint) { + if (!cwnd) { + cwnd = 1; + } + return get_send_time(); + }*/ tint OnDataRecvd(bin64_t b) { + if (data_out_.size() && data_out_.front().bin==bin64_t::NONE) + data_out_.pop_front(); if (b==bin64_t::NONE) { // pong peer_cwnd = 1; + if (!cwnd) + cwnd = 1; return Datagram::now; } else if (b==bin64_t::ALL) { // the peer has nothing to send - peer_cwnd = 0; - if (cwnd==1) { // it was an implicit ACK, keep sending - in_flight = 0; - return Datagram::now; - } else - return get_send_time(); + //peer_cwnd = 0; + return get_send_time(); } else { + //if (!peer_cwnd) + // peer_cwnd = 1; cwnd_rcvd++; if (last_cwnd_mark+rtt_avg> 3; - dev_avg = ( dev_avg*7 + abs(rtt-rtt_avg) ) >> 3; - last_bin_sent = bin64_t::NONE; - in_flight = 0; - // insert AIMD (2) here + tbqueue tmp; + for (int i=0; data_out_.size() && i<6; i++) { + tintbin x = data_out_.front(); + data_out_.pop_front(); + if (x.bin==ackd) { + // van Jacobson's rtt + tint rtt = Datagram::now-x.time; + if (rtt_avg==TINT_SEC) { + rtt_avg = rtt; + } else { + rtt_avg = (rtt_avg*3 + rtt) >> 2; + dev_avg = ( dev_avg*3 + abs(rtt-rtt_avg) ) >> 2; + } + dprintf("%s #%i rtt %lli dev %lli\n", + Datagram::TimeStr(),channel_id,rtt_avg,dev_avg); + // insert AIMD (2) here + break; + } else { + tmp.push_back(x); + } + } + while (tmp.size()) { + data_out_.push_front(tmp.back()); + tmp.pop_back(); } + return get_send_time(); } diff --git a/p2tp.cpp b/p2tp.cpp index fedd68f..e2c59fe 100644 --- a/p2tp.cpp +++ b/p2tp.cpp @@ -44,7 +44,7 @@ Channel::Channel (FileTransfer* file, int socket, Address peer_addr) : peer_ = tracker; this->id = channels.size(); channels.push_back(this); - cc_ = new BasicController(); + cc_ = new BasicController(id); RequeueSend(Datagram::now); } diff --git a/p2tp.h b/p2tp.h index 9b4ecb2..7f3ac0f 100644 --- a/p2tp.h +++ b/p2tp.h @@ -211,7 +211,8 @@ namespace p2tp { }; struct CongestionController { - CongestionController () {} + int channel_id; + CongestionController (int chann_id) : channel_id(chann_id) {} virtual int free_cwnd() = 0; virtual tint RoundTripTime() = 0; virtual tint RoundTripTimeoutTime() = 0; @@ -220,6 +221,7 @@ namespace p2tp { virtual tint OnDataSent(bin64_t b) = 0; virtual tint OnDataRecvd(bin64_t b) = 0; virtual tint OnAckRcvd(bin64_t ackd, tint peer_time=0) = 0; + //virtual tint OnHintRecvd (bin64_t hint) = 0; virtual ~CongestionController() {} }; diff --git a/sendrecv.cpp b/sendrecv.cpp index ffd3667..bda8461 100644 --- a/sendrecv.cpp +++ b/sendrecv.cpp @@ -59,6 +59,7 @@ bin64_t Channel::DequeueHint () { // TODO: resilience hint_in_.pop_front(); send = file().ack_out().find_filtered (ack_in_,hint,0,bins::FILLED); + dprintf("%s #%i may_send %lli\n",Datagram::TimeStr(),id,send.base_offset()); if (send!=bin64_t::NONE) while (send!=hint) { hint = hint.towards(send); @@ -104,6 +105,8 @@ void Channel::Send () { AddHint(dgram); if (cc_->free_cwnd()) data = AddData(dgram); + else + dprintf("%s #%i no cwnd\n",Datagram::TimeStr(),id); } else { AddHandshake(dgram); AddAck(dgram); @@ -276,6 +279,7 @@ void Channel::OnAckTs (Datagram& dgram) { void Channel::OnHint (Datagram& dgram) { bin64_t hint = dgram.Pull32(); hint_in_.push_back(hint); + //RequeueSend(cc_->OnHintRecvd(hint)); dprintf("%s #%i -hint (%i,%lli)\n",Datagram::TimeStr(),id,hint.layer(),hint.offset()); } -- 2.20.1