apparently, I'fixed consequences of yesterday's coding
authorvictor <victor@e16421f0-f15b-0410-abcd-98678b794739>
Sat, 14 Nov 2009 15:03:00 +0000 (15:03 +0000)
committervictor <victor@e16421f0-f15b-0410-abcd-98678b794739>
Sat, 14 Nov 2009 15:03:00 +0000 (15:03 +0000)
git-svn-id: https://ttuki.vtt.fi/svn/p2p-next/TUD/p2tp/trunk@552 e16421f0-f15b-0410-abcd-98678b794739

16 files changed:
BUGS
bin64.cpp
bin64.h
bins.cpp
bins.h
datagram.cpp
ext/send_control.cpp
ext/send_control.h
ext/seq_picker.cpp
hashtree.cpp
p2tp.cpp
p2tp.h
sendrecv.cpp
tests/binstest2.cpp
tests/transfertest.cpp
transfer.cpp

diff --git a/BUGS b/BUGS
index 9098364..9d19b0d 100644 (file)
--- a/BUGS
+++ b/BUGS
@@ -25,3 +25,4 @@
     * hint queue buildup
     * file operations are not 64-bit ready
         http://mail.python.org/pipermail/patches/2000-June/000848.html
+    * recovery: last packet
index 10c4859..6b2ccb5 100644 (file)
--- a/bin64.cpp
+++ b/bin64.cpp
@@ -21,6 +21,15 @@ uint32_t bin64_t::to32() const {
     return NONE32;
 }
 
+bin64_t::bin64_t(const uint32_t val) {
+    if (val==ALL32)
+        v = ALL;
+    else if (val==NONE32)
+        v = NONE;
+    else
+        v = val;
+}
+
 bin64_t bin64_t::next_dfsio (uint8_t floor) {
     /*while (ret.is_right())
         ret = ret.parent();
@@ -54,3 +63,18 @@ int bin64_t::peaks (uint64_t length, bin64_t* peaks) {
     peaks[pp] = NONE;
     return pp;
 }
+
+#include <stdio.h>
+
+const char* bin64_t::str () const {
+    static char _b64sr[4][32];
+    static int _rsc;
+    _rsc = (_rsc+1) & 3;
+    if (v==ALL)
+        return "(ALL)";
+    else if (v==NONE)
+        return "(NONE)";
+    else
+        sprintf(_b64sr[_rsc],"(%i,%lli)",(int)layer(),offset());
+    return _b64sr[_rsc];
+}
diff --git a/bin64.h b/bin64.h
index 8b8befd..c2193ad 100644 (file)
--- a/bin64.h
+++ b/bin64.h
@@ -25,6 +25,7 @@ struct bin64_t {
 
     bin64_t() : v(NONE) {}
     bin64_t(const bin64_t&b) : v(b.v) {}
+    bin64_t(const uint32_t val) ;
     bin64_t(const uint64_t val) : v(val) {}
     bin64_t(uint8_t layer, uint64_t offset) :
         v( (offset<<(layer+1)) | ((1ULL<<layer)-1) ) {}
@@ -141,6 +142,8 @@ struct bin64_t {
     bin64_t width () const {
         return (tail_bits()+1)>>1;
     }
+    
+    const char* str () const;
 
     /** The array must have 64 cells, as it is the max
      number of peaks possible +1 (and there are no reason
index 2718115..23e942f 100644 (file)
--- a/bins.cpp
+++ b/bins.cpp
@@ -251,6 +251,8 @@ bin64_t bins::find (const bin64_t range, fill_t seek) {
 
 
 uint16_t bins::get (bin64_t bin) {
+    if (bin==bin64_t::NONE)
+        return EMPTY;
     iterator i(this,bin,true);
     //while ( i.pos!=bin && 
     //        (i.deep() || (*i!=BIN_FULL && *i!=BIN_EMPTY)) )
@@ -281,6 +283,8 @@ uint64_t bins::mass () {
 
 
 void bins::set (bin64_t bin, fill_t val) {
+    if (bin==bin64_t::NONE)
+        return;
     assert(val==FILLED || val==EMPTY);
     iterator i(this,bin,false);
     while (i.bin()!=bin && (i.deep() || *i!=val))
@@ -303,7 +307,7 @@ uint64_t*   bins::get_stripes (int& count) {
     count = 0;
     uint16_t cur = bins::EMPTY;
     stripes[count++] = 0;
-    iterator i(this,0,false);
+    iterator i(this,bin64_t(0,0),false);
     while (!i.solid())
         i.left();
 
diff --git a/bins.h b/bins.h
index 3f8494f..45ef4ea 100644 (file)
--- a/bins.h
+++ b/bins.h
@@ -122,7 +122,7 @@ public: // rm this
     uint8_t     layer_;
     bin64_t     pos;  // TODO: half[] layer bin
 public:
-    iterator(bins* host, bin64_t start=0, bool split=false);
+    iterator(bins* host, bin64_t start=bin64_t(0,0), bool split=false);
     ~iterator();
     bool deep () { return host->deep(half); }
     bool solid () { 
index 11931f8..8d60b08 100644 (file)
@@ -31,10 +31,8 @@ const char* tintstr (tint time) {
     static char ret_str[4][32]; // wow
     static int i;
     i = (i+1) & 3;
-    if (time==TINT_NEVER) {
-        strcpy(ret_str[i],"NEVER");
-        return ret_str[i];
-    }
+    if (time==TINT_NEVER)
+        return "NEVER";
     time -= Datagram::epoch;
     assert(time>=0);
     int hours = time/TINT_HOUR;
index 5d5d066..b520210 100644 (file)
@@ -20,61 +20,51 @@ void    SendController::Swap (SendController* newctrl) {
 }
 
 
-bool    PingPongController::MaySendData(){
-    return ch_->data_out_.empty();
+void    SendController::Schedule (tint next_time) {
+    ch_->Schedule(next_time);
 }
-    
-tint    PingPongController::NextSendTime () {
-    if (unanswered_>=3)
-        return TINT_NEVER;
-    return ch_->last_send_time_ + ch_->rtt_avg_ + ch_->dev_avg_*4;  // remind on timeout
-}
-    
-void    PingPongController::OnDataSent(bin64_t b) {
-    unanswered_++;
-    if ( (b==bin64_t::ALL && MaySendData()) ) // nothing to send
-        Swap(new KeepAliveController(this));
-}
-    
-void    PingPongController::OnDataRecvd(bin64_t b) {
-    unanswered_ = 0;
-}
-    
-void    PingPongController::OnAckRcvd(bin64_t ackd) {
-    //if (ch_->data_out_.empty())
-    Swap(new SlowStartController(this));
+
+
+
+KeepAliveController::KeepAliveController (Channel* ch) : SendController(ch), delay_(ch->rtt_avg_) {
 }
 
 
 KeepAliveController::KeepAliveController(SendController* prev, tint delay) : 
 SendController(prev), delay_(delay) {
-    ch_->dev_avg_ = TINT_SEC; // without active measurement, rtt is unreliable
+    ch_->dev_avg_ = TINT_SEC; // without constant active measurement, rtt is unreliable
+    delay_=ch_->rtt_avg_;
 }
 
 bool    KeepAliveController::MaySendData() {
     return true;
 }
     
-tint    KeepAliveController::NextSendTime () {
-    if (!delay_)
-        delay_ = ch_->rtt_avg_;
-    if (ch_->last_recv_time_ < ch_->last_send_time_-TINT_MIN)
-        return TINT_NEVER;
-    return ch_->last_send_time_ + delay_;
-}
-    
+
 void    KeepAliveController::OnDataSent(bin64_t b) {
-    delay_ = (NOW - std::max(ch_->last_send_time_,ch_->last_recv_time_)) * 3 / 2;
-    if (delay_>TINT_SEC*58)
-        delay_ = TINT_SEC*58;
-    if (b!=bin64_t::ALL && b!=bin64_t::NONE)
+    if (b==bin64_t::ALL || b==bin64_t::NONE) {
+        delay_ = delay_ * 2; // backing off
+        if (delay_>TINT_SEC*58) // keep NAT mappings alive
+            delay_ = TINT_SEC*58;
+        if (delay_>=4*TINT_SEC && ch_->last_recv_time_ < NOW-TINT_MIN)
+            Schedule(TINT_NEVER); // no response; enter close timeout
+        else
+            Schedule(NOW+delay_); // all right, just keep it alive
+    } else {
+        Schedule(NOW+ch_->rtt_avg_); // cwnd==1 => next send in 1 rtt
         Swap(new SlowStartController(this));
+    }
 }
     
 void    KeepAliveController::OnDataRecvd(bin64_t b) {
+    if (b!=bin64_t::NONE && b!=bin64_t::ALL) { // channel is alive
+        delay_ = ch_->rtt_avg_;
+        Schedule(NOW); // schedule an ACK; TODO: aggregate
+    }
 }
     
 void    KeepAliveController::OnAckRcvd(bin64_t ackd) {
+    // probably to something sent by CwndControllers before this one got installed
 }
     
 
@@ -83,30 +73,36 @@ SendController(orig), cwnd_(cwnd), last_change_(0) {
 }
 
 bool    CwndController::MaySendData() {
+    tint spacing = ch_->rtt_avg_ / cwnd_;
     dprintf("%s #%i sendctrl may send %i < %f & %s (rtt %lli)\n",tintstr(),
-            ch_->id,(int)ch_->data_out_.size(),cwnd_,tintstr(NextSendTime()),
-            ch_->rtt_avg_);
+            ch_->id,(int)ch_->data_out_.size(),cwnd_,
+            tintstr(ch_->last_send_data_time_+spacing), ch_->rtt_avg_);
     return  ch_->data_out_.empty() ||
-            (ch_->data_out_.size() < cwnd_  &&  NOW >= NextSendTime());
-}
-    
-tint    CwndController::NextSendTime () {
-    tint sendtime;
-    if (ch_->data_out_.size() < cwnd_)
-        sendtime = ch_->last_send_time_ + (ch_->rtt_avg_ / cwnd_);
-    else
-        sendtime = ch_->last_send_time_ + ch_->rtt_avg_ + ch_->dev_avg_ * 4 ;
-    return sendtime;
+            (ch_->data_out_.size() < cwnd_  &&  NOW-ch_->last_send_data_time_ >= spacing);
 }
     
+
 void    CwndController::OnDataSent(bin64_t b) {
-    if (b==bin64_t::ALL || b==bin64_t::NONE) {
-        if (MaySendData())
-            Swap(new KeepAliveController(this));
-    } 
+    if ( (b==bin64_t::ALL || b==bin64_t::NONE) && MaySendData() ) { // no more data
+        Schedule(NOW+ch_->rtt_avg_);
+        Swap(new KeepAliveController(this));
+    } else {
+        tint spacing = ch_->rtt_avg_ / cwnd_;
+        if (ch_->data_out_.size() < cwnd_) { // have cwnd; not the right time yet
+            Schedule(ch_->last_send_data_time_+spacing);
+        } else {    // no free cwnd
+            tint timeout = std::max( ch_->rtt_avg_+ch_->dev_avg_*4, 500*TINT_MSEC );
+            assert(!ch_->data_out_.empty());
+            Schedule(ch_->data_out_.front().time+timeout); // wait for ACK or timeout
+        }
+    }
 }
-    
+
+
 void    CwndController::OnDataRecvd(bin64_t b) {
+    if (b!=bin64_t::NONE && b!=bin64_t::ALL) {
+        Schedule(NOW); // send ACK; todo: aggregate ACKs
+    }
 }
     
 void    CwndController::OnAckRcvd(bin64_t ackd) {
@@ -122,6 +118,8 @@ void    CwndController::OnAckRcvd(bin64_t ackd) {
         else 
             cwnd_ += 1.0/cwnd_;
         dprintf("%s #%i sendctrl cwnd to %f\n",tintstr(),ch_->id,cwnd_);
+        tint spacing = ch_->rtt_avg_ / cwnd_;
+        Schedule(ch_->last_send_time_+spacing);
     }
 }
 
@@ -135,17 +133,3 @@ void SlowStartController::OnAckRcvd (bin64_t pos) {
         cwnd_ /= 2;
 }
     
-
-void AIMDController::OnAckRcvd (bin64_t pos) {
-    if (pos==bin64_t::NONE) {
-        dprintf("%s #%i sendctrl loss detected\n",tintstr(),ch_->id);
-        if (NOW>last_change_+ch_->rtt_avg_) {
-            cwnd_ /= 2;
-            last_change_ = NOW;
-        }
-    } else {
-        cwnd_ += 1.0/cwnd_;
-        dprintf("%s #%i sendctrl cwnd to %f\n",tintstr(),ch_->id,cwnd_);
-    }
-}
index 67ae52f..91318a6 100644 (file)
@@ -21,11 +21,11 @@ struct SendController {
     SendController(SendController* orig) : ch_(orig->ch_) { }
     
     void    Swap (SendController* replacement);
+    void    Schedule (tint time);
     
     virtual const char* type() const = 0;
     
     virtual bool    MaySendData() = 0;
-    virtual tint    NextSendTime () = 0;
     
     /** A datagram was sent to the peer.
      *  @param data the bin number for the data sent; bin64_t::NONE if only
@@ -45,33 +45,19 @@ struct SendController {
 };
 
 
-struct PingPongController : public SendController {
-    
-    int     unanswered_;
-
-    PingPongController (SendController* orig) :
-        SendController(orig), unanswered_(0) {} 
-    PingPongController (Channel* ch) : 
-        unanswered_(0), SendController(ch) {}
-    const char* type() const { return "PingPong"; }
-    bool    MaySendData();
-    tint    NextSendTime ();
-    void    OnDataSent(bin64_t b);
-    void    OnDataRecvd(bin64_t b);
-    void    OnAckRcvd(bin64_t ackd) ;
-    ~PingPongController() {}
-    
-};
-
-
+/** Mission of the keepalive controller to keep the channel
+    alive as no data sending happens; If no data is transmitted
+    in either direction, inter-packet times grow exponentially
+    till 58 sec, which refresh period is deemed necessary to keep
+    NAT mappings alive. */
 struct KeepAliveController : public SendController {
 
     tint delay_;
     
+    KeepAliveController (Channel* ch);
     KeepAliveController(SendController* prev, tint delay=0) ;
     const char* type() const { return "KeepAlive"; }
     bool    MaySendData();
-    tint    NextSendTime () ;
     void    OnDataSent(bin64_t b) ;
     void    OnDataRecvd(bin64_t b) ;
     void    OnAckRcvd(bin64_t ackd) ;
@@ -79,6 +65,7 @@ struct KeepAliveController : public SendController {
 };
 
 
+/** Base class for any congestion window based algorithm. */
 struct CwndController : public SendController {
     
     double  cwnd_;
@@ -87,7 +74,6 @@ struct CwndController : public SendController {
     CwndController(SendController* orig, int cwnd=1) ;
     
     bool    MaySendData() ;
-    tint    NextSendTime () ;
     void    OnDataSent(bin64_t b) ;
     void    OnDataRecvd(bin64_t b) ;
     void    OnAckRcvd(bin64_t ackd) ;
@@ -95,6 +81,7 @@ struct CwndController : public SendController {
 };
 
 
+/** TCP-like exponential "slow" start algorithm. */
 struct SlowStartController : public CwndController {
     
     SlowStartController(SendController* orig, int cwnd=1) : CwndController(orig,cwnd) {}
@@ -104,11 +91,14 @@ struct SlowStartController : public CwndController {
 };
 
 
+/** The classics: additive increase - multiplicative decrease algorithm.
+    A naive version of "standard" TCP congestion control. Potentially useful
+    for seedboxes, so needs to be improved. (QUBIC?) */
 struct AIMDController : public CwndController {
     
     AIMDController(SendController* orig, int cwnd=1) : CwndController(orig,cwnd) {}
     const char* type() const { return "AIMD"; }
-    void    OnAckRcvd(bin64_t ackd) ;
+    //void    OnAckRcvd(bin64_t ackd) ;
     
 };
 
index 3cacb2b..986c3ac 100644 (file)
@@ -16,7 +16,6 @@ class SeqPiecePicker : public PiecePicker {
     bins            ack_hint_out_;
     FileTransfer*   transfer_;
     uint64_t        twist_;
-    tbheap          hint_out_; // FIXME since I use fixed 1.5 sec expiration, may replace for a queue
     
 public:
     
@@ -34,8 +33,6 @@ public:
     }
     
     virtual bin64_t Pick (bins& offer, uint64_t max_width, tint expires) {
-        while (hint_out_.size() && hint_out_.peek().time<NOW)
-            ack_hint_out_.copy_range(file().ack_out(), hint_out_.pop().bin);
         //dprintf("twist is %lli\n",twist_);
         if (!file().size()) {
             return bin64_t(0,0); // whoever sends it first
@@ -57,11 +54,10 @@ public:
             hint = hint.left();
         assert(ack_hint_out_.get(hint)==bins::EMPTY);
         if (hint.offset() && file().ack_out().get(hint)!=bins::EMPTY) { // FIXME DEBUG remove
-            eprintf("bogus hint: (%i,%lli)\n",(int)hint.layer(),hint.offset());
+            eprintf("bogus hint: %s\n",hint.str());
             exit(1);
         }
         ack_hint_out_.set(hint);
-        hint_out_.push(tintbin(expires,hint));
         return hint;
     }
     
@@ -69,4 +65,8 @@ public:
         ack_hint_out_.set(bin);
     }
     
+    void Expired (bin64_t bin) {
+        ack_hint_out_.copy_range(file().ack_out(), bin);
+    }
+    
 };
index bfb4b28..01c07c5 100644 (file)
@@ -163,8 +163,9 @@ void            HashTree::RecoverProgress () {
         if (hashes_[pos]==Sha1Hash::ZERO)
             continue;
         size_t rd = read(fd_,buf,1<<10);
-        assert(rd==(1<<10) || p==packet_size()-1);
-        if (rd==(1<<10) && !memcmp(buf, zeros, rd) && hashes_[pos]!=kilo_zero)
+        assert(rd==(1<<10) || p==packet_size()-1); // FIXME BUG
+        if (rd==(1<<10) && !memcmp(buf, zeros, rd) &&
+                hashes_[pos]!=kilo_zero) // FIXME
             continue;
         if ( data_recheck_ && !OfferHash(pos, Sha1Hash(buf,rd)) )
             continue;
index 3ef544a..7fe32df 100644 (file)
--- a/p2tp.cpp
+++ b/p2tp.cpp
@@ -33,24 +33,23 @@ std::vector<Channel*> Channel::channels(1);
 SOCKET Channel::sockets[8] = {0,0,0,0,0,0,0,0};
 int Channel::socket_count = 0;
 Address Channel::tracker;
-tbqueue Channel::send_queue;
+tbheap Channel::send_queue;
 #include "ext/simple_selector.cpp"
 PeerSelector* Channel::peer_selector = new SimpleSelector();
 
 Channel::Channel       (FileTransfer* transfer, int socket, Address peer_addr) :
        transfer_(transfer), peer_(peer_addr), peer_channel_id_(0), pex_out_(0),
     socket_(socket==-1?sockets[0]:socket), // FIXME
-    data_out_cap_(bin64_t::ALL), last_data_time_(0),
+    data_out_cap_(bin64_t::ALL), last_send_data_time_(0), last_recv_data_time_(0),
     own_id_mentioned_(false), next_send_time_(0), last_send_time_(0),
-    last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC),
-    hint_out_(0), hint_out_mark_(), hint_out_am_(0)
+    last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC)
 {
     if (peer_==Address())
         peer_ = tracker;
        this->id = channels.size();
        channels.push_back(this);
-    cc_ = new PingPongController(this);
-    RequeueSend(NOW);
+    cc_ = new KeepAliveController(this);
+    Schedule(NOW); // FIXME ugly
 }
 
 
diff --git a/p2tp.h b/p2tp.h
index d068c0a..4681a56 100644 (file)
--- a/p2tp.h
+++ b/p2tp.h
@@ -70,13 +70,15 @@ namespace p2tp {
         tint    time;
         bin64_t bin;
         tintbin(const tintbin& b) : time(b.time), bin(b.bin) {}
-        tintbin() : time(0), bin(bin64_t::NONE) {}
+        tintbin() : time(TINT_NEVER), bin(bin64_t::NONE) {}
         tintbin(tint time_, bin64_t bin_) : time(time_), bin(bin_) {}
         tintbin(bin64_t bin_) : time(NOW), bin(bin_) {}
         bool operator < (const tintbin& b) const 
             { return time > b.time; }
         bool operator == (const tintbin& b) const
             { return time==b.time && bin==b.bin; }
+        bool operator != (const tintbin& b) const
+            { return !(*this==b); }
     };
 
        typedef std::deque<tintbin> tbqueue;
@@ -188,6 +190,7 @@ namespace p2tp {
     public:
         virtual void Randomize (uint64_t twist) = 0;
         virtual bin64_t Pick (bins& offered, uint64_t max_width, tint expires) = 0;
+        virtual void Expired (bin64_t bin) = 0;
         virtual void Received (bin64_t bin) = 0;
     };
 
@@ -277,10 +280,7 @@ namespace p2tp {
                /**     Transmit schedule: in most cases filled with the peer's hints */
                tbqueue     hint_in_;
                /** Hints sent (to detect and reschedule ignored hints). */
-               //  tbqueue             hint_out_;
-        uint64_t    hint_out_;
-        tintbin     hint_out_mark_;
-        uint64_t    hint_out_am_;
+        tbqueue     hint_out_;
                /** The congestion control strategy. */
                SendController  *cc_;
         /** Types of messages the peer accepts. */
@@ -292,20 +292,23 @@ namespace p2tp {
         /** Smoothed averages for RTT, RTT deviation and data interarrival periods. */
         tint        rtt_avg_, dev_avg_, dip_avg_;
         tint        last_send_time_;
-        tint        last_data_time_;
         tint        last_recv_time_;
+        tint        last_send_data_time_;
+        tint        last_recv_data_time_;
         tint        next_send_time_;
         tint        peer_send_time_;
-        static      tbqueue send_queue;
+        static      tbheap send_queue;
 
-        void        RequeueSend (tint next_time);
         int         PeerBPS() const {
             return TINT_SEC / dip_avg_ * 1024;
         }
         /** Get a request for one packet from the queue of peer's requests. */
         bin64_t                DequeueHint();
         void        ClearStaleDataOut ();
-        //void        CleanStaleHints();
+        void        CleanStaleHintOut();
+        void        CleanFulfilledHints(bin64_t pos);
+        void        CleanFulfilledDataOut(bin64_t pos);
+        void        Schedule(tint send_time);
 
         static PeerSelector* peer_selector;
 
index 07151d3..e93e12c 100644 (file)
@@ -31,7 +31,7 @@ void  Channel::AddPeakHashes (Datagram& dgram) {
                dgram.Push32((uint32_t)peak);
                dgram.PushHash(file().peak_hash(i));
         //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
-        dprintf("%s #%i +phash (%i,%lli)\n",tintstr(),id,peak.layer(),peak.offset());
+        dprintf("%s #%i +phash %s\n",tintstr(),id,peak.str());
        }
 }
 
@@ -45,7 +45,7 @@ void  Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
                dgram.Push32((uint32_t)uncle);
                dgram.PushHash( file().hash(uncle) );
         //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
-        dprintf("%s #%i +hash (%i,%lli)\n",tintstr(),id,uncle.layer(),uncle.offset());
+        dprintf("%s #%i +hash %s\n",tintstr(),id,uncle.str());
         pos = pos.parent();
     }
 }
@@ -57,8 +57,8 @@ bin64_t               Channel::DequeueHint () { // TODO: resilience
         bin64_t hint = hint_in_.front().bin;
         tint time = hint_in_.front().time;
         hint_in_.pop_front();
-        //if (time < NOW-2*TINT_SEC ) //NOW-8*rtt_avg_)
-        //    continue;
+        if (time < NOW-TINT_SEC*3/2 ) //NOW-8*rtt_avg_)
+            continue;
         send = file().ack_out().find_filtered(ack_in_,hint,bins::FILLED);
         send = send.left_foot(); // single packet
         dprintf("%s #%i dequeued %lli\n",tintstr(),id,send.base_offset());
@@ -72,17 +72,6 @@ bin64_t              Channel::DequeueHint () { // TODO: resilience
 }
 
 
-/*void Channel::CleanStaleHints () {
-       while ( !hint_out.empty() && file().ack_out().get(hint_out.front().bin)==bins::FILLED ) 
-               hint_out.pop_front();  // FIXME must normally clear fulfilled entries
-       tint timed_out = NOW - cc_->RoundTripTime()*8;
-       while ( !hint_out.empty() && hint_out.front().time < timed_out ) {
-        file().picker()->Snubbed(hint_out.front().bin);
-               hint_out.pop_front();
-       }
-}*/
-
-
 void   Channel::AddHandshake (Datagram& dgram) {
        if (!peer_channel_id_) { // initiating
                dgram.Push8(P2TP_HASH);
@@ -101,14 +90,16 @@ void       Channel::AddHandshake (Datagram& dgram) {
 
 void    Channel::ClearStaleDataOut() {
     int oldsize = data_out_.size();
-    while ( data_out_.size() && data_out_.front().time < 
-           NOW - rtt_avg_ - dev_avg_*4 )
+    tint timeout = NOW - max( rtt_avg_-dev_avg_*4, 500*TINT_MSEC );
+    while ( data_out_.size() && data_out_.front().time < timeout ) {
+        dprintf("%s #%i Tdata %s\n",tintstr(),id,data_out_.front().bin.str());
         data_out_.pop_front();
+    }
     if (data_out_.size()!=oldsize) {
         cc_->OnAckRcvd(bin64_t::NONE);
         data_out_cap_ = bin64_t::ALL;
     }
-    while (data_out_.size() && ack_in_.get(data_out_.front().bin)==bins::FILLED)
+    while (data_out_.size() && (data_out_.front()==tintbin() || ack_in_.get(data_out_.front().bin)==bins::FILLED))
         data_out_.pop_front();
 }
 
@@ -124,10 +115,7 @@ void       Channel::Send () {
             AddHint(dgram);
         AddPex(dgram);
         ClearStaleDataOut();
-        if (cc_->MaySendData()) 
-            data = AddData(dgram);
-        else
-            dprintf("%s #%i no cwnd\n",tintstr(),id);
+        data = AddData(dgram);
     } else {
         AddHandshake(dgram);
         AddAck(dgram);
@@ -141,75 +129,91 @@ void      Channel::Send () {
 }
 
 
+void   Channel::CleanStaleHintOut () {
+    tint timed_out = NOW - 8*rtt_avg_;
+       while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
+        transfer().picker().Expired(hint_out_.front().bin);
+               hint_out_.pop_front();
+       }
+}
+
+
 void   Channel::AddHint (Datagram& dgram) {
 
+    CleanStaleHintOut();
+    
+    uint64_t hint_out_mass=0;
+    for(int i=0; i<hint_out_.size(); i++)
+        hint_out_mass += hint_out_[i].bin.width();
+    
     int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
     if (!peer_cwnd)
         peer_cwnd = 1;
     int peer_pps = TINT_SEC / dip_avg_;
     if (!peer_pps)
         peer_pps = 1;
-    dprintf("%s #%i hint_out_ %lli+%lli mark (%i,%lli) peer_cwnd %lli/%lli=%f\n",
-            tintstr(),id,hint_out_,hint_out_am_,(int)hint_out_mark_.bin.layer(),
-            hint_out_mark_.bin.offset(),
-            rtt_avg_,dip_avg_,((float)rtt_avg_/dip_avg_));
-    
-    if ( hint_out_mark_.time < NOW - TINT_SEC*2 ) { //NOW-rtt_avg_*8-dev_avg_) {
-        hint_out_mark_.bin=bin64_t::NONE;
-        //hint_out_ = hint_out_am_;
-        //hint_out_am_ = 0;
-    }
     
-    if ( peer_pps > hint_out_+hint_out_am_ ) {  //4*peer_cwnd
+    if ( hint_out_mass < 4*peer_cwnd ) {
         
-        int diff = peer_pps - hint_out_ - hint_out_am_;  // 4*peer_cwnd
+        int diff = 5*peer_cwnd - hint_out_mass;
         if (diff>4 && diff>2*peer_cwnd)
             diff >>= 1;
-        bin64_t hint = transfer().picker().Pick(ack_in_,diff,NOW+TINT_SEC*3/2); //rtt_avg_*8+TINT_MSEC*10
+        bin64_t hint = transfer().picker().Pick(ack_in_,diff,rtt_avg_*8+TINT_MSEC*100);
         
         if (hint!=bin64_t::NONE) {
             dgram.Push8(P2TP_HINT);
             dgram.Push32(hint);
-            dprintf("%s #%i +hint (%i,%lli)\n",tintstr(),id,hint.layer(),hint.offset());
-            if (hint_out_mark_.bin==bin64_t::NONE) {
-                hint_out_mark_ = tintbin(NOW,hint);
-                hint_out_ = hint_out_am_;
-                hint_out_am_ = 0;
-            }
-            hint_out_am_ += hint.width();
-            //hint_out_ += hint.width();
-        }
+            dprintf("%s #%i +hint %s\n",tintstr(),id,hint.str());
+            hint_out_.push_back(hint);
+        } else
+            printf("%s #%i Xhint\n",tintstr(),id);
         
     }
 }
 
 
 bin64_t                Channel::AddData (Datagram& dgram) {
-       if (!file().size()) // know nothing
+       
+    if (!file().size()) // know nothing
                return bin64_t::NONE;
-       bin64_t tosend = DequeueHint();
-    if (tosend==bin64_t::NONE) {
-        dprintf("%s #%i out of hints #sendctrl\n",tintstr(),id);
-        return bin64_t::NONE;
-    }
-    if (ack_in_.is_empty() && file().size())
-        AddPeakHashes(dgram);
-    AddUncleHashes(dgram,tosend);
-    uint8_t buf[1024];
-    size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); 
-    // TODO: ??? corrupted data, retries
-    if (r<0) {
-        print_error("error on reading");
-        return bin64_t::NONE;
+    
+       bin64_t tosend = bin64_t::NONE;
+    if (cc_->MaySendData()) {
+        tosend = DequeueHint();
+        if (tosend==bin64_t::NONE)
+            dprintf("%s #%i out of hints #sendctrl\n",tintstr(),id);
+    } else
+        dprintf("%s #%i no cwnd #sendctrl\n",tintstr(),id);
+    
+    if (tosend==bin64_t::NONE && (last_send_data_time_>NOW-TINT_SEC || data_out_.empty())) 
+        return bin64_t::NONE; // once in a while, empty data is sent just to check rtt
+    
+    if (tosend!=bin64_t::NONE) { // hashes
+        if (ack_in_.is_empty() && file().size())
+            AddPeakHashes(dgram);
+        AddUncleHashes(dgram,tosend);
+        data_out_cap_ = tosend;
     }
-    assert(dgram.space()>=r+4+1);
+    
     dgram.Push8(P2TP_DATA);
-    dgram.Push32(tosend);
-    dgram.Push(buf,r);
-    dprintf("%s #%i +data (%lli)\n",tintstr(),id,tosend.base_offset());
+    dgram.Push32(tosend.to32());
+    
+    if (tosend!=bin64_t::NONE) { // data
+        uint8_t buf[1024];
+        size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); 
+        // TODO: corrupted data, retries, caching
+        if (r<0) {
+            print_error("error on reading");
+            return bin64_t::NONE;
+        }
+        assert(dgram.space()>=r+4+1);
+        dgram.Push(buf,r);
+    }
+    
+    last_send_data_time_ = NOW;
     data_out_.push_back(tosend);
-    data_out_cap_ = tosend;
-    // FIXME BUG this makes data_out_ all stale  ack_in_.set(tosend);
+    dprintf("%s #%i +data %s\n",tintstr(),id,tosend.str());
+    
        return tosend;
 }
 
@@ -229,8 +233,7 @@ void        Channel::AddAck (Datagram& dgram) {
                dgram.Push32(pos);
                //dgram.Push64(data_in_.time);
         ack_out_.set(pos);
-        dprintf("%s #%i +ack (%i,%lli) %s\n",tintstr(),id,
-                pos.layer(),pos.offset(),tintstr(data_in_.time));
+        dprintf("%s #%i +ack %s %s\n",tintstr(),id,pos.str(),tintstr(data_in_.time));
         data_in_ = tintbin(0,bin64_t::NONE);
        }
     for(int count=0; count<4; count++) {
@@ -241,7 +244,7 @@ void        Channel::AddAck (Datagram& dgram) {
         ack_out_.set(ack);
         dgram.Push8(P2TP_ACK);
         dgram.Push32(ack);
-        dprintf("%s #%i +ack (%i,%lli)\n",tintstr(),id,ack.layer(),ack.offset());
+        dprintf("%s #%i +ack %s\n",tintstr(),id,ack.str());
     }
 }
 
@@ -272,8 +275,10 @@ void       Channel::Recv (Datagram& dgram) {
        }
     cc_->OnDataRecvd(data);
     last_recv_time_ = NOW;
-    if (data!=bin64_t::ALL && next_send_time_>NOW+TINT_MSEC)
+    if (data!=bin64_t::ALL && next_send_time_>NOW+TINT_MSEC) {
+        Datagram::Time();
         Send();
+    }
 }
 
 
@@ -282,7 +287,29 @@ void       Channel::OnHash (Datagram& dgram) {
        Sha1Hash hash = dgram.PullHash();
        file().OfferHash(pos,hash);
     //DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
-    dprintf("%s #%i -hash (%i,%lli)\n",tintstr(),id,pos.layer(),pos.offset());
+    dprintf("%s #%i -hash %s\n",tintstr(),id,pos.str());
+}
+
+
+void    Channel::CleanFulfilledHints (bin64_t pos) {
+    int hi = 0;
+    while (hi<hint_out_.size() && hi<8 && !pos.within(hint_out_[hi].bin))
+        hi++;
+    if (hi<8 && hi<hint_out_.size()) {
+        while (hi--) {
+            transfer().picker().Expired(hint_out_.front().bin);
+            hint_out_.pop_front();
+        }
+        while (hint_out_.front().bin!=pos) {
+            tintbin f = hint_out_.front();
+            f.bin = f.bin.towards(pos);
+            hint_out_.front().bin = f.bin.sibling();
+            hint_out_.push_front(f);
+        }
+        hint_out_.pop_front();
+    }
+     // every HINT ends up as either Expired or Received 
+    transfer().picker().Received(pos);
 }
 
 
@@ -290,59 +317,51 @@ bin64_t Channel::OnData (Datagram& dgram) {
        bin64_t pos = dgram.Pull32();
     uint8_t *data;
     int length = dgram.Pull(&data,1024);
-    bool ok = file().OfferData(pos, (char*)data, length) ;
+    bool ok = (pos==bin64_t::NONE) || file().OfferData(pos, (char*)data, length) ;
     dprintf("%s #%i %cdata (%lli)\n",tintstr(),id,ok?'-':'!',pos.offset());
     if (!ok) 
         return bin64_t::NONE;
     data_in_ = tintbin(NOW,pos);
-    transfer().picker().Received(pos); // FIXME ugly
-    if (last_data_time_) {
-        tint dip = NOW - last_data_time_;
-        dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
-    }
-    last_data_time_ = NOW;
-    if (pos.within(hint_out_mark_.bin)) {
-        hint_out_mark_.bin = bin64_t::NONE;
+    if (pos!=bin64_t::NONE) {
+        if (last_recv_data_time_) {
+            tint dip = NOW - last_recv_data_time_;
+            dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
+        }
+        last_recv_data_time_ = NOW;
     }
-    if (hint_out_)
-        hint_out_--;
-    else if (hint_out_am_) // probably, the marking HINT was lost or whatever
-        hint_out_am_--;
+    CleanFulfilledHints(pos);    
     return pos;
 }
 
 
-void   Channel::OnAck (Datagram& dgram) {
-       bin64_t ackd_pos = dgram.Pull32();
-    if (file().size() && ackd_pos.base_offset()>=file().packet_size()) {
-        eprintf("invalid ack: (%i,%lli)\n",ackd_pos.layer(),ackd_pos.offset());
-        return;
-    }
-    dprintf("%s #%i -ack (%i,%lli)\n",tintstr(),id,ackd_pos.layer(),ackd_pos.offset());
+void    Channel::CleanFulfilledDataOut (bin64_t ackd_pos) {
     for (int i=0; i<8 && i<data_out_.size(); i++) 
-        if (data_out_[i].bin.within(ackd_pos)) {
+        if (data_out_[i]!=tintbin() && data_out_[i].bin.within(ackd_pos)) {
             tint rtt = NOW-data_out_[i].time;
-            rtt_avg_ = (rtt_avg_*3 + rtt) >> 2;
+            rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
             dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
             dprintf("%s #%i rtt %lli dev %lli\n",
                     tintstr(),id,rtt_avg_,dev_avg_);
-            cc_->OnAckRcvd(data_out_[i].bin); // may be invoked twice FIXME FIXME FIXME 
+            cc_->OnAckRcvd(data_out_[i].bin);
+            data_out_[i]=tintbin();
         }
-       ack_in_.set(ackd_pos);
-    while (data_out_.size() && ack_in_.get(data_out_.front().bin)==bins::FILLED)
+    while ( data_out_.size() && ( data_out_.front()==tintbin() ||
+                                 ack_in_.get(data_out_.front().bin)==bins::FILLED ) )
         data_out_.pop_front();
 }
 
 
-/*void Channel::OnAckTs (Datagram& dgram) {  // FIXME:   OnTs
-       bin64_t pos = dgram.Pull32();
-    tint ts = dgram.Pull64();
-    // TODO sanity check
-    dprintf("%s #%i -ackts (%i,%lli) %s\n",
-            tintstr(),id,pos.layer(),pos.offset(),tintstr(ts));
-       ack_in_.set(pos);
-       cc_->OnAckRcvd(pos,ts);
-}*/
+void   Channel::OnAck (Datagram& dgram) {
+       bin64_t ackd_pos = dgram.Pull32();
+    if (ackd_pos!=bin64_t::NONE && file().size() && ackd_pos.base_offset()>=file().packet_size()) {
+        eprintf("invalid ack: %s\n",ackd_pos.str());
+        return;
+    }
+    dprintf("%s #%i -ack %s\n",tintstr(),id,ackd_pos.str());
+    ack_in_.set(ackd_pos);
+    CleanFulfilledDataOut(ackd_pos);
+}
+
 
 void Channel::OnTs (Datagram& dgram) {
     peer_send_time_ = dgram.Pull64();
@@ -355,7 +374,7 @@ void        Channel::OnHint (Datagram& dgram) {
        hint_in_.push_back(hint);
     //ack_in_.set(hint,bins::EMPTY);
     //RequeueSend(cc_->OnHintRecvd(hint));
-    dprintf("%s #%i -hint (%i,%lli)\n",tintstr(),id,hint.layer(),hint.offset());
+    dprintf("%s #%i -hint %s\n",tintstr(),id,hint.str());
 }
 
 
@@ -402,7 +421,7 @@ void        Channel::RecvDatagram (int socket) {
                if (hashid!=P2TP_HASH) 
                        RETLOG ("no hash in the initial handshake");
                bin64_t pos = data.Pull32();
-               if (pos!=bin64_t::ALL32
+               if (pos!=bin64_t::ALL) 
                        RETLOG ("that is not the root hash");
                hash = data.PullHash();
                FileTransfer* file = FileTransfer::Find(hash);
@@ -426,27 +445,11 @@ void      Channel::RecvDatagram (int socket) {
                        RETLOG ("invalid peer address");
         channel->own_id_mentioned_ = true;
        }
-    dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
+    //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
     channel->Recv(data);
 }
 
 
-bool tblater (const tintbin& a, const tintbin& b) {
-    return a.time > b.time;
-}
-
-
-void    Channel::RequeueSend (tint next_time) {
-    if (next_time==next_send_time_)
-        return;
-    next_send_time_ = next_time;
-    send_queue.push_back
-        (tintbin(next_time==TINT_NEVER?NOW+TINT_MIN:next_time,id));
-    push_heap(send_queue.begin(),send_queue.end(),tblater);
-    dprintf("%s requeue #%i for %s\n",tintstr(),id,tintstr(next_time));
-}
-
-
 void    Channel::Loop (tint howlong) {  
        
     tint limit = Datagram::Time() + howlong;
@@ -455,16 +458,15 @@ void    Channel::Loop (tint howlong) {
 
         tint send_time(TINT_NEVER);
         Channel* sender(NULL);
-        while (!send_queue.empty()) {
-            send_time = send_queue.front().time;
-            sender = channel((int)send_queue.front().bin);
+        while (!send_queue.is_empty()) {
+            send_time = send_queue.peek().time;
+            sender = channel((int)send_queue.peek().bin);
             if (sender)
                 if ( sender->next_send_time_==send_time ||
                      sender->next_send_time_==TINT_NEVER )
                 break;
             sender = NULL; // it was a stale entry
-            pop_heap(send_queue.begin(), send_queue.end(), tblater);
-            send_queue.pop_back();
+            send_queue.pop();
         }
         if (send_time>limit)
             send_time = limit;
@@ -473,23 +475,31 @@ void    Channel::Loop (tint howlong) {
                     tintstr(send_time));
             sender->Send();
             sender->last_send_time_ = NOW;
-            sender->RequeueSend(sender->cc_->NextSendTime());
-            pop_heap(send_queue.begin(), send_queue.end(), tblater);
-            send_queue.pop_back();
+            // sender->RequeueSend(sender->cc_->NextSendTime()); goes to SendCtrl
+            send_queue.pop();
         } else if ( send_time > NOW ) {
             tint towait = send_time - NOW;
             dprintf("%s waiting %lliusec\n",tintstr(),towait);
             int rd = Datagram::Wait(socket_count,sockets,towait);
             if (rd!=INVALID_SOCKET)
                 RecvDatagram(rd);
-        } else { //if (sender->next_send_time_==TINT_NEVER) { 
+        } else if (sender) { // FIXME FIXME FIXME REWRITE!!!  if (sender->next_send_time_==TINT_NEVER) { 
             dprintf("%s #%i closed sendctrl\n",tintstr(),sender->id);
             delete sender;
-            pop_heap(send_queue.begin(), send_queue.end(), tblater);
-            send_queue.pop_back();
+            send_queue.pop();
         }
         
     } while (Datagram::Time()<limit);
        
 }
 
+void Channel::Schedule (tint next_time) {
+    if (next_time==next_send_time_)
+        return;
+    next_send_time_ = next_time;
+    if (next_time==TINT_NEVER)
+        next_time = NOW + TINT_MIN; // 1min timeout
+    send_queue.push(tintbin(next_time,id));
+    dprintf("%s requeue #%i for %s\n",tintstr(),id,tintstr(next_time));
+}
\ No newline at end of file
index 07a750f..36149da 100755 (executable)
@@ -51,7 +51,7 @@ TEST(BinsTest,SetGet) {
 TEST(BinsTest,Iterator) {
     bins b;
     b.set(bin64_t(3,1));
-    iterator i(&b,0,false);
+    iterator i(&b,bin64_t(0,0),false);
     while (!i.solid())
         i.left();
     EXPECT_EQ(bin64_t(3,0),i.bin());
index 5625300..fbd0e47 100644 (file)
@@ -51,7 +51,7 @@ TEST(TransferTest,TransferFile) {
     
     FileTransfer* seed_transfer = new FileTransfer(BTF);
     HashTree* seed = & seed_transfer->file();
-    EXPECT_TRUE(A==seed->hash(0));
+    EXPECT_TRUE(A==seed->hash(bin64_t(0,0)));
     EXPECT_TRUE(E==seed->hash(bin64_t(0,4)));
     EXPECT_TRUE(ABCD==seed->hash(bin64_t(2,0)));
     EXPECT_TRUE(ROOT==seed->root_hash());
index 62bc51b..8c2d42d 100644 (file)
 #include "p2tp.h"
 #include "compat/util.h"
 
+#include "ext/seq_picker.cpp" // FIXME FIXME FIXME FIXME 
+
 using namespace p2tp;
 
 std::vector<FileTransfer*> FileTransfer::files(20);
 
 #define BINHASHSIZE (sizeof(bin64_t)+sizeof(Sha1Hash))
 
-#include "ext/seq_picker.cpp"
-
 // FIXME: separate Bootstrap() and Download(), then Size(), Progress(), SeqProgress()
 
 FileTransfer::FileTransfer (const char* filename, const Sha1Hash& _root_hash) :