+0.003 - This is not a release as well - 18 Oct 2009
+
+ - but at least, it compiles now
+
0.002 - This is not a release - 7 Oct 2009
- it does not even compile, committed for reading purposes only
const uint64_t bin64_t::NONE = 0xffffffffffffffffULL;
const uint64_t bin64_t::ALL = 0x7fffffffffffffffULL;
+const uint32_t bin64_t::NONE32 = 0xffffffffU;
+const uint32_t bin64_t::ALL32 = 0x7fffffffU;
+
+uint32_t bin64_t::to32() const {
+ if (v<0xffffffff && v!=0x7fffffff)
+ return (uint32_t)v;
+ if (v==ALL)
+ return ALL32;
+ return NONE32;
+}
bin64_t bin64_t::next_dfsio (uint8_t floor) {
/*while (ret.is_right())
uint64_t v;
static const uint64_t NONE;
static const uint64_t ALL;
+ static const uint32_t NONE32;
+ static const uint32_t ALL32;
bin64_t() : v(NONE) {}
bin64_t(const bin64_t&b) : v(b.v) {}
bin64_t(uint8_t layer, uint64_t offset) :
v( (offset<<(layer+1)) | ((1ULL<<layer)-1) ) {}
operator uint64_t () const { return v; }
+ uint32_t to32() const ;
bool operator == (bin64_t& b) const { return v==b.v; }
static bin64_t none () { return NONE; }
bins(const bins& b);
- uint16_t get (bin64_t bin);
+ uint16_t get (bin64_t bin);
- void set (bin64_t bin, fill_t val=FILLED);
+ void set (bin64_t bin, fill_t val=FILLED);
- bin64_t find (const bin64_t range, const uint8_t layer, fill_t seek=EMPTY) ;
+ bin64_t find (const bin64_t range, const uint8_t layer, fill_t seek=EMPTY) ;
- void remove (bins& b);
+ void remove (bins& b);
- void dump(const char* note);
+ void dump(const char* note);
uint64_t* get_stripes (int& count);
uint32_t size() { return cells_allocated; }
- // TODO: bitwise operators
+
+ bool empty () const { return !deep(0) && !halves[0]; }
private:
namespace p2tp {
tint Datagram::now = Datagram::Time();
+uint32_t Datagram::Address::LOCALHOST = INADDR_LOOPBACK;
int Datagram::Send () {
int r = sendto(sock,buf+offset,length-offset,0,
- (struct sockaddr*)&(addr),sizeof(struct sockaddr_in));
+ (struct sockaddr*)&(addr.addr),sizeof(struct sockaddr_in));
offset=0;
length=0;
now = Time();
return now=ret;
}
-int Datagram::Bind (int portno) {
- struct sockaddr_in addr;
+int Datagram::Bind (Address addr_) {
+ struct sockaddr_in addr = addr_;
int fd, len = sizeof(struct sockaddr_in),
sndbuf=1<<20, rcvbuf=1<<20;
if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
return -3;
}
printf("BUFS: %i %i\n",sndbuf,rcvbuf);
- memset(&addr, 0, sizeof(struct sockaddr_in));
+ /*memset(&addr, 0, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_port = htons(portno);
- addr.sin_addr.s_addr = INADDR_ANY;
- if (::bind(fd, (struct sockaddr*)&addr, len) != 0) {
+ addr.sin_addr.s_addr = INADDR_ANY;*/
+ if (::bind(fd, (sockaddr*)&addr, len) != 0) {
PLOG(ERROR)<<"bind fails";
return -4;
}
struct Address {
struct sockaddr_in addr;
- Address() {
+ static uint32_t LOCALHOST;
+ void init(uint32_t ipv4=0, uint16_t port=0) {
memset(&addr,0,sizeof(struct sockaddr_in));
- }
- Address(const char* ip, uint16_t port) {
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
+ addr.sin_addr.s_addr = htonl(ipv4);
+ }
+ Address() { init(); }
+ Address(const char* ip, uint16_t port) {
+ init(LOCALHOST,port);
inet_aton(ip,&(addr.sin_addr));
}
Address(uint16_t port) {
- addr.sin_family = AF_INET;
- addr.sin_port = htons(port);
- addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ init(LOCALHOST,port);
+ }
+ Address(uint32_t ipv4addr, uint16_t port) {
+ init(ipv4addr,port);
}
Address(const struct sockaddr_in& address) : addr(address) {}
- operator sockaddr_in () {return addr;}
+ operator sockaddr_in () const {return addr;}
+ bool operator == (const Address& b) {
+ return addr.sin_family==b.addr.sin_family &&
+ addr.sin_port==b.addr.sin_port &&
+ addr.sin_addr.s_addr==b.addr.sin_addr.s_addr;
+ }
+ bool operator != (const Address& b) { return !(*this==b); }
};
Address addr;
int offset, length;
uint8_t buf[MAXDGRAMSZ*2];
- static int Bind(int port);
+ static int Bind(Address address);
static void Close(int port);
static tint Time();
static int Wait (int sockcnt, int* sockets, tint usec=0);
class LedbatController : public CongestionController {
public:
- /*tint rtt_avg;
- tint dev_avg;
- int cwnd;
- int peer_cwnd;*/
- virtual void OnTimeout () {
+ tint dev_avg_, rtt_avg_;
+ tint last_send_time_, last_recv_time_;
+ int cwnd_, peer_cwnd_, in_flight_;
+ bin64_t last_bin_sent_;
+public:
+ LedbatController () : dev_avg_(0), rtt_avg_(TINT_SEC), last_send_time_(0),
+ last_recv_time_(0), cwnd_(1), peer_cwnd_(1), in_flight_(0) {
+ }
+
+ tint rtt_avg () {
+ return rtt_avg_;
+ }
+
+ tint dev_avg () {
+ return dev_avg_;
+ }
+
+ int cwnd () {
+ return cwnd_;
+ }
+
+ int peer_cwnd () {
+ return peer_cwnd_;
}
- virtual void OnDataSent(bin64_t b) {
+ int free_cwnd ( ){
+ return cwnd_ - in_flight_;
}
- virtual void OnDataRecvd(bin64_t b) {
+ tint next_send_time ( ){
+ return cwnd_ ? Datagram::now + (rtt_avg_/cwnd_) : TINT_NEVER; // TODO keepalives
}
- virtual void OnAckRcvd(bin64_t b, tint peer_stamp) {
+ void OnDataSent(bin64_t b) {
+ if (b==bin64_t::NONE) {
+ cwnd_ = 0; // nothing to send; suspend
+ } else {
+ last_bin_sent_ = b;
+ last_send_time_ = Datagram::now;
+ in_flight_++;
+ }
}
- virtual ~CongestionControl() {
+ void OnDataRecvd(bin64_t b) {
+ last_recv_time_ = Datagram::now;
}
+ void OnAckRcvd(bin64_t b, tint peer_stamp) {
+ if (last_bin_sent_!=b)
+ return;
+ rtt_avg_ = (rtt_avg_*7 + (Datagram::now-last_send_time_)) >> 3; // van Jac
+ in_flight_--;
+ }
+
+ ~LedbatController() {
+ }
};
*
*/
+#include <queue>
#include "p2tp.h"
using namespace p2tp;
class SimpleSelector : public PeerSelector {
- typedef std::pair<sockaddr_in,uint32_t> memo_t;
- std::queue<memo_t> peers;
+ typedef std::pair<Address,Sha1Hash> memo_t;
+ typedef std::deque<memo_t> peer_queue_t;
+ peer_queue_t peers;
public:
- virtual void PeerKnown (const Sha1Hash& root, struct sockaddr_in& addr) {
- peers.push_front(memo_t(addr,root.fingerprint()));
+ SimpleSelector () {
}
- virtual sockaddr_in GetPeer (const Sha1Hash& for_root) {
- uint32_t fp = for_root.fingerprint();
- for(std::queue<memo_t>::iterator i=peers.begin(); i!=peers.end(); i++)
- if (i->second==fp) {
+ void AddPeer (const Datagram::Address& addr, const Sha1Hash& root) {
+ peers.push_front(memo_t(addr,root)); //,root.fingerprint() !!!
+ }
+ Address GetPeer (const Sha1Hash& for_root) {
+ //uint32_t fp = for_root.fingerprint();
+ for(peer_queue_t::iterator i=peers.begin(); i!=peers.end(); i++)
+ if (i->second==for_root) {
i->second = 0;
sockaddr_in ret = i->first;
while (peers.begin()->second==0)
peers.pop_front();
return ret;
}
+ return Address();
}
};
-static Channel::peer_selector = new SimpleSelector();
\ No newline at end of file
+PeerSelector* Channel::peer_selector = new SimpleSelector();
\ No newline at end of file
memcpy(bits,hash,SIZE);
}
-string Sha1Hash::hex() {
+string Sha1Hash::hex() const {
char hex[HASHSZ*2+1];
for(int i=0; i<HASHSZ; i++)
sprintf(hex+i*2, "%02x", bits[i]);
Sha1Hash(const char* bits);
Sha1Hash(bool hex, const char* hash);
- std::string hex();
+ std::string hex() const;
bool operator == (const Sha1Hash& b) const
{ return 0==memcmp(bits,b.bits,SIZE); }
bool operator != (const Sha1Hash& b) const { return !(*this==b); }
int Channel::MAX_REORDERING = 4;
p2tp::tint Channel::TIMEOUT = TINT_SEC*60;
std::vector<Channel*> Channel::channels(1);
-std::vector<File*> File::files(4);
-int* Channel::sockets_ = (int*)malloc(40);
-int Channel::sock_count_ = 0;
+int Channel::sockets[8] = {0,0,0,0,0,0,0,0};
+int Channel::socket_count = 0;
-Channel::Channel (int fd_, int socket, struct sockaddr_in peer_,
- uint32_t peer_channel_, uint64_t supports_) :
- fd(fd_), peer(peer_), peer_channel_id(peer_channel_), ack_out(0),
- peer_status_(File::EMPTY), socket_(socket)
+Channel::Channel (FileTransfer* file, int socket, struct sockaddr_in peer_addr) :
+ file_(file), peer(peer_addr), peer_channel_id(0),
+ socket_(socket) // FIXME
{
this->id = channels.size();
channels.push_back(this);
- DLOG(INFO)<<"new channel "<<id<<" "<<*this;
}
}
-File::File (int _fd) : fd(_fd), status_(DONE), hashes(_fd)
-{
- bin::vec peaks = bin::peaks(hashes.data_size());
- history.insert(history.end(),peaks.begin(),peaks.end());
- for(bin::vec::iterator i=peaks.begin(); i!=peaks.end(); i++)
- ack_out.set(*i);
-}
-File::File (Sha1Hash hash, int _fd) : hashes(hash), fd(_fd), status_(EMPTY) {
- // TODO resubmit data
-}
-File::~File() {
- if (fd>0) ::close(fd);
+int Channel::DecodeID(int scrambled) {
+ return scrambled;
+}
+int Channel::EncodeID(int unscrambled) {
+ return unscrambled;
}
-bool File::OfferHash (bin pos, const Sha1Hash& hash) {
- HashTree::hashres_t res = hashes.offer(pos,hash);
- if (res==HashTree::PEAK_ACCEPT) { // file size is finally known
- ftruncate(fd, size());
- LOG(INFO)<<fd<<" file size is set to "<<size();
- history.push_back(0);
- status_ = IN_PROGRESS;
- }
- return res==HashTree::PEAK_ACCEPT || res==HashTree::ACCEPT;
+int p2tp::Listen (Datagram::Address addr) {
+ int sock = Datagram::Bind(addr);
+ if (sock!=INVALID_SOCKET)
+ Channel::sockets[Channel::socket_count++] = sock;
+ return sock;
}
-File* File::find (const Sha1Hash& hash) {
- for(vector<File*>::iterator i=files.begin(); i!=files.end(); i++)
- if (*i && (*i)->hashes.root==hash)
- return *i;
- return NULL;
+void p2tp::Shutdown (int sock_des) {
+ for(int i=0; i<Channel::socket_count; i++)
+ if (Channel::sockets[i]==sock_des)
+ Channel::sockets[i] = Channel::sockets[--Channel::socket_count];
+ Datagram::Close(sock_des);
}
-int Channel::DecodeID(int scrambled) {
- return scrambled;
+
+void p2tp::Loop (tint till) {
+ Channel::Loop(till);
}
-int Channel::EncodeID(int unscrambled) {
- return unscrambled;
+
+
+void p2tp::Close (int fd) {
+ // FIXME delete all channels
+ if (fd>FileTransfer::files.size() && FileTransfer::files[fd])
+ delete FileTransfer::files[fd];
}
-std::ostream& p2tp::operator << (std::ostream& os, const Channel& ch) {
- return os<<'{'<<ch.fd<<'}'<<sock2str(ch.peer)<<":"<<ch.id<<'>'<<ch.peer_channel_id;
+void p2tp::AddPeer (Datagram::Address address, const Sha1Hash& root) {
+ Channel::peer_selector->AddPeer(address,root);
}
-void Channel::Recv (int socket) {
- Datagram data(socket);
- data.Recv();
- //LOG(INFO)<<" RECV "<<data.to_string();
- int id = 0;
- if (data.size()<4)
- RETLOG("datagram shorter than 4 bytes");
- uint32_t mych = data.Pull32();
- uint8_t type;
- uint32_t peerch;
- Sha1Hash hash;
- Channel* channel;
- if (!mych) { // handshake initiated
- if (data.size()!=1+4+1+4+Sha1Hash::SIZE)
- RETLOG ("incorrect size initial handshake packet");
- type = data.Pull8();
- if (type) // handshake msg id is 0
- RETLOG ("it is not actually a handshake");
- peerch = data.Pull32();
- if (!peerch)
- RETLOG ("peer channel is zero");
- uint8_t hashid = data.Pull8();
- if (hashid!=P2TP_HASH)
- RETLOG ("no hash in the initial handshake");
- bin pos = data.Pull32();
- if (pos!=bin::ALL)
- RETLOG ("that is not the root hash");
- hash = data.PullHash();
- File* file = File::find(hash);
- if (!file)
- RETLOG ("hash unknown, no such file");
- channel = new Channel(file->fd, socket, data.address(), peerch);
- } else {
- mych = DecodeID(mych);
- if (mych>=channels.size())
- RETLOG ("invalid channel id");
- channel = channels[mych];
- id = channel->id;
- if (channel->peer.sin_addr.s_addr != data.address().sin_addr.s_addr)
- RETLOG ("invalid peer address");
- if (channel->peer.sin_port!=data.address().sin_port)
- RETLOG ("invalid peer port");
- if (!channel->peer_channel_id) { // handshake response
- if (data.size()<5)
- RETLOG ("insufficient return handshake length");
- type = data.Pull8();
- if (type)
- RETLOG ("it is not a handshake, after all");
- channel->peer_channel_id = data.Pull32();
- LOG(INFO)<<"out channel is open: "<<*channel;
- } else if (channel->cc_.avg_rtt()==0) {
- LOG(INFO)<<"in channel is open: "<<*channel;
- }
- if (channel->cc_.avg_rtt()==0)
- channel->cc_.RttSample(Datagram::now - channel->last_send_time + 1);
- channel->Recv(data);
- }
- channel->Send();
+size_t p2tp::Size (int fdes) {
+ if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
+ return FileTransfer::files[fdes]->size();
+ else
+ return 0;
}
-void Channel::Tick () {
- // choking/unchoking
- // keepalives
- // ack timeout
- // if unchoked: don't bother
- // whether to unchoke
- // reevaluate reciprocity
- // otherwise, send update (if needed)
- // otherwise, send a keepalive
- CleanStaleHintIn();
- CleanStaleHintOut();
- if (last_send_time && Datagram::now-last_send_time>=Channel::TIMEOUT/2)
- Send();
+size_t p2tp::Complete (int fdes) {
+ if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
+ return FileTransfer::files[fdes]->complete();
+ else
+ return 0;
}
+size_t p2tp::SeqComplete (int fdes) {
+ if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
+ return FileTransfer::files[fdes]->seq_complete();
+ else
+ return 0;
+}
+
/** <h2> P2TP handshake </h2>
Basic rules:
<ul>
<b>Note:</b>
*/
-
-void Channel::Loop (tint time) {
-
- tint untiltime = Datagram::Time()+time;
-
- while ( Datagram::now <= untiltime ) {
-
- tint towait = min(untiltime,Datagram::now+TINT_1SEC) - Datagram::now;
- int rd = Datagram::Wait(sock_count_,sockets_,towait);
- if (rd!=-1)
- Recv(rd);
-
- /*if (Datagram::now-last_tick>TINT_1SEC) {
- for(int i=0; i<channels.size(); i++)
- if (channels[i])
- channels[i]->Tick();
- last_tick = Datagram::now;
- }*/
-
- }
-
-}
-
-int p2tp::Open (const char* filename) {
- int fd = ::open(filename,O_RDONLY);
- if (fd<0)
- return -1;
- if (File::files.size()<fd+1)
- File::files.resize(fd+1);
- File::files[fd] = new File(fd);
- return fd;
-}
-
-int p2tp::Open (const Sha1Hash& root_hash, const char* filename) {
- int fd = ::open(filename,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
- if (fd<0)
- return -1;
- if (File::files.size()<fd+1)
- File::files.resize(fd+1);
- File::files[fd] = new File(root_hash,fd);
- return fd;
-}
-
-size_t p2tp::file_size (int fd) { return File::file(fd)->size(); }
-
-void p2tp::Close (int fid) {
- if (!File::files[fid])
- return;
- delete File::files[fid];
- File::files[fid] = NULL;
-}
-
-
-int p2tp::Connect (int fd, int sock, const struct sockaddr_in& addr, uint32_t peerch) {
- Channel *ch = new Channel(fd,sock,addr,peerch);
- ch->Send();
- return ch->id;
-}
-
-void p2tp::Loop (tint time) {
- Channel::Loop(time);
-}
-
-int p2tp::Init (int portno) {
- int sock = Datagram::Bind(portno);
- if (sock>0)
- Channel::sockets_[Channel::sock_count_++] = sock;
- return sock;
-}
-
-void p2tp::Shutdown (int sock) {
- int i=0;
- while (i<Channel::sock_count_ && Channel::sockets_[i]!=sock) i++;
- if (i==Channel::sock_count_) {
- LOG(ERROR)<<"socket "<<sock<<" is unknown to p2tp";
- return;
- }
- Channel::sockets_[i] = Channel::sockets_[--Channel::sock_count_];
- Datagram::Close(sock);
-}
-
-
-uint32_t p2tp::Width (const tbinvec& v) {
- uint32_t ret = 0;
- for(tbinvec::const_iterator i=v.begin(); i!=v.end(); i++)
- ret += i->pos.width();
- return ret;
-}
-
typedef std::deque<tintbin> tbqueue;
typedef std::deque<bin64_t> binqueue;
+ typedef Datagram::Address Address;
typedef enum {
P2TP_HANDSHAKE = 0,
public:
/** Open/submit/retrieve a file. */
- FileTransfer(const Sha1Hash& _root_hash, const char *file_name);
+ FileTransfer(const char *file_name, const Sha1Hash& _root_hash=Sha1Hash::ZERO);
/** Close everything. */
~FileTransfer();
void OfferHash (bin64_t pos, const Sha1Hash& hash);
/** Offer data; the behavior is the same as with a hash:
accept or remember or drop. Returns true => ACK is sent. */
- bool OfferData (bin64_t bin, uint8_t* data, size_t length);
+ bool OfferData (bin64_t bin, const uint8_t* data, size_t length);
- bin64_t PickBinForRequest (bins& from, uint8_t layer) ; static FileTransfer* Find (const Sha1Hash& hash);
+ static FileTransfer* Find (const Sha1Hash& hash);
static FileTransfer* file (int fd) {
return fd<files.size() ? files[fd] : NULL;
}
uint64_t seq_complete () const { return seq_complete_; }
bins& ack_out () { return ack_out_; }
int file_descriptor () const { return fd_; }
-
- friend int Open (const char* filename);
- friend int Open (const Sha1Hash& hash, const char* filename);
- friend void Close (int fdes);
+ PiecePicker* picker () { return picker_; }
static int instance; // FIXME this smells
void LoadPeaks();
friend class Channel;
+ friend size_t Size (int fdes);
+ friend size_t Complete (int fdes);
+ friend size_t SeqComplete (int fdes);
+ friend int Open (const char* filename, const Sha1Hash& hash) ;
+ friend void Close (int fd) ;
};
-
- int Open (const char* filename) ;
- int Open (const Sha1Hash& hash, const char* filename) ;
- void Close (int fid) ;
- void Loop (tint till);
- int Bind (int port);
- void Shutdown (int fd);
- void HeardOfPeer (const Sha1Hash& root, struct sockaddr_in address);
-
class CongestionController {
public:
class PeerSelector {
public:
- virtual void PeerKnown
- (const Sha1Hash& root, struct sockaddr_in& addr) = 0;
- virtual struct sockaddr_in GetPeer
- (const Sha1Hash& for_root) = 0;
+ virtual void AddPeer (const Datagram::Address& addr, const Sha1Hash& root) = 0;
+ virtual Datagram::Address GetPeer (const Sha1Hash& for_root) = 0;
};
class DataStorer {
hash or a fragment of it into every datagram.) */
class Channel {
public:
- Channel (int filedes, int socket, struct sockaddr_in peer,
- uint32_t peer_channel, uint64_t supports=0);
+ Channel (FileTransfer* file, int socket, struct sockaddr_in peer);
~Channel();
static void Recv (int socket);
FileTransfer& file() { return *file_; }
- /*friend int Connect (int fd, int sock,
- const struct sockaddr_in& addr, uint32_t peerch=0);
- friend int Init (int portno);
- friend std::ostream& operator << (std::ostream& os, const Channel& s);*/
private:
/** Channel id: index in the channel array. */
uint32_t id;
/** Socket address of the peer. */
- struct sockaddr_in peer;
+ Datagram::Address peer;
/** The UDP socket fd. */
- int socket;
+ int socket_;
/** Descriptor of the file in question. */
FileTransfer* file_;
/** Peer channel id; zero if we are trying to open a channel. */
static PeerSelector* peer_selector;
- static int MAX_REORDERING;
- static tint TIMEOUT;
+ static int MAX_REORDERING;
+ static tint TIMEOUT;
static std::vector<Channel*> channels;
- int sockets[4];
- int sock_count;
- static tint last_tick;
+ static int sockets[8];
+ static int socket_count;
+ static tint last_tick;
+ friend int Listen (Datagram::Address addr);
+ friend void Shutdown (int sock_des);
+ friend void AddPeer (Datagram::Address address, const Sha1Hash& root);
};
+
+ /*************** The top-level API ****************/
+ /** Start listening a port. Returns socket descriptor. */
+ int Listen (Datagram::Address addr);
+ /** Run send/receive loop for the specified amount of time. */
+ void Loop (tint till);
+ /** Stop listening to a port. */
+ void Shutdown (int sock_des);
+
+ /** Open a file, start a transmission; fill it with content for a given root hash;
+ in case the hash is omitted, the file is a fresh submit. */
+ int Open (const char* filename, const Sha1Hash& hash=Sha1Hash::ZERO) ;
+ /** Close a file and a transmission. */
+ void Close (int fd) ;
+ /** Add a possible peer which participares in a given transmission. In the case
+ root hash is zero, the peer might be talked to regarding any transmission
+ (likely, a tracker, cache or an archive). */
+ void AddPeer (Datagram::Address address, const Sha1Hash& root=Sha1Hash::ZERO);
+
+ /** Returns size of the file in bytes, 0 if unknown. Might be rounded up to a kilobyte
+ before the transmission is complete. */
+ size_t Size (int fdes);
+ /** Returns the amount of retrieved and verified data, in bytes.
+ A 100% complete transmission has Size()==Complete(). */
+ size_t Complete (int fdes);
+ /** Returns the number of bytes that are complete sequentially, starting from the
+ beginning, till the first not-yet-retrieved packet. */
+ size_t SeqComplete (int fdes);
+
+
//uint32_t Width (const tbinvec& v);
}
#include <glog/logging.h>
#include "p2tp.h"
+#include "ext/dummy_controller.cpp"
+
using namespace std;
using namespace p2tp;
-
void Channel::AddPeakHashes (Datagram& dgram) {
- const std::vector<binhash>& peaks = file().hashes.peak_hashes();
- for(int i=0; i<peaks.size(); i++) {
+ for(int i=0; i<file().peak_count(); i++) {
dgram.Push8(P2TP_HASH);
- dgram.Push32(peaks[i].first);
- dgram.PushHash(peaks[i].second);
- DLOG(INFO)<<"#"<<id<<" +pHASH"<<peaks[i].first;
+ dgram.Push32((uint32_t)file().peak(i));
+ dgram.PushHash(file().peak_hash(i));
+ DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
}
}
-void Channel::AddUncleHashes (Datagram& dgram, bin pos) {
- bin root = pos;
- while (root.parent()<=file().hashes.data_mass() && ack_in.clean(root.parent()))
- root = root.parent();
- while (root!=pos) {
- root = root.child(pos);
- bin uncle = root.sibling();
+void Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
+ bin64_t peak = file().peak_for(pos);
+ while (pos!=peak && ack_in.get(pos.parent())==bins::EMPTY) {
+ bin64_t uncle = pos.sibling();
dgram.Push8(P2TP_HASH);
- dgram.Push32(uncle);
- dgram.PushHash( file().hashes[uncle] );
+ dgram.Push32((uint32_t)uncle);
+ dgram.PushHash( file().hash(uncle) );
DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
- }
+ pos = pos.parent();
+ }
}
-bin Channel::SenderPiecePick () { // TODO: resilience
+bin64_t Channel::DequeueHint () { // TODO: resilience
while (!hint_in.empty()) {
- bin hint = hint_in.front().pos;
+ bin64_t hint = hint_in.front();
hint_in.pop_front();
- if (ack_in.contains(hint))
+ if (ack_in.get(hint)==bins::FILLED)
continue;
- if (hint.layer()) {
- bin l=hint.left(), r=hint.right();
- //if (false)//rand()&1)
+ if ( file().ack_out().get(hint)==bins::EMPTY )
+ continue;
+ if (!hint.is_base()) {
+ bin64_t l=hint.left(), r=hint.right();
+ //if (rand()&1)
// swap(l,r);
- hint_in.push_front(tintbin(Datagram::now,r));
- hint_in.push_front(tintbin(Datagram::now,l));
+ hint_in.push_front(r);
+ hint_in.push_front(l);
continue;
}
- if ( !file().ack_out.contains(hint) )
- continue;
return hint;
}
- return 0;
-}
-
-
-
-Channel::state_t Channel::state () const {
- if (!peer_channel_id)
- return HS_REQ_OUT;
- if (cc_.avg_rtt()==0)
- return HS_RES_OUT;
- return HS_DONE;
+ return bin64_t::NONE;
}
-void Channel::CleanStaleDataOut (bin ack_pos) {
-
- if (ack_pos)
- for(int i=0; i<data_out.size() && i<MAX_REORDERING*2; i++)
- if (data_out[i].pos && ack_pos.contains(data_out[i].pos)) {
- cc_.RttSample(Datagram::now-data_out[i].time);
- cc_.OnCongestionEvent(CongestionControl::ACK_EV);
- data_out[i].pos = 0;
- }
- while (!data_out.empty() && data_out[0].pos==0)
- data_out.pop_front();
-
- tint timed_out = Datagram::now - cc_.safe_avg_rtt();
- while (!data_out.empty() && data_out.front().time < timed_out) {
- DLOG(INFO)<<*this<<" loss: "<<data_out.front().pos;
- // reordering is a loss, collision is a loss
- cc_.OnCongestionEvent(CongestionControl::LOSS_EV);
- data_out.pop_front();
- }
-
-}
-
-
-void Channel::CleanStaleHintOut () {
- while ( !hint_out.empty() && file().ack_out.contains(hint_out.front().pos) )
- hint_out.pop_front();
- tint timed_out = Datagram::now - cc_.safe_avg_rtt()*4; //FIXME: timeout
+void Channel::CleanStaleHints () {
+ while ( !hint_out.empty() && file().ack_out().get(hint_out.front().bin)==bins::FILLED )
+ hint_out.pop_front(); // FIXME must normally clear fulfilled entries
+ tint timed_out = Datagram::now - cc->rtt_avg()*8;
while ( !hint_out.empty() && hint_out.front().time < timed_out ) {
- file().hint_out -= hint_out.front().pos;
- hint_out.pop_front(); // TODO: ignore count
+ file().picker()->Snubbed(hint_out.front().bin);
+ hint_out.pop_front();
}
}
-void Channel::CleanStaleHintIn () {
- // do I need it?
-}
-
-void Channel::SendHandshake () {
- Datagram dgram(socket_,peer);
- dgram.Push32(peer_channel_id);
+void Channel::AddHandshake (Datagram& dgram) {
dgram.Push8(P2TP_HANDSHAKE);
dgram.Push32(EncodeID(id));
if (!peer_channel_id) { // initiating
dgram.Push8(P2TP_HASH);
- dgram.Push32(bin::ALL);
- dgram.PushHash(file().hashes.root);
- AddAck(dgram);
- } else { // responding
- AddAck(dgram);
+ dgram.Push32(bin64_t::ALL32);
+ dgram.PushHash(file().root_hash());
}
- DLOG(INFO)<<"#"<<id<<" sending a handshake to "<<*this;
- PCHECK( dgram.Send() != -1 )<<"error sending";
- last_send_time = Datagram::now;
+ AddAck(dgram);
+ //DLOG(INFO)<<"#"<<id<<" sending a handshake to "<<this->id_string();
}
-void Channel::SendData () {
- CleanStaleDataOut(0);
- int round = 0;
+tint Channel::Send () {
Datagram dgram(socket_,peer);
dgram.Push32(peer_channel_id);
- AddAck(dgram);
- AddHint(dgram);
- while (cc_.cwnd()>data_out.size()) {
- AddData(dgram); // always the last: might be tail block
- if (dgram.size()==4 && Datagram::now-last_send_time<TIMEOUT/2)
- break; // nothing to send
- DLOG(INFO)<<"#"<<id<<" sending "<<dgram.size()<<" bytes";
- PCHECK( dgram.Send() != -1 )<<"error sending";
- last_send_time = Datagram::now;
- round++;
- dgram.Clear();
- dgram.Push32(peer_channel_id);
- }
- DLOG(INFO)<<"#"<<id<<" sent "<<round<<" rounds";
-}
-
-
-void Channel::Send () {
- if (state()==HS_DONE)
- SendData();
- else
- SendHandshake();
-}
-
-
-bin Channel::ReceiverPiecePick (int limit) {
- bins diff(ack_in);
- diff -= file().ack_out;
- diff -= file().hint_out;
- if (diff.empty()) {
- //uninteresting = true;
- return 0;
- }
- bin need = *(diff.begin());
- while (need.width()>std::max(1,limit))
- need = need.left();
- return need;
+ if ( is_established() ) {
+ AddAck(dgram);
+ AddHint(dgram);
+ if (cc->free_cwnd() && Datagram::now>=cc->next_send_time()) {
+ bin64_t data = AddData(dgram);
+ cc->OnDataSent(data);
+ }
+ } else {
+ AddHandshake(dgram);
+ }
+ DLOG(INFO)<<"#"<<id<<" sending "<<dgram.size()<<" bytes";
+ PCHECK( dgram.Send() != -1 )<<"error sending";
+ last_send_time = Datagram::now;
+ return cc->next_send_time();
}
void Channel::AddHint (Datagram& dgram) {
- CleanStaleHintOut();
- int onesec = TINT_1SEC/cc_.data_in_rate();
- if (Width(hint_out)<onesec) {
- bin hint = ReceiverPiecePick(onesec);
- if (hint) {
- dgram.Push8(P2TP_HINT);
- dgram.Push32(hint);
- hint_out.push_back(tintbin(Datagram::now,hint));
- file().hint_out |= hint; // FIXME: incapsulate File data
- DLOG(INFO)<<"#"<<id<<" +HINT"<<hint;
- }
- }
+ CleanStaleHints();
+ uint64_t outstanding = 0;
+ for(tbqueue::iterator i=hint_out.begin(); i!=hint_out.end(); i++)
+ outstanding += i->bin.width();
+ uint64_t kbps = TINT_SEC * cc->peer_cwnd() / cc->rtt_avg();
+ if (outstanding>kbps) // have enough
+ return;
+ uint8_t layer = 0;
+ while( (1<<layer) < kbps ) layer++;
+ bin64_t hint = file().picker()->Pick(ack_in,layer);
+ if (hint==bin64_t::NONE)
+ return;
+ dgram.Push8(P2TP_HINT);
+ dgram.Push32(hint);
+ hint_out.push_back(tintbin(Datagram::now,hint));
+ DLOG(INFO)<<"#"<<id<<" +HINT"<<hint;
}
-bin Channel::AddData (Datagram& dgram) {
- if (!file().history.size()) // know nothing
- return 0;
- bin tosend = hash_out ? hash_out : SenderPiecePick();
- hash_out = 0;
- if (tosend==0) {
- LOG(WARNING)<<*this<<" no idea what to send";
- cc_.OnCongestionEvent(CongestionControl::LOSS_EV);
- return 0;
+bin64_t Channel::AddData (Datagram& dgram) {
+ if (!file().size()) // know nothing
+ return bin64_t::NONE;
+ bin64_t tosend = DequeueHint();
+ if (tosend==bin64_t::NONE) {
+ //LOG(WARNING)<<this->id_string()<<" no idea what to send";
+ cc->OnDataSent(bin64_t::NONE);
+ return bin64_t::NONE;
}
- if (peer_status()==File::EMPTY && file().history.size()) //FIXME
+ if (ack_in.empty() && file().size())
AddPeakHashes(dgram);
AddUncleHashes(dgram,tosend);
uint8_t buf[1024];
- size_t r = pread(fd,buf,1024,tosend.offset()<<10); // TODO: ??? corrupted data, retries
+ size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); // TODO: ??? corrupted data, retries
if (r<0) {
PLOG(ERROR)<<"error on reading";
return 0;
}
- if (dgram.space()<r+4+1) {
- hash_out = tosend;
- return -tosend; // FIXME
- }
+ assert(dgram.space()>=r+4+1);
dgram.Push8(P2TP_DATA);
dgram.Push32(tosend);
dgram.Push(buf,r);
- data_out.push_back(tintbin(Datagram::Time(),tosend));
DLOG(INFO)<<"#"<<id<<" +DATA"<<tosend;
return tosend;
}
void Channel::AddAck (Datagram& dgram) {
- int ackspace = min(4,dgram.space()/5);
- if (data_in_) {
- dgram.Push8(P2TP_ACK);
- dgram.Push32(data_in_);
- DLOG(INFO)<<"#"<<id<<" +!ACK"<<data_in_;
- ackspace--;
- }
- while (ack_out<file().history.size() && ackspace) {
- bin h=file().history[ack_out++];
- if (!file().ack_out.contains(h.parent()) && h!=data_in_) {
- dgram.Push8(P2TP_ACK);
- dgram.Push32(h);
- DLOG(INFO)<<"#"<<id<<" +ACK"<<h;
- ackspace--;
- }
+ if (data_in_.time) {
+ dgram.Push8(P2TP_ACK_TS);
+ dgram.Push32(data_in_.bin);
+ dgram.Push64(data_in_.time);
+ data_in_.time = 0;
+ DLOG(INFO)<<"#"<<id<<" +!ACK"<<data_in_.bin;
}
- data_in_ = 0;
+ bin64_t h=file().data_in(ack_out_);
+ int count=0;
+ while (h!=bin64_t::NONE && count++<4) {
+ dgram.Push8(P2TP_ACK);
+ dgram.Push32(h);
+ DLOG(INFO)<<"#"<<id<<" +ACK"<<h;
+ h=file().data_in(++ack_out_);
+ }
}
while (dgram.size()) {
uint8_t type = dgram.Pull8();
switch (type) {
+ case P2TP_HANDSHAKE: OnHandshake(dgram); break;
case P2TP_DATA: OnData(dgram); break;
+ case P2TP_ACK_TS: OnAckTs(dgram); break;
case P2TP_ACK: OnAck(dgram); break;
case P2TP_HASH: OnHash(dgram); break;
case P2TP_HINT: OnHint(dgram); break;
+ case P2TP_PEX_ADD: OnPex(dgram); break;
default:
- LOG(ERROR) << *this << " malformed datagram";
+ //LOG(ERROR) << this->id_string() << " malformed datagram";
return;
}
}
void Channel::OnHash (Datagram& dgram) {
- bin pos = dgram.Pull32();
+ bin64_t pos = dgram.Pull32();
Sha1Hash hash = dgram.PullHash();
- if (file().OfferHash(pos,hash))
- DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
+ file().OfferHash(pos,hash);
+ DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
}
void Channel::OnData (Datagram& dgram) {
- bin pos = dgram.Pull32();
- uint8_t* data;
- size_t length = dgram.Pull(&data,1024);
+ bin64_t pos = dgram.Pull32();
DLOG(INFO)<<"#"<<id<<" .DATA"<<pos;
- if (pos.layer())
- RETLOG("non-base layer DATA pos");
- if (file().ack_out.contains(pos))
- RETLOG("duplicate transmission: "<<pos);
- if (file().status()==File::EMPTY)
- RETLOG("DATA for an empty file");
- if (pos.offset()>=file().packet_size())
- RETLOG("DATA pos out of bounds");
- Sha1Hash hash(data,length);
- if (file().OfferHash(pos, hash)) {
- //memcpy(file->data+offset*KILO,
- //channel->datagram->data+channel->datagram->offset,KILO);
- pwrite(fd,data,length,pos.offset()*1024); // TODO; if (last) ftruncate
- if (pos==file().hashes.data_mass()) {
- int lendiff = 1024-length;
- ftruncate(fd, file().size()-lendiff);
- }
- data_in_ = pos;
- file().ack_out |= pos;
- file().history.push_back(file().ack_out.get(pos));
- if (file().history.size()==file().packet_size()+1) // FIXME: encapsulate
- file().status_ = File::DONE;
- cc_.OnCongestionEvent(CongestionControl::DATA_EV);
- //DLOG(INFO)<<*this<<" DATA< "<<pos;
- CleanStaleHintOut();
- } else
- LOG(ERROR)<<"data hash is not accepted "<<pos<<" len "<<length;
+ file().OfferData(pos, *dgram, dgram.size());
+ cc->OnDataRecvd(pos);
+ CleanStaleHints();
}
void Channel::OnAck (Datagram& dgram) {
- // FIXME check whether it is in the range
- bin pos = dgram.Pull32();
+ // note: no bound checking
+ bin64_t pos = dgram.Pull32();
DLOG(INFO)<<"#"<<id<<" .ACK"<<pos;
- if (file().hashes.data_mass() && pos>file().hashes.data_mass()) {
- LOG(WARNING) << "out-of-bounds ACK";
- return;
- }
- ack_in |= pos;
-
- CleanStaleDataOut(pos);
-
- if (peer_status_==File::EMPTY) {
- peer_status_ = File::IN_PROGRESS;
- } else if (peer_status_==File::IN_PROGRESS) {
- // FIXME: FINISHED ack_in_.filled(file().size())
- }
+ ack_in.set(pos);
+}
+
+
+void Channel::OnAckTs (Datagram& dgram) {
+ bin64_t pos = dgram.Pull32();
+ tint ts = dgram.Pull64();
+ DLOG(INFO)<<"#"<<id<<" ,ACK"<<pos;
+ //dprintf("%lli #%i +ack %lli +ts %lli",Datagram::now,id,pos,ts);
+ ack_in.set(pos);
+ cc->OnAckRcvd(tintbin(ts,pos));
}
void Channel::OnHint (Datagram& dgram) {
- bin hint = dgram.Pull32();
- hint_in.push_back(tintbin(Datagram::now,hint));
+ bin64_t hint = dgram.Pull32();
+ hint_in.push_back(hint);
+}
+
+
+void Channel::OnHandshake (Datagram& dgram) {
+ peer_channel_id = dgram.Pull32();
+ // FUTURE: channel forking
+}
+
+
+void Channel::OnPex (Datagram& dgram) {
+ uint32_t addr = dgram.Pull32();
+ uint16_t port = dgram.Pull16();
+ if (peer_selector)
+ peer_selector->AddPeer(Datagram::Address(addr,port),file().root_hash());
+}
+
+
+void Channel::Recv (int socket) {
+ Datagram data(socket);
+ data.Recv();
+ if (data.size()<4)
+ RETLOG("datagram shorter than 4 bytes");
+ uint32_t mych = data.Pull32();
+ Sha1Hash hash;
+ Channel* channel;
+ if (!mych) { // handshake initiated
+ if (data.size()<1+4+1+4+Sha1Hash::SIZE)
+ RETLOG ("incorrect size initial handshake packet");
+ uint8_t hashid = data.Pull8();
+ if (hashid!=P2TP_HASH)
+ RETLOG ("no hash in the initial handshake");
+ bin pos = data.Pull32();
+ if (pos!=bin64_t::ALL32)
+ RETLOG ("that is not the root hash");
+ hash = data.PullHash();
+ FileTransfer* file = FileTransfer::Find(hash);
+ if (!file)
+ RETLOG ("hash unknown, no such file");
+ channel = new Channel(file, socket, data.address());
+ } else {
+ mych = DecodeID(mych);
+ if (mych>=channels.size())
+ RETLOG ("invalid channel id");
+ channel = channels[mych];
+ if (!channel)
+ RETLOG ("channel is closed");
+ if (channel->peer != data.address())
+ RETLOG ("invalid peer address");
+ channel->Recv(data);
+ }
+ channel->Send();
+}
+
+
+bool tblater (const tintbin& a, const tintbin& b) {
+ return a.time > b.time;
}
+void Channel::Loop (tint time) {
+
+ tint untiltime = Datagram::Time()+time;
+ tbqueue send_queue;
+ for(int i=0; i<channels.size(); i++)
+ if (channels[i])
+ send_queue.push_back(tintbin(Datagram::now,i));
+
+ while ( Datagram::now <= untiltime ) {
+
+ tintbin next_send = send_queue.front();
+ pop_heap(send_queue.begin(), send_queue.end(), tblater);
+ send_queue.pop_back();
+ tint wake_on = min(next_send.time,untiltime);
+ tint towait = min(wake_on-Datagram::now,TINT_SEC); // towait<0?
+
+ int rd = Datagram::Wait(socket_count,sockets,towait);
+ if (rd!=-1)
+ Recv(rd);
+
+ int chid = (int)(next_send.bin);
+ Channel* sender = channels[chid];
+ if (sender) {
+ tint next_time = sender->Send();
+ if (next_time!=TINT_NEVER) {
+ send_queue.push_back(tintbin(next_time,chid));
+ push_heap(send_queue.begin(),send_queue.end(),tblater);
+ } else {
+ delete sender;
+ channels[chid] = NULL;
+ }
+ }
+
+ }
+
+}
EXPECT_EQ(bin64_t(3,0),i);
}
+TEST(Bin64Test, Bits) {
+ bin64_t all = bin64_t::ALL, none = bin64_t::NONE, big = bin64_t(40,18);
+ uint32_t a32 = all.to32(), n32 = none.to32(), b32 = big.to32();
+ EXPECT_EQ(0x7fffffff,a32);
+ EXPECT_EQ(0xffffffff,n32);
+ EXPECT_EQ(bin64_t::NONE32,b32);
+}
+
int main (int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
#include <glog/logging.h>
#include "p2tp.h"
+using namespace p2tp;
/*TEST(P2TP, ConnectTest) {
TEST(P2TP,CwndTest) {
int f = open("big_test_file",O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
int size = rand()%(1<<19) + (1<<19);
+ int sizek = (size>>10) + ((size&1023)?1:0);
char* b = (char*)malloc(size);
for(int i=0; i<size; i++)
b[i] = (i%1024!=1023) ? ('A' + rand()%('Z'-'A')) : ('\n');
write(f,b,size);
free(b);
close(f);
-
+
+ /*
struct sockaddr_in addr1, addr2;
addr1.sin_family = AF_INET;
addr1.sin_port = htons(7003);
addr2.sin_family = AF_INET;
addr2.sin_port = htons(7004);
addr2.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
-
- int sock1 = p2tp::Init(7003);
- int sock2 = p2tp::Init(7004);
+ */
+ int sock1 = p2tp::Listen(7003);
ASSERT_TRUE(sock1>=0);
- ASSERT_TRUE(sock2>=0);
+ //ASSERT_TRUE(sock2>=0);
+ p2tp::AddPeer(Datagram::Address("127.0.0.1",7001));
+
int file = p2tp::Open("big_test_file");
- p2tp::File& fileobj = * p2tp::File::file(file);
-
- int copy = p2tp::Open(fileobj.root_hash(),"big_test_file_copy");
- p2tp::File& copyobj = * p2tp::File::file(copy);
+ FileTransfer* fileobj = FileTransfer::file(file);
+
+ int copy = p2tp::Open("big_test_file_copy",fileobj->root_hash());
- int chan = p2tp::Connect(copy,sock1,addr2);
-
- p2tp::Loop();
- p2tp::Loop();
- p2tp::Channel& sendch = * p2tp::Channel::channel(chan+1);
- while (copyobj.status()!=p2tp::File::DONE) {
- p2tp::Loop();
- LOG(INFO)<<sendch.congestion_control().cwnd()<<" cwnd";
- //EXPECT_GE(1,sendch.congestion_control().cwnd());
- }
-
- ASSERT_EQ(p2tp::file_size(file),p2tp::file_size(copy));
+ p2tp::Loop(TINT_MSEC);
+
+ ASSERT_EQ(sizek<<10,p2tp::Size(copy));
+
+ int count = 0;
+ while (p2tp::SeqComplete(copy)!=size && count++<(1<<14))
+ p2tp::Loop(TINT_MSEC);
+ ASSERT_EQ(size,p2tp::SeqComplete(copy));
+
p2tp::Close(file);
p2tp::Close(copy);
p2tp::Shutdown(sock1);
- p2tp::Shutdown(sock2);
+ //p2tp::Release(sock2);
}
int main (int argc, char** argv) {
- bin::init();
- bins::init();
+ //bin::init();
+ //bins::init();
google::InitGoogleLogging(argv[0]);
testing::InitGoogleTest(&argc, argv);
int ret = RUN_ALL_TESTS();
int datalen = strlen(text)+1+2+4+8;
ASSERT_EQ(datalen,d.Send());
int socks[1] = {socket};
- socket = Datagram::Wait(1,socks);
+ ASSERT_EQ (socket, Datagram::Wait(1,socks));
Datagram rcv(socket);
ASSERT_EQ(datalen,rcv.Recv());
char* rbuf;
// FIXME: separate Bootstrap() and Download(), then Size(), Progress(), SeqProgress()
-FileTransfer::FileTransfer (const Sha1Hash& _root_hash, const char* filename) :
+FileTransfer::FileTransfer (const char* filename, const Sha1Hash& _root_hash) :
root_hash_(_root_hash), fd_(0), hashfd_(0), dry_run_(false),
peak_count_(0), hashes_(NULL), error_(NULL), size_(0), sizek_(0),
complete_(0), completek_(0), seq_complete_(0)
fd_ = open(filename,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
if (fd_<0)
return;
+ if (files.size()<fd_+1)
+ files.resize(fd_+1);
+ files[fd_] = this;
if (root_hash_==Sha1Hash::ZERO) // fresh submit, hash it
Submit();
else
}
-bin64_t FileTransfer::PickBinForRequest (bins& from, uint8_t layer) {
- return picker_->Pick(from,layer);
-}
-
-
void FileTransfer::LoadPeaks () {
char file_name[1024];
sprintf(file_name,PEAK_FILE_TEMPLATE,root_hash().hex().c_str(),instance);
}
+bin64_t FileTransfer::peak_for (bin64_t pos) const {
+ int pi=0;
+ while (pi<peak_count_ && !pos.within(peaks_[pi]))
+ pi++;
+ return pi==peak_count_ ? bin64_t(bin64_t::NONE) : peaks_[pi];
+}
+
+
void FileTransfer::OfferHash (bin64_t pos, const Sha1Hash& hash) {
if (!size_) // only peak hashes are accepted at this point
return OfferPeak(pos,hash);
}
-bool FileTransfer::OfferData (bin64_t pos, uint8_t* data, size_t length) {
+bool FileTransfer::OfferData (bin64_t pos, const uint8_t* data, size_t length) {
if (!pos.is_base())
return false;
if (length<1024 && pos!=bin64_t(0,sizek_-1))
return false;
if (ack_out_.get(pos)==bins::FILLED)
return true; // ???
- int pi=0;
- while (pi<peak_count_ && !pos.within(peaks_[pi]))
- pi++;
- if (pi==peak_count_)
+ bin64_t peak = peak_for(pos);
+ if (peak==bin64_t::NONE)
return false;
- bin64_t peak = peaks_[pi];
Sha1Hash hash(data,length);
bin64_t p = pos;
munmap(hashes_,sizek_*2*Sha1Hash::SIZE);
close(hashfd_);
close(fd_);
+ files[fd_] = NULL;
}
}
-int p2tp::Open (const char* filename) {
- return Open(Sha1Hash::ZERO,filename);
-}
-
-
-int p2tp::Open (const Sha1Hash& hash, const char* filename) {
- FileTransfer* ft = new FileTransfer(hash, filename);
- if (ft->fd_>0) {
- if (FileTransfer::files.size()<ft->fd_)
- FileTransfer::files.resize(ft->fd_);
- FileTransfer::files[ft->fd_] = ft;
- return ft->fd_;
+int p2tp::Open (const char* filename, const Sha1Hash& hash) {
+ FileTransfer* ft = new FileTransfer(filename, hash);
+ int fdes = ft->file_descriptor();
+ if (fdes>0) {
+ if (FileTransfer::files.size()<fdes)
+ FileTransfer::files.resize(fdes);
+ FileTransfer::files[fdes] = ft;
+ return fdes;
} else {
delete ft;
return -1;
}
-void p2tp::Close (int fdes) {
- // FIXME delete all channels
- delete FileTransfer::files[fdes];
- FileTransfer::files[fdes] = NULL;
-}
-
-
-
/*
for(int i=0; i<peak_hash_count; i++) {
bin64_t x = peaks[i], end = x.sibling();