compiles again
authorvictor <victor@e16421f0-f15b-0410-abcd-98678b794739>
Sun, 18 Oct 2009 18:20:22 +0000 (18:20 +0000)
committervictor <victor@e16421f0-f15b-0410-abcd-98678b794739>
Sun, 18 Oct 2009 18:20:22 +0000 (18:20 +0000)
git-svn-id: https://ttuki.vtt.fi/svn/p2p-next/TUD/p2tp/trunk@438 e16421f0-f15b-0410-abcd-98678b794739

17 files changed:
ChangeLog
bin64.cpp
bin64.h
bins.h
datagram.cpp
datagram.h
ext/ledbat_controller.cpp
ext/simple_selector.cpp
hashtree.cpp
hashtree.h
p2tp.cpp
p2tp.h
sendrecv.cpp
tests/bin64test.cpp
tests/connecttest.cpp
tests/dgramtest.cpp
transfer.cpp

index d272883..ad12850 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,7 @@
+0.003 - This is not a release as well - 18 Oct 2009
+
+       - but at least, it compiles now
+
 0.002 - This is not a release - 7 Oct 2009
 
        - it does not even compile, committed for reading purposes only
index 90524d5..599c4df 100644 (file)
--- a/bin64.cpp
+++ b/bin64.cpp
 
 const uint64_t bin64_t::NONE = 0xffffffffffffffffULL;
 const uint64_t bin64_t::ALL = 0x7fffffffffffffffULL;
+const uint32_t bin64_t::NONE32 = 0xffffffffU;
+const uint32_t bin64_t::ALL32 = 0x7fffffffU;
+
+uint32_t bin64_t::to32() const {
+    if (v<0xffffffff && v!=0x7fffffff)
+        return (uint32_t)v;
+    if (v==ALL)
+        return ALL32;
+    return NONE32;
+}
 
 bin64_t bin64_t::next_dfsio (uint8_t floor) {
     /*while (ret.is_right())
diff --git a/bin64.h b/bin64.h
index a9a7698..f6b2bf3 100644 (file)
--- a/bin64.h
+++ b/bin64.h
@@ -17,6 +17,8 @@ struct bin64_t {
     uint64_t v;
     static const uint64_t NONE;
     static const uint64_t ALL;
+    static const uint32_t NONE32;
+    static const uint32_t ALL32;
 
     bin64_t() : v(NONE) {}
     bin64_t(const bin64_t&b) : v(b.v) {}
@@ -24,6 +26,7 @@ struct bin64_t {
     bin64_t(uint8_t layer, uint64_t offset) : 
         v( (offset<<(layer+1)) | ((1ULL<<layer)-1) ) {}
     operator uint64_t () const { return v; }
+    uint32_t to32() const ;
     bool operator == (bin64_t& b) const { return v==b.v; }
 
     static bin64_t none () { return NONE; }
diff --git a/bins.h b/bins.h
index 299ece6..a2540a5 100644 (file)
--- a/bins.h
+++ b/bins.h
@@ -22,20 +22,21 @@ public:
     
     bins(const bins& b);
     
-    uint16_t get (bin64_t bin); 
+    uint16_t    get (bin64_t bin); 
     
-    void set (bin64_t bin, fill_t val=FILLED); 
+    void        set (bin64_t bin, fill_t val=FILLED); 
     
-    bin64_t find (const bin64_t range, const uint8_t layer, fill_t seek=EMPTY) ;
+    bin64_t     find (const bin64_t range, const uint8_t layer, fill_t seek=EMPTY) ;
     
-    void remove (bins& b);
+    void        remove (bins& b);
     
-    void dump(const char* note);
+    void        dump(const char* note);
 
     uint64_t*   get_stripes (int& count);
 
     uint32_t    size() { return cells_allocated; }
-    // TODO: bitwise operators
+    
+    bool        empty () const { return !deep(0) && !halves[0]; }
     
 private:
     
index a745d63..daccd9a 100644 (file)
 namespace p2tp {
 
 tint Datagram::now = Datagram::Time();
+uint32_t Datagram::Address::LOCALHOST = INADDR_LOOPBACK;
 
 int Datagram::Send () {
        int r = sendto(sock,buf+offset,length-offset,0,
-                                  (struct sockaddr*)&(addr),sizeof(struct sockaddr_in));
+                                  (struct sockaddr*)&(addr.addr),sizeof(struct sockaddr_in));
        offset=0;
        length=0;
        now = Time();
@@ -71,8 +72,8 @@ tint Datagram::Time () {
        return now=ret;
 }
 
-int Datagram::Bind (int portno) {
-    struct sockaddr_in addr;
+int Datagram::Bind (Address addr_) {
+    struct sockaddr_in addr = addr_;
        int fd, len = sizeof(struct sockaddr_in), 
         sndbuf=1<<20, rcvbuf=1<<20;
        if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
@@ -90,11 +91,11 @@ int Datagram::Bind (int portno) {
         return -3;
     }
     printf("BUFS: %i %i\n",sndbuf,rcvbuf);
-    memset(&addr, 0, sizeof(struct sockaddr_in));
+    /*memset(&addr, 0, sizeof(struct sockaddr_in));
        addr.sin_family = AF_INET;
     addr.sin_port = htons(portno);
-    addr.sin_addr.s_addr = INADDR_ANY;
-       if (::bind(fd, (struct sockaddr*)&addr, len) != 0) {
+    addr.sin_addr.s_addr = INADDR_ANY;*/
+       if (::bind(fd, (sockaddr*)&addr, len) != 0) {
         PLOG(ERROR)<<"bind fails";
         return -4;
     }
index 2b81140..ca46bf8 100644 (file)
@@ -39,21 +39,32 @@ struct Datagram {
        
     struct Address {
         struct sockaddr_in  addr;
-        Address() {
+        static uint32_t LOCALHOST;
+        void init(uint32_t ipv4=0, uint16_t port=0) {
             memset(&addr,0,sizeof(struct sockaddr_in)); 
-        }
-        Address(const char* ip, uint16_t port) {
             addr.sin_family = AF_INET;
             addr.sin_port = htons(port);
+            addr.sin_addr.s_addr = htonl(ipv4);
+        }
+        Address() { init(); }
+        Address(const char* ip, uint16_t port) {
+            init(LOCALHOST,port);
             inet_aton(ip,&(addr.sin_addr));
         }
         Address(uint16_t port) {
-            addr.sin_family = AF_INET;
-            addr.sin_port = htons(port);
-            addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+            init(LOCALHOST,port);
+        }
+        Address(uint32_t ipv4addr, uint16_t port) {
+            init(ipv4addr,port);
         }
         Address(const struct sockaddr_in& address) : addr(address) {}
-        operator sockaddr_in () {return addr;}
+        operator sockaddr_in () const {return addr;}
+        bool operator == (const Address& b) { 
+            return addr.sin_family==b.addr.sin_family &&
+            addr.sin_port==b.addr.sin_port && 
+            addr.sin_addr.s_addr==b.addr.sin_addr.s_addr;
+        }
+        bool operator != (const Address& b) { return !(*this==b); }
     };
     
        Address addr;
@@ -61,7 +72,7 @@ struct Datagram {
        int offset, length;
        uint8_t buf[MAXDGRAMSZ*2];
     
-       static int Bind(int port);
+       static int Bind(Address address);
        static void Close(int port);
        static tint Time();
        static int Wait (int sockcnt, int* sockets, tint usec=0);
index 87a705d..2eebcc0 100644 (file)
@@ -13,23 +13,60 @@ using namespace p2tp;
 
 class LedbatController : public CongestionController {
 public:
-    /*tint    rtt_avg;
-    tint    dev_avg;
-    int     cwnd;
-    int     peer_cwnd;*/
-    virtual void OnTimeout () {
+    tint dev_avg_, rtt_avg_;
+    tint last_send_time_, last_recv_time_;
+    int cwnd_, peer_cwnd_, in_flight_;
+    bin64_t last_bin_sent_;
+public:
+    LedbatController () : dev_avg_(0), rtt_avg_(TINT_SEC), last_send_time_(0),
+    last_recv_time_(0), cwnd_(1), peer_cwnd_(1), in_flight_(0) {
+    }
+    
+    tint    rtt_avg () {
+        return rtt_avg_;
+    }
+    
+    tint    dev_avg () {
+        return dev_avg_;
+    }
+    
+    int     cwnd () {
+        return cwnd_;
+    }
+    
+    int     peer_cwnd () {
+        return peer_cwnd_;
     }
     
-    virtual void OnDataSent(bin64_t b) {
+    int     free_cwnd ( ){
+        return cwnd_ - in_flight_;
     }
     
-    virtual void OnDataRecvd(bin64_t b) {
+    tint    next_send_time ( ){
+        return cwnd_ ? Datagram::now + (rtt_avg_/cwnd_) : TINT_NEVER; // TODO keepalives
     }
     
-    virtual void OnAckRcvd(bin64_t b, tint peer_stamp) {
+    void OnDataSent(bin64_t b) {
+        if (b==bin64_t::NONE) {
+            cwnd_ = 0; // nothing to send; suspend
+        } else {
+            last_bin_sent_ = b;
+            last_send_time_ = Datagram::now;
+            in_flight_++;
+        }
     }
     
-    virtual ~CongestionControl() {
+    void OnDataRecvd(bin64_t b) {
+        last_recv_time_ = Datagram::now;
     }
     
+    void OnAckRcvd(bin64_t b, tint peer_stamp) {
+        if (last_bin_sent_!=b)
+            return;
+        rtt_avg_ = (rtt_avg_*7 + (Datagram::now-last_send_time_)) >> 3; // van Jac
+        in_flight_--;
+    }
+    
+    ~LedbatController() {
+    }
 };
index e0d54e8..a45c76f 100644 (file)
@@ -7,28 +7,33 @@
  *
  */
 
+#include <queue>
 #include "p2tp.h"
 
 using namespace p2tp;
 
 class SimpleSelector : public PeerSelector {
-    typedef std::pair<sockaddr_in,uint32_t> memo_t;
-    std::queue<memo_t>  peers;
+    typedef std::pair<Address,Sha1Hash> memo_t;
+    typedef std::deque<memo_t>  peer_queue_t;
+    peer_queue_t    peers;
 public:
-    virtual void PeerKnown (const Sha1Hash& root, struct sockaddr_in& addr) {
-        peers.push_front(memo_t(addr,root.fingerprint()));
+    SimpleSelector () {
     }
-    virtual sockaddr_in GetPeer (const Sha1Hash& for_root) {
-        uint32_t fp = for_root.fingerprint();
-        for(std::queue<memo_t>::iterator i=peers.begin(); i!=peers.end(); i++)
-            if (i->second==fp) {
+    void AddPeer (const Datagram::Address& addr, const Sha1Hash& root) {
+        peers.push_front(memo_t(addr,root)); //,root.fingerprint() !!!
+    }
+    Address GetPeer (const Sha1Hash& for_root) {
+        //uint32_t fp = for_root.fingerprint();
+        for(peer_queue_t::iterator i=peers.begin(); i!=peers.end(); i++)
+            if (i->second==for_root) {
                 i->second = 0;
                 sockaddr_in ret = i->first;
                 while (peers.begin()->second==0)
                     peers.pop_front();
                 return ret;
             }
+        return Address();
     }
 };
 
-static Channel::peer_selector = new SimpleSelector();
\ No newline at end of file
+PeerSelector* Channel::peer_selector = new SimpleSelector();
\ No newline at end of file
index 4e4d201..18c88d3 100644 (file)
@@ -41,7 +41,7 @@ Sha1Hash::Sha1Hash(bool hex, const char* hash) {
        memcpy(bits,hash,SIZE);
 }
 
-string Sha1Hash::hex() {
+string Sha1Hash::hex() const {
        char hex[HASHSZ*2+1];
        for(int i=0; i<HASHSZ; i++)
                sprintf(hex+i*2, "%02x", bits[i]);
index 611919b..a25e6af 100644 (file)
@@ -23,7 +23,7 @@ struct Sha1Hash {
        Sha1Hash(const char* bits);
        Sha1Hash(bool hex, const char* hash);
        
-       std::string     hex();
+       std::string     hex() const;
        bool    operator == (const Sha1Hash& b) const
                { return 0==memcmp(bits,b.bits,SIZE); }
        bool    operator != (const Sha1Hash& b) const { return !(*this==b); }
index 1bf4bc9..c1b4cc0 100644 (file)
--- a/p2tp.cpp
+++ b/p2tp.cpp
@@ -27,19 +27,16 @@ p2tp::tint Channel::last_tick = 0;
 int Channel::MAX_REORDERING = 4;
 p2tp::tint Channel::TIMEOUT = TINT_SEC*60;
 std::vector<Channel*> Channel::channels(1);
-std::vector<File*> File::files(4);
-int* Channel::sockets_ = (int*)malloc(40);
-int Channel::sock_count_ = 0;
+int Channel::sockets[8] = {0,0,0,0,0,0,0,0};
+int Channel::socket_count = 0;
 
 
-Channel::Channel       (int fd_, int socket, struct sockaddr_in peer_,
-                                        uint32_t peer_channel_, uint64_t supports_) :
-       fd(fd_), peer(peer_), peer_channel_id(peer_channel_), ack_out(0),
-       peer_status_(File::EMPTY), socket_(socket)
+Channel::Channel       (FileTransfer* file, int socket, struct sockaddr_in peer_addr) :
+       file_(file), peer(peer_addr), peer_channel_id(0), 
+       socket_(socket) // FIXME
 {
        this->id = channels.size();
        channels.push_back(this);
-       DLOG(INFO)<<"new channel "<<id<<" "<<*this;
 }
 
 
@@ -48,132 +45,72 @@ Channel::~Channel () {
 }
 
 
-File::File (int _fd) : fd(_fd), status_(DONE), hashes(_fd)
-{
-       bin::vec peaks = bin::peaks(hashes.data_size());
-       history.insert(history.end(),peaks.begin(),peaks.end());
-       for(bin::vec::iterator i=peaks.begin(); i!=peaks.end(); i++)
-               ack_out.set(*i);
-}
 
-File::File (Sha1Hash hash, int _fd) : hashes(hash), fd(_fd), status_(EMPTY) {
-       // TODO resubmit data
-}
 
-File::~File() {
-       if (fd>0) ::close(fd);
+int Channel::DecodeID(int scrambled) {
+       return scrambled;
+}
+int Channel::EncodeID(int unscrambled) {
+       return unscrambled;
 }
 
 
-bool   File::OfferHash (bin pos, const Sha1Hash& hash) {
-       HashTree::hashres_t res = hashes.offer(pos,hash);
-       if (res==HashTree::PEAK_ACCEPT) { // file size is finally known
-               ftruncate(fd, size());
-               LOG(INFO)<<fd<<" file size is set to "<<size();
-               history.push_back(0);
-               status_ = IN_PROGRESS;
-       }
-       return res==HashTree::PEAK_ACCEPT || res==HashTree::ACCEPT;
+int     p2tp::Listen (Datagram::Address addr) {
+    int sock = Datagram::Bind(addr);
+    if (sock!=INVALID_SOCKET)
+        Channel::sockets[Channel::socket_count++] = sock;
+    return sock;
 }
 
 
-File*  File::find (const Sha1Hash& hash) {
-       for(vector<File*>::iterator i=files.begin(); i!=files.end(); i++)
-               if (*i && (*i)->hashes.root==hash)
-                       return *i;
-       return NULL;
+void    p2tp::Shutdown (int sock_des) {
+    for(int i=0; i<Channel::socket_count; i++)
+        if (Channel::sockets[i]==sock_des)
+            Channel::sockets[i] = Channel::sockets[--Channel::socket_count];
+    Datagram::Close(sock_des);
 }
 
-int Channel::DecodeID(int scrambled) {
-       return scrambled;
+
+void    p2tp::Loop (tint till) { 
+    Channel::Loop(till); 
 }
-int Channel::EncodeID(int unscrambled) {
-       return unscrambled;
+
+
+void   p2tp::Close (int fd) {
+    // FIXME delete all channels
+    if (fd>FileTransfer::files.size() && FileTransfer::files[fd])
+        delete FileTransfer::files[fd];
 }
 
 
-std::ostream& p2tp::operator << (std::ostream& os, const Channel& ch) {
-       return os<<'{'<<ch.fd<<'}'<<sock2str(ch.peer)<<":"<<ch.id<<'>'<<ch.peer_channel_id;
+void    p2tp::AddPeer (Datagram::Address address, const Sha1Hash& root) {
+    Channel::peer_selector->AddPeer(address,root);
 }
 
 
-void   Channel::Recv (int socket) {
-       Datagram data(socket);
-       data.Recv();
-       //LOG(INFO)<<" RECV "<<data.to_string();
-       int id = 0;
-       if (data.size()<4) 
-               RETLOG("datagram shorter than 4 bytes");
-       uint32_t mych = data.Pull32();
-       uint8_t type;
-       uint32_t peerch;
-       Sha1Hash hash;
-       Channel* channel;
-       if (!mych) { // handshake initiated
-               if (data.size()!=1+4+1+4+Sha1Hash::SIZE) 
-                       RETLOG ("incorrect size initial handshake packet");
-               type = data.Pull8();
-               if (type)  // handshake msg id is 0
-                       RETLOG ("it is not actually a handshake");
-               peerch = data.Pull32();
-               if (!peerch) 
-                       RETLOG ("peer channel is zero");
-               uint8_t hashid = data.Pull8();
-               if (hashid!=P2TP_HASH) 
-                       RETLOG ("no hash in the initial handshake");
-               bin pos = data.Pull32();
-               if (pos!=bin::ALL) 
-                       RETLOG ("that is not the root hash");
-               hash = data.PullHash();
-               File* file = File::find(hash);
-               if (!file) 
-                       RETLOG ("hash unknown, no such file");
-               channel = new Channel(file->fd, socket, data.address(), peerch);
-       } else {
-               mych = DecodeID(mych);
-               if (mych>=channels.size()) 
-                       RETLOG ("invalid channel id");
-               channel = channels[mych];
-               id = channel->id;
-               if (channel->peer.sin_addr.s_addr != data.address().sin_addr.s_addr) 
-                       RETLOG ("invalid peer address");
-               if (channel->peer.sin_port!=data.address().sin_port) 
-                       RETLOG ("invalid peer port");
-               if (!channel->peer_channel_id) { // handshake response
-                       if (data.size()<5) 
-                               RETLOG ("insufficient return handshake length");
-                       type = data.Pull8();
-                       if (type) 
-                               RETLOG ("it is not a handshake, after all");
-                       channel->peer_channel_id = data.Pull32();
-                       LOG(INFO)<<"out channel is open: "<<*channel;
-               } else if (channel->cc_.avg_rtt()==0) {
-                       LOG(INFO)<<"in channel is open: "<<*channel;
-               }
-        if (channel->cc_.avg_rtt()==0)
-            channel->cc_.RttSample(Datagram::now - channel->last_send_time + 1);
-               channel->Recv(data);
-       }
-       channel->Send();
+size_t  p2tp::Size (int fdes) {
+    if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
+        return FileTransfer::files[fdes]->size();
+    else
+        return 0;
 }
 
 
-void           Channel::Tick () {
-       // choking/unchoking
-       // keepalives
-       // ack timeout
-       // if unchoked: don't bother
-       // whether to unchoke
-       // reevaluate reciprocity
-       // otherwise, send update (if needed)
-       // otherwise, send a keepalive
-       CleanStaleHintIn();
-       CleanStaleHintOut();
-       if (last_send_time && Datagram::now-last_send_time>=Channel::TIMEOUT/2)
-               Send();
+size_t  p2tp::Complete (int fdes) {
+    if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
+        return FileTransfer::files[fdes]->complete();
+    else
+        return 0;
 }
 
 
+size_t  p2tp::SeqComplete (int fdes) {
+    if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
+        return FileTransfer::files[fdes]->seq_complete();
+    else
+        return 0;
+}
+
 /**    <h2> P2TP handshake </h2>
  Basic rules:
  <ul>
@@ -188,92 +125,3 @@ void               Channel::Tick () {
  <b>Note:</b> 
  */
 
-
-void    Channel::Loop (tint time) {  
-       
-       tint untiltime = Datagram::Time()+time;
-       
-    while ( Datagram::now <= untiltime ) {
-               
-               tint towait = min(untiltime,Datagram::now+TINT_1SEC) - Datagram::now;
-               int rd = Datagram::Wait(sock_count_,sockets_,towait);
-               if (rd!=-1)
-                       Recv(rd);
-
-               /*if (Datagram::now-last_tick>TINT_1SEC) {
-                       for(int i=0; i<channels.size(); i++)
-                               if (channels[i])
-                                       channels[i]->Tick();
-                       last_tick = Datagram::now;
-               }*/
-               
-    }
-       
-}
-
-int p2tp::Open (const char* filename) {
-       int fd = ::open(filename,O_RDONLY);
-       if (fd<0)
-               return -1;
-       if (File::files.size()<fd+1)
-               File::files.resize(fd+1);
-       File::files[fd] = new File(fd);
-       return fd;
-}
-
-int p2tp::Open (const Sha1Hash& root_hash, const char* filename) {
-       int fd = ::open(filename,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
-       if (fd<0)
-               return -1;
-       if (File::files.size()<fd+1)
-               File::files.resize(fd+1);
-       File::files[fd] = new File(root_hash,fd);
-       return fd;
-}
-
-size_t p2tp::file_size (int fd) { return File::file(fd)->size(); }
-
-void p2tp::Close (int fid) {
-       if (!File::files[fid])
-               return;
-       delete File::files[fid];
-       File::files[fid] = NULL;
-}
-
-
-int p2tp::Connect (int fd, int sock, const struct sockaddr_in& addr, uint32_t peerch) {
-       Channel *ch = new Channel(fd,sock,addr,peerch);
-       ch->Send();
-       return ch->id;
-}
-
-void p2tp::Loop (tint time) {
-       Channel::Loop(time);
-}
-
-int p2tp::Init (int portno) {
-       int sock = Datagram::Bind(portno);
-       if (sock>0)
-               Channel::sockets_[Channel::sock_count_++] = sock;
-       return sock;
-}
-
-void p2tp::Shutdown (int sock) {
-       int i=0;
-       while (i<Channel::sock_count_ && Channel::sockets_[i]!=sock) i++;
-       if (i==Channel::sock_count_) {
-               LOG(ERROR)<<"socket "<<sock<<" is unknown to p2tp";
-               return;
-       }
-       Channel::sockets_[i] = Channel::sockets_[--Channel::sock_count_];
-       Datagram::Close(sock);
-}
-
-
-uint32_t p2tp::Width (const tbinvec& v) {
-       uint32_t ret = 0;
-       for(tbinvec::const_iterator i=v.begin(); i!=v.end(); i++)
-               ret += i->pos.width();
-       return ret;
-}
-
diff --git a/p2tp.h b/p2tp.h
index bdb39de..8623285 100644 (file)
--- a/p2tp.h
+++ b/p2tp.h
@@ -68,6 +68,7 @@ namespace p2tp {
     
        typedef std::deque<tintbin> tbqueue;
     typedef std::deque<bin64_t> binqueue;
+    typedef Datagram::Address Address;
 
        typedef enum { 
                P2TP_HANDSHAKE = 0, 
@@ -91,7 +92,7 @@ namespace p2tp {
     public:
                
                /**     Open/submit/retrieve a file.    */
-        FileTransfer(const Sha1Hash& _root_hash, const char *file_name);
+        FileTransfer(const char *file_name, const Sha1Hash& _root_hash=Sha1Hash::ZERO);
         
                /**     Close everything. */
                ~FileTransfer();
@@ -102,9 +103,9 @@ namespace p2tp {
                void            OfferHash (bin64_t pos, const Sha1Hash& hash);
         /** Offer data; the behavior is the same as with a hash:
             accept or remember or drop. Returns true => ACK is sent. */
-        bool            OfferData (bin64_t bin, uint8_t* data, size_t length);
+        bool            OfferData (bin64_t bin, const uint8_t* data, size_t length);
         
-        bin64_t         PickBinForRequest (bins& from, uint8_t layer) ;                static FileTransfer* Find (const Sha1Hash& hash);
+        static FileTransfer* Find (const Sha1Hash& hash);
                static FileTransfer* file (int fd) { 
             return fd<files.size() ? files[fd] : NULL; 
         }
@@ -126,10 +127,7 @@ namespace p2tp {
         uint64_t        seq_complete () const { return seq_complete_; }
         bins&           ack_out ()  { return ack_out_; }
         int             file_descriptor () const { return fd_; } 
-        
-               friend int      Open (const char* filename);
-               friend int      Open (const Sha1Hash& hash, const char* filename);
-               friend void     Close (int fdes);
+        PiecePicker*    picker () { return picker_; }
         
         static int instance; // FIXME this smells
         
@@ -179,17 +177,13 @@ namespace p2tp {
         void            LoadPeaks();
 
                friend class Channel;
+        friend size_t  Size (int fdes);
+        friend size_t  Complete (int fdes);
+        friend size_t  SeqComplete (int fdes);
+        friend int     Open (const char* filename, const Sha1Hash& hash) ;
+        friend void    Close (int fd) ;
        };
        
-       
-       int             Open (const char* filename) ;
-    int     Open (const Sha1Hash& hash, const char* filename) ;
-       void    Close (int fid) ;
-    void    Loop (tint till);
-    int     Bind (int port);
-    void    Shutdown (int fd);
-    void    HeardOfPeer (const Sha1Hash& root, struct sockaddr_in address);
-       
 
        class CongestionController {
     public:
@@ -214,10 +208,8 @@ namespace p2tp {
     
     class PeerSelector {
     public:
-        virtual void    PeerKnown 
-            (const Sha1Hash& root, struct sockaddr_in& addr) = 0;
-        virtual struct sockaddr_in GetPeer 
-            (const Sha1Hash& for_root) = 0;
+        virtual void AddPeer (const Datagram::Address& addr, const Sha1Hash& root) = 0;
+        virtual Datagram::Address GetPeer (const Sha1Hash& for_root) = 0;
     };
     
     class DataStorer {
@@ -236,8 +228,7 @@ namespace p2tp {
         hash or a fragment of it into every datagram.) */
        class Channel {
        public:
-               Channel (int filedes, int socket, struct sockaddr_in peer,
-                                uint32_t peer_channel, uint64_t supports=0);
+               Channel (FileTransfer* file, int socket, struct sockaddr_in peer);
                ~Channel();
                
                static void     Recv (int socket);
@@ -273,19 +264,15 @@ namespace p2tp {
         
         FileTransfer& file() { return *file_; }
                
-               /*friend int Connect (int fd, int sock, 
-                                                       const struct sockaddr_in& addr, uint32_t peerch=0);
-               friend int Init (int portno);
-               friend std::ostream& operator << (std::ostream& os, const Channel& s);*/
                
        private:
                
                /** Channel id: index in the channel array. */
                uint32_t        id;
                /**     Socket address of the peer. */
-               struct sockaddr_in      peer;
+        Datagram::Address      peer;
                /**     The UDP socket fd. */
-               int                     socket;
+               int                     socket_;
                /**     Descriptor of the file in question. */
                FileTransfer*   file_;
                /**     Peer channel id; zero if we are trying to open a channel. */
@@ -312,16 +299,49 @@ namespace p2tp {
         
         static PeerSelector* peer_selector;
         
-               static int  MAX_REORDERING;
-               static tint TIMEOUT;
+               static int      MAX_REORDERING;
+               static tint     TIMEOUT;
                static std::vector<Channel*> channels;
-        int    sockets[4];
-        int    sock_count;
-               static tint last_tick;
+        static int      sockets[8];
+        static int      socket_count;
+               static tint     last_tick;
         
+        friend int     Listen (Datagram::Address addr);
+        friend void    Shutdown (int sock_des);
+        friend void    AddPeer (Datagram::Address address, const Sha1Hash& root);
        };
 
        
+
+    /*************** The top-level API ****************/
+    /** Start listening a port. Returns socket descriptor. */
+    int     Listen (Datagram::Address addr);
+    /** Run send/receive loop for the specified amount of time. */
+    void    Loop (tint till);
+    /** Stop listening to a port. */
+    void    Shutdown (int sock_des);
+
+    /** Open a file, start a transmission; fill it with content for a given root hash;
+        in case the hash is omitted, the file is a fresh submit. */
+    int     Open (const char* filename, const Sha1Hash& hash=Sha1Hash::ZERO) ;
+    /** Close a file and a transmission. */
+    void       Close (int fd) ;
+    /** Add a possible peer which participares in a given transmission. In the case
+        root hash is zero, the peer might be talked to regarding any transmission
+        (likely, a tracker, cache or an archive). */
+    void    AddPeer (Datagram::Address address, const Sha1Hash& root=Sha1Hash::ZERO);
+
+    /** Returns size of the file in bytes, 0 if unknown. Might be rounded up to a kilobyte
+        before the transmission is complete. */
+    size_t  Size (int fdes);
+    /** Returns the amount of retrieved and verified data, in bytes. 
+        A 100% complete transmission has Size()==Complete(). */
+    size_t  Complete (int fdes);
+    /** Returns the number of bytes that are complete sequentially, starting from the
+        beginning, till the first not-yet-retrieved packet. */
+    size_t  SeqComplete (int fdes);
+
+    
        //uint32_t Width (const tbinvec& v);
 }
 
index b2a16aa..58c7fdd 100644 (file)
 #include <glog/logging.h>
 #include "p2tp.h"
 
+#include "ext/dummy_controller.cpp"
+
 using namespace std;
 using namespace p2tp;
 
-
 void   Channel::AddPeakHashes (Datagram& dgram) {
-       const std::vector<binhash>& peaks = file().hashes.peak_hashes();
-       for(int i=0; i<peaks.size(); i++) {
+       for(int i=0; i<file().peak_count(); i++) {
                dgram.Push8(P2TP_HASH);
-               dgram.Push32(peaks[i].first);
-               dgram.PushHash(peaks[i].second);
-        DLOG(INFO)<<"#"<<id<<" +pHASH"<<peaks[i].first;
+               dgram.Push32((uint32_t)file().peak(i));
+               dgram.PushHash(file().peak_hash(i));
+        DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
        }
 }
 
 
-void   Channel::AddUncleHashes (Datagram& dgram, bin pos) {
-       bin root = pos;
-       while (root.parent()<=file().hashes.data_mass() && ack_in.clean(root.parent()))
-               root = root.parent();
-       while (root!=pos) {
-               root = root.child(pos);
-               bin uncle = root.sibling();
+void   Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
+    bin64_t peak = file().peak_for(pos);
+    while (pos!=peak && ack_in.get(pos.parent())==bins::EMPTY) {
+        bin64_t uncle = pos.sibling();
                dgram.Push8(P2TP_HASH);
-               dgram.Push32(uncle);
-               dgram.PushHash( file().hashes[uncle] );
+               dgram.Push32((uint32_t)uncle);
+               dgram.PushHash( file().hash(uncle) );
         DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
-       }
+        pos = pos.parent();
+    }
 }
 
 
-bin            Channel::SenderPiecePick () { // TODO: resilience
+bin64_t                Channel::DequeueHint () { // TODO: resilience
        while (!hint_in.empty()) {
-               bin hint = hint_in.front().pos;
+               bin64_t hint = hint_in.front();
                hint_in.pop_front();
-               if (ack_in.contains(hint))
+               if (ack_in.get(hint)==bins::FILLED)
                        continue;
-               if (hint.layer()) {
-                       bin l=hint.left(), r=hint.right();
-                       //if (false)//rand()&1)
+               if ( file().ack_out().get(hint)==bins::EMPTY )
+                       continue;
+               if (!hint.is_base()) {
+                       bin64_t l=hint.left(), r=hint.right();
+                       //if (rand()&1)
                        //      swap(l,r);
-                       hint_in.push_front(tintbin(Datagram::now,r));
-                       hint_in.push_front(tintbin(Datagram::now,l));
+                       hint_in.push_front(r);
+                       hint_in.push_front(l);
                        continue;
                }
-               if ( !file().ack_out.contains(hint) )
-                       continue;
                return hint;
        }
-       return 0;
-}
-
-
-
-Channel::state_t       Channel::state () const {
-       if (!peer_channel_id)
-               return HS_REQ_OUT;
-       if (cc_.avg_rtt()==0)
-               return HS_RES_OUT;
-       return HS_DONE;
+       return bin64_t::NONE;
 }
 
 
-void   Channel::CleanStaleDataOut (bin ack_pos) {
-
-       if (ack_pos)
-               for(int i=0; i<data_out.size() && i<MAX_REORDERING*2; i++)
-                       if (data_out[i].pos && ack_pos.contains(data_out[i].pos)) {
-                               cc_.RttSample(Datagram::now-data_out[i].time);
-                               cc_.OnCongestionEvent(CongestionControl::ACK_EV);
-                               data_out[i].pos = 0;
-                       }
-       while (!data_out.empty() && data_out[0].pos==0)
-               data_out.pop_front();
-
-       tint timed_out = Datagram::now - cc_.safe_avg_rtt();
-       while (!data_out.empty() && data_out.front().time < timed_out) {
-               DLOG(INFO)<<*this<<" loss: "<<data_out.front().pos;
-               // reordering is a loss, collision is a loss
-               cc_.OnCongestionEvent(CongestionControl::LOSS_EV);
-               data_out.pop_front();
-       }
-       
-}
-
-
-void   Channel::CleanStaleHintOut () {
-       while ( !hint_out.empty() && file().ack_out.contains(hint_out.front().pos) ) 
-               hint_out.pop_front();
-       tint timed_out = Datagram::now - cc_.safe_avg_rtt()*4; //FIXME: timeout
+void   Channel::CleanStaleHints () {
+       while ( !hint_out.empty() && file().ack_out().get(hint_out.front().bin)==bins::FILLED ) 
+               hint_out.pop_front();  // FIXME must normally clear fulfilled entries
+       tint timed_out = Datagram::now - cc->rtt_avg()*8;
        while ( !hint_out.empty() && hint_out.front().time < timed_out ) {
-               file().hint_out -= hint_out.front().pos;
-               hint_out.pop_front(); // TODO: ignore count
+        file().picker()->Snubbed(hint_out.front().bin);
+               hint_out.pop_front();
        }
 }
 
-void   Channel::CleanStaleHintIn () {
-       // do I need it?
-}
 
-
-void   Channel::SendHandshake () {
-       Datagram dgram(socket_,peer);
-       dgram.Push32(peer_channel_id);
+void   Channel::AddHandshake (Datagram& dgram) {
        dgram.Push8(P2TP_HANDSHAKE);
        dgram.Push32(EncodeID(id));
        if (!peer_channel_id) { // initiating
                dgram.Push8(P2TP_HASH);
-               dgram.Push32(bin::ALL);
-               dgram.PushHash(file().hashes.root);
-               AddAck(dgram);
-       } else { // responding
-               AddAck(dgram);
+               dgram.Push32(bin64_t::ALL32);
+               dgram.PushHash(file().root_hash());
        }
-    DLOG(INFO)<<"#"<<id<<" sending a handshake to "<<*this;
-       PCHECK( dgram.Send() != -1 )<<"error sending";
-       last_send_time = Datagram::now;
+    AddAck(dgram);
+    //DLOG(INFO)<<"#"<<id<<" sending a handshake to "<<this->id_string();
 }
 
 
-void           Channel::SendData () {
-       CleanStaleDataOut(0);
-       int round = 0;
+tint   Channel::Send () {
     Datagram dgram(socket_,peer);
     dgram.Push32(peer_channel_id);
-    AddAck(dgram);
-    AddHint(dgram);
-       while (cc_.cwnd()>data_out.size()) {
-               AddData(dgram); // always the last: might be tail block
-               if (dgram.size()==4 && Datagram::now-last_send_time<TIMEOUT/2) 
-                       break; // nothing to send
-        DLOG(INFO)<<"#"<<id<<" sending "<<dgram.size()<<" bytes";
-               PCHECK( dgram.Send() != -1 )<<"error sending";
-               last_send_time = Datagram::now;
-               round++;
-        dgram.Clear();
-        dgram.Push32(peer_channel_id);
-       }
-       DLOG(INFO)<<"#"<<id<<" sent "<<round<<" rounds";
-}
-
-
-void   Channel::Send () {
-       if (state()==HS_DONE)
-               SendData();
-       else
-               SendHandshake();
-}
-
-
-bin            Channel::ReceiverPiecePick (int limit) {
-       bins diff(ack_in);
-       diff -= file().ack_out;
-       diff -= file().hint_out;
-       if (diff.empty()) {
-               //uninteresting = true;
-               return 0;
-       }
-       bin need = *(diff.begin());
-       while (need.width()>std::max(1,limit))
-               need = need.left();
-       return need;
+    if ( is_established() ) {
+        AddAck(dgram);
+        AddHint(dgram);
+        if (cc->free_cwnd() && Datagram::now>=cc->next_send_time()) {
+            bin64_t data = AddData(dgram);
+            cc->OnDataSent(data);
+        }
+    } else {
+        AddHandshake(dgram);
+    }
+    DLOG(INFO)<<"#"<<id<<" sending "<<dgram.size()<<" bytes";
+       PCHECK( dgram.Send() != -1 )<<"error sending";
+       last_send_time = Datagram::now;
+    return cc->next_send_time();
 }
 
 
 void   Channel::AddHint (Datagram& dgram) {
-       CleanStaleHintOut();
-       int onesec = TINT_1SEC/cc_.data_in_rate();
-       if (Width(hint_out)<onesec) {
-               bin hint = ReceiverPiecePick(onesec);
-               if (hint) {
-                       dgram.Push8(P2TP_HINT);
-                       dgram.Push32(hint);
-                       hint_out.push_back(tintbin(Datagram::now,hint));
-                       file().hint_out |= hint;  // FIXME: incapsulate File data
-            DLOG(INFO)<<"#"<<id<<" +HINT"<<hint;
-               }
-       }
+       CleanStaleHints();
+    uint64_t outstanding = 0;
+    for(tbqueue::iterator i=hint_out.begin(); i!=hint_out.end(); i++)
+        outstanding += i->bin.width();
+       uint64_t kbps = TINT_SEC * cc->peer_cwnd() / cc->rtt_avg();
+    if (outstanding>kbps) // have enough
+        return;
+    uint8_t layer = 0;
+    while( (1<<layer) < kbps ) layer++;
+    bin64_t hint = file().picker()->Pick(ack_in,layer);
+    if (hint==bin64_t::NONE)
+        return;
+    dgram.Push8(P2TP_HINT);
+    dgram.Push32(hint);
+    hint_out.push_back(tintbin(Datagram::now,hint));
+    DLOG(INFO)<<"#"<<id<<" +HINT"<<hint;
 }
 
 
-bin            Channel::AddData (Datagram& dgram) {
-       if (!file().history.size()) // know nothing
-               return 0;
-       bin tosend = hash_out ? hash_out : SenderPiecePick();
-       hash_out = 0;
-    if (tosend==0) {
-        LOG(WARNING)<<*this<<" no idea what to send";
-               cc_.OnCongestionEvent(CongestionControl::LOSS_EV);
-               return 0;
+bin64_t                Channel::AddData (Datagram& dgram) {
+       if (!file().size()) // know nothing
+               return bin64_t::NONE;
+       bin64_t tosend = DequeueHint();
+    if (tosend==bin64_t::NONE) {
+        //LOG(WARNING)<<this->id_string()<<" no idea what to send";
+               cc->OnDataSent(bin64_t::NONE);
+               return bin64_t::NONE;
        }
-       if (peer_status()==File::EMPTY && file().history.size()) //FIXME
+       if (ack_in.empty() && file().size())
                AddPeakHashes(dgram);
        AddUncleHashes(dgram,tosend);
        uint8_t buf[1024];
-       size_t r = pread(fd,buf,1024,tosend.offset()<<10); // TODO: ??? corrupted data, retries
+       size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); // TODO: ??? corrupted data, retries
        if (r<0) {
                PLOG(ERROR)<<"error on reading";
                return 0;
        }
-       if (dgram.space()<r+4+1) {
-               hash_out = tosend;
-               return -tosend; // FIXME
-       }
+       assert(dgram.space()>=r+4+1);
        dgram.Push8(P2TP_DATA);
        dgram.Push32(tosend);
        dgram.Push(buf,r);
-       data_out.push_back(tintbin(Datagram::Time(),tosend));
     DLOG(INFO)<<"#"<<id<<" +DATA"<<tosend;
        return tosend;
 }
 
 
 void   Channel::AddAck (Datagram& dgram) {
-       int ackspace = min(4,dgram.space()/5);
-       if (data_in_) {
-               dgram.Push8(P2TP_ACK);
-               dgram.Push32(data_in_);
-        DLOG(INFO)<<"#"<<id<<" +!ACK"<<data_in_;
-               ackspace--;
-       }
-       while (ack_out<file().history.size() && ackspace) {
-               bin h=file().history[ack_out++];
-               if (!file().ack_out.contains(h.parent()) && h!=data_in_) {
-                       dgram.Push8(P2TP_ACK);
-                       dgram.Push32(h);
-            DLOG(INFO)<<"#"<<id<<" +ACK"<<h;
-                       ackspace--;
-               }
+       if (data_in_.time) {
+               dgram.Push8(P2TP_ACK_TS);
+               dgram.Push32(data_in_.bin);
+               dgram.Push64(data_in_.time);
+        data_in_.time = 0;
+        DLOG(INFO)<<"#"<<id<<" +!ACK"<<data_in_.bin;
        }
-       data_in_ = 0;
+    bin64_t h=file().data_in(ack_out_);
+    int count=0;
+    while (h!=bin64_t::NONE && count++<4) {
+        dgram.Push8(P2TP_ACK);
+        dgram.Push32(h);
+        DLOG(INFO)<<"#"<<id<<" +ACK"<<h;
+        h=file().data_in(++ack_out_);
+    }
 }
 
 
@@ -247,12 +174,15 @@ void      Channel::Recv (Datagram& dgram) {
        while (dgram.size()) {
                uint8_t type = dgram.Pull8();
                switch (type) {
+            case P2TP_HANDSHAKE: OnHandshake(dgram); break;
                        case P2TP_DATA:         OnData(dgram); break;
+                       case P2TP_ACK_TS:       OnAckTs(dgram); break;
                        case P2TP_ACK:          OnAck(dgram); break;
                        case P2TP_HASH:         OnHash(dgram); break;
                        case P2TP_HINT:         OnHint(dgram); break;
+            case P2TP_PEX_ADD:  OnPex(dgram); break;
                        default:
-                               LOG(ERROR) << *this << " malformed datagram";
+                               //LOG(ERROR) << this->id_string() << " malformed datagram";
                                return;
                }
        }
@@ -260,71 +190,135 @@ void     Channel::Recv (Datagram& dgram) {
 
 
 void   Channel::OnHash (Datagram& dgram) {
-       bin pos = dgram.Pull32();
+       bin64_t pos = dgram.Pull32();
        Sha1Hash hash = dgram.PullHash();
-       if (file().OfferHash(pos,hash))
-        DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
+       file().OfferHash(pos,hash);
+    DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
 }
 
 
 void Channel::OnData (Datagram& dgram) {
-       bin pos = dgram.Pull32();
-       uint8_t* data;
-       size_t length = dgram.Pull(&data,1024);
+       bin64_t pos = dgram.Pull32();
     DLOG(INFO)<<"#"<<id<<" .DATA"<<pos;
-       if (pos.layer()) 
-               RETLOG("non-base layer DATA pos");
-       if (file().ack_out.contains(pos)) 
-               RETLOG("duplicate transmission: "<<pos);
-       if (file().status()==File::EMPTY)
-               RETLOG("DATA for an empty file");
-       if (pos.offset()>=file().packet_size())
-               RETLOG("DATA pos out of bounds");
-       Sha1Hash hash(data,length);
-       if (file().OfferHash(pos, hash)) {
-               //memcpy(file->data+offset*KILO,
-               //channel->datagram->data+channel->datagram->offset,KILO);
-               pwrite(fd,data,length,pos.offset()*1024); // TODO; if (last) ftruncate
-               if (pos==file().hashes.data_mass()) {
-                       int lendiff = 1024-length;
-                       ftruncate(fd, file().size()-lendiff);
-               }
-               data_in_ = pos;
-               file().ack_out |= pos;
-               file().history.push_back(file().ack_out.get(pos));
-               if (file().history.size()==file().packet_size()+1) // FIXME: encapsulate
-                       file().status_ = File::DONE;
-               cc_.OnCongestionEvent(CongestionControl::DATA_EV);
-               //DLOG(INFO)<<*this<<" DATA< "<<pos;
-               CleanStaleHintOut();
-       } else
-               LOG(ERROR)<<"data hash is not accepted "<<pos<<" len "<<length;
+    file().OfferData(pos, *dgram, dgram.size());
+       cc->OnDataRecvd(pos);
+       CleanStaleHints();
 }
 
 
 void   Channel::OnAck (Datagram& dgram) {
-       // FIXME check whether it is in the range
-       bin pos = dgram.Pull32();
+       // note: no bound checking
+       bin64_t pos = dgram.Pull32();
     DLOG(INFO)<<"#"<<id<<" .ACK"<<pos;
-       if (file().hashes.data_mass() && pos>file().hashes.data_mass()) {
-               LOG(WARNING) << "out-of-bounds ACK";
-               return;
-       }
-       ack_in |= pos;
-       
-       CleanStaleDataOut(pos);
-       
-       if (peer_status_==File::EMPTY) {
-               peer_status_ = File::IN_PROGRESS;
-       } else if (peer_status_==File::IN_PROGRESS) {
-               // FIXME: FINISHED  ack_in_.filled(file().size())
-       }
+       ack_in.set(pos);
+}
+
+
+void   Channel::OnAckTs (Datagram& dgram) {
+       bin64_t pos = dgram.Pull32();
+    tint ts = dgram.Pull64();
+    DLOG(INFO)<<"#"<<id<<" ,ACK"<<pos;
+    //dprintf("%lli #%i +ack %lli +ts %lli",Datagram::now,id,pos,ts);
+       ack_in.set(pos);
+       cc->OnAckRcvd(tintbin(ts,pos));
 }
 
 
 void   Channel::OnHint (Datagram& dgram) {
-       bin hint = dgram.Pull32();
-       hint_in.push_back(tintbin(Datagram::now,hint));
+       bin64_t hint = dgram.Pull32();
+       hint_in.push_back(hint);
+}
+
+
+void Channel::OnHandshake (Datagram& dgram) {
+    peer_channel_id = dgram.Pull32();
+    // FUTURE: channel forking
+}
+
+
+void Channel::OnPex (Datagram& dgram) {
+    uint32_t addr = dgram.Pull32();
+    uint16_t port = dgram.Pull16();
+    if (peer_selector)
+        peer_selector->AddPeer(Datagram::Address(addr,port),file().root_hash());
+}
+
+
+void   Channel::Recv (int socket) {
+       Datagram data(socket);
+       data.Recv();
+       if (data.size()<4) 
+               RETLOG("datagram shorter than 4 bytes");
+       uint32_t mych = data.Pull32();
+       Sha1Hash hash;
+       Channel* channel;
+       if (!mych) { // handshake initiated
+               if (data.size()<1+4+1+4+Sha1Hash::SIZE) 
+                       RETLOG ("incorrect size initial handshake packet");
+               uint8_t hashid = data.Pull8();
+               if (hashid!=P2TP_HASH) 
+                       RETLOG ("no hash in the initial handshake");
+               bin pos = data.Pull32();
+               if (pos!=bin64_t::ALL32) 
+                       RETLOG ("that is not the root hash");
+               hash = data.PullHash();
+               FileTransfer* file = FileTransfer::Find(hash);
+               if (!file) 
+                       RETLOG ("hash unknown, no such file");
+               channel = new Channel(file, socket, data.address());
+       } else {
+               mych = DecodeID(mych);
+               if (mych>=channels.size()) 
+                       RETLOG ("invalid channel id");
+               channel = channels[mych];
+               if (!channel) 
+                       RETLOG ("channel is closed");
+               if (channel->peer != data.address()) 
+                       RETLOG ("invalid peer address");
+               channel->Recv(data);
+       }
+       channel->Send();
+}
+
+
+bool tblater (const tintbin& a, const tintbin& b) {
+    return a.time > b.time;
 }
 
 
+void    Channel::Loop (tint time) {  
+       
+       tint untiltime = Datagram::Time()+time;
+    tbqueue send_queue;
+    for(int i=0; i<channels.size(); i++)
+        if (channels[i])
+            send_queue.push_back(tintbin(Datagram::now,i));
+       
+    while ( Datagram::now <= untiltime ) {
+               
+        tintbin next_send = send_queue.front();
+        pop_heap(send_queue.begin(), send_queue.end(), tblater);
+        send_queue.pop_back();
+        tint wake_on = min(next_send.time,untiltime);
+               tint towait = min(wake_on-Datagram::now,TINT_SEC); // towait<0?
+        
+               int rd = Datagram::Wait(socket_count,sockets,towait);
+               if (rd!=-1)
+                       Recv(rd);
+        
+        int chid = (int)(next_send.bin);
+        Channel* sender = channels[chid];
+        if (sender) {
+            tint next_time = sender->Send();
+            if (next_time!=TINT_NEVER) {
+                send_queue.push_back(tintbin(next_time,chid));
+                push_heap(send_queue.begin(),send_queue.end(),tblater);
+            } else {
+                delete sender;
+                channels[chid] = NULL;
+            }
+        }
+               
+    }
+       
+}
index 3a2bb44..7f3c44d 100644 (file)
@@ -76,6 +76,14 @@ TEST(Bin64Test, Iteration) {
     EXPECT_EQ(bin64_t(3,0),i);
 }
 
+TEST(Bin64Test, Bits) {
+    bin64_t all = bin64_t::ALL, none = bin64_t::NONE, big = bin64_t(40,18);
+    uint32_t a32 = all.to32(), n32 = none.to32(), b32 = big.to32();
+    EXPECT_EQ(0x7fffffff,a32);
+    EXPECT_EQ(0xffffffff,n32);
+    EXPECT_EQ(bin64_t::NONE32,b32);
+}
+
 int main (int argc, char** argv) {
        
        testing::InitGoogleTest(&argc, argv);
index d831a5e..9cecd7a 100644 (file)
@@ -11,6 +11,7 @@
 #include <glog/logging.h>
 #include "p2tp.h"
 
+using namespace p2tp;
 
 /*TEST(P2TP, ConnectTest) {
 
 TEST(P2TP,CwndTest) {
        int f = open("big_test_file",O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
        int size = rand()%(1<<19) + (1<<19);
+    int sizek = (size>>10) + ((size&1023)?1:0);
        char* b = (char*)malloc(size);
        for(int i=0; i<size; i++) 
                b[i] = (i%1024!=1023) ? ('A' + rand()%('Z'-'A')) : ('\n');
        write(f,b,size);
        free(b);
        close(f);
-       
+
+       /*
        struct sockaddr_in addr1, addr2;
        addr1.sin_family = AF_INET;
        addr1.sin_port = htons(7003);
@@ -65,43 +68,40 @@ TEST(P2TP,CwndTest) {
        addr2.sin_family = AF_INET;
        addr2.sin_port = htons(7004);
        addr2.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
-         
-    int sock1 = p2tp::Init(7003);
-       int sock2 = p2tp::Init(7004);
+         */
+    int sock1 = p2tp::Listen(7003);
        ASSERT_TRUE(sock1>=0);
-       ASSERT_TRUE(sock2>=0);
+       //ASSERT_TRUE(sock2>=0);
        
+    p2tp::AddPeer(Datagram::Address("127.0.0.1",7001));
+    
        int file = p2tp::Open("big_test_file");
-       p2tp::File& fileobj = * p2tp::File::file(file);
-  
-       int copy = p2tp::Open(fileobj.root_hash(),"big_test_file_copy");
-       p2tp::File& copyobj = * p2tp::File::file(copy);
+    FileTransfer* fileobj = FileTransfer::file(file);
+    
+       int copy = p2tp::Open("big_test_file_copy",fileobj->root_hash());
   
-       int chan = p2tp::Connect(copy,sock1,addr2);
-  
-       p2tp::Loop();
-       p2tp::Loop();
-       p2tp::Channel& sendch = * p2tp::Channel::channel(chan+1);
-       while (copyobj.status()!=p2tp::File::DONE) {
-               p2tp::Loop();
-               LOG(INFO)<<sendch.congestion_control().cwnd()<<" cwnd";
-               //EXPECT_GE(1,sendch.congestion_control().cwnd());
-       }
-       
-       ASSERT_EQ(p2tp::file_size(file),p2tp::file_size(copy));
+       p2tp::Loop(TINT_MSEC);
+    
+    ASSERT_EQ(sizek<<10,p2tp::Size(copy));
+
+    int count = 0;
+    while (p2tp::SeqComplete(copy)!=size && count++<(1<<14))
+        p2tp::Loop(TINT_MSEC);
+    ASSERT_EQ(size,p2tp::SeqComplete(copy));
+    
        p2tp::Close(file);
        p2tp::Close(copy);
 
        p2tp::Shutdown(sock1);
-       p2tp::Shutdown(sock2);
+       //p2tp::Release(sock2);
 
 }
 
 
 int main (int argc, char** argv) {
        
-       bin::init();
-       bins::init();
+       //bin::init();
+       //bins::init();
        google::InitGoogleLogging(argv[0]);
        testing::InitGoogleTest(&argc, argv);
        int ret = RUN_ALL_TESTS();
index 395e85d..87d4313 100644 (file)
@@ -40,7 +40,7 @@ TEST(Datagram, BinaryTest) {
        int datalen = strlen(text)+1+2+4+8;
        ASSERT_EQ(datalen,d.Send());
     int socks[1] = {socket};
-       socket = Datagram::Wait(1,socks);
+       ASSERT_EQ (socket, Datagram::Wait(1,socks));
        Datagram rcv(socket);
        ASSERT_EQ(datalen,rcv.Recv());
        char* rbuf;
index 9a0db9d..59900e5 100644 (file)
@@ -22,7 +22,7 @@ int FileTransfer::instance = 0;
 
 // FIXME: separate Bootstrap() and Download(), then Size(), Progress(), SeqProgress()
 
-FileTransfer::FileTransfer (const Sha1Hash& _root_hash, const char* filename) :
+FileTransfer::FileTransfer (const char* filename, const Sha1Hash& _root_hash) :
     root_hash_(_root_hash), fd_(0), hashfd_(0), dry_run_(false), 
     peak_count_(0), hashes_(NULL), error_(NULL), size_(0), sizek_(0),
     complete_(0), completek_(0), seq_complete_(0)
@@ -30,6 +30,9 @@ FileTransfer::FileTransfer (const Sha1Hash& _root_hash, const char* filename) :
        fd_ = open(filename,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
        if (fd_<0)
         return;
+    if (files.size()<fd_+1)
+        files.resize(fd_+1);
+    files[fd_] = this;
     if (root_hash_==Sha1Hash::ZERO) // fresh submit, hash it
         Submit();
     else
@@ -38,11 +41,6 @@ FileTransfer::FileTransfer (const Sha1Hash& _root_hash, const char* filename) :
 }
 
 
-bin64_t         FileTransfer::PickBinForRequest (bins& from, uint8_t layer) {
-    return picker_->Pick(from,layer);
-}
-
-
 void FileTransfer::LoadPeaks () {
     char file_name[1024];
     sprintf(file_name,PEAK_FILE_TEMPLATE,root_hash().hex().c_str(),instance);
@@ -154,6 +152,14 @@ void            FileTransfer::Submit () {
 }
 
 
+bin64_t         FileTransfer::peak_for (bin64_t pos) const {
+    int pi=0;
+    while (pi<peak_count_ && !pos.within(peaks_[pi]))
+        pi++;
+    return pi==peak_count_ ? bin64_t(bin64_t::NONE) : peaks_[pi];
+}
+
+
 void            FileTransfer::OfferHash (bin64_t pos, const Sha1Hash& hash) {    
        if (!size_)  // only peak hashes are accepted at this point
                return OfferPeak(pos,hash);
@@ -177,19 +183,16 @@ bin64_t         FileTransfer::data_in (int offset) {
 }
 
 
-bool            FileTransfer::OfferData (bin64_t pos, uint8_t* data, size_t length) {
+bool            FileTransfer::OfferData (bin64_t pos, const uint8_t* data, size_t length) {
     if (!pos.is_base())
         return false;
     if (length<1024 && pos!=bin64_t(0,sizek_-1))
         return false;
     if (ack_out_.get(pos)==bins::FILLED)
         return true; // ???
-    int pi=0;
-    while (pi<peak_count_ && !pos.within(peaks_[pi]))
-        pi++;
-    if (pi==peak_count_)
+    bin64_t peak = peak_for(pos);
+    if (peak==bin64_t::NONE)
         return false;
-    bin64_t peak = peaks_[pi];
     
     Sha1Hash hash(data,length);       
     bin64_t p = pos;
@@ -266,6 +269,7 @@ FileTransfer::~FileTransfer () {
     munmap(hashes_,sizek_*2*Sha1Hash::SIZE);
     close(hashfd_);
     close(fd_);
+    files[fd_] = NULL;
 }
 
                            
@@ -277,18 +281,14 @@ FileTransfer* FileTransfer::Find (const Sha1Hash& root_hash) {
 }
 
 
-int      p2tp::Open (const char* filename) {
-    return Open(Sha1Hash::ZERO,filename);
-}
-
-
-int      p2tp::Open (const Sha1Hash& hash, const char* filename) {
-    FileTransfer* ft = new FileTransfer(hash, filename);
-    if (ft->fd_>0) {
-        if (FileTransfer::files.size()<ft->fd_)
-            FileTransfer::files.resize(ft->fd_);
-        FileTransfer::files[ft->fd_] = ft;
-        return ft->fd_;
+int      p2tp::Open (const char* filename, const Sha1Hash& hash) {
+    FileTransfer* ft = new FileTransfer(filename, hash);
+    int fdes = ft->file_descriptor();
+    if (fdes>0) {
+        if (FileTransfer::files.size()<fdes)
+            FileTransfer::files.resize(fdes);
+        FileTransfer::files[fdes] = ft;
+        return fdes;
     } else {
         delete ft;
         return -1;
@@ -296,14 +296,6 @@ int      p2tp::Open (const Sha1Hash& hash, const char* filename) {
 }
 
 
-void     p2tp::Close (int fdes) {
-    // FIXME delete all channels
-    delete FileTransfer::files[fdes];
-    FileTransfer::files[fdes] = NULL;
-}
-
-
-
 /*
  for(int i=0; i<peak_hash_count; i++) {
  bin64_t x = peaks[i], end = x.sibling();