barely works
authorvictor <victor@e16421f0-f15b-0410-abcd-98678b794739>
Mon, 26 Oct 2009 15:27:30 +0000 (15:27 +0000)
committervictor <victor@e16421f0-f15b-0410-abcd-98678b794739>
Mon, 26 Oct 2009 15:27:30 +0000 (15:27 +0000)
git-svn-id: https://ttuki.vtt.fi/svn/p2p-next/TUD/p2tp/trunk@478 e16421f0-f15b-0410-abcd-98678b794739

23 files changed:
SConstruct
bins.cpp
bins.h
compat/hirestimeofday.cpp
compat/unixio.cpp
compat/unixio.h
datagram.cpp
datagram.h
doc/cc-states.png [new file with mode: 0644]
doc/sofi.jpg [new file with mode: 0644]
doc/state-diagram.pdf [new file with mode: 0644]
ext/ledbat_controller.cpp
ext/seq_picker.cpp
ext/simple_selector.cpp
p2tp.cpp
p2tp.h
sendrecv.cpp
tests/SConscript
tests/binstest2.cpp
tests/connecttest.cpp
tests/ledbattest.cpp
tests/transfertest.cpp
transfer.cpp

index 33970ee..c28c801 100644 (file)
@@ -26,7 +26,9 @@ DEBUG = True
 TestDir='tests'
 
 target = 'p2tp'
-source = [ 'bin64.cpp','hashtree.cpp','datagram.cpp','bins.cpp', 'transfer.cpp', 'compat/hirestimeofday.cpp', 'compat/util.cpp']
+source = [ 'bin64.cpp','hashtree.cpp','datagram.cpp','bins.cpp',
+    'transfer.cpp', 'p2tp.cpp', 'sendrecv.cpp', 
+    'compat/hirestimeofday.cpp', 'compat/util.cpp']
 
 env = Environment()
 if sys.platform == "win32":
@@ -83,7 +85,7 @@ else:
            cpppath = ""
            print "To use external libs, set CPPPATH environment variable to list of colon-separated include dirs"
        env.Append(CPPPATH=".:"+cpppath)
-        env.Append(LINKFLAGS="--static")
+    #env.Append(LINKFLAGS="--static")
 
        #if DEBUG:
        #       env.Append(CXXFLAGS="-g")
index f5a32b6..68dca27 100644 (file)
--- a/bins.cpp
+++ b/bins.cpp
@@ -317,13 +317,25 @@ void    bins::remove (bins& b) {
 }
 
 
+bin64_t     bins::cover(bin64_t val) {
+    iterator i(this,val,false);
+    while (i.pos!=val && !i.solid())
+        i.towards(val);
+    //if (!i.half && !halves[0])
+    //    return bin64_t::ALL;
+    return i.pos;
+}
+
+
 bin64_t     bins::find_filtered 
-    (bins& filter, const bin64_t range, const uint8_t layer, fill_t seek)  
+    (bins& filter, bin64_t range, const uint8_t layer, fill_t seek)  
 {
+    if (range==bin64_t::ALL)
+        range = bin64_t ( height>filter.height ? height : filter.height, 0 );
     iterator i(this,range,true), j(&filter,range,true);
     fill_t stop = seek==EMPTY ? FILLED : EMPTY;
     while (true) {
-        while ( i.bin().layer()>layer && (i.deep() || *i!=stop || j.deep() || *j!=EMPTY) )
+        while ( i.bin().layer()>layer && (i.deep() || *i!=stop || j.deep() || *j!=FILLED) )
             i.left(), j.left(); // TODO may optimize a lot here 
         if (i.bin().layer()==layer && !i.deep() && *i==seek && *j==EMPTY)
             return i.bin();
@@ -337,7 +349,10 @@ bin64_t     bins::find_filtered
     return bin64_t::NONE;    
 }
 
-void        bins::set_range (bins& origin, bin64_t range) { // FIXME unite with remove(); do bitwise()
+// FIXME unite with remove(); do bitwise()
+void        bins::copy_range (bins& origin, bin64_t range) { 
+    if (range==bin64_t::ALL)
+        range = bin64_t ( height>origin.height ? height : origin.height, 0 );
     iterator zis(this,range,true), zat(&origin,range,true);
     while (zis.pos.within(range)) {
         while (zis.deep() || zat.deep()) {
diff --git a/bins.h b/bins.h
index 86b5d04..919f893 100644 (file)
--- a/bins.h
+++ b/bins.h
@@ -26,12 +26,12 @@ public:
     
     void        set (bin64_t bin, fill_t val=FILLED); 
     
-    void        set_range (bins& origin, bin64_t range);
+    void        copy_range (bins& origin, bin64_t range);
     
     bin64_t     find (const bin64_t range, const uint8_t layer, fill_t seek=EMPTY) ;
     
     bin64_t     find_filtered
-        (bins& filter, const bin64_t range, const uint8_t layer, fill_t seek=EMPTY) ;
+        (bins& filter, bin64_t range, const uint8_t layer, fill_t seek=EMPTY) ;
     
     void        remove (bins& b);
     
@@ -41,6 +41,8 @@ public:
 
     uint32_t    size() { return cells_allocated; }
     
+    bin64_t     cover(bin64_t val);
+    
     bool        empty () const { return !deep(0) && !halves[0]; }
     
     static bool is_mixed (uint16_t val) { return val!=EMPTY && val!=FILLED; }
index 6a03c00..f5aad7c 100644 (file)
@@ -5,7 +5,7 @@
  */\r
 \r
 #include <iostream>\r
-#include "compat/hirestimeofday.h"\r
+#include "hirestimeofday.h"\r
 \r
 #ifndef _WIN32\r
 #include <sys/time.h>\r
index 1c369a6..01b08ea 100644 (file)
@@ -2,10 +2,11 @@
  * Written by Arno Bakker\r
  * see LICENSE.txt for license information\r
  */\r
+#ifdef _WIN32\r
 \r
-#include "compat/unixio.h"\r
-#include <io.h>\r
+#include "unixio.h"\r
 #include <stdio.h>\r
+#include <io.h>\r
 #include <winsock2.h>\r
 \r
 size_t pread(int fildes, void *buf, size_t nbyte, long offset)\r
@@ -26,3 +27,5 @@ int inet_aton(const char *cp, struct in_addr *inp)
        inp->S_un.S_addr = inet_addr(cp);\r
        return 1;\r
 }\r
+\r
+#endif\r
index e36a1f1..94255c1 100644 (file)
@@ -4,6 +4,7 @@
  *\r
  * Defines UNIX like I/O calls and parameters for Win32\r
  */\r
+#ifdef _WIN32\r
 \r
 #ifndef UNIXIO_H_\r
 #define UNIXIO_H_\r
@@ -23,3 +24,5 @@ size_t pwrite(int fildes, const void *buf, size_t nbyte, long offset);
 int inet_aton(const char *cp, struct in_addr *inp);\r
 \r
 #endif /* UNIXIO_H_ */\r
+\r
+#endif // WIN32
\ No newline at end of file
index 5edadd7..ea6e60f 100644 (file)
 namespace p2tp {
 
 tint Datagram::now = Datagram::Time();
+tint Datagram::epoch = now;
 uint32_t Datagram::Address::LOCALHOST = INADDR_LOOPBACK;
 
+char* Datagram::TimeStr (tint time) {
+    static char ret_str[128];
+    if (time==0)
+        time = now;
+    time -= epoch;
+    int hours = time/TINT_HOUR;
+    time %= TINT_HOUR;
+    int mins = time/TINT_MIN;
+    time %= TINT_MIN;
+    int secs = time/TINT_SEC;
+    time %= TINT_SEC;
+    int msecs = time/TINT_MSEC;
+    time %= TINT_MSEC;
+    int usecs = time/TINT_uSEC;
+    sprintf(ret_str,"%i_%02i_%02i_%03i_%03i",hours,mins,secs,msecs,usecs);
+    return ret_str;
+}
+    
 int Datagram::Send () {
        int r = sendto(sock,(const char *)buf+offset,length-offset,0,
                                   (struct sockaddr*)&(addr.addr),sizeof(struct sockaddr_in));
-       offset=0;
-       length=0;
-       now = Time();
+       //offset=0;
+       //length=0;
+       Time();
        return r;
 }
 
@@ -36,22 +55,19 @@ int Datagram::Recv () {
        offset = 0;
        length = recvfrom (sock, (char *)buf, MAXDGRAMSZ, 0,
                                           (struct sockaddr*)&(addr), &addrlen);
-       if (length<0)
+       if (length<0) // FIXME FIXME FIXME 
 #ifdef _WIN32
                PLOG(ERROR)<<"on recv" << WSAGetLastError() << "\n";
 #else
                PLOG(ERROR)<<"on recv";
 #endif
-       now = Time();
+       Time();
        return length;
 }
 
 
 SOCKET Datagram::Wait (int sockcnt, SOCKET* sockets, tint usec) {
-       // ARNOTODO: LOG commented out, it causes a crash on win32 (in a strlen()
-       // done as part of a std::local::name() ??
-       //
-       //LOG(INFO)<<"waiting for "<<sockcnt;
+       dprintf("waiting (%i socks)\n",sockcnt);
        struct timeval timeout;
        timeout.tv_sec = usec/TINT_SEC;
        timeout.tv_usec = usec%TINT_SEC;
@@ -66,19 +82,19 @@ SOCKET Datagram::Wait (int sockcnt, SOCKET* sockets, tint usec) {
                        max_sock_fd = sockets[i];
        }
        int sel = select(max_sock_fd+1, &bases, NULL, &err, &timeout);
+    Time();
        if (sel>0) {
                for (int i=0; i<=sockcnt; i++)
                        if (FD_ISSET(sockets[i],&bases))
                                return sockets[i];
-       } else if (sel<0)
+       } else if (sel<0) {
 #ifdef _WIN32
                PLOG(ERROR)<<"select fails" << WSAGetLastError() << "\n";
 #else
                PLOG(ERROR)<<"select fails";
 #endif
-
-       // Arno: may return 0 when timeout expired
-       return sel;
+    } 
+    return -1;
 }
 
 tint Datagram::Time () {
index 45099e2..48d09ee 100644 (file)
 
 namespace p2tp {
 
+typedef int64_t tint;
+#define TINT_HOUR ((tint)1000000*60*60)
+#define TINT_MIN ((tint)1000000*60)
+#define TINT_SEC ((tint)1000000)
+#define TINT_MSEC ((tint)1000)
+#define TINT_uSEC ((tint)1)
+#define TINT_NEVER ((tint)0x7fffffffffffffffLL)
 #define MAXDGRAMSZ 1400
 #ifndef _WIN32
 #define INVALID_SOCKET -1
@@ -62,13 +69,21 @@ struct Datagram {
             init(ipv4addr,port);
         }
         Address(const struct sockaddr_in& address) : addr(address) {}
+        uint32_t ipv4 () const { return ntohl(addr.sin_addr.s_addr); }
+        uint16_t port () const { return ntohs(addr.sin_port); }
         operator sockaddr_in () const {return addr;}
-        bool operator == (const Address& b) {
+        bool operator == (const Address& b) const { 
             return addr.sin_family==b.addr.sin_family &&
             addr.sin_port==b.addr.sin_port &&
             addr.sin_addr.s_addr==b.addr.sin_addr.s_addr;
         }
-        bool operator != (const Address& b) { return !(*this==b); }
+        std::string str () const {
+            char s[32];
+            sprintf(s,"%i.%i.%i.%i:%i",ipv4()>>24,(ipv4()>>16)&0xff,
+                    (ipv4()>>8)&0xff,ipv4()&0xff,port());
+            return std::string(s);
+        }
+        bool operator != (const Address& b) const { return !(*this==b); }
     };
 
        Address addr;
@@ -79,8 +94,9 @@ struct Datagram {
        static SOCKET Bind(Address address);
        static void Close(int port);
        static tint Time();
+    static char* TimeStr(tint time=0);
        static SOCKET Wait (int sockcnt, SOCKET* sockets, tint usec=0);
-       static tint now;
+       static tint now, epoch;
 
        Datagram (SOCKET socket, const Address addr_) : addr(addr_), offset(0),
                length(0), sock(socket) {}
@@ -167,6 +183,7 @@ struct Datagram {
 };
 
 std::string sock2str (struct sockaddr_in addr);
+#define dprintf(...) printf(__VA_ARGS__)
 
 }
 
diff --git a/doc/cc-states.png b/doc/cc-states.png
new file mode 100644 (file)
index 0000000..57b5d49
Binary files /dev/null and b/doc/cc-states.png differ
diff --git a/doc/sofi.jpg b/doc/sofi.jpg
new file mode 100644 (file)
index 0000000..fe34680
Binary files /dev/null and b/doc/sofi.jpg differ
diff --git a/doc/state-diagram.pdf b/doc/state-diagram.pdf
new file mode 100644 (file)
index 0000000..1057574
Binary files /dev/null and b/doc/state-diagram.pdf differ
index 007a799..8febc9c 100644 (file)
@@ -18,8 +18,8 @@ public:
     int cwnd_, peer_cwnd_, in_flight_;
     bin64_t last_bin_sent_;
 public:
-    LedbatController () : dev_avg_(0), rtt_avg_(TINT_SEC), last_send_time_(0),
-    last_recv_time_(0), cwnd_(1), peer_cwnd_(1), in_flight_(0) {
+    LedbatController (int chid) : dev_avg_(0), rtt_avg_(TINT_SEC), last_send_time_(0),
+    last_recv_time_(0), cwnd_(1), peer_cwnd_(1), in_flight_(0), CongestionController(chid) {
     }
     
     tint    rtt_avg () {
@@ -31,6 +31,7 @@ public:
     }
     
     int     cwnd () {
+        // check for timeouts
         return cwnd_;
     }
     
@@ -39,7 +40,7 @@ public:
     }
     
     int     free_cwnd ( ){
-        return cwnd_ - in_flight_;
+        return cwnd() - in_flight_;
     }
     
     tint    next_send_time ( ){
@@ -63,7 +64,8 @@ public:
     void OnAckRcvd(bin64_t b, tint peer_stamp) {
         if (last_bin_sent_!=b)
             return;
-        rtt_avg_ = (rtt_avg_*7 + (Datagram::now-last_send_time_)) >> 3; // van Jac
+        if (peer_stamp!=TINT_NEVER)
+            rtt_avg_ = (rtt_avg_*7 + (Datagram::now-last_send_time_)) >> 3; // van Jac
         in_flight_--;
     }
     
index 1001d8e..eae0525 100644 (file)
@@ -13,34 +13,41 @@ using namespace p2tp;
 
 class SeqPiecePicker : public PiecePicker {
     
-    bins hint_out_;
-    FileTransfer* file_;
+    bins            ack_hint_out_;
+    FileTransfer*   file_;
     
 public:
     
-    SeqPiecePicker (FileTransfer* file_to_pick_from) : file_(file_to_pick_from), hint_out_() {
+    SeqPiecePicker (FileTransfer* file_to_pick_from) : file_(file_to_pick_from), ack_hint_out_() {
+        ack_hint_out_.copy_range(file_->ack_out(),bin64_t::ALL);
     }
     
-    virtual bin64_t Pick (bins& from, uint8_t layer) {
-        bins may_pick = from;
-        may_pick.remove (file_->ack_out());
-        may_pick.remove (hint_out_);
-        for (int l=layer; l>=0; l--) {
+    virtual bin64_t Pick (bins& offer, uint8_t layer) {
+        
+        bin64_t hint = offer.find_filtered
+            (ack_hint_out_,bin64_t::ALL,layer,bins::FILLED);
+        if (hint==bin64_t::NONE)
+            return hint; // TODO: end-game mode
+        while (hint.layer()>layer)
+            hint = hint.left();
+        ack_hint_out_.set(hint);
+        return hint;
+        /*for (int l=layer; l>=0; l--) {
             for(int i=0; i<file_->peak_count(); i++) {
                 bin64_t pick = may_pick.find(file_->peak(i),l,bins::FILLED);
                 if (pick!=bin64_t::NONE)
                     return pick;
             }
         }
-        return bin64_t::NONE;
+        return bin64_t::NONE;*/
     }
     
     virtual void    Received (bin64_t b) {
-        hint_out_.set(b,bins::EMPTY);
+        ack_hint_out_.set(b,bins::FILLED);
     }
     
-    virtual void    Snubbed (bin64_t b) {
-        hint_out_.set(b,bins::EMPTY);
+    virtual void    Expired (bin64_t b) {
+        ack_hint_out_.copy_range(file_->ack_out(),b);
     }
     
 };
\ No newline at end of file
index 2f165fa..790a205 100644 (file)
@@ -36,4 +36,3 @@ public:
     }
 };
 
-PeerSelector* Channel::peer_selector = new SimpleSelector();
\ No newline at end of file
index bbc7aa7..fedd68f 100644 (file)
--- a/p2tp.cpp
+++ b/p2tp.cpp
@@ -29,14 +29,23 @@ p2tp::tint Channel::TIMEOUT = TINT_SEC*60;
 std::vector<Channel*> Channel::channels(1);
 int Channel::sockets[8] = {0,0,0,0,0,0,0,0};
 int Channel::socket_count = 0;
-
-
-Channel::Channel       (FileTransfer* file, int socket, struct sockaddr_in peer_addr) :
-       file_(file), peer(peer_addr), peer_channel_id(0),
-       socket_(socket) // FIXME
+Address Channel::tracker;
+tbqueue Channel::send_queue;
+#include "ext/dummy_controller.cpp"
+#include "ext/simple_selector.cpp"
+PeerSelector* Channel::peer_selector = new SimpleSelector();
+
+Channel::Channel       (FileTransfer* file, int socket, Address peer_addr) :
+       file_(file), peer_(peer_addr), peer_channel_id_(0), pex_out_(0),
+    socket_(socket==-1?sockets[0]:socket), // FIXME
+    own_id_mentioned_(false), next_send_time_(0)
 {
+    if (peer_==Address())
+        peer_ = tracker;
        this->id = channels.size();
        channels.push_back(this);
+    cc_ = new BasicController();
+    RequeueSend(Datagram::now);
 }
 
 
@@ -45,6 +54,9 @@ Channel::~Channel () {
 }
 
 
+void     p2tp::SetTracker(const Address& tracker) {
+    Channel::tracker = tracker;
+}
 
 
 int Channel::DecodeID(int scrambled) {
@@ -76,6 +88,27 @@ void    p2tp::Loop (tint till) {
 }
 
 
+int      p2tp::Open (const char* filename, const Sha1Hash& hash) {
+    FileTransfer* ft = new FileTransfer(filename, hash);
+    int fdes = ft->file_descriptor();
+    if (fdes>0) {
+        
+        /*if (FileTransfer::files.size()<fdes)  // FIXME duplication
+            FileTransfer::files.resize(fdes);
+        FileTransfer::files[fdes] = ft;*/
+        
+        // initiate tracker connections
+        if (Channel::tracker!=Address())
+            new Channel(ft);
+        
+        return fdes;
+    } else {
+        delete ft;
+        return -1;
+    }
+}
+
+
 void   p2tp::Close (int fd) {
     // FIXME delete all channels
     if (fd>FileTransfer::files.size() && FileTransfer::files[fd])
@@ -111,6 +144,10 @@ size_t  p2tp::SeqComplete (int fdes) {
         return 0;
 }
 
+
+
+
+
 /**    <h2> P2TP handshake </h2>
  Basic rules:
  <ul>
diff --git a/p2tp.h b/p2tp.h
index 6b50663..9b4ecb2 100644 (file)
--- a/p2tp.h
+++ b/p2tp.h
@@ -70,11 +70,13 @@ namespace p2tp {
         tintbin(const tintbin& b) : time(b.time), bin(b.bin) {}
         tintbin() : time(0), bin(bin64_t::NONE) {}
         tintbin(tint time_, bin64_t bin_) : time(time_), bin(bin_) {}
+        tintbin(bin64_t bin_) : time(Datagram::now), bin(bin_) {}
     };
 
+    
        typedef std::deque<tintbin> tbqueue;
     typedef std::deque<bin64_t> binqueue;
-    typedef Datagram::Address Address;
+    typedef Datagram::Address   Address;
 
        typedef enum {
                P2TP_HANDSHAKE = 0,
@@ -111,6 +113,13 @@ namespace p2tp {
             accept or remember or drop. Returns true => ACK is sent. */
         bool            OfferData (bin64_t bin, const uint8_t* data, size_t length);
 
+        /** While we need to feed ACKs to every peer, we try (1) avoid
+            unnecessary duplication and (2) keep minimum state. Thus,
+            we use a rotating queue of bin completion events. */
+        //bin64_t         RevealAck (uint64_t& offset);
+        /** Rotating queue read for channels of this transmission. */
+        uint32_t        RevealChannel (int& i);
+        
         static FileTransfer* Find (const Sha1Hash& hash);
                static FileTransfer* file (int fd) {
             return fd<files.size() ? files[fd] : NULL;
@@ -125,7 +134,6 @@ namespace p2tp {
             return hashes_[pos];
         }
         const Sha1Hash& root_hash () const { return root_hash_; }
-        bin64_t         data_in (int offset);
         uint64_t        size () const { return size_; }
         uint64_t        size_kilo () const { return sizek_; }
         uint64_t        complete () const { return complete_; }
@@ -133,7 +141,8 @@ namespace p2tp {
         uint64_t        seq_complete () const { return seq_complete_; }
         bins&           ack_out ()  { return ack_out_; }
         int             file_descriptor () const { return fd_; }
-        PiecePicker*    picker () { return picker_; }
+        PiecePicker&    picker () { return *picker_; }
+        int             channel_count () const { return handshake_in_.size(); }
 
         static int instance; // FIXME this smells
 
@@ -151,10 +160,11 @@ namespace p2tp {
                size_t          complete_;
                size_t          completek_;
                size_t          seq_complete_;
-               /**     A map for all packets obtained and succesfully checked. */
+               /**     A binmap for all packets obtained and succesfully checked. */
                bins                    ack_out_;
                /**     History of bin retrieval. */
-               binqueue                data_in_;
+               //binqueue              data_in_;
+        //int             data_in_off_;
         /** Piece picker strategy. */
         PiecePicker*    picker_;
                /** File for keeping the Merkle hash tree. */
@@ -175,6 +185,12 @@ namespace p2tp {
         /** Error encountered */
         char*           error_;
 
+        /** Channels working for this transfer. */
+        std::deque<int> handshake_in_;
+        std::deque<Address>        pex_in_;
+        /** Messages we are accepting.    */
+        uint64_t        cap_out_;
+
     protected:
         void            SetSize(size_t bytes);
         void            Submit();
@@ -183,6 +199,8 @@ namespace p2tp {
         Sha1Hash        DeriveRoot();
         void            SavePeaks();
         void            LoadPeaks();
+        void            OnDataIn (bin64_t pos);
+        void            OnPexIn (const Address& addr);
 
                friend class Channel;
         friend size_t  Size (int fdes);
@@ -192,25 +210,24 @@ namespace p2tp {
         friend void    Close (int fd) ;
        };
 
-       class CongestionController {
-    public:
-        virtual tint    rtt_avg() = 0;
-        virtual tint    dev_avg() = 0;
-        virtual tint    next_send_time() = 0;
-        virtual int     cwnd() = 0;
-        virtual int     peer_cwnd() = 0;
+       struct CongestionController {
+        CongestionController () {}
         virtual int     free_cwnd() = 0;
-        virtual void    OnDataSent(bin64_t b) = 0;
-        virtual void    OnDataRecvd(bin64_t b) = 0;
-        virtual void    OnAckRcvd(const tintbin& tsack) = 0;
-               virtual         ~CongestionController() = 0;
+        virtual tint    RoundTripTime() = 0;
+        virtual tint    RoundTripTimeoutTime() = 0;
+        virtual int     PeerBPS() = 0;
+        virtual float   PeerCWindow() = 0;
+        virtual tint    OnDataSent(bin64_t b) = 0;
+        virtual tint    OnDataRecvd(bin64_t b) = 0;
+        virtual tint    OnAckRcvd(bin64_t ackd, tint peer_time=0) = 0;
+               virtual         ~CongestionController() {}
        };
 
     class PiecePicker {
     public:
-        virtual bin64_t Pick (bins& from, uint8_t layer) = 0;
+        virtual bin64_t Pick (bins& offered, uint8_t layer) = 0;
+        virtual void    Expired (bin64_t b) = 0;
         virtual void    Received (bin64_t b) = 0;
-        virtual void    Snubbed (bin64_t b) = 0;
     };
 
     class PeerSelector {
@@ -235,18 +252,18 @@ namespace p2tp {
         hash or a fragment of it into every datagram.) */
        class Channel {
        public:
-               Channel (FileTransfer* file, int socket, struct sockaddr_in peer);
+               Channel (FileTransfer* file, int socket=-1, Address peer=Address());
                ~Channel();
 
                static void     Recv (int socket);
         static void Loop (tint till);
 
                void            Recv (Datagram& dgram);
-               tint            Send ();
+               void            Send ();
 
                void            OnAck (Datagram& dgram);
                void            OnAckTs (Datagram& dgram);
-               void            OnData (Datagram& dgram);
+               bin64_t         OnData (Datagram& dgram);
                void            OnHint (Datagram& dgram);
                void            OnHash (Datagram& dgram);
                void            OnPex (Datagram& dgram);
@@ -257,10 +274,13 @@ namespace p2tp {
                void            AddHint (Datagram& dgram);
                void            AddUncleHashes (Datagram& dgram, bin64_t pos);
                void            AddPeakHashes (Datagram& dgram);
+               void            AddPex (Datagram& dgram);
 
         const std::string id_string () const;
         /** A channel is "established" if had already sent and received packets. */
-        bool        is_established () { return peer_channel_id && own_id_mentioned; }
+        bool        is_established () { return peer_channel_id_ && own_id_mentioned_; }
+        FileTransfer& file() { return *file_; }
+        const Address& peer() const { return peer_; }
 
                static int DecodeID(int scrambled);
                static int EncodeID(int unscrambled);
@@ -268,52 +288,64 @@ namespace p2tp {
             return i<channels.size()?channels[i]:NULL;
         }
 
-        FileTransfer& file() { return *file_; }
-
        private:
 
                /** Channel id: index in the channel array. */
                uint32_t        id;
                /**     Socket address of the peer. */
-        Datagram::Address      peer;
+        Datagram::Address      peer_;
                /**     The UDP socket fd. */
                int                     socket_;
                /**     Descriptor of the file in question. */
                FileTransfer*   file_;
                /**     Peer channel id; zero if we are trying to open a channel. */
-               uint32_t        peer_channel_id;
-        bool        own_id_mentioned;
+               uint32_t        peer_channel_id_;
+        bool        own_id_mentioned_;
                /**     Peer's progress, based on acknowledgements. */
-               bins            ack_in;
+               bins            ack_in_;
                /**     Last data received; needs to be acked immediately. */
                tintbin         data_in_;
         /** Index in the history array. */
-               uint32_t        ack_out_;
+               bins        ack_out_;
                /**     Transmit schedule: in most cases filled with the peer's hints */
-               binqueue    hint_in;
+               binqueue    hint_in_;
                /** Hints sent (to detect and reschedule ignored hints). */
-               tbqueue         hint_out;
+               tbqueue         hint_out_;
                /** The congestion control strategy. */
-               CongestionController    *cc;
+               CongestionController    *cc_;
+        /** Types of messages the peer accepts. */
+        uint64_t    cap_in_;
         /** For repeats. */
-               tint            last_send_time, last_recv_time;
-
+               //tint          last_send_time, last_recv_time;
+        /** PEX progress */
+        int         pex_out_;
+
+        tint        next_send_time_;
+        static      tbqueue send_queue;
+        void        RequeueSend (tint next_time);
+        
         /** Get a rewuest for one packet from the queue of peer's requests. */
         bin64_t                DequeueHint();
-        void        CleanStaleHints();
+        //void        CleanStaleHints();
 
         static PeerSelector* peer_selector;
 
                static int      MAX_REORDERING;
                static tint     TIMEOUT;
-               static std::vector<Channel*> channels;
         static int      sockets[8];
         static int      socket_count;
                static tint     last_tick;
 
-        friend int     Listen (Datagram::Address addr);
-        friend void    Shutdown (int sock_des);
-        friend void    AddPeer (Datagram::Address address, const Sha1Hash& root);
+        static Address  tracker;
+               static std::vector<Channel*> channels;
+
+        friend int      Listen (Datagram::Address addr);
+        friend void     Shutdown (int sock_des);
+        friend void     AddPeer (Datagram::Address address, const Sha1Hash& root);
+        friend void     SetTracker(const Address& tracker);
+        friend int      Open (const char*, const Sha1Hash&) ; // FIXME
+        
+        friend class FileTransfer; // FIXME!!!
        };
 
 
@@ -335,7 +367,9 @@ namespace p2tp {
         root hash is zero, the peer might be talked to regarding any transmission
         (likely, a tracker, cache or an archive). */
     void    AddPeer (Datagram::Address address, const Sha1Hash& root=Sha1Hash::ZERO);
-
+    
+    void    SetTracker(const Address& tracker);
+    
     /** Returns size of the file in bytes, 0 if unknown. Might be rounded up to a kilobyte
         before the transmission is complete. */
     size_t  Size (int fdes);
@@ -349,12 +383,15 @@ namespace p2tp {
 
        //uint32_t Width (const tbinvec& v);
 
-       void LibraryInit(void);
+
+// FIXME kill this macro
+#define RETLOG(str) { fprintf(stderr,"%s\n",str); return; }
+
        /** Must be called by any client using the library */
+       void LibraryInit(void);
 
 
 } // namespace end
 
-#define RETLOG(str) { LOG(WARNING)<<str; return; }
 
 #endif
index 57beea7..ffd3667 100644 (file)
 using namespace std;
 using namespace p2tp;
 
+/*
+ TODO  25 Oct 18:55
+ - move hint_out_, piece picking to piece picker (needed e.g. for the case of channel drop)
+ - ANY_LAYER
+ - range: ALL
+ - randomized testing of advanced ops (new testcase)
+ - PeerCwnd()
+ - bins hint_out_, tbqueue hint_out_ts_
+ */
+
 void   Channel::AddPeakHashes (Datagram& dgram) {
        for(int i=0; i<file().peak_count(); i++) {
+        bin64_t peak = file().peak(i);
                dgram.Push8(P2TP_HASH);
-               dgram.Push32((uint32_t)file().peak(i));
+               dgram.Push32((uint32_t)peak);
                dgram.PushHash(file().peak_hash(i));
-        DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
+        //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
+        dprintf("%s #%i +phash (%i,%lli)\n",Datagram::TimeStr(),id,peak.layer(),peak.offset());
        }
 }
 
 
 void   Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
     bin64_t peak = file().peak_for(pos);
-    while (pos!=peak && ack_in.get(pos.parent())==bins::EMPTY) {
+    while (pos!=peak && ack_in_.get(pos.parent())==bins::EMPTY) {
         bin64_t uncle = pos.sibling();
                dgram.Push8(P2TP_HASH);
                dgram.Push32((uint32_t)uncle);
                dgram.PushHash( file().hash(uncle) );
-        DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
+        //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
+        dprintf("%s #%i +hash (%i,%lli)\n",Datagram::TimeStr(),id,uncle.layer(),uncle.offset());
         pos = pos.parent();
     }
 }
 
 
 bin64_t                Channel::DequeueHint () { // TODO: resilience
-       while (!hint_in.empty()) {
-               bin64_t hint = hint_in.front();
-               hint_in.pop_front();
-               if (ack_in.get(hint)==bins::FILLED)
-                       continue;
-               if ( file().ack_out().get(hint)==bins::EMPTY )
-                       continue;
-               if (!hint.is_base()) {
-                       bin64_t l=hint.left(), r=hint.right();
-                       //if (rand()&1)
-                       //      swap(l,r);
-                       hint_in.push_front(r);
-                       hint_in.push_front(l);
-                       continue;
-               }
-               return hint;
-       }
-       return bin64_t::NONE;
+    bin64_t send = bin64_t::NONE;
+    while (!hint_in_.empty() && send==bin64_t::NONE) {
+        bin64_t hint = hint_in_.front();
+        hint_in_.pop_front();
+        send = file().ack_out().find_filtered
+            (ack_in_,hint,0,bins::FILLED);
+        if (send!=bin64_t::NONE)
+            while (send!=hint) {
+                hint = hint.towards(send);
+                hint_in_.push_front(hint.sibling());
+            }
+    }
+    return send;
 }
 
 
-void   Channel::CleanStaleHints () {
+/*void Channel::CleanStaleHints () {
        while ( !hint_out.empty() && file().ack_out().get(hint_out.front().bin)==bins::FILLED ) 
                hint_out.pop_front();  // FIXME must normally clear fulfilled entries
-       tint timed_out = Datagram::now - cc->rtt_avg()*8;
+       tint timed_out = Datagram::now - cc_->RoundTripTime()*8;
        while ( !hint_out.empty() && hint_out.front().time < timed_out ) {
         file().picker()->Snubbed(hint_out.front().bin);
                hint_out.pop_front();
        }
-}
+}*/
 
 
 void   Channel::AddHandshake (Datagram& dgram) {
-       dgram.Push8(P2TP_HANDSHAKE);
-       dgram.Push32(EncodeID(id));
-       if (!peer_channel_id) { // initiating
+       if (!peer_channel_id_) { // initiating
                dgram.Push8(P2TP_HASH);
                dgram.Push32(bin64_t::ALL32);
                dgram.PushHash(file().root_hash());
+        dprintf("%s #%i +hash ALL %s\n",
+                Datagram::TimeStr(),id,file().root_hash().hex().c_str());
        }
+       dgram.Push8(P2TP_HANDSHAKE);
+       dgram.Push32(EncodeID(id));
+    dprintf("%s #%i +hs\n",Datagram::TimeStr(),id);
     AddAck(dgram);
-    //DLOG(INFO)<<"#"<<id<<" sending a handshake to "<<this->id_string();
 }
 
 
-tint   Channel::Send () {
-    Datagram dgram(socket_,peer);
-    dgram.Push32(peer_channel_id);
+void   Channel::Send () {
+    Datagram dgram(socket_,peer());
+    dgram.Push32(peer_channel_id_);
+    bin64_t data = bin64_t::NONE;
     if ( is_established() ) {
         AddAck(dgram);
         AddHint(dgram);
-        if (cc->free_cwnd() && Datagram::now>=cc->next_send_time()) {
-            bin64_t data = AddData(dgram);
-            cc->OnDataSent(data);
-        }
+        if (cc_->free_cwnd()) 
+            data = AddData(dgram);
     } else {
         AddHandshake(dgram);
+        AddAck(dgram);
     }
-    DLOG(INFO)<<"#"<<id<<" sending "<<dgram.size()<<" bytes";
+    dprintf("%s #%i sent %ib %s\n",Datagram::TimeStr(),id,dgram.size(),peer().str().c_str());
        PCHECK( dgram.Send() != -1 )<<"error sending";
-       last_send_time = Datagram::now;
-    return cc->next_send_time();
+    if (dgram.size()==4) // only the channel id; bare keep-alive
+        data = bin64_t::ALL;
+    RequeueSend(cc_->OnDataSent(data));
 }
 
 
 void   Channel::AddHint (Datagram& dgram) {
-       CleanStaleHints();
-    uint64_t outstanding = 0;
-    for(tbqueue::iterator i=hint_out.begin(); i!=hint_out.end(); i++)
-        outstanding += i->bin.width();
-       uint64_t kbps = TINT_SEC * cc->peer_cwnd() / cc->rtt_avg();
-    if (outstanding>kbps) // have enough
-        return;
-    uint8_t layer = 0;
-    while( (1<<layer) < kbps ) layer++;
-    bin64_t hint = file().picker()->Pick(ack_in,layer);
-    if (hint==bin64_t::NONE)
-        return;
-    dgram.Push8(P2TP_HINT);
-    dgram.Push32(hint);
-    hint_out.push_back(tintbin(Datagram::now,hint));
-    DLOG(INFO)<<"#"<<id<<" +HINT"<<hint;
+
+    tint hint_timeout = Datagram::now - 2*TINT_SEC;
+    while (!hint_out_.empty() && hint_out_.front().time<hint_timeout) {
+        tintbin old_hint = hint_out_.front();
+        file().picker().Expired(old_hint.bin);
+        hint_out_.pop_front();
+    }
+    // FIXME  weird weird weird
+    uint16_t state;
+    while ( !hint_out_.empty() && (state=file().ack_out().get(hint_out_.front().bin)) != bins::EMPTY ) {
+        if (state==bins::FILLED) {
+            hint_out_.pop_front();
+        } else {
+            tintbin old_hint = hint_out_.front();
+            hint_out_.pop_front();
+            old_hint.bin = old_hint.bin.right();
+            hint_out_.push_front(old_hint);
+            old_hint.bin = old_hint.bin.sibling();
+            hint_out_.push_front(old_hint);
+        }
+    }
+    
+    uint64_t hinted = 0;
+    for(tbqueue::iterator i=hint_out_.begin(); i!=hint_out_.end(); i++)
+        hinted+=i->bin.width();
+    
+    float peer_cwnd = cc_->PeerBPS() * cc_->RoundTripTime() / TINT_SEC;
+    
+    if ( hinted*1024 < peer_cwnd*4 ) {
+        
+        uint8_t layer = 0;
+        bin64_t hint = file().picker().Pick(ack_in_,layer);
+        
+        if (hint!=bin64_t::NONE) {
+            this->hint_out_.push_back(tintbin(hint));
+            dgram.Push8(P2TP_HINT);
+            dgram.Push32(hint);
+            dprintf("%s #%i +hint (%i,%lli)\n",Datagram::TimeStr(),id,hint.layer(),hint.offset());
+        }
+        
+    }
 }
 
 
@@ -128,54 +165,60 @@ bin64_t           Channel::AddData (Datagram& dgram) {
        if (!file().size()) // know nothing
                return bin64_t::NONE;
        bin64_t tosend = DequeueHint();
-    if (tosend==bin64_t::NONE) {
-        //LOG(WARNING)<<this->id_string()<<" no idea what to send";
-               cc->OnDataSent(bin64_t::NONE);
-               return bin64_t::NONE;
-       }
-       if (ack_in.empty() && file().size())
-               AddPeakHashes(dgram);
-       AddUncleHashes(dgram,tosend);
-       uint8_t buf[1024];
-       size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); // TODO: ??? corrupted data, retries
-       if (r<0) {
-               PLOG(ERROR)<<"error on reading";
-               return 0;
-       }
-       assert(dgram.space()>=r+4+1);
-       dgram.Push8(P2TP_DATA);
-       dgram.Push32(tosend);
-       dgram.Push(buf,r);
-    DLOG(INFO)<<"#"<<id<<" +DATA"<<tosend;
+    if (tosend==bin64_t::NONE) 
+        return bin64_t::NONE;
+    if (ack_in_.empty() && file().size())
+        AddPeakHashes(dgram);
+    AddUncleHashes(dgram,tosend);
+    uint8_t buf[1024];
+    size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); 
+    // TODO: ??? corrupted data, retries
+    if (r<0) {
+        PLOG(ERROR)<<"error on reading";
+        return 0;
+    }
+    assert(dgram.space()>=r+4+1);
+    dgram.Push8(P2TP_DATA);
+    dgram.Push32(tosend);
+    dgram.Push(buf,r);
+    dprintf("%s #%i +data (%lli)\n",Datagram::TimeStr(),id,tosend.base_offset());
        return tosend;
 }
 
 
 void   Channel::AddAck (Datagram& dgram) {
-       if (data_in_.time) {
+       if (data_in_.bin!=bin64_t::NONE) {
+        bin64_t pos = data_in_.bin;
                dgram.Push8(P2TP_ACK_TS);
-               dgram.Push32(data_in_.bin);
+               dgram.Push32(pos);
                dgram.Push64(data_in_.time);
-        data_in_.time = 0;
-        DLOG(INFO)<<"#"<<id<<" +!ACK"<<data_in_.bin;
+        ack_out_.set(pos);
+        dprintf("%s #%i +ackts (%i,%lli) %s\n",Datagram::TimeStr(),id,
+                pos.layer(),pos.offset(),Datagram::TimeStr(data_in_.time));
+        data_in_ = tintbin(0,bin64_t::NONE);
        }
-    bin64_t h=file().data_in(ack_out_);
-    int count=0;
-    while (h!=bin64_t::NONE && count++<4) {
+    for(int count=0; count<4; count++) {
+        bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, 0, bins::FILLED);
+        // TODO bins::ANY_LAYER
+        if (ack==bin64_t::NONE)
+            break;
+        while (file().ack_out().get(ack.parent())==bins::FILLED)
+            ack = ack.parent();
+        ack_out_.set(ack);
         dgram.Push8(P2TP_ACK);
-        dgram.Push32(h);
-        DLOG(INFO)<<"#"<<id<<" +ACK"<<h;
-        h=file().data_in(++ack_out_);
+        dgram.Push32(ack);
+        dprintf("%s #%i +ack (%i,%lli)\n",Datagram::TimeStr(),id,ack.layer(),ack.offset());
     }
 }
 
 
 void   Channel::Recv (Datagram& dgram) {
+    bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
        while (dgram.size()) {
                uint8_t type = dgram.Pull8();
                switch (type) {
             case P2TP_HANDSHAKE: OnHandshake(dgram); break;
-                       case P2TP_DATA:         OnData(dgram); break;
+                       case P2TP_DATA:         data=OnData(dgram); break;
                        case P2TP_ACK_TS:       OnAckTs(dgram); break;
                        case P2TP_ACK:          OnAck(dgram); break;
                        case P2TP_HASH:         OnHash(dgram); break;
@@ -186,6 +229,7 @@ void        Channel::Recv (Datagram& dgram) {
                                return;
                }
        }
+    RequeueSend(cc_->OnDataRecvd(data));
 }
 
 
@@ -193,54 +237,88 @@ void      Channel::OnHash (Datagram& dgram) {
        bin64_t pos = dgram.Pull32();
        Sha1Hash hash = dgram.PullHash();
        file().OfferHash(pos,hash);
-    DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
+    //DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
+    dprintf("%s #%i -hash (%i,%lli)\n",Datagram::TimeStr(),id,pos.layer(),pos.offset());
 }
 
 
-void Channel::OnData (Datagram& dgram) {
+bin64_t Channel::OnData (Datagram& dgram) {
        bin64_t pos = dgram.Pull32();
-    DLOG(INFO)<<"#"<<id<<" .DATA"<<pos;
-    file().OfferData(pos, *dgram, dgram.size());
-       cc->OnDataRecvd(pos);
-       CleanStaleHints();
+    uint8_t *data;
+    int length = dgram.Pull(&data,1024);
+    bool ok = file().OfferData(pos, data, length) ;
+    dprintf("%s #%i %cdata (%lli)\n",Datagram::TimeStr(),id,ok?'-':'!',pos.offset());
+    data_in_ = tintbin(Datagram::now,pos);
+    return ok ? pos : bin64_t::none();
 }
 
 
 void   Channel::OnAck (Datagram& dgram) {
        // note: no bound checking
        bin64_t pos = dgram.Pull32();
-    DLOG(INFO)<<"#"<<id<<" .ACK"<<pos;
-       ack_in.set(pos);
+    dprintf("%s #%i -ack (%i,%lli)\n",Datagram::TimeStr(),id,pos.layer(),pos.offset());
+       ack_in_.set(pos);
+       RequeueSend(cc_->OnAckRcvd(pos,0));
 }
 
 
 void   Channel::OnAckTs (Datagram& dgram) {
        bin64_t pos = dgram.Pull32();
     tint ts = dgram.Pull64();
-    DLOG(INFO)<<"#"<<id<<" ,ACK"<<pos;
-    //dprintf("%lli #%i +ack %lli +ts %lli",Datagram::now,id,pos,ts);
-       ack_in.set(pos);
-       cc->OnAckRcvd(tintbin(ts,pos));
+    // TODO sanity check
+    dprintf("%s #%i -ackts (%i,%lli) %s\n",
+            Datagram::TimeStr(),id,pos.layer(),pos.offset(),Datagram::TimeStr(ts));
+       ack_in_.set(pos);
+       RequeueSend(cc_->OnAckRcvd(pos,ts));
 }
 
 
 void   Channel::OnHint (Datagram& dgram) {
        bin64_t hint = dgram.Pull32();
-       hint_in.push_back(hint);
+       hint_in_.push_back(hint);
+    dprintf("%s #%i -hint (%i,%lli)\n",Datagram::TimeStr(),id,hint.layer(),hint.offset());
 }
 
 
 void Channel::OnHandshake (Datagram& dgram) {
-    peer_channel_id = dgram.Pull32();
+    if (!peer_channel_id_)
+        cc_->OnAckRcvd(bin64_t::ALL);
+    peer_channel_id_ = dgram.Pull32();
+    dprintf("%s #%i -hs %i\n",Datagram::TimeStr(),id,peer_channel_id_);
     // FUTURE: channel forking
 }
 
 
 void Channel::OnPex (Datagram& dgram) {
-    uint32_t addr = dgram.Pull32();
+    uint32_t ipv4 = dgram.Pull32();
     uint16_t port = dgram.Pull16();
-    if (peer_selector)
-        peer_selector->AddPeer(Datagram::Address(addr,port),file().root_hash());
+    Address addr(ipv4,port);
+    //if (peer_selector)
+    //    peer_selector->AddPeer(Datagram::Address(addr,port),file().root_hash());
+    file_->pex_in_.push_back(addr);
+    if (file_->pex_in_.size()>1000)
+        file_->pex_in_.pop_front(); 
+    static int ENOUGH_PEERS_THRESHOLD = 20;
+    if (file_->channel_count()<ENOUGH_PEERS_THRESHOLD) {
+        int i = 0, chno;
+        while ( (chno=file_->RevealChannel(i)) != -1 ) {
+            if (channels[i]->peer()==addr) 
+                break;
+        }
+        if (chno==-1)
+            new Channel(file_,socket_,addr);
+    }
+}
+
+
+void    Channel::AddPex (Datagram& dgram) {
+    int ch = file().RevealChannel(this->pex_out_);
+    if (ch==-1)
+        return;
+    Address a = channels[ch]->peer();
+    dgram.Push8(P2TP_PEX_ADD);
+    dgram.Push32(a.ipv4());
+    dgram.Push16(a.port());
 }
 
 
@@ -265,6 +343,7 @@ void        Channel::Recv (int socket) {
                FileTransfer* file = FileTransfer::Find(hash);
                if (!file) 
                        RETLOG ("hash unknown, no such file");
+        dprintf("%s #0 -hash ALL %s\n",Datagram::TimeStr(),hash.hex().c_str());
                channel = new Channel(file, socket, data.address());
        } else {
                mych = DecodeID(mych);
@@ -273,11 +352,11 @@ void      Channel::Recv (int socket) {
                channel = channels[mych];
                if (!channel) 
                        RETLOG ("channel is closed");
-               if (channel->peer != data.address()) 
+               if (channel->peer() != data.address()) 
                        RETLOG ("invalid peer address");
-               channel->Recv(data);
+        channel->own_id_mentioned_ = true;
        }
-       channel->Send();
+    channel->Recv(data);
 }
 
 
@@ -286,39 +365,97 @@ bool tblater (const tintbin& a, const tintbin& b) {
 }
 
 
-void    Channel::Loop (tint time) {  
-       
-       tint untiltime = Datagram::Time()+time;
-    tbqueue send_queue;
-    for(int i=0; i<channels.size(); i++)
-        if (channels[i])
-            send_queue.push_back(tintbin(Datagram::now,i));
+void    Channel::RequeueSend (tint next_time) {
+    if (next_time==next_send_time_)
+        return;
+    next_send_time_ = next_time;
+    send_queue.push_back(tintbin(next_time,id));
+    push_heap(send_queue.begin(),send_queue.end(),tblater);
+    dprintf("%s requeue #%i for %s\n",Datagram::TimeStr(),id,Datagram::TimeStr(next_time));
+}
+
+
+void    Channel::Loop (tint howlong) {  
        
-    while ( Datagram::now <= untiltime ) {
-               
-        tintbin next_send = send_queue.front();
-        pop_heap(send_queue.begin(), send_queue.end(), tblater);
-        send_queue.pop_back();
-        tint wake_on = min(next_send.time,untiltime);
-               tint towait = min(wake_on-Datagram::now,TINT_SEC); // towait<0?
-        
-               int rd = Datagram::Wait(socket_count,sockets,towait);
-               if (rd!=-1)
-                       Recv(rd);
-        
-        int chid = (int)(next_send.bin);
-        Channel* sender = channels[chid];
-        if (sender) {
-            tint next_time = sender->Send();
-            if (next_time!=TINT_NEVER) {
-                send_queue.push_back(tintbin(next_time,chid));
-                push_heap(send_queue.begin(),send_queue.end(),tblater);
-            } else {
-                delete sender;
-                channels[chid] = NULL;
-            }
+    tint limit = Datagram::Time() + howlong;
+    
+    do {
+
+        tint send_time(TINT_NEVER);
+        Channel* sender(NULL);
+        while (!send_queue.empty()) {
+            send_time = send_queue.front().time;
+            sender = channel((int)send_queue.front().bin);
+            if (sender && sender->next_send_time_==send_time)
+                break;
+            sender = NULL; // it was a stale entry
+            pop_heap(send_queue.begin(), send_queue.end(), tblater);
+            send_queue.pop_back();
         }
-               
-    }
-       
+        if (send_time>limit)
+            send_time = limit;
+        if (sender && send_time<=Datagram::now) {
+            dprintf("%s #%i sch_send %s\n",Datagram::TimeStr(),sender->id,
+                    Datagram::TimeStr(send_time));
+            sender->Send();
+            pop_heap(send_queue.begin(), send_queue.end(), tblater);
+            send_queue.pop_back();
+        } else {
+            tint towait = send_time - Datagram::now;
+            dprintf("%s waiting %lliusec\n",Datagram::TimeStr(),towait);
+            int rd = Datagram::Wait(socket_count,sockets,towait);
+            if (rd!=-1)
+                Recv(rd);
+        }
+        
+    } while (Datagram::Time()<limit);
+       
 }
+
+
+
+/*
+ tint untiltime = Datagram::Time()+time;
+ if (send_queue.empty())
+ dprintf("%s empty send_queue\n", Datagram::TimeStr());
+ while ( Datagram::now <= untiltime && !send_queue.empty() ) {
+ // BUG BUG BUG  no scheduled sends => just listen
+ tintbin next_send = send_queue.front();
+ tint wake_on = next_send.time;
+ Channel* sender = channel(next_send.bin);
+ // BUG BUG BUG filter stale timeouts here
+ //if (wake_on<=untiltime) {
+ pop_heap(send_queue.begin(), send_queue.end(), tblater);
+ send_queue.pop_back();
+ //}// else
+ //sender = 0; // BUG will never wake up
+ if (sender->next_send_time_!=next_send.time)
+ continue;
+ if (wake_on<Datagram::now)
+ wake_on = Datagram::now;
+ if (wake_on>untiltime)
+ wake_on = untiltime;
+ tint towait = min(wake_on-Datagram::now,TINT_SEC);
+ dprintf("%s waiting %lliusec\n",Datagram::TimeStr(),towait);
+ int rd = Datagram::Wait(socket_count,sockets,towait);
+ if (rd!=-1)
+ Recv(rd);
+ // BUG WRONG BUG WRONG  another may need to send
+ if (sender) {
+ dprintf("%s #%i sch_send %s\n",Datagram::TimeStr(),sender->id,
+ Datagram::TimeStr(next_send.time));
+ sender->Send();
+ // if (sender->cc_->next_send_time==TINT_NEVER) 
+ }
+ }
+ */
\ No newline at end of file
index f7f8d0f..9ff2066 100644 (file)
@@ -79,3 +79,11 @@ env.Program(
     CPPPATH=cpppath,
     LIBS=libs,
     LIBPATH=libpath )
+
+env.Program( 
+    target='connecttest',
+    source=['connecttest.cpp'],
+    CPPPATH=cpppath,
+    LIBS=libs,
+    LIBPATH=libpath )
+
index bbe7796..225f985 100755 (executable)
@@ -227,7 +227,34 @@ TEST(BinsTest,FindFiltered) {
     
 }
 
-TEST(BinsTest,SetRange) {
+
+TEST(BinsTest, Cover) {
+    
+    bins b;
+    b.set(bin64_t(2,0));
+    b.set(bin64_t(4,1));
+    EXPECT_EQ(bin64_t(4,1),b.cover(bin64_t(0,30)));
+    //bins c;
+    //EXPECT_EQ(bin64_t::ALL,b.cover(bin64_t(0,30)));
+    
+}
+
+
+TEST(BinsTest,FindFiltered2) {
+    
+    bins data, filter;
+    for(int i=0; i<1024; i+=2)
+        data.set(bin64_t(0,i));
+    for(int j=1; j<1024; j+=2)
+        filter.set(bin64_t(0,j));
+    filter.set(bin64_t(0,501),bins::EMPTY);
+    EXPECT_EQ(bin64_t(0,501),data.find_filtered(filter,bin64_t(10,0),0));
+    data.set(bin64_t(0,501));
+    EXPECT_EQ(bin64_t::NONE,data.find_filtered(filter,bin64_t(10,0),0));
+    
+}
+    
+TEST(BinsTest,CopyRange) {
     bins data, add;
     data.set(bin64_t(2,0));
     data.set(bin64_t(2,2));
@@ -236,7 +263,7 @@ TEST(BinsTest,SetRange) {
     add.set(bin64_t(1,4));
     add.set(bin64_t(0,13));
     add.set(bin64_t(5,118));
-    data.set_range(add, bin64_t(3,0));
+    data.copy_range(add, bin64_t(3,0));
     EXPECT_TRUE(bins::is_mixed(data.get(bin64_t(3,0))));
     EXPECT_EQ(bins::EMPTY,data.get(bin64_t(2,0)));
     EXPECT_EQ(bins::FILLED,data.get(bin64_t(2,1)));
index 258ccd2..b0bd23e 100644 (file)
@@ -49,15 +49,21 @@ using namespace p2tp;
 
 
 TEST(P2TP,CwndTest) {
-       int f = open("big_test_file",O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
-       int size = rand()%(1<<19) + (1<<19);
+    
+    unlink("doc/sofi-copy.jpg");
+    struct stat st;
+       ASSERT_EQ(0,stat("doc/sofi.jpg", &st));
+    int size = st.st_size, sizek = (st.st_size>>10) + (st.st_size%1024?1:0) ;
+    
+       /*int f = open("big_test_file",O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
+       int size = 60<<10; //rand()%(1<<19) + (1<<19);
     int sizek = (size>>10) + ((size&1023)?1:0);
        char* b = (char*)malloc(size);
        for(int i=0; i<size; i++) 
                b[i] = (i%1024!=1023) ? ('A' + rand()%('Z'-'A')) : ('\n');
        write(f,b,size);
        free(b);
-       close(f);
+       close(f);*/
 
        /*
        struct sockaddr_in addr1, addr2;
@@ -69,24 +75,23 @@ TEST(P2TP,CwndTest) {
        addr2.sin_port = htons(7004);
        addr2.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
          */
-    int sock1 = p2tp::Listen(7003);
+    int sock1 = p2tp::Listen(7001);
        ASSERT_TRUE(sock1>=0);
        //ASSERT_TRUE(sock2>=0);
        
-    p2tp::AddPeer(Datagram::Address("127.0.0.1",7001));
-    
-       int file = p2tp::Open("big_test_file");
+       int file = p2tp::Open("doc/sofi.jpg");
     FileTransfer* fileobj = FileTransfer::file(file);
+    FileTransfer::instance++;
+    
+    p2tp::SetTracker(Datagram::Address("127.0.0.1",7001));
     
-       int copy = p2tp::Open("big_test_file_copy",fileobj->root_hash());
+       int copy = p2tp::Open("doc/sofi-copy.jpg",fileobj->root_hash());
   
-       p2tp::Loop(TINT_MSEC);
+       p2tp::Loop(TINT_SEC);
     
-    ASSERT_EQ(sizek<<10,p2tp::Size(copy));
-
     int count = 0;
-    while (p2tp::SeqComplete(copy)!=size && count++<(1<<14))
-        p2tp::Loop(TINT_MSEC);
+    while (p2tp::SeqComplete(copy)!=size && count++<20)
+        p2tp::Loop(TINT_SEC);
     ASSERT_EQ(size,p2tp::SeqComplete(copy));
     
        p2tp::Close(file);
index ee6302f..21818ea 100644 (file)
@@ -65,6 +65,7 @@ TEST(Datagram,LedbatTest) {
             if (4+8!=ack.Send())
                 fprintf(stderr,"short write\n");
             fprintf(stderr,"%lli rcvd%i\n",now/TINT_SEC,seq);
+            //cc->OnDataRecv(bin64_t(0,seq));
             // TODO: peer cwnd !!!
             continue;
         }
index e4c4b1b..095d38b 100644 (file)
@@ -72,10 +72,11 @@ TEST(TransferTest,TransferFile) {
             leech = new FileTransfer("copy",seed->root_hash());
             EXPECT_EQ(2,leech->complete_kilo());
         }
-        bin64_t next = leech->picker()->Pick(seed->ack_out(),0);
+        bin64_t next = leech->picker().Pick(seed->ack_out(),0);
         ASSERT_NE(bin64_t::NONE,next);
+        ASSERT_TRUE(next.base_offset()<5);
         uint8_t buf[1024];         //size_t len = seed->storer->ReadData(next,&buf);
-        size_t len = pread(seed->file_descriptor(),buf,1024,next.base_offset()<<10); // FIXME TEST FOR ERROR
+        size_t len = pread(seed->file_descriptor(),buf,1024,next.base_offset()<<10);
         bin64_t sibling = next.sibling();
         if (sibling.base_offset()<seed->size_kilo())
             leech->OfferHash(sibling, seed->hash(sibling));
index 7928303..059dc96 100644 (file)
@@ -31,7 +31,7 @@ int FileTransfer::instance = 0;
 FileTransfer::FileTransfer (const char* filename, const Sha1Hash& _root_hash) :
     root_hash_(_root_hash), fd_(0), hashfd_(0), dry_run_(false),
     peak_count_(0), hashes_(NULL), error_(NULL), size_(0), sizek_(0),
-    complete_(0), completek_(0), seq_complete_(0)
+    complete_(0), completek_(0), seq_complete_(0) //, data_in_off_(0)
 {
        fd_ = open(filename,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
        if (fd_<0)
@@ -163,7 +163,8 @@ void            FileTransfer::Submit () {
             } else
                 hashes_[b] = Sha1Hash(hashes_[b.left()],hashes_[b.right()]);
         peak_hashes_[p] = hashes_[peaks_[p]];
-        ack_out_.set(peaks_[p],bins::FILLED);
+        //ack_out_.set(peaks_[p],bins::FILLED);
+        OnDataIn(peaks_[p]);
     }
     root_hash_ = DeriveRoot();
     Sha1Hash *hash_tmp = hashes_;
@@ -200,13 +201,6 @@ void            FileTransfer::OfferHash (bin64_t pos, const Sha1Hash& hash) {
 }
 
 
-bin64_t         FileTransfer::data_in (int offset) {
-    if (offset>data_in_.size())
-        return bin64_t::NONE;
-    return data_in_[offset];
-}
-
-
 bool            FileTransfer::OfferData (bin64_t pos, const uint8_t* data, size_t length) {
     if (!pos.is_base())
         return false;
@@ -230,7 +224,7 @@ bool            FileTransfer::OfferData (bin64_t pos, const uint8_t* data, size_
 
     //printf("g %lli %s\n",(uint64_t)pos,hash.hex().c_str());
        // walk to the nearest proven hash   FIXME 0-layer peak
-    ack_out_.set(pos,bins::FILLED);
+    OnDataIn(pos);
     pwrite(fd_,data,length,pos.base_offset()<<10);
     complete_ += length;
     completek_++;
@@ -242,11 +236,45 @@ bool            FileTransfer::OfferData (bin64_t pos, const uint8_t* data, size_
         seq_complete_+=1024;
     if (seq_complete_>size_)
         seq_complete_ = size_;
-    data_in_.push_back(pos);
     return true;
 }
 
 
+/*bin64_t         FileTransfer::RevealAck (uint64_t& offset) {
+    if (offset<data_in_off_)
+        offset = data_in_off_;
+    for(int off=offset-data_in_off_; off<data_in_.size(); off++) {
+        offset++;
+        if (data_in_[off]!=bin64_t::NONE) {
+            bin64_t parent = data_in_[off].parent();
+            if (ack_out_.get(parent)!=bins::FILLED)
+                return data_in_[off];
+            else
+                data_in_[off] = bin64_t::NONE;
+        }
+    }
+    return bin64_t::NONE;
+}*/
+
+
+void            FileTransfer::OnDataIn (bin64_t pos) {
+    ack_out_.set(pos,bins::FILLED);
+    /*bin64_t closed = pos;
+    while (ack_out_.get(closed.parent())==bins::FILLED) // TODO optimize
+        closed = closed.parent();
+    data_in_.push_back(closed);
+    // rotating the queue
+    bin64_t parent = data_in_.front().parent();
+    if (ack_out_.get(parent)!=bins::FILLED)
+        data_in_.push_back(data_in_.front());
+    data_in_.front() = bin64_t::NONE;
+    while ( !data_in_.empty() && data_in_.front()==bin64_t::NONE) {
+        data_in_.pop_front();
+        data_in_off_++;
+    }*/
+}
+
+
 Sha1Hash        FileTransfer::DeriveRoot () {
        int c = peak_count_-1;
        bin64_t p = peaks_[c];
@@ -305,12 +333,18 @@ FileTransfer::~FileTransfer ()
 
 FileTransfer* FileTransfer::Find (const Sha1Hash& root_hash) {
     for(int i=0; i<files.size(); i++)
-        if (files[i] && files[i]->root_hash_==root_hash)
+        if (files[i] && files[i]->root_hash()==root_hash)
             return files[i];
     return NULL;
 }
 
 
+void FileTransfer::OnPexIn (const Address& addr) {
+    pex_in_.push_back(addr);
+    if (pex_in_.size()>1000)
+        pex_in_.pop_front(); 
+}
+
 
 std::string FileTransfer::GetTempFilename(Sha1Hash& root_hash, int instance, std::string postfix)
 {
@@ -322,8 +356,7 @@ std::string FileTransfer::GetTempFilename(Sha1Hash& root_hash, int instance, std
 }
 
 
-
-int      p2tp::Open (const char* filename, const Sha1Hash& hash) {
+/*int      p2tp::Open (const char* filename, const Sha1Hash& hash) {
     FileTransfer* ft = new FileTransfer(filename, hash);
     int fdes = ft->file_descriptor();
     if (fdes>0) {
@@ -335,6 +368,14 @@ int      p2tp::Open (const char* filename, const Sha1Hash& hash) {
         delete ft;
         return -1;
     }
+}*/
+
+
+uint32_t        FileTransfer::RevealChannel (int& offset) {
+    while (offset<Channel::channels.size() && 
+           (!Channel::channels[offset] || Channel::channels[offset]->file_!=this) )
+           offset++;
+    return offset < Channel::channels.size() ? offset : -1;
 }