From 24b686957e555119f0b0779611d339deaf0e162f Mon Sep 17 00:00:00 2001 From: victor Date: Fri, 6 Nov 2009 11:28:37 +0000 Subject: [PATCH] towards swarming git-svn-id: https://ttuki.vtt.fi/svn/p2p-next/TUD/p2tp/trunk@501 e16421f0-f15b-0410-abcd-98678b794739 --- datagram.cpp | 2 +- datagram.h | 78 ++++++++++++++++++++------------------- exec/leecher.cpp | 34 ++++++++++++++--- ext/ledbat_controller.cpp | 8 ++-- ext/send_control.cpp | 4 +- ext/simple_selector.cpp | 2 +- p2tp.cpp | 6 +-- p2tp.h | 27 +++++++------- sendrecv.cpp | 65 +++++++++++++++----------------- tests/connecttest.cpp | 60 +----------------------------- tests/dgramtest.cpp | 4 +- transfer.cpp | 47 ++++++++++++++++------- 12 files changed, 159 insertions(+), 178 deletions(-) diff --git a/datagram.cpp b/datagram.cpp index 3a50a98..f6325eb 100644 --- a/datagram.cpp +++ b/datagram.cpp @@ -21,7 +21,7 @@ namespace p2tp { tint Datagram::now = Datagram::Time(); tint Datagram::epoch = now; -uint32_t Datagram::Address::LOCALHOST = INADDR_LOOPBACK; +uint32_t Address::LOCALHOST = INADDR_LOOPBACK; uint64_t Datagram::dgrams_up=0, Datagram::dgrams_down=0, Datagram::bytes_up=0, Datagram::bytes_down=0; diff --git a/datagram.h b/datagram.h index d272558..35da147 100644 --- a/datagram.h +++ b/datagram.h @@ -46,46 +46,48 @@ typedef int64_t tint; #define INVALID_SOCKET -1 #endif + +struct Address { + struct sockaddr_in addr; + static uint32_t LOCALHOST; + void init(uint32_t ipv4=0, uint16_t port=0) { + memset(&addr,0,sizeof(struct sockaddr_in)); + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = htonl(ipv4); + } + Address() { init(); } + Address(const char* ip, uint16_t port) { + init(LOCALHOST,port); + inet_aton(ip,&(addr.sin_addr)); + } + Address(uint16_t port) { + init(LOCALHOST,port); + } + Address(uint32_t ipv4addr, uint16_t port) { + init(ipv4addr,port); + } + Address(const struct sockaddr_in& address) : addr(address) {} + uint32_t ipv4 () const { return ntohl(addr.sin_addr.s_addr); } + uint16_t port () const { return ntohs(addr.sin_port); } + operator sockaddr_in () const {return addr;} + bool operator == (const Address& b) const { + return addr.sin_family==b.addr.sin_family && + addr.sin_port==b.addr.sin_port && + addr.sin_addr.s_addr==b.addr.sin_addr.s_addr; + } + std::string str () const { + char s[32]; + sprintf(s,"%i.%i.%i.%i:%i",ipv4()>>24,(ipv4()>>16)&0xff, + (ipv4()>>8)&0xff,ipv4()&0xff,port()); + return std::string(s); + } + bool operator != (const Address& b) const { return !(*this==b); } +}; + + struct Datagram { - struct Address { - struct sockaddr_in addr; - static uint32_t LOCALHOST; - void init(uint32_t ipv4=0, uint16_t port=0) { - memset(&addr,0,sizeof(struct sockaddr_in)); - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = htonl(ipv4); - } - Address() { init(); } - Address(const char* ip, uint16_t port) { - init(LOCALHOST,port); - inet_aton(ip,&(addr.sin_addr)); - } - Address(uint16_t port) { - init(LOCALHOST,port); - } - Address(uint32_t ipv4addr, uint16_t port) { - init(ipv4addr,port); - } - Address(const struct sockaddr_in& address) : addr(address) {} - uint32_t ipv4 () const { return ntohl(addr.sin_addr.s_addr); } - uint16_t port () const { return ntohs(addr.sin_port); } - operator sockaddr_in () const {return addr;} - bool operator == (const Address& b) const { - return addr.sin_family==b.addr.sin_family && - addr.sin_port==b.addr.sin_port && - addr.sin_addr.s_addr==b.addr.sin_addr.s_addr; - } - std::string str () const { - char s[32]; - sprintf(s,"%i.%i.%i.%i:%i",ipv4()>>24,(ipv4()>>16)&0xff, - (ipv4()>>8)&0xff,ipv4()&0xff,port()); - return std::string(s); - } - bool operator != (const Address& b) const { return !(*this==b); } - }; - Address addr; SOCKET sock; int offset, length; diff --git a/exec/leecher.cpp b/exec/leecher.cpp index a68ebf9..abd05da 100644 --- a/exec/leecher.cpp +++ b/exec/leecher.cpp @@ -12,16 +12,38 @@ using namespace p2tp; +/** P2TP downloader. Params: root hash, filename, tracker ip/port, own ip/port */ int main (int argn, char** args) { - assert(0> 3; // van Jac + rtt_avg_ = (rtt_avg_*7 + (NOW-last_send_time_)) >> 3; // van Jac in_flight_--; } diff --git a/ext/send_control.cpp b/ext/send_control.cpp index 85d599e..593ef34 100644 --- a/ext/send_control.cpp +++ b/ext/send_control.cpp @@ -29,7 +29,7 @@ tint PingPongController::NextSendTime () { } void PingPongController::OnDataSent(bin64_t b) { - if ( (ch_->last_recv_time_ && ch_->last_recv_time_last_recv_time_ && ch_->last_recv_time_id,(int)ch_->data_out_.size(),cwnd_,Datagram::TimeStr(NextSendTime()), ch_->rtt_avg_); - return ch_->data_out_.size() < cwnd_ && Datagram::now >= NextSendTime(); + return ch_->data_out_.size() < cwnd_ && NOW >= NextSendTime(); } tint CwndController::NextSendTime () { diff --git a/ext/simple_selector.cpp b/ext/simple_selector.cpp index 790a205..2396b11 100644 --- a/ext/simple_selector.cpp +++ b/ext/simple_selector.cpp @@ -19,7 +19,7 @@ class SimpleSelector : public PeerSelector { public: SimpleSelector () { } - void AddPeer (const Datagram::Address& addr, const Sha1Hash& root) { + void AddPeer (const Address& addr, const Sha1Hash& root) { peers.push_front(memo_t(addr,root)); //,root.fingerprint() !!! } Address GetPeer (const Sha1Hash& for_root) { diff --git a/p2tp.cpp b/p2tp.cpp index db8f530..485b2a2 100644 --- a/p2tp.cpp +++ b/p2tp.cpp @@ -45,7 +45,7 @@ Channel::Channel (FileTransfer* file, int socket, Address peer_addr) : this->id = channels.size(); channels.push_back(this); cc_ = new PingPongController(this); - RequeueSend(Datagram::now); + RequeueSend(NOW); } @@ -67,7 +67,7 @@ int Channel::EncodeID(int unscrambled) { } -int p2tp::Listen (Datagram::Address addr) { +int p2tp::Listen (Address addr) { int sock = Datagram::Bind(addr); if (sock!=INVALID_SOCKET) Channel::sockets[Channel::socket_count++] = sock; @@ -117,7 +117,7 @@ void p2tp::Close (int fd) { } -void p2tp::AddPeer (Datagram::Address address, const Sha1Hash& root) { +void p2tp::AddPeer (Address address, const Sha1Hash& root) { Channel::peer_selector->AddPeer(address,root); } diff --git a/p2tp.h b/p2tp.h index e7b46e1..69ea5f3 100644 --- a/p2tp.h +++ b/p2tp.h @@ -64,19 +64,19 @@ Messages namespace p2tp { + #define NOW Datagram::now struct tintbin { tint time; bin64_t bin; tintbin(const tintbin& b) : time(b.time), bin(b.bin) {} tintbin() : time(0), bin(bin64_t::NONE) {} tintbin(tint time_, bin64_t bin_) : time(time_), bin(bin_) {} - tintbin(bin64_t bin_) : time(Datagram::now), bin(bin_) {} + tintbin(bin64_t bin_) : time(NOW), bin(bin_) {} }; - typedef std::deque tbqueue; typedef std::deque binqueue; - typedef Datagram::Address Address; + typedef Address Address; typedef enum { P2TP_HANDSHAKE = 0, @@ -118,7 +118,7 @@ namespace p2tp { we use a rotating queue of bin completion events. */ //bin64_t RevealAck (uint64_t& offset); /** Rotating queue read for channels of this transmission. */ - uint32_t RevealChannel (int& i); + int RevealChannel (int& i); static FileTransfer* Find (const Sha1Hash& hash); static FileTransfer* file (int fd) { @@ -144,7 +144,7 @@ namespace p2tp { bins& ack_out () { return ack_out_; } int file_descriptor () const { return fd_; } PiecePicker& picker () { return *picker_; } - int channel_count () const { return handshake_in_.size(); } + int channel_count () const { return hs_in_.size(); } static int instance; // FIXME this smells @@ -188,7 +188,8 @@ namespace p2tp { char* error_; /** Channels working for this transfer. */ - std::deque handshake_in_; + binqueue hs_in_; + int hs_in_offset_; std::deque
pex_in_; /** Messages we are accepting. */ uint64_t cap_out_; @@ -226,8 +227,8 @@ namespace p2tp { class PeerSelector { public: - virtual void AddPeer (const Datagram::Address& addr, const Sha1Hash& root) = 0; - virtual Datagram::Address GetPeer (const Sha1Hash& for_root) = 0; + virtual void AddPeer (const Address& addr, const Sha1Hash& root) = 0; + virtual Address GetPeer (const Sha1Hash& for_root) = 0; }; @@ -288,7 +289,7 @@ namespace p2tp { /** Channel id: index in the channel array. */ uint32_t id; /** Socket address of the peer. */ - Datagram::Address peer_; + Address peer_; /** The UDP socket fd. */ int socket_; /** Descriptor of the file in question. */ @@ -344,9 +345,9 @@ namespace p2tp { static Address tracker; static std::vector channels; - friend int Listen (Datagram::Address addr); + friend int Listen (Address addr); friend void Shutdown (int sock_des); - friend void AddPeer (Datagram::Address address, const Sha1Hash& root); + friend void AddPeer (Address address, const Sha1Hash& root); friend void SetTracker(const Address& tracker); friend int Open (const char*, const Sha1Hash&) ; // FIXME @@ -358,7 +359,7 @@ namespace p2tp { /*************** The top-level API ****************/ /** Start listening a port. Returns socket descriptor. */ - int Listen (Datagram::Address addr); + int Listen (Address addr); /** Run send/receive loop for the specified amount of time. */ void Loop (tint till); /** Stop listening to a port. */ @@ -372,7 +373,7 @@ namespace p2tp { /** Add a possible peer which participares in a given transmission. In the case root hash is zero, the peer might be talked to regarding any transmission (likely, a tracker, cache or an archive). */ - void AddPeer (Datagram::Address address, const Sha1Hash& root=Sha1Hash::ZERO); + void AddPeer (Address address, const Sha1Hash& root=Sha1Hash::ZERO); void SetTracker(const Address& tracker); diff --git a/sendrecv.cpp b/sendrecv.cpp index 2d73dde..ac65e2d 100644 --- a/sendrecv.cpp +++ b/sendrecv.cpp @@ -72,7 +72,7 @@ bin64_t Channel::DequeueHint () { // TODO: resilience /*void Channel::CleanStaleHints () { while ( !hint_out.empty() && file().ack_out().get(hint_out.front().bin)==bins::FILLED ) hint_out.pop_front(); // FIXME must normally clear fulfilled entries - tint timed_out = Datagram::now - cc_->RoundTripTime()*8; + tint timed_out = NOW - cc_->RoundTripTime()*8; while ( !hint_out.empty() && hint_out.front().time < timed_out ) { file().picker()->Snubbed(hint_out.front().bin); hint_out.pop_front(); @@ -99,7 +99,7 @@ void Channel::AddHandshake (Datagram& dgram) { void Channel::ClearStaleDataOut() { int oldsize = data_out_.size(); while ( data_out_.size() && data_out_.front().time < - Datagram::now - rtt_avg_ - dev_avg_*4 ) + NOW - rtt_avg_ - dev_avg_*4 ) data_out_.pop_front(); if (data_out_.size()!=oldsize) cc_->OnAckRcvd(bin64_t::NONE); @@ -113,6 +113,7 @@ void Channel::Send () { if ( is_established() ) { AddAck(dgram); AddHint(dgram); + AddPex(dgram); ClearStaleDataOut(); if (cc_->MaySendData()) data = AddData(dgram); @@ -127,7 +128,7 @@ void Channel::Send () { if (dgram.size()==4) // only the channel id; bare keep-alive data = bin64_t::ALL; cc_->OnDataSent(data); - last_send_time_ = Datagram::now; + last_send_time_ = NOW; RequeueSend(cc_->NextSendTime()); } @@ -136,7 +137,7 @@ void Channel::AddHint (Datagram& dgram) { while (!hint_out_.empty()) { tintbin f = hint_out_.front(); - if (f.timehs_in_.push_back(id); dprintf("%s #%i rtt init %lli\n",Datagram::TimeStr(),id,rtt_avg_); } bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL; @@ -272,9 +274,9 @@ void Channel::Recv (Datagram& dgram) { } } cc_->OnDataRecvd(data); - last_recv_time_ = Datagram::now; + last_recv_time_ = NOW; if (data!=bin64_t::ALL) - RequeueSend(Datagram::now); + RequeueSend(NOW); } @@ -294,9 +296,9 @@ bin64_t Channel::OnData (Datagram& dgram) { bool ok = file().OfferData(pos, data, length) ; dprintf("%s #%i %cdata (%lli)\n",Datagram::TimeStr(),id,ok?'-':'!',pos.offset()); if (ok) { - data_in_ = tintbin(Datagram::now,pos); + data_in_ = tintbin(NOW,pos); if (last_recv_time_) { - tint dip = Datagram::now - last_recv_time_; + tint dip = NOW - last_recv_time_; dip_avg_ = ( dip_avg_*3 + dip ) >> 2; } return pos; @@ -312,7 +314,7 @@ void Channel::OnAck (Datagram& dgram) { dprintf("%s #%i -ack (%i,%lli)\n",Datagram::TimeStr(),id,ackd_pos.layer(),ackd_pos.offset()); for (int i=0; i<8 && i> 2; dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2; dprintf("%s #%i rtt %lli dev %lli\n", @@ -360,32 +362,20 @@ void Channel::OnPex (Datagram& dgram) { uint32_t ipv4 = dgram.Pull32(); uint16_t port = dgram.Pull16(); Address addr(ipv4,port); - //if (peer_selector) - // peer_selector->AddPeer(Datagram::Address(addr,port),file().root_hash()); - file_->pex_in_.push_back(addr); - if (file_->pex_in_.size()>1000) - file_->pex_in_.pop_front(); - static int ENOUGH_PEERS_THRESHOLD = 20; - if (file_->channel_count()RevealChannel(i)) != -1 ) { - if (channels[i]->peer()==addr) - break; - } - if (chno==-1) - new Channel(file_,socket_,addr); - } + dprintf("%s #%i -pex %s\n",Datagram::TimeStr(),id,addr.str().c_str()); + file_->OnPexIn(addr); } void Channel::AddPex (Datagram& dgram) { - int ch = file().RevealChannel(this->pex_out_); - if (ch==-1) + int chid = file_->RevealChannel(pex_out_); + if (chid==-1 || chid==id) return; - Address a = channels[ch]->peer(); + Address a = channels[chid]->peer(); dgram.Push8(P2TP_PEX_ADD); dgram.Push32(a.ipv4()); dgram.Push16(a.port()); + dprintf("%s #%i +pex %s\n",Datagram::TimeStr(),id,a.str().c_str()); } @@ -396,7 +386,7 @@ void Channel::Recv (int socket) { RETLOG("datagram shorter than 4 bytes"); uint32_t mych = data.Pull32(); Sha1Hash hash; - Channel* channel; + Channel* channel = NULL; if (!mych) { // handshake initiated if (data.size()<1+4+1+4+Sha1Hash::SIZE) RETLOG ("incorrect size initial handshake packet"); @@ -411,6 +401,9 @@ void Channel::Recv (int socket) { if (!file) RETLOG ("hash unknown, no such file"); dprintf("%s #0 -hash ALL %s\n",Datagram::TimeStr(),hash.hex().c_str()); + for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++) + if (channels[*i] && channels[*i]->peer_==data.addr) + RETLOG("have a channel already"); channel = new Channel(file, socket, data.address()); } else { mych = DecodeID(mych); @@ -463,14 +456,14 @@ void Channel::Loop (tint howlong) { } if (send_time>limit) send_time = limit; - if (sender && send_time<=Datagram::now) { + if (sender && send_time<=NOW) { dprintf("%s #%i sch_send %s\n",Datagram::TimeStr(),sender->id, Datagram::TimeStr(send_time)); sender->Send(); pop_heap(send_queue.begin(), send_queue.end(), tblater); send_queue.pop_back(); } else { - tint towait = send_time - Datagram::now; + tint towait = send_time - NOW; dprintf("%s waiting %lliusec\n",Datagram::TimeStr(),towait); int rd = Datagram::Wait(socket_count,sockets,towait); if (rd!=-1) @@ -489,7 +482,7 @@ void Channel::Loop (tint howlong) { if (send_queue.empty()) dprintf("%s empty send_queue\n", Datagram::TimeStr()); - while ( Datagram::now <= untiltime && !send_queue.empty() ) { + while ( NOW <= untiltime && !send_queue.empty() ) { // BUG BUG BUG no scheduled sends => just listen @@ -508,11 +501,11 @@ void Channel::Loop (tint howlong) { if (sender->next_send_time_!=next_send.time) continue; - if (wake_onuntiltime) wake_on = untiltime; - tint towait = min(wake_on-Datagram::now,TINT_SEC); + tint towait = min(wake_on-NOW,TINT_SEC); dprintf("%s waiting %lliusec\n",Datagram::TimeStr(),towait); int rd = Datagram::Wait(socket_count,sockets,towait); if (rd!=-1) diff --git a/tests/connecttest.cpp b/tests/connecttest.cpp index dd9094a..f4b18b9 100644 --- a/tests/connecttest.cpp +++ b/tests/connecttest.cpp @@ -13,40 +13,6 @@ using namespace p2tp; -/*TEST(P2TP, ConnectTest) { - - uint8_t buf[1024]; - int f = open("test_file",O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); - for(char c='a'; c<='c'; c++) { - memset(buf,c,1024); - write(f,buf,1024); - } - close(f); - - struct sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(7001); - addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - - int sock = p2tp::Init(7001); - ASSERT_TRUE(0state(),p2tp::Channel::HS_DONE); FIXME: status - ASSERT_EQ(p2tp::file_size(file),p2tp::file_size(copy)); - ASSERT_EQ(p2tp::File::DONE,copyobj.status()); - p2tp::Close(file); - p2tp::Close(copy); - - p2tp::Shutdown(sock); - -}*/ - TEST(P2TP,CwndTest) { @@ -57,35 +23,14 @@ TEST(P2TP,CwndTest) { ASSERT_EQ(0,stat("doc/sofi.jpg", &st)); int size = st.st_size;//, sizek = (st.st_size>>10) + (st.st_size%1024?1:0) ; - /*int f = open("big_test_file",O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); - int size = 60<<10; //rand()%(1<<19) + (1<<19); - int sizek = (size>>10) + ((size&1023)?1:0); - char* b = (char*)malloc(size); - for(int i=0; i=0); - //ASSERT_TRUE(sock2>=0); int file = p2tp::Open("doc/sofi.jpg"); FileTransfer* fileobj = FileTransfer::file(file); FileTransfer::instance++; - p2tp::SetTracker(Datagram::Address("127.0.0.1",7001)); + p2tp::SetTracker(Address("127.0.0.1",7001)); int copy = p2tp::Open("doc/sofi-copy.jpg",fileobj->root_hash()); @@ -100,15 +45,12 @@ TEST(P2TP,CwndTest) { p2tp::Close(copy); p2tp::Shutdown(sock1); - //p2tp::Release(sock2); } int main (int argc, char** argv) { - //bin::init(); - //bins::init(); google::InitGoogleLogging(argv[0]); testing::InitGoogleTest(&argc, argv); int ret = RUN_ALL_TESTS(); diff --git a/tests/dgramtest.cpp b/tests/dgramtest.cpp index 4a58d8b..f12f5b1 100644 --- a/tests/dgramtest.cpp +++ b/tests/dgramtest.cpp @@ -21,7 +21,7 @@ TEST(Datagram, BinaryTest) { addr.sin_family = AF_INET; addr.sin_port = htons(7001); addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - Datagram d(socket,addr); //Datagram::Address(7001)); + Datagram d(socket,addr); //Address(7001)); const char * text = "text"; const uint8_t num8 = 0xab; const uint16_t num16 = 0xabcd; @@ -75,7 +75,7 @@ TEST(Datagram,TwoPortTest) { addr2.sin_port = htons(10002); addr2.sin_addr.s_addr = htonl(INADDR_LOOPBACK);*/ - Datagram send(sock1,Datagram::Address(10002)); + Datagram send(sock1,Address(10002)); send.Push32(1234); send.Send(); diff --git a/transfer.cpp b/transfer.cpp index 059dc96..f0983f4 100644 --- a/transfer.cpp +++ b/transfer.cpp @@ -31,7 +31,7 @@ int FileTransfer::instance = 0; FileTransfer::FileTransfer (const char* filename, const Sha1Hash& _root_hash) : root_hash_(_root_hash), fd_(0), hashfd_(0), dry_run_(false), peak_count_(0), hashes_(NULL), error_(NULL), size_(0), sizek_(0), - complete_(0), completek_(0), seq_complete_(0) //, data_in_off_(0) + complete_(0), completek_(0), seq_complete_(0), hs_in_offset_(0) { fd_ = open(filename,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); if (fd_<0) @@ -339,13 +339,6 @@ FileTransfer* FileTransfer::Find (const Sha1Hash& root_hash) { } -void FileTransfer::OnPexIn (const Address& addr) { - pex_in_.push_back(addr); - if (pex_in_.size()>1000) - pex_in_.pop_front(); -} - - std::string FileTransfer::GetTempFilename(Sha1Hash& root_hash, int instance, std::string postfix) { std::string tempfile = gettmpdir(); @@ -371,11 +364,39 @@ std::string FileTransfer::GetTempFilename(Sha1Hash& root_hash, int instance, std }*/ -uint32_t FileTransfer::RevealChannel (int& offset) { - while (offsetfile_!=this) ) - offset++; - return offset < Channel::channels.size() ? offset : -1; +void FileTransfer::OnPexIn (const Address& addr) { + for(int i=0; ifile_==this && c->peer_==addr) + return; // already connected + } + if (hs_in_.size()<20) { + new Channel(this,Channel::sockets[0],addr); + } else { + pex_in_.push_back(addr); + if (pex_in_.size()>1000) + pex_in_.pop_front(); + } +} + + +int FileTransfer::RevealChannel (int& pex_out_) { + pex_out_ -= hs_in_offset_; + if (pex_out_<0) + pex_out_ = 0; + while (pex_out_file_==this) { + pex_out_ += hs_in_offset_ + 1; + return c->id; + } else { + hs_in_[pex_out_] = hs_in_[0]; + hs_in_.pop_front(); + hs_in_offset_++; + } + } + pex_out_ += hs_in_offset_; + return -1; } -- 2.20.1