TestDir='tests'
target = 'p2tp'
-source = [ 'bin64.cpp','hashtree.cpp','datagram.cpp','bins.cpp', 'transfer.cpp', 'compat/hirestimeofday.cpp', 'compat/util.cpp']
+source = [ 'bin64.cpp','hashtree.cpp','datagram.cpp','bins.cpp',
+ 'transfer.cpp', 'p2tp.cpp', 'sendrecv.cpp',
+ 'compat/hirestimeofday.cpp', 'compat/util.cpp']
env = Environment()
if sys.platform == "win32":
cpppath = ""
print "To use external libs, set CPPPATH environment variable to list of colon-separated include dirs"
env.Append(CPPPATH=".:"+cpppath)
- env.Append(LINKFLAGS="--static")
+ #env.Append(LINKFLAGS="--static")
#if DEBUG:
# env.Append(CXXFLAGS="-g")
}
+bin64_t bins::cover(bin64_t val) {
+ iterator i(this,val,false);
+ while (i.pos!=val && !i.solid())
+ i.towards(val);
+ //if (!i.half && !halves[0])
+ // return bin64_t::ALL;
+ return i.pos;
+}
+
+
bin64_t bins::find_filtered
- (bins& filter, const bin64_t range, const uint8_t layer, fill_t seek)
+ (bins& filter, bin64_t range, const uint8_t layer, fill_t seek)
{
+ if (range==bin64_t::ALL)
+ range = bin64_t ( height>filter.height ? height : filter.height, 0 );
iterator i(this,range,true), j(&filter,range,true);
fill_t stop = seek==EMPTY ? FILLED : EMPTY;
while (true) {
- while ( i.bin().layer()>layer && (i.deep() || *i!=stop || j.deep() || *j!=EMPTY) )
+ while ( i.bin().layer()>layer && (i.deep() || *i!=stop || j.deep() || *j!=FILLED) )
i.left(), j.left(); // TODO may optimize a lot here
if (i.bin().layer()==layer && !i.deep() && *i==seek && *j==EMPTY)
return i.bin();
return bin64_t::NONE;
}
-void bins::set_range (bins& origin, bin64_t range) { // FIXME unite with remove(); do bitwise()
+// FIXME unite with remove(); do bitwise()
+void bins::copy_range (bins& origin, bin64_t range) {
+ if (range==bin64_t::ALL)
+ range = bin64_t ( height>origin.height ? height : origin.height, 0 );
iterator zis(this,range,true), zat(&origin,range,true);
while (zis.pos.within(range)) {
while (zis.deep() || zat.deep()) {
void set (bin64_t bin, fill_t val=FILLED);
- void set_range (bins& origin, bin64_t range);
+ void copy_range (bins& origin, bin64_t range);
bin64_t find (const bin64_t range, const uint8_t layer, fill_t seek=EMPTY) ;
bin64_t find_filtered
- (bins& filter, const bin64_t range, const uint8_t layer, fill_t seek=EMPTY) ;
+ (bins& filter, bin64_t range, const uint8_t layer, fill_t seek=EMPTY) ;
void remove (bins& b);
uint32_t size() { return cells_allocated; }
+ bin64_t cover(bin64_t val);
+
bool empty () const { return !deep(0) && !halves[0]; }
static bool is_mixed (uint16_t val) { return val!=EMPTY && val!=FILLED; }
*/\r
\r
#include <iostream>\r
-#include "compat/hirestimeofday.h"\r
+#include "hirestimeofday.h"\r
\r
#ifndef _WIN32\r
#include <sys/time.h>\r
* Written by Arno Bakker\r
* see LICENSE.txt for license information\r
*/\r
+#ifdef _WIN32\r
\r
-#include "compat/unixio.h"\r
-#include <io.h>\r
+#include "unixio.h"\r
#include <stdio.h>\r
+#include <io.h>\r
#include <winsock2.h>\r
\r
size_t pread(int fildes, void *buf, size_t nbyte, long offset)\r
inp->S_un.S_addr = inet_addr(cp);\r
return 1;\r
}\r
+\r
+#endif\r
*\r
* Defines UNIX like I/O calls and parameters for Win32\r
*/\r
+#ifdef _WIN32\r
\r
#ifndef UNIXIO_H_\r
#define UNIXIO_H_\r
int inet_aton(const char *cp, struct in_addr *inp);\r
\r
#endif /* UNIXIO_H_ */\r
+\r
+#endif // WIN32
\ No newline at end of file
namespace p2tp {
tint Datagram::now = Datagram::Time();
+tint Datagram::epoch = now;
uint32_t Datagram::Address::LOCALHOST = INADDR_LOOPBACK;
+char* Datagram::TimeStr (tint time) {
+ static char ret_str[128];
+ if (time==0)
+ time = now;
+ time -= epoch;
+ int hours = time/TINT_HOUR;
+ time %= TINT_HOUR;
+ int mins = time/TINT_MIN;
+ time %= TINT_MIN;
+ int secs = time/TINT_SEC;
+ time %= TINT_SEC;
+ int msecs = time/TINT_MSEC;
+ time %= TINT_MSEC;
+ int usecs = time/TINT_uSEC;
+ sprintf(ret_str,"%i_%02i_%02i_%03i_%03i",hours,mins,secs,msecs,usecs);
+ return ret_str;
+}
+
int Datagram::Send () {
int r = sendto(sock,(const char *)buf+offset,length-offset,0,
(struct sockaddr*)&(addr.addr),sizeof(struct sockaddr_in));
- offset=0;
- length=0;
- now = Time();
+ //offset=0;
+ //length=0;
+ Time();
return r;
}
offset = 0;
length = recvfrom (sock, (char *)buf, MAXDGRAMSZ, 0,
(struct sockaddr*)&(addr), &addrlen);
- if (length<0)
+ if (length<0) // FIXME FIXME FIXME
#ifdef _WIN32
PLOG(ERROR)<<"on recv" << WSAGetLastError() << "\n";
#else
PLOG(ERROR)<<"on recv";
#endif
- now = Time();
+ Time();
return length;
}
SOCKET Datagram::Wait (int sockcnt, SOCKET* sockets, tint usec) {
- // ARNOTODO: LOG commented out, it causes a crash on win32 (in a strlen()
- // done as part of a std::local::name() ??
- //
- //LOG(INFO)<<"waiting for "<<sockcnt;
+ dprintf("waiting (%i socks)\n",sockcnt);
struct timeval timeout;
timeout.tv_sec = usec/TINT_SEC;
timeout.tv_usec = usec%TINT_SEC;
max_sock_fd = sockets[i];
}
int sel = select(max_sock_fd+1, &bases, NULL, &err, &timeout);
+ Time();
if (sel>0) {
for (int i=0; i<=sockcnt; i++)
if (FD_ISSET(sockets[i],&bases))
return sockets[i];
- } else if (sel<0)
+ } else if (sel<0) {
#ifdef _WIN32
PLOG(ERROR)<<"select fails" << WSAGetLastError() << "\n";
#else
PLOG(ERROR)<<"select fails";
#endif
-
- // Arno: may return 0 when timeout expired
- return sel;
+ }
+ return -1;
}
tint Datagram::Time () {
namespace p2tp {
+typedef int64_t tint;
+#define TINT_HOUR ((tint)1000000*60*60)
+#define TINT_MIN ((tint)1000000*60)
+#define TINT_SEC ((tint)1000000)
+#define TINT_MSEC ((tint)1000)
+#define TINT_uSEC ((tint)1)
+#define TINT_NEVER ((tint)0x7fffffffffffffffLL)
#define MAXDGRAMSZ 1400
#ifndef _WIN32
#define INVALID_SOCKET -1
init(ipv4addr,port);
}
Address(const struct sockaddr_in& address) : addr(address) {}
+ uint32_t ipv4 () const { return ntohl(addr.sin_addr.s_addr); }
+ uint16_t port () const { return ntohs(addr.sin_port); }
operator sockaddr_in () const {return addr;}
- bool operator == (const Address& b) {
+ bool operator == (const Address& b) const {
return addr.sin_family==b.addr.sin_family &&
addr.sin_port==b.addr.sin_port &&
addr.sin_addr.s_addr==b.addr.sin_addr.s_addr;
}
- bool operator != (const Address& b) { return !(*this==b); }
+ std::string str () const {
+ char s[32];
+ sprintf(s,"%i.%i.%i.%i:%i",ipv4()>>24,(ipv4()>>16)&0xff,
+ (ipv4()>>8)&0xff,ipv4()&0xff,port());
+ return std::string(s);
+ }
+ bool operator != (const Address& b) const { return !(*this==b); }
};
Address addr;
static SOCKET Bind(Address address);
static void Close(int port);
static tint Time();
+ static char* TimeStr(tint time=0);
static SOCKET Wait (int sockcnt, SOCKET* sockets, tint usec=0);
- static tint now;
+ static tint now, epoch;
Datagram (SOCKET socket, const Address addr_) : addr(addr_), offset(0),
length(0), sock(socket) {}
};
std::string sock2str (struct sockaddr_in addr);
+#define dprintf(...) printf(__VA_ARGS__)
}
int cwnd_, peer_cwnd_, in_flight_;
bin64_t last_bin_sent_;
public:
- LedbatController () : dev_avg_(0), rtt_avg_(TINT_SEC), last_send_time_(0),
- last_recv_time_(0), cwnd_(1), peer_cwnd_(1), in_flight_(0) {
+ LedbatController (int chid) : dev_avg_(0), rtt_avg_(TINT_SEC), last_send_time_(0),
+ last_recv_time_(0), cwnd_(1), peer_cwnd_(1), in_flight_(0), CongestionController(chid) {
}
tint rtt_avg () {
}
int cwnd () {
+ // check for timeouts
return cwnd_;
}
}
int free_cwnd ( ){
- return cwnd_ - in_flight_;
+ return cwnd() - in_flight_;
}
tint next_send_time ( ){
void OnAckRcvd(bin64_t b, tint peer_stamp) {
if (last_bin_sent_!=b)
return;
- rtt_avg_ = (rtt_avg_*7 + (Datagram::now-last_send_time_)) >> 3; // van Jac
+ if (peer_stamp!=TINT_NEVER)
+ rtt_avg_ = (rtt_avg_*7 + (Datagram::now-last_send_time_)) >> 3; // van Jac
in_flight_--;
}
class SeqPiecePicker : public PiecePicker {
- bins hint_out_;
- FileTransfer* file_;
+ bins ack_hint_out_;
+ FileTransfer* file_;
public:
- SeqPiecePicker (FileTransfer* file_to_pick_from) : file_(file_to_pick_from), hint_out_() {
+ SeqPiecePicker (FileTransfer* file_to_pick_from) : file_(file_to_pick_from), ack_hint_out_() {
+ ack_hint_out_.copy_range(file_->ack_out(),bin64_t::ALL);
}
- virtual bin64_t Pick (bins& from, uint8_t layer) {
- bins may_pick = from;
- may_pick.remove (file_->ack_out());
- may_pick.remove (hint_out_);
- for (int l=layer; l>=0; l--) {
+ virtual bin64_t Pick (bins& offer, uint8_t layer) {
+
+ bin64_t hint = offer.find_filtered
+ (ack_hint_out_,bin64_t::ALL,layer,bins::FILLED);
+ if (hint==bin64_t::NONE)
+ return hint; // TODO: end-game mode
+ while (hint.layer()>layer)
+ hint = hint.left();
+ ack_hint_out_.set(hint);
+ return hint;
+ /*for (int l=layer; l>=0; l--) {
for(int i=0; i<file_->peak_count(); i++) {
bin64_t pick = may_pick.find(file_->peak(i),l,bins::FILLED);
if (pick!=bin64_t::NONE)
return pick;
}
}
- return bin64_t::NONE;
+ return bin64_t::NONE;*/
}
virtual void Received (bin64_t b) {
- hint_out_.set(b,bins::EMPTY);
+ ack_hint_out_.set(b,bins::FILLED);
}
- virtual void Snubbed (bin64_t b) {
- hint_out_.set(b,bins::EMPTY);
+ virtual void Expired (bin64_t b) {
+ ack_hint_out_.copy_range(file_->ack_out(),b);
}
};
\ No newline at end of file
}
};
-PeerSelector* Channel::peer_selector = new SimpleSelector();
\ No newline at end of file
std::vector<Channel*> Channel::channels(1);
int Channel::sockets[8] = {0,0,0,0,0,0,0,0};
int Channel::socket_count = 0;
-
-
-Channel::Channel (FileTransfer* file, int socket, struct sockaddr_in peer_addr) :
- file_(file), peer(peer_addr), peer_channel_id(0),
- socket_(socket) // FIXME
+Address Channel::tracker;
+tbqueue Channel::send_queue;
+#include "ext/dummy_controller.cpp"
+#include "ext/simple_selector.cpp"
+PeerSelector* Channel::peer_selector = new SimpleSelector();
+
+Channel::Channel (FileTransfer* file, int socket, Address peer_addr) :
+ file_(file), peer_(peer_addr), peer_channel_id_(0), pex_out_(0),
+ socket_(socket==-1?sockets[0]:socket), // FIXME
+ own_id_mentioned_(false), next_send_time_(0)
{
+ if (peer_==Address())
+ peer_ = tracker;
this->id = channels.size();
channels.push_back(this);
+ cc_ = new BasicController();
+ RequeueSend(Datagram::now);
}
}
+void p2tp::SetTracker(const Address& tracker) {
+ Channel::tracker = tracker;
+}
int Channel::DecodeID(int scrambled) {
}
+int p2tp::Open (const char* filename, const Sha1Hash& hash) {
+ FileTransfer* ft = new FileTransfer(filename, hash);
+ int fdes = ft->file_descriptor();
+ if (fdes>0) {
+
+ /*if (FileTransfer::files.size()<fdes) // FIXME duplication
+ FileTransfer::files.resize(fdes);
+ FileTransfer::files[fdes] = ft;*/
+
+ // initiate tracker connections
+ if (Channel::tracker!=Address())
+ new Channel(ft);
+
+ return fdes;
+ } else {
+ delete ft;
+ return -1;
+ }
+}
+
+
void p2tp::Close (int fd) {
// FIXME delete all channels
if (fd>FileTransfer::files.size() && FileTransfer::files[fd])
return 0;
}
+
+
+
+
/** <h2> P2TP handshake </h2>
Basic rules:
<ul>
tintbin(const tintbin& b) : time(b.time), bin(b.bin) {}
tintbin() : time(0), bin(bin64_t::NONE) {}
tintbin(tint time_, bin64_t bin_) : time(time_), bin(bin_) {}
+ tintbin(bin64_t bin_) : time(Datagram::now), bin(bin_) {}
};
+
typedef std::deque<tintbin> tbqueue;
typedef std::deque<bin64_t> binqueue;
- typedef Datagram::Address Address;
+ typedef Datagram::Address Address;
typedef enum {
P2TP_HANDSHAKE = 0,
accept or remember or drop. Returns true => ACK is sent. */
bool OfferData (bin64_t bin, const uint8_t* data, size_t length);
+ /** While we need to feed ACKs to every peer, we try (1) avoid
+ unnecessary duplication and (2) keep minimum state. Thus,
+ we use a rotating queue of bin completion events. */
+ //bin64_t RevealAck (uint64_t& offset);
+ /** Rotating queue read for channels of this transmission. */
+ uint32_t RevealChannel (int& i);
+
static FileTransfer* Find (const Sha1Hash& hash);
static FileTransfer* file (int fd) {
return fd<files.size() ? files[fd] : NULL;
return hashes_[pos];
}
const Sha1Hash& root_hash () const { return root_hash_; }
- bin64_t data_in (int offset);
uint64_t size () const { return size_; }
uint64_t size_kilo () const { return sizek_; }
uint64_t complete () const { return complete_; }
uint64_t seq_complete () const { return seq_complete_; }
bins& ack_out () { return ack_out_; }
int file_descriptor () const { return fd_; }
- PiecePicker* picker () { return picker_; }
+ PiecePicker& picker () { return *picker_; }
+ int channel_count () const { return handshake_in_.size(); }
static int instance; // FIXME this smells
size_t complete_;
size_t completek_;
size_t seq_complete_;
- /** A map for all packets obtained and succesfully checked. */
+ /** A binmap for all packets obtained and succesfully checked. */
bins ack_out_;
/** History of bin retrieval. */
- binqueue data_in_;
+ //binqueue data_in_;
+ //int data_in_off_;
/** Piece picker strategy. */
PiecePicker* picker_;
/** File for keeping the Merkle hash tree. */
/** Error encountered */
char* error_;
+ /** Channels working for this transfer. */
+ std::deque<int> handshake_in_;
+ std::deque<Address> pex_in_;
+ /** Messages we are accepting. */
+ uint64_t cap_out_;
+
protected:
void SetSize(size_t bytes);
void Submit();
Sha1Hash DeriveRoot();
void SavePeaks();
void LoadPeaks();
+ void OnDataIn (bin64_t pos);
+ void OnPexIn (const Address& addr);
friend class Channel;
friend size_t Size (int fdes);
friend void Close (int fd) ;
};
- class CongestionController {
- public:
- virtual tint rtt_avg() = 0;
- virtual tint dev_avg() = 0;
- virtual tint next_send_time() = 0;
- virtual int cwnd() = 0;
- virtual int peer_cwnd() = 0;
+ struct CongestionController {
+ CongestionController () {}
virtual int free_cwnd() = 0;
- virtual void OnDataSent(bin64_t b) = 0;
- virtual void OnDataRecvd(bin64_t b) = 0;
- virtual void OnAckRcvd(const tintbin& tsack) = 0;
- virtual ~CongestionController() = 0;
+ virtual tint RoundTripTime() = 0;
+ virtual tint RoundTripTimeoutTime() = 0;
+ virtual int PeerBPS() = 0;
+ virtual float PeerCWindow() = 0;
+ virtual tint OnDataSent(bin64_t b) = 0;
+ virtual tint OnDataRecvd(bin64_t b) = 0;
+ virtual tint OnAckRcvd(bin64_t ackd, tint peer_time=0) = 0;
+ virtual ~CongestionController() {}
};
class PiecePicker {
public:
- virtual bin64_t Pick (bins& from, uint8_t layer) = 0;
+ virtual bin64_t Pick (bins& offered, uint8_t layer) = 0;
+ virtual void Expired (bin64_t b) = 0;
virtual void Received (bin64_t b) = 0;
- virtual void Snubbed (bin64_t b) = 0;
};
class PeerSelector {
hash or a fragment of it into every datagram.) */
class Channel {
public:
- Channel (FileTransfer* file, int socket, struct sockaddr_in peer);
+ Channel (FileTransfer* file, int socket=-1, Address peer=Address());
~Channel();
static void Recv (int socket);
static void Loop (tint till);
void Recv (Datagram& dgram);
- tint Send ();
+ void Send ();
void OnAck (Datagram& dgram);
void OnAckTs (Datagram& dgram);
- void OnData (Datagram& dgram);
+ bin64_t OnData (Datagram& dgram);
void OnHint (Datagram& dgram);
void OnHash (Datagram& dgram);
void OnPex (Datagram& dgram);
void AddHint (Datagram& dgram);
void AddUncleHashes (Datagram& dgram, bin64_t pos);
void AddPeakHashes (Datagram& dgram);
+ void AddPex (Datagram& dgram);
const std::string id_string () const;
/** A channel is "established" if had already sent and received packets. */
- bool is_established () { return peer_channel_id && own_id_mentioned; }
+ bool is_established () { return peer_channel_id_ && own_id_mentioned_; }
+ FileTransfer& file() { return *file_; }
+ const Address& peer() const { return peer_; }
static int DecodeID(int scrambled);
static int EncodeID(int unscrambled);
return i<channels.size()?channels[i]:NULL;
}
- FileTransfer& file() { return *file_; }
-
private:
/** Channel id: index in the channel array. */
uint32_t id;
/** Socket address of the peer. */
- Datagram::Address peer;
+ Datagram::Address peer_;
/** The UDP socket fd. */
int socket_;
/** Descriptor of the file in question. */
FileTransfer* file_;
/** Peer channel id; zero if we are trying to open a channel. */
- uint32_t peer_channel_id;
- bool own_id_mentioned;
+ uint32_t peer_channel_id_;
+ bool own_id_mentioned_;
/** Peer's progress, based on acknowledgements. */
- bins ack_in;
+ bins ack_in_;
/** Last data received; needs to be acked immediately. */
tintbin data_in_;
/** Index in the history array. */
- uint32_t ack_out_;
+ bins ack_out_;
/** Transmit schedule: in most cases filled with the peer's hints */
- binqueue hint_in;
+ binqueue hint_in_;
/** Hints sent (to detect and reschedule ignored hints). */
- tbqueue hint_out;
+ tbqueue hint_out_;
/** The congestion control strategy. */
- CongestionController *cc;
+ CongestionController *cc_;
+ /** Types of messages the peer accepts. */
+ uint64_t cap_in_;
/** For repeats. */
- tint last_send_time, last_recv_time;
-
+ //tint last_send_time, last_recv_time;
+ /** PEX progress */
+ int pex_out_;
+
+ tint next_send_time_;
+ static tbqueue send_queue;
+ void RequeueSend (tint next_time);
+
/** Get a rewuest for one packet from the queue of peer's requests. */
bin64_t DequeueHint();
- void CleanStaleHints();
+ //void CleanStaleHints();
static PeerSelector* peer_selector;
static int MAX_REORDERING;
static tint TIMEOUT;
- static std::vector<Channel*> channels;
static int sockets[8];
static int socket_count;
static tint last_tick;
- friend int Listen (Datagram::Address addr);
- friend void Shutdown (int sock_des);
- friend void AddPeer (Datagram::Address address, const Sha1Hash& root);
+ static Address tracker;
+ static std::vector<Channel*> channels;
+
+ friend int Listen (Datagram::Address addr);
+ friend void Shutdown (int sock_des);
+ friend void AddPeer (Datagram::Address address, const Sha1Hash& root);
+ friend void SetTracker(const Address& tracker);
+ friend int Open (const char*, const Sha1Hash&) ; // FIXME
+
+ friend class FileTransfer; // FIXME!!!
};
root hash is zero, the peer might be talked to regarding any transmission
(likely, a tracker, cache or an archive). */
void AddPeer (Datagram::Address address, const Sha1Hash& root=Sha1Hash::ZERO);
-
+
+ void SetTracker(const Address& tracker);
+
/** Returns size of the file in bytes, 0 if unknown. Might be rounded up to a kilobyte
before the transmission is complete. */
size_t Size (int fdes);
//uint32_t Width (const tbinvec& v);
- void LibraryInit(void);
+
+// FIXME kill this macro
+#define RETLOG(str) { fprintf(stderr,"%s\n",str); return; }
+
/** Must be called by any client using the library */
+ void LibraryInit(void);
} // namespace end
-#define RETLOG(str) { LOG(WARNING)<<str; return; }
#endif
using namespace std;
using namespace p2tp;
+/*
+ TODO 25 Oct 18:55
+ - move hint_out_, piece picking to piece picker (needed e.g. for the case of channel drop)
+ - ANY_LAYER
+ - range: ALL
+ - randomized testing of advanced ops (new testcase)
+ - PeerCwnd()
+ - bins hint_out_, tbqueue hint_out_ts_
+
+ */
+
void Channel::AddPeakHashes (Datagram& dgram) {
for(int i=0; i<file().peak_count(); i++) {
+ bin64_t peak = file().peak(i);
dgram.Push8(P2TP_HASH);
- dgram.Push32((uint32_t)file().peak(i));
+ dgram.Push32((uint32_t)peak);
dgram.PushHash(file().peak_hash(i));
- DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
+ //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
+ dprintf("%s #%i +phash (%i,%lli)\n",Datagram::TimeStr(),id,peak.layer(),peak.offset());
}
}
void Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
bin64_t peak = file().peak_for(pos);
- while (pos!=peak && ack_in.get(pos.parent())==bins::EMPTY) {
+ while (pos!=peak && ack_in_.get(pos.parent())==bins::EMPTY) {
bin64_t uncle = pos.sibling();
dgram.Push8(P2TP_HASH);
dgram.Push32((uint32_t)uncle);
dgram.PushHash( file().hash(uncle) );
- DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
+ //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
+ dprintf("%s #%i +hash (%i,%lli)\n",Datagram::TimeStr(),id,uncle.layer(),uncle.offset());
pos = pos.parent();
}
}
bin64_t Channel::DequeueHint () { // TODO: resilience
- while (!hint_in.empty()) {
- bin64_t hint = hint_in.front();
- hint_in.pop_front();
- if (ack_in.get(hint)==bins::FILLED)
- continue;
- if ( file().ack_out().get(hint)==bins::EMPTY )
- continue;
- if (!hint.is_base()) {
- bin64_t l=hint.left(), r=hint.right();
- //if (rand()&1)
- // swap(l,r);
- hint_in.push_front(r);
- hint_in.push_front(l);
- continue;
- }
- return hint;
- }
- return bin64_t::NONE;
+ bin64_t send = bin64_t::NONE;
+ while (!hint_in_.empty() && send==bin64_t::NONE) {
+ bin64_t hint = hint_in_.front();
+ hint_in_.pop_front();
+ send = file().ack_out().find_filtered
+ (ack_in_,hint,0,bins::FILLED);
+ if (send!=bin64_t::NONE)
+ while (send!=hint) {
+ hint = hint.towards(send);
+ hint_in_.push_front(hint.sibling());
+ }
+ }
+ return send;
}
-void Channel::CleanStaleHints () {
+/*void Channel::CleanStaleHints () {
while ( !hint_out.empty() && file().ack_out().get(hint_out.front().bin)==bins::FILLED )
hint_out.pop_front(); // FIXME must normally clear fulfilled entries
- tint timed_out = Datagram::now - cc->rtt_avg()*8;
+ tint timed_out = Datagram::now - cc_->RoundTripTime()*8;
while ( !hint_out.empty() && hint_out.front().time < timed_out ) {
file().picker()->Snubbed(hint_out.front().bin);
hint_out.pop_front();
}
-}
+}*/
void Channel::AddHandshake (Datagram& dgram) {
- dgram.Push8(P2TP_HANDSHAKE);
- dgram.Push32(EncodeID(id));
- if (!peer_channel_id) { // initiating
+ if (!peer_channel_id_) { // initiating
dgram.Push8(P2TP_HASH);
dgram.Push32(bin64_t::ALL32);
dgram.PushHash(file().root_hash());
+ dprintf("%s #%i +hash ALL %s\n",
+ Datagram::TimeStr(),id,file().root_hash().hex().c_str());
}
+ dgram.Push8(P2TP_HANDSHAKE);
+ dgram.Push32(EncodeID(id));
+ dprintf("%s #%i +hs\n",Datagram::TimeStr(),id);
AddAck(dgram);
- //DLOG(INFO)<<"#"<<id<<" sending a handshake to "<<this->id_string();
}
-tint Channel::Send () {
- Datagram dgram(socket_,peer);
- dgram.Push32(peer_channel_id);
+void Channel::Send () {
+ Datagram dgram(socket_,peer());
+ dgram.Push32(peer_channel_id_);
+ bin64_t data = bin64_t::NONE;
if ( is_established() ) {
AddAck(dgram);
AddHint(dgram);
- if (cc->free_cwnd() && Datagram::now>=cc->next_send_time()) {
- bin64_t data = AddData(dgram);
- cc->OnDataSent(data);
- }
+ if (cc_->free_cwnd())
+ data = AddData(dgram);
} else {
AddHandshake(dgram);
+ AddAck(dgram);
}
- DLOG(INFO)<<"#"<<id<<" sending "<<dgram.size()<<" bytes";
+ dprintf("%s #%i sent %ib %s\n",Datagram::TimeStr(),id,dgram.size(),peer().str().c_str());
PCHECK( dgram.Send() != -1 )<<"error sending";
- last_send_time = Datagram::now;
- return cc->next_send_time();
+ if (dgram.size()==4) // only the channel id; bare keep-alive
+ data = bin64_t::ALL;
+ RequeueSend(cc_->OnDataSent(data));
}
void Channel::AddHint (Datagram& dgram) {
- CleanStaleHints();
- uint64_t outstanding = 0;
- for(tbqueue::iterator i=hint_out.begin(); i!=hint_out.end(); i++)
- outstanding += i->bin.width();
- uint64_t kbps = TINT_SEC * cc->peer_cwnd() / cc->rtt_avg();
- if (outstanding>kbps) // have enough
- return;
- uint8_t layer = 0;
- while( (1<<layer) < kbps ) layer++;
- bin64_t hint = file().picker()->Pick(ack_in,layer);
- if (hint==bin64_t::NONE)
- return;
- dgram.Push8(P2TP_HINT);
- dgram.Push32(hint);
- hint_out.push_back(tintbin(Datagram::now,hint));
- DLOG(INFO)<<"#"<<id<<" +HINT"<<hint;
+
+ tint hint_timeout = Datagram::now - 2*TINT_SEC;
+ while (!hint_out_.empty() && hint_out_.front().time<hint_timeout) {
+ tintbin old_hint = hint_out_.front();
+ file().picker().Expired(old_hint.bin);
+ hint_out_.pop_front();
+ }
+ // FIXME weird weird weird
+ uint16_t state;
+ while ( !hint_out_.empty() && (state=file().ack_out().get(hint_out_.front().bin)) != bins::EMPTY ) {
+ if (state==bins::FILLED) {
+ hint_out_.pop_front();
+ } else {
+ tintbin old_hint = hint_out_.front();
+ hint_out_.pop_front();
+ old_hint.bin = old_hint.bin.right();
+ hint_out_.push_front(old_hint);
+ old_hint.bin = old_hint.bin.sibling();
+ hint_out_.push_front(old_hint);
+ }
+ }
+
+ uint64_t hinted = 0;
+ for(tbqueue::iterator i=hint_out_.begin(); i!=hint_out_.end(); i++)
+ hinted+=i->bin.width();
+
+ float peer_cwnd = cc_->PeerBPS() * cc_->RoundTripTime() / TINT_SEC;
+
+ if ( hinted*1024 < peer_cwnd*4 ) {
+
+ uint8_t layer = 0;
+ bin64_t hint = file().picker().Pick(ack_in_,layer);
+
+ if (hint!=bin64_t::NONE) {
+ this->hint_out_.push_back(tintbin(hint));
+ dgram.Push8(P2TP_HINT);
+ dgram.Push32(hint);
+ dprintf("%s #%i +hint (%i,%lli)\n",Datagram::TimeStr(),id,hint.layer(),hint.offset());
+ }
+
+ }
}
if (!file().size()) // know nothing
return bin64_t::NONE;
bin64_t tosend = DequeueHint();
- if (tosend==bin64_t::NONE) {
- //LOG(WARNING)<<this->id_string()<<" no idea what to send";
- cc->OnDataSent(bin64_t::NONE);
- return bin64_t::NONE;
- }
- if (ack_in.empty() && file().size())
- AddPeakHashes(dgram);
- AddUncleHashes(dgram,tosend);
- uint8_t buf[1024];
- size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); // TODO: ??? corrupted data, retries
- if (r<0) {
- PLOG(ERROR)<<"error on reading";
- return 0;
- }
- assert(dgram.space()>=r+4+1);
- dgram.Push8(P2TP_DATA);
- dgram.Push32(tosend);
- dgram.Push(buf,r);
- DLOG(INFO)<<"#"<<id<<" +DATA"<<tosend;
+ if (tosend==bin64_t::NONE)
+ return bin64_t::NONE;
+ if (ack_in_.empty() && file().size())
+ AddPeakHashes(dgram);
+ AddUncleHashes(dgram,tosend);
+ uint8_t buf[1024];
+ size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10);
+ // TODO: ??? corrupted data, retries
+ if (r<0) {
+ PLOG(ERROR)<<"error on reading";
+ return 0;
+ }
+ assert(dgram.space()>=r+4+1);
+ dgram.Push8(P2TP_DATA);
+ dgram.Push32(tosend);
+ dgram.Push(buf,r);
+ dprintf("%s #%i +data (%lli)\n",Datagram::TimeStr(),id,tosend.base_offset());
return tosend;
}
void Channel::AddAck (Datagram& dgram) {
- if (data_in_.time) {
+ if (data_in_.bin!=bin64_t::NONE) {
+ bin64_t pos = data_in_.bin;
dgram.Push8(P2TP_ACK_TS);
- dgram.Push32(data_in_.bin);
+ dgram.Push32(pos);
dgram.Push64(data_in_.time);
- data_in_.time = 0;
- DLOG(INFO)<<"#"<<id<<" +!ACK"<<data_in_.bin;
+ ack_out_.set(pos);
+ dprintf("%s #%i +ackts (%i,%lli) %s\n",Datagram::TimeStr(),id,
+ pos.layer(),pos.offset(),Datagram::TimeStr(data_in_.time));
+ data_in_ = tintbin(0,bin64_t::NONE);
}
- bin64_t h=file().data_in(ack_out_);
- int count=0;
- while (h!=bin64_t::NONE && count++<4) {
+ for(int count=0; count<4; count++) {
+ bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, 0, bins::FILLED);
+ // TODO bins::ANY_LAYER
+ if (ack==bin64_t::NONE)
+ break;
+ while (file().ack_out().get(ack.parent())==bins::FILLED)
+ ack = ack.parent();
+ ack_out_.set(ack);
dgram.Push8(P2TP_ACK);
- dgram.Push32(h);
- DLOG(INFO)<<"#"<<id<<" +ACK"<<h;
- h=file().data_in(++ack_out_);
+ dgram.Push32(ack);
+ dprintf("%s #%i +ack (%i,%lli)\n",Datagram::TimeStr(),id,ack.layer(),ack.offset());
}
}
void Channel::Recv (Datagram& dgram) {
+ bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
while (dgram.size()) {
uint8_t type = dgram.Pull8();
switch (type) {
case P2TP_HANDSHAKE: OnHandshake(dgram); break;
- case P2TP_DATA: OnData(dgram); break;
+ case P2TP_DATA: data=OnData(dgram); break;
case P2TP_ACK_TS: OnAckTs(dgram); break;
case P2TP_ACK: OnAck(dgram); break;
case P2TP_HASH: OnHash(dgram); break;
return;
}
}
+ RequeueSend(cc_->OnDataRecvd(data));
}
bin64_t pos = dgram.Pull32();
Sha1Hash hash = dgram.PullHash();
file().OfferHash(pos,hash);
- DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
+ //DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
+ dprintf("%s #%i -hash (%i,%lli)\n",Datagram::TimeStr(),id,pos.layer(),pos.offset());
}
-void Channel::OnData (Datagram& dgram) {
+bin64_t Channel::OnData (Datagram& dgram) {
bin64_t pos = dgram.Pull32();
- DLOG(INFO)<<"#"<<id<<" .DATA"<<pos;
- file().OfferData(pos, *dgram, dgram.size());
- cc->OnDataRecvd(pos);
- CleanStaleHints();
+ uint8_t *data;
+ int length = dgram.Pull(&data,1024);
+ bool ok = file().OfferData(pos, data, length) ;
+ dprintf("%s #%i %cdata (%lli)\n",Datagram::TimeStr(),id,ok?'-':'!',pos.offset());
+ data_in_ = tintbin(Datagram::now,pos);
+ return ok ? pos : bin64_t::none();
}
void Channel::OnAck (Datagram& dgram) {
// note: no bound checking
bin64_t pos = dgram.Pull32();
- DLOG(INFO)<<"#"<<id<<" .ACK"<<pos;
- ack_in.set(pos);
+ dprintf("%s #%i -ack (%i,%lli)\n",Datagram::TimeStr(),id,pos.layer(),pos.offset());
+ ack_in_.set(pos);
+ RequeueSend(cc_->OnAckRcvd(pos,0));
}
void Channel::OnAckTs (Datagram& dgram) {
bin64_t pos = dgram.Pull32();
tint ts = dgram.Pull64();
- DLOG(INFO)<<"#"<<id<<" ,ACK"<<pos;
- //dprintf("%lli #%i +ack %lli +ts %lli",Datagram::now,id,pos,ts);
- ack_in.set(pos);
- cc->OnAckRcvd(tintbin(ts,pos));
+ // TODO sanity check
+ dprintf("%s #%i -ackts (%i,%lli) %s\n",
+ Datagram::TimeStr(),id,pos.layer(),pos.offset(),Datagram::TimeStr(ts));
+ ack_in_.set(pos);
+ RequeueSend(cc_->OnAckRcvd(pos,ts));
}
void Channel::OnHint (Datagram& dgram) {
bin64_t hint = dgram.Pull32();
- hint_in.push_back(hint);
+ hint_in_.push_back(hint);
+ dprintf("%s #%i -hint (%i,%lli)\n",Datagram::TimeStr(),id,hint.layer(),hint.offset());
}
void Channel::OnHandshake (Datagram& dgram) {
- peer_channel_id = dgram.Pull32();
+ if (!peer_channel_id_)
+ cc_->OnAckRcvd(bin64_t::ALL);
+ peer_channel_id_ = dgram.Pull32();
+ dprintf("%s #%i -hs %i\n",Datagram::TimeStr(),id,peer_channel_id_);
// FUTURE: channel forking
}
void Channel::OnPex (Datagram& dgram) {
- uint32_t addr = dgram.Pull32();
+ uint32_t ipv4 = dgram.Pull32();
uint16_t port = dgram.Pull16();
- if (peer_selector)
- peer_selector->AddPeer(Datagram::Address(addr,port),file().root_hash());
+ Address addr(ipv4,port);
+ //if (peer_selector)
+ // peer_selector->AddPeer(Datagram::Address(addr,port),file().root_hash());
+ file_->pex_in_.push_back(addr);
+ if (file_->pex_in_.size()>1000)
+ file_->pex_in_.pop_front();
+ static int ENOUGH_PEERS_THRESHOLD = 20;
+ if (file_->channel_count()<ENOUGH_PEERS_THRESHOLD) {
+ int i = 0, chno;
+ while ( (chno=file_->RevealChannel(i)) != -1 ) {
+ if (channels[i]->peer()==addr)
+ break;
+ }
+ if (chno==-1)
+ new Channel(file_,socket_,addr);
+ }
+}
+
+
+void Channel::AddPex (Datagram& dgram) {
+ int ch = file().RevealChannel(this->pex_out_);
+ if (ch==-1)
+ return;
+ Address a = channels[ch]->peer();
+ dgram.Push8(P2TP_PEX_ADD);
+ dgram.Push32(a.ipv4());
+ dgram.Push16(a.port());
}
FileTransfer* file = FileTransfer::Find(hash);
if (!file)
RETLOG ("hash unknown, no such file");
+ dprintf("%s #0 -hash ALL %s\n",Datagram::TimeStr(),hash.hex().c_str());
channel = new Channel(file, socket, data.address());
} else {
mych = DecodeID(mych);
channel = channels[mych];
if (!channel)
RETLOG ("channel is closed");
- if (channel->peer != data.address())
+ if (channel->peer() != data.address())
RETLOG ("invalid peer address");
- channel->Recv(data);
+ channel->own_id_mentioned_ = true;
}
- channel->Send();
+ channel->Recv(data);
}
}
-void Channel::Loop (tint time) {
-
- tint untiltime = Datagram::Time()+time;
- tbqueue send_queue;
- for(int i=0; i<channels.size(); i++)
- if (channels[i])
- send_queue.push_back(tintbin(Datagram::now,i));
+void Channel::RequeueSend (tint next_time) {
+ if (next_time==next_send_time_)
+ return;
+ next_send_time_ = next_time;
+ send_queue.push_back(tintbin(next_time,id));
+ push_heap(send_queue.begin(),send_queue.end(),tblater);
+ dprintf("%s requeue #%i for %s\n",Datagram::TimeStr(),id,Datagram::TimeStr(next_time));
+}
+
+
+void Channel::Loop (tint howlong) {
- while ( Datagram::now <= untiltime ) {
-
- tintbin next_send = send_queue.front();
- pop_heap(send_queue.begin(), send_queue.end(), tblater);
- send_queue.pop_back();
- tint wake_on = min(next_send.time,untiltime);
- tint towait = min(wake_on-Datagram::now,TINT_SEC); // towait<0?
-
- int rd = Datagram::Wait(socket_count,sockets,towait);
- if (rd!=-1)
- Recv(rd);
-
- int chid = (int)(next_send.bin);
- Channel* sender = channels[chid];
- if (sender) {
- tint next_time = sender->Send();
- if (next_time!=TINT_NEVER) {
- send_queue.push_back(tintbin(next_time,chid));
- push_heap(send_queue.begin(),send_queue.end(),tblater);
- } else {
- delete sender;
- channels[chid] = NULL;
- }
+ tint limit = Datagram::Time() + howlong;
+
+ do {
+
+ tint send_time(TINT_NEVER);
+ Channel* sender(NULL);
+ while (!send_queue.empty()) {
+ send_time = send_queue.front().time;
+ sender = channel((int)send_queue.front().bin);
+ if (sender && sender->next_send_time_==send_time)
+ break;
+ sender = NULL; // it was a stale entry
+ pop_heap(send_queue.begin(), send_queue.end(), tblater);
+ send_queue.pop_back();
}
-
- }
-
+ if (send_time>limit)
+ send_time = limit;
+ if (sender && send_time<=Datagram::now) {
+ dprintf("%s #%i sch_send %s\n",Datagram::TimeStr(),sender->id,
+ Datagram::TimeStr(send_time));
+ sender->Send();
+ pop_heap(send_queue.begin(), send_queue.end(), tblater);
+ send_queue.pop_back();
+ } else {
+ tint towait = send_time - Datagram::now;
+ dprintf("%s waiting %lliusec\n",Datagram::TimeStr(),towait);
+ int rd = Datagram::Wait(socket_count,sockets,towait);
+ if (rd!=-1)
+ Recv(rd);
+ }
+
+ } while (Datagram::Time()<limit);
+
}
+
+
+
+/*
+
+ tint untiltime = Datagram::Time()+time;
+ if (send_queue.empty())
+ dprintf("%s empty send_queue\n", Datagram::TimeStr());
+
+ while ( Datagram::now <= untiltime && !send_queue.empty() ) {
+
+ // BUG BUG BUG no scheduled sends => just listen
+
+ tintbin next_send = send_queue.front();
+ tint wake_on = next_send.time;
+ Channel* sender = channel(next_send.bin);
+
+ // BUG BUG BUG filter stale timeouts here
+
+ //if (wake_on<=untiltime) {
+ pop_heap(send_queue.begin(), send_queue.end(), tblater);
+ send_queue.pop_back();
+ //}// else
+ //sender = 0; // BUG will never wake up
+
+ if (sender->next_send_time_!=next_send.time)
+ continue;
+
+ if (wake_on<Datagram::now)
+ wake_on = Datagram::now;
+ if (wake_on>untiltime)
+ wake_on = untiltime;
+ tint towait = min(wake_on-Datagram::now,TINT_SEC);
+ dprintf("%s waiting %lliusec\n",Datagram::TimeStr(),towait);
+ int rd = Datagram::Wait(socket_count,sockets,towait);
+ if (rd!=-1)
+ Recv(rd);
+ // BUG WRONG BUG WRONG another may need to send
+ if (sender) {
+ dprintf("%s #%i sch_send %s\n",Datagram::TimeStr(),sender->id,
+ Datagram::TimeStr(next_send.time));
+ sender->Send();
+ // if (sender->cc_->next_send_time==TINT_NEVER)
+ }
+
+ }
+
+ */
\ No newline at end of file
CPPPATH=cpppath,
LIBS=libs,
LIBPATH=libpath )
+
+env.Program(
+ target='connecttest',
+ source=['connecttest.cpp'],
+ CPPPATH=cpppath,
+ LIBS=libs,
+ LIBPATH=libpath )
+
}
-TEST(BinsTest,SetRange) {
+
+TEST(BinsTest, Cover) {
+
+ bins b;
+ b.set(bin64_t(2,0));
+ b.set(bin64_t(4,1));
+ EXPECT_EQ(bin64_t(4,1),b.cover(bin64_t(0,30)));
+ //bins c;
+ //EXPECT_EQ(bin64_t::ALL,b.cover(bin64_t(0,30)));
+
+}
+
+
+TEST(BinsTest,FindFiltered2) {
+
+ bins data, filter;
+ for(int i=0; i<1024; i+=2)
+ data.set(bin64_t(0,i));
+ for(int j=1; j<1024; j+=2)
+ filter.set(bin64_t(0,j));
+ filter.set(bin64_t(0,501),bins::EMPTY);
+ EXPECT_EQ(bin64_t(0,501),data.find_filtered(filter,bin64_t(10,0),0));
+ data.set(bin64_t(0,501));
+ EXPECT_EQ(bin64_t::NONE,data.find_filtered(filter,bin64_t(10,0),0));
+
+}
+
+TEST(BinsTest,CopyRange) {
bins data, add;
data.set(bin64_t(2,0));
data.set(bin64_t(2,2));
add.set(bin64_t(1,4));
add.set(bin64_t(0,13));
add.set(bin64_t(5,118));
- data.set_range(add, bin64_t(3,0));
+ data.copy_range(add, bin64_t(3,0));
EXPECT_TRUE(bins::is_mixed(data.get(bin64_t(3,0))));
EXPECT_EQ(bins::EMPTY,data.get(bin64_t(2,0)));
EXPECT_EQ(bins::FILLED,data.get(bin64_t(2,1)));
TEST(P2TP,CwndTest) {
- int f = open("big_test_file",O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
- int size = rand()%(1<<19) + (1<<19);
+
+ unlink("doc/sofi-copy.jpg");
+ struct stat st;
+ ASSERT_EQ(0,stat("doc/sofi.jpg", &st));
+ int size = st.st_size, sizek = (st.st_size>>10) + (st.st_size%1024?1:0) ;
+
+ /*int f = open("big_test_file",O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
+ int size = 60<<10; //rand()%(1<<19) + (1<<19);
int sizek = (size>>10) + ((size&1023)?1:0);
char* b = (char*)malloc(size);
for(int i=0; i<size; i++)
b[i] = (i%1024!=1023) ? ('A' + rand()%('Z'-'A')) : ('\n');
write(f,b,size);
free(b);
- close(f);
+ close(f);*/
/*
struct sockaddr_in addr1, addr2;
addr2.sin_port = htons(7004);
addr2.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
*/
- int sock1 = p2tp::Listen(7003);
+ int sock1 = p2tp::Listen(7001);
ASSERT_TRUE(sock1>=0);
//ASSERT_TRUE(sock2>=0);
- p2tp::AddPeer(Datagram::Address("127.0.0.1",7001));
-
- int file = p2tp::Open("big_test_file");
+ int file = p2tp::Open("doc/sofi.jpg");
FileTransfer* fileobj = FileTransfer::file(file);
+ FileTransfer::instance++;
+
+ p2tp::SetTracker(Datagram::Address("127.0.0.1",7001));
- int copy = p2tp::Open("big_test_file_copy",fileobj->root_hash());
+ int copy = p2tp::Open("doc/sofi-copy.jpg",fileobj->root_hash());
- p2tp::Loop(TINT_MSEC);
+ p2tp::Loop(TINT_SEC);
- ASSERT_EQ(sizek<<10,p2tp::Size(copy));
-
int count = 0;
- while (p2tp::SeqComplete(copy)!=size && count++<(1<<14))
- p2tp::Loop(TINT_MSEC);
+ while (p2tp::SeqComplete(copy)!=size && count++<20)
+ p2tp::Loop(TINT_SEC);
ASSERT_EQ(size,p2tp::SeqComplete(copy));
p2tp::Close(file);
if (4+8!=ack.Send())
fprintf(stderr,"short write\n");
fprintf(stderr,"%lli rcvd%i\n",now/TINT_SEC,seq);
+ //cc->OnDataRecv(bin64_t(0,seq));
// TODO: peer cwnd !!!
continue;
}
leech = new FileTransfer("copy",seed->root_hash());
EXPECT_EQ(2,leech->complete_kilo());
}
- bin64_t next = leech->picker()->Pick(seed->ack_out(),0);
+ bin64_t next = leech->picker().Pick(seed->ack_out(),0);
ASSERT_NE(bin64_t::NONE,next);
+ ASSERT_TRUE(next.base_offset()<5);
uint8_t buf[1024]; //size_t len = seed->storer->ReadData(next,&buf);
- size_t len = pread(seed->file_descriptor(),buf,1024,next.base_offset()<<10); // FIXME TEST FOR ERROR
+ size_t len = pread(seed->file_descriptor(),buf,1024,next.base_offset()<<10);
bin64_t sibling = next.sibling();
if (sibling.base_offset()<seed->size_kilo())
leech->OfferHash(sibling, seed->hash(sibling));
FileTransfer::FileTransfer (const char* filename, const Sha1Hash& _root_hash) :
root_hash_(_root_hash), fd_(0), hashfd_(0), dry_run_(false),
peak_count_(0), hashes_(NULL), error_(NULL), size_(0), sizek_(0),
- complete_(0), completek_(0), seq_complete_(0)
+ complete_(0), completek_(0), seq_complete_(0) //, data_in_off_(0)
{
fd_ = open(filename,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
if (fd_<0)
} else
hashes_[b] = Sha1Hash(hashes_[b.left()],hashes_[b.right()]);
peak_hashes_[p] = hashes_[peaks_[p]];
- ack_out_.set(peaks_[p],bins::FILLED);
+ //ack_out_.set(peaks_[p],bins::FILLED);
+ OnDataIn(peaks_[p]);
}
root_hash_ = DeriveRoot();
Sha1Hash *hash_tmp = hashes_;
}
-bin64_t FileTransfer::data_in (int offset) {
- if (offset>data_in_.size())
- return bin64_t::NONE;
- return data_in_[offset];
-}
-
-
bool FileTransfer::OfferData (bin64_t pos, const uint8_t* data, size_t length) {
if (!pos.is_base())
return false;
//printf("g %lli %s\n",(uint64_t)pos,hash.hex().c_str());
// walk to the nearest proven hash FIXME 0-layer peak
- ack_out_.set(pos,bins::FILLED);
+ OnDataIn(pos);
pwrite(fd_,data,length,pos.base_offset()<<10);
complete_ += length;
completek_++;
seq_complete_+=1024;
if (seq_complete_>size_)
seq_complete_ = size_;
- data_in_.push_back(pos);
return true;
}
+/*bin64_t FileTransfer::RevealAck (uint64_t& offset) {
+ if (offset<data_in_off_)
+ offset = data_in_off_;
+ for(int off=offset-data_in_off_; off<data_in_.size(); off++) {
+ offset++;
+ if (data_in_[off]!=bin64_t::NONE) {
+ bin64_t parent = data_in_[off].parent();
+ if (ack_out_.get(parent)!=bins::FILLED)
+ return data_in_[off];
+ else
+ data_in_[off] = bin64_t::NONE;
+ }
+ }
+ return bin64_t::NONE;
+}*/
+
+
+void FileTransfer::OnDataIn (bin64_t pos) {
+ ack_out_.set(pos,bins::FILLED);
+ /*bin64_t closed = pos;
+ while (ack_out_.get(closed.parent())==bins::FILLED) // TODO optimize
+ closed = closed.parent();
+ data_in_.push_back(closed);
+ // rotating the queue
+ bin64_t parent = data_in_.front().parent();
+ if (ack_out_.get(parent)!=bins::FILLED)
+ data_in_.push_back(data_in_.front());
+ data_in_.front() = bin64_t::NONE;
+ while ( !data_in_.empty() && data_in_.front()==bin64_t::NONE) {
+ data_in_.pop_front();
+ data_in_off_++;
+ }*/
+}
+
+
Sha1Hash FileTransfer::DeriveRoot () {
int c = peak_count_-1;
bin64_t p = peaks_[c];
FileTransfer* FileTransfer::Find (const Sha1Hash& root_hash) {
for(int i=0; i<files.size(); i++)
- if (files[i] && files[i]->root_hash_==root_hash)
+ if (files[i] && files[i]->root_hash()==root_hash)
return files[i];
return NULL;
}
+void FileTransfer::OnPexIn (const Address& addr) {
+ pex_in_.push_back(addr);
+ if (pex_in_.size()>1000)
+ pex_in_.pop_front();
+}
+
std::string FileTransfer::GetTempFilename(Sha1Hash& root_hash, int instance, std::string postfix)
{
}
-
-int p2tp::Open (const char* filename, const Sha1Hash& hash) {
+/*int p2tp::Open (const char* filename, const Sha1Hash& hash) {
FileTransfer* ft = new FileTransfer(filename, hash);
int fdes = ft->file_descriptor();
if (fdes>0) {
delete ft;
return -1;
}
+}*/
+
+
+uint32_t FileTransfer::RevealChannel (int& offset) {
+ while (offset<Channel::channels.size() &&
+ (!Channel::channels[offset] || Channel::channels[offset]->file_!=this) )
+ offset++;
+ return offset < Channel::channels.size() ? offset : -1;
}