From: victor Date: Sat, 14 Nov 2009 15:03:00 +0000 (+0000) Subject: apparently, I'fixed consequences of yesterday's coding X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?a=commitdiff_plain;h=a605c714debafdef9bebff5d8d937ddb78951824;p=swift-upb.git apparently, I'fixed consequences of yesterday's coding git-svn-id: https://ttuki.vtt.fi/svn/p2p-next/TUD/p2tp/trunk@552 e16421f0-f15b-0410-abcd-98678b794739 --- diff --git a/BUGS b/BUGS index 9098364..9d19b0d 100644 --- a/BUGS +++ b/BUGS @@ -25,3 +25,4 @@ * hint queue buildup * file operations are not 64-bit ready http://mail.python.org/pipermail/patches/2000-June/000848.html + * recovery: last packet diff --git a/bin64.cpp b/bin64.cpp index 10c4859..6b2ccb5 100644 --- a/bin64.cpp +++ b/bin64.cpp @@ -21,6 +21,15 @@ uint32_t bin64_t::to32() const { return NONE32; } +bin64_t::bin64_t(const uint32_t val) { + if (val==ALL32) + v = ALL; + else if (val==NONE32) + v = NONE; + else + v = val; +} + bin64_t bin64_t::next_dfsio (uint8_t floor) { /*while (ret.is_right()) ret = ret.parent(); @@ -54,3 +63,18 @@ int bin64_t::peaks (uint64_t length, bin64_t* peaks) { peaks[pp] = NONE; return pp; } + +#include + +const char* bin64_t::str () const { + static char _b64sr[4][32]; + static int _rsc; + _rsc = (_rsc+1) & 3; + if (v==ALL) + return "(ALL)"; + else if (v==NONE) + return "(NONE)"; + else + sprintf(_b64sr[_rsc],"(%i,%lli)",(int)layer(),offset()); + return _b64sr[_rsc]; +} diff --git a/bin64.h b/bin64.h index 8b8befd..c2193ad 100644 --- a/bin64.h +++ b/bin64.h @@ -25,6 +25,7 @@ struct bin64_t { bin64_t() : v(NONE) {} bin64_t(const bin64_t&b) : v(b.v) {} + bin64_t(const uint32_t val) ; bin64_t(const uint64_t val) : v(val) {} bin64_t(uint8_t layer, uint64_t offset) : v( (offset<<(layer+1)) | ((1ULL<>1; } + + const char* str () const; /** The array must have 64 cells, as it is the max number of peaks possible +1 (and there are no reason diff --git a/bins.cpp b/bins.cpp index 2718115..23e942f 100644 --- a/bins.cpp +++ b/bins.cpp @@ -251,6 +251,8 @@ bin64_t bins::find (const bin64_t range, fill_t seek) { uint16_t bins::get (bin64_t bin) { + if (bin==bin64_t::NONE) + return EMPTY; iterator i(this,bin,true); //while ( i.pos!=bin && // (i.deep() || (*i!=BIN_FULL && *i!=BIN_EMPTY)) ) @@ -281,6 +283,8 @@ uint64_t bins::mass () { void bins::set (bin64_t bin, fill_t val) { + if (bin==bin64_t::NONE) + return; assert(val==FILLED || val==EMPTY); iterator i(this,bin,false); while (i.bin()!=bin && (i.deep() || *i!=val)) @@ -303,7 +307,7 @@ uint64_t* bins::get_stripes (int& count) { count = 0; uint16_t cur = bins::EMPTY; stripes[count++] = 0; - iterator i(this,0,false); + iterator i(this,bin64_t(0,0),false); while (!i.solid()) i.left(); diff --git a/bins.h b/bins.h index 3f8494f..45ef4ea 100644 --- a/bins.h +++ b/bins.h @@ -122,7 +122,7 @@ public: // rm this uint8_t layer_; bin64_t pos; // TODO: half[] layer bin public: - iterator(bins* host, bin64_t start=0, bool split=false); + iterator(bins* host, bin64_t start=bin64_t(0,0), bool split=false); ~iterator(); bool deep () { return host->deep(half); } bool solid () { diff --git a/datagram.cpp b/datagram.cpp index 11931f8..8d60b08 100644 --- a/datagram.cpp +++ b/datagram.cpp @@ -31,10 +31,8 @@ const char* tintstr (tint time) { static char ret_str[4][32]; // wow static int i; i = (i+1) & 3; - if (time==TINT_NEVER) { - strcpy(ret_str[i],"NEVER"); - return ret_str[i]; - } + if (time==TINT_NEVER) + return "NEVER"; time -= Datagram::epoch; assert(time>=0); int hours = time/TINT_HOUR; diff --git a/ext/send_control.cpp b/ext/send_control.cpp index 5d5d066..b520210 100644 --- a/ext/send_control.cpp +++ b/ext/send_control.cpp @@ -20,61 +20,51 @@ void SendController::Swap (SendController* newctrl) { } -bool PingPongController::MaySendData(){ - return ch_->data_out_.empty(); +void SendController::Schedule (tint next_time) { + ch_->Schedule(next_time); } - -tint PingPongController::NextSendTime () { - if (unanswered_>=3) - return TINT_NEVER; - return ch_->last_send_time_ + ch_->rtt_avg_ + ch_->dev_avg_*4; // remind on timeout -} - -void PingPongController::OnDataSent(bin64_t b) { - unanswered_++; - if ( (b==bin64_t::ALL && MaySendData()) ) // nothing to send - Swap(new KeepAliveController(this)); -} - -void PingPongController::OnDataRecvd(bin64_t b) { - unanswered_ = 0; -} - -void PingPongController::OnAckRcvd(bin64_t ackd) { - //if (ch_->data_out_.empty()) - Swap(new SlowStartController(this)); + + + +KeepAliveController::KeepAliveController (Channel* ch) : SendController(ch), delay_(ch->rtt_avg_) { } KeepAliveController::KeepAliveController(SendController* prev, tint delay) : SendController(prev), delay_(delay) { - ch_->dev_avg_ = TINT_SEC; // without active measurement, rtt is unreliable + ch_->dev_avg_ = TINT_SEC; // without constant active measurement, rtt is unreliable + delay_=ch_->rtt_avg_; } bool KeepAliveController::MaySendData() { return true; } -tint KeepAliveController::NextSendTime () { - if (!delay_) - delay_ = ch_->rtt_avg_; - if (ch_->last_recv_time_ < ch_->last_send_time_-TINT_MIN) - return TINT_NEVER; - return ch_->last_send_time_ + delay_; -} - + void KeepAliveController::OnDataSent(bin64_t b) { - delay_ = (NOW - std::max(ch_->last_send_time_,ch_->last_recv_time_)) * 3 / 2; - if (delay_>TINT_SEC*58) - delay_ = TINT_SEC*58; - if (b!=bin64_t::ALL && b!=bin64_t::NONE) + if (b==bin64_t::ALL || b==bin64_t::NONE) { + delay_ = delay_ * 2; // backing off + if (delay_>TINT_SEC*58) // keep NAT mappings alive + delay_ = TINT_SEC*58; + if (delay_>=4*TINT_SEC && ch_->last_recv_time_ < NOW-TINT_MIN) + Schedule(TINT_NEVER); // no response; enter close timeout + else + Schedule(NOW+delay_); // all right, just keep it alive + } else { + Schedule(NOW+ch_->rtt_avg_); // cwnd==1 => next send in 1 rtt Swap(new SlowStartController(this)); + } } void KeepAliveController::OnDataRecvd(bin64_t b) { + if (b!=bin64_t::NONE && b!=bin64_t::ALL) { // channel is alive + delay_ = ch_->rtt_avg_; + Schedule(NOW); // schedule an ACK; TODO: aggregate + } } void KeepAliveController::OnAckRcvd(bin64_t ackd) { + // probably to something sent by CwndControllers before this one got installed } @@ -83,30 +73,36 @@ SendController(orig), cwnd_(cwnd), last_change_(0) { } bool CwndController::MaySendData() { + tint spacing = ch_->rtt_avg_ / cwnd_; dprintf("%s #%i sendctrl may send %i < %f & %s (rtt %lli)\n",tintstr(), - ch_->id,(int)ch_->data_out_.size(),cwnd_,tintstr(NextSendTime()), - ch_->rtt_avg_); + ch_->id,(int)ch_->data_out_.size(),cwnd_, + tintstr(ch_->last_send_data_time_+spacing), ch_->rtt_avg_); return ch_->data_out_.empty() || - (ch_->data_out_.size() < cwnd_ && NOW >= NextSendTime()); -} - -tint CwndController::NextSendTime () { - tint sendtime; - if (ch_->data_out_.size() < cwnd_) - sendtime = ch_->last_send_time_ + (ch_->rtt_avg_ / cwnd_); - else - sendtime = ch_->last_send_time_ + ch_->rtt_avg_ + ch_->dev_avg_ * 4 ; - return sendtime; + (ch_->data_out_.size() < cwnd_ && NOW-ch_->last_send_data_time_ >= spacing); } + void CwndController::OnDataSent(bin64_t b) { - if (b==bin64_t::ALL || b==bin64_t::NONE) { - if (MaySendData()) - Swap(new KeepAliveController(this)); - } + if ( (b==bin64_t::ALL || b==bin64_t::NONE) && MaySendData() ) { // no more data + Schedule(NOW+ch_->rtt_avg_); + Swap(new KeepAliveController(this)); + } else { + tint spacing = ch_->rtt_avg_ / cwnd_; + if (ch_->data_out_.size() < cwnd_) { // have cwnd; not the right time yet + Schedule(ch_->last_send_data_time_+spacing); + } else { // no free cwnd + tint timeout = std::max( ch_->rtt_avg_+ch_->dev_avg_*4, 500*TINT_MSEC ); + assert(!ch_->data_out_.empty()); + Schedule(ch_->data_out_.front().time+timeout); // wait for ACK or timeout + } + } } - + + void CwndController::OnDataRecvd(bin64_t b) { + if (b!=bin64_t::NONE && b!=bin64_t::ALL) { + Schedule(NOW); // send ACK; todo: aggregate ACKs + } } void CwndController::OnAckRcvd(bin64_t ackd) { @@ -122,6 +118,8 @@ void CwndController::OnAckRcvd(bin64_t ackd) { else cwnd_ += 1.0/cwnd_; dprintf("%s #%i sendctrl cwnd to %f\n",tintstr(),ch_->id,cwnd_); + tint spacing = ch_->rtt_avg_ / cwnd_; + Schedule(ch_->last_send_time_+spacing); } } @@ -135,17 +133,3 @@ void SlowStartController::OnAckRcvd (bin64_t pos) { cwnd_ /= 2; } - -void AIMDController::OnAckRcvd (bin64_t pos) { - if (pos==bin64_t::NONE) { - dprintf("%s #%i sendctrl loss detected\n",tintstr(),ch_->id); - if (NOW>last_change_+ch_->rtt_avg_) { - cwnd_ /= 2; - last_change_ = NOW; - } - } else { - cwnd_ += 1.0/cwnd_; - dprintf("%s #%i sendctrl cwnd to %f\n",tintstr(),ch_->id,cwnd_); - } -} - diff --git a/ext/send_control.h b/ext/send_control.h index 67ae52f..91318a6 100644 --- a/ext/send_control.h +++ b/ext/send_control.h @@ -21,11 +21,11 @@ struct SendController { SendController(SendController* orig) : ch_(orig->ch_) { } void Swap (SendController* replacement); + void Schedule (tint time); virtual const char* type() const = 0; virtual bool MaySendData() = 0; - virtual tint NextSendTime () = 0; /** A datagram was sent to the peer. * @param data the bin number for the data sent; bin64_t::NONE if only @@ -45,33 +45,19 @@ struct SendController { }; -struct PingPongController : public SendController { - - int unanswered_; - - PingPongController (SendController* orig) : - SendController(orig), unanswered_(0) {} - PingPongController (Channel* ch) : - unanswered_(0), SendController(ch) {} - const char* type() const { return "PingPong"; } - bool MaySendData(); - tint NextSendTime (); - void OnDataSent(bin64_t b); - void OnDataRecvd(bin64_t b); - void OnAckRcvd(bin64_t ackd) ; - ~PingPongController() {} - -}; - - +/** Mission of the keepalive controller to keep the channel + alive as no data sending happens; If no data is transmitted + in either direction, inter-packet times grow exponentially + till 58 sec, which refresh period is deemed necessary to keep + NAT mappings alive. */ struct KeepAliveController : public SendController { tint delay_; + KeepAliveController (Channel* ch); KeepAliveController(SendController* prev, tint delay=0) ; const char* type() const { return "KeepAlive"; } bool MaySendData(); - tint NextSendTime () ; void OnDataSent(bin64_t b) ; void OnDataRecvd(bin64_t b) ; void OnAckRcvd(bin64_t ackd) ; @@ -79,6 +65,7 @@ struct KeepAliveController : public SendController { }; +/** Base class for any congestion window based algorithm. */ struct CwndController : public SendController { double cwnd_; @@ -87,7 +74,6 @@ struct CwndController : public SendController { CwndController(SendController* orig, int cwnd=1) ; bool MaySendData() ; - tint NextSendTime () ; void OnDataSent(bin64_t b) ; void OnDataRecvd(bin64_t b) ; void OnAckRcvd(bin64_t ackd) ; @@ -95,6 +81,7 @@ struct CwndController : public SendController { }; +/** TCP-like exponential "slow" start algorithm. */ struct SlowStartController : public CwndController { SlowStartController(SendController* orig, int cwnd=1) : CwndController(orig,cwnd) {} @@ -104,11 +91,14 @@ struct SlowStartController : public CwndController { }; +/** The classics: additive increase - multiplicative decrease algorithm. + A naive version of "standard" TCP congestion control. Potentially useful + for seedboxes, so needs to be improved. (QUBIC?) */ struct AIMDController : public CwndController { AIMDController(SendController* orig, int cwnd=1) : CwndController(orig,cwnd) {} const char* type() const { return "AIMD"; } - void OnAckRcvd(bin64_t ackd) ; + //void OnAckRcvd(bin64_t ackd) ; }; diff --git a/ext/seq_picker.cpp b/ext/seq_picker.cpp index 3cacb2b..986c3ac 100644 --- a/ext/seq_picker.cpp +++ b/ext/seq_picker.cpp @@ -16,7 +16,6 @@ class SeqPiecePicker : public PiecePicker { bins ack_hint_out_; FileTransfer* transfer_; uint64_t twist_; - tbheap hint_out_; // FIXME since I use fixed 1.5 sec expiration, may replace for a queue public: @@ -34,8 +33,6 @@ public: } virtual bin64_t Pick (bins& offer, uint64_t max_width, tint expires) { - while (hint_out_.size() && hint_out_.peek().time Channel::channels(1); SOCKET Channel::sockets[8] = {0,0,0,0,0,0,0,0}; int Channel::socket_count = 0; Address Channel::tracker; -tbqueue Channel::send_queue; +tbheap Channel::send_queue; #include "ext/simple_selector.cpp" PeerSelector* Channel::peer_selector = new SimpleSelector(); Channel::Channel (FileTransfer* transfer, int socket, Address peer_addr) : transfer_(transfer), peer_(peer_addr), peer_channel_id_(0), pex_out_(0), socket_(socket==-1?sockets[0]:socket), // FIXME - data_out_cap_(bin64_t::ALL), last_data_time_(0), + data_out_cap_(bin64_t::ALL), last_send_data_time_(0), last_recv_data_time_(0), own_id_mentioned_(false), next_send_time_(0), last_send_time_(0), - last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC), - hint_out_(0), hint_out_mark_(), hint_out_am_(0) + last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC) { if (peer_==Address()) peer_ = tracker; this->id = channels.size(); channels.push_back(this); - cc_ = new PingPongController(this); - RequeueSend(NOW); + cc_ = new KeepAliveController(this); + Schedule(NOW); // FIXME ugly } diff --git a/p2tp.h b/p2tp.h index d068c0a..4681a56 100644 --- a/p2tp.h +++ b/p2tp.h @@ -70,13 +70,15 @@ namespace p2tp { tint time; bin64_t bin; tintbin(const tintbin& b) : time(b.time), bin(b.bin) {} - tintbin() : time(0), bin(bin64_t::NONE) {} + tintbin() : time(TINT_NEVER), bin(bin64_t::NONE) {} tintbin(tint time_, bin64_t bin_) : time(time_), bin(bin_) {} tintbin(bin64_t bin_) : time(NOW), bin(bin_) {} bool operator < (const tintbin& b) const { return time > b.time; } bool operator == (const tintbin& b) const { return time==b.time && bin==b.bin; } + bool operator != (const tintbin& b) const + { return !(*this==b); } }; typedef std::deque tbqueue; @@ -188,6 +190,7 @@ namespace p2tp { public: virtual void Randomize (uint64_t twist) = 0; virtual bin64_t Pick (bins& offered, uint64_t max_width, tint expires) = 0; + virtual void Expired (bin64_t bin) = 0; virtual void Received (bin64_t bin) = 0; }; @@ -277,10 +280,7 @@ namespace p2tp { /** Transmit schedule: in most cases filled with the peer's hints */ tbqueue hint_in_; /** Hints sent (to detect and reschedule ignored hints). */ - // tbqueue hint_out_; - uint64_t hint_out_; - tintbin hint_out_mark_; - uint64_t hint_out_am_; + tbqueue hint_out_; /** The congestion control strategy. */ SendController *cc_; /** Types of messages the peer accepts. */ @@ -292,20 +292,23 @@ namespace p2tp { /** Smoothed averages for RTT, RTT deviation and data interarrival periods. */ tint rtt_avg_, dev_avg_, dip_avg_; tint last_send_time_; - tint last_data_time_; tint last_recv_time_; + tint last_send_data_time_; + tint last_recv_data_time_; tint next_send_time_; tint peer_send_time_; - static tbqueue send_queue; + static tbheap send_queue; - void RequeueSend (tint next_time); int PeerBPS() const { return TINT_SEC / dip_avg_ * 1024; } /** Get a request for one packet from the queue of peer's requests. */ bin64_t DequeueHint(); void ClearStaleDataOut (); - //void CleanStaleHints(); + void CleanStaleHintOut(); + void CleanFulfilledHints(bin64_t pos); + void CleanFulfilledDataOut(bin64_t pos); + void Schedule(tint send_time); static PeerSelector* peer_selector; diff --git a/sendrecv.cpp b/sendrecv.cpp index 07151d3..e93e12c 100644 --- a/sendrecv.cpp +++ b/sendrecv.cpp @@ -31,7 +31,7 @@ void Channel::AddPeakHashes (Datagram& dgram) { dgram.Push32((uint32_t)peak); dgram.PushHash(file().peak_hash(i)); //DLOG(INFO)<<"#"<RoundTripTime()*8; - while ( !hint_out.empty() && hint_out.front().time < timed_out ) { - file().picker()->Snubbed(hint_out.front().bin); - hint_out.pop_front(); - } -}*/ - - void Channel::AddHandshake (Datagram& dgram) { if (!peer_channel_id_) { // initiating dgram.Push8(P2TP_HASH); @@ -101,14 +90,16 @@ void Channel::AddHandshake (Datagram& dgram) { void Channel::ClearStaleDataOut() { int oldsize = data_out_.size(); - while ( data_out_.size() && data_out_.front().time < - NOW - rtt_avg_ - dev_avg_*4 ) + tint timeout = NOW - max( rtt_avg_-dev_avg_*4, 500*TINT_MSEC ); + while ( data_out_.size() && data_out_.front().time < timeout ) { + dprintf("%s #%i Tdata %s\n",tintstr(),id,data_out_.front().bin.str()); data_out_.pop_front(); + } if (data_out_.size()!=oldsize) { cc_->OnAckRcvd(bin64_t::NONE); data_out_cap_ = bin64_t::ALL; } - while (data_out_.size() && ack_in_.get(data_out_.front().bin)==bins::FILLED) + while (data_out_.size() && (data_out_.front()==tintbin() || ack_in_.get(data_out_.front().bin)==bins::FILLED)) data_out_.pop_front(); } @@ -124,10 +115,7 @@ void Channel::Send () { AddHint(dgram); AddPex(dgram); ClearStaleDataOut(); - if (cc_->MaySendData()) - data = AddData(dgram); - else - dprintf("%s #%i no cwnd\n",tintstr(),id); + data = AddData(dgram); } else { AddHandshake(dgram); AddAck(dgram); @@ -141,75 +129,91 @@ void Channel::Send () { } +void Channel::CleanStaleHintOut () { + tint timed_out = NOW - 8*rtt_avg_; + while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) { + transfer().picker().Expired(hint_out_.front().bin); + hint_out_.pop_front(); + } +} + + void Channel::AddHint (Datagram& dgram) { + CleanStaleHintOut(); + + uint64_t hint_out_mass=0; + for(int i=0; i hint_out_+hint_out_am_ ) { //4*peer_cwnd + if ( hint_out_mass < 4*peer_cwnd ) { - int diff = peer_pps - hint_out_ - hint_out_am_; // 4*peer_cwnd + int diff = 5*peer_cwnd - hint_out_mass; if (diff>4 && diff>2*peer_cwnd) diff >>= 1; - bin64_t hint = transfer().picker().Pick(ack_in_,diff,NOW+TINT_SEC*3/2); //rtt_avg_*8+TINT_MSEC*10 + bin64_t hint = transfer().picker().Pick(ack_in_,diff,rtt_avg_*8+TINT_MSEC*100); if (hint!=bin64_t::NONE) { dgram.Push8(P2TP_HINT); dgram.Push32(hint); - dprintf("%s #%i +hint (%i,%lli)\n",tintstr(),id,hint.layer(),hint.offset()); - if (hint_out_mark_.bin==bin64_t::NONE) { - hint_out_mark_ = tintbin(NOW,hint); - hint_out_ = hint_out_am_; - hint_out_am_ = 0; - } - hint_out_am_ += hint.width(); - //hint_out_ += hint.width(); - } + dprintf("%s #%i +hint %s\n",tintstr(),id,hint.str()); + hint_out_.push_back(hint); + } else + printf("%s #%i Xhint\n",tintstr(),id); } } bin64_t Channel::AddData (Datagram& dgram) { - if (!file().size()) // know nothing + + if (!file().size()) // know nothing return bin64_t::NONE; - bin64_t tosend = DequeueHint(); - if (tosend==bin64_t::NONE) { - dprintf("%s #%i out of hints #sendctrl\n",tintstr(),id); - return bin64_t::NONE; - } - if (ack_in_.is_empty() && file().size()) - AddPeakHashes(dgram); - AddUncleHashes(dgram,tosend); - uint8_t buf[1024]; - size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); - // TODO: ??? corrupted data, retries - if (r<0) { - print_error("error on reading"); - return bin64_t::NONE; + + bin64_t tosend = bin64_t::NONE; + if (cc_->MaySendData()) { + tosend = DequeueHint(); + if (tosend==bin64_t::NONE) + dprintf("%s #%i out of hints #sendctrl\n",tintstr(),id); + } else + dprintf("%s #%i no cwnd #sendctrl\n",tintstr(),id); + + if (tosend==bin64_t::NONE && (last_send_data_time_>NOW-TINT_SEC || data_out_.empty())) + return bin64_t::NONE; // once in a while, empty data is sent just to check rtt + + if (tosend!=bin64_t::NONE) { // hashes + if (ack_in_.is_empty() && file().size()) + AddPeakHashes(dgram); + AddUncleHashes(dgram,tosend); + data_out_cap_ = tosend; } - assert(dgram.space()>=r+4+1); + dgram.Push8(P2TP_DATA); - dgram.Push32(tosend); - dgram.Push(buf,r); - dprintf("%s #%i +data (%lli)\n",tintstr(),id,tosend.base_offset()); + dgram.Push32(tosend.to32()); + + if (tosend!=bin64_t::NONE) { // data + uint8_t buf[1024]; + size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); + // TODO: corrupted data, retries, caching + if (r<0) { + print_error("error on reading"); + return bin64_t::NONE; + } + assert(dgram.space()>=r+4+1); + dgram.Push(buf,r); + } + + last_send_data_time_ = NOW; data_out_.push_back(tosend); - data_out_cap_ = tosend; - // FIXME BUG this makes data_out_ all stale ack_in_.set(tosend); + dprintf("%s #%i +data %s\n",tintstr(),id,tosend.str()); + return tosend; } @@ -229,8 +233,7 @@ void Channel::AddAck (Datagram& dgram) { dgram.Push32(pos); //dgram.Push64(data_in_.time); ack_out_.set(pos); - dprintf("%s #%i +ack (%i,%lli) %s\n",tintstr(),id, - pos.layer(),pos.offset(),tintstr(data_in_.time)); + dprintf("%s #%i +ack %s %s\n",tintstr(),id,pos.str(),tintstr(data_in_.time)); data_in_ = tintbin(0,bin64_t::NONE); } for(int count=0; count<4; count++) { @@ -241,7 +244,7 @@ void Channel::AddAck (Datagram& dgram) { ack_out_.set(ack); dgram.Push8(P2TP_ACK); dgram.Push32(ack); - dprintf("%s #%i +ack (%i,%lli)\n",tintstr(),id,ack.layer(),ack.offset()); + dprintf("%s #%i +ack %s\n",tintstr(),id,ack.str()); } } @@ -272,8 +275,10 @@ void Channel::Recv (Datagram& dgram) { } cc_->OnDataRecvd(data); last_recv_time_ = NOW; - if (data!=bin64_t::ALL && next_send_time_>NOW+TINT_MSEC) + if (data!=bin64_t::ALL && next_send_time_>NOW+TINT_MSEC) { + Datagram::Time(); Send(); + } } @@ -282,7 +287,29 @@ void Channel::OnHash (Datagram& dgram) { Sha1Hash hash = dgram.PullHash(); file().OfferHash(pos,hash); //DLOG(INFO)<<"#"<> 2; - } - last_data_time_ = NOW; - if (pos.within(hint_out_mark_.bin)) { - hint_out_mark_.bin = bin64_t::NONE; + if (pos!=bin64_t::NONE) { + if (last_recv_data_time_) { + tint dip = NOW - last_recv_data_time_; + dip_avg_ = ( dip_avg_*3 + dip ) >> 2; + } + last_recv_data_time_ = NOW; } - if (hint_out_) - hint_out_--; - else if (hint_out_am_) // probably, the marking HINT was lost or whatever - hint_out_am_--; + CleanFulfilledHints(pos); return pos; } -void Channel::OnAck (Datagram& dgram) { - bin64_t ackd_pos = dgram.Pull32(); - if (file().size() && ackd_pos.base_offset()>=file().packet_size()) { - eprintf("invalid ack: (%i,%lli)\n",ackd_pos.layer(),ackd_pos.offset()); - return; - } - dprintf("%s #%i -ack (%i,%lli)\n",tintstr(),id,ackd_pos.layer(),ackd_pos.offset()); +void Channel::CleanFulfilledDataOut (bin64_t ackd_pos) { for (int i=0; i<8 && i> 2; + rtt_avg_ = (rtt_avg_*7 + rtt) >> 3; dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2; dprintf("%s #%i rtt %lli dev %lli\n", tintstr(),id,rtt_avg_,dev_avg_); - cc_->OnAckRcvd(data_out_[i].bin); // may be invoked twice FIXME FIXME FIXME + cc_->OnAckRcvd(data_out_[i].bin); + data_out_[i]=tintbin(); } - ack_in_.set(ackd_pos); - while (data_out_.size() && ack_in_.get(data_out_.front().bin)==bins::FILLED) + while ( data_out_.size() && ( data_out_.front()==tintbin() || + ack_in_.get(data_out_.front().bin)==bins::FILLED ) ) data_out_.pop_front(); } -/*void Channel::OnAckTs (Datagram& dgram) { // FIXME: OnTs - bin64_t pos = dgram.Pull32(); - tint ts = dgram.Pull64(); - // TODO sanity check - dprintf("%s #%i -ackts (%i,%lli) %s\n", - tintstr(),id,pos.layer(),pos.offset(),tintstr(ts)); - ack_in_.set(pos); - cc_->OnAckRcvd(pos,ts); -}*/ +void Channel::OnAck (Datagram& dgram) { + bin64_t ackd_pos = dgram.Pull32(); + if (ackd_pos!=bin64_t::NONE && file().size() && ackd_pos.base_offset()>=file().packet_size()) { + eprintf("invalid ack: %s\n",ackd_pos.str()); + return; + } + dprintf("%s #%i -ack %s\n",tintstr(),id,ackd_pos.str()); + ack_in_.set(ackd_pos); + CleanFulfilledDataOut(ackd_pos); +} + void Channel::OnTs (Datagram& dgram) { peer_send_time_ = dgram.Pull64(); @@ -355,7 +374,7 @@ void Channel::OnHint (Datagram& dgram) { hint_in_.push_back(hint); //ack_in_.set(hint,bins::EMPTY); //RequeueSend(cc_->OnHintRecvd(hint)); - dprintf("%s #%i -hint (%i,%lli)\n",tintstr(),id,hint.layer(),hint.offset()); + dprintf("%s #%i -hint %s\n",tintstr(),id,hint.str()); } @@ -402,7 +421,7 @@ void Channel::RecvDatagram (int socket) { if (hashid!=P2TP_HASH) RETLOG ("no hash in the initial handshake"); bin64_t pos = data.Pull32(); - if (pos!=bin64_t::ALL32) + if (pos!=bin64_t::ALL) RETLOG ("that is not the root hash"); hash = data.PullHash(); FileTransfer* file = FileTransfer::Find(hash); @@ -426,27 +445,11 @@ void Channel::RecvDatagram (int socket) { RETLOG ("invalid peer address"); channel->own_id_mentioned_ = true; } - dprintf("recvd %i bytes for %i\n",data.size(),channel->id); + //dprintf("recvd %i bytes for %i\n",data.size(),channel->id); channel->Recv(data); } -bool tblater (const tintbin& a, const tintbin& b) { - return a.time > b.time; -} - - -void Channel::RequeueSend (tint next_time) { - if (next_time==next_send_time_) - return; - next_send_time_ = next_time; - send_queue.push_back - (tintbin(next_time==TINT_NEVER?NOW+TINT_MIN:next_time,id)); - push_heap(send_queue.begin(),send_queue.end(),tblater); - dprintf("%s requeue #%i for %s\n",tintstr(),id,tintstr(next_time)); -} - - void Channel::Loop (tint howlong) { tint limit = Datagram::Time() + howlong; @@ -455,16 +458,15 @@ void Channel::Loop (tint howlong) { tint send_time(TINT_NEVER); Channel* sender(NULL); - while (!send_queue.empty()) { - send_time = send_queue.front().time; - sender = channel((int)send_queue.front().bin); + while (!send_queue.is_empty()) { + send_time = send_queue.peek().time; + sender = channel((int)send_queue.peek().bin); if (sender) if ( sender->next_send_time_==send_time || sender->next_send_time_==TINT_NEVER ) break; sender = NULL; // it was a stale entry - pop_heap(send_queue.begin(), send_queue.end(), tblater); - send_queue.pop_back(); + send_queue.pop(); } if (send_time>limit) send_time = limit; @@ -473,23 +475,31 @@ void Channel::Loop (tint howlong) { tintstr(send_time)); sender->Send(); sender->last_send_time_ = NOW; - sender->RequeueSend(sender->cc_->NextSendTime()); - pop_heap(send_queue.begin(), send_queue.end(), tblater); - send_queue.pop_back(); + // sender->RequeueSend(sender->cc_->NextSendTime()); goes to SendCtrl + send_queue.pop(); } else if ( send_time > NOW ) { tint towait = send_time - NOW; dprintf("%s waiting %lliusec\n",tintstr(),towait); int rd = Datagram::Wait(socket_count,sockets,towait); if (rd!=INVALID_SOCKET) RecvDatagram(rd); - } else { //if (sender->next_send_time_==TINT_NEVER) { + } else if (sender) { // FIXME FIXME FIXME REWRITE!!! if (sender->next_send_time_==TINT_NEVER) { dprintf("%s #%i closed sendctrl\n",tintstr(),sender->id); delete sender; - pop_heap(send_queue.begin(), send_queue.end(), tblater); - send_queue.pop_back(); + send_queue.pop(); } } while (Datagram::Time()file(); - EXPECT_TRUE(A==seed->hash(0)); + EXPECT_TRUE(A==seed->hash(bin64_t(0,0))); EXPECT_TRUE(E==seed->hash(bin64_t(0,4))); EXPECT_TRUE(ABCD==seed->hash(bin64_t(2,0))); EXPECT_TRUE(ROOT==seed->root_hash()); diff --git a/transfer.cpp b/transfer.cpp index 62bc51b..8c2d42d 100644 --- a/transfer.cpp +++ b/transfer.cpp @@ -17,14 +17,14 @@ #include "p2tp.h" #include "compat/util.h" +#include "ext/seq_picker.cpp" // FIXME FIXME FIXME FIXME + using namespace p2tp; std::vector FileTransfer::files(20); #define BINHASHSIZE (sizeof(bin64_t)+sizeof(Sha1Hash)) -#include "ext/seq_picker.cpp" - // FIXME: separate Bootstrap() and Download(), then Size(), Progress(), SeqProgress() FileTransfer::FileTransfer (const char* filename, const Sha1Hash& _root_hash) :