refactored; barely works
authorvictor <victor@e16421f0-f15b-0410-abcd-98678b794739>
Fri, 11 Dec 2009 16:06:08 +0000 (16:06 +0000)
committervictor <victor@e16421f0-f15b-0410-abcd-98678b794739>
Fri, 11 Dec 2009 16:06:08 +0000 (16:06 +0000)
git-svn-id: https://ttuki.vtt.fi/svn/p2p-next/TUD/p2tp/trunk@734 e16421f0-f15b-0410-abcd-98678b794739

12 files changed:
BUGS
SConstruct
compat.cpp
ext/send_control.cpp [deleted file]
ext/send_control.h [deleted file]
ext/seq_picker.cpp
p2tp.cpp
p2tp.h
send_control.cpp [new file with mode: 0644]
sendrecv.cpp
tests/bin64test.cpp
transfer.cpp

diff --git a/BUGS b/BUGS
index 280fd6b..373b47e 100644 (file)
--- a/BUGS
+++ b/BUGS
     v peers don't cooperate
     * RecoverProgress fails sometime
     v leecher can't see file is done already
-    * why leecher waits 1sec?
+    v why leecher waits 1sec?
     * hint queue buildup
     * file operations are not 64-bit ready
         http://mail.python.org/pipermail/patches/2000-June/000848.html
     * recovery: last packet
-    * no-HINT sending to a dead peer
+    v no-HINT sending to a dead peer
     * what if rtt>1sec
-    * unHINTed repeated sending
-    * 1259859412.out#8,9 connection breaks, #8 rtt 1000, #9 hint - 
+    v unHINTed repeated sending
+    v 1259859412.out#8,9 connection breaks, #8 rtt 1000, #9 hint - 
      mudachestvo, cwnd => send int 0.5sec
      0_11_10_075_698 #9 sendctrl may send 0 < 0.000000 & 1732919509_-49_-45_-200_-111 (rtt 59661)
      0_11_10_075_698 #9 +data (0,194)
@@ -37,3 +37,8 @@
      0_11_10_575_703 #9 Tdata (0,194)
      0_11_10_575_703 #9 sendctrl may send 0 < 0.000000 & 1732919509_-49_-44_-700_-110 (rtt 59661)
     * complete peer reconnects 1259967418.out.gz
+    * underhinting causes repetition causes interarr underest causes underhinting
+    * misterious initiating handshake bursts
+    v whether sending is limited by cwnd or app
+    * actually: whether packets are ACKed faster than sent
+    * uproot DATA NONE: complicates and deceives
index 2cb36f7..0764672 100644 (file)
@@ -21,7 +21,7 @@ TestDir='tests'
 
 target = 'p2tp'
 source = [ 'bin64.cpp','sha1.cpp','hashtree.cpp','datagram.cpp','bins.cpp',
-    'transfer.cpp', 'p2tp.cpp', 'sendrecv.cpp', 'ext/send_control.cpp',
+    'transfer.cpp', 'p2tp.cpp', 'sendrecv.cpp', 'send_control.cpp',
     'compat/hirestimeofday.cpp', 'compat.cpp', 'compat/util.cpp']
 
 env = Environment()
index f269da2..0945744 100644 (file)
@@ -54,7 +54,7 @@ void print_error(const char* msg) {
 #ifdef _WIN32
     int e = WSAGetLastError();
     if (e)
-        fprintf(stderr,"network error #%i\n",e);
+        fprintf(stderr,"network error #%u\n",e);
 #endif
 }
 
diff --git a/ext/send_control.cpp b/ext/send_control.cpp
deleted file mode 100644 (file)
index c2c1cad..0000000
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- *  send_control.cpp
- *  p2tp
- *
- *  Created by Victor Grishchenko on 11/4/09.
- *  Copyright 2009 Delft University of Technology. All rights reserved.
- *
- */
-#include "p2tp.h"
-#ifdef _MSC_VER
-    // To avoid complaints about std::max. Appears to work in VS2008
-    #undef min
-    #undef max
-#endif
-
-using namespace p2tp;
-
-
-void    SendController::Swap (SendController* newctrl) {
-    dprintf("%s #%i sendctrl %s->%s\n",tintstr(),ch_->id,type(),newctrl->type());
-    assert(this==ch_->cc_);
-    ch_->cc_ = newctrl;
-    delete this;
-}
-
-
-void    SendController::Schedule (tint next_time) {
-    ch_->Schedule(next_time);
-}
-
-bool    PingPongController::MaySendData() {
-    return ch_->data_out_.empty();
-}
-
-void    PingPongController::OnDataSent(bin64_t b) {
-    Schedule(NOW+ch_->rtt_avg_+std::max(ch_->dev_avg_*4,500*TINT_MSEC));
-    if (++sent_>=10 || ++unanswered_>=3)
-        Swap(new KeepAliveController(this));
-}
-
-void    PingPongController::OnDataRecvd(bin64_t b) {
-    unanswered_ = 0;
-    Schedule(NOW); // pong
-}
-
-void    PingPongController::OnAckRcvd(bin64_t ackd) {
-    if (ackd!=bin64_t::NONE) {
-        Schedule(NOW);
-        Swap(new SlowStartController(this));
-    }
-}
-
-
-KeepAliveController::KeepAliveController (Channel* ch) : 
-SendController(ch), delay_(ch->rtt_avg_) {
-}
-
-
-KeepAliveController::KeepAliveController(SendController* prev, tint delay) :
-SendController(prev), delay_(delay) {
-    ch_->dev_avg_ = TINT_SEC; // without constant active measurement, rtt is unreliable
-    delay_=ch_->rtt_avg_;
-}
-
-bool    KeepAliveController::MaySendData() {
-    return true;
-}
-
-
-void    KeepAliveController::OnDataSent(bin64_t b) {
-    if (b==bin64_t::ALL || b==bin64_t::NONE) {
-        if (delay_>TINT_SEC*58) // keep NAT mappings alive
-            delay_ = TINT_SEC*58;
-        if (delay_>=4*TINT_SEC && ch_->last_recv_time_ < NOW-TINT_MIN)
-            Schedule(TINT_NEVER); // no response; enter close timeout
-        else
-            Schedule(NOW+delay_); // all right, just keep it alive
-        delay_ = delay_ * 2; // backing off
-    } else {
-        Schedule(NOW+ch_->rtt_avg_); // cwnd==1 => next send in 1 rtt
-        Swap(new SlowStartController(this));
-    }
-}
-
-void    KeepAliveController::OnDataRecvd(bin64_t b) {
-    if (b!=bin64_t::NONE && b!=bin64_t::ALL) { // channel is alive
-        delay_ = ch_->rtt_avg_;
-        Schedule(NOW); // schedule an ACK; TODO: aggregate
-    }
-}
-
-void    KeepAliveController::OnAckRcvd(bin64_t ackd) {
-    // probably to something sent by CwndControllers before this one got installed
-}
-
-
-CwndController::CwndController(SendController* orig, int cwnd) :
-SendController(orig), cwnd_(cwnd), last_change_(0) {
-}
-
-bool    CwndController::MaySendData() {
-    tint spacing = ch_->rtt_avg_ / cwnd_;
-    dprintf("%s #%i sendctrl may send %i < %f & %s (rtt %lli)\n",tintstr(),
-            ch_->id,(int)ch_->data_out_.size(),cwnd_,
-            tintstr(ch_->last_send_data_time_+spacing), ch_->rtt_avg_);
-    return  ch_->data_out_.empty() ||
-            (ch_->data_out_.size() < cwnd_  &&  NOW-ch_->last_send_data_time_ >= spacing);
-}
-
-
-void    CwndController::OnDataSent(bin64_t b) {
-    if ( (b==bin64_t::ALL || b==bin64_t::NONE) && MaySendData() ) {// no more data (no hints?)
-        Schedule(NOW+ch_->rtt_avg_); // soft pause; nothing to send yet
-        if (ch_->last_send_data_time_ < NOW-ch_->rtt_avg_)
-            Swap(new KeepAliveController(this)); // really, nothing to send
-    } else { // FIXME: mandatory rescheduling after send/recv; based on state
-        tint spacing = ch_->rtt_avg_ / cwnd_;
-        if (ch_->data_out_.size() < cwnd_) { // have cwnd; not the right time yet
-            Schedule(ch_->last_send_data_time_+spacing);
-        } else {    // no free cwnd
-            tint timeout = std::max( ch_->rtt_avg_+ch_->dev_avg_*4, 500*TINT_MSEC );
-            assert(!ch_->data_out_.empty());
-            Schedule(ch_->data_out_.front().time+timeout); // wait for ACK or timeout
-        }
-    }
-}
-
-
-void    CwndController::OnDataRecvd(bin64_t b) {
-    if (b!=bin64_t::NONE && b!=bin64_t::ALL) {
-        Schedule(NOW); // send ACK; todo: aggregate ACKs
-    }
-}
-
-void    CwndController::OnAckRcvd(bin64_t ackd) {
-    if (ackd==bin64_t::NONE) {
-        dprintf("%s #%i sendctrl loss detected\n",tintstr(),ch_->id);
-        if (NOW>last_change_+ch_->rtt_avg_) {
-            cwnd_ /= 2;
-            last_change_ = NOW;
-        }
-    } else {
-        if (cwnd_<1)
-            cwnd_ *= 2;
-        else
-            cwnd_ += 1.0/cwnd_;
-        dprintf("%s #%i sendctrl cwnd to %f\n",tintstr(),ch_->id,cwnd_);
-    }
-    tint spacing = ch_->rtt_avg_ / cwnd_;
-    Schedule(ch_->last_send_time_+spacing);
-}
-
-
-void SlowStartController::OnAckRcvd (bin64_t pos) {
-    if (pos!=bin64_t::NONE) {
-        cwnd_ += 1;
-        if (TINT_SEC*cwnd_/ch_->rtt_avg_>=10) {
-            Schedule(NOW);
-            Swap(new AIMDController(this,cwnd_));
-        }
-    } else
-        cwnd_ /= 2;
-}
-
diff --git a/ext/send_control.h b/ext/send_control.h
deleted file mode 100644 (file)
index e4887e0..0000000
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- *  send_control.h
- *  p2tp
- *
- *  Created by Victor Grishchenko on 11/4/09.
- *  Copyright 2009 Delft University of Technology. All rights reserved.
- *
- */
-// included into p2tp.h
-#ifndef P2TP_SEND_CONTROL
-#define P2TP_SEND_CONTROL
-
-class Channel;
-
-struct SendController {
-    
-    Channel*    ch_;
-    
-    SendController (Channel* ch) : ch_(ch) {}
-    
-    SendController(SendController* orig) : ch_(orig->ch_) { }
-    
-    void    Swap (SendController* replacement);
-    void    Schedule (tint time);
-    
-    virtual const char* type() const = 0;
-    
-    virtual bool    MaySendData() = 0;
-    
-    /** A datagram was sent to the peer.
-     *  @param data the bin number for the data sent; bin64_t::NONE if only
-                    metadata was sent; bin64_t::ALL if datagram was empty */
-    virtual void    OnDataSent(bin64_t data) = 0;
-    
-    /** A datagram was received from the peer.
-        @param data follows the same conventions as data in OnDataSent() */
-    virtual void    OnDataRecvd(bin64_t data) = 0;
-    
-    /** An acknowledgement on OUR data message was receiveed from the peer.
-        @param ackd bin number for the data sent; bin64_t::NONE if no
-                    acknowledgement was received (timeout event) */
-    virtual void    OnAckRcvd(bin64_t ackd) = 0;
-    
-    virtual         ~SendController() {}
-};
-
-struct PingPongController : public SendController {
-    
-    int     sent_, unanswered_;
-    
-    PingPongController (SendController* orig) :
-        SendController(orig), sent_(0), unanswered_(0) {} 
-    PingPongController (Channel* ch) : 
-        unanswered_(0), sent_(0), SendController(ch) {}
-    const char* type() const { return "PingPong"; }
-    bool    MaySendData();
-    void    OnDataSent(bin64_t b);
-    void    OnDataRecvd(bin64_t b);
-    void    OnAckRcvd(bin64_t ackd) ;
-    ~PingPongController() {}
-    
-};
-/** Mission of the keepalive controller to keep the channel
-    alive as no data sending happens; If no data is transmitted
-    in either direction, inter-packet times grow exponentially
-    till 58 sec, which refresh period is deemed necessary to keep
-    NAT mappings alive. */
-struct KeepAliveController : public SendController {
-
-    tint delay_;
-    
-    KeepAliveController (Channel* ch);
-    KeepAliveController(SendController* prev, tint delay=0) ;
-    const char* type() const { return "KeepAlive"; }
-    bool    MaySendData();
-    void    OnDataSent(bin64_t b) ;
-    void    OnDataRecvd(bin64_t b) ;
-    void    OnAckRcvd(bin64_t ackd) ;
-    
-};
-
-
-/** Base class for any congestion window based algorithm. */
-struct CwndController : public SendController {
-    
-    double  cwnd_;
-    tint    last_change_;
-    
-    CwndController(SendController* orig, int cwnd=1) ;
-    
-    bool    MaySendData() ;
-    void    OnDataSent(bin64_t b) ;
-    void    OnDataRecvd(bin64_t b) ;
-    void    OnAckRcvd(bin64_t ackd) ;
-    
-};
-
-
-/** TCP-like exponential "slow" start algorithm. */
-struct SlowStartController : public CwndController {
-    
-    SlowStartController(SendController* orig, int cwnd=1) : CwndController(orig,cwnd) {}
-    const char* type() const { return "SlowStart"; }
-    void    OnAckRcvd(bin64_t ackd) ;
-    
-};
-
-
-/** The classics: additive increase - multiplicative decrease algorithm.
-    A naive version of "standard" TCP congestion control. Potentially useful
-    for seedboxes, so needs to be improved. (QUBIC?) */
-struct AIMDController : public CwndController {
-    
-    AIMDController(SendController* orig, int cwnd=1) : CwndController(orig,cwnd) {}
-    const char* type() const { return "AIMD"; }
-    //void    OnAckRcvd(bin64_t ackd) ;
-    
-};
-
-
-#endif
index 91222fe..2a2290d 100644 (file)
@@ -34,7 +34,7 @@ public:
     }
     
     virtual bin64_t Pick (bins& offer, uint64_t max_width, tint expires) {
-        while (hint_out_.size() && hint_out_.front().time<NOW-TINT_SEC*3/2) {
+        while (hint_out_.size() && hint_out_.front().time<NOW-TINT_SEC*3/2) { // FIXME sec
             ack_hint_out_.copy_range(file().ack_out(), hint_out_.front().bin);
             hint_out_.pop_front();
         }
index 2de79ba..66984fd 100644 (file)
--- a/p2tp.cpp
+++ b/p2tp.cpp
@@ -40,24 +40,30 @@ PeerSelector* Channel::peer_selector = new SimpleSelector();
 Channel::Channel    (FileTransfer* transfer, int socket, Address peer_addr) :
     transfer_(transfer), peer_(peer_addr), peer_channel_id_(0), pex_out_(0),
     socket_(socket==-1?sockets[0]:socket), // FIXME
-    data_out_cap_(bin64_t::ALL), last_send_data_time_(0), last_recv_data_time_(0),
+    data_out_cap_(bin64_t::ALL), last_data_out_time_(0), last_data_in_time_(0),
     own_id_mentioned_(false), next_send_time_(0), last_send_time_(0),
     last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC),
-    data_in_dbl_(bin64_t::NONE), hint_out_size_(0)
+    data_in_dbl_(bin64_t::NONE), hint_out_size_(0),
+    cwnd_(1), send_interval_(TINT_SEC), send_control_(PING_PONG_CONTROL),
+    sent_since_recv_(0), ack_rcvd_recent_(0), ack_not_rcvd_recent_(0),
+    last_loss_time_(0), owd_min_bin_(0), owd_min_bin_start_(NOW), owd_cur_bin_(0)
 {
     if (peer_==Address())
         peer_ = tracker;
     this->id = channels.size();
     channels.push_back(this);
-    cc_ = new PingPongController(this);
-    dprintf("%s #%i init %s\n",tintstr(),id,peer_.str());
-    Schedule(NOW); // FIXME ugly
+    transfer_->hs_in_.push_back(id);
+    for(int i=0; i<4; i++) {
+        owd_min_bins_[i] = TINT_NEVER;
+        owd_current_[i] = TINT_NEVER;
+    }
+    Reschedule();
+    dprintf("%s #%u init %s\n",tintstr(),id,peer_.str());
 }
 
 
 Channel::~Channel () {
     channels[id] = NULL;
-    delete cc_;
 }
 
 
diff --git a/p2tp.h b/p2tp.h
index 0a872f5..8c51f3a 100644 (file)
--- a/p2tp.h
+++ b/p2tp.h
@@ -220,8 +220,17 @@ namespace p2tp {
 
         Channel    (FileTransfer* file, int socket=-1, Address peer=Address());
         ~Channel();
-
-        static void    RecvDatagram (int socket);
+        
+        typedef enum {
+            KEEP_ALIVE_CONTROL,
+            PING_PONG_CONTROL,
+            SLOW_START_CONTROL,
+            AIMD_CONTROL,
+            LEDBAT_CONTROL
+        } send_control_t;
+
+        static Channel*
+                    RecvDatagram (int socket);
         static void Loop (tint till);
 
         void        Recv (Datagram& dgram);
@@ -229,13 +238,13 @@ namespace p2tp {
 
         void        OnAck (Datagram& dgram);
         void        OnTs (Datagram& dgram);
-        bin64_t        OnData (Datagram& dgram);
+        bin64_t     OnData (Datagram& dgram);
         void        OnHint (Datagram& dgram);
         void        OnHash (Datagram& dgram);
         void        OnPex (Datagram& dgram);
         void        OnHandshake (Datagram& dgram);
         void        AddHandshake (Datagram& dgram);
-        bin64_t        AddData (Datagram& dgram);
+        bin64_t     AddData (Datagram& dgram);
         void        AddAck (Datagram& dgram);
         void        AddTs (Datagram& dgram);
         void        AddHint (Datagram& dgram);
@@ -243,13 +252,34 @@ namespace p2tp {
         void        AddPeakHashes (Datagram& dgram);
         void        AddPex (Datagram& dgram);
 
+        void        BackOffOnLosses ();
+        tint        SwitchSendControl (int control_mode);
+        tint        NextSendTime ();
+        tint        KeepAliveNextSendTime ();
+        tint        PingPongNextSendTime ();
+        tint        CwndRateNextSendTime ();
+        tint        SlowStartNextSendTime ();
+        tint        AimdNextSendTime ();
+        tint        LedbatNextSendTime ();
+        
+        static int  MAX_REORDERING;
+        static tint TIMEOUT;
+        static tint MIN_DEV;
+        static tint MAX_SEND_INTERVAL;
+        static tint LEDBAT_TARGET;
+        static float LEDBAT_GAIN;
+        static tint LEDBAT_DELAY_BIN;
+        
         const std::string id_string () const;
         /** A channel is "established" if had already sent and received packets. */
         bool        is_established () { return peer_channel_id_ && own_id_mentioned_; }
         FileTransfer& transfer() { return *transfer_; }
         HashTree&   file () { return transfer_->file(); }
         const Address& peer() const { return peer_; }
-
+        tint ack_timeout () {
+            return rtt_avg_ + std::max(dev_avg_,MIN_DEV)*4;
+        }
+        
         static int DecodeID(int scrambled);
         static int EncodeID(int unscrambled);
         static Channel* channel(int i) {
@@ -283,8 +313,6 @@ namespace p2tp {
         /** Hints sent (to detect and reschedule ignored hints). */
         tbqueue     hint_out_;
         uint64_t    hint_out_size_;
-        /** The congestion control strategy. */
-        SendController    *cc_;
         /** Types of messages the peer accepts. */
         uint64_t    cap_in_;
         /** For repeats. */
@@ -295,29 +323,46 @@ namespace p2tp {
         tint        rtt_avg_, dev_avg_, dip_avg_;
         tint        last_send_time_;
         tint        last_recv_time_;
-        tint        last_send_data_time_;
-        tint        last_recv_data_time_;
+        tint        last_data_out_time_;
+        tint        last_data_in_time_;
+        tint        last_loss_time_;
         tint        next_send_time_;
         tint        peer_send_time_;
-        static      tbheap send_queue;
+        /** Congestion window; TODO: int, bytes. */
+        float       cwnd_;
+        /** Data sending interval. */
+        tint        send_interval_;
+        /** The congestion control strategy. */
+        int         send_control_;
+        /** Datagrams (not data) sent since last recv.    */
+        int         sent_since_recv_;
+        /** Recent acknowlegements for data previously sent.    */
+        int         ack_rcvd_recent_;
+        /** Recent non-acknowlegements (losses) of data previously sent.    */
+        int         ack_not_rcvd_recent_;
+        /** LEDBAT one-way delay machinery */
+        tint        owd_min_bins_[4];
+        int         owd_min_bin_;
+        tint        owd_min_bin_start_;
+        tint        owd_current_[4];
+        int         owd_cur_bin_;
 
         int         PeerBPS() const {
             return TINT_SEC / dip_avg_ * 1024;
         }
         /** Get a request for one packet from the queue of peer's requests. */
-        bin64_t        DequeueHint();
+        bin64_t     DequeueHint();
         void        CleanDataOut (bin64_t acks_pos=bin64_t::NONE);
         void        CleanStaleHintOut();
         void        CleanHintOut(bin64_t pos);
-        void        Schedule(tint send_time);
+        void        Reschedule();
 
         static PeerSelector* peer_selector;
 
-        static int      MAX_REORDERING;
-        static tint     TIMEOUT;
         static SOCKET   sockets[8];
         static int      socket_count;
         static tint     last_tick;
+        static tbheap   send_queue;        
 
         static Address  tracker;
         static std::vector<Channel*> channels;
diff --git a/send_control.cpp b/send_control.cpp
new file mode 100644 (file)
index 0000000..ff46b4d
--- /dev/null
@@ -0,0 +1,142 @@
+/*
+ *  send_control.cpp
+ *  p2tp
+ *
+ *  Created by Victor Grishchenko on 12/10/09.
+ *  Copyright 2009 Delft University of Technology. All rights reserved.
+ *
+ */
+
+#include "p2tp.h"
+
+using namespace p2tp;
+using namespace std;
+
+tint Channel::MIN_DEV = 50*TINT_MSEC;
+tint Channel::MAX_SEND_INTERVAL = TINT_SEC*58;
+tint Channel::LEDBAT_TARGET = TINT_MSEC*25;
+float Channel::LEDBAT_GAIN = 1.0/LEDBAT_TARGET;
+tint Channel::LEDBAT_DELAY_BIN = TINT_SEC*30;
+
+
+tint    Channel::NextSendTime () {
+    switch (send_control_) {
+        case KEEP_ALIVE_CONTROL: return KeepAliveNextSendTime();
+        case PING_PONG_CONTROL:  return PingPongNextSendTime();
+        case SLOW_START_CONTROL: return SlowStartNextSendTime();
+        case AIMD_CONTROL:       return AimdNextSendTime();
+        case LEDBAT_CONTROL:     return LedbatNextSendTime();
+        default:                 assert(false);
+    }
+}
+
+tint    Channel::SwitchSendControl (int control_mode) {
+    dprintf("%s #%u sendctrl %i->%i\n",tintstr(),id,send_control_,control_mode);
+    switch (control_mode) {
+        case KEEP_ALIVE_CONTROL:
+            send_interval_ = max(TINT_SEC/10,rtt_avg_);
+            dev_avg_ = max(TINT_SEC,rtt_avg_);
+            cwnd_ = 1;
+            break;
+        case PING_PONG_CONTROL:
+            dev_avg_ = max(TINT_SEC,rtt_avg_);
+            cwnd_ = 1;
+            break;
+        case SLOW_START_CONTROL:
+            break;
+        case AIMD_CONTROL:
+            break;
+        case LEDBAT_CONTROL:
+            break;
+        default: 
+            assert(false);
+    }
+    send_control_ = control_mode;
+    return NextSendTime();
+}
+
+// TODO: transitions, consistently
+// TODO: may send data
+tint    Channel::KeepAliveNextSendTime () {
+    if (sent_since_recv_>=3 && last_recv_time_<NOW-TINT_MIN)
+        return TINT_NEVER;
+    if (ack_rcvd_recent_)
+        return SwitchSendControl(SLOW_START_CONTROL);
+    send_interval_ <<= 1;
+    if (send_interval_>MAX_SEND_INTERVAL)
+        send_interval_ = MAX_SEND_INTERVAL;
+    return last_send_time_ + send_interval_;
+}
+
+tint    Channel::PingPongNextSendTime () {
+    if (last_recv_time_ < last_send_time_-TINT_SEC*3) {
+        // FIXME keepalive <-> pingpong (peers, transition)
+    } // last_data_out_time_ < last_send_time_ - TINT_SEC...
+    if (false)
+        return SwitchSendControl(KEEP_ALIVE_CONTROL);
+    if (ack_rcvd_recent_)
+        return SwitchSendControl(SLOW_START_CONTROL);
+    if (last_recv_time_>last_send_time_)
+        return NOW;
+    else if (last_send_time_)
+        return last_send_time_ + ack_timeout();
+    else
+        return NOW;
+}
+
+tint    Channel::CwndRateNextSendTime () {
+    send_interval_ = rtt_avg_/cwnd_;
+    if (data_out_.size()<cwnd_) {
+        return last_data_out_time_ + send_interval_;
+    } else {
+        tint next_timeout = data_out_.front().time + ack_timeout();
+        return last_data_out_time_ + next_timeout;
+    }
+}
+
+void    Channel::BackOffOnLosses () {
+    ack_rcvd_recent_ = 0;
+    ack_not_rcvd_recent_ =  0;
+    if (last_loss_time_<NOW-rtt_avg_) {
+        cwnd_ /= 2;
+        last_loss_time_ = NOW;
+    }
+}
+
+tint    Channel::SlowStartNextSendTime () {
+    if (ack_not_rcvd_recent_) {
+        BackOffOnLosses();
+        return SwitchSendControl(AIMD_CONTROL);
+    } 
+    cwnd_+=ack_rcvd_recent_;
+    ack_rcvd_recent_=0;
+    return CwndRateNextSendTime();
+}
+
+tint    Channel::AimdNextSendTime () {
+    if (ack_not_rcvd_recent_)
+        BackOffOnLosses();
+    cwnd_ += ack_rcvd_recent_/cwnd_;
+    ack_rcvd_recent_=0;
+    return CwndRateNextSendTime();
+}
+
+tint Channel::LedbatNextSendTime () {
+    tint owd_cur(TINT_NEVER), owd_min(TINT_NEVER);
+    for(int i=0; i<4; i++) {
+        if (owd_min>owd_min_bins_[i])
+            owd_min = owd_min_bins_[i];
+        if (owd_cur>owd_current_[i])
+            owd_cur = owd_current_[i];
+    }
+    if (ack_not_rcvd_recent_)
+        BackOffOnLosses();
+    ack_rcvd_recent_ = 0;
+    tint queueing_delay = owd_cur - owd_min;
+    tint off_target = LEDBAT_TARGET - queueing_delay;
+    cwnd_ += LEDBAT_GAIN * off_target / cwnd_;
+    return CwndRateNextSendTime();
+}
+
+
+
index 2b7ebd6..374512b 100644 (file)
@@ -11,7 +11,7 @@
 
 
 using namespace p2tp;
-using namespace std; // FIXME remove
+using namespace std;
 
 /*
  TODO  25 Oct 18:55
@@ -26,7 +26,7 @@ void    Channel::AddPeakHashes (Datagram& dgram) {
         dgram.Push32((uint32_t)peak);
         dgram.PushHash(file().peak_hash(i));
         //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
-        dprintf("%s #%i +phash %s\n",tintstr(),id,peak.str());
+        dprintf("%s #%u +phash %s\n",tintstr(),id,peak.str());
     }
 }
 
@@ -40,40 +40,48 @@ void    Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
         dgram.Push32((uint32_t)uncle);
         dgram.PushHash( file().hash(uncle) );
         //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
-        dprintf("%s #%i +hash %s\n",tintstr(),id,uncle.str());
+        dprintf("%s #%u +hash %s\n",tintstr(),id,uncle.str());
         pos = pos.parent();
     }
 }
 
 
-bin64_t        Channel::DequeueHint () { // TODO: resilience
+bin64_t        Channel::DequeueHint () {
+    if (hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {    
+        uint64_t twist = peer_channel_id_;  // got no hints, send something randomly
+        twist &= file().peak(0); // may make it semi-seq here
+        file().ack_out().twist(twist);
+        ack_in_.twist(twist);
+        bin64_t my_pick = 
+            file().ack_out().find_filtered(ack_in_,bin64_t::ALL,bins::FILLED);
+        while (my_pick.width()>max(1,(int)cwnd_))
+            my_pick = my_pick.left();
+        file().ack_out().twist(0);
+        ack_in_.twist(0);
+        if (my_pick!=bin64_t::NONE) {
+            my_pick = my_pick.twisted(twist);
+            hint_in_.push_back(my_pick);
+            dprintf("%s #%u *hint %s\n",tintstr(),id,my_pick.str());
+        }
+    }
     bin64_t send = bin64_t::NONE;
     while (!hint_in_.empty() && send==bin64_t::NONE) {
         bin64_t hint = hint_in_.front().bin;
         tint time = hint_in_.front().time;
         hint_in_.pop_front();
-        //if (time < NOW-TINT_SEC*3/2 ) //NOW-8*rtt_avg_)
-        //    continue;
-        // Totally flawed:
-        // a. May empty the queue when you least expect
-        // b. May lose parts of partially ACKd HINTs
-        send = file().ack_out().find_filtered(ack_in_,hint,bins::FILLED);
-        send = send.left_foot(); // single packet
-        if (send!=bin64_t::NONE)
-            while (send!=hint) {
-                hint = hint.towards(send);
-                hint_in_.push_front(hint.sibling());
-            }
-    }
-    if (send==bin64_t::NONE) {
-        send = file().ack_out().find_filtered(ack_in_,bin64_t::ALL,bins::FILLED);
-        if (send!=bin64_t::NONE) // NEED FIXME: twist here!!!
-            send = send.left_foot();
+        while (!hint.is_base()) { // FIXME optimize; possible attack
+            hint_in_.push_front(tintbin(time,hint.right()));
+            hint = hint.left();
+        }
+        //if (time < NOW-TINT_SEC*3/2 )
+        //    continue;  bad idea
+        if (ack_in_.get(hint)!=bins::FILLED) 
+            send = hint;
     }
     uint64_t mass = 0;
     for(int i=0; i<hint_in_.size(); i++)
         mass += hint_in_[i].bin.width();
-    dprintf("%s #%i dequeued %s [%lli]\n",tintstr(),id,send.str(),mass);
+    dprintf("%s #%u dequeued %s [%lli]\n",tintstr(),id,send.str(),mass);
     return send;
 }
 
@@ -83,13 +91,13 @@ void    Channel::AddHandshake (Datagram& dgram) {
         dgram.Push8(P2TP_HASH);
         dgram.Push32(bin64_t::ALL32);
         dgram.PushHash(file().root_hash());
-        dprintf("%s #%i +hash ALL %s\n",
+        dprintf("%s #%u +hash ALL %s\n",
                 tintstr(),id,file().root_hash().hex().c_str());
     }
     dgram.Push8(P2TP_HANDSHAKE);
     int encoded = EncodeID(id);
     dgram.Push32(encoded);
-    dprintf("%s #%i +hs %i\n",tintstr(),id,encoded);
+    dprintf("%s #%u +hs %i\n",tintstr(),id,encoded);
     ack_out_.clear();
     AddAck(dgram);
 }
@@ -111,45 +119,50 @@ void    Channel::Send () {
         AddHandshake(dgram);
         AddAck(dgram);
     }
-    dprintf("%s #%i sent %ib %s\n",tintstr(),id,dgram.size(),peer().str());
-    if (dgram.size()==4) // only the channel id; bare keep-alive
+    dprintf("%s #%u sent %ib %s:%u\n",
+            tintstr(),id,dgram.size(),peer().str(),peer_channel_id_);
+    if (dgram.size()==4) {// only the channel id; bare keep-alive
         data = bin64_t::ALL;
-    cc_->OnDataSent(data);
+        if (send_control_!=KEEP_ALIVE_CONTROL) // we did our best
+            SwitchSendControl(KEEP_ALIVE_CONTROL);
+        if (NOW<last_send_time_+MAX_SEND_INTERVAL) // no need for keepalive
+            return; // don't send empty dgram
+    }
     if (dgram.Send()==-1)
         print_error("can't send datagram");
+    last_send_time_ = NOW;
+    sent_since_recv_++;
 }
 
 
 void    Channel::AddHint (Datagram& dgram) {
 
-    tint timed_out = NOW - TINT_SEC*3/2;
+    tint plan_for = max(TINT_SEC,rtt_avg_*4);
+    
+    tint timed_out = NOW - plan_for*2;
     while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
         hint_out_size_ -= hint_out_.front().bin.width();
         hint_out_.pop_front();
     }
     
-    int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
+    /*int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
     if (!peer_cwnd)
-        peer_cwnd = 1;
-    int peer_pps = TINT_SEC / dip_avg_; // data packets per sec
-    if (!peer_pps)
-        peer_pps = 1;
+        peer_cwnd = 1;*/
+    int plan_pck = std::max ( 1LL, plan_for / dip_avg_ );
     
-    if ( hint_out_size_ < peer_pps ) { //4*peer_cwnd ) {
+    if ( hint_out_size_ < plan_pck ) {
             
-        int diff = peer_pps - hint_out_size_;
-        //if (diff>4 && diff>2*peer_cwnd)
-        //    diff >>= 1;
-        bin64_t hint = transfer().picker().Pick(ack_in_,diff,rtt_avg_*8+TINT_MSEC*100);
+        int diff = plan_pck - hint_out_size_; // TODO: aggregate
+        bin64_t hint = transfer().picker().Pick(ack_in_,diff,NOW+plan_for*2);
         
         if (hint!=bin64_t::NONE) {
             dgram.Push8(P2TP_HINT);
             dgram.Push32(hint);
-            dprintf("%s #%i +hint %s [%lli]\n",tintstr(),id,hint.str(),hint_out_size_);
+            dprintf("%s #%u +hint %s [%lli]\n",tintstr(),id,hint.str(),hint_out_size_);
             hint_out_.push_back(hint);
             hint_out_size_ += hint.width();
         } else
-            dprintf("%s #%i .hint\n",tintstr(),id);
+            dprintf("%s #%u Xhint\n",tintstr(),id);
         
     }
 }
@@ -161,15 +174,15 @@ bin64_t        Channel::AddData (Datagram& dgram) {
         return bin64_t::NONE;
     
     bin64_t tosend = bin64_t::NONE;
-    if (cc_->MaySendData()) {
+    if (data_out_.size()<cwnd_ && last_data_out_time_<=NOW-send_interval_) {
         tosend = DequeueHint();
         if (tosend==bin64_t::NONE)
-            dprintf("%s #%i out of hints #sendctrl\n",tintstr(),id);
+            dprintf("%s #%u out of hints #sendctrl\n",tintstr(),id);
     } else
-        dprintf("%s #%i no cwnd #sendctrl\n",tintstr(),id);
+        dprintf("%s #%u no cwnd #sendctrl\n",tintstr(),id);
     
-    if (tosend==bin64_t::NONE && (last_send_data_time_>NOW-TINT_SEC || data_out_.empty())) 
-        return bin64_t::NONE; // once in a while, empty data is sent just to check rtt
+    if (tosend==bin64_t::NONE && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty())) 
+        return bin64_t::NONE; // once in a while, empty data is sent just to check rtt FIXME
     
     if (tosend!=bin64_t::NONE) { // hashes
         if (ack_in_.is_empty() && file().size())
@@ -199,9 +212,9 @@ bin64_t        Channel::AddData (Datagram& dgram) {
         dgram.Push(buf,r);
     }
     
-    last_send_data_time_ = NOW;
+    last_data_out_time_ = NOW;
     data_out_.push_back(tosend);
-    dprintf("%s #%i +data %s\n",tintstr(),id,tosend.str());
+    dprintf("%s #%u +data %s\n",tintstr(),id,tosend.str());
     
     return tosend;
 }
@@ -210,7 +223,7 @@ bin64_t        Channel::AddData (Datagram& dgram) {
 void    Channel::AddTs (Datagram& dgram) {
     dgram.Push8(P2TP_TS);
     dgram.Push64(data_in_.time);
-    dprintf("%s #%i +ts %s\n",tintstr(),id,tintstr(data_in_.time));
+    dprintf("%s #%u +ts %s\n",tintstr(),id,tintstr(data_in_.time));
 }
 
 
@@ -227,7 +240,7 @@ void    Channel::AddAck (Datagram& dgram) {
         dgram.Push32(pos);
         //dgram.Push64(data_in_.time);
         ack_out_.set(pos);
-        dprintf("%s #%i +ack %s %s\n",tintstr(),id,pos.str(),tintstr(data_in_.time));
+        dprintf("%s #%u +ack %s %s\n",tintstr(),id,pos.str(),tintstr(data_in_.time));
         data_in_ = tintbin(0,bin64_t::NONE);
         if (pos.layer()>2)
             data_in_dbl_ = pos;
@@ -240,38 +253,38 @@ void    Channel::AddAck (Datagram& dgram) {
         ack_out_.set(ack);
         dgram.Push8(P2TP_ACK);
         dgram.Push32(ack);
-        dprintf("%s #%i +ack %s\n",tintstr(),id,ack.str());
+        dprintf("%s #%u +ack %s\n",tintstr(),id,ack.str());
     }
 }
 
 
 void    Channel::Recv (Datagram& dgram) {
-    dprintf("%s #%i recvd %i\n",tintstr(),id,dgram.size()+4);
+    dprintf("%s #%u recvd %i\n",tintstr(),id,dgram.size()+4);
+    peer_send_time_ = 0; // has scope of 1 datagram
     if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
         rtt_avg_ = NOW - last_send_time_;
         dev_avg_ = rtt_avg_;
         dip_avg_ = rtt_avg_;
-        transfer().hs_in_.push_back(id);
-        dprintf("%s #%i rtt init %lli\n",tintstr(),id,rtt_avg_);
+        dprintf("%s #%u rtt init %lli\n",tintstr(),id,rtt_avg_);
     }
     bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
     while (dgram.size()) {
         uint8_t type = dgram.Pull8();
         switch (type) {
             case P2TP_HANDSHAKE: OnHandshake(dgram); break;
-            case P2TP_DATA:        data=OnData(dgram); break;
-            case P2TP_TS:       OnTs(dgram); break;
-            case P2TP_ACK:        OnAck(dgram); break;
-            case P2TP_HASH:        OnHash(dgram); break;
-            case P2TP_HINT:        OnHint(dgram); break;
-            case P2TP_PEX_ADD:  OnPex(dgram); break;
+            case P2TP_DATA:      data=OnData(dgram); break;
+            case P2TP_TS:        OnTs(dgram); break;
+            case P2TP_ACK:       OnAck(dgram); break;
+            case P2TP_HASH:      OnHash(dgram); break;
+            case P2TP_HINT:      OnHint(dgram); break;
+            case P2TP_PEX_ADD:   OnPex(dgram); break;
             default:
-                eprintf("%s #%i ?msg id unknown %i\n",tintstr(),id,(int)type);
+                eprintf("%s #%u ?msg id unknown %i\n",tintstr(),id,(int)type);
                 return;
         }
     }
-    cc_->OnDataRecvd(data);
     last_recv_time_ = NOW;
+    sent_since_recv_ = 0;
 }
 
 
@@ -280,7 +293,7 @@ void    Channel::OnHash (Datagram& dgram) {
     Sha1Hash hash = dgram.PullHash();
     file().OfferHash(pos,hash);
     //DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
-    dprintf("%s #%i -hash %s\n",tintstr(),id,pos.str());
+    dprintf("%s #%u -hash %s\n",tintstr(),id,pos.str());
 }
 
 
@@ -310,25 +323,26 @@ bin64_t Channel::OnData (Datagram& dgram) {
     uint8_t *data;
     int length = dgram.Pull(&data,1024);
     bool ok = (pos==bin64_t::NONE) || file().OfferData(pos, (char*)data, length) ;
-    dprintf("%s #%i %cdata %s\n",tintstr(),id,ok?'-':'!',pos.str());
+    dprintf("%s #%u %cdata %s\n",tintstr(),id,ok?'-':'!',pos.str());
     if (!ok) 
         return bin64_t::NONE;
     data_in_ = tintbin(NOW,pos);
     if (pos!=bin64_t::NONE) {
-        if (last_recv_data_time_) {
-            tint dip = NOW - last_recv_data_time_;
+        if (last_data_in_time_) {
+            tint dip = NOW - last_data_in_time_;
             dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
         }
-        last_recv_data_time_ = NOW;
+        last_data_in_time_ = NOW;
     }
     CleanHintOut(pos);    
     return pos;
 }
 
 
-void    Channel::CleanDataOut (bin64_t ackd_pos) {
+void    Channel::CleanDataOut (bin64_t ackd_pos) { // TODO: isn't it too long?
     
     int max_ack_off = 0;
+    //FIXME do LEDBAT magic somewhere here
     
     if (ackd_pos!=bin64_t::NONE) {
         for (int i=0; i<8 && i<data_out_.size(); i++) {
@@ -336,9 +350,9 @@ void    Channel::CleanDataOut (bin64_t ackd_pos) {
                 tint rtt = NOW-data_out_[i].time;
                 rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
                 dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
-                dprintf("%s #%i rtt %lli dev %lli\n",tintstr(),id,rtt_avg_,dev_avg_);
+                dprintf("%s #%u rtt %lli dev %lli\n",tintstr(),id,rtt_avg_,dev_avg_);
                 bin64_t pos = data_out_[i].bin;
-                cc_->OnAckRcvd(pos);
+                ack_rcvd_recent_++;
                 data_out_[i]=tintbin();
                 max_ack_off = i;
                 if (ackd_pos==pos)
@@ -357,8 +371,8 @@ void    Channel::CleanDataOut (bin64_t ackd_pos) {
                 max_ack_off--;
             }
             while (max_ack_off>MAX_REORDERING) {
-                cc_->OnAckRcvd(bin64_t::NONE);
-                dprintf("%s #%i Rdata %s\n",tintstr(),id,data_out_.front().bin.str());
+                ack_not_rcvd_recent_++;
+                dprintf("%s #%u Rdata %s\n",tintstr(),id,data_out_.front().bin.str());
                 data_out_.pop_front();
                 max_ack_off--;
                 data_out_cap_ = bin64_t::ALL;
@@ -368,9 +382,9 @@ void    Channel::CleanDataOut (bin64_t ackd_pos) {
     tint timeout = NOW - rtt_avg_ - 4*std::max(dev_avg_,TINT_MSEC*50);
     while (!data_out_.empty() && data_out_.front().time<timeout) {
         if (data_out_.front().bin!=bin64_t::NONE && ack_in_.is_empty(data_out_.front().bin)) {
-            cc_->OnAckRcvd(bin64_t::NONE);
+            ack_not_rcvd_recent_++;
             data_out_cap_ = bin64_t::ALL;
-            dprintf("%s #%i Tdata %s\n",tintstr(),id,data_out_.front().bin.str());
+            dprintf("%s #%u Tdata %s\n",tintstr(),id,data_out_.front().bin.str());
         }
         data_out_.pop_front();
     }
@@ -386,7 +400,7 @@ void    Channel::OnAck (Datagram& dgram) {
         eprintf("invalid ack: %s\n",ackd_pos.str());
         return;
     }
-    dprintf("%s #%i -ack %s\n",tintstr(),id,ackd_pos.str());
+    dprintf("%s #%u -ack %s\n",tintstr(),id,ackd_pos.str());
     ack_in_.set(ackd_pos);
     CleanDataOut(ackd_pos);
 }
@@ -394,7 +408,7 @@ void    Channel::OnAck (Datagram& dgram) {
 
 void Channel::OnTs (Datagram& dgram) {
     peer_send_time_ = dgram.Pull64();
-    dprintf("%s #%i -ts %lli\n",tintstr(),id,peer_send_time_);
+    dprintf("%s #%u -ts %lli\n",tintstr(),id,peer_send_time_);
 }
 
 
@@ -403,13 +417,13 @@ void    Channel::OnHint (Datagram& dgram) {
     hint_in_.push_back(hint);
     //ack_in_.set(hint,bins::EMPTY);
     //RequeueSend(cc_->OnHintRecvd(hint));
-    dprintf("%s #%i -hint %s\n",tintstr(),id,hint.str());
+    dprintf("%s #%u -hint %s\n",tintstr(),id,hint.str());
 }
 
 
 void Channel::OnHandshake (Datagram& dgram) {
     peer_channel_id_ = dgram.Pull32();
-    dprintf("%s #%i -hs %i\n",tintstr(),id,peer_channel_id_);
+    dprintf("%s #%u -hs %i\n",tintstr(),id,peer_channel_id_);
     // FUTURE: channel forking
 }
 
@@ -418,7 +432,7 @@ void Channel::OnPex (Datagram& dgram) {
     uint32_t ipv4 = dgram.Pull32();
     uint16_t port = dgram.Pull16();
     Address addr(ipv4,port);
-    dprintf("%s #%i -pex %s\n",tintstr(),id,addr.str());
+    dprintf("%s #%u -pex %s\n",tintstr(),id,addr.str());
     transfer().OnPexIn(addr);
 }
 
@@ -431,15 +445,15 @@ void    Channel::AddPex (Datagram& dgram) {
     dgram.Push8(P2TP_PEX_ADD);
     dgram.Push32(a.ipv4());
     dgram.Push16(a.port());
-    dprintf("%s #%i +pex %s\n",tintstr(),id,a.str());
+    dprintf("%s #%u +pex %s\n",tintstr(),id,a.str());
 }
 
 
-void    Channel::RecvDatagram (int socket) {
+Channel*    Channel::RecvDatagram (int socket) {
     Datagram data(socket);
     data.Recv();
     Address& addr = data.addr;
-#define return_log(...) { eprintf(__VA_ARGS__); return; }
+#define return_log(...) { eprintf(__VA_ARGS__); return NULL; }
     if (data.size()<4) 
         return_log("datagram shorter than 4 bytes %s\n",addr.str());
     uint32_t mych = data.Pull32();
@@ -461,22 +475,23 @@ void    Channel::RecvDatagram (int socket) {
         dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
         for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
             if (channels[*i] && channels[*i]->peer_==data.addr && 
-                channels[*i]->last_recv_time_>NOW-TINT_SEC*2) 
+                channels[*i]->last_recv_time_>NOW-TINT_SEC*2)
                 return_log("have a channel already to %s\n",addr.str());
         channel = new Channel(file, socket, data.address());
     } else {
         mych = DecodeID(mych);
         if (mych>=channels.size()) 
-            return_log("invalid channel #%i, %s\n",mych,addr.str());
+            return_log("invalid channel #%u, %s\n",mych,addr.str());
         channel = channels[mych];
         if (!channel) 
-            return_log ("channel #%i is already closed\n",mych,addr.str());
+            return_log ("channel #%u is already closed\n",mych,addr.str());
         if (channel->peer() != addr) 
-            return_log ("invalid peer address #%i %s!=%s\n",mych,channel->peer().str(),addr.str());
+            return_log ("invalid peer address #%u %s!=%s\n",mych,channel->peer().str(),addr.str());
         channel->own_id_mentioned_ = true;
     }
     //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
     channel->Recv(data);
+    return channel;
 }
 
 
@@ -488,48 +503,52 @@ void    Channel::Loop (tint howlong) {
 
         tint send_time(TINT_NEVER);
         Channel* sender(NULL);
-        while (!send_queue.is_empty()) {
+        while (!sender && !send_queue.is_empty()) { // dequeue
             send_time = send_queue.peek().time;
             sender = channel((int)send_queue.peek().bin);
-            if (sender)
-                if ( sender->next_send_time_==send_time ||
-                     sender->next_send_time_==TINT_NEVER )
-                break;
-            sender = NULL; // it was a stale entry
             send_queue.pop();
+            if (sender && sender->next_send_time_!=send_time &&
+                     sender->next_send_time_!=TINT_NEVER )
+                sender = NULL; // it was a stale entry
         }
-        if (send_time>limit)
-            send_time = limit;
-        if ( sender && sender->next_send_time_ <= NOW ) {
-            dprintf("%s #%i sch_send %s\n",tintstr(),sender->id,
-                    tintstr(send_time));
-            sender->Send();
-            sender->last_send_time_ = NOW;
-            // sender->RequeueSend(sender->cc_->NextSendTime()); goes to SendCtrl
-            send_queue.pop();
-        } else if ( send_time > NOW ) {
-            tint towait = send_time - NOW;
+        
+        if ( sender && send_time <= NOW ) { // it's time
+            
+            if (sender->next_send_time_<NOW+TINT_MIN) {  // either send
+                dprintf("%s #%u sch_send %s\n",tintstr(),sender->id,
+                        tintstr(send_time));
+                sender->Send();
+                sender->Reschedule();
+            } else { // or close the channel
+                dprintf("%s #%u closed sendctrl\n",tintstr(),sender->id);
+                delete sender;
+            }
+            
+        } else {  // it's too early, wait
+            
+            tint towait = min(limit,send_time) - NOW;
             dprintf("%s waiting %lliusec\n",tintstr(),towait);
             int rd = Datagram::Wait(socket_count,sockets,towait);
-            if (rd!=INVALID_SOCKET)
-                RecvDatagram(rd);
-        } else  { // FIXME FIXME FIXME REWRITE!!!  if (sender->next_send_time_==TINT_NEVER) { 
-            if (sender) {
-            dprintf("%s #%i closed sendctrl\n",tintstr(),sender->id);
-            delete sender;
+            if (rd!=INVALID_SOCKET) { // in meantime, received something
+                Channel* receiver = RecvDatagram(rd);
+                if (receiver) // receiver's state may have changed
+                    receiver->Reschedule();
             }
-            send_queue.pop();
+            if (sender)  // get back to that later
+                send_queue.push(tintbin(send_time,sender->id));
+            
         }
         
     } while (Datagram::Time()<limit);
-        
+            
 }
 
  
-void Channel::Schedule (tint next_time) {
-    next_send_time_ = next_time;
-    if (next_time==TINT_NEVER)
-        next_time = NOW + TINT_MIN; // 1min timeout
-    send_queue.push(tintbin(next_time,id));
-    dprintf("%s requeue #%i for %s\n",tintstr(),id,tintstr(next_time));
+void Channel::Reschedule () {
+    next_send_time_ = NextSendTime();
+    if (next_send_time_!=TINT_NEVER)
+        send_queue.push(tintbin(next_send_time_,id));
+    else
+        send_queue.push(tintbin(NOW+TINT_MIN,id));
+    dprintf("%s requeue #%u for %s\n",tintstr(),id,tintstr(next_send_time_));
 }
index 864c9f2..a163926 100644 (file)
@@ -39,6 +39,8 @@ TEST(Bin64Test,Overflows) {
 
     EXPECT_FALSE(bin64_t(0,1).within(bin64_t::NONE));
     EXPECT_TRUE(bin64_t(0,1).within(bin64_t::ALL));
+    EXPECT_EQ(0,bin64_t::none().width());
+    EXPECT_EQ(bin64_t::none(),bin64_t::none().twisted(123));
     /*EXPECT_EQ(bin64_t::NONE.parent(),bin64_t::NONE);
     EXPECT_EQ(bin64_t::NONE.left(),bin64_t::NONE);
     EXPECT_EQ(bin64_t::NONE.right(),bin64_t::NONE);
index d64cd60..88c8690 100644 (file)
@@ -74,15 +74,18 @@ void            FileTransfer::OnPexIn (const Address& addr) {
 }
 
 
-int        FileTransfer::RevealChannel (int& pex_out_) {
+int        FileTransfer::RevealChannel (int& pex_out_) { // FIXME brainfuck
     pex_out_ -= hs_in_offset_;
     if (pex_out_<0)
         pex_out_ = 0;
     while (pex_out_<hs_in_.size()) {
         Channel* c = Channel::channels[hs_in_[pex_out_]];
         if (c && c->transfer().fd()==this->fd()) {
-            pex_out_ += hs_in_offset_ + 1;
-            return c->id;
+            if (c->own_id_mentioned_) {
+                pex_out_ += hs_in_offset_ + 1;
+                return c->id;
+            } else
+                pex_out_++;
         } else {
             hs_in_[pex_out_] = hs_in_[0];
             hs_in_.pop_front();