From d62eadb52bd9aad8bb12da1e54b58d7b5ac83781 Mon Sep 17 00:00:00 2001 From: victor Date: Fri, 11 Dec 2009 16:06:08 +0000 Subject: [PATCH] refactored; barely works git-svn-id: https://ttuki.vtt.fi/svn/p2p-next/TUD/p2tp/trunk@734 e16421f0-f15b-0410-abcd-98678b794739 --- BUGS | 13 ++- SConstruct | 2 +- compat.cpp | 2 +- ext/send_control.cpp | 164 ---------------------------- ext/send_control.h | 121 --------------------- ext/seq_picker.cpp | 2 +- p2tp.cpp | 18 ++-- p2tp.h | 73 ++++++++++--- send_control.cpp | 142 ++++++++++++++++++++++++ sendrecv.cpp | 251 +++++++++++++++++++++++-------------------- tests/bin64test.cpp | 2 + transfer.cpp | 9 +- 12 files changed, 368 insertions(+), 431 deletions(-) delete mode 100644 ext/send_control.cpp delete mode 100644 ext/send_control.h create mode 100644 send_control.cpp diff --git a/BUGS b/BUGS index 280fd6b..373b47e 100644 --- a/BUGS +++ b/BUGS @@ -21,15 +21,15 @@ v peers don't cooperate * RecoverProgress fails sometime v leecher can't see file is done already - * why leecher waits 1sec? + v why leecher waits 1sec? * hint queue buildup * file operations are not 64-bit ready http://mail.python.org/pipermail/patches/2000-June/000848.html * recovery: last packet - * no-HINT sending to a dead peer + v no-HINT sending to a dead peer * what if rtt>1sec - * unHINTed repeated sending - * 1259859412.out#8,9 connection breaks, #8 rtt 1000, #9 hint - + v unHINTed repeated sending + v 1259859412.out#8,9 connection breaks, #8 rtt 1000, #9 hint - mudachestvo, cwnd => send int 0.5sec 0_11_10_075_698 #9 sendctrl may send 0 < 0.000000 & 1732919509_-49_-45_-200_-111 (rtt 59661) 0_11_10_075_698 #9 +data (0,194) @@ -37,3 +37,8 @@ 0_11_10_575_703 #9 Tdata (0,194) 0_11_10_575_703 #9 sendctrl may send 0 < 0.000000 & 1732919509_-49_-44_-700_-110 (rtt 59661) * complete peer reconnects 1259967418.out.gz + * underhinting causes repetition causes interarr underest causes underhinting + * misterious initiating handshake bursts + v whether sending is limited by cwnd or app + * actually: whether packets are ACKed faster than sent + * uproot DATA NONE: complicates and deceives diff --git a/SConstruct b/SConstruct index 2cb36f7..0764672 100644 --- a/SConstruct +++ b/SConstruct @@ -21,7 +21,7 @@ TestDir='tests' target = 'p2tp' source = [ 'bin64.cpp','sha1.cpp','hashtree.cpp','datagram.cpp','bins.cpp', - 'transfer.cpp', 'p2tp.cpp', 'sendrecv.cpp', 'ext/send_control.cpp', + 'transfer.cpp', 'p2tp.cpp', 'sendrecv.cpp', 'send_control.cpp', 'compat/hirestimeofday.cpp', 'compat.cpp', 'compat/util.cpp'] env = Environment() diff --git a/compat.cpp b/compat.cpp index f269da2..0945744 100644 --- a/compat.cpp +++ b/compat.cpp @@ -54,7 +54,7 @@ void print_error(const char* msg) { #ifdef _WIN32 int e = WSAGetLastError(); if (e) - fprintf(stderr,"network error #%i\n",e); + fprintf(stderr,"network error #%u\n",e); #endif } diff --git a/ext/send_control.cpp b/ext/send_control.cpp deleted file mode 100644 index c2c1cad..0000000 --- a/ext/send_control.cpp +++ /dev/null @@ -1,164 +0,0 @@ -/* - * send_control.cpp - * p2tp - * - * Created by Victor Grishchenko on 11/4/09. - * Copyright 2009 Delft University of Technology. All rights reserved. - * - */ -#include "p2tp.h" -#ifdef _MSC_VER - // To avoid complaints about std::max. Appears to work in VS2008 - #undef min - #undef max -#endif - -using namespace p2tp; - - -void SendController::Swap (SendController* newctrl) { - dprintf("%s #%i sendctrl %s->%s\n",tintstr(),ch_->id,type(),newctrl->type()); - assert(this==ch_->cc_); - ch_->cc_ = newctrl; - delete this; -} - - -void SendController::Schedule (tint next_time) { - ch_->Schedule(next_time); -} - -bool PingPongController::MaySendData() { - return ch_->data_out_.empty(); -} - -void PingPongController::OnDataSent(bin64_t b) { - Schedule(NOW+ch_->rtt_avg_+std::max(ch_->dev_avg_*4,500*TINT_MSEC)); - if (++sent_>=10 || ++unanswered_>=3) - Swap(new KeepAliveController(this)); -} - -void PingPongController::OnDataRecvd(bin64_t b) { - unanswered_ = 0; - Schedule(NOW); // pong -} - -void PingPongController::OnAckRcvd(bin64_t ackd) { - if (ackd!=bin64_t::NONE) { - Schedule(NOW); - Swap(new SlowStartController(this)); - } -} - - -KeepAliveController::KeepAliveController (Channel* ch) : -SendController(ch), delay_(ch->rtt_avg_) { -} - - -KeepAliveController::KeepAliveController(SendController* prev, tint delay) : -SendController(prev), delay_(delay) { - ch_->dev_avg_ = TINT_SEC; // without constant active measurement, rtt is unreliable - delay_=ch_->rtt_avg_; -} - -bool KeepAliveController::MaySendData() { - return true; -} - - -void KeepAliveController::OnDataSent(bin64_t b) { - if (b==bin64_t::ALL || b==bin64_t::NONE) { - if (delay_>TINT_SEC*58) // keep NAT mappings alive - delay_ = TINT_SEC*58; - if (delay_>=4*TINT_SEC && ch_->last_recv_time_ < NOW-TINT_MIN) - Schedule(TINT_NEVER); // no response; enter close timeout - else - Schedule(NOW+delay_); // all right, just keep it alive - delay_ = delay_ * 2; // backing off - } else { - Schedule(NOW+ch_->rtt_avg_); // cwnd==1 => next send in 1 rtt - Swap(new SlowStartController(this)); - } -} - -void KeepAliveController::OnDataRecvd(bin64_t b) { - if (b!=bin64_t::NONE && b!=bin64_t::ALL) { // channel is alive - delay_ = ch_->rtt_avg_; - Schedule(NOW); // schedule an ACK; TODO: aggregate - } -} - -void KeepAliveController::OnAckRcvd(bin64_t ackd) { - // probably to something sent by CwndControllers before this one got installed -} - - -CwndController::CwndController(SendController* orig, int cwnd) : -SendController(orig), cwnd_(cwnd), last_change_(0) { -} - -bool CwndController::MaySendData() { - tint spacing = ch_->rtt_avg_ / cwnd_; - dprintf("%s #%i sendctrl may send %i < %f & %s (rtt %lli)\n",tintstr(), - ch_->id,(int)ch_->data_out_.size(),cwnd_, - tintstr(ch_->last_send_data_time_+spacing), ch_->rtt_avg_); - return ch_->data_out_.empty() || - (ch_->data_out_.size() < cwnd_ && NOW-ch_->last_send_data_time_ >= spacing); -} - - -void CwndController::OnDataSent(bin64_t b) { - if ( (b==bin64_t::ALL || b==bin64_t::NONE) && MaySendData() ) {// no more data (no hints?) - Schedule(NOW+ch_->rtt_avg_); // soft pause; nothing to send yet - if (ch_->last_send_data_time_ < NOW-ch_->rtt_avg_) - Swap(new KeepAliveController(this)); // really, nothing to send - } else { // FIXME: mandatory rescheduling after send/recv; based on state - tint spacing = ch_->rtt_avg_ / cwnd_; - if (ch_->data_out_.size() < cwnd_) { // have cwnd; not the right time yet - Schedule(ch_->last_send_data_time_+spacing); - } else { // no free cwnd - tint timeout = std::max( ch_->rtt_avg_+ch_->dev_avg_*4, 500*TINT_MSEC ); - assert(!ch_->data_out_.empty()); - Schedule(ch_->data_out_.front().time+timeout); // wait for ACK or timeout - } - } -} - - -void CwndController::OnDataRecvd(bin64_t b) { - if (b!=bin64_t::NONE && b!=bin64_t::ALL) { - Schedule(NOW); // send ACK; todo: aggregate ACKs - } -} - -void CwndController::OnAckRcvd(bin64_t ackd) { - if (ackd==bin64_t::NONE) { - dprintf("%s #%i sendctrl loss detected\n",tintstr(),ch_->id); - if (NOW>last_change_+ch_->rtt_avg_) { - cwnd_ /= 2; - last_change_ = NOW; - } - } else { - if (cwnd_<1) - cwnd_ *= 2; - else - cwnd_ += 1.0/cwnd_; - dprintf("%s #%i sendctrl cwnd to %f\n",tintstr(),ch_->id,cwnd_); - } - tint spacing = ch_->rtt_avg_ / cwnd_; - Schedule(ch_->last_send_time_+spacing); -} - - -void SlowStartController::OnAckRcvd (bin64_t pos) { - if (pos!=bin64_t::NONE) { - cwnd_ += 1; - if (TINT_SEC*cwnd_/ch_->rtt_avg_>=10) { - Schedule(NOW); - Swap(new AIMDController(this,cwnd_)); - } - } else - cwnd_ /= 2; -} - diff --git a/ext/send_control.h b/ext/send_control.h deleted file mode 100644 index e4887e0..0000000 --- a/ext/send_control.h +++ /dev/null @@ -1,121 +0,0 @@ -/* - * send_control.h - * p2tp - * - * Created by Victor Grishchenko on 11/4/09. - * Copyright 2009 Delft University of Technology. All rights reserved. - * - */ -// included into p2tp.h -#ifndef P2TP_SEND_CONTROL -#define P2TP_SEND_CONTROL - -class Channel; - -struct SendController { - - Channel* ch_; - - SendController (Channel* ch) : ch_(ch) {} - - SendController(SendController* orig) : ch_(orig->ch_) { } - - void Swap (SendController* replacement); - void Schedule (tint time); - - virtual const char* type() const = 0; - - virtual bool MaySendData() = 0; - - /** A datagram was sent to the peer. - * @param data the bin number for the data sent; bin64_t::NONE if only - metadata was sent; bin64_t::ALL if datagram was empty */ - virtual void OnDataSent(bin64_t data) = 0; - - /** A datagram was received from the peer. - @param data follows the same conventions as data in OnDataSent() */ - virtual void OnDataRecvd(bin64_t data) = 0; - - /** An acknowledgement on OUR data message was receiveed from the peer. - @param ackd bin number for the data sent; bin64_t::NONE if no - acknowledgement was received (timeout event) */ - virtual void OnAckRcvd(bin64_t ackd) = 0; - - virtual ~SendController() {} -}; - -struct PingPongController : public SendController { - - int sent_, unanswered_; - - PingPongController (SendController* orig) : - SendController(orig), sent_(0), unanswered_(0) {} - PingPongController (Channel* ch) : - unanswered_(0), sent_(0), SendController(ch) {} - const char* type() const { return "PingPong"; } - bool MaySendData(); - void OnDataSent(bin64_t b); - void OnDataRecvd(bin64_t b); - void OnAckRcvd(bin64_t ackd) ; - ~PingPongController() {} - -}; -/** Mission of the keepalive controller to keep the channel - alive as no data sending happens; If no data is transmitted - in either direction, inter-packet times grow exponentially - till 58 sec, which refresh period is deemed necessary to keep - NAT mappings alive. */ -struct KeepAliveController : public SendController { - - tint delay_; - - KeepAliveController (Channel* ch); - KeepAliveController(SendController* prev, tint delay=0) ; - const char* type() const { return "KeepAlive"; } - bool MaySendData(); - void OnDataSent(bin64_t b) ; - void OnDataRecvd(bin64_t b) ; - void OnAckRcvd(bin64_t ackd) ; - -}; - - -/** Base class for any congestion window based algorithm. */ -struct CwndController : public SendController { - - double cwnd_; - tint last_change_; - - CwndController(SendController* orig, int cwnd=1) ; - - bool MaySendData() ; - void OnDataSent(bin64_t b) ; - void OnDataRecvd(bin64_t b) ; - void OnAckRcvd(bin64_t ackd) ; - -}; - - -/** TCP-like exponential "slow" start algorithm. */ -struct SlowStartController : public CwndController { - - SlowStartController(SendController* orig, int cwnd=1) : CwndController(orig,cwnd) {} - const char* type() const { return "SlowStart"; } - void OnAckRcvd(bin64_t ackd) ; - -}; - - -/** The classics: additive increase - multiplicative decrease algorithm. - A naive version of "standard" TCP congestion control. Potentially useful - for seedboxes, so needs to be improved. (QUBIC?) */ -struct AIMDController : public CwndController { - - AIMDController(SendController* orig, int cwnd=1) : CwndController(orig,cwnd) {} - const char* type() const { return "AIMD"; } - //void OnAckRcvd(bin64_t ackd) ; - -}; - - -#endif diff --git a/ext/seq_picker.cpp b/ext/seq_picker.cpp index 91222fe..2a2290d 100644 --- a/ext/seq_picker.cpp +++ b/ext/seq_picker.cpp @@ -34,7 +34,7 @@ public: } virtual bin64_t Pick (bins& offer, uint64_t max_width, tint expires) { - while (hint_out_.size() && hint_out_.front().timeid = channels.size(); channels.push_back(this); - cc_ = new PingPongController(this); - dprintf("%s #%i init %s\n",tintstr(),id,peer_.str()); - Schedule(NOW); // FIXME ugly + transfer_->hs_in_.push_back(id); + for(int i=0; i<4; i++) { + owd_min_bins_[i] = TINT_NEVER; + owd_current_[i] = TINT_NEVER; + } + Reschedule(); + dprintf("%s #%u init %s\n",tintstr(),id,peer_.str()); } Channel::~Channel () { channels[id] = NULL; - delete cc_; } diff --git a/p2tp.h b/p2tp.h index 0a872f5..8c51f3a 100644 --- a/p2tp.h +++ b/p2tp.h @@ -220,8 +220,17 @@ namespace p2tp { Channel (FileTransfer* file, int socket=-1, Address peer=Address()); ~Channel(); - - static void RecvDatagram (int socket); + + typedef enum { + KEEP_ALIVE_CONTROL, + PING_PONG_CONTROL, + SLOW_START_CONTROL, + AIMD_CONTROL, + LEDBAT_CONTROL + } send_control_t; + + static Channel* + RecvDatagram (int socket); static void Loop (tint till); void Recv (Datagram& dgram); @@ -229,13 +238,13 @@ namespace p2tp { void OnAck (Datagram& dgram); void OnTs (Datagram& dgram); - bin64_t OnData (Datagram& dgram); + bin64_t OnData (Datagram& dgram); void OnHint (Datagram& dgram); void OnHash (Datagram& dgram); void OnPex (Datagram& dgram); void OnHandshake (Datagram& dgram); void AddHandshake (Datagram& dgram); - bin64_t AddData (Datagram& dgram); + bin64_t AddData (Datagram& dgram); void AddAck (Datagram& dgram); void AddTs (Datagram& dgram); void AddHint (Datagram& dgram); @@ -243,13 +252,34 @@ namespace p2tp { void AddPeakHashes (Datagram& dgram); void AddPex (Datagram& dgram); + void BackOffOnLosses (); + tint SwitchSendControl (int control_mode); + tint NextSendTime (); + tint KeepAliveNextSendTime (); + tint PingPongNextSendTime (); + tint CwndRateNextSendTime (); + tint SlowStartNextSendTime (); + tint AimdNextSendTime (); + tint LedbatNextSendTime (); + + static int MAX_REORDERING; + static tint TIMEOUT; + static tint MIN_DEV; + static tint MAX_SEND_INTERVAL; + static tint LEDBAT_TARGET; + static float LEDBAT_GAIN; + static tint LEDBAT_DELAY_BIN; + const std::string id_string () const; /** A channel is "established" if had already sent and received packets. */ bool is_established () { return peer_channel_id_ && own_id_mentioned_; } FileTransfer& transfer() { return *transfer_; } HashTree& file () { return transfer_->file(); } const Address& peer() const { return peer_; } - + tint ack_timeout () { + return rtt_avg_ + std::max(dev_avg_,MIN_DEV)*4; + } + static int DecodeID(int scrambled); static int EncodeID(int unscrambled); static Channel* channel(int i) { @@ -283,8 +313,6 @@ namespace p2tp { /** Hints sent (to detect and reschedule ignored hints). */ tbqueue hint_out_; uint64_t hint_out_size_; - /** The congestion control strategy. */ - SendController *cc_; /** Types of messages the peer accepts. */ uint64_t cap_in_; /** For repeats. */ @@ -295,29 +323,46 @@ namespace p2tp { tint rtt_avg_, dev_avg_, dip_avg_; tint last_send_time_; tint last_recv_time_; - tint last_send_data_time_; - tint last_recv_data_time_; + tint last_data_out_time_; + tint last_data_in_time_; + tint last_loss_time_; tint next_send_time_; tint peer_send_time_; - static tbheap send_queue; + /** Congestion window; TODO: int, bytes. */ + float cwnd_; + /** Data sending interval. */ + tint send_interval_; + /** The congestion control strategy. */ + int send_control_; + /** Datagrams (not data) sent since last recv. */ + int sent_since_recv_; + /** Recent acknowlegements for data previously sent. */ + int ack_rcvd_recent_; + /** Recent non-acknowlegements (losses) of data previously sent. */ + int ack_not_rcvd_recent_; + /** LEDBAT one-way delay machinery */ + tint owd_min_bins_[4]; + int owd_min_bin_; + tint owd_min_bin_start_; + tint owd_current_[4]; + int owd_cur_bin_; int PeerBPS() const { return TINT_SEC / dip_avg_ * 1024; } /** Get a request for one packet from the queue of peer's requests. */ - bin64_t DequeueHint(); + bin64_t DequeueHint(); void CleanDataOut (bin64_t acks_pos=bin64_t::NONE); void CleanStaleHintOut(); void CleanHintOut(bin64_t pos); - void Schedule(tint send_time); + void Reschedule(); static PeerSelector* peer_selector; - static int MAX_REORDERING; - static tint TIMEOUT; static SOCKET sockets[8]; static int socket_count; static tint last_tick; + static tbheap send_queue; static Address tracker; static std::vector channels; diff --git a/send_control.cpp b/send_control.cpp new file mode 100644 index 0000000..ff46b4d --- /dev/null +++ b/send_control.cpp @@ -0,0 +1,142 @@ +/* + * send_control.cpp + * p2tp + * + * Created by Victor Grishchenko on 12/10/09. + * Copyright 2009 Delft University of Technology. All rights reserved. + * + */ + +#include "p2tp.h" + +using namespace p2tp; +using namespace std; + +tint Channel::MIN_DEV = 50*TINT_MSEC; +tint Channel::MAX_SEND_INTERVAL = TINT_SEC*58; +tint Channel::LEDBAT_TARGET = TINT_MSEC*25; +float Channel::LEDBAT_GAIN = 1.0/LEDBAT_TARGET; +tint Channel::LEDBAT_DELAY_BIN = TINT_SEC*30; + + +tint Channel::NextSendTime () { + switch (send_control_) { + case KEEP_ALIVE_CONTROL: return KeepAliveNextSendTime(); + case PING_PONG_CONTROL: return PingPongNextSendTime(); + case SLOW_START_CONTROL: return SlowStartNextSendTime(); + case AIMD_CONTROL: return AimdNextSendTime(); + case LEDBAT_CONTROL: return LedbatNextSendTime(); + default: assert(false); + } +} + +tint Channel::SwitchSendControl (int control_mode) { + dprintf("%s #%u sendctrl %i->%i\n",tintstr(),id,send_control_,control_mode); + switch (control_mode) { + case KEEP_ALIVE_CONTROL: + send_interval_ = max(TINT_SEC/10,rtt_avg_); + dev_avg_ = max(TINT_SEC,rtt_avg_); + cwnd_ = 1; + break; + case PING_PONG_CONTROL: + dev_avg_ = max(TINT_SEC,rtt_avg_); + cwnd_ = 1; + break; + case SLOW_START_CONTROL: + break; + case AIMD_CONTROL: + break; + case LEDBAT_CONTROL: + break; + default: + assert(false); + } + send_control_ = control_mode; + return NextSendTime(); +} + +// TODO: transitions, consistently +// TODO: may send data +tint Channel::KeepAliveNextSendTime () { + if (sent_since_recv_>=3 && last_recv_time_MAX_SEND_INTERVAL) + send_interval_ = MAX_SEND_INTERVAL; + return last_send_time_ + send_interval_; +} + +tint Channel::PingPongNextSendTime () { + if (last_recv_time_ < last_send_time_-TINT_SEC*3) { + // FIXME keepalive <-> pingpong (peers, transition) + } // last_data_out_time_ < last_send_time_ - TINT_SEC... + if (false) + return SwitchSendControl(KEEP_ALIVE_CONTROL); + if (ack_rcvd_recent_) + return SwitchSendControl(SLOW_START_CONTROL); + if (last_recv_time_>last_send_time_) + return NOW; + else if (last_send_time_) + return last_send_time_ + ack_timeout(); + else + return NOW; +} + +tint Channel::CwndRateNextSendTime () { + send_interval_ = rtt_avg_/cwnd_; + if (data_out_.size()owd_min_bins_[i]) + owd_min = owd_min_bins_[i]; + if (owd_cur>owd_current_[i]) + owd_cur = owd_current_[i]; + } + if (ack_not_rcvd_recent_) + BackOffOnLosses(); + ack_rcvd_recent_ = 0; + tint queueing_delay = owd_cur - owd_min; + tint off_target = LEDBAT_TARGET - queueing_delay; + cwnd_ += LEDBAT_GAIN * off_target / cwnd_; + return CwndRateNextSendTime(); +} + + + diff --git a/sendrecv.cpp b/sendrecv.cpp index 2b7ebd6..374512b 100644 --- a/sendrecv.cpp +++ b/sendrecv.cpp @@ -11,7 +11,7 @@ using namespace p2tp; -using namespace std; // FIXME remove +using namespace std; /* TODO 25 Oct 18:55 @@ -26,7 +26,7 @@ void Channel::AddPeakHashes (Datagram& dgram) { dgram.Push32((uint32_t)peak); dgram.PushHash(file().peak_hash(i)); //DLOG(INFO)<<"#"<NOW-rtt_avg_-TINT_SEC) { + uint64_t twist = peer_channel_id_; // got no hints, send something randomly + twist &= file().peak(0); // may make it semi-seq here + file().ack_out().twist(twist); + ack_in_.twist(twist); + bin64_t my_pick = + file().ack_out().find_filtered(ack_in_,bin64_t::ALL,bins::FILLED); + while (my_pick.width()>max(1,(int)cwnd_)) + my_pick = my_pick.left(); + file().ack_out().twist(0); + ack_in_.twist(0); + if (my_pick!=bin64_t::NONE) { + my_pick = my_pick.twisted(twist); + hint_in_.push_back(my_pick); + dprintf("%s #%u *hint %s\n",tintstr(),id,my_pick.str()); + } + } bin64_t send = bin64_t::NONE; while (!hint_in_.empty() && send==bin64_t::NONE) { bin64_t hint = hint_in_.front().bin; tint time = hint_in_.front().time; hint_in_.pop_front(); - //if (time < NOW-TINT_SEC*3/2 ) //NOW-8*rtt_avg_) - // continue; - // Totally flawed: - // a. May empty the queue when you least expect - // b. May lose parts of partially ACKd HINTs - send = file().ack_out().find_filtered(ack_in_,hint,bins::FILLED); - send = send.left_foot(); // single packet - if (send!=bin64_t::NONE) - while (send!=hint) { - hint = hint.towards(send); - hint_in_.push_front(hint.sibling()); - } - } - if (send==bin64_t::NONE) { - send = file().ack_out().find_filtered(ack_in_,bin64_t::ALL,bins::FILLED); - if (send!=bin64_t::NONE) // NEED FIXME: twist here!!! - send = send.left_foot(); + while (!hint.is_base()) { // FIXME optimize; possible attack + hint_in_.push_front(tintbin(time,hint.right())); + hint = hint.left(); + } + //if (time < NOW-TINT_SEC*3/2 ) + // continue; bad idea + if (ack_in_.get(hint)!=bins::FILLED) + send = hint; } uint64_t mass = 0; for(int i=0; iOnDataSent(data); + if (send_control_!=KEEP_ALIVE_CONTROL) // we did our best + SwitchSendControl(KEEP_ALIVE_CONTROL); + if (NOW4 && diff>2*peer_cwnd) - // diff >>= 1; - bin64_t hint = transfer().picker().Pick(ack_in_,diff,rtt_avg_*8+TINT_MSEC*100); + int diff = plan_pck - hint_out_size_; // TODO: aggregate + bin64_t hint = transfer().picker().Pick(ack_in_,diff,NOW+plan_for*2); if (hint!=bin64_t::NONE) { dgram.Push8(P2TP_HINT); dgram.Push32(hint); - dprintf("%s #%i +hint %s [%lli]\n",tintstr(),id,hint.str(),hint_out_size_); + dprintf("%s #%u +hint %s [%lli]\n",tintstr(),id,hint.str(),hint_out_size_); hint_out_.push_back(hint); hint_out_size_ += hint.width(); } else - dprintf("%s #%i .hint\n",tintstr(),id); + dprintf("%s #%u Xhint\n",tintstr(),id); } } @@ -161,15 +174,15 @@ bin64_t Channel::AddData (Datagram& dgram) { return bin64_t::NONE; bin64_t tosend = bin64_t::NONE; - if (cc_->MaySendData()) { + if (data_out_.size()NOW-TINT_SEC || data_out_.empty())) - return bin64_t::NONE; // once in a while, empty data is sent just to check rtt + if (tosend==bin64_t::NONE && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty())) + return bin64_t::NONE; // once in a while, empty data is sent just to check rtt FIXME if (tosend!=bin64_t::NONE) { // hashes if (ack_in_.is_empty() && file().size()) @@ -199,9 +212,9 @@ bin64_t Channel::AddData (Datagram& dgram) { dgram.Push(buf,r); } - last_send_data_time_ = NOW; + last_data_out_time_ = NOW; data_out_.push_back(tosend); - dprintf("%s #%i +data %s\n",tintstr(),id,tosend.str()); + dprintf("%s #%u +data %s\n",tintstr(),id,tosend.str()); return tosend; } @@ -210,7 +223,7 @@ bin64_t Channel::AddData (Datagram& dgram) { void Channel::AddTs (Datagram& dgram) { dgram.Push8(P2TP_TS); dgram.Push64(data_in_.time); - dprintf("%s #%i +ts %s\n",tintstr(),id,tintstr(data_in_.time)); + dprintf("%s #%u +ts %s\n",tintstr(),id,tintstr(data_in_.time)); } @@ -227,7 +240,7 @@ void Channel::AddAck (Datagram& dgram) { dgram.Push32(pos); //dgram.Push64(data_in_.time); ack_out_.set(pos); - dprintf("%s #%i +ack %s %s\n",tintstr(),id,pos.str(),tintstr(data_in_.time)); + dprintf("%s #%u +ack %s %s\n",tintstr(),id,pos.str(),tintstr(data_in_.time)); data_in_ = tintbin(0,bin64_t::NONE); if (pos.layer()>2) data_in_dbl_ = pos; @@ -240,38 +253,38 @@ void Channel::AddAck (Datagram& dgram) { ack_out_.set(ack); dgram.Push8(P2TP_ACK); dgram.Push32(ack); - dprintf("%s #%i +ack %s\n",tintstr(),id,ack.str()); + dprintf("%s #%u +ack %s\n",tintstr(),id,ack.str()); } } void Channel::Recv (Datagram& dgram) { - dprintf("%s #%i recvd %i\n",tintstr(),id,dgram.size()+4); + dprintf("%s #%u recvd %i\n",tintstr(),id,dgram.size()+4); + peer_send_time_ = 0; // has scope of 1 datagram if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) { rtt_avg_ = NOW - last_send_time_; dev_avg_ = rtt_avg_; dip_avg_ = rtt_avg_; - transfer().hs_in_.push_back(id); - dprintf("%s #%i rtt init %lli\n",tintstr(),id,rtt_avg_); + dprintf("%s #%u rtt init %lli\n",tintstr(),id,rtt_avg_); } bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL; while (dgram.size()) { uint8_t type = dgram.Pull8(); switch (type) { case P2TP_HANDSHAKE: OnHandshake(dgram); break; - case P2TP_DATA: data=OnData(dgram); break; - case P2TP_TS: OnTs(dgram); break; - case P2TP_ACK: OnAck(dgram); break; - case P2TP_HASH: OnHash(dgram); break; - case P2TP_HINT: OnHint(dgram); break; - case P2TP_PEX_ADD: OnPex(dgram); break; + case P2TP_DATA: data=OnData(dgram); break; + case P2TP_TS: OnTs(dgram); break; + case P2TP_ACK: OnAck(dgram); break; + case P2TP_HASH: OnHash(dgram); break; + case P2TP_HINT: OnHint(dgram); break; + case P2TP_PEX_ADD: OnPex(dgram); break; default: - eprintf("%s #%i ?msg id unknown %i\n",tintstr(),id,(int)type); + eprintf("%s #%u ?msg id unknown %i\n",tintstr(),id,(int)type); return; } } - cc_->OnDataRecvd(data); last_recv_time_ = NOW; + sent_since_recv_ = 0; } @@ -280,7 +293,7 @@ void Channel::OnHash (Datagram& dgram) { Sha1Hash hash = dgram.PullHash(); file().OfferHash(pos,hash); //DLOG(INFO)<<"#"<> 2; } - last_recv_data_time_ = NOW; + last_data_in_time_ = NOW; } CleanHintOut(pos); return pos; } -void Channel::CleanDataOut (bin64_t ackd_pos) { +void Channel::CleanDataOut (bin64_t ackd_pos) { // TODO: isn't it too long? int max_ack_off = 0; + //FIXME do LEDBAT magic somewhere here if (ackd_pos!=bin64_t::NONE) { for (int i=0; i<8 && i> 3; dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2; - dprintf("%s #%i rtt %lli dev %lli\n",tintstr(),id,rtt_avg_,dev_avg_); + dprintf("%s #%u rtt %lli dev %lli\n",tintstr(),id,rtt_avg_,dev_avg_); bin64_t pos = data_out_[i].bin; - cc_->OnAckRcvd(pos); + ack_rcvd_recent_++; data_out_[i]=tintbin(); max_ack_off = i; if (ackd_pos==pos) @@ -357,8 +371,8 @@ void Channel::CleanDataOut (bin64_t ackd_pos) { max_ack_off--; } while (max_ack_off>MAX_REORDERING) { - cc_->OnAckRcvd(bin64_t::NONE); - dprintf("%s #%i Rdata %s\n",tintstr(),id,data_out_.front().bin.str()); + ack_not_rcvd_recent_++; + dprintf("%s #%u Rdata %s\n",tintstr(),id,data_out_.front().bin.str()); data_out_.pop_front(); max_ack_off--; data_out_cap_ = bin64_t::ALL; @@ -368,9 +382,9 @@ void Channel::CleanDataOut (bin64_t ackd_pos) { tint timeout = NOW - rtt_avg_ - 4*std::max(dev_avg_,TINT_MSEC*50); while (!data_out_.empty() && data_out_.front().timeOnAckRcvd(bin64_t::NONE); + ack_not_rcvd_recent_++; data_out_cap_ = bin64_t::ALL; - dprintf("%s #%i Tdata %s\n",tintstr(),id,data_out_.front().bin.str()); + dprintf("%s #%u Tdata %s\n",tintstr(),id,data_out_.front().bin.str()); } data_out_.pop_front(); } @@ -386,7 +400,7 @@ void Channel::OnAck (Datagram& dgram) { eprintf("invalid ack: %s\n",ackd_pos.str()); return; } - dprintf("%s #%i -ack %s\n",tintstr(),id,ackd_pos.str()); + dprintf("%s #%u -ack %s\n",tintstr(),id,ackd_pos.str()); ack_in_.set(ackd_pos); CleanDataOut(ackd_pos); } @@ -394,7 +408,7 @@ void Channel::OnAck (Datagram& dgram) { void Channel::OnTs (Datagram& dgram) { peer_send_time_ = dgram.Pull64(); - dprintf("%s #%i -ts %lli\n",tintstr(),id,peer_send_time_); + dprintf("%s #%u -ts %lli\n",tintstr(),id,peer_send_time_); } @@ -403,13 +417,13 @@ void Channel::OnHint (Datagram& dgram) { hint_in_.push_back(hint); //ack_in_.set(hint,bins::EMPTY); //RequeueSend(cc_->OnHintRecvd(hint)); - dprintf("%s #%i -hint %s\n",tintstr(),id,hint.str()); + dprintf("%s #%u -hint %s\n",tintstr(),id,hint.str()); } void Channel::OnHandshake (Datagram& dgram) { peer_channel_id_ = dgram.Pull32(); - dprintf("%s #%i -hs %i\n",tintstr(),id,peer_channel_id_); + dprintf("%s #%u -hs %i\n",tintstr(),id,peer_channel_id_); // FUTURE: channel forking } @@ -418,7 +432,7 @@ void Channel::OnPex (Datagram& dgram) { uint32_t ipv4 = dgram.Pull32(); uint16_t port = dgram.Pull16(); Address addr(ipv4,port); - dprintf("%s #%i -pex %s\n",tintstr(),id,addr.str()); + dprintf("%s #%u -pex %s\n",tintstr(),id,addr.str()); transfer().OnPexIn(addr); } @@ -431,15 +445,15 @@ void Channel::AddPex (Datagram& dgram) { dgram.Push8(P2TP_PEX_ADD); dgram.Push32(a.ipv4()); dgram.Push16(a.port()); - dprintf("%s #%i +pex %s\n",tintstr(),id,a.str()); + dprintf("%s #%u +pex %s\n",tintstr(),id,a.str()); } -void Channel::RecvDatagram (int socket) { +Channel* Channel::RecvDatagram (int socket) { Datagram data(socket); data.Recv(); Address& addr = data.addr; -#define return_log(...) { eprintf(__VA_ARGS__); return; } +#define return_log(...) { eprintf(__VA_ARGS__); return NULL; } if (data.size()<4) return_log("datagram shorter than 4 bytes %s\n",addr.str()); uint32_t mych = data.Pull32(); @@ -461,22 +475,23 @@ void Channel::RecvDatagram (int socket) { dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str()); for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++) if (channels[*i] && channels[*i]->peer_==data.addr && - channels[*i]->last_recv_time_>NOW-TINT_SEC*2) + channels[*i]->last_recv_time_>NOW-TINT_SEC*2) return_log("have a channel already to %s\n",addr.str()); channel = new Channel(file, socket, data.address()); } else { mych = DecodeID(mych); if (mych>=channels.size()) - return_log("invalid channel #%i, %s\n",mych,addr.str()); + return_log("invalid channel #%u, %s\n",mych,addr.str()); channel = channels[mych]; if (!channel) - return_log ("channel #%i is already closed\n",mych,addr.str()); + return_log ("channel #%u is already closed\n",mych,addr.str()); if (channel->peer() != addr) - return_log ("invalid peer address #%i %s!=%s\n",mych,channel->peer().str(),addr.str()); + return_log ("invalid peer address #%u %s!=%s\n",mych,channel->peer().str(),addr.str()); channel->own_id_mentioned_ = true; } //dprintf("recvd %i bytes for %i\n",data.size(),channel->id); channel->Recv(data); + return channel; } @@ -488,48 +503,52 @@ void Channel::Loop (tint howlong) { tint send_time(TINT_NEVER); Channel* sender(NULL); - while (!send_queue.is_empty()) { + while (!sender && !send_queue.is_empty()) { // dequeue send_time = send_queue.peek().time; sender = channel((int)send_queue.peek().bin); - if (sender) - if ( sender->next_send_time_==send_time || - sender->next_send_time_==TINT_NEVER ) - break; - sender = NULL; // it was a stale entry send_queue.pop(); + if (sender && sender->next_send_time_!=send_time && + sender->next_send_time_!=TINT_NEVER ) + sender = NULL; // it was a stale entry } - if (send_time>limit) - send_time = limit; - if ( sender && sender->next_send_time_ <= NOW ) { - dprintf("%s #%i sch_send %s\n",tintstr(),sender->id, - tintstr(send_time)); - sender->Send(); - sender->last_send_time_ = NOW; - // sender->RequeueSend(sender->cc_->NextSendTime()); goes to SendCtrl - send_queue.pop(); - } else if ( send_time > NOW ) { - tint towait = send_time - NOW; + + if ( sender && send_time <= NOW ) { // it's time + + if (sender->next_send_time_id, + tintstr(send_time)); + sender->Send(); + sender->Reschedule(); + } else { // or close the channel + dprintf("%s #%u closed sendctrl\n",tintstr(),sender->id); + delete sender; + } + + } else { // it's too early, wait + + tint towait = min(limit,send_time) - NOW; dprintf("%s waiting %lliusec\n",tintstr(),towait); int rd = Datagram::Wait(socket_count,sockets,towait); - if (rd!=INVALID_SOCKET) - RecvDatagram(rd); - } else { // FIXME FIXME FIXME REWRITE!!! if (sender->next_send_time_==TINT_NEVER) { - if (sender) { - dprintf("%s #%i closed sendctrl\n",tintstr(),sender->id); - delete sender; + if (rd!=INVALID_SOCKET) { // in meantime, received something + Channel* receiver = RecvDatagram(rd); + if (receiver) // receiver's state may have changed + receiver->Reschedule(); } - send_queue.pop(); + if (sender) // get back to that later + send_queue.push(tintbin(send_time,sender->id)); + } } while (Datagram::Time()transfer().fd()==this->fd()) { - pex_out_ += hs_in_offset_ + 1; - return c->id; + if (c->own_id_mentioned_) { + pex_out_ += hs_in_offset_ + 1; + return c->id; + } else + pex_out_++; } else { hs_in_[pex_out_] = hs_in_[0]; hs_in_.pop_front(); -- 2.20.1