Towards ACK : HAVE split.
authorVictor Grishchenko <victor@TUD276042.tudelft.net>
Thu, 25 Feb 2010 17:47:29 +0000 (18:47 +0100)
committerVictor Grishchenko <victor@TUD276042.tudelft.net>
Thu, 25 Feb 2010 17:47:29 +0000 (18:47 +0100)
One message for all worked well on a single connection;
in a swarm, creates a mess.

Makefile [new file with mode: 0644]
sendrecv.cpp
swift.h

diff --git a/Makefile b/Makefile
new file mode 100644 (file)
index 0000000..a3c4828
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,7 @@
+CPPFLAGS=-I.
+
+all: swift
+
+swift: swift.o sha1.o compat.o sendrecv.o send_control.o hashtree.o bin64.o bins.o channel.o datagram.o transfer.o
+       g++ -I. *.o -o swift
+
index b36b41e..e357a02 100644 (file)
@@ -45,20 +45,25 @@ void    Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
 }
 
 
+bin64_t           Channel::ImposeHint () {
+    uint64_t twist = peer_channel_id_;  // got no hints, send something randomly
+    twist &= file().peak(0); // FIXME may make it semi-seq here
+    file().ack_out().twist(twist);
+    ack_in_.twist(twist);
+    bin64_t my_pick =
+        file().ack_out().find_filtered(ack_in_,bin64_t::ALL,binmap_t::FILLED);
+    while (my_pick.width()>max(1,(int)cwnd_))
+        my_pick = my_pick.left();
+    file().ack_out().twist(0);
+    ack_in_.twist(0);
+    return my_pick.twisted(twist);
+}
+
+
 bin64_t        Channel::DequeueHint () {
-    if (hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {    
-        uint64_t twist = peer_channel_id_;  // got no hints, send something randomly
-        twist &= file().peak(0); // may make it semi-seq here
-        file().ack_out().twist(twist);
-        ack_in_.twist(twist);
-        bin64_t my_pick = 
-            file().ack_out().find_filtered(ack_in_,bin64_t::ALL,binmap_t::FILLED);
-        while (my_pick.width()>max(1,(int)cwnd_))
-            my_pick = my_pick.left();
-        file().ack_out().twist(0);
-        ack_in_.twist(0);
+    if (hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {
+        bin64_t my_pick = ImposeHint(); // FIXME move to the loop
         if (my_pick!=bin64_t::NONE) {
-            my_pick = my_pick.twisted(twist);
             hint_in_.push_back(my_pick);
             dprintf("%s #%u *hint %s\n",tintstr(),id_,my_pick.str());
         }
@@ -74,7 +79,7 @@ bin64_t        Channel::DequeueHint () {
         }
         //if (time < NOW-TINT_SEC*3/2 )
         //    continue;  bad idea
-        if (ack_in_.get(hint)!=binmap_t::FILLED) 
+        if (ack_in_.get(hint)!=binmap_t::FILLED)
             send = hint;
     }
     uint64_t mass = 0;
@@ -112,7 +117,7 @@ void    Channel::Send () {
         if (!file().is_complete())
             AddHint(dgram);
         AddPex(dgram);
-        CleanDataOut();
+        TimeoutDataOut();
         data = AddData(dgram);
     } else {
         AddHandshake(dgram);
@@ -122,18 +127,6 @@ void    Channel::Send () {
             tintstr(),id_,dgram.size(),peer().str(),peer_channel_id_);
     if (dgram.size()==4) {// only the channel id; bare keep-alive
         data = bin64_t::ALL;
-        //dprintf("%s #%u considering keepalive %i %f %s\n",
-        //        tintstr(),id_,(int)data_out_.size(),cwnd_,SEND_CONTROL_MODES[send_control_]);
-        //if (data_out_.size()<cwnd_ && send_control_!=KEEP_ALIVE_CONTROL) {
-            //if ( cwnd_ < 1 )
-            //    SwitchSendControl(KEEP_ALIVE_CONTROL);
-            //else
-            //    cwnd_ = cwnd_/2.0;
-        //}
-        //if (data_out_.empty() && send_control_!=KEEP_ALIVE_CONTROL)
-        //     SwitchSendControl(KEEP_ALIVE_CONTROL);// we did our best
-        //if (NOW<last_send_time_+MAX_SEND_INTERVAL) // no need for keepalive
-        //    return; // don't send empty dgram
     }
     if (dgram.Send()==-1)
         print_error("can't send datagram");
@@ -146,23 +139,20 @@ void    Channel::Send () {
 void    Channel::AddHint (Datagram& dgram) {
 
     tint plan_for = max(TINT_SEC,rtt_avg_*4);
-    
+
     tint timed_out = NOW - plan_for*2;
     while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
         hint_out_size_ -= hint_out_.front().bin.width();
         hint_out_.pop_front();
     }
-    
-    /*int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
-    if (!peer_cwnd)
-        peer_cwnd = 1;*/
+
     int plan_pck = max ( (tint)1, plan_for / dip_avg_ );
-    
+
     if ( hint_out_size_ < plan_pck ) {
-            
+
         int diff = plan_pck - hint_out_size_; // TODO: aggregate
         bin64_t hint = transfer().picker().Pick(ack_in_,diff,NOW+plan_for*2);
-        
+
         if (hint!=bin64_t::NONE) {
             dgram.Push8(SWIFT_HINT);
             dgram.Push32(hint);
@@ -171,16 +161,16 @@ void    Channel::AddHint (Datagram& dgram) {
             hint_out_size_ += hint.width();
         } else
             dprintf("%s #%u Xhint\n",tintstr(),id_);
-        
+
     }
 }
 
 
 bin64_t        Channel::AddData (Datagram& dgram) {
-    
+
     if (!file().size()) // know nothing
         return bin64_t::NONE;
-    
+
     bin64_t tosend = bin64_t::NONE;
     tint luft = send_interval_>>4; // may wake up a bit earlier
     if (data_out_.size()<cwnd_ &&
@@ -194,10 +184,10 @@ bin64_t        Channel::AddData (Datagram& dgram) {
     } else
         dprintf("%s #%u sendctrl wait cwnd %f data_out %i next %s\n",
                 tintstr(),id_,cwnd_,data_out_.size(),tintstr(last_data_out_time_+NOW-send_interval_));
-    
-    if (tosend==bin64_t::NONE)// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty())) 
+
+    if (tosend==bin64_t::NONE)// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
         return bin64_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
-    
+
     if (ack_in_.is_empty() && file().size())
         AddPeakHashes(dgram);
     AddUncleHashes(dgram,tosend);
@@ -208,12 +198,12 @@ bin64_t        Channel::AddData (Datagram& dgram) {
         dgram.Send(); // kind of fragmentation
         dgram.Push32(peer_channel_id_);
     }
-    
+
     dgram.Push8(SWIFT_DATA);
     dgram.Push32(tosend.to32());
-    
+
     uint8_t buf[1024];
-    size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); 
+    size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10);
     // TODO: corrupted data, retries, caching
     if (r<0) {
         print_error("error on reading");
@@ -221,56 +211,52 @@ bin64_t        Channel::AddData (Datagram& dgram) {
     }
     assert(dgram.space()>=r+4+1);
     dgram.Push(buf,r);
-    
+
     last_data_out_time_ = NOW;
     data_out_.push_back(tosend);
     dprintf("%s #%u +data %s\n",tintstr(),id_,tosend.str());
-    
+
     return tosend;
 }
 
 
-void    Channel::AddTs (Datagram& dgram) {
-    dgram.Push8(SWIFT_TS);
-    dgram.Push64(data_in_.time);
-    dprintf("%s #%u +ts %s\n",tintstr(),id_,tintstr(data_in_.time));
+void    Channel::AddAck (Datagram& dgram) {
+    if (data_in_==tintbin())
+        return;
+    dgram.Push8(SWIFT_ACK);
+    dgram.Push32(data_in_.bin.to32());
+    dgram.Push32(data_in_.time);
+    ack_out_.set(data_in_.bin);
+    dprintf("%s #%u +ack %s %s\n",
+        tintstr(),id_,data_in_.bin.str(),tintstr(data_in_.time));
+    if (data_in_.bin.layer()>2)
+        data_in_dbl_ = data_in_.bin;
+    data_in_ = tintbin();
 }
 
 
-void    Channel::AddAck (Datagram& dgram) {
+void    Channel::AddHave (Datagram& dgram) {
     if (data_in_dbl_!=bin64_t::NONE) { // TODO: do redundancy better
-        dgram.Push8(SWIFT_ACK);
+        dgram.Push8(SWIFT_HAVE);
         dgram.Push32(data_in_dbl_.to32());
         data_in_dbl_=bin64_t::NONE;
     }
-    if (data_in_.time!=TINT_NEVER) { // TODO: ACK NONE for corrupted data
-        AddTs(dgram);
-        bin64_t pos = data_in_.bin; // be precise file().ack_out().cover(data_in_.bin);
-        dgram.Push8(SWIFT_ACK);
-        dgram.Push32(pos.to32());
-        //dgram.Push64(data_in_.time);
-        ack_out_.set(pos);
-        dprintf("%s #%u +ack %s %s\n",tintstr(),id_,pos.str(),tintstr(data_in_.time));
-        data_in_ = tintbin(TINT_NEVER,bin64_t::NONE);
-        if (pos.layer()>2)
-            data_in_dbl_ = pos;
-    }
     for(int count=0; count<4; count++) {
-        bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, binmap_t::FILLED);
+        bin64_t ack = file().ack_out().find_filtered // FIXME: do rotating queue
+            (ack_out_, bin64_t::ALL, binmap_t::FILLED);
         if (ack==bin64_t::NONE)
             break;
         ack = file().ack_out().cover(ack);
         ack_out_.set(ack);
-        dgram.Push8(SWIFT_ACK);
+        dgram.Push8(SWIFT_HAVE);
         dgram.Push32(ack.to32());
-        dprintf("%s #%u +ack %s\n",tintstr(),id_,ack.str());
+        dprintf("%s #%u +have %s\n",tintstr(),id_,ack.str());
     }
 }
 
 
 void    Channel::Recv (Datagram& dgram) {
     dprintf("%s #%u recvd %i\n",tintstr(),id_,dgram.size()+4);
-    peer_send_time_ = 0; // has scope of 1 datagram
     dgrams_rcvd_++;
     if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
         rtt_avg_ = NOW - last_send_time_;
@@ -284,7 +270,7 @@ void    Channel::Recv (Datagram& dgram) {
         switch (type) {
             case SWIFT_HANDSHAKE: OnHandshake(dgram); break;
             case SWIFT_DATA:      data=OnData(dgram); break;
-            case SWIFT_TS:        OnTs(dgram); break;
+            case SWIFT_HAVE:      OnHave(dgram); break;
             case SWIFT_ACK:       OnAck(dgram); break;
             case SWIFT_HASH:      OnHash(dgram); break;
             case SWIFT_HINT:      OnHint(dgram); break;
@@ -303,7 +289,6 @@ void    Channel::OnHash (Datagram& dgram) {
     bin64_t pos = dgram.Pull32();
     Sha1Hash hash = dgram.PullHash();
     file().OfferHash(pos,hash);
-    //DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
     dprintf("%s #%u -hash %s\n",tintstr(),id_,pos.str());
 }
 
@@ -329,14 +314,14 @@ void    Channel::CleanHintOut (bin64_t pos) {
 }
 
 
-bin64_t Channel::OnData (Datagram& dgram) {
+bin64_t Channel::OnData (Datagram& dgram) {  // TODO: HAVE NONE for corrupted data
     bin64_t pos = dgram.Pull32();
     uint8_t *data;
     int length = dgram.Pull(&data,1024);
     bool ok = (pos==bin64_t::NONE) || file().OfferData(pos, (char*)data, length) ;
     dprintf("%s #%u %cdata %s\n",tintstr(),id_,ok?'-':'!',pos.str());
     data_in_ = tintbin(NOW,bin64_t::NONE);
-    if (!ok) 
+    if (!ok)
         return bin64_t::NONE;
     data_in_.bin = pos;
     if (pos!=bin64_t::NONE) {
@@ -346,74 +331,77 @@ bin64_t Channel::OnData (Datagram& dgram) {
         }
         last_data_in_time_ = NOW;
     }
-    CleanHintOut(pos);    
+    CleanHintOut(pos);
     return pos;
 }
 
 
-void    Channel::CleanDataOut (bin64_t ackd_pos) { // TODO: isn't it too long?
-    
-    int max_ack_off = 0;
-    
-    if (ackd_pos!=bin64_t::NONE) {
-        for (int i=0; i<data_out_.size(); i++) {
-            if (data_out_[i]!=tintbin() && data_out_[i].bin.within(ackd_pos)) {
-                if (peer_send_time_)
-                    for(tbqueue::iterator j=data_out_tmo_.begin(); j!=data_out_tmo_.end(); j++)
-                        if (j->bin==data_out_[i].bin)
-                            peer_send_time_=0; // possibly retransmit
-                if (peer_send_time_) { // well, it is sorta ACK ACK
-                    tint rtt = NOW-data_out_[i].time;
-                    rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
-                    dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
-                    assert(data_out_[i].time!=TINT_NEVER);
-                    tint owd = peer_send_time_ - data_out_[i].time;
-                    owd_cur_bin_ = (owd_cur_bin_+1) & 3;
-                    owd_current_[owd_cur_bin_] = owd;
-                    if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
-                        owd_min_bin_start_ = NOW;
-                        owd_min_bin_ = (owd_min_bin_+1) & 3;
-                        owd_min_bins_[owd_min_bin_] = TINT_NEVER;
-                    }
-                    if (owd_min_bins_[owd_min_bin_]>owd)
-                        owd_min_bins_[owd_min_bin_] = owd;
-                    peer_send_time_ = 0;
-                }
-                dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n",
-                        tintstr(),id_,rtt_avg_,dev_avg_,data_out_[i].bin.str());
-                bin64_t pos = data_out_[i].bin;
-                ack_rcvd_recent_++;
-                data_out_[i]=tintbin();
-                max_ack_off = i;
-                if (ackd_pos==pos)
-                    break;
-            }
-        }
-        while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE) {
-            data_out_.pop_front();
-            max_ack_off--;
-        }
-        static const int MAX_REORDERING = 2;  // the triple-ACK principle
-        if (max_ack_off>MAX_REORDERING) {
-            while (max_ack_off && (data_out_.front().bin==bin64_t::NONE
-                                   || ack_in_.is_filled(data_out_.front().bin)) ) {
-                data_out_.pop_front();
-                max_ack_off--;
-            }
-            while (max_ack_off>MAX_REORDERING) {
-                ack_not_rcvd_recent_++;
-                data_out_tmo_.push_back(data_out_.front().bin);
-                dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str());
-                data_out_.pop_front();
-                max_ack_off--;
-                data_out_cap_ = bin64_t::ALL;
-            }
+void    Channel::OnAck (Datagram& dgram) {
+    bin64_t ackd_pos = dgram.Pull32();
+    tint peer_time_ = dgram.Pull64();
+    if (ackd_pos==bin64_t::NONE)
+        return; // likely, brocken packet / insufficient hashes
+    if (file().size() && ackd_pos.base_offset()>=file().packet_size()) {
+        eprintf("invalid ack: %s\n",ackd_pos.str());
+        return;
+    }
+    ack_in_.set(ackd_pos);
+    int di = 0, ri = 0;
+    // find an entry for the send (data out) event
+    while (  di<data_out_.size() && ( data_out_[di]==tintbin() ||
+           !data_out_[di].bin.within(ackd_pos) )  )
+        di++;
+    // FUTURE: delayed acks
+    // rule out retransmits
+    while (  ri<data_out_tmo_.size() && !data_out_tmo_[ri].bin.within(ackd_pos) )
+        ri++;
+    dprintf("%s #%u %cack %s %s\n",tintstr(),id_,
+            di==data_out_.size()?'?':'-',ackd_pos.str(),tintstr(peer_time_));
+    if (ri==data_out_tmo_.size()) { // not a retransmit
+            // round trip time calculations
+        tint rtt = NOW-data_out_[di].time;
+        rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
+        dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
+        assert(data_out_[di].time!=TINT_NEVER);
+            // one-way delay calculations
+        tint owd = peer_time_ - data_out_[di].time;
+        owd_cur_bin_ = (owd_cur_bin_+1) & 3;
+        owd_current_[owd_cur_bin_] = owd;
+        if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
+            owd_min_bin_start_ = NOW;
+            owd_min_bin_ = (owd_min_bin_+1) & 3;
+            owd_min_bins_[owd_min_bin_] = TINT_NEVER;
         }
-        peer_send_time_ = 0;
+        if (owd_min_bins_[owd_min_bin_]>owd)
+            owd_min_bins_[owd_min_bin_] = owd;
+        dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n",
+                tintstr(),id_,rtt_avg_,dev_avg_,data_out_[di].bin.str());
+        ack_rcvd_recent_++;
+        data_out_[di]=tintbin();
+    }
+    // early loss detection by packet reordering
+    for (int re=0; re<di-MAX_REORDERING; re++) {
+        if (data_out_[re]==tintbin())
+            continue;
+        ack_not_rcvd_recent_++;
+        data_out_tmo_.push_back(data_out_.front().bin);
+        dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str());
+        data_out_cap_ = bin64_t::ALL;
+        data_out_[ri] = tintbin();
     }
-    tint timeout = NOW - rtt_avg_ - 4*max(dev_avg_,TINT_MSEC*50);
-    while (!data_out_.empty() && data_out_.front().time<timeout) {
-        if (data_out_.front().bin!=bin64_t::NONE && ack_in_.is_empty(data_out_.front().bin)) {
+    // clear zeroed items
+    while (!data_out_.empty() && ( data_out_.front()==tintbin() ||
+            ack_in_.is_filled(data_out_.front().bin) ) )
+        data_out_.pop_front();
+    assert(data_out_.empty() || data_out_.front().time!=TINT_NEVER);
+}
+
+
+void Channel::TimeoutDataOut ( ) {
+    // losses: timeouted packets
+    tint timeout = NOW - ack_timeout();
+    for (int i=0; i<data_out_.size() && data_out_[i].time<timeout; i++) {
+        if (data_out_.front()!=tintbin() && ack_in_.is_empty(data_out_.front().bin)) {
             ack_not_rcvd_recent_++;
             data_out_cap_ = bin64_t::ALL;
             data_out_tmo_.push_back(data_out_.front().bin);
@@ -421,40 +409,25 @@ void    Channel::CleanDataOut (bin64_t ackd_pos) { // TODO: isn't it too long?
         }
         data_out_.pop_front();
     }
-    while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE)
-        data_out_.pop_front();
-    assert(data_out_.empty() || data_out_.front().time!=TINT_NEVER);
+    // clear retransmit queue of older items
     while (!data_out_tmo_.empty() && data_out_tmo_.front().time<NOW-MAX_POSSIBLE_RTT)
         data_out_tmo_.pop_front();
-
 }
 
 
-void    Channel::OnAck (Datagram& dgram) {
+void Channel::OnHave (Datagram& dgram) {
     bin64_t ackd_pos = dgram.Pull32();
     if (ackd_pos==bin64_t::NONE)
-        return; // likely, brocken packet / insufficient hashes
-    if (file().size() && ackd_pos.base_offset()>=file().packet_size()) {
-        eprintf("invalid ack: %s\n",ackd_pos.str());
-        return;
-    }
-    dprintf("%s #%u -ack %s\n",tintstr(),id_,ackd_pos.str());
+        return; // wow, peer has hashes
     ack_in_.set(ackd_pos);
-    CleanDataOut(ackd_pos); // FIXME do AFTER all ACKs
-}
-
-
-void Channel::OnTs (Datagram& dgram) {
-    peer_send_time_ = dgram.Pull64();
-    dprintf("%s #%u -ts %lli\n",tintstr(),id_,peer_send_time_);
+    dprintf("%s #%u -have %s\n",tintstr(),id_,ackd_pos.str());
 }
 
 
 void    Channel::OnHint (Datagram& dgram) {
     bin64_t hint = dgram.Pull32();
+    // FIXME: wake up here
     hint_in_.push_back(hint);
-    //ack_in_.set(hint,binmap_t::EMPTY);
-    //RequeueSend(cc_->OnHintRecvd(hint));
     dprintf("%s #%u -hint %s\n",tintstr(),id_,hint.str());
 }
 
@@ -501,40 +474,40 @@ Channel*    Channel::RecvDatagram (int socket) {
     data.Recv();
     const Address& addr = data.address();
 #define return_log(...) { printf(__VA_ARGS__); return NULL; }
-    if (data.size()<4) 
+    if (data.size()<4)
         return_log("datagram shorter than 4 bytes %s\n",addr.str());
     uint32_t mych = data.Pull32();
     Sha1Hash hash;
     Channel* channel = NULL;
     if (!mych) { // handshake initiated
-        if (data.size()<1+4+1+4+Sha1Hash::SIZE) 
+        if (data.size()<1+4+1+4+Sha1Hash::SIZE)
             return_log ("%s #0 incorrect size %i initial handshake packet %s\n",
                         tintstr(),data.size(),addr.str());
         uint8_t hashid = data.Pull8();
-        if (hashid!=SWIFT_HASH) 
+        if (hashid!=SWIFT_HASH)
             return_log ("%s #0 no hash in the initial handshake %s\n",
                         tintstr(),addr.str());
         bin64_t pos = data.Pull32();
-        if (pos!=bin64_t::ALL) 
+        if (pos!=bin64_t::ALL)
             return_log ("%s #0 that is not the root hash %s\n",tintstr(),addr.str());
         hash = data.PullHash();
         FileTransfer* file = FileTransfer::Find(hash);
-        if (!file) 
+        if (!file)
             return_log ("%s #0 hash %s unknown, no such file %s\n",tintstr(),hash.hex().c_str(),addr.str());
         dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
         for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
-            if (channels[*i] && channels[*i]->peer_==data.address() && 
+            if (channels[*i] && channels[*i]->peer_==data.address() &&
                 channels[*i]->last_recv_time_>NOW-TINT_SEC*2)
                 return_log("%s #0 have a channel already to %s\n",tintstr(),addr.str());
         channel = new Channel(file, socket, data.address());
     } else {
         mych = DecodeID(mych);
-        if (mych>=channels.size()) 
+        if (mych>=channels.size())
             return_log("%s invalid channel #%u, %s\n",tintstr(),mych,addr.str());
         channel = channels[mych];
-        if (!channel) 
+        if (!channel)
             return_log ("%s #%u is already closed\n",tintstr(),mych,addr.str());
-        if (channel->peer() != addr) 
+        if (channel->peer() != addr)
             return_log ("%s #%u invalid peer address %s!=%s\n",
                         tintstr(),mych,channel->peer().str(),addr.str());
         channel->own_id_mentioned_ = true;
@@ -545,10 +518,10 @@ Channel*    Channel::RecvDatagram (int socket) {
 }
 
 
-void    Channel::Loop (tint howlong) {  
-    
+void    Channel::Loop (tint howlong) {
+
     tint limit = Datagram::Time() + howlong;
-    
+
     do {
 
         tint send_time(TINT_NEVER);
@@ -561,16 +534,16 @@ void    Channel::Loop (tint howlong) {
                      sender->next_send_time_!=TINT_NEVER )
                 sender = NULL; // it was a stale entry
         }
-        
+
         if ( sender!=NULL && send_time<=NOW ) { // it's time
-            
+
             dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(),
                     tintstr(send_time));
             sender->Send();
             sender->Reschedule();
-            
+
         } else {  // it's too early, wait
-            
+
             tint towait = min(limit,send_time) - NOW;
             dprintf("%s #0 waiting %lliusec\n",tintstr(),towait);
             int rd = Datagram::Wait(socket_count,sockets,towait);
@@ -581,20 +554,21 @@ void    Channel::Loop (tint howlong) {
             }
             if (sender)  // get back to that later
                 send_queue.push(tintbin(send_time,sender->id()));
-            
+
         }
-        
+
     } while (NOW<limit);
-            
+
 }
 
+
 void Channel::Close () {
     this->SwitchSendControl(CLOSE_CONTROL);
 }
 
 
 void Channel::Reschedule () {
+    TimeoutDataOut(); // precaution to know free cwnd
     next_send_time_ = NextSendTime();
     if (next_send_time_!=TINT_NEVER) {
         assert(next_send_time_<NOW+TINT_MIN);
diff --git a/swift.h b/swift.h
index 5b61f46..df9398f 100644 (file)
--- a/swift.h
+++ b/swift.h
@@ -20,12 +20,12 @@ Messages
  DATA        01, bin_32, buffer
  1K of data.
 
- ACK        02, bin_32
ACKTS      08, bin_32, timestamp_32
+ ACK        02, bin_32, timestamp_32
HAVE       03, bin_32
  Confirms successfull delivery of data. Used for
  congestion control, as well.
 
- HINT        03, bin_32
+ HINT        08, bin_32
  Practical value of "hints" is to avoid overlap, mostly.
  Hints might be lost in the network or ignored.
  Peer might send out data without a hint.
@@ -61,9 +61,9 @@ Messages
 namespace swift {
 
     #define NOW Datagram::now
-    
+
     /** tintbin is basically a pair<tint,bin64_t> plus some nice operators.
-        Most frequently used in different queues (acknowledgements, requests, 
+        Most frequently used in different queues (acknowledgements, requests,
         etc). */
     struct tintbin {
         tint    time;
@@ -72,7 +72,7 @@ namespace swift {
         tintbin() : time(TINT_NEVER), bin(bin64_t::NONE) {}
         tintbin(tint time_, bin64_t bin_) : time(time_), bin(bin_) {}
         tintbin(bin64_t bin_) : time(NOW), bin(bin_) {}
-        bool operator < (const tintbin& b) const 
+        bool operator < (const tintbin& b) const
             { return time > b.time; }
         bool operator == (const tintbin& b) const
             { return time==b.time && bin==b.bin; }
@@ -110,13 +110,12 @@ namespace swift {
         SWIFT_HANDSHAKE = 0,
         SWIFT_DATA = 1,
         SWIFT_ACK = 2,
-        SWIFT_TS = 8,
-        SWIFT_HINT = 3,
+        SWIFT_HAVE = 3,
         SWIFT_HASH = 4,
         SWIFT_PEX_ADD = 5,
         SWIFT_PEX_RM = 6,
         SWIFT_SIGNED_HASH = 7,
-        SWIFT_MSGTYPE_SENT = 8,
+        SWIFT_HINT = 8,
         SWIFT_MSGTYPE_RCVD = 9,
         SWIFT_MESSAGE_COUNT = 10
     } messageid_t;
@@ -131,8 +130,8 @@ namespace swift {
 
     public:
 
-        /** A constructor. Open/submit/retrieve a file. 
-         *  @param file_name    the name of the file 
+        /** A constructor. Open/submit/retrieve a file.
+         *  @param file_name    the name of the file
          *  @param root_hash    the root hash of the file; zero hash if the file
                                 is newly submitted */
         FileTransfer(const char *file_name, const Sha1Hash& root_hash=Sha1Hash::ZERO);
@@ -184,7 +183,7 @@ namespace swift {
 
         /** Messages we are accepting.    */
         uint64_t        cap_out_;
-        
+
         tint            init_time_;
 
     public:
@@ -209,7 +208,7 @@ namespace swift {
     public:
         virtual void Randomize (uint64_t twist) = 0;
         /** The piece picking method itself.
-         *  @param  offered     the daata acknowledged by the peer 
+         *  @param  offered     the daata acknowledged by the peer
          *  @param  max_width   maximum number of packets to ask for
          *  @param  expires     (not used currently) when to consider request expired
          *  @return             the bin number to request */
@@ -238,11 +237,11 @@ namespace swift {
         being transferred between two peers. As we don't need buffers and
         lots of other TCP stuff, sizeof(Channel+members) must be below 1K.
         Normally, API users do not deal with this class. */
-    class Channel {  
+    class Channel {
     public:
         Channel    (FileTransfer* file, int socket=-1, Address peer=Address());
         ~Channel();
-        
+
         typedef enum {
             KEEP_ALIVE_CONTROL,
             PING_PONG_CONTROL,
@@ -251,7 +250,7 @@ namespace swift {
             LEDBAT_CONTROL,
             CLOSE_CONTROL
         } send_control_t;
-        
+
         static const char* SEND_CONTROL_MODES[];
 
         static Channel*
@@ -263,7 +262,7 @@ namespace swift {
         void        Close ();
 
         void        OnAck (Datagram& dgram);
-        void        OnTs (Datagram& dgram);
+        void        OnHave (Datagram& dgram);
         bin64_t     OnData (Datagram& dgram);
         void        OnHint (Datagram& dgram);
         void        OnHash (Datagram& dgram);
@@ -272,7 +271,7 @@ namespace swift {
         void        AddHandshake (Datagram& dgram);
         bin64_t     AddData (Datagram& dgram);
         void        AddAck (Datagram& dgram);
-        void        AddTs (Datagram& dgram);
+        void        AddHave (Datagram& dgram);
         void        AddHint (Datagram& dgram);
         void        AddUncleHashes (Datagram& dgram, bin64_t pos);
         void        AddPeakHashes (Datagram& dgram);
@@ -287,7 +286,7 @@ namespace swift {
         tint        SlowStartNextSendTime ();
         tint        AimdNextSendTime ();
         tint        LedbatNextSendTime ();
-        
+
         static int  MAX_REORDERING;
         static tint TIMEOUT;
         static tint MIN_DEV;
@@ -298,7 +297,7 @@ namespace swift {
         static bool SELF_CONN_OK;
         static tint MAX_POSSIBLE_RTT;
         static FILE* debug_file;
-        
+
         const std::string id_string () const;
         /** A channel is "established" if had already sent and received packets. */
         bool        is_established () { return peer_channel_id_ && own_id_mentioned_; }
@@ -308,10 +307,10 @@ namespace swift {
         tint ack_timeout () {
                        tint dev = dev_avg_ < MIN_DEV ? MIN_DEV : dev_avg_;
                        tint tmo = rtt_avg_ + dev * 4;
-                       return tmo < 30*TINT_SEC ? tmo : 30*TINT_SEC; 
+                       return tmo < 30*TINT_SEC ? tmo : 30*TINT_SEC;
         }
         uint32_t    id () const { return id_; }
-        
+
         static int  DecodeID(int scrambled);
         static int  EncodeID(int unscrambled);
         static Channel* channel(int i) {
@@ -363,7 +362,6 @@ namespace swift {
         tint        last_data_in_time_;
         tint        last_loss_time_;
         tint        next_send_time_;
-        tint        peer_send_time_;
         /** Congestion window; TODO: int, bytes. */
         float       cwnd_;
         /** Data sending interval. */
@@ -391,7 +389,8 @@ namespace swift {
         }
         /** Get a request for one packet from the queue of peer's requests. */
         bin64_t     DequeueHint();
-        void        CleanDataOut (bin64_t acks_pos=bin64_t::NONE);
+        bin64_t     ImposeHint();
+        void        TimeoutDataOut ();
         void        CleanStaleHintOut();
         void        CleanHintOut(bin64_t pos);
         void        Reschedule();
@@ -401,7 +400,7 @@ namespace swift {
         static SOCKET   sockets[8];
         static int      socket_count;
         static tint     last_tick;
-        static tbheap   send_queue;        
+        static tbheap   send_queue;
 
         static Address  tracker;
         static std::vector<Channel*> channels;