From 80a3848790b2c72c2e24a5e30475953cccff2ec5 Mon Sep 17 00:00:00 2001 From: victor Date: Sun, 18 Oct 2009 18:20:22 +0000 Subject: [PATCH] compiles again git-svn-id: https://ttuki.vtt.fi/svn/p2p-next/TUD/p2tp/trunk@438 e16421f0-f15b-0410-abcd-98678b794739 --- ChangeLog | 4 + bin64.cpp | 10 + bin64.h | 3 + bins.h | 13 +- datagram.cpp | 13 +- datagram.h | 27 ++- ext/ledbat_controller.cpp | 55 ++++- ext/simple_selector.cpp | 23 +- hashtree.cpp | 2 +- hashtree.h | 2 +- p2tp.cpp | 248 +++++----------------- p2tp.h | 86 +++++--- sendrecv.cpp | 426 +++++++++++++++++++------------------- tests/bin64test.cpp | 8 + tests/connecttest.cpp | 48 ++--- tests/dgramtest.cpp | 2 +- transfer.cpp | 56 +++-- 17 files changed, 480 insertions(+), 546 deletions(-) diff --git a/ChangeLog b/ChangeLog index d272883..ad12850 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,7 @@ +0.003 - This is not a release as well - 18 Oct 2009 + + - but at least, it compiles now + 0.002 - This is not a release - 7 Oct 2009 - it does not even compile, committed for reading purposes only diff --git a/bin64.cpp b/bin64.cpp index 90524d5..599c4df 100644 --- a/bin64.cpp +++ b/bin64.cpp @@ -10,6 +10,16 @@ const uint64_t bin64_t::NONE = 0xffffffffffffffffULL; const uint64_t bin64_t::ALL = 0x7fffffffffffffffULL; +const uint32_t bin64_t::NONE32 = 0xffffffffU; +const uint32_t bin64_t::ALL32 = 0x7fffffffU; + +uint32_t bin64_t::to32() const { + if (v<0xffffffff && v!=0x7fffffff) + return (uint32_t)v; + if (v==ALL) + return ALL32; + return NONE32; +} bin64_t bin64_t::next_dfsio (uint8_t floor) { /*while (ret.is_right()) diff --git a/bin64.h b/bin64.h index a9a7698..f6b2bf3 100644 --- a/bin64.h +++ b/bin64.h @@ -17,6 +17,8 @@ struct bin64_t { uint64_t v; static const uint64_t NONE; static const uint64_t ALL; + static const uint32_t NONE32; + static const uint32_t ALL32; bin64_t() : v(NONE) {} bin64_t(const bin64_t&b) : v(b.v) {} @@ -24,6 +26,7 @@ struct bin64_t { bin64_t(uint8_t layer, uint64_t offset) : v( (offset<<(layer+1)) | ((1ULL<> 3; // van Jac + in_flight_--; + } + + ~LedbatController() { + } }; diff --git a/ext/simple_selector.cpp b/ext/simple_selector.cpp index e0d54e8..a45c76f 100644 --- a/ext/simple_selector.cpp +++ b/ext/simple_selector.cpp @@ -7,28 +7,33 @@ * */ +#include #include "p2tp.h" using namespace p2tp; class SimpleSelector : public PeerSelector { - typedef std::pair memo_t; - std::queue peers; + typedef std::pair memo_t; + typedef std::deque peer_queue_t; + peer_queue_t peers; public: - virtual void PeerKnown (const Sha1Hash& root, struct sockaddr_in& addr) { - peers.push_front(memo_t(addr,root.fingerprint())); + SimpleSelector () { } - virtual sockaddr_in GetPeer (const Sha1Hash& for_root) { - uint32_t fp = for_root.fingerprint(); - for(std::queue::iterator i=peers.begin(); i!=peers.end(); i++) - if (i->second==fp) { + void AddPeer (const Datagram::Address& addr, const Sha1Hash& root) { + peers.push_front(memo_t(addr,root)); //,root.fingerprint() !!! + } + Address GetPeer (const Sha1Hash& for_root) { + //uint32_t fp = for_root.fingerprint(); + for(peer_queue_t::iterator i=peers.begin(); i!=peers.end(); i++) + if (i->second==for_root) { i->second = 0; sockaddr_in ret = i->first; while (peers.begin()->second==0) peers.pop_front(); return ret; } + return Address(); } }; -static Channel::peer_selector = new SimpleSelector(); \ No newline at end of file +PeerSelector* Channel::peer_selector = new SimpleSelector(); \ No newline at end of file diff --git a/hashtree.cpp b/hashtree.cpp index 4e4d201..18c88d3 100644 --- a/hashtree.cpp +++ b/hashtree.cpp @@ -41,7 +41,7 @@ Sha1Hash::Sha1Hash(bool hex, const char* hash) { memcpy(bits,hash,SIZE); } -string Sha1Hash::hex() { +string Sha1Hash::hex() const { char hex[HASHSZ*2+1]; for(int i=0; i Channel::channels(1); -std::vector File::files(4); -int* Channel::sockets_ = (int*)malloc(40); -int Channel::sock_count_ = 0; +int Channel::sockets[8] = {0,0,0,0,0,0,0,0}; +int Channel::socket_count = 0; -Channel::Channel (int fd_, int socket, struct sockaddr_in peer_, - uint32_t peer_channel_, uint64_t supports_) : - fd(fd_), peer(peer_), peer_channel_id(peer_channel_), ack_out(0), - peer_status_(File::EMPTY), socket_(socket) +Channel::Channel (FileTransfer* file, int socket, struct sockaddr_in peer_addr) : + file_(file), peer(peer_addr), peer_channel_id(0), + socket_(socket) // FIXME { this->id = channels.size(); channels.push_back(this); - DLOG(INFO)<<"new channel "<0) ::close(fd); +int Channel::DecodeID(int scrambled) { + return scrambled; +} +int Channel::EncodeID(int unscrambled) { + return unscrambled; } -bool File::OfferHash (bin pos, const Sha1Hash& hash) { - HashTree::hashres_t res = hashes.offer(pos,hash); - if (res==HashTree::PEAK_ACCEPT) { // file size is finally known - ftruncate(fd, size()); - LOG(INFO)<::iterator i=files.begin(); i!=files.end(); i++) - if (*i && (*i)->hashes.root==hash) - return *i; - return NULL; +void p2tp::Shutdown (int sock_des) { + for(int i=0; iFileTransfer::files.size() && FileTransfer::files[fd]) + delete FileTransfer::files[fd]; } -std::ostream& p2tp::operator << (std::ostream& os, const Channel& ch) { - return os<<'{'<'<AddPeer(address,root); } -void Channel::Recv (int socket) { - Datagram data(socket); - data.Recv(); - //LOG(INFO)<<" RECV "<fd, socket, data.address(), peerch); - } else { - mych = DecodeID(mych); - if (mych>=channels.size()) - RETLOG ("invalid channel id"); - channel = channels[mych]; - id = channel->id; - if (channel->peer.sin_addr.s_addr != data.address().sin_addr.s_addr) - RETLOG ("invalid peer address"); - if (channel->peer.sin_port!=data.address().sin_port) - RETLOG ("invalid peer port"); - if (!channel->peer_channel_id) { // handshake response - if (data.size()<5) - RETLOG ("insufficient return handshake length"); - type = data.Pull8(); - if (type) - RETLOG ("it is not a handshake, after all"); - channel->peer_channel_id = data.Pull32(); - LOG(INFO)<<"out channel is open: "<<*channel; - } else if (channel->cc_.avg_rtt()==0) { - LOG(INFO)<<"in channel is open: "<<*channel; - } - if (channel->cc_.avg_rtt()==0) - channel->cc_.RttSample(Datagram::now - channel->last_send_time + 1); - channel->Recv(data); - } - channel->Send(); +size_t p2tp::Size (int fdes) { + if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes]) + return FileTransfer::files[fdes]->size(); + else + return 0; } -void Channel::Tick () { - // choking/unchoking - // keepalives - // ack timeout - // if unchoked: don't bother - // whether to unchoke - // reevaluate reciprocity - // otherwise, send update (if needed) - // otherwise, send a keepalive - CleanStaleHintIn(); - CleanStaleHintOut(); - if (last_send_time && Datagram::now-last_send_time>=Channel::TIMEOUT/2) - Send(); +size_t p2tp::Complete (int fdes) { + if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes]) + return FileTransfer::files[fdes]->complete(); + else + return 0; } +size_t p2tp::SeqComplete (int fdes) { + if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes]) + return FileTransfer::files[fdes]->seq_complete(); + else + return 0; +} + /**

P2TP handshake

Basic rules:
    @@ -188,92 +125,3 @@ void Channel::Tick () { Note: */ - -void Channel::Loop (tint time) { - - tint untiltime = Datagram::Time()+time; - - while ( Datagram::now <= untiltime ) { - - tint towait = min(untiltime,Datagram::now+TINT_1SEC) - Datagram::now; - int rd = Datagram::Wait(sock_count_,sockets_,towait); - if (rd!=-1) - Recv(rd); - - /*if (Datagram::now-last_tick>TINT_1SEC) { - for(int i=0; iTick(); - last_tick = Datagram::now; - }*/ - - } - -} - -int p2tp::Open (const char* filename) { - int fd = ::open(filename,O_RDONLY); - if (fd<0) - return -1; - if (File::files.size()size(); } - -void p2tp::Close (int fid) { - if (!File::files[fid]) - return; - delete File::files[fid]; - File::files[fid] = NULL; -} - - -int p2tp::Connect (int fd, int sock, const struct sockaddr_in& addr, uint32_t peerch) { - Channel *ch = new Channel(fd,sock,addr,peerch); - ch->Send(); - return ch->id; -} - -void p2tp::Loop (tint time) { - Channel::Loop(time); -} - -int p2tp::Init (int portno) { - int sock = Datagram::Bind(portno); - if (sock>0) - Channel::sockets_[Channel::sock_count_++] = sock; - return sock; -} - -void p2tp::Shutdown (int sock) { - int i=0; - while (ipos.width(); - return ret; -} - diff --git a/p2tp.h b/p2tp.h index bdb39de..8623285 100644 --- a/p2tp.h +++ b/p2tp.h @@ -68,6 +68,7 @@ namespace p2tp { typedef std::deque tbqueue; typedef std::deque binqueue; + typedef Datagram::Address Address; typedef enum { P2TP_HANDSHAKE = 0, @@ -91,7 +92,7 @@ namespace p2tp { public: /** Open/submit/retrieve a file. */ - FileTransfer(const Sha1Hash& _root_hash, const char *file_name); + FileTransfer(const char *file_name, const Sha1Hash& _root_hash=Sha1Hash::ZERO); /** Close everything. */ ~FileTransfer(); @@ -102,9 +103,9 @@ namespace p2tp { void OfferHash (bin64_t pos, const Sha1Hash& hash); /** Offer data; the behavior is the same as with a hash: accept or remember or drop. Returns true => ACK is sent. */ - bool OfferData (bin64_t bin, uint8_t* data, size_t length); + bool OfferData (bin64_t bin, const uint8_t* data, size_t length); - bin64_t PickBinForRequest (bins& from, uint8_t layer) ; static FileTransfer* Find (const Sha1Hash& hash); + static FileTransfer* Find (const Sha1Hash& hash); static FileTransfer* file (int fd) { return fd channels; - int sockets[4]; - int sock_count; - static tint last_tick; + static int sockets[8]; + static int socket_count; + static tint last_tick; + friend int Listen (Datagram::Address addr); + friend void Shutdown (int sock_des); + friend void AddPeer (Datagram::Address address, const Sha1Hash& root); }; + + /*************** The top-level API ****************/ + /** Start listening a port. Returns socket descriptor. */ + int Listen (Datagram::Address addr); + /** Run send/receive loop for the specified amount of time. */ + void Loop (tint till); + /** Stop listening to a port. */ + void Shutdown (int sock_des); + + /** Open a file, start a transmission; fill it with content for a given root hash; + in case the hash is omitted, the file is a fresh submit. */ + int Open (const char* filename, const Sha1Hash& hash=Sha1Hash::ZERO) ; + /** Close a file and a transmission. */ + void Close (int fd) ; + /** Add a possible peer which participares in a given transmission. In the case + root hash is zero, the peer might be talked to regarding any transmission + (likely, a tracker, cache or an archive). */ + void AddPeer (Datagram::Address address, const Sha1Hash& root=Sha1Hash::ZERO); + + /** Returns size of the file in bytes, 0 if unknown. Might be rounded up to a kilobyte + before the transmission is complete. */ + size_t Size (int fdes); + /** Returns the amount of retrieved and verified data, in bytes. + A 100% complete transmission has Size()==Complete(). */ + size_t Complete (int fdes); + /** Returns the number of bytes that are complete sequentially, starting from the + beginning, till the first not-yet-retrieved packet. */ + size_t SeqComplete (int fdes); + + //uint32_t Width (const tbinvec& v); } diff --git a/sendrecv.cpp b/sendrecv.cpp index b2a16aa..58c7fdd 100644 --- a/sendrecv.cpp +++ b/sendrecv.cpp @@ -10,236 +10,163 @@ #include #include "p2tp.h" +#include "ext/dummy_controller.cpp" + using namespace std; using namespace p2tp; - void Channel::AddPeakHashes (Datagram& dgram) { - const std::vector& peaks = file().hashes.peak_hashes(); - for(int i=0; irtt_avg()*8; while ( !hint_out.empty() && hint_out.front().time < timed_out ) { - file().hint_out -= hint_out.front().pos; - hint_out.pop_front(); // TODO: ignore count + file().picker()->Snubbed(hint_out.front().bin); + hint_out.pop_front(); } } -void Channel::CleanStaleHintIn () { - // do I need it? -} - -void Channel::SendHandshake () { - Datagram dgram(socket_,peer); - dgram.Push32(peer_channel_id); +void Channel::AddHandshake (Datagram& dgram) { dgram.Push8(P2TP_HANDSHAKE); dgram.Push32(EncodeID(id)); if (!peer_channel_id) { // initiating dgram.Push8(P2TP_HASH); - dgram.Push32(bin::ALL); - dgram.PushHash(file().hashes.root); - AddAck(dgram); - } else { // responding - AddAck(dgram); + dgram.Push32(bin64_t::ALL32); + dgram.PushHash(file().root_hash()); } - DLOG(INFO)<<"#"<id_string(); } -void Channel::SendData () { - CleanStaleDataOut(0); - int round = 0; +tint Channel::Send () { Datagram dgram(socket_,peer); dgram.Push32(peer_channel_id); - AddAck(dgram); - AddHint(dgram); - while (cc_.cwnd()>data_out.size()) { - AddData(dgram); // always the last: might be tail block - if (dgram.size()==4 && Datagram::now-last_send_timestd::max(1,limit)) - need = need.left(); - return need; + if ( is_established() ) { + AddAck(dgram); + AddHint(dgram); + if (cc->free_cwnd() && Datagram::now>=cc->next_send_time()) { + bin64_t data = AddData(dgram); + cc->OnDataSent(data); + } + } else { + AddHandshake(dgram); + } + DLOG(INFO)<<"#"<next_send_time(); } void Channel::AddHint (Datagram& dgram) { - CleanStaleHintOut(); - int onesec = TINT_1SEC/cc_.data_in_rate(); - if (Width(hint_out)bin.width(); + uint64_t kbps = TINT_SEC * cc->peer_cwnd() / cc->rtt_avg(); + if (outstanding>kbps) // have enough + return; + uint8_t layer = 0; + while( (1<Pick(ack_in,layer); + if (hint==bin64_t::NONE) + return; + dgram.Push8(P2TP_HINT); + dgram.Push32(hint); + hint_out.push_back(tintbin(Datagram::now,hint)); + DLOG(INFO)<<"#"<id_string()<<" no idea what to send"; + cc->OnDataSent(bin64_t::NONE); + return bin64_t::NONE; } - if (peer_status()==File::EMPTY && file().history.size()) //FIXME + if (ack_in.empty() && file().size()) AddPeakHashes(dgram); AddUncleHashes(dgram,tosend); uint8_t buf[1024]; - size_t r = pread(fd,buf,1024,tosend.offset()<<10); // TODO: ??? corrupted data, retries + size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); // TODO: ??? corrupted data, retries if (r<0) { PLOG(ERROR)<<"error on reading"; return 0; } - if (dgram.space()=r+4+1); dgram.Push8(P2TP_DATA); dgram.Push32(tosend); dgram.Push(buf,r); - data_out.push_back(tintbin(Datagram::Time(),tosend)); DLOG(INFO)<<"#"<id_string() << " malformed datagram"; return; } } @@ -260,71 +190,135 @@ void Channel::Recv (Datagram& dgram) { void Channel::OnHash (Datagram& dgram) { - bin pos = dgram.Pull32(); + bin64_t pos = dgram.Pull32(); Sha1Hash hash = dgram.PullHash(); - if (file().OfferHash(pos,hash)) - DLOG(INFO)<<"#"<=file().packet_size()) - RETLOG("DATA pos out of bounds"); - Sha1Hash hash(data,length); - if (file().OfferHash(pos, hash)) { - //memcpy(file->data+offset*KILO, - //channel->datagram->data+channel->datagram->offset,KILO); - pwrite(fd,data,length,pos.offset()*1024); // TODO; if (last) ftruncate - if (pos==file().hashes.data_mass()) { - int lendiff = 1024-length; - ftruncate(fd, file().size()-lendiff); - } - data_in_ = pos; - file().ack_out |= pos; - file().history.push_back(file().ack_out.get(pos)); - if (file().history.size()==file().packet_size()+1) // FIXME: encapsulate - file().status_ = File::DONE; - cc_.OnCongestionEvent(CongestionControl::DATA_EV); - //DLOG(INFO)<<*this<<" DATA< "<OnDataRecvd(pos); + CleanStaleHints(); } void Channel::OnAck (Datagram& dgram) { - // FIXME check whether it is in the range - bin pos = dgram.Pull32(); + // note: no bound checking + bin64_t pos = dgram.Pull32(); DLOG(INFO)<<"#"<file().hashes.data_mass()) { - LOG(WARNING) << "out-of-bounds ACK"; - return; - } - ack_in |= pos; - - CleanStaleDataOut(pos); - - if (peer_status_==File::EMPTY) { - peer_status_ = File::IN_PROGRESS; - } else if (peer_status_==File::IN_PROGRESS) { - // FIXME: FINISHED ack_in_.filled(file().size()) - } + ack_in.set(pos); +} + + +void Channel::OnAckTs (Datagram& dgram) { + bin64_t pos = dgram.Pull32(); + tint ts = dgram.Pull64(); + DLOG(INFO)<<"#"<OnAckRcvd(tintbin(ts,pos)); } void Channel::OnHint (Datagram& dgram) { - bin hint = dgram.Pull32(); - hint_in.push_back(tintbin(Datagram::now,hint)); + bin64_t hint = dgram.Pull32(); + hint_in.push_back(hint); +} + + +void Channel::OnHandshake (Datagram& dgram) { + peer_channel_id = dgram.Pull32(); + // FUTURE: channel forking +} + + +void Channel::OnPex (Datagram& dgram) { + uint32_t addr = dgram.Pull32(); + uint16_t port = dgram.Pull16(); + if (peer_selector) + peer_selector->AddPeer(Datagram::Address(addr,port),file().root_hash()); +} + + +void Channel::Recv (int socket) { + Datagram data(socket); + data.Recv(); + if (data.size()<4) + RETLOG("datagram shorter than 4 bytes"); + uint32_t mych = data.Pull32(); + Sha1Hash hash; + Channel* channel; + if (!mych) { // handshake initiated + if (data.size()<1+4+1+4+Sha1Hash::SIZE) + RETLOG ("incorrect size initial handshake packet"); + uint8_t hashid = data.Pull8(); + if (hashid!=P2TP_HASH) + RETLOG ("no hash in the initial handshake"); + bin pos = data.Pull32(); + if (pos!=bin64_t::ALL32) + RETLOG ("that is not the root hash"); + hash = data.PullHash(); + FileTransfer* file = FileTransfer::Find(hash); + if (!file) + RETLOG ("hash unknown, no such file"); + channel = new Channel(file, socket, data.address()); + } else { + mych = DecodeID(mych); + if (mych>=channels.size()) + RETLOG ("invalid channel id"); + channel = channels[mych]; + if (!channel) + RETLOG ("channel is closed"); + if (channel->peer != data.address()) + RETLOG ("invalid peer address"); + channel->Recv(data); + } + channel->Send(); +} + + +bool tblater (const tintbin& a, const tintbin& b) { + return a.time > b.time; } +void Channel::Loop (tint time) { + + tint untiltime = Datagram::Time()+time; + tbqueue send_queue; + for(int i=0; iSend(); + if (next_time!=TINT_NEVER) { + send_queue.push_back(tintbin(next_time,chid)); + push_heap(send_queue.begin(),send_queue.end(),tblater); + } else { + delete sender; + channels[chid] = NULL; + } + } + + } + +} diff --git a/tests/bin64test.cpp b/tests/bin64test.cpp index 3a2bb44..7f3c44d 100644 --- a/tests/bin64test.cpp +++ b/tests/bin64test.cpp @@ -76,6 +76,14 @@ TEST(Bin64Test, Iteration) { EXPECT_EQ(bin64_t(3,0),i); } +TEST(Bin64Test, Bits) { + bin64_t all = bin64_t::ALL, none = bin64_t::NONE, big = bin64_t(40,18); + uint32_t a32 = all.to32(), n32 = none.to32(), b32 = big.to32(); + EXPECT_EQ(0x7fffffff,a32); + EXPECT_EQ(0xffffffff,n32); + EXPECT_EQ(bin64_t::NONE32,b32); +} + int main (int argc, char** argv) { testing::InitGoogleTest(&argc, argv); diff --git a/tests/connecttest.cpp b/tests/connecttest.cpp index d831a5e..9cecd7a 100644 --- a/tests/connecttest.cpp +++ b/tests/connecttest.cpp @@ -11,6 +11,7 @@ #include #include "p2tp.h" +using namespace p2tp; /*TEST(P2TP, ConnectTest) { @@ -50,13 +51,15 @@ TEST(P2TP,CwndTest) { int f = open("big_test_file",O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); int size = rand()%(1<<19) + (1<<19); + int sizek = (size>>10) + ((size&1023)?1:0); char* b = (char*)malloc(size); for(int i=0; i=0); - ASSERT_TRUE(sock2>=0); + //ASSERT_TRUE(sock2>=0); + p2tp::AddPeer(Datagram::Address("127.0.0.1",7001)); + int file = p2tp::Open("big_test_file"); - p2tp::File& fileobj = * p2tp::File::file(file); - - int copy = p2tp::Open(fileobj.root_hash(),"big_test_file_copy"); - p2tp::File& copyobj = * p2tp::File::file(copy); + FileTransfer* fileobj = FileTransfer::file(file); + + int copy = p2tp::Open("big_test_file_copy",fileobj->root_hash()); - int chan = p2tp::Connect(copy,sock1,addr2); - - p2tp::Loop(); - p2tp::Loop(); - p2tp::Channel& sendch = * p2tp::Channel::channel(chan+1); - while (copyobj.status()!=p2tp::File::DONE) { - p2tp::Loop(); - LOG(INFO)<Pick(from,layer); -} - - void FileTransfer::LoadPeaks () { char file_name[1024]; sprintf(file_name,PEAK_FILE_TEMPLATE,root_hash().hex().c_str(),instance); @@ -154,6 +152,14 @@ void FileTransfer::Submit () { } +bin64_t FileTransfer::peak_for (bin64_t pos) const { + int pi=0; + while (pifd_>0) { - if (FileTransfer::files.size()fd_) - FileTransfer::files.resize(ft->fd_); - FileTransfer::files[ft->fd_] = ft; - return ft->fd_; +int p2tp::Open (const char* filename, const Sha1Hash& hash) { + FileTransfer* ft = new FileTransfer(filename, hash); + int fdes = ft->file_descriptor(); + if (fdes>0) { + if (FileTransfer::files.size()