From: victor Date: Fri, 20 Nov 2009 09:11:58 +0000 (+0000) Subject: reworking queues X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?a=commitdiff_plain;h=9db961abb4e5362c8b44ba30099575e5f4e0b819;p=swift-upb.git reworking queues git-svn-id: https://ttuki.vtt.fi/svn/p2p-next/TUD/p2tp/trunk@607 e16421f0-f15b-0410-abcd-98678b794739 --- diff --git a/bins.cpp b/bins.cpp index 23e942f..cfca3a2 100644 --- a/bins.cpp +++ b/bins.cpp @@ -421,6 +421,17 @@ uint64_t bins::seq_length () { return i.pos.base_offset() + (*i==FILLED ? 1 : 0); } + +bool bins::is_empty (bin64_t range) { + if (range==bin64_t::ALL) + return !deep(0) && !halves[0]; + iterator i(this,range,false); + while ( i.pos!=range && (i.deep() || !i.solid()) ) + i.towards(range); + return !i.deep() && *i==EMPTY; +} + + binheap::binheap() { size_ = 32; heap_ = (bin64_t*) malloc(size_*sizeof(bin64_t)); diff --git a/bins.h b/bins.h index 45ef4ea..8275c73 100644 --- a/bins.h +++ b/bins.h @@ -46,7 +46,7 @@ public: uint64_t mass (); - bool is_empty () const { return !deep(0) && !halves[0]; } + bool is_empty (bin64_t range=bin64_t::ALL) ; void clear (); diff --git a/datagram.cpp b/datagram.cpp index f26a78f..ecb72df 100644 --- a/datagram.cpp +++ b/datagram.cpp @@ -23,7 +23,7 @@ namespace p2tp { tint Datagram::now = Datagram::Time(); tint Datagram::start = now; -tint Datagram::epoch = now/360000000LL*360000000LL; +tint Datagram::epoch = now/360000000LL*360000000LL; // make logs mergeable uint32_t Address::LOCALHOST = INADDR_LOOPBACK; uint64_t Datagram::dgrams_up=0, Datagram::dgrams_down=0, Datagram::bytes_up=0, Datagram::bytes_down=0; diff --git a/datagram.h b/datagram.h index bc3c52e..27a1caf 100644 --- a/datagram.h +++ b/datagram.h @@ -93,11 +93,13 @@ struct Address { addr.sin_port==b.addr.sin_port && addr.sin_addr.s_addr==b.addr.sin_addr.s_addr; } - std::string str () const { - char s[32]; - sprintf(s,"%i.%i.%i.%i:%i",ipv4()>>24,(ipv4()>>16)&0xff, + const char* str () const { + static char rs[4][32]; + static int i; + i = (i+1) & 3; + sprintf(rs[i],"%i.%i.%i.%i:%i",ipv4()>>24,(ipv4()>>16)&0xff, (ipv4()>>8)&0xff,ipv4()&0xff,port()); - return std::string(s); + return rs[i]; } bool operator != (const Address& b) const { return !(*this==b); } }; diff --git a/do_tests.sh b/do_tests.sh index d478ec6..d87ec38 100755 --- a/do_tests.sh +++ b/do_tests.sh @@ -1,5 +1,9 @@ #!/bin/bash for tst in `ls tests/*test`; do - $tst + if echo $tst; $tst > $tst.log; then + echo $tst OK + else + echo $tst FAIL + fi done diff --git a/exec/SConscript b/exec/SConscript index 5b2dd64..f195cef 100644 --- a/exec/SConscript +++ b/exec/SConscript @@ -22,12 +22,12 @@ else: print "tests: libpath is",libpath -env.Program( - target='trial', - source=['trial.cpp'], - CPPPATH=cpppath, - LIBS=libs, - LIBPATH=libpath ) +#env.Program( +# target='trial', +# source=['trial.cpp'], +# CPPPATH=cpppath, +# LIBS=libs, +# LIBPATH=libpath ) env.Program( target='seeder', diff --git a/exec/leecher.cpp b/exec/leecher.cpp index 9f02950..c07253f 100644 --- a/exec/leecher.cpp +++ b/exec/leecher.cpp @@ -35,7 +35,7 @@ int main (int argn, char** args) { Address tracker(args[3]), bindaddr; if (tracker==Address()) { - fprintf(stderr,"Tracker address format: [1.2.3.4:]12345\n"); + fprintf(stderr,"Tracker address format: [1.2.3.4:]12345, not %s\n",args[3]); return -2; } if (argn>=5) diff --git a/exec/seeder.cpp b/exec/seeder.cpp index 0cff2d7..629f7d4 100644 --- a/exec/seeder.cpp +++ b/exec/seeder.cpp @@ -36,7 +36,7 @@ int main (int argn, char** args) { } assert(0max_width) hint = hint.left(); assert(ack_hint_out_.get(hint)==bins::EMPTY); - if (hint.offset() && file().ack_out().get(hint)!=bins::EMPTY) { // FIXME DEBUG remove - eprintf("bogus hint: %s\n",hint.str()); - exit(1); - } ack_hint_out_.set(hint); + hint_out_.push_back(hint); return hint; } - void Received (bin64_t bin) { - ack_hint_out_.set(bin); - } - - void Expired (bin64_t bin) { - ack_hint_out_.copy_range(file().ack_out(), bin); - } - }; diff --git a/hashtree.cpp b/hashtree.cpp index 1c17153..fd3a713 100644 --- a/hashtree.cpp +++ b/hashtree.cpp @@ -173,7 +173,8 @@ void HashTree::RecoverProgress () { if (hashes_[pos]==Sha1Hash::ZERO) continue; size_t rd = read(fd_,buf,1<<10); - assert(rd==(1<<10) || p==packet_size()-1); // FIXME BUG + if (rd!=(1<<10) && p!=packet_size()-1) + break; if (rd==(1<<10) && !memcmp(buf, zeros, rd) && hashes_[pos]!=kilo_zero) // FIXME continue; @@ -182,6 +183,8 @@ void HashTree::RecoverProgress () { ack_out_.set(pos); completek_++; complete_+=rd; + if (rd!=(1<<10) && p==packet_size()-1) + size_ = ((sizek_-1)<<10) + rd; } } diff --git a/p2tp.cpp b/p2tp.cpp index 9211e16..2ff6504 100644 --- a/p2tp.cpp +++ b/p2tp.cpp @@ -43,14 +43,14 @@ Channel::Channel (FileTransfer* transfer, int socket, Address peer_addr) : data_out_cap_(bin64_t::ALL), last_send_data_time_(0), last_recv_data_time_(0), own_id_mentioned_(false), next_send_time_(0), last_send_time_(0), last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC), - data_in_dbl_(bin64_t::NONE) + data_in_dbl_(bin64_t::NONE), hint_out_size_(0) { if (peer_==Address()) peer_ = tracker; this->id = channels.size(); channels.push_back(this); cc_ = new PingPongController(this); - dprintf("%s #%i init %s\n",tintstr(),id,peer_.str().c_str()); + dprintf("%s #%i init %s\n",tintstr(),id,peer_.str()); Schedule(NOW); // FIXME ugly } diff --git a/p2tp.h b/p2tp.h index 90ea4b6..ebef7bc 100644 --- a/p2tp.h +++ b/p2tp.h @@ -192,8 +192,6 @@ namespace p2tp { public: virtual void Randomize (uint64_t twist) = 0; virtual bin64_t Pick (bins& offered, uint64_t max_width, tint expires) = 0; - virtual void Expired (bin64_t bin) = 0; - virtual void Received (bin64_t bin) = 0; }; @@ -284,6 +282,7 @@ namespace p2tp { tbqueue hint_in_; /** Hints sent (to detect and reschedule ignored hints). */ tbqueue hint_out_; + uint64_t hint_out_size_; /** The congestion control strategy. */ SendController *cc_; /** Types of messages the peer accepts. */ @@ -309,7 +308,7 @@ namespace p2tp { bin64_t DequeueHint(); void ClearStaleDataOut (); void CleanStaleHintOut(); - void CleanFulfilledHints(bin64_t pos); + void CleanHintOut(bin64_t pos); void CleanFulfilledDataOut(bin64_t pos); void Schedule(tint send_time); @@ -373,9 +372,6 @@ namespace p2tp { //uint32_t Width (const tbinvec& v); -// FIXME kill this macro -#define RETLOG(str) { fprintf(stderr,"%s\n",str); return; } - /** Must be called by any client using the library */ void LibraryInit(void); diff --git a/sendrecv.cpp b/sendrecv.cpp index c328c18..da3fe14 100644 --- a/sendrecv.cpp +++ b/sendrecv.cpp @@ -15,13 +15,8 @@ using namespace std; // FIXME remove /* TODO 25 Oct 18:55 - - move hint_out_, piece picking to piece picker (needed e.g. for the case of channel drop) - - ANY_LAYER - range: ALL - randomized testing of advanced ops (new testcase) - - PeerCwnd() - - bins hint_out_, tbqueue hint_out_ts_ - */ void Channel::AddPeakHashes (Datagram& dgram) { @@ -125,7 +120,7 @@ void Channel::Send () { AddHandshake(dgram); AddAck(dgram); } - dprintf("%s #%i sent %ib %s\n",tintstr(),id,dgram.size(),peer().str().c_str()); + dprintf("%s #%i sent %ib %s\n",tintstr(),id,dgram.size(),peer().str()); if (dgram.size()==4) // only the channel id; bare keep-alive data = bin64_t::ALL; cc_->OnDataSent(data); @@ -134,33 +129,24 @@ void Channel::Send () { } -void Channel::CleanStaleHintOut () { - tint timed_out = NOW - 8*rtt_avg_; // FIXME BULLSHIT (take rtt=0) +void Channel::AddHint (Datagram& dgram) { + + tint timed_out = NOW - TINT_SEC*3/2; while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) { - transfer().picker().Expired(hint_out_.front().bin); + hint_out_size_ -= hint_out_.front().bin.width(); hint_out_.pop_front(); } -} - - -void Channel::AddHint (Datagram& dgram) { - - CleanStaleHintOut(); - - uint64_t hint_out_mass=0; - for(int i=0; i4 && diff>2*peer_cwnd) // diff >>= 1; bin64_t hint = transfer().picker().Pick(ack_in_,diff,rtt_avg_*8+TINT_MSEC*100); @@ -168,8 +154,9 @@ void Channel::AddHint (Datagram& dgram) { if (hint!=bin64_t::NONE) { dgram.Push8(P2TP_HINT); dgram.Push32(hint); - dprintf("%s #%i +hint %s [%lli]\n",tintstr(),id,hint.str(),hint_out_mass); + dprintf("%s #%i +hint %s [%lli]\n",tintstr(),id,hint.str(),hint_out_size_); hint_out_.push_back(hint); + hint_out_size_ += hint.width(); } else dprintf("%s #%i .hint\n",tintstr(),id); @@ -300,25 +287,24 @@ void Channel::OnHash (Datagram& dgram) { } -void Channel::CleanFulfilledHints (bin64_t pos) { +void Channel::CleanHintOut (bin64_t pos) { int hi = 0; - while (hihs_in_.begin(); i!=file->hs_in_.end(); i++) if (channels[*i] && channels[*i]->peer_==data.addr && channels[*i]->last_recv_time_>NOW-TINT_SEC*2) - RETLOG("have a channel already"); + return_log("have a channel already to %s\n",addr.str()); channel = new Channel(file, socket, data.address()); } else { mych = DecodeID(mych); - if (mych>=channels.size()) { - eprintf("invalid channel #%i\n",mych); - return; - } + if (mych>=channels.size()) + return_log("invalid channel #%i, %s\n",mych,addr.str()); channel = channels[mych]; if (!channel) - RETLOG ("channel is closed"); - if (channel->peer() != data.address()) - RETLOG ("invalid peer address"); + return_log ("channel #%i is already closed\n",mych,addr.str()); + if (channel->peer() != addr) + return_log ("invalid peer address #%i %s!=%s\n",mych,channel->peer().str(),addr.str()); channel->own_id_mentioned_ = true; } //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);