new send control (barely works)
authorvictor <victor@e16421f0-f15b-0410-abcd-98678b794739>
Thu, 5 Nov 2009 08:55:29 +0000 (08:55 +0000)
committervictor <victor@e16421f0-f15b-0410-abcd-98678b794739>
Thu, 5 Nov 2009 08:55:29 +0000 (08:55 +0000)
git-svn-id: https://ttuki.vtt.fi/svn/p2p-next/TUD/p2tp/trunk@498 e16421f0-f15b-0410-abcd-98678b794739

SConstruct
datagram.cpp
exec/leecher.cpp
ext/dummy_controller.cpp [deleted file]
ext/send_control.cpp [new file with mode: 0644]
ext/send_control.h [new file with mode: 0644]
p2tp.cpp
p2tp.h
sendrecv.cpp
tests/connecttest.cpp

index c28c801..62d2347 100644 (file)
@@ -27,7 +27,7 @@ TestDir='tests'
 
 target = 'p2tp'
 source = [ 'bin64.cpp','hashtree.cpp','datagram.cpp','bins.cpp',
-    'transfer.cpp', 'p2tp.cpp', 'sendrecv.cpp', 
+    'transfer.cpp', 'p2tp.cpp', 'sendrecv.cpp', 'ext/send_control.cpp',
     'compat/hirestimeofday.cpp', 'compat/util.cpp']
 
 env = Environment()
index 86f1b6e..50ef4d8 100644 (file)
@@ -26,7 +26,9 @@ uint64_t Datagram::dgrams_up=0, Datagram::dgrams_down=0,
          Datagram::bytes_up=0, Datagram::bytes_down=0;
 
 char* Datagram::TimeStr (tint time) {
-    static char ret_str[128];
+    static char ret_str[32][4]; // wow
+    static int i;
+    i = (i+1) & 3;
     if (time==0)
         time = now;
     time -= epoch;
@@ -39,8 +41,8 @@ char* Datagram::TimeStr (tint time) {
     int msecs = time/TINT_MSEC;
     time %= TINT_MSEC;
     int usecs = time/TINT_uSEC;
-    sprintf(ret_str,"%i_%02i_%02i_%03i_%03i",hours,mins,secs,msecs,usecs);
-    return ret_str;
+    sprintf(ret_str[i],"%i_%02i_%02i_%03i_%03i",hours,mins,secs,msecs,usecs);
+    return ret_str[i];
 }
     
 int Datagram::Send () {
index 17d1aa7..a68ebf9 100644 (file)
@@ -3,7 +3,7 @@
  *  p2tp
  *
  *  Created by Victor Grishchenko on 11/3/09.
- *  Copyright 2009 Delft Technical University. All rights reserved.
+ *  Copyright 2009 Delft University of Technology. All rights reserved.
  *
  */
 #include "p2tp.h"
diff --git a/ext/dummy_controller.cpp b/ext/dummy_controller.cpp
deleted file mode 100755 (executable)
index 9eeaeca..0000000
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- *  dummy_controller.cpp
- *  p2tp
- *
- *  Created by Victor Grishchenko on 10/16/09.
- *  Copyright 2009 Delft Technical University. All rights reserved.
- *
- */
-#include "p2tp.h"
-
-using namespace p2tp;
-
-
-/** Congestion window evolution: always 1. */
-struct BasicController : public CongestionController {
-
-    tbqueue         data_out_;
-    tint            dev_avg, rtt_avg, diat_avg;
-    tint            last_send_time, last_recv_time, last_cwnd_mark;
-    int             cwnd, peer_cwnd, cwnd_rcvd;
-    bin64_t         last_recv_bin;
-    
-    BasicController (int chann_id) : CongestionController(chann_id),
-    dev_avg(0), rtt_avg(TINT_SEC), 
-    last_send_time(0), last_recv_time(0), last_cwnd_mark(0),
-    cwnd(1), peer_cwnd(1), cwnd_rcvd(0), diat_avg(TINT_SEC),
-    last_recv_bin(bin64_t::NONE)
-    { }
-    
-    tint    RoundTripTime() {
-        return rtt_avg;
-    }
-    
-    tint    RoundTripTimeoutTime() {
-        return rtt_avg + dev_avg * 8 + TINT_MSEC;
-    }
-    
-    int     in_flight() const {return data_out_.size();}
-    
-    int     PeerBPS() {
-        //return (peer_cwnd<<10) * TINT_SEC / rtt_avg;
-        return TINT_SEC * 1024 / diat_avg;
-    }
-    
-    float PeerCWindow() {
-        return peer_cwnd;
-    }
-
-    /** It is provided with an argument when sending time is not clear from the context. */
-    tint    get_send_time () {
-        tint time = TINT_NEVER;
-        if (cwnd) {
-            // cwnd allows => schedule transmit
-            // otherwise => schedule timeout; may schedule transmit later
-            if (free_cwnd())
-                time = last_send_time + RoundTripTime()/cwnd; // next send
-            else
-                time = last_send_time + RoundTripTimeoutTime(); // timeout
-        } else {
-            time = last_send_time + TINT_SEC*58;
-        }
-        return time;
-    }
-    
-    int     free_cwnd () {
-        tint timeout = Datagram::now - RoundTripTimeoutTime();
-        if (!data_out_.empty() && data_out_.front().time<=timeout) {
-            data_out_.clear();
-            cwnd >>= 1;
-            if (!cwnd)
-                cwnd = 1;
-            dprintf("%s #%i loss cwnd:=%i\n",Datagram::TimeStr(),channel_id,cwnd);
-        }
-        return cwnd - data_out_.size();
-    }
-    
-    tint OnDataSent(bin64_t b) {
-        last_send_time = Datagram::now;
-        if (b==bin64_t::ALL) { // nothing to send, absolutely
-            data_out_.clear();
-            cwnd >>= 1;
-            if (!cwnd)
-                cwnd = 1;
-        } else if (b==bin64_t::NONE) { // sent some metadata
-            cwnd = 1; // no more data => no need for cwnd
-            data_out_.clear();
-            data_out_.push_back(b);
-        } else {
-            data_out_.push_back(b);
-        }
-        dprintf("%s #%i cwnd %i infl %i peer_cwnd %i //%lli\n",
-            Datagram::TimeStr(),channel_id,cwnd,in_flight(),peer_cwnd,
-            (uint64_t)b);
-        return get_send_time();
-    }
-    
-
-    tint OnDataRecvd(bin64_t b) {
-        if (last_recv_bin!=bin64_t::ALL && last_recv_bin!=bin64_t::NONE) {
-            tint diat = Datagram::now - last_recv_time;
-            diat_avg = ( diat_avg*3 + diat ) >> 2;
-        }
-        last_recv_bin = b;
-        last_recv_time = Datagram::now;
-        if (rtt_avg==TINT_SEC && last_send_time) {
-            rtt_avg = Datagram::now - last_send_time;
-            dev_avg = rtt_avg;
-        }
-        if (data_out_.size() && data_out_.front().bin==bin64_t::NONE)
-            data_out_.pop_front();
-        if (b==bin64_t::NONE) {  // pong
-            peer_cwnd = 1;
-            if (!cwnd)
-                cwnd = 1;
-            return Datagram::now;
-        } else if (b==bin64_t::ALL) { // the peer has nothing to send
-            //peer_cwnd = 0;
-            peer_cwnd = 1;
-            return get_send_time();
-        } else {
-            //if (!peer_cwnd)
-            //    peer_cwnd = 1;
-            cwnd_rcvd++;
-            if (last_cwnd_mark+rtt_avg<Datagram::now) {
-                last_cwnd_mark = Datagram::now;
-                peer_cwnd = cwnd_rcvd;
-                cwnd_rcvd = 0;
-                dprintf("%s #%i peer_cwnd %i\n",
-                        Datagram::TimeStr(),channel_id,peer_cwnd);
-            }
-            return Datagram::now; // at least, send an ACK
-        }
-    }
-    
-    tint OnAckRcvd(bin64_t ackd, tint when) { // FIXME::: NO ACKS => NO RTT
-        tbqueue tmp;
-        for (int i=0; data_out_.size() && i<6; i++) {
-            tintbin x = data_out_.front();
-            data_out_.pop_front();
-            if (x.bin.within(ackd)) {
-                // van Jacobson's rtt
-                tint rtt = Datagram::now-x.time;
-                rtt_avg = (rtt_avg*3 + rtt) >> 2;
-                dev_avg = ( dev_avg*3 + abs(rtt-rtt_avg) ) >> 2;
-                dprintf("%s #%i rtt %lli dev %lli\n",
-                        Datagram::TimeStr(),channel_id,rtt_avg,dev_avg);
-                // insert AIMD (2) here
-                cwnd++;
-                break;
-            } else {
-                tmp.push_back(x);
-            }
-        }
-        while (tmp.size()) {
-            data_out_.push_front(tmp.back());
-            tmp.pop_back();
-        }
-
-        return get_send_time();
-    }
-   
-    ~BasicController() {
-    }
-    
-};
-
-
-/*
- /** A packet was sent; in case it had data, b is the bin. *
-void OnDataSent(bin64_t b) {
-    if (b==bin64_t::NONE) {
-        if (free_cwnd()>0) { // nothing to send; suspend
-            cwnd = 0; 
-            in_flight = 0;  
-            set_send_time(last_send_time+KEEPALIVE);
-        } else if (cwnd==0) { // suspended; keepalives only
-            set_send_time(last_send_time+KEEPALIVE);
-        } else { // probably, packet loss => stall
-            tint timeout = last_send_time + rtt_avg + (dev_avg<<2);
-            if (timeout<=Datagram::now) { // loss
-                if (timeout+2*rtt_avg>Datagram::now) {
-                    in_flight = 0;
-                    set_send_time(Datagram::now);
-                } else { // too bad
-                    set_send_time(TINT_NEVER);
-                }
-            } else
-                set_send_time(timeout);
-        }
-    } else { // HANDSHAKE goes here with b==ALL
-        in_flight++;
-        set_send_time(Datagram::now + rtt_avg/cwnd);
-    }
-    last_bin_sent = b;
-    last_send_time = Datagram::now;
-}
-
-void OnDataRecvd(bin64_t b) {
-    last_recv_time = Datagram::now;
-    set_send_time(Datagram::now); // to send a reply if needed
-}
-
-void OnAckRcvd(const tintbin& ack) {
-    last_recv_time = Datagram::now;
-    if (ack==bin64_t::NONE || last_bin_sent!=ack.bin)
-        return;
-    last_bin_sent = bin64_t::NONE;
-    in_flight--;
-    tint nst = last_send_time + ( cwnd  ?  rtt_avg/cwnd : KEEPALIVE );
-    if (nst<Datagram::now) 
-        nst = Datagram::now; // in case we were waiting for free cwnd space
-    set_send_time(nst);
-} // TODO: dont distinguish last send time and last data sent time
-// SOLUTION: once free_cwnd==0 => don't invoke OnDataSent
-// TODO: once it's time, but free_cwnd=0 => need to set timeout
-
- */
diff --git a/ext/send_control.cpp b/ext/send_control.cpp
new file mode 100644 (file)
index 0000000..50d569e
--- /dev/null
@@ -0,0 +1,107 @@
+/*
+ *  send_control.cpp
+ *  p2tp
+ *
+ *  Created by Victor Grishchenko on 11/4/09.
+ *  Copyright 2009 Delft University of Technology. All rights reserved.
+ *
+ */
+#include "p2tp.h"
+
+
+using namespace p2tp;
+
+
+void    SendController::Swap (SendController* newctrl) {
+    dprintf("%s #%i sendctrl %s->%s\n",Datagram::TimeStr(),ch_->id,type(),newctrl->type());
+    assert(this==ch_->cc_);
+    ch_->cc_ = newctrl;
+    delete this;
+}
+
+
+bool    PingPongController::MaySendData(){
+    return ch_->data_out_.empty();
+}
+    
+tint    PingPongController::NextSendTime () {
+    return ch_->last_send_time_ + ch_->rtt_avg_ + ch_->dev_avg_*4;  // remind on timeout
+}
+    
+void    PingPongController::OnDataSent(bin64_t b) {
+    if ( (ch_->last_recv_time_ && ch_->last_recv_time_<Datagram::now-TINT_SEC*3) || //no reply
+         (b==bin64_t::ALL && MaySendData()) ) // nothing to send
+        Swap(new KeepAliveController(this));
+}
+    
+void    PingPongController::OnDataRecvd(bin64_t b) {
+}
+    
+void    PingPongController::OnAckRcvd(bin64_t ackd) {
+    if (ch_->data_out_.empty())
+        Swap(new SlowStartController(this));
+}
+
+
+    
+bool    KeepAliveController::MaySendData() {
+    return true;
+}
+    
+tint    KeepAliveController::NextSendTime () {
+    return ch_->last_send_time_ + TINT_SEC*58;
+}
+    
+void    KeepAliveController::OnDataSent(bin64_t b) {
+    if (b!=bin64_t::ALL)
+        Swap(new PingPongController(this));
+}
+    
+void    KeepAliveController::OnDataRecvd(bin64_t b) {
+}
+    
+void    KeepAliveController::OnAckRcvd(bin64_t ackd) {
+}
+    
+
+
+bool    CwndController::MaySendData() {
+    return ch_->data_out_.size() < cwnd_  &&  Datagram::now >= NextSendTime();
+}
+    
+tint    CwndController::NextSendTime () {
+    if (ch_->data_out_.size() < cwnd_)
+        return ch_->last_send_time_ + ch_->rtt_avg_ / cwnd_;
+    else
+        return ch_->last_send_time_ + ch_->rtt_avg_ + ch_->dev_avg_ * 4 ;
+}
+    
+void    CwndController::OnDataSent(bin64_t b) {
+    if (b==bin64_t::ALL || b==bin64_t::NONE) {
+        if (MaySendData())
+            Swap(new PingPongController(this));
+    } 
+}
+    
+void    CwndController::OnDataRecvd(bin64_t b) {
+}
+    
+void    CwndController::OnAckRcvd(bin64_t ackd) {
+    if (ackd==bin64_t::NONE) {
+        cwnd_ /= 2;
+    } else {
+        if (cwnd_<1)
+            cwnd_ *= 2;
+        else 
+            cwnd_ += 1/cwnd_;
+    }
+}
+
+
+void SlowStartController::OnAckRcvd (bin64_t pos) {
+    if (pos!=bin64_t::NONE)
+        cwnd_ += 1;
+    else 
+        cwnd_ /= 2;
+}
+    
\ No newline at end of file
diff --git a/ext/send_control.h b/ext/send_control.h
new file mode 100644 (file)
index 0000000..b8d4a77
--- /dev/null
@@ -0,0 +1,102 @@
+/*
+ *  send_control.h
+ *  p2tp
+ *
+ *  Created by Victor Grishchenko on 11/4/09.
+ *  Copyright 2009 Delft University of Technology. All rights reserved.
+ *
+ */
+// included into p2tp.h
+#ifndef P2TP_SEND_CONTROL
+#define P2TP_SEND_CONTROL
+
+class Channel;
+
+struct SendController {
+    
+    Channel*    ch_;
+    
+    SendController (Channel* ch) : ch_(ch) {}
+    
+    SendController(SendController* orig) : ch_(orig->ch_) { }
+    
+    void    Swap (SendController* replacement);
+    
+    virtual const char* type() const = 0;
+    
+    virtual bool    MaySendData() = 0;
+    virtual tint    NextSendTime () = 0;
+    
+    /** A datagram was sent to the peer.
+     *  @param data the bin number for the data sent; bin64_t::NONE if only
+                    metadata was sent; bin64_t::ALL if datagram was empty */
+    virtual void    OnDataSent(bin64_t data) = 0;
+    
+    /** A datagram was received from the peer.
+        @param data follows the same conventions as data in OnDataSent() */
+    virtual void    OnDataRecvd(bin64_t data) = 0;
+    
+    /** An acknowledgement on OUR data message was receiveed from the peer.
+        @param ackd bin number for the data sent; bin64_t::NONE if no
+                    acknowledgement was received (timeout event) */
+    virtual void    OnAckRcvd(bin64_t ackd) = 0;
+    
+    virtual         ~SendController() {}
+};
+
+
+struct PingPongController : public SendController {
+    
+    int     fails_;
+
+    PingPongController (SendController* orig) : SendController(orig), fails_(0) {} 
+    PingPongController (Channel* ch) : fails_(0), SendController(ch) {}
+    const char* type() const { return "PingPong"; }
+    bool    MaySendData();
+    tint    NextSendTime ();
+    void    OnDataSent(bin64_t b);
+    void    OnDataRecvd(bin64_t b);
+    void    OnAckRcvd(bin64_t ackd) ;
+    ~PingPongController() {}
+    
+};
+
+
+struct KeepAliveController : public SendController {
+    
+    KeepAliveController(SendController* prev) : SendController(prev){}
+    const char* type() const { return "KeepAlive"; }
+    bool    MaySendData();
+    tint    NextSendTime () ;
+    void    OnDataSent(bin64_t b) ;
+    void    OnDataRecvd(bin64_t b) ;
+    void    OnAckRcvd(bin64_t ackd) ;
+    
+};
+
+
+struct CwndController : public SendController {
+    
+    float   cwnd_;
+    
+    CwndController(SendController* orig, int cwnd=1) :
+    SendController(orig), cwnd_(cwnd) {    }
+    
+    bool    MaySendData() ;
+    tint    NextSendTime () ;
+    void    OnDataSent(bin64_t b) ;
+    void    OnDataRecvd(bin64_t b) ;
+    void    OnAckRcvd(bin64_t ackd) ;
+    
+};
+
+
+struct SlowStartController : public CwndController {
+    
+    SlowStartController(SendController* orig, int cwnd=1) : CwndController(orig,cwnd) {}
+    const char* type() const { return "SlowStart"; }
+    void    OnAckRcvd(bin64_t ackd) ;
+    
+};
+
+#endif
\ No newline at end of file
index 81fac8d..db8f530 100644 (file)
--- a/p2tp.cpp
+++ b/p2tp.cpp
@@ -31,20 +31,20 @@ int Channel::sockets[8] = {0,0,0,0,0,0,0,0};
 int Channel::socket_count = 0;
 Address Channel::tracker;
 tbqueue Channel::send_queue;
-#include "ext/dummy_controller.cpp"
 #include "ext/simple_selector.cpp"
 PeerSelector* Channel::peer_selector = new SimpleSelector();
 
 Channel::Channel       (FileTransfer* file, int socket, Address peer_addr) :
        file_(file), peer_(peer_addr), peer_channel_id_(0), pex_out_(0),
     socket_(socket==-1?sockets[0]:socket), // FIXME
-    own_id_mentioned_(false), next_send_time_(0)
+    own_id_mentioned_(false), next_send_time_(0), last_send_time_(0),
+    last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC)
 {
     if (peer_==Address())
         peer_ = tracker;
        this->id = channels.size();
        channels.push_back(this);
-    cc_ = new BasicController(id);
+    cc_ = new PingPongController(this);
     RequeueSend(Datagram::now);
 }
 
diff --git a/p2tp.h b/p2tp.h
index e7cbf76..e7b46e1 100644 (file)
--- a/p2tp.h
+++ b/p2tp.h
@@ -82,14 +82,14 @@ namespace p2tp {
                P2TP_HANDSHAKE = 0,
                P2TP_DATA = 1,
                P2TP_ACK = 2,
-               P2TP_ACK_TS = 8,
+               P2TP_TS = 8,
                P2TP_HINT = 3,
                P2TP_HASH = 4,
                P2TP_PEX_ADD = 5,
                P2TP_PEX_RM = 6,
                P2TP_MESSAGE_COUNT = 7
        } messageid_t;
-
+    
     class PiecePicker;
     class CongestionController;
     class PeerSelector;
@@ -212,20 +212,9 @@ namespace p2tp {
         friend void    Close (int fd) ;
        };
 
-       struct CongestionController {
-        int channel_id;
-        CongestionController (int chann_id) : channel_id(chann_id) {}
-        virtual int     free_cwnd() = 0;
-        virtual tint    RoundTripTime() = 0;
-        virtual tint    RoundTripTimeoutTime() = 0;
-        virtual int     PeerBPS() = 0;
-        virtual float   PeerCWindow() = 0;
-        virtual tint    OnDataSent(bin64_t b) = 0;
-        virtual tint    OnDataRecvd(bin64_t b) = 0;
-        virtual tint    OnAckRcvd(bin64_t ackd, tint peer_time=0) = 0;
-        //virtual tint    OnHintRecvd (bin64_t hint) = 0;
-               virtual         ~CongestionController() {}
-       };
+    
+#include "ext/send_control.h"
+    
 
     class PiecePicker {
     public:
@@ -234,12 +223,14 @@ namespace p2tp {
         virtual void    Received (bin64_t b) = 0;
     };
 
+    
     class PeerSelector {
     public:
         virtual void AddPeer (const Datagram::Address& addr, const Sha1Hash& root) = 0;
         virtual Datagram::Address GetPeer (const Sha1Hash& for_root) = 0;
     };
 
+    
     class DataStorer {
     public:
         DataStorer (const Sha1Hash& id, size_t size);
@@ -254,8 +245,8 @@ namespace p2tp {
         lots of other TCP stuff, sizeof(Channel+members) must be below 1K.
         (There was a seductive idea to remove channels, just put the root
         hash or a fragment of it into every datagram.) */
-       class Channel {
-       public:
+       struct Channel {  // normally, API users do not deal with the structure
+
                Channel (FileTransfer* file, int socket=-1, Address peer=Address());
                ~Channel();
 
@@ -266,7 +257,7 @@ namespace p2tp {
                void            Send ();
 
                void            OnAck (Datagram& dgram);
-               void            OnAckTs (Datagram& dgram);
+               void            OnTs (Datagram& dgram);
                bin64_t         OnData (Datagram& dgram);
                void            OnHint (Datagram& dgram);
                void            OnHash (Datagram& dgram);
@@ -275,6 +266,7 @@ namespace p2tp {
                void            AddHandshake (Datagram& dgram);
                bin64_t         AddData (Datagram& dgram);
                void            AddAck (Datagram& dgram);
+               void            AddTs (Datagram& dgram);
                void            AddHint (Datagram& dgram);
                void            AddUncleHashes (Datagram& dgram, bin64_t pos);
                void            AddPeakHashes (Datagram& dgram);
@@ -285,14 +277,13 @@ namespace p2tp {
         bool        is_established () { return peer_channel_id_ && own_id_mentioned_; }
         FileTransfer& file() { return *file_; }
         const Address& peer() const { return peer_; }
-
+        
                static int DecodeID(int scrambled);
                static int EncodeID(int unscrambled);
                static Channel* channel(int i) {
             return i<channels.size()?channels[i]:NULL;
         }
 
-       private:
 
                /** Channel id: index in the channel array. */
                uint32_t        id;
@@ -309,6 +300,8 @@ namespace p2tp {
                bins            ack_in_;
                /**     Last data received; needs to be acked immediately. */
                tintbin         data_in_;
+        /** The history of data sent and still unacknowledged. */
+        tbqueue     data_out_;
         /** Index in the history array. */
                bins        ack_out_;
                /**     Transmit schedule: in most cases filled with the peer's hints */
@@ -316,20 +309,28 @@ namespace p2tp {
                /** Hints sent (to detect and reschedule ignored hints). */
                tbqueue         hint_out_;
                /** The congestion control strategy. */
-               CongestionController    *cc_;
+               SendController  *cc_;
         /** Types of messages the peer accepts. */
         uint64_t    cap_in_;
         /** For repeats. */
                //tint          last_send_time, last_recv_time;
         /** PEX progress */
         int         pex_out_;
-
+        /** Smoothed averages for RTT, RTT deviation and data interarrival periods. */
+        tint        rtt_avg_, dev_avg_, dip_avg_;
+        tint        last_send_time_;
+        tint        last_recv_time_;
         tint        next_send_time_;
+        tint        peer_send_time_;
         static      tbqueue send_queue;
-        void        RequeueSend (tint next_time);
         
-        /** Get a rewuest for one packet from the queue of peer's requests. */
+        void        RequeueSend (tint next_time);
+        int         PeerBPS() const {
+            return TINT_SEC / dip_avg_ * 1024;
+        }
+        /** Get a request for one packet from the queue of peer's requests. */
         bin64_t                DequeueHint();
+        void        ClearStaleDataOut ();
         //void        CleanStaleHints();
 
         static PeerSelector* peer_selector;
@@ -350,6 +351,7 @@ namespace p2tp {
         friend int      Open (const char*, const Sha1Hash&) ; // FIXME
         
         friend class FileTransfer; // FIXME!!!
+        friend class SendController; // FIXME!!!
        };
 
 
index e91635c..dc17aa9 100644 (file)
 #include <glog/logging.h>
 #include "p2tp.h"
 
-#include "ext/dummy_controller.cpp"
 
-using namespace std;
 using namespace p2tp;
+using namespace std; // FIXME remove
 
 /*
  TODO  25 Oct 18:55
@@ -97,6 +96,16 @@ void Channel::AddHandshake (Datagram& dgram) {
 }
 
 
+void    Channel::ClearStaleDataOut() {
+    int oldsize = data_out_.size();
+    while ( data_out_.size() && data_out_.front().time < 
+           Datagram::now - rtt_avg_ - dev_avg_*4 )
+        data_out_.pop_front();
+    if (data_out_.size()!=oldsize)
+        cc_->OnAckRcvd(bin64_t::NONE);
+}
+
+
 void   Channel::Send () {
     Datagram dgram(socket_,peer());
     dgram.Push32(peer_channel_id_);
@@ -104,7 +113,8 @@ void        Channel::Send () {
     if ( is_established() ) {
         AddAck(dgram);
         AddHint(dgram);
-        if (cc_->free_cwnd()) 
+        ClearStaleDataOut();
+        if (cc_->MaySendData()) 
             data = AddData(dgram);
         else
             dprintf("%s #%i no cwnd\n",Datagram::TimeStr(),id);
@@ -116,7 +126,9 @@ void        Channel::Send () {
        PCHECK( dgram.Send() != -1 )<<"error sending";
     if (dgram.size()==4) // only the channel id; bare keep-alive
         data = bin64_t::ALL;
-    RequeueSend(cc_->OnDataSent(data));
+    cc_->OnDataSent(data);
+    last_send_time_ = Datagram::now;
+    RequeueSend(cc_->NextSendTime());
 }
 
 
@@ -131,7 +143,7 @@ void        Channel::AddHint (Datagram& dgram) {
     uint64_t hinted = 0;
     for(tbqueue::iterator i=hint_out_.begin(); i!=hint_out_.end(); i++)
         hinted += i->bin.width();
-    int bps = cc_->PeerBPS();
+    int bps = PeerBPS();
     dprintf("%s #%i hinted %lli peer_bps %i\n",Datagram::TimeStr(),id,hinted,bps);
     //float peer_cwnd = cc_->PeerBPS() * cc_->RoundTripTime() / TINT_SEC;
     
@@ -175,18 +187,27 @@ bin64_t           Channel::AddData (Datagram& dgram) {
     dgram.Push32(tosend);
     dgram.Push(buf,r);
     dprintf("%s #%i +data (%lli)\n",Datagram::TimeStr(),id,tosend.base_offset());
+    data_out_.push_back(tosend);
        return tosend;
 }
 
 
+void   Channel::AddTs (Datagram& dgram) {
+    dgram.Push8(P2TP_TS);
+    dgram.Push64(data_in_.time);
+    dprintf("%s #%i +ts %lli\n",Datagram::TimeStr(),id,data_in_.time);
+}
+
+
 void   Channel::AddAck (Datagram& dgram) {
        if (data_in_.bin!=bin64_t::NONE) {
+        AddTs(dgram);
         bin64_t pos = data_in_.bin;
-               dgram.Push8(P2TP_ACK_TS);
+               dgram.Push8(P2TP_ACK);
                dgram.Push32(pos);
-               dgram.Push64(data_in_.time);
+               //dgram.Push64(data_in_.time);
         ack_out_.set(pos);
-        dprintf("%s #%i +ackts (%i,%lli) %s\n",Datagram::TimeStr(),id,
+        dprintf("%s #%i +ack (%i,%lli) %s\n",Datagram::TimeStr(),id,
                 pos.layer(),pos.offset(),Datagram::TimeStr(data_in_.time));
         data_in_ = tintbin(0,bin64_t::NONE);
        }
@@ -212,7 +233,7 @@ void        Channel::Recv (Datagram& dgram) {
                switch (type) {
             case P2TP_HANDSHAKE: OnHandshake(dgram); break;
                        case P2TP_DATA:         data=OnData(dgram); break;
-                       case P2TP_ACK_TS:       OnAckTs(dgram); break;
+                       case P2TP_TS:       OnTs(dgram); break;
                        case P2TP_ACK:          OnAck(dgram); break;
                        case P2TP_HASH:         OnHash(dgram); break;
                        case P2TP_HINT:         OnHint(dgram); break;
@@ -222,7 +243,10 @@ void       Channel::Recv (Datagram& dgram) {
                                return;
                }
        }
-    RequeueSend(cc_->OnDataRecvd(data));
+    cc_->OnDataRecvd(data);
+    last_recv_time_ = Datagram::now;
+    if (data!=bin64_t::ALL)
+        RequeueSend(Datagram::now);
 }
 
 
@@ -243,6 +267,10 @@ bin64_t Channel::OnData (Datagram& dgram) {
     dprintf("%s #%i %cdata (%lli)\n",Datagram::TimeStr(),id,ok?'-':'!',pos.offset());
     if (ok) {
         data_in_ = tintbin(Datagram::now,pos);
+        if (last_recv_time_) {
+            tint dip = Datagram::now - last_recv_time_;
+            dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
+        }
         return pos;
     } else
         return bin64_t::NONE;
@@ -250,22 +278,40 @@ bin64_t Channel::OnData (Datagram& dgram) {
 
 
 void   Channel::OnAck (Datagram& dgram) {
-       // note: no bound checking
-       bin64_t pos = dgram.Pull32();
-    dprintf("%s #%i -ack (%i,%lli)\n",Datagram::TimeStr(),id,pos.layer(),pos.offset());
-       ack_in_.set(pos);
-       RequeueSend(cc_->OnAckRcvd(pos,0));
+       bin64_t ackd_pos = dgram.Pull32();
+    if (ackd_pos.base_offset()>file().size())
+        return;
+    dprintf("%s #%i -ack (%i,%lli)\n",Datagram::TimeStr(),id,ackd_pos.layer(),ackd_pos.offset());
+    for (int i=0; i<8 && i<data_out_.size(); i++) 
+        if (data_out_[i].bin.within(ackd_pos)) {
+            tintbin x = data_out_[i];
+            data_out_[i].bin = bin64_t::ALL;
+            tint rtt = Datagram::now-x.time;
+            rtt_avg_ = (rtt_avg_*3 + rtt) >> 2;
+            dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
+            dprintf("%s #%i rtt %lli dev %lli\n",
+                    Datagram::TimeStr(),id,rtt_avg_,dev_avg_);
+            cc_->OnAckRcvd(x.bin);
+        }
+    while (data_out_.size() && data_out_.front().bin==bin64_t::ALL)
+        data_out_.pop_front();
+       ack_in_.set(ackd_pos);
 }
 
 
-void   Channel::OnAckTs (Datagram& dgram) {
+/*void Channel::OnAckTs (Datagram& dgram) {  // FIXME:   OnTs
        bin64_t pos = dgram.Pull32();
     tint ts = dgram.Pull64();
     // TODO sanity check
     dprintf("%s #%i -ackts (%i,%lli) %s\n",
             Datagram::TimeStr(),id,pos.layer(),pos.offset(),Datagram::TimeStr(ts));
        ack_in_.set(pos);
-       RequeueSend(cc_->OnAckRcvd(pos,ts));
+       cc_->OnAckRcvd(pos,ts);
+}*/
+
+void Channel::OnTs (Datagram& dgram) {
+    peer_send_time_ = dgram.Pull64();
+    dprintf("%s #%i -ts %lli\n",Datagram::TimeStr(),id,peer_send_time_);
 }
 
 
@@ -278,8 +324,6 @@ void        Channel::OnHint (Datagram& dgram) {
 
 
 void Channel::OnHandshake (Datagram& dgram) {
-    if (!peer_channel_id_)
-        cc_->OnAckRcvd(bin64_t::ALL);
     peer_channel_id_ = dgram.Pull32();
     dprintf("%s #%i -hs %i\n",Datagram::TimeStr(),id,peer_channel_id_);
     // FUTURE: channel forking
index 21ab809..dd9094a 100644 (file)
@@ -55,7 +55,7 @@ TEST(P2TP,CwndTest) {
     unlink("doc/sofi-copy.jpg");
     struct stat st;
        ASSERT_EQ(0,stat("doc/sofi.jpg", &st));
-    int size = st.st_size, sizek = (st.st_size>>10) + (st.st_size%1024?1:0) ;
+    int size = st.st_size;//, sizek = (st.st_size>>10) + (st.st_size%1024?1:0) ;
     
        /*int f = open("big_test_file",O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
        int size = 60<<10; //rand()%(1<<19) + (1<<19);