fixed: peer cwnd collapsing
authorvictor <victor@e16421f0-f15b-0410-abcd-98678b794739>
Tue, 27 Oct 2009 13:35:13 +0000 (13:35 +0000)
committervictor <victor@e16421f0-f15b-0410-abcd-98678b794739>
Tue, 27 Oct 2009 13:35:13 +0000 (13:35 +0000)
git-svn-id: https://ttuki.vtt.fi/svn/p2p-next/TUD/p2tp/trunk@484 e16421f0-f15b-0410-abcd-98678b794739

ext/dummy_controller.cpp
p2tp.cpp
p2tp.h
sendrecv.cpp

index 2b04eb7..dff6c1e 100755 (executable)
@@ -14,12 +14,12 @@ using namespace p2tp;
 /** Congestion window evolution: always 1. */
 struct BasicController : public CongestionController {
 
-    bin64_t         last_bin_sent;
+    tbqueue         data_out_;
     tint            dev_avg, rtt_avg;
     tint            last_send_time, last_recv_time, last_cwnd_mark;
     int             cwnd, peer_cwnd, in_flight, cwnd_rcvd;
     
-    BasicController () :
+    BasicController (int chann_id) : CongestionController(chann_id),
     dev_avg(0), rtt_avg(TINT_SEC), 
     last_send_time(0), last_recv_time(0), last_cwnd_mark(0),
     cwnd(1), peer_cwnd(1), in_flight(0), cwnd_rcvd(0)
@@ -30,7 +30,7 @@ struct BasicController : public CongestionController {
     }
     
     tint    RoundTripTimeoutTime() {
-        return rtt_avg + dev_avg * 4;
+        return rtt_avg + dev_avg * 8 + TINT_MSEC;
     }
     
     int     PeerBPS() {
@@ -47,10 +47,10 @@ struct BasicController : public CongestionController {
         if (cwnd) {
             // cwnd allows => schedule transmit
             // otherwise => schedule timeout; may schedule transmit later
-            if (in_flight<cwnd)
-                time = last_send_time + rtt_avg/cwnd; // next send
+            if (free_cwnd())
+                time = last_send_time + RoundTripTime()/cwnd; // next send
             else
-                time = last_send_time + rtt_avg + dev_avg*4; // timeout
+                time = last_send_time + RoundTripTimeoutTime(); // timeout
         } else {
             time = last_send_time + TINT_SEC*58;
         }
@@ -58,47 +58,54 @@ struct BasicController : public CongestionController {
     }
     
     int     free_cwnd () {
-        // check for timeout
-        if (cwnd && cwnd==in_flight && Datagram::now>=last_send_time+RoundTripTimeoutTime()) {
+        tint timeout = Datagram::now - RoundTripTimeoutTime();
+        if (!data_out_.empty() && data_out_.front().time<=timeout) {
+            data_out_.clear();
             cwnd >>= 1;
             if (!cwnd)
                 cwnd = 1;
-            in_flight = 0;
-            last_bin_sent = bin64_t::NONE; // clear history
+            dprintf("%s #%i loss cwnd:=%i\n",Datagram::TimeStr(),channel_id,cwnd);
         }
-        return cwnd - in_flight;
+        return cwnd - data_out_.size();
     }
     
     tint OnDataSent(bin64_t b) {
-        dprintf("%s 1 cwnd %i peer_cwnd %i\n",Datagram::TimeStr(),cwnd,peer_cwnd);
         last_send_time = Datagram::now;
-        last_bin_sent = b;
-        // sync the sending mode with the actual state of data to send
-        if (b==bin64_t::NONE) { // sent some metadata
-            cwnd = 1;
-        } else if (b==bin64_t::ALL) {  // nothing to send, absolutely
+        if (b==bin64_t::ALL) { // nothing to send, absolutely
+            data_out_.clear();
             cwnd = 0;
-        } else { // have data, use cong window sending
-            cwnd = 1;  // TODO AIMD (1) here
-            last_bin_sent = b;
-            in_flight = 1;
+        } else if (b==bin64_t::NONE) { // sent some metadata
+            cwnd = 1; // no more data => no need for cwnd
+            data_out_.push_back(b);
+        } else {
+            data_out_.push_back(b);
         }
-        dprintf("%s 2 cwnd %i peer_cwnd %i\n",Datagram::TimeStr(),cwnd,peer_cwnd);
+        dprintf("%s #%i cwnd %i infl %i peer_cwnd %i\n",
+                Datagram::TimeStr(),channel_id,cwnd,in_flight,peer_cwnd);
         return get_send_time();
     }
+    
+    /*tint OnHintRecvd (bin64_t hint) {
+        if (!cwnd) {
+            cwnd = 1;
+        }
+        return get_send_time();
+    }*/
 
     tint OnDataRecvd(bin64_t b) {
+        if (data_out_.size() && data_out_.front().bin==bin64_t::NONE)
+            data_out_.pop_front();
         if (b==bin64_t::NONE) {  // pong
             peer_cwnd = 1;
+            if (!cwnd)
+                cwnd = 1;
             return Datagram::now;
         } else if (b==bin64_t::ALL) { // the peer has nothing to send
-            peer_cwnd = 0;
-            if (cwnd==1) { // it was an implicit ACK, keep sending
-                in_flight = 0;
-                return Datagram::now;
-            } else 
-                return get_send_time();
+            //peer_cwnd = 0;
+            return get_send_time();
         } else {
+            //if (!peer_cwnd)
+            //    peer_cwnd = 1;
             cwnd_rcvd++;
             if (last_cwnd_mark+rtt_avg<Datagram::now) {
                 last_cwnd_mark = Datagram::now;
@@ -110,15 +117,32 @@ struct BasicController : public CongestionController {
     }
     
     tint OnAckRcvd(bin64_t ackd, tint when) {
-        if (ackd==last_bin_sent) { // calc cwnd/free
-            // van Jacobson's rtt
-            tint rtt = Datagram::now-last_send_time;
-            rtt_avg = (rtt_avg*7 + rtt) >> 3;
-            dev_avg = ( dev_avg*7 + abs(rtt-rtt_avg) ) >> 3;
-            last_bin_sent = bin64_t::NONE;
-            in_flight = 0;
-            // insert AIMD (2) here
+        tbqueue tmp;
+        for (int i=0; data_out_.size() && i<6; i++) {
+            tintbin x = data_out_.front();
+            data_out_.pop_front();
+            if (x.bin==ackd) {
+                // van Jacobson's rtt
+                tint rtt = Datagram::now-x.time;
+                if (rtt_avg==TINT_SEC) {
+                    rtt_avg = rtt;
+                } else {
+                    rtt_avg = (rtt_avg*3 + rtt) >> 2;
+                    dev_avg = ( dev_avg*3 + abs(rtt-rtt_avg) ) >> 2;
+                }
+                dprintf("%s #%i rtt %lli dev %lli\n",
+                        Datagram::TimeStr(),channel_id,rtt_avg,dev_avg);
+                // insert AIMD (2) here
+                break;
+            } else {
+                tmp.push_back(x);
+            }
+        }
+        while (tmp.size()) {
+            data_out_.push_front(tmp.back());
+            tmp.pop_back();
         }
+
         return get_send_time();
     }
    
index fedd68f..e2c59fe 100644 (file)
--- a/p2tp.cpp
+++ b/p2tp.cpp
@@ -44,7 +44,7 @@ Channel::Channel      (FileTransfer* file, int socket, Address peer_addr) :
         peer_ = tracker;
        this->id = channels.size();
        channels.push_back(this);
-    cc_ = new BasicController();
+    cc_ = new BasicController(id);
     RequeueSend(Datagram::now);
 }
 
diff --git a/p2tp.h b/p2tp.h
index 9b4ecb2..7f3ac0f 100644 (file)
--- a/p2tp.h
+++ b/p2tp.h
@@ -211,7 +211,8 @@ namespace p2tp {
        };
 
        struct CongestionController {
-        CongestionController () {}
+        int channel_id;
+        CongestionController (int chann_id) : channel_id(chann_id) {}
         virtual int     free_cwnd() = 0;
         virtual tint    RoundTripTime() = 0;
         virtual tint    RoundTripTimeoutTime() = 0;
@@ -220,6 +221,7 @@ namespace p2tp {
         virtual tint    OnDataSent(bin64_t b) = 0;
         virtual tint    OnDataRecvd(bin64_t b) = 0;
         virtual tint    OnAckRcvd(bin64_t ackd, tint peer_time=0) = 0;
+        //virtual tint    OnHintRecvd (bin64_t hint) = 0;
                virtual         ~CongestionController() {}
        };
 
index ffd3667..bda8461 100644 (file)
@@ -59,6 +59,7 @@ bin64_t               Channel::DequeueHint () { // TODO: resilience
         hint_in_.pop_front();
         send = file().ack_out().find_filtered
             (ack_in_,hint,0,bins::FILLED);
+        dprintf("%s #%i may_send %lli\n",Datagram::TimeStr(),id,send.base_offset());
         if (send!=bin64_t::NONE)
             while (send!=hint) {
                 hint = hint.towards(send);
@@ -104,6 +105,8 @@ void        Channel::Send () {
         AddHint(dgram);
         if (cc_->free_cwnd()) 
             data = AddData(dgram);
+        else
+            dprintf("%s #%i no cwnd\n",Datagram::TimeStr(),id);
     } else {
         AddHandshake(dgram);
         AddAck(dgram);
@@ -276,6 +279,7 @@ void        Channel::OnAckTs (Datagram& dgram) {
 void   Channel::OnHint (Datagram& dgram) {
        bin64_t hint = dgram.Pull32();
        hint_in_.push_back(hint);
+    //RequeueSend(cc_->OnHintRecvd(hint));
     dprintf("%s #%i -hint (%i,%lli)\n",Datagram::TimeStr(),id,hint.layer(),hint.offset());
 }