towards swarming
authorvictor <victor@e16421f0-f15b-0410-abcd-98678b794739>
Fri, 6 Nov 2009 11:28:37 +0000 (11:28 +0000)
committervictor <victor@e16421f0-f15b-0410-abcd-98678b794739>
Fri, 6 Nov 2009 11:28:37 +0000 (11:28 +0000)
git-svn-id: https://ttuki.vtt.fi/svn/p2p-next/TUD/p2tp/trunk@501 e16421f0-f15b-0410-abcd-98678b794739

12 files changed:
datagram.cpp
datagram.h
exec/leecher.cpp
ext/ledbat_controller.cpp
ext/send_control.cpp
ext/simple_selector.cpp
p2tp.cpp
p2tp.h
sendrecv.cpp
tests/connecttest.cpp
tests/dgramtest.cpp
transfer.cpp

index 3a50a98..f6325eb 100644 (file)
@@ -21,7 +21,7 @@ namespace p2tp {
 
 tint Datagram::now = Datagram::Time();
 tint Datagram::epoch = now;
-uint32_t Datagram::Address::LOCALHOST = INADDR_LOOPBACK;
+uint32_t Address::LOCALHOST = INADDR_LOOPBACK;
 uint64_t Datagram::dgrams_up=0, Datagram::dgrams_down=0, 
          Datagram::bytes_up=0, Datagram::bytes_down=0;
 
index d272558..35da147 100644 (file)
@@ -46,46 +46,48 @@ typedef int64_t tint;
 #define INVALID_SOCKET -1
 #endif
 
+    
+struct Address {
+    struct sockaddr_in  addr;
+    static uint32_t LOCALHOST;
+    void init(uint32_t ipv4=0, uint16_t port=0) {
+        memset(&addr,0,sizeof(struct sockaddr_in));
+        addr.sin_family = AF_INET;
+        addr.sin_port = htons(port);
+        addr.sin_addr.s_addr = htonl(ipv4);
+    }
+    Address() { init(); }
+    Address(const char* ip, uint16_t port) {
+        init(LOCALHOST,port);
+        inet_aton(ip,&(addr.sin_addr));
+    }
+    Address(uint16_t port) {
+        init(LOCALHOST,port);
+    }
+    Address(uint32_t ipv4addr, uint16_t port) {
+        init(ipv4addr,port);
+    }
+    Address(const struct sockaddr_in& address) : addr(address) {}
+    uint32_t ipv4 () const { return ntohl(addr.sin_addr.s_addr); }
+    uint16_t port () const { return ntohs(addr.sin_port); }
+    operator sockaddr_in () const {return addr;}
+    bool operator == (const Address& b) const { 
+        return addr.sin_family==b.addr.sin_family &&
+        addr.sin_port==b.addr.sin_port &&
+        addr.sin_addr.s_addr==b.addr.sin_addr.s_addr;
+    }
+    std::string str () const {
+        char s[32];
+        sprintf(s,"%i.%i.%i.%i:%i",ipv4()>>24,(ipv4()>>16)&0xff,
+                (ipv4()>>8)&0xff,ipv4()&0xff,port());
+        return std::string(s);
+    }
+    bool operator != (const Address& b) const { return !(*this==b); }
+};
+    
+    
 struct Datagram {
 
-    struct Address {
-        struct sockaddr_in  addr;
-        static uint32_t LOCALHOST;
-        void init(uint32_t ipv4=0, uint16_t port=0) {
-            memset(&addr,0,sizeof(struct sockaddr_in));
-            addr.sin_family = AF_INET;
-            addr.sin_port = htons(port);
-            addr.sin_addr.s_addr = htonl(ipv4);
-        }
-        Address() { init(); }
-        Address(const char* ip, uint16_t port) {
-            init(LOCALHOST,port);
-            inet_aton(ip,&(addr.sin_addr));
-        }
-        Address(uint16_t port) {
-            init(LOCALHOST,port);
-        }
-        Address(uint32_t ipv4addr, uint16_t port) {
-            init(ipv4addr,port);
-        }
-        Address(const struct sockaddr_in& address) : addr(address) {}
-        uint32_t ipv4 () const { return ntohl(addr.sin_addr.s_addr); }
-        uint16_t port () const { return ntohs(addr.sin_port); }
-        operator sockaddr_in () const {return addr;}
-        bool operator == (const Address& b) const { 
-            return addr.sin_family==b.addr.sin_family &&
-            addr.sin_port==b.addr.sin_port &&
-            addr.sin_addr.s_addr==b.addr.sin_addr.s_addr;
-        }
-        std::string str () const {
-            char s[32];
-            sprintf(s,"%i.%i.%i.%i:%i",ipv4()>>24,(ipv4()>>16)&0xff,
-                    (ipv4()>>8)&0xff,ipv4()&0xff,port());
-            return std::string(s);
-        }
-        bool operator != (const Address& b) const { return !(*this==b); }
-    };
-
        Address addr;
        SOCKET sock;
        int offset, length;
index a68ebf9..abd05da 100644 (file)
 using namespace p2tp;
 
 
+/** P2TP downloader. Params: root hash, filename, tracker ip/port, own ip/port */
 int main (int argn, char** args) {
     
-    assert(0<p2tp::Listen(Datagram::Address(INADDR_ANY,7002)));
-       p2tp::SetTracker(Datagram::Address("130.161.211.198",7001));
-    unlink("doc/sofi-copy.jpg");
-       int file = p2tp::Open("doc/sofi-copy.jpg",
-                          Sha1Hash(true,"282a863d5567695161721686a59f0c667250a35d"));
+    if (argn<4) {
+        fprintf(stderr,"parameters: root_hash filename tracker_ip/port [own_ip/port]\n");
+        return -1;
+    }
+    Sha1Hash root_hash(true,args[1]);
+    if (root_hash==Sha1Hash::ZERO) {
+        fprintf(stderr,"Sha1 hash format: hex, 40 symbols\n");
+        return -2;
+    }
+    
+    char* filename = args[2];
+    
+    Address tracker(args[3]), bindaddr(args[4]);
+    
+    if (tracker==Address()) {
+        fprintf(stderr,"Tracker address format: [1.2.3.4:]12345\n");
+        return -2;
+    }
+    if (bindaddr==Address()) 
+        bindaddr = Address(rand()%10000+7000);
+    
+    assert(0<p2tp::Listen(bindaddr));
+    
+       p2tp::SetTracker(tracker);
+    
+       int file = p2tp::Open(filename,root_hash);
     
     while (!p2tp::Complete(file)) {
-           p2tp::Loop(TINT_SEC/10);
+           p2tp::Loop(TINT_SEC/100);
         printf("%lli dgram %lli bytes up, %lli dgram %lli bytes down\n",
                Datagram::dgrams_up, Datagram::bytes_up,
                Datagram::dgrams_down, Datagram::bytes_down );
index 8febc9c..5bc19bd 100644 (file)
@@ -44,7 +44,7 @@ public:
     }
     
     tint    next_send_time ( ){
-        return cwnd_ ? Datagram::now + (rtt_avg_/cwnd_) : TINT_NEVER; // TODO keepalives
+        return cwnd_ ? NOW + (rtt_avg_/cwnd_) : TINT_NEVER; // TODO keepalives
     }
     
     void OnDataSent(bin64_t b) {
@@ -52,20 +52,20 @@ public:
             cwnd_ = 0; // nothing to send; suspend
         } else {
             last_bin_sent_ = b;
-            last_send_time_ = Datagram::now;
+            last_send_time_ = NOW;
             in_flight_++;
         }
     }
     
     void OnDataRecvd(bin64_t b) {
-        last_recv_time_ = Datagram::now;
+        last_recv_time_ = NOW;
     }
     
     void OnAckRcvd(bin64_t b, tint peer_stamp) {
         if (last_bin_sent_!=b)
             return;
         if (peer_stamp!=TINT_NEVER)
-            rtt_avg_ = (rtt_avg_*7 + (Datagram::now-last_send_time_)) >> 3; // van Jac
+            rtt_avg_ = (rtt_avg_*7 + (NOW-last_send_time_)) >> 3; // van Jac
         in_flight_--;
     }
     
index 85d599e..593ef34 100644 (file)
@@ -29,7 +29,7 @@ tint    PingPongController::NextSendTime () {
 }
     
 void    PingPongController::OnDataSent(bin64_t b) {
-    if ( (ch_->last_recv_time_ && ch_->last_recv_time_<Datagram::now-TINT_SEC*3) || //no reply
+    if ( (ch_->last_recv_time_ && ch_->last_recv_time_<NOW-TINT_SEC*3) || //no reply
          (b==bin64_t::ALL && MaySendData()) ) // nothing to send
         Swap(new KeepAliveController(this));
 }
@@ -69,7 +69,7 @@ bool    CwndController::MaySendData() {
     dprintf("%s #%i maysend %i < %f & %s (rtt %lli)\n",Datagram::TimeStr(),
             ch_->id,(int)ch_->data_out_.size(),cwnd_,Datagram::TimeStr(NextSendTime()),
             ch_->rtt_avg_);
-    return ch_->data_out_.size() < cwnd_  &&  Datagram::now >= NextSendTime();
+    return ch_->data_out_.size() < cwnd_  &&  NOW >= NextSendTime();
 }
     
 tint    CwndController::NextSendTime () {
index 790a205..2396b11 100644 (file)
@@ -19,7 +19,7 @@ class SimpleSelector : public PeerSelector {
 public:
     SimpleSelector () {
     }
-    void AddPeer (const Datagram::Address& addr, const Sha1Hash& root) {
+    void AddPeer (const Address& addr, const Sha1Hash& root) {
         peers.push_front(memo_t(addr,root)); //,root.fingerprint() !!!
     }
     Address GetPeer (const Sha1Hash& for_root) {
index db8f530..485b2a2 100644 (file)
--- a/p2tp.cpp
+++ b/p2tp.cpp
@@ -45,7 +45,7 @@ Channel::Channel      (FileTransfer* file, int socket, Address peer_addr) :
        this->id = channels.size();
        channels.push_back(this);
     cc_ = new PingPongController(this);
-    RequeueSend(Datagram::now);
+    RequeueSend(NOW);
 }
 
 
@@ -67,7 +67,7 @@ int Channel::EncodeID(int unscrambled) {
 }
 
 
-int     p2tp::Listen (Datagram::Address addr) {
+int     p2tp::Listen (Address addr) {
     int sock = Datagram::Bind(addr);
     if (sock!=INVALID_SOCKET)
         Channel::sockets[Channel::socket_count++] = sock;
@@ -117,7 +117,7 @@ void        p2tp::Close (int fd) {
 }
 
 
-void    p2tp::AddPeer (Datagram::Address address, const Sha1Hash& root) {
+void    p2tp::AddPeer (Address address, const Sha1Hash& root) {
     Channel::peer_selector->AddPeer(address,root);
 }
 
diff --git a/p2tp.h b/p2tp.h
index e7b46e1..69ea5f3 100644 (file)
--- a/p2tp.h
+++ b/p2tp.h
@@ -64,19 +64,19 @@ Messages
 
 namespace p2tp {
 
+    #define NOW Datagram::now
     struct tintbin {
         tint    time;
         bin64_t bin;
         tintbin(const tintbin& b) : time(b.time), bin(b.bin) {}
         tintbin() : time(0), bin(bin64_t::NONE) {}
         tintbin(tint time_, bin64_t bin_) : time(time_), bin(bin_) {}
-        tintbin(bin64_t bin_) : time(Datagram::now), bin(bin_) {}
+        tintbin(bin64_t bin_) : time(NOW), bin(bin_) {}
     };
 
-    
        typedef std::deque<tintbin> tbqueue;
     typedef std::deque<bin64_t> binqueue;
-    typedef Datagram::Address   Address;
+    typedef Address   Address;
 
        typedef enum {
                P2TP_HANDSHAKE = 0,
@@ -118,7 +118,7 @@ namespace p2tp {
             we use a rotating queue of bin completion events. */
         //bin64_t         RevealAck (uint64_t& offset);
         /** Rotating queue read for channels of this transmission. */
-        uint32_t        RevealChannel (int& i);
+        int             RevealChannel (int& i);
         
         static FileTransfer* Find (const Sha1Hash& hash);
                static FileTransfer* file (int fd) {
@@ -144,7 +144,7 @@ namespace p2tp {
         bins&           ack_out ()  { return ack_out_; }
         int             file_descriptor () const { return fd_; }
         PiecePicker&    picker () { return *picker_; }
-        int             channel_count () const { return handshake_in_.size(); }
+        int             channel_count () const { return hs_in_.size(); }
 
         static int instance; // FIXME this smells
 
@@ -188,7 +188,8 @@ namespace p2tp {
         char*           error_;
 
         /** Channels working for this transfer. */
-        std::deque<int> handshake_in_;
+        binqueue        hs_in_;
+        int             hs_in_offset_;
         std::deque<Address>        pex_in_;
         /** Messages we are accepting.    */
         uint64_t        cap_out_;
@@ -226,8 +227,8 @@ namespace p2tp {
     
     class PeerSelector {
     public:
-        virtual void AddPeer (const Datagram::Address& addr, const Sha1Hash& root) = 0;
-        virtual Datagram::Address GetPeer (const Sha1Hash& for_root) = 0;
+        virtual void AddPeer (const Address& addr, const Sha1Hash& root) = 0;
+        virtual Address GetPeer (const Sha1Hash& for_root) = 0;
     };
 
     
@@ -288,7 +289,7 @@ namespace p2tp {
                /** Channel id: index in the channel array. */
                uint32_t        id;
                /**     Socket address of the peer. */
-        Datagram::Address      peer_;
+        Address        peer_;
                /**     The UDP socket fd. */
                int                     socket_;
                /**     Descriptor of the file in question. */
@@ -344,9 +345,9 @@ namespace p2tp {
         static Address  tracker;
                static std::vector<Channel*> channels;
 
-        friend int      Listen (Datagram::Address addr);
+        friend int      Listen (Address addr);
         friend void     Shutdown (int sock_des);
-        friend void     AddPeer (Datagram::Address address, const Sha1Hash& root);
+        friend void     AddPeer (Address address, const Sha1Hash& root);
         friend void     SetTracker(const Address& tracker);
         friend int      Open (const char*, const Sha1Hash&) ; // FIXME
         
@@ -358,7 +359,7 @@ namespace p2tp {
 
     /*************** The top-level API ****************/
     /** Start listening a port. Returns socket descriptor. */
-    int     Listen (Datagram::Address addr);
+    int     Listen (Address addr);
     /** Run send/receive loop for the specified amount of time. */
     void    Loop (tint till);
     /** Stop listening to a port. */
@@ -372,7 +373,7 @@ namespace p2tp {
     /** Add a possible peer which participares in a given transmission. In the case
         root hash is zero, the peer might be talked to regarding any transmission
         (likely, a tracker, cache or an archive). */
-    void    AddPeer (Datagram::Address address, const Sha1Hash& root=Sha1Hash::ZERO);
+    void    AddPeer (Address address, const Sha1Hash& root=Sha1Hash::ZERO);
     
     void    SetTracker(const Address& tracker);
     
index 2d73dde..ac65e2d 100644 (file)
@@ -72,7 +72,7 @@ bin64_t               Channel::DequeueHint () { // TODO: resilience
 /*void Channel::CleanStaleHints () {
        while ( !hint_out.empty() && file().ack_out().get(hint_out.front().bin)==bins::FILLED ) 
                hint_out.pop_front();  // FIXME must normally clear fulfilled entries
-       tint timed_out = Datagram::now - cc_->RoundTripTime()*8;
+       tint timed_out = NOW - cc_->RoundTripTime()*8;
        while ( !hint_out.empty() && hint_out.front().time < timed_out ) {
         file().picker()->Snubbed(hint_out.front().bin);
                hint_out.pop_front();
@@ -99,7 +99,7 @@ void  Channel::AddHandshake (Datagram& dgram) {
 void    Channel::ClearStaleDataOut() {
     int oldsize = data_out_.size();
     while ( data_out_.size() && data_out_.front().time < 
-           Datagram::now - rtt_avg_ - dev_avg_*4 )
+           NOW - rtt_avg_ - dev_avg_*4 )
         data_out_.pop_front();
     if (data_out_.size()!=oldsize)
         cc_->OnAckRcvd(bin64_t::NONE);
@@ -113,6 +113,7 @@ void        Channel::Send () {
     if ( is_established() ) {
         AddAck(dgram);
         AddHint(dgram);
+        AddPex(dgram);
         ClearStaleDataOut();
         if (cc_->MaySendData()) 
             data = AddData(dgram);
@@ -127,7 +128,7 @@ void        Channel::Send () {
     if (dgram.size()==4) // only the channel id; bare keep-alive
         data = bin64_t::ALL;
     cc_->OnDataSent(data);
-    last_send_time_ = Datagram::now;
+    last_send_time_ = NOW;
     RequeueSend(cc_->NextSendTime());
 }
 
@@ -136,7 +137,7 @@ void        Channel::AddHint (Datagram& dgram) {
 
     while (!hint_out_.empty()) {
         tintbin f = hint_out_.front();
-        if (f.time<Datagram::now-rtt_avg_*8) {
+        if (f.time<NOW-rtt_avg_*8) {
             hint_out_.pop_front();
         } else {
             int status = file().ack_out().get(f.bin);
@@ -152,7 +153,7 @@ void        Channel::AddHint (Datagram& dgram) {
         }
     }
     /*while (!hint_out_.empty() &&
-            (hint_out_.front().time<Datagram::now-TINT_SEC ||
+            (hint_out_.front().time<NOW-TINT_SEC ||
             file().ack_out().get(hint_out_.front().bin)==bins::FILLED ) ) {
         file().picker().Expired(hint_out_.front().bin);
         hint_out_.pop_front();
@@ -250,9 +251,10 @@ void       Channel::AddAck (Datagram& dgram) {
 
 void   Channel::Recv (Datagram& dgram) {
     if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
-        rtt_avg_ = Datagram::now - last_send_time_;
+        rtt_avg_ = NOW - last_send_time_;
         dev_avg_ = rtt_avg_;
         dip_avg_ = rtt_avg_;
+        file_->hs_in_.push_back(id);
         dprintf("%s #%i rtt init %lli\n",Datagram::TimeStr(),id,rtt_avg_);
     }
     bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
@@ -272,9 +274,9 @@ void        Channel::Recv (Datagram& dgram) {
                }
        }
     cc_->OnDataRecvd(data);
-    last_recv_time_ = Datagram::now;
+    last_recv_time_ = NOW;
     if (data!=bin64_t::ALL)
-        RequeueSend(Datagram::now);
+        RequeueSend(NOW);
 }
 
 
@@ -294,9 +296,9 @@ bin64_t Channel::OnData (Datagram& dgram) {
     bool ok = file().OfferData(pos, data, length) ;
     dprintf("%s #%i %cdata (%lli)\n",Datagram::TimeStr(),id,ok?'-':'!',pos.offset());
     if (ok) {
-        data_in_ = tintbin(Datagram::now,pos);
+        data_in_ = tintbin(NOW,pos);
         if (last_recv_time_) {
-            tint dip = Datagram::now - last_recv_time_;
+            tint dip = NOW - last_recv_time_;
             dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
         }
         return pos;
@@ -312,7 +314,7 @@ void        Channel::OnAck (Datagram& dgram) {
     dprintf("%s #%i -ack (%i,%lli)\n",Datagram::TimeStr(),id,ackd_pos.layer(),ackd_pos.offset());
     for (int i=0; i<8 && i<data_out_.size(); i++) 
         if (data_out_[i].bin.within(ackd_pos)) {
-            tint rtt = Datagram::now-data_out_[i].time;
+            tint rtt = NOW-data_out_[i].time;
             rtt_avg_ = (rtt_avg_*3 + rtt) >> 2;
             dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
             dprintf("%s #%i rtt %lli dev %lli\n",
@@ -360,32 +362,20 @@ void Channel::OnPex (Datagram& dgram) {
     uint32_t ipv4 = dgram.Pull32();
     uint16_t port = dgram.Pull16();
     Address addr(ipv4,port);
-    //if (peer_selector)
-    //    peer_selector->AddPeer(Datagram::Address(addr,port),file().root_hash());
-    file_->pex_in_.push_back(addr);
-    if (file_->pex_in_.size()>1000)
-        file_->pex_in_.pop_front(); 
-    static int ENOUGH_PEERS_THRESHOLD = 20;
-    if (file_->channel_count()<ENOUGH_PEERS_THRESHOLD) {
-        int i = 0, chno;
-        while ( (chno=file_->RevealChannel(i)) != -1 ) {
-            if (channels[i]->peer()==addr) 
-                break;
-        }
-        if (chno==-1)
-            new Channel(file_,socket_,addr);
-    }
+    dprintf("%s #%i -pex %s\n",Datagram::TimeStr(),id,addr.str().c_str());
+    file_->OnPexIn(addr);
 }
 
 
 void    Channel::AddPex (Datagram& dgram) {
-    int ch = file().RevealChannel(this->pex_out_);
-    if (ch==-1)
+    int chid = file_->RevealChannel(pex_out_);
+    if (chid==-1 || chid==id)
         return;
-    Address a = channels[ch]->peer();
+    Address a = channels[chid]->peer();
     dgram.Push8(P2TP_PEX_ADD);
     dgram.Push32(a.ipv4());
     dgram.Push16(a.port());
+    dprintf("%s #%i +pex %s\n",Datagram::TimeStr(),id,a.str().c_str());
 }
 
 
@@ -396,7 +386,7 @@ void        Channel::Recv (int socket) {
                RETLOG("datagram shorter than 4 bytes");
        uint32_t mych = data.Pull32();
        Sha1Hash hash;
-       Channel* channel;
+       Channel* channel = NULL;
        if (!mych) { // handshake initiated
                if (data.size()<1+4+1+4+Sha1Hash::SIZE) 
                        RETLOG ("incorrect size initial handshake packet");
@@ -411,6 +401,9 @@ void        Channel::Recv (int socket) {
                if (!file) 
                        RETLOG ("hash unknown, no such file");
         dprintf("%s #0 -hash ALL %s\n",Datagram::TimeStr(),hash.hex().c_str());
+        for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
+            if (channels[*i] && channels[*i]->peer_==data.addr) 
+                RETLOG("have a channel already");
                channel = new Channel(file, socket, data.address());
        } else {
                mych = DecodeID(mych);
@@ -463,14 +456,14 @@ void    Channel::Loop (tint howlong) {
         }
         if (send_time>limit)
             send_time = limit;
-        if (sender && send_time<=Datagram::now) {
+        if (sender && send_time<=NOW) {
             dprintf("%s #%i sch_send %s\n",Datagram::TimeStr(),sender->id,
                     Datagram::TimeStr(send_time));
             sender->Send();
             pop_heap(send_queue.begin(), send_queue.end(), tblater);
             send_queue.pop_back();
         } else {
-            tint towait = send_time - Datagram::now;
+            tint towait = send_time - NOW;
             dprintf("%s waiting %lliusec\n",Datagram::TimeStr(),towait);
             int rd = Datagram::Wait(socket_count,sockets,towait);
             if (rd!=-1)
@@ -489,7 +482,7 @@ void    Channel::Loop (tint howlong) {
  if (send_queue.empty())
  dprintf("%s empty send_queue\n", Datagram::TimeStr());
  
- while ( Datagram::now <= untiltime && !send_queue.empty() ) {
+ while ( NOW <= untiltime && !send_queue.empty() ) {
  
  // BUG BUG BUG  no scheduled sends => just listen
  
@@ -508,11 +501,11 @@ void    Channel::Loop (tint howlong) {
  if (sender->next_send_time_!=next_send.time)
  continue;
  
- if (wake_on<Datagram::now)
- wake_on = Datagram::now;
+ if (wake_on<NOW)
+ wake_on = NOW;
  if (wake_on>untiltime)
  wake_on = untiltime;
- tint towait = min(wake_on-Datagram::now,TINT_SEC);
+ tint towait = min(wake_on-NOW,TINT_SEC);
  dprintf("%s waiting %lliusec\n",Datagram::TimeStr(),towait);
  int rd = Datagram::Wait(socket_count,sockets,towait);
  if (rd!=-1)
index dd9094a..f4b18b9 100644 (file)
 
 using namespace p2tp;
 
-/*TEST(P2TP, ConnectTest) {
-
-       uint8_t buf[1024];
-       int f = open("test_file",O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
-       for(char c='a'; c<='c'; c++) {
-               memset(buf,c,1024);
-               write(f,buf,1024);
-       }
-       close(f);
-       
-       struct sockaddr_in addr;
-       addr.sin_family = AF_INET;
-       addr.sin_port = htons(7001);
-       addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
-
-       int sock = p2tp::Init(7001);
-       ASSERT_TRUE(0<sock);
-
-       int file = p2tp::Open("test_file");
-       p2tp::File& fileobj = * p2tp::File::file(file);
-       int copy = p2tp::Open(fileobj.root_hash(),"test_file_copy");
-       p2tp::File& copyobj = * p2tp::File::file(copy);
-       int chan = p2tp::Connect(copy,sock,addr); // TRICK: will open a channel to the first file
-       p2tp::Loop(p2tp::TINT_1SEC);
-       //ASSERT_EQ(p2tp::Channel::channel(chan)->state(),p2tp::Channel::HS_DONE); FIXME: status
-       ASSERT_EQ(p2tp::file_size(file),p2tp::file_size(copy));
-       ASSERT_EQ(p2tp::File::DONE,copyobj.status());
-       p2tp::Close(file);
-       p2tp::Close(copy);
-       
-       p2tp::Shutdown(sock);
-       
-}*/
-
 
 TEST(P2TP,CwndTest) {
 
@@ -57,35 +23,14 @@ TEST(P2TP,CwndTest) {
        ASSERT_EQ(0,stat("doc/sofi.jpg", &st));
     int size = st.st_size;//, sizek = (st.st_size>>10) + (st.st_size%1024?1:0) ;
     
-       /*int f = open("big_test_file",O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
-       int size = 60<<10; //rand()%(1<<19) + (1<<19);
-    int sizek = (size>>10) + ((size&1023)?1:0);
-       char* b = (char*)malloc(size);
-       for(int i=0; i<size; i++) 
-               b[i] = (i%1024!=1023) ? ('A' + rand()%('Z'-'A')) : ('\n');
-       write(f,b,size);
-       free(b);
-       close(f);*/
-
-       /*
-       struct sockaddr_in addr1, addr2;
-       addr1.sin_family = AF_INET;
-       addr1.sin_port = htons(7003);
-       addr1.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
-  
-       addr2.sin_family = AF_INET;
-       addr2.sin_port = htons(7004);
-       addr2.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
-         */
     int sock1 = p2tp::Listen(7001);
        ASSERT_TRUE(sock1>=0);
-       //ASSERT_TRUE(sock2>=0);
        
        int file = p2tp::Open("doc/sofi.jpg");
     FileTransfer* fileobj = FileTransfer::file(file);
     FileTransfer::instance++;
     
-    p2tp::SetTracker(Datagram::Address("127.0.0.1",7001));
+    p2tp::SetTracker(Address("127.0.0.1",7001));
     
        int copy = p2tp::Open("doc/sofi-copy.jpg",fileobj->root_hash());
   
@@ -100,15 +45,12 @@ TEST(P2TP,CwndTest) {
        p2tp::Close(copy);
 
        p2tp::Shutdown(sock1);
-       //p2tp::Release(sock2);
 
 }
 
 
 int main (int argc, char** argv) {
        
-       //bin::init();
-       //bins::init();
        google::InitGoogleLogging(argv[0]);
        testing::InitGoogleTest(&argc, argv);
        int ret = RUN_ALL_TESTS();
index 4a58d8b..f12f5b1 100644 (file)
@@ -21,7 +21,7 @@ TEST(Datagram, BinaryTest) {
        addr.sin_family = AF_INET;
        addr.sin_port = htons(7001);
        addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
-       Datagram d(socket,addr); //Datagram::Address(7001));
+       Datagram d(socket,addr); //Address(7001));
        const char * text = "text";
        const uint8_t num8 = 0xab;
        const uint16_t num16 = 0xabcd;
@@ -75,7 +75,7 @@ TEST(Datagram,TwoPortTest) {
        addr2.sin_port = htons(10002);
        addr2.sin_addr.s_addr = htonl(INADDR_LOOPBACK);*/
 
-       Datagram send(sock1,Datagram::Address(10002));
+       Datagram send(sock1,Address(10002));
        send.Push32(1234);
        send.Send();
 
index 059dc96..f0983f4 100644 (file)
@@ -31,7 +31,7 @@ int FileTransfer::instance = 0;
 FileTransfer::FileTransfer (const char* filename, const Sha1Hash& _root_hash) :
     root_hash_(_root_hash), fd_(0), hashfd_(0), dry_run_(false),
     peak_count_(0), hashes_(NULL), error_(NULL), size_(0), sizek_(0),
-    complete_(0), completek_(0), seq_complete_(0) //, data_in_off_(0)
+    complete_(0), completek_(0), seq_complete_(0), hs_in_offset_(0)
 {
        fd_ = open(filename,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
        if (fd_<0)
@@ -339,13 +339,6 @@ FileTransfer* FileTransfer::Find (const Sha1Hash& root_hash) {
 }
 
 
-void FileTransfer::OnPexIn (const Address& addr) {
-    pex_in_.push_back(addr);
-    if (pex_in_.size()>1000)
-        pex_in_.pop_front(); 
-}
-
-
 std::string FileTransfer::GetTempFilename(Sha1Hash& root_hash, int instance, std::string postfix)
 {
        std::string tempfile = gettmpdir();
@@ -371,11 +364,39 @@ std::string FileTransfer::GetTempFilename(Sha1Hash& root_hash, int instance, std
 }*/
 
 
-uint32_t        FileTransfer::RevealChannel (int& offset) {
-    while (offset<Channel::channels.size() && 
-           (!Channel::channels[offset] || Channel::channels[offset]->file_!=this) )
-           offset++;
-    return offset < Channel::channels.size() ? offset : -1;
+void            FileTransfer::OnPexIn (const Address& addr) {
+    for(int i=0; i<hs_in_.size(); i++) {
+        Channel* c = Channel::channels[hs_in_[i]];
+        if (c && c->file_==this && c->peer_==addr)
+            return; // already connected
+    }
+    if (hs_in_.size()<20) {
+        new Channel(this,Channel::sockets[0],addr);
+    } else {
+        pex_in_.push_back(addr);
+        if (pex_in_.size()>1000)
+            pex_in_.pop_front();
+    }
+}
+
+
+int        FileTransfer::RevealChannel (int& pex_out_) {
+    pex_out_ -= hs_in_offset_;
+    if (pex_out_<0)
+        pex_out_ = 0;
+    while (pex_out_<hs_in_.size()) {
+        Channel* c = Channel::channels[hs_in_[pex_out_]];
+        if (c && c->file_==this) {
+            pex_out_ += hs_in_offset_ + 1;
+            return c->id;
+        } else {
+            hs_in_[pex_out_] = hs_in_[0];
+            hs_in_.pop_front();
+            hs_in_offset_++;
+        }
+    }
+    pex_out_ += hs_in_offset_;
+    return -1;
 }