From: victor Date: Mon, 26 Oct 2009 15:27:30 +0000 (+0000) Subject: barely works X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?a=commitdiff_plain;h=618326993faba46512f4aa7924d0c70cdc711288;p=swift-upb.git barely works git-svn-id: https://ttuki.vtt.fi/svn/p2p-next/TUD/p2tp/trunk@478 e16421f0-f15b-0410-abcd-98678b794739 --- diff --git a/SConstruct b/SConstruct index 33970ee..c28c801 100644 --- a/SConstruct +++ b/SConstruct @@ -26,7 +26,9 @@ DEBUG = True TestDir='tests' target = 'p2tp' -source = [ 'bin64.cpp','hashtree.cpp','datagram.cpp','bins.cpp', 'transfer.cpp', 'compat/hirestimeofday.cpp', 'compat/util.cpp'] +source = [ 'bin64.cpp','hashtree.cpp','datagram.cpp','bins.cpp', + 'transfer.cpp', 'p2tp.cpp', 'sendrecv.cpp', + 'compat/hirestimeofday.cpp', 'compat/util.cpp'] env = Environment() if sys.platform == "win32": @@ -83,7 +85,7 @@ else: cpppath = "" print "To use external libs, set CPPPATH environment variable to list of colon-separated include dirs" env.Append(CPPPATH=".:"+cpppath) - env.Append(LINKFLAGS="--static") + #env.Append(LINKFLAGS="--static") #if DEBUG: # env.Append(CXXFLAGS="-g") diff --git a/bins.cpp b/bins.cpp index f5a32b6..68dca27 100644 --- a/bins.cpp +++ b/bins.cpp @@ -317,13 +317,25 @@ void bins::remove (bins& b) { } +bin64_t bins::cover(bin64_t val) { + iterator i(this,val,false); + while (i.pos!=val && !i.solid()) + i.towards(val); + //if (!i.half && !halves[0]) + // return bin64_t::ALL; + return i.pos; +} + + bin64_t bins::find_filtered - (bins& filter, const bin64_t range, const uint8_t layer, fill_t seek) + (bins& filter, bin64_t range, const uint8_t layer, fill_t seek) { + if (range==bin64_t::ALL) + range = bin64_t ( height>filter.height ? height : filter.height, 0 ); iterator i(this,range,true), j(&filter,range,true); fill_t stop = seek==EMPTY ? FILLED : EMPTY; while (true) { - while ( i.bin().layer()>layer && (i.deep() || *i!=stop || j.deep() || *j!=EMPTY) ) + while ( i.bin().layer()>layer && (i.deep() || *i!=stop || j.deep() || *j!=FILLED) ) i.left(), j.left(); // TODO may optimize a lot here if (i.bin().layer()==layer && !i.deep() && *i==seek && *j==EMPTY) return i.bin(); @@ -337,7 +349,10 @@ bin64_t bins::find_filtered return bin64_t::NONE; } -void bins::set_range (bins& origin, bin64_t range) { // FIXME unite with remove(); do bitwise() +// FIXME unite with remove(); do bitwise() +void bins::copy_range (bins& origin, bin64_t range) { + if (range==bin64_t::ALL) + range = bin64_t ( height>origin.height ? height : origin.height, 0 ); iterator zis(this,range,true), zat(&origin,range,true); while (zis.pos.within(range)) { while (zis.deep() || zat.deep()) { diff --git a/bins.h b/bins.h index 86b5d04..919f893 100644 --- a/bins.h +++ b/bins.h @@ -26,12 +26,12 @@ public: void set (bin64_t bin, fill_t val=FILLED); - void set_range (bins& origin, bin64_t range); + void copy_range (bins& origin, bin64_t range); bin64_t find (const bin64_t range, const uint8_t layer, fill_t seek=EMPTY) ; bin64_t find_filtered - (bins& filter, const bin64_t range, const uint8_t layer, fill_t seek=EMPTY) ; + (bins& filter, bin64_t range, const uint8_t layer, fill_t seek=EMPTY) ; void remove (bins& b); @@ -41,6 +41,8 @@ public: uint32_t size() { return cells_allocated; } + bin64_t cover(bin64_t val); + bool empty () const { return !deep(0) && !halves[0]; } static bool is_mixed (uint16_t val) { return val!=EMPTY && val!=FILLED; } diff --git a/compat/hirestimeofday.cpp b/compat/hirestimeofday.cpp index 6a03c00..f5aad7c 100644 --- a/compat/hirestimeofday.cpp +++ b/compat/hirestimeofday.cpp @@ -5,7 +5,7 @@ */ #include -#include "compat/hirestimeofday.h" +#include "hirestimeofday.h" #ifndef _WIN32 #include diff --git a/compat/unixio.cpp b/compat/unixio.cpp index 1c369a6..01b08ea 100644 --- a/compat/unixio.cpp +++ b/compat/unixio.cpp @@ -2,10 +2,11 @@ * Written by Arno Bakker * see LICENSE.txt for license information */ +#ifdef _WIN32 -#include "compat/unixio.h" -#include +#include "unixio.h" #include +#include #include size_t pread(int fildes, void *buf, size_t nbyte, long offset) @@ -26,3 +27,5 @@ int inet_aton(const char *cp, struct in_addr *inp) inp->S_un.S_addr = inet_addr(cp); return 1; } + +#endif diff --git a/compat/unixio.h b/compat/unixio.h index e36a1f1..94255c1 100644 --- a/compat/unixio.h +++ b/compat/unixio.h @@ -4,6 +4,7 @@ * * Defines UNIX like I/O calls and parameters for Win32 */ +#ifdef _WIN32 #ifndef UNIXIO_H_ #define UNIXIO_H_ @@ -23,3 +24,5 @@ size_t pwrite(int fildes, const void *buf, size_t nbyte, long offset); int inet_aton(const char *cp, struct in_addr *inp); #endif /* UNIXIO_H_ */ + +#endif // WIN32 \ No newline at end of file diff --git a/datagram.cpp b/datagram.cpp index 5edadd7..ea6e60f 100644 --- a/datagram.cpp +++ b/datagram.cpp @@ -20,14 +20,33 @@ namespace p2tp { tint Datagram::now = Datagram::Time(); +tint Datagram::epoch = now; uint32_t Datagram::Address::LOCALHOST = INADDR_LOOPBACK; +char* Datagram::TimeStr (tint time) { + static char ret_str[128]; + if (time==0) + time = now; + time -= epoch; + int hours = time/TINT_HOUR; + time %= TINT_HOUR; + int mins = time/TINT_MIN; + time %= TINT_MIN; + int secs = time/TINT_SEC; + time %= TINT_SEC; + int msecs = time/TINT_MSEC; + time %= TINT_MSEC; + int usecs = time/TINT_uSEC; + sprintf(ret_str,"%i_%02i_%02i_%03i_%03i",hours,mins,secs,msecs,usecs); + return ret_str; +} + int Datagram::Send () { int r = sendto(sock,(const char *)buf+offset,length-offset,0, (struct sockaddr*)&(addr.addr),sizeof(struct sockaddr_in)); - offset=0; - length=0; - now = Time(); + //offset=0; + //length=0; + Time(); return r; } @@ -36,22 +55,19 @@ int Datagram::Recv () { offset = 0; length = recvfrom (sock, (char *)buf, MAXDGRAMSZ, 0, (struct sockaddr*)&(addr), &addrlen); - if (length<0) + if (length<0) // FIXME FIXME FIXME #ifdef _WIN32 PLOG(ERROR)<<"on recv" << WSAGetLastError() << "\n"; #else PLOG(ERROR)<<"on recv"; #endif - now = Time(); + Time(); return length; } SOCKET Datagram::Wait (int sockcnt, SOCKET* sockets, tint usec) { - // ARNOTODO: LOG commented out, it causes a crash on win32 (in a strlen() - // done as part of a std::local::name() ?? - // - //LOG(INFO)<<"waiting for "<0) { for (int i=0; i<=sockcnt; i++) if (FD_ISSET(sockets[i],&bases)) return sockets[i]; - } else if (sel<0) + } else if (sel<0) { #ifdef _WIN32 PLOG(ERROR)<<"select fails" << WSAGetLastError() << "\n"; #else PLOG(ERROR)<<"select fails"; #endif - - // Arno: may return 0 when timeout expired - return sel; + } + return -1; } tint Datagram::Time () { diff --git a/datagram.h b/datagram.h index 45099e2..48d09ee 100644 --- a/datagram.h +++ b/datagram.h @@ -34,6 +34,13 @@ namespace p2tp { +typedef int64_t tint; +#define TINT_HOUR ((tint)1000000*60*60) +#define TINT_MIN ((tint)1000000*60) +#define TINT_SEC ((tint)1000000) +#define TINT_MSEC ((tint)1000) +#define TINT_uSEC ((tint)1) +#define TINT_NEVER ((tint)0x7fffffffffffffffLL) #define MAXDGRAMSZ 1400 #ifndef _WIN32 #define INVALID_SOCKET -1 @@ -62,13 +69,21 @@ struct Datagram { init(ipv4addr,port); } Address(const struct sockaddr_in& address) : addr(address) {} + uint32_t ipv4 () const { return ntohl(addr.sin_addr.s_addr); } + uint16_t port () const { return ntohs(addr.sin_port); } operator sockaddr_in () const {return addr;} - bool operator == (const Address& b) { + bool operator == (const Address& b) const { return addr.sin_family==b.addr.sin_family && addr.sin_port==b.addr.sin_port && addr.sin_addr.s_addr==b.addr.sin_addr.s_addr; } - bool operator != (const Address& b) { return !(*this==b); } + std::string str () const { + char s[32]; + sprintf(s,"%i.%i.%i.%i:%i",ipv4()>>24,(ipv4()>>16)&0xff, + (ipv4()>>8)&0xff,ipv4()&0xff,port()); + return std::string(s); + } + bool operator != (const Address& b) const { return !(*this==b); } }; Address addr; @@ -79,8 +94,9 @@ struct Datagram { static SOCKET Bind(Address address); static void Close(int port); static tint Time(); + static char* TimeStr(tint time=0); static SOCKET Wait (int sockcnt, SOCKET* sockets, tint usec=0); - static tint now; + static tint now, epoch; Datagram (SOCKET socket, const Address addr_) : addr(addr_), offset(0), length(0), sock(socket) {} @@ -167,6 +183,7 @@ struct Datagram { }; std::string sock2str (struct sockaddr_in addr); +#define dprintf(...) printf(__VA_ARGS__) } diff --git a/doc/cc-states.png b/doc/cc-states.png new file mode 100644 index 0000000..57b5d49 Binary files /dev/null and b/doc/cc-states.png differ diff --git a/doc/sofi.jpg b/doc/sofi.jpg new file mode 100644 index 0000000..fe34680 Binary files /dev/null and b/doc/sofi.jpg differ diff --git a/doc/state-diagram.pdf b/doc/state-diagram.pdf new file mode 100644 index 0000000..1057574 Binary files /dev/null and b/doc/state-diagram.pdf differ diff --git a/ext/ledbat_controller.cpp b/ext/ledbat_controller.cpp index 007a799..8febc9c 100644 --- a/ext/ledbat_controller.cpp +++ b/ext/ledbat_controller.cpp @@ -18,8 +18,8 @@ public: int cwnd_, peer_cwnd_, in_flight_; bin64_t last_bin_sent_; public: - LedbatController () : dev_avg_(0), rtt_avg_(TINT_SEC), last_send_time_(0), - last_recv_time_(0), cwnd_(1), peer_cwnd_(1), in_flight_(0) { + LedbatController (int chid) : dev_avg_(0), rtt_avg_(TINT_SEC), last_send_time_(0), + last_recv_time_(0), cwnd_(1), peer_cwnd_(1), in_flight_(0), CongestionController(chid) { } tint rtt_avg () { @@ -31,6 +31,7 @@ public: } int cwnd () { + // check for timeouts return cwnd_; } @@ -39,7 +40,7 @@ public: } int free_cwnd ( ){ - return cwnd_ - in_flight_; + return cwnd() - in_flight_; } tint next_send_time ( ){ @@ -63,7 +64,8 @@ public: void OnAckRcvd(bin64_t b, tint peer_stamp) { if (last_bin_sent_!=b) return; - rtt_avg_ = (rtt_avg_*7 + (Datagram::now-last_send_time_)) >> 3; // van Jac + if (peer_stamp!=TINT_NEVER) + rtt_avg_ = (rtt_avg_*7 + (Datagram::now-last_send_time_)) >> 3; // van Jac in_flight_--; } diff --git a/ext/seq_picker.cpp b/ext/seq_picker.cpp index 1001d8e..eae0525 100644 --- a/ext/seq_picker.cpp +++ b/ext/seq_picker.cpp @@ -13,34 +13,41 @@ using namespace p2tp; class SeqPiecePicker : public PiecePicker { - bins hint_out_; - FileTransfer* file_; + bins ack_hint_out_; + FileTransfer* file_; public: - SeqPiecePicker (FileTransfer* file_to_pick_from) : file_(file_to_pick_from), hint_out_() { + SeqPiecePicker (FileTransfer* file_to_pick_from) : file_(file_to_pick_from), ack_hint_out_() { + ack_hint_out_.copy_range(file_->ack_out(),bin64_t::ALL); } - virtual bin64_t Pick (bins& from, uint8_t layer) { - bins may_pick = from; - may_pick.remove (file_->ack_out()); - may_pick.remove (hint_out_); - for (int l=layer; l>=0; l--) { + virtual bin64_t Pick (bins& offer, uint8_t layer) { + + bin64_t hint = offer.find_filtered + (ack_hint_out_,bin64_t::ALL,layer,bins::FILLED); + if (hint==bin64_t::NONE) + return hint; // TODO: end-game mode + while (hint.layer()>layer) + hint = hint.left(); + ack_hint_out_.set(hint); + return hint; + /*for (int l=layer; l>=0; l--) { for(int i=0; ipeak_count(); i++) { bin64_t pick = may_pick.find(file_->peak(i),l,bins::FILLED); if (pick!=bin64_t::NONE) return pick; } } - return bin64_t::NONE; + return bin64_t::NONE;*/ } virtual void Received (bin64_t b) { - hint_out_.set(b,bins::EMPTY); + ack_hint_out_.set(b,bins::FILLED); } - virtual void Snubbed (bin64_t b) { - hint_out_.set(b,bins::EMPTY); + virtual void Expired (bin64_t b) { + ack_hint_out_.copy_range(file_->ack_out(),b); } }; \ No newline at end of file diff --git a/ext/simple_selector.cpp b/ext/simple_selector.cpp index 2f165fa..790a205 100644 --- a/ext/simple_selector.cpp +++ b/ext/simple_selector.cpp @@ -36,4 +36,3 @@ public: } }; -PeerSelector* Channel::peer_selector = new SimpleSelector(); \ No newline at end of file diff --git a/p2tp.cpp b/p2tp.cpp index bbc7aa7..fedd68f 100644 --- a/p2tp.cpp +++ b/p2tp.cpp @@ -29,14 +29,23 @@ p2tp::tint Channel::TIMEOUT = TINT_SEC*60; std::vector Channel::channels(1); int Channel::sockets[8] = {0,0,0,0,0,0,0,0}; int Channel::socket_count = 0; - - -Channel::Channel (FileTransfer* file, int socket, struct sockaddr_in peer_addr) : - file_(file), peer(peer_addr), peer_channel_id(0), - socket_(socket) // FIXME +Address Channel::tracker; +tbqueue Channel::send_queue; +#include "ext/dummy_controller.cpp" +#include "ext/simple_selector.cpp" +PeerSelector* Channel::peer_selector = new SimpleSelector(); + +Channel::Channel (FileTransfer* file, int socket, Address peer_addr) : + file_(file), peer_(peer_addr), peer_channel_id_(0), pex_out_(0), + socket_(socket==-1?sockets[0]:socket), // FIXME + own_id_mentioned_(false), next_send_time_(0) { + if (peer_==Address()) + peer_ = tracker; this->id = channels.size(); channels.push_back(this); + cc_ = new BasicController(); + RequeueSend(Datagram::now); } @@ -45,6 +54,9 @@ Channel::~Channel () { } +void p2tp::SetTracker(const Address& tracker) { + Channel::tracker = tracker; +} int Channel::DecodeID(int scrambled) { @@ -76,6 +88,27 @@ void p2tp::Loop (tint till) { } +int p2tp::Open (const char* filename, const Sha1Hash& hash) { + FileTransfer* ft = new FileTransfer(filename, hash); + int fdes = ft->file_descriptor(); + if (fdes>0) { + + /*if (FileTransfer::files.size()FileTransfer::files.size() && FileTransfer::files[fd]) @@ -111,6 +144,10 @@ size_t p2tp::SeqComplete (int fdes) { return 0; } + + + + /**

P2TP handshake

Basic rules:
    diff --git a/p2tp.h b/p2tp.h index 6b50663..9b4ecb2 100644 --- a/p2tp.h +++ b/p2tp.h @@ -70,11 +70,13 @@ namespace p2tp { tintbin(const tintbin& b) : time(b.time), bin(b.bin) {} tintbin() : time(0), bin(bin64_t::NONE) {} tintbin(tint time_, bin64_t bin_) : time(time_), bin(bin_) {} + tintbin(bin64_t bin_) : time(Datagram::now), bin(bin_) {} }; + typedef std::deque tbqueue; typedef std::deque binqueue; - typedef Datagram::Address Address; + typedef Datagram::Address Address; typedef enum { P2TP_HANDSHAKE = 0, @@ -111,6 +113,13 @@ namespace p2tp { accept or remember or drop. Returns true => ACK is sent. */ bool OfferData (bin64_t bin, const uint8_t* data, size_t length); + /** While we need to feed ACKs to every peer, we try (1) avoid + unnecessary duplication and (2) keep minimum state. Thus, + we use a rotating queue of bin completion events. */ + //bin64_t RevealAck (uint64_t& offset); + /** Rotating queue read for channels of this transmission. */ + uint32_t RevealChannel (int& i); + static FileTransfer* Find (const Sha1Hash& hash); static FileTransfer* file (int fd) { return fd handshake_in_; + std::deque
    pex_in_; + /** Messages we are accepting. */ + uint64_t cap_out_; + protected: void SetSize(size_t bytes); void Submit(); @@ -183,6 +199,8 @@ namespace p2tp { Sha1Hash DeriveRoot(); void SavePeaks(); void LoadPeaks(); + void OnDataIn (bin64_t pos); + void OnPexIn (const Address& addr); friend class Channel; friend size_t Size (int fdes); @@ -192,25 +210,24 @@ namespace p2tp { friend void Close (int fd) ; }; - class CongestionController { - public: - virtual tint rtt_avg() = 0; - virtual tint dev_avg() = 0; - virtual tint next_send_time() = 0; - virtual int cwnd() = 0; - virtual int peer_cwnd() = 0; + struct CongestionController { + CongestionController () {} virtual int free_cwnd() = 0; - virtual void OnDataSent(bin64_t b) = 0; - virtual void OnDataRecvd(bin64_t b) = 0; - virtual void OnAckRcvd(const tintbin& tsack) = 0; - virtual ~CongestionController() = 0; + virtual tint RoundTripTime() = 0; + virtual tint RoundTripTimeoutTime() = 0; + virtual int PeerBPS() = 0; + virtual float PeerCWindow() = 0; + virtual tint OnDataSent(bin64_t b) = 0; + virtual tint OnDataRecvd(bin64_t b) = 0; + virtual tint OnAckRcvd(bin64_t ackd, tint peer_time=0) = 0; + virtual ~CongestionController() {} }; class PiecePicker { public: - virtual bin64_t Pick (bins& from, uint8_t layer) = 0; + virtual bin64_t Pick (bins& offered, uint8_t layer) = 0; + virtual void Expired (bin64_t b) = 0; virtual void Received (bin64_t b) = 0; - virtual void Snubbed (bin64_t b) = 0; }; class PeerSelector { @@ -235,18 +252,18 @@ namespace p2tp { hash or a fragment of it into every datagram.) */ class Channel { public: - Channel (FileTransfer* file, int socket, struct sockaddr_in peer); + Channel (FileTransfer* file, int socket=-1, Address peer=Address()); ~Channel(); static void Recv (int socket); static void Loop (tint till); void Recv (Datagram& dgram); - tint Send (); + void Send (); void OnAck (Datagram& dgram); void OnAckTs (Datagram& dgram); - void OnData (Datagram& dgram); + bin64_t OnData (Datagram& dgram); void OnHint (Datagram& dgram); void OnHash (Datagram& dgram); void OnPex (Datagram& dgram); @@ -257,10 +274,13 @@ namespace p2tp { void AddHint (Datagram& dgram); void AddUncleHashes (Datagram& dgram, bin64_t pos); void AddPeakHashes (Datagram& dgram); + void AddPex (Datagram& dgram); const std::string id_string () const; /** A channel is "established" if had already sent and received packets. */ - bool is_established () { return peer_channel_id && own_id_mentioned; } + bool is_established () { return peer_channel_id_ && own_id_mentioned_; } + FileTransfer& file() { return *file_; } + const Address& peer() const { return peer_; } static int DecodeID(int scrambled); static int EncodeID(int unscrambled); @@ -268,52 +288,64 @@ namespace p2tp { return i channels; static int sockets[8]; static int socket_count; static tint last_tick; - friend int Listen (Datagram::Address addr); - friend void Shutdown (int sock_des); - friend void AddPeer (Datagram::Address address, const Sha1Hash& root); + static Address tracker; + static std::vector channels; + + friend int Listen (Datagram::Address addr); + friend void Shutdown (int sock_des); + friend void AddPeer (Datagram::Address address, const Sha1Hash& root); + friend void SetTracker(const Address& tracker); + friend int Open (const char*, const Sha1Hash&) ; // FIXME + + friend class FileTransfer; // FIXME!!! }; @@ -335,7 +367,9 @@ namespace p2tp { root hash is zero, the peer might be talked to regarding any transmission (likely, a tracker, cache or an archive). */ void AddPeer (Datagram::Address address, const Sha1Hash& root=Sha1Hash::ZERO); - + + void SetTracker(const Address& tracker); + /** Returns size of the file in bytes, 0 if unknown. Might be rounded up to a kilobyte before the transmission is complete. */ size_t Size (int fdes); @@ -349,12 +383,15 @@ namespace p2tp { //uint32_t Width (const tbinvec& v); - void LibraryInit(void); + +// FIXME kill this macro +#define RETLOG(str) { fprintf(stderr,"%s\n",str); return; } + /** Must be called by any client using the library */ + void LibraryInit(void); } // namespace end -#define RETLOG(str) { LOG(WARNING)<rtt_avg()*8; + tint timed_out = Datagram::now - cc_->RoundTripTime()*8; while ( !hint_out.empty() && hint_out.front().time < timed_out ) { file().picker()->Snubbed(hint_out.front().bin); hint_out.pop_front(); } -} +}*/ void Channel::AddHandshake (Datagram& dgram) { - dgram.Push8(P2TP_HANDSHAKE); - dgram.Push32(EncodeID(id)); - if (!peer_channel_id) { // initiating + if (!peer_channel_id_) { // initiating dgram.Push8(P2TP_HASH); dgram.Push32(bin64_t::ALL32); dgram.PushHash(file().root_hash()); + dprintf("%s #%i +hash ALL %s\n", + Datagram::TimeStr(),id,file().root_hash().hex().c_str()); } + dgram.Push8(P2TP_HANDSHAKE); + dgram.Push32(EncodeID(id)); + dprintf("%s #%i +hs\n",Datagram::TimeStr(),id); AddAck(dgram); - //DLOG(INFO)<<"#"<id_string(); } -tint Channel::Send () { - Datagram dgram(socket_,peer); - dgram.Push32(peer_channel_id); +void Channel::Send () { + Datagram dgram(socket_,peer()); + dgram.Push32(peer_channel_id_); + bin64_t data = bin64_t::NONE; if ( is_established() ) { AddAck(dgram); AddHint(dgram); - if (cc->free_cwnd() && Datagram::now>=cc->next_send_time()) { - bin64_t data = AddData(dgram); - cc->OnDataSent(data); - } + if (cc_->free_cwnd()) + data = AddData(dgram); } else { AddHandshake(dgram); + AddAck(dgram); } - DLOG(INFO)<<"#"<next_send_time(); + if (dgram.size()==4) // only the channel id; bare keep-alive + data = bin64_t::ALL; + RequeueSend(cc_->OnDataSent(data)); } void Channel::AddHint (Datagram& dgram) { - CleanStaleHints(); - uint64_t outstanding = 0; - for(tbqueue::iterator i=hint_out.begin(); i!=hint_out.end(); i++) - outstanding += i->bin.width(); - uint64_t kbps = TINT_SEC * cc->peer_cwnd() / cc->rtt_avg(); - if (outstanding>kbps) // have enough - return; - uint8_t layer = 0; - while( (1<Pick(ack_in,layer); - if (hint==bin64_t::NONE) - return; - dgram.Push8(P2TP_HINT); - dgram.Push32(hint); - hint_out.push_back(tintbin(Datagram::now,hint)); - DLOG(INFO)<<"#"<bin.width(); + + float peer_cwnd = cc_->PeerBPS() * cc_->RoundTripTime() / TINT_SEC; + + if ( hinted*1024 < peer_cwnd*4 ) { + + uint8_t layer = 0; + bin64_t hint = file().picker().Pick(ack_in_,layer); + + if (hint!=bin64_t::NONE) { + this->hint_out_.push_back(tintbin(hint)); + dgram.Push8(P2TP_HINT); + dgram.Push32(hint); + dprintf("%s #%i +hint (%i,%lli)\n",Datagram::TimeStr(),id,hint.layer(),hint.offset()); + } + + } } @@ -128,54 +165,60 @@ bin64_t Channel::AddData (Datagram& dgram) { if (!file().size()) // know nothing return bin64_t::NONE; bin64_t tosend = DequeueHint(); - if (tosend==bin64_t::NONE) { - //LOG(WARNING)<id_string()<<" no idea what to send"; - cc->OnDataSent(bin64_t::NONE); - return bin64_t::NONE; - } - if (ack_in.empty() && file().size()) - AddPeakHashes(dgram); - AddUncleHashes(dgram,tosend); - uint8_t buf[1024]; - size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); // TODO: ??? corrupted data, retries - if (r<0) { - PLOG(ERROR)<<"error on reading"; - return 0; - } - assert(dgram.space()>=r+4+1); - dgram.Push8(P2TP_DATA); - dgram.Push32(tosend); - dgram.Push(buf,r); - DLOG(INFO)<<"#"<=r+4+1); + dgram.Push8(P2TP_DATA); + dgram.Push32(tosend); + dgram.Push(buf,r); + dprintf("%s #%i +data (%lli)\n",Datagram::TimeStr(),id,tosend.base_offset()); return tosend; } void Channel::AddAck (Datagram& dgram) { - if (data_in_.time) { + if (data_in_.bin!=bin64_t::NONE) { + bin64_t pos = data_in_.bin; dgram.Push8(P2TP_ACK_TS); - dgram.Push32(data_in_.bin); + dgram.Push32(pos); dgram.Push64(data_in_.time); - data_in_.time = 0; - DLOG(INFO)<<"#"<OnDataRecvd(data)); } @@ -193,54 +237,88 @@ void Channel::OnHash (Datagram& dgram) { bin64_t pos = dgram.Pull32(); Sha1Hash hash = dgram.PullHash(); file().OfferHash(pos,hash); - DLOG(INFO)<<"#"<OnDataRecvd(pos); - CleanStaleHints(); + uint8_t *data; + int length = dgram.Pull(&data,1024); + bool ok = file().OfferData(pos, data, length) ; + dprintf("%s #%i %cdata (%lli)\n",Datagram::TimeStr(),id,ok?'-':'!',pos.offset()); + data_in_ = tintbin(Datagram::now,pos); + return ok ? pos : bin64_t::none(); } void Channel::OnAck (Datagram& dgram) { // note: no bound checking bin64_t pos = dgram.Pull32(); - DLOG(INFO)<<"#"<OnAckRcvd(pos,0)); } void Channel::OnAckTs (Datagram& dgram) { bin64_t pos = dgram.Pull32(); tint ts = dgram.Pull64(); - DLOG(INFO)<<"#"<OnAckRcvd(tintbin(ts,pos)); + // TODO sanity check + dprintf("%s #%i -ackts (%i,%lli) %s\n", + Datagram::TimeStr(),id,pos.layer(),pos.offset(),Datagram::TimeStr(ts)); + ack_in_.set(pos); + RequeueSend(cc_->OnAckRcvd(pos,ts)); } void Channel::OnHint (Datagram& dgram) { bin64_t hint = dgram.Pull32(); - hint_in.push_back(hint); + hint_in_.push_back(hint); + dprintf("%s #%i -hint (%i,%lli)\n",Datagram::TimeStr(),id,hint.layer(),hint.offset()); } void Channel::OnHandshake (Datagram& dgram) { - peer_channel_id = dgram.Pull32(); + if (!peer_channel_id_) + cc_->OnAckRcvd(bin64_t::ALL); + peer_channel_id_ = dgram.Pull32(); + dprintf("%s #%i -hs %i\n",Datagram::TimeStr(),id,peer_channel_id_); // FUTURE: channel forking } void Channel::OnPex (Datagram& dgram) { - uint32_t addr = dgram.Pull32(); + uint32_t ipv4 = dgram.Pull32(); uint16_t port = dgram.Pull16(); - if (peer_selector) - peer_selector->AddPeer(Datagram::Address(addr,port),file().root_hash()); + Address addr(ipv4,port); + //if (peer_selector) + // peer_selector->AddPeer(Datagram::Address(addr,port),file().root_hash()); + file_->pex_in_.push_back(addr); + if (file_->pex_in_.size()>1000) + file_->pex_in_.pop_front(); + static int ENOUGH_PEERS_THRESHOLD = 20; + if (file_->channel_count()RevealChannel(i)) != -1 ) { + if (channels[i]->peer()==addr) + break; + } + if (chno==-1) + new Channel(file_,socket_,addr); + } +} + + +void Channel::AddPex (Datagram& dgram) { + int ch = file().RevealChannel(this->pex_out_); + if (ch==-1) + return; + Address a = channels[ch]->peer(); + dgram.Push8(P2TP_PEX_ADD); + dgram.Push32(a.ipv4()); + dgram.Push16(a.port()); } @@ -265,6 +343,7 @@ void Channel::Recv (int socket) { FileTransfer* file = FileTransfer::Find(hash); if (!file) RETLOG ("hash unknown, no such file"); + dprintf("%s #0 -hash ALL %s\n",Datagram::TimeStr(),hash.hex().c_str()); channel = new Channel(file, socket, data.address()); } else { mych = DecodeID(mych); @@ -273,11 +352,11 @@ void Channel::Recv (int socket) { channel = channels[mych]; if (!channel) RETLOG ("channel is closed"); - if (channel->peer != data.address()) + if (channel->peer() != data.address()) RETLOG ("invalid peer address"); - channel->Recv(data); + channel->own_id_mentioned_ = true; } - channel->Send(); + channel->Recv(data); } @@ -286,39 +365,97 @@ bool tblater (const tintbin& a, const tintbin& b) { } -void Channel::Loop (tint time) { - - tint untiltime = Datagram::Time()+time; - tbqueue send_queue; - for(int i=0; iSend(); - if (next_time!=TINT_NEVER) { - send_queue.push_back(tintbin(next_time,chid)); - push_heap(send_queue.begin(),send_queue.end(),tblater); - } else { - delete sender; - channels[chid] = NULL; - } + tint limit = Datagram::Time() + howlong; + + do { + + tint send_time(TINT_NEVER); + Channel* sender(NULL); + while (!send_queue.empty()) { + send_time = send_queue.front().time; + sender = channel((int)send_queue.front().bin); + if (sender && sender->next_send_time_==send_time) + break; + sender = NULL; // it was a stale entry + pop_heap(send_queue.begin(), send_queue.end(), tblater); + send_queue.pop_back(); } - - } - + if (send_time>limit) + send_time = limit; + if (sender && send_time<=Datagram::now) { + dprintf("%s #%i sch_send %s\n",Datagram::TimeStr(),sender->id, + Datagram::TimeStr(send_time)); + sender->Send(); + pop_heap(send_queue.begin(), send_queue.end(), tblater); + send_queue.pop_back(); + } else { + tint towait = send_time - Datagram::now; + dprintf("%s waiting %lliusec\n",Datagram::TimeStr(),towait); + int rd = Datagram::Wait(socket_count,sockets,towait); + if (rd!=-1) + Recv(rd); + } + + } while (Datagram::Time() just listen + + tintbin next_send = send_queue.front(); + tint wake_on = next_send.time; + Channel* sender = channel(next_send.bin); + + // BUG BUG BUG filter stale timeouts here + + //if (wake_on<=untiltime) { + pop_heap(send_queue.begin(), send_queue.end(), tblater); + send_queue.pop_back(); + //}// else + //sender = 0; // BUG will never wake up + + if (sender->next_send_time_!=next_send.time) + continue; + + if (wake_onuntiltime) + wake_on = untiltime; + tint towait = min(wake_on-Datagram::now,TINT_SEC); + dprintf("%s waiting %lliusec\n",Datagram::TimeStr(),towait); + int rd = Datagram::Wait(socket_count,sockets,towait); + if (rd!=-1) + Recv(rd); + // BUG WRONG BUG WRONG another may need to send + if (sender) { + dprintf("%s #%i sch_send %s\n",Datagram::TimeStr(),sender->id, + Datagram::TimeStr(next_send.time)); + sender->Send(); + // if (sender->cc_->next_send_time==TINT_NEVER) + } + + } + + */ \ No newline at end of file diff --git a/tests/SConscript b/tests/SConscript index f7f8d0f..9ff2066 100644 --- a/tests/SConscript +++ b/tests/SConscript @@ -79,3 +79,11 @@ env.Program( CPPPATH=cpppath, LIBS=libs, LIBPATH=libpath ) + +env.Program( + target='connecttest', + source=['connecttest.cpp'], + CPPPATH=cpppath, + LIBS=libs, + LIBPATH=libpath ) + diff --git a/tests/binstest2.cpp b/tests/binstest2.cpp index bbe7796..225f985 100755 --- a/tests/binstest2.cpp +++ b/tests/binstest2.cpp @@ -227,7 +227,34 @@ TEST(BinsTest,FindFiltered) { } -TEST(BinsTest,SetRange) { + +TEST(BinsTest, Cover) { + + bins b; + b.set(bin64_t(2,0)); + b.set(bin64_t(4,1)); + EXPECT_EQ(bin64_t(4,1),b.cover(bin64_t(0,30))); + //bins c; + //EXPECT_EQ(bin64_t::ALL,b.cover(bin64_t(0,30))); + +} + + +TEST(BinsTest,FindFiltered2) { + + bins data, filter; + for(int i=0; i<1024; i+=2) + data.set(bin64_t(0,i)); + for(int j=1; j<1024; j+=2) + filter.set(bin64_t(0,j)); + filter.set(bin64_t(0,501),bins::EMPTY); + EXPECT_EQ(bin64_t(0,501),data.find_filtered(filter,bin64_t(10,0),0)); + data.set(bin64_t(0,501)); + EXPECT_EQ(bin64_t::NONE,data.find_filtered(filter,bin64_t(10,0),0)); + +} + +TEST(BinsTest,CopyRange) { bins data, add; data.set(bin64_t(2,0)); data.set(bin64_t(2,2)); @@ -236,7 +263,7 @@ TEST(BinsTest,SetRange) { add.set(bin64_t(1,4)); add.set(bin64_t(0,13)); add.set(bin64_t(5,118)); - data.set_range(add, bin64_t(3,0)); + data.copy_range(add, bin64_t(3,0)); EXPECT_TRUE(bins::is_mixed(data.get(bin64_t(3,0)))); EXPECT_EQ(bins::EMPTY,data.get(bin64_t(2,0))); EXPECT_EQ(bins::FILLED,data.get(bin64_t(2,1))); diff --git a/tests/connecttest.cpp b/tests/connecttest.cpp index 258ccd2..b0bd23e 100644 --- a/tests/connecttest.cpp +++ b/tests/connecttest.cpp @@ -49,15 +49,21 @@ using namespace p2tp; TEST(P2TP,CwndTest) { - int f = open("big_test_file",O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); - int size = rand()%(1<<19) + (1<<19); + + unlink("doc/sofi-copy.jpg"); + struct stat st; + ASSERT_EQ(0,stat("doc/sofi.jpg", &st)); + int size = st.st_size, sizek = (st.st_size>>10) + (st.st_size%1024?1:0) ; + + /*int f = open("big_test_file",O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); + int size = 60<<10; //rand()%(1<<19) + (1<<19); int sizek = (size>>10) + ((size&1023)?1:0); char* b = (char*)malloc(size); for(int i=0; i=0); //ASSERT_TRUE(sock2>=0); - p2tp::AddPeer(Datagram::Address("127.0.0.1",7001)); - - int file = p2tp::Open("big_test_file"); + int file = p2tp::Open("doc/sofi.jpg"); FileTransfer* fileobj = FileTransfer::file(file); + FileTransfer::instance++; + + p2tp::SetTracker(Datagram::Address("127.0.0.1",7001)); - int copy = p2tp::Open("big_test_file_copy",fileobj->root_hash()); + int copy = p2tp::Open("doc/sofi-copy.jpg",fileobj->root_hash()); - p2tp::Loop(TINT_MSEC); + p2tp::Loop(TINT_SEC); - ASSERT_EQ(sizek<<10,p2tp::Size(copy)); - int count = 0; - while (p2tp::SeqComplete(copy)!=size && count++<(1<<14)) - p2tp::Loop(TINT_MSEC); + while (p2tp::SeqComplete(copy)!=size && count++<20) + p2tp::Loop(TINT_SEC); ASSERT_EQ(size,p2tp::SeqComplete(copy)); p2tp::Close(file); diff --git a/tests/ledbattest.cpp b/tests/ledbattest.cpp index ee6302f..21818ea 100644 --- a/tests/ledbattest.cpp +++ b/tests/ledbattest.cpp @@ -65,6 +65,7 @@ TEST(Datagram,LedbatTest) { if (4+8!=ack.Send()) fprintf(stderr,"short write\n"); fprintf(stderr,"%lli rcvd%i\n",now/TINT_SEC,seq); + //cc->OnDataRecv(bin64_t(0,seq)); // TODO: peer cwnd !!! continue; } diff --git a/tests/transfertest.cpp b/tests/transfertest.cpp index e4c4b1b..095d38b 100644 --- a/tests/transfertest.cpp +++ b/tests/transfertest.cpp @@ -72,10 +72,11 @@ TEST(TransferTest,TransferFile) { leech = new FileTransfer("copy",seed->root_hash()); EXPECT_EQ(2,leech->complete_kilo()); } - bin64_t next = leech->picker()->Pick(seed->ack_out(),0); + bin64_t next = leech->picker().Pick(seed->ack_out(),0); ASSERT_NE(bin64_t::NONE,next); + ASSERT_TRUE(next.base_offset()<5); uint8_t buf[1024]; //size_t len = seed->storer->ReadData(next,&buf); - size_t len = pread(seed->file_descriptor(),buf,1024,next.base_offset()<<10); // FIXME TEST FOR ERROR + size_t len = pread(seed->file_descriptor(),buf,1024,next.base_offset()<<10); bin64_t sibling = next.sibling(); if (sibling.base_offset()size_kilo()) leech->OfferHash(sibling, seed->hash(sibling)); diff --git a/transfer.cpp b/transfer.cpp index 7928303..059dc96 100644 --- a/transfer.cpp +++ b/transfer.cpp @@ -31,7 +31,7 @@ int FileTransfer::instance = 0; FileTransfer::FileTransfer (const char* filename, const Sha1Hash& _root_hash) : root_hash_(_root_hash), fd_(0), hashfd_(0), dry_run_(false), peak_count_(0), hashes_(NULL), error_(NULL), size_(0), sizek_(0), - complete_(0), completek_(0), seq_complete_(0) + complete_(0), completek_(0), seq_complete_(0) //, data_in_off_(0) { fd_ = open(filename,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); if (fd_<0) @@ -163,7 +163,8 @@ void FileTransfer::Submit () { } else hashes_[b] = Sha1Hash(hashes_[b.left()],hashes_[b.right()]); peak_hashes_[p] = hashes_[peaks_[p]]; - ack_out_.set(peaks_[p],bins::FILLED); + //ack_out_.set(peaks_[p],bins::FILLED); + OnDataIn(peaks_[p]); } root_hash_ = DeriveRoot(); Sha1Hash *hash_tmp = hashes_; @@ -200,13 +201,6 @@ void FileTransfer::OfferHash (bin64_t pos, const Sha1Hash& hash) { } -bin64_t FileTransfer::data_in (int offset) { - if (offset>data_in_.size()) - return bin64_t::NONE; - return data_in_[offset]; -} - - bool FileTransfer::OfferData (bin64_t pos, const uint8_t* data, size_t length) { if (!pos.is_base()) return false; @@ -230,7 +224,7 @@ bool FileTransfer::OfferData (bin64_t pos, const uint8_t* data, size_ //printf("g %lli %s\n",(uint64_t)pos,hash.hex().c_str()); // walk to the nearest proven hash FIXME 0-layer peak - ack_out_.set(pos,bins::FILLED); + OnDataIn(pos); pwrite(fd_,data,length,pos.base_offset()<<10); complete_ += length; completek_++; @@ -242,11 +236,45 @@ bool FileTransfer::OfferData (bin64_t pos, const uint8_t* data, size_ seq_complete_+=1024; if (seq_complete_>size_) seq_complete_ = size_; - data_in_.push_back(pos); return true; } +/*bin64_t FileTransfer::RevealAck (uint64_t& offset) { + if (offsetroot_hash_==root_hash) + if (files[i] && files[i]->root_hash()==root_hash) return files[i]; return NULL; } +void FileTransfer::OnPexIn (const Address& addr) { + pex_in_.push_back(addr); + if (pex_in_.size()>1000) + pex_in_.pop_front(); +} + std::string FileTransfer::GetTempFilename(Sha1Hash& root_hash, int instance, std::string postfix) { @@ -322,8 +356,7 @@ std::string FileTransfer::GetTempFilename(Sha1Hash& root_hash, int instance, std } - -int p2tp::Open (const char* filename, const Sha1Hash& hash) { +/*int p2tp::Open (const char* filename, const Sha1Hash& hash) { FileTransfer* ft = new FileTransfer(filename, hash); int fdes = ft->file_descriptor(); if (fdes>0) { @@ -335,6 +368,14 @@ int p2tp::Open (const char* filename, const Sha1Hash& hash) { delete ft; return -1; } +}*/ + + +uint32_t FileTransfer::RevealChannel (int& offset) { + while (offsetfile_!=this) ) + offset++; + return offset < Channel::channels.size() ? offset : -1; }