reworking queues
authorvictor <victor@e16421f0-f15b-0410-abcd-98678b794739>
Fri, 20 Nov 2009 09:11:58 +0000 (09:11 +0000)
committervictor <victor@e16421f0-f15b-0410-abcd-98678b794739>
Fri, 20 Nov 2009 09:11:58 +0000 (09:11 +0000)
git-svn-id: https://ttuki.vtt.fi/svn/p2p-next/TUD/p2tp/trunk@607 e16421f0-f15b-0410-abcd-98678b794739

13 files changed:
bins.cpp
bins.h
datagram.cpp
datagram.h
do_tests.sh
exec/SConscript
exec/leecher.cpp
exec/seeder.cpp
ext/seq_picker.cpp
hashtree.cpp
p2tp.cpp
p2tp.h
sendrecv.cpp

index 23e942f..cfca3a2 100644 (file)
--- a/bins.cpp
+++ b/bins.cpp
@@ -421,6 +421,17 @@ uint64_t    bins::seq_length () {
     return i.pos.base_offset() + (*i==FILLED ? 1 : 0);
 }
 
+
+bool        bins::is_empty (bin64_t range)  {
+    if (range==bin64_t::ALL) 
+        return !deep(0) && !halves[0];
+    iterator i(this,range,false);
+    while ( i.pos!=range && (i.deep() || !i.solid()) )
+        i.towards(range);
+    return !i.deep() && *i==EMPTY;
+}
+
+
 binheap::binheap() {
     size_ = 32;
     heap_ = (bin64_t*) malloc(size_*sizeof(bin64_t));
diff --git a/bins.h b/bins.h
index 45ef4ea..8275c73 100644 (file)
--- a/bins.h
+++ b/bins.h
@@ -46,7 +46,7 @@ public:
 
     uint64_t    mass ();
     
-    bool        is_empty () const { return !deep(0) && !halves[0]; }
+    bool        is_empty (bin64_t range=bin64_t::ALL) ;
 
     void        clear ();
     
index f26a78f..ecb72df 100644 (file)
@@ -23,7 +23,7 @@ namespace p2tp {
 
 tint Datagram::now = Datagram::Time();
 tint Datagram::start = now;
-tint Datagram::epoch = now/360000000LL*360000000LL;
+tint Datagram::epoch = now/360000000LL*360000000LL; // make logs mergeable
 uint32_t Address::LOCALHOST = INADDR_LOOPBACK;
 uint64_t Datagram::dgrams_up=0, Datagram::dgrams_down=0,
          Datagram::bytes_up=0, Datagram::bytes_down=0;
index bc3c52e..27a1caf 100644 (file)
@@ -93,11 +93,13 @@ struct Address {
         addr.sin_port==b.addr.sin_port &&
         addr.sin_addr.s_addr==b.addr.sin_addr.s_addr;
     }
-    std::string str () const {
-        char s[32];
-        sprintf(s,"%i.%i.%i.%i:%i",ipv4()>>24,(ipv4()>>16)&0xff,
+    const char* str () const {
+        static char rs[4][32];
+        static int i;
+        i = (i+1) & 3;
+        sprintf(rs[i],"%i.%i.%i.%i:%i",ipv4()>>24,(ipv4()>>16)&0xff,
                 (ipv4()>>8)&0xff,ipv4()&0xff,port());
-        return std::string(s);
+        return rs[i];
     }
     bool operator != (const Address& b) const { return !(*this==b); }
 };
index d478ec6..d87ec38 100755 (executable)
@@ -1,5 +1,9 @@
 #!/bin/bash
 
 for tst in `ls tests/*test`; do
-    $tst
+    if echo $tst; $tst > $tst.log; then
+        echo $tst OK
+    else
+        echo $tst FAIL
+    fi
 done
index 5b2dd64..f195cef 100644 (file)
@@ -22,12 +22,12 @@ else:
 print "tests: libpath is",libpath
 
 
-env.Program( 
-    target='trial',
-    source=['trial.cpp'],
-    CPPPATH=cpppath,
-    LIBS=libs,
-    LIBPATH=libpath )
+#env.Program( 
+#    target='trial',
+#    source=['trial.cpp'],
+#    CPPPATH=cpppath,
+#    LIBS=libs,
+#    LIBPATH=libpath )
 
 env.Program( 
     target='seeder',
index 9f02950..c07253f 100644 (file)
@@ -35,7 +35,7 @@ int main (int argn, char** args) {
     Address tracker(args[3]), bindaddr;
 
     if (tracker==Address()) {
-        fprintf(stderr,"Tracker address format: [1.2.3.4:]12345\n");
+        fprintf(stderr,"Tracker address format: [1.2.3.4:]12345, not %s\n",args[3]);
         return -2;
     }
     if (argn>=5)
index 0cff2d7..629f7d4 100644 (file)
@@ -36,7 +36,7 @@ int main (int argn, char** args) {
     }
 
     assert(0<p2tp::Listen(bindaddr));
-    printf("seeder bound to %s\n",bindaddr.str().c_str());
+    printf("seeder bound to %s\n",bindaddr.str());
 
 
        int file = p2tp::Open(filename);
index 986c3ac..74557eb 100644 (file)
@@ -14,6 +14,7 @@ using namespace p2tp;
 class SeqPiecePicker : public PiecePicker {
     
     bins            ack_hint_out_;
+    tbqueue         hint_out_;
     FileTransfer*   transfer_;
     uint64_t        twist_;
     
@@ -33,10 +34,14 @@ public:
     }
     
     virtual bin64_t Pick (bins& offer, uint64_t max_width, tint expires) {
-        //dprintf("twist is %lli\n",twist_);
+        while (hint_out_.size() && hint_out_.front().time<NOW-TINT_SEC*3/2) {
+            ack_hint_out_.copy_range(file().ack_out(), hint_out_.front().bin);
+            hint_out_.pop_front();
+        }
         if (!file().size()) {
             return bin64_t(0,0); // whoever sends it first
         }
+    retry:
         twist_ &= (file().peak(0)) & ((1<<6)-1);
         if (twist_) {
             offer.twist(twist_);
@@ -48,25 +53,19 @@ public:
             offer.twist(0);
             ack_hint_out_.twist(0);
         }
-        if (hint==bin64_t::NONE)
+        if (hint==bin64_t::NONE) {
             return hint; // TODO: end-game mode
+        }
+        if (!file().ack_out().is_empty(hint)) { // unhinted/late data
+            ack_hint_out_.copy_range(file().ack_out(), hint);
+            goto retry;
+        }
         while (hint.width()>max_width)
             hint = hint.left();
         assert(ack_hint_out_.get(hint)==bins::EMPTY);
-        if (hint.offset() && file().ack_out().get(hint)!=bins::EMPTY) { // FIXME DEBUG remove
-            eprintf("bogus hint: %s\n",hint.str());
-            exit(1);
-        }
         ack_hint_out_.set(hint);
+        hint_out_.push_back(hint);
         return hint;
     }
     
-    void Received (bin64_t bin) {
-        ack_hint_out_.set(bin);
-    }
-    
-    void Expired (bin64_t bin) {
-        ack_hint_out_.copy_range(file().ack_out(), bin);
-    }
-    
 };
index 1c17153..fd3a713 100644 (file)
@@ -173,7 +173,8 @@ void            HashTree::RecoverProgress () {
         if (hashes_[pos]==Sha1Hash::ZERO)
             continue;
         size_t rd = read(fd_,buf,1<<10);
-        assert(rd==(1<<10) || p==packet_size()-1); // FIXME BUG
+        if (rd!=(1<<10) && p!=packet_size()-1)
+            break;
         if (rd==(1<<10) && !memcmp(buf, zeros, rd) &&
                 hashes_[pos]!=kilo_zero) // FIXME
             continue;
@@ -182,6 +183,8 @@ void            HashTree::RecoverProgress () {
         ack_out_.set(pos);
         completek_++;
         complete_+=rd;
+        if (rd!=(1<<10) && p==packet_size()-1)
+            size_ = ((sizek_-1)<<10) + rd;
     }
 }
 
index 9211e16..2ff6504 100644 (file)
--- a/p2tp.cpp
+++ b/p2tp.cpp
@@ -43,14 +43,14 @@ Channel::Channel    (FileTransfer* transfer, int socket, Address peer_addr) :
     data_out_cap_(bin64_t::ALL), last_send_data_time_(0), last_recv_data_time_(0),
     own_id_mentioned_(false), next_send_time_(0), last_send_time_(0),
     last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC),
-    data_in_dbl_(bin64_t::NONE)
+    data_in_dbl_(bin64_t::NONE), hint_out_size_(0)
 {
     if (peer_==Address())
         peer_ = tracker;
        this->id = channels.size();
        channels.push_back(this);
     cc_ = new PingPongController(this);
-    dprintf("%s #%i init %s\n",tintstr(),id,peer_.str().c_str());
+    dprintf("%s #%i init %s\n",tintstr(),id,peer_.str());
     Schedule(NOW); // FIXME ugly
 }
 
diff --git a/p2tp.h b/p2tp.h
index 90ea4b6..ebef7bc 100644 (file)
--- a/p2tp.h
+++ b/p2tp.h
@@ -192,8 +192,6 @@ namespace p2tp {
     public:
         virtual void Randomize (uint64_t twist) = 0;
         virtual bin64_t Pick (bins& offered, uint64_t max_width, tint expires) = 0;
-        virtual void Expired (bin64_t bin) = 0;
-        virtual void Received (bin64_t bin) = 0;
     };
 
 
@@ -284,6 +282,7 @@ namespace p2tp {
                tbqueue     hint_in_;
                /** Hints sent (to detect and reschedule ignored hints). */
         tbqueue     hint_out_;
+        uint64_t    hint_out_size_;
                /** The congestion control strategy. */
                SendController  *cc_;
         /** Types of messages the peer accepts. */
@@ -309,7 +308,7 @@ namespace p2tp {
         bin64_t                DequeueHint();
         void        ClearStaleDataOut ();
         void        CleanStaleHintOut();
-        void        CleanFulfilledHints(bin64_t pos);
+        void        CleanHintOut(bin64_t pos);
         void        CleanFulfilledDataOut(bin64_t pos);
         void        Schedule(tint send_time);
 
@@ -373,9 +372,6 @@ namespace p2tp {
        //uint32_t Width (const tbinvec& v);
 
 
-// FIXME kill this macro
-#define RETLOG(str) { fprintf(stderr,"%s\n",str); return; }
-
        /** Must be called by any client using the library */
        void LibraryInit(void);
 
index c328c18..da3fe14 100644 (file)
@@ -15,13 +15,8 @@ using namespace std; // FIXME remove
 
 /*
  TODO  25 Oct 18:55
- - move hint_out_, piece picking to piece picker (needed e.g. for the case of channel drop)
- - ANY_LAYER
  - range: ALL
  - randomized testing of advanced ops (new testcase)
- - PeerCwnd()
- - bins hint_out_, tbqueue hint_out_ts_
  */
 
 void   Channel::AddPeakHashes (Datagram& dgram) {
@@ -125,7 +120,7 @@ void        Channel::Send () {
         AddHandshake(dgram);
         AddAck(dgram);
     }
-    dprintf("%s #%i sent %ib %s\n",tintstr(),id,dgram.size(),peer().str().c_str());
+    dprintf("%s #%i sent %ib %s\n",tintstr(),id,dgram.size(),peer().str());
     if (dgram.size()==4) // only the channel id; bare keep-alive
         data = bin64_t::ALL;
     cc_->OnDataSent(data);
@@ -134,33 +129,24 @@ void      Channel::Send () {
 }
 
 
-void   Channel::CleanStaleHintOut () {
-    tint timed_out = NOW - 8*rtt_avg_; // FIXME BULLSHIT (take rtt=0)
+void   Channel::AddHint (Datagram& dgram) {
+
+    tint timed_out = NOW - TINT_SEC*3/2;
        while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
-        transfer().picker().Expired(hint_out_.front().bin);
+        hint_out_size_ -= hint_out_.front().bin.width();
                hint_out_.pop_front();
        }
-}
-
-
-void   Channel::AddHint (Datagram& dgram) {
-
-    CleanStaleHintOut();
-    
-    uint64_t hint_out_mass=0;
-    for(int i=0; i<hint_out_.size(); i++)
-        hint_out_mass += hint_out_[i].bin.width();
     
     int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
     if (!peer_cwnd)
         peer_cwnd = 1;
-    int peer_pps = TINT_SEC / dip_avg_;
+    int peer_pps = TINT_SEC / dip_avg_; // data packets per sec
     if (!peer_pps)
         peer_pps = 1;
     
-    if ( hint_out_mass < peer_pps ) { //4*peer_cwnd ) {
+    if ( hint_out_size_ < peer_pps ) { //4*peer_cwnd ) {
             
-        int diff = peer_pps - hint_out_mass;
+        int diff = peer_pps - hint_out_size_;
         //if (diff>4 && diff>2*peer_cwnd)
         //    diff >>= 1;
         bin64_t hint = transfer().picker().Pick(ack_in_,diff,rtt_avg_*8+TINT_MSEC*100);
@@ -168,8 +154,9 @@ void        Channel::AddHint (Datagram& dgram) {
         if (hint!=bin64_t::NONE) {
             dgram.Push8(P2TP_HINT);
             dgram.Push32(hint);
-            dprintf("%s #%i +hint %s [%lli]\n",tintstr(),id,hint.str(),hint_out_mass);
+            dprintf("%s #%i +hint %s [%lli]\n",tintstr(),id,hint.str(),hint_out_size_);
             hint_out_.push_back(hint);
+            hint_out_size_ += hint.width();
         } else
             dprintf("%s #%i .hint\n",tintstr(),id);
         
@@ -300,25 +287,24 @@ void      Channel::OnHash (Datagram& dgram) {
 }
 
 
-void    Channel::CleanFulfilledHints (bin64_t pos) {
+void    Channel::CleanHintOut (bin64_t pos) {
     int hi = 0;
-    while (hi<hint_out_.size() && hi<8 && !pos.within(hint_out_[hi].bin))
+    while (hi<hint_out_.size() && !pos.within(hint_out_[hi].bin))
         hi++;
-    if (hi<8 && hi<hint_out_.size()) {
-        while (hi--) {
-            transfer().picker().Expired(hint_out_.front().bin);
-            hint_out_.pop_front();
-        }
-        while (hint_out_.front().bin!=pos) {
-            tintbin f = hint_out_.front();
-            f.bin = f.bin.towards(pos);
-            hint_out_.front().bin = f.bin.sibling();
-            hint_out_.push_front(f);
-        }
+    if (hi==hint_out_.size())
+        return;
+    while (hi--) { // removing likely snubbed hints
+        hint_out_size_ -= hint_out_.front().bin.width();
         hint_out_.pop_front();
     }
-     // every HINT ends up as either Expired or Received 
-    transfer().picker().Received(pos);
+    while (hint_out_.front().bin!=pos) {
+        tintbin f = hint_out_.front();
+        f.bin = f.bin.towards(pos);
+        hint_out_.front().bin = f.bin.sibling();
+        hint_out_.push_front(f);
+    }
+    hint_out_.pop_front();
+    hint_out_size_--;
 }
 
 
@@ -338,7 +324,7 @@ bin64_t Channel::OnData (Datagram& dgram) {
         }
         last_recv_data_time_ = NOW;
     }
-    CleanFulfilledHints(pos);    
+    CleanHintOut(pos);    
     return pos;
 }
 
@@ -398,7 +384,7 @@ void Channel::OnPex (Datagram& dgram) {
     uint32_t ipv4 = dgram.Pull32();
     uint16_t port = dgram.Pull16();
     Address addr(ipv4,port);
-    dprintf("%s #%i -pex %s\n",tintstr(),id,addr.str().c_str());
+    dprintf("%s #%i -pex %s\n",tintstr(),id,addr.str());
     transfer().OnPexIn(addr);
 }
 
@@ -411,48 +397,48 @@ void    Channel::AddPex (Datagram& dgram) {
     dgram.Push8(P2TP_PEX_ADD);
     dgram.Push32(a.ipv4());
     dgram.Push16(a.port());
-    dprintf("%s #%i +pex %s\n",tintstr(),id,a.str().c_str());
+    dprintf("%s #%i +pex %s\n",tintstr(),id,a.str());
 }
 
 
 void   Channel::RecvDatagram (int socket) {
        Datagram data(socket);
        data.Recv();
+    Address& addr = data.addr;
+#define return_log(...) { eprintf(__VA_ARGS__); return; }
        if (data.size()<4) 
-               RETLOG("datagram shorter than 4 bytes");
+               return_log("datagram shorter than 4 bytes %s\n",addr.str());
        uint32_t mych = data.Pull32();
        Sha1Hash hash;
        Channel* channel = NULL;
        if (!mych) { // handshake initiated
                if (data.size()<1+4+1+4+Sha1Hash::SIZE) 
-                       RETLOG ("incorrect size initial handshake packet");
+                       return_log ("incorrect size %i initial handshake packet %s\n",data.size(),addr.str());
                uint8_t hashid = data.Pull8();
                if (hashid!=P2TP_HASH) 
-                       RETLOG ("no hash in the initial handshake");
+                       return_log ("no hash in the initial handshake %s\n",addr.str());
                bin64_t pos = data.Pull32();
                if (pos!=bin64_t::ALL) 
-                       RETLOG ("that is not the root hash");
+                       return_log ("that is not the root hash %s\n",addr.str());
                hash = data.PullHash();
                FileTransfer* file = FileTransfer::Find(hash);
                if (!file) 
-                       RETLOG ("hash unknown, no such file");
+                       return_log ("hash %s unknown, no such file %s\n",hash.hex().c_str(),addr.str());
         dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
         for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
             if (channels[*i] && channels[*i]->peer_==data.addr && 
                 channels[*i]->last_recv_time_>NOW-TINT_SEC*2) 
-                RETLOG("have a channel already");
+                return_log("have a channel already to %s\n",addr.str());
                channel = new Channel(file, socket, data.address());
        } else {
                mych = DecodeID(mych);
-               if (mych>=channels.size()) {
-            eprintf("invalid channel #%i\n",mych);
-            return;
-        }
+               if (mych>=channels.size()) 
+            return_log("invalid channel #%i, %s\n",mych,addr.str());
                channel = channels[mych];
                if (!channel) 
-                       RETLOG ("channel is closed");
-               if (channel->peer() != data.address()
-                       RETLOG ("invalid peer address");
+                       return_log ("channel #%i is already closed\n",mych,addr.str());
+               if (channel->peer() != addr
+                       return_log ("invalid peer address #%i %s!=%s\n",mych,channel->peer().str(),addr.str());
         channel->own_id_mentioned_ = true;
        }
     //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);