v peers don't cooperate
* RecoverProgress fails sometime
v leecher can't see file is done already
- * why leecher waits 1sec?
+ v why leecher waits 1sec?
* hint queue buildup
* file operations are not 64-bit ready
http://mail.python.org/pipermail/patches/2000-June/000848.html
* recovery: last packet
- * no-HINT sending to a dead peer
+ v no-HINT sending to a dead peer
* what if rtt>1sec
- * unHINTed repeated sending
- * 1259859412.out#8,9 connection breaks, #8 rtt 1000, #9 hint -
+ v unHINTed repeated sending
+ v 1259859412.out#8,9 connection breaks, #8 rtt 1000, #9 hint -
mudachestvo, cwnd => send int 0.5sec
0_11_10_075_698 #9 sendctrl may send 0 < 0.000000 & 1732919509_-49_-45_-200_-111 (rtt 59661)
0_11_10_075_698 #9 +data (0,194)
0_11_10_575_703 #9 Tdata (0,194)
0_11_10_575_703 #9 sendctrl may send 0 < 0.000000 & 1732919509_-49_-44_-700_-110 (rtt 59661)
* complete peer reconnects 1259967418.out.gz
+ * underhinting causes repetition causes interarr underest causes underhinting
+ * misterious initiating handshake bursts
+ v whether sending is limited by cwnd or app
+ * actually: whether packets are ACKed faster than sent
+ * uproot DATA NONE: complicates and deceives
target = 'p2tp'
source = [ 'bin64.cpp','sha1.cpp','hashtree.cpp','datagram.cpp','bins.cpp',
- 'transfer.cpp', 'p2tp.cpp', 'sendrecv.cpp', 'ext/send_control.cpp',
+ 'transfer.cpp', 'p2tp.cpp', 'sendrecv.cpp', 'send_control.cpp',
'compat/hirestimeofday.cpp', 'compat.cpp', 'compat/util.cpp']
env = Environment()
#ifdef _WIN32
int e = WSAGetLastError();
if (e)
- fprintf(stderr,"network error #%i\n",e);
+ fprintf(stderr,"network error #%u\n",e);
#endif
}
+++ /dev/null
-/*
- * send_control.cpp
- * p2tp
- *
- * Created by Victor Grishchenko on 11/4/09.
- * Copyright 2009 Delft University of Technology. All rights reserved.
- *
- */
-#include "p2tp.h"
-#ifdef _MSC_VER
- // To avoid complaints about std::max. Appears to work in VS2008
- #undef min
- #undef max
-#endif
-
-using namespace p2tp;
-
-
-void SendController::Swap (SendController* newctrl) {
- dprintf("%s #%i sendctrl %s->%s\n",tintstr(),ch_->id,type(),newctrl->type());
- assert(this==ch_->cc_);
- ch_->cc_ = newctrl;
- delete this;
-}
-
-
-void SendController::Schedule (tint next_time) {
- ch_->Schedule(next_time);
-}
-
-bool PingPongController::MaySendData() {
- return ch_->data_out_.empty();
-}
-
-void PingPongController::OnDataSent(bin64_t b) {
- Schedule(NOW+ch_->rtt_avg_+std::max(ch_->dev_avg_*4,500*TINT_MSEC));
- if (++sent_>=10 || ++unanswered_>=3)
- Swap(new KeepAliveController(this));
-}
-
-void PingPongController::OnDataRecvd(bin64_t b) {
- unanswered_ = 0;
- Schedule(NOW); // pong
-}
-
-void PingPongController::OnAckRcvd(bin64_t ackd) {
- if (ackd!=bin64_t::NONE) {
- Schedule(NOW);
- Swap(new SlowStartController(this));
- }
-}
-
-
-KeepAliveController::KeepAliveController (Channel* ch) :
-SendController(ch), delay_(ch->rtt_avg_) {
-}
-
-
-KeepAliveController::KeepAliveController(SendController* prev, tint delay) :
-SendController(prev), delay_(delay) {
- ch_->dev_avg_ = TINT_SEC; // without constant active measurement, rtt is unreliable
- delay_=ch_->rtt_avg_;
-}
-
-bool KeepAliveController::MaySendData() {
- return true;
-}
-
-
-void KeepAliveController::OnDataSent(bin64_t b) {
- if (b==bin64_t::ALL || b==bin64_t::NONE) {
- if (delay_>TINT_SEC*58) // keep NAT mappings alive
- delay_ = TINT_SEC*58;
- if (delay_>=4*TINT_SEC && ch_->last_recv_time_ < NOW-TINT_MIN)
- Schedule(TINT_NEVER); // no response; enter close timeout
- else
- Schedule(NOW+delay_); // all right, just keep it alive
- delay_ = delay_ * 2; // backing off
- } else {
- Schedule(NOW+ch_->rtt_avg_); // cwnd==1 => next send in 1 rtt
- Swap(new SlowStartController(this));
- }
-}
-
-void KeepAliveController::OnDataRecvd(bin64_t b) {
- if (b!=bin64_t::NONE && b!=bin64_t::ALL) { // channel is alive
- delay_ = ch_->rtt_avg_;
- Schedule(NOW); // schedule an ACK; TODO: aggregate
- }
-}
-
-void KeepAliveController::OnAckRcvd(bin64_t ackd) {
- // probably to something sent by CwndControllers before this one got installed
-}
-
-
-CwndController::CwndController(SendController* orig, int cwnd) :
-SendController(orig), cwnd_(cwnd), last_change_(0) {
-}
-
-bool CwndController::MaySendData() {
- tint spacing = ch_->rtt_avg_ / cwnd_;
- dprintf("%s #%i sendctrl may send %i < %f & %s (rtt %lli)\n",tintstr(),
- ch_->id,(int)ch_->data_out_.size(),cwnd_,
- tintstr(ch_->last_send_data_time_+spacing), ch_->rtt_avg_);
- return ch_->data_out_.empty() ||
- (ch_->data_out_.size() < cwnd_ && NOW-ch_->last_send_data_time_ >= spacing);
-}
-
-
-void CwndController::OnDataSent(bin64_t b) {
- if ( (b==bin64_t::ALL || b==bin64_t::NONE) && MaySendData() ) {// no more data (no hints?)
- Schedule(NOW+ch_->rtt_avg_); // soft pause; nothing to send yet
- if (ch_->last_send_data_time_ < NOW-ch_->rtt_avg_)
- Swap(new KeepAliveController(this)); // really, nothing to send
- } else { // FIXME: mandatory rescheduling after send/recv; based on state
- tint spacing = ch_->rtt_avg_ / cwnd_;
- if (ch_->data_out_.size() < cwnd_) { // have cwnd; not the right time yet
- Schedule(ch_->last_send_data_time_+spacing);
- } else { // no free cwnd
- tint timeout = std::max( ch_->rtt_avg_+ch_->dev_avg_*4, 500*TINT_MSEC );
- assert(!ch_->data_out_.empty());
- Schedule(ch_->data_out_.front().time+timeout); // wait for ACK or timeout
- }
- }
-}
-
-
-void CwndController::OnDataRecvd(bin64_t b) {
- if (b!=bin64_t::NONE && b!=bin64_t::ALL) {
- Schedule(NOW); // send ACK; todo: aggregate ACKs
- }
-}
-
-void CwndController::OnAckRcvd(bin64_t ackd) {
- if (ackd==bin64_t::NONE) {
- dprintf("%s #%i sendctrl loss detected\n",tintstr(),ch_->id);
- if (NOW>last_change_+ch_->rtt_avg_) {
- cwnd_ /= 2;
- last_change_ = NOW;
- }
- } else {
- if (cwnd_<1)
- cwnd_ *= 2;
- else
- cwnd_ += 1.0/cwnd_;
- dprintf("%s #%i sendctrl cwnd to %f\n",tintstr(),ch_->id,cwnd_);
- }
- tint spacing = ch_->rtt_avg_ / cwnd_;
- Schedule(ch_->last_send_time_+spacing);
-}
-
-
-void SlowStartController::OnAckRcvd (bin64_t pos) {
- if (pos!=bin64_t::NONE) {
- cwnd_ += 1;
- if (TINT_SEC*cwnd_/ch_->rtt_avg_>=10) {
- Schedule(NOW);
- Swap(new AIMDController(this,cwnd_));
- }
- } else
- cwnd_ /= 2;
-}
-
+++ /dev/null
-/*
- * send_control.h
- * p2tp
- *
- * Created by Victor Grishchenko on 11/4/09.
- * Copyright 2009 Delft University of Technology. All rights reserved.
- *
- */
-// included into p2tp.h
-#ifndef P2TP_SEND_CONTROL
-#define P2TP_SEND_CONTROL
-
-class Channel;
-
-struct SendController {
-
- Channel* ch_;
-
- SendController (Channel* ch) : ch_(ch) {}
-
- SendController(SendController* orig) : ch_(orig->ch_) { }
-
- void Swap (SendController* replacement);
- void Schedule (tint time);
-
- virtual const char* type() const = 0;
-
- virtual bool MaySendData() = 0;
-
- /** A datagram was sent to the peer.
- * @param data the bin number for the data sent; bin64_t::NONE if only
- metadata was sent; bin64_t::ALL if datagram was empty */
- virtual void OnDataSent(bin64_t data) = 0;
-
- /** A datagram was received from the peer.
- @param data follows the same conventions as data in OnDataSent() */
- virtual void OnDataRecvd(bin64_t data) = 0;
-
- /** An acknowledgement on OUR data message was receiveed from the peer.
- @param ackd bin number for the data sent; bin64_t::NONE if no
- acknowledgement was received (timeout event) */
- virtual void OnAckRcvd(bin64_t ackd) = 0;
-
- virtual ~SendController() {}
-};
-
-struct PingPongController : public SendController {
-
- int sent_, unanswered_;
-
- PingPongController (SendController* orig) :
- SendController(orig), sent_(0), unanswered_(0) {}
- PingPongController (Channel* ch) :
- unanswered_(0), sent_(0), SendController(ch) {}
- const char* type() const { return "PingPong"; }
- bool MaySendData();
- void OnDataSent(bin64_t b);
- void OnDataRecvd(bin64_t b);
- void OnAckRcvd(bin64_t ackd) ;
- ~PingPongController() {}
-
-};
-/** Mission of the keepalive controller to keep the channel
- alive as no data sending happens; If no data is transmitted
- in either direction, inter-packet times grow exponentially
- till 58 sec, which refresh period is deemed necessary to keep
- NAT mappings alive. */
-struct KeepAliveController : public SendController {
-
- tint delay_;
-
- KeepAliveController (Channel* ch);
- KeepAliveController(SendController* prev, tint delay=0) ;
- const char* type() const { return "KeepAlive"; }
- bool MaySendData();
- void OnDataSent(bin64_t b) ;
- void OnDataRecvd(bin64_t b) ;
- void OnAckRcvd(bin64_t ackd) ;
-
-};
-
-
-/** Base class for any congestion window based algorithm. */
-struct CwndController : public SendController {
-
- double cwnd_;
- tint last_change_;
-
- CwndController(SendController* orig, int cwnd=1) ;
-
- bool MaySendData() ;
- void OnDataSent(bin64_t b) ;
- void OnDataRecvd(bin64_t b) ;
- void OnAckRcvd(bin64_t ackd) ;
-
-};
-
-
-/** TCP-like exponential "slow" start algorithm. */
-struct SlowStartController : public CwndController {
-
- SlowStartController(SendController* orig, int cwnd=1) : CwndController(orig,cwnd) {}
- const char* type() const { return "SlowStart"; }
- void OnAckRcvd(bin64_t ackd) ;
-
-};
-
-
-/** The classics: additive increase - multiplicative decrease algorithm.
- A naive version of "standard" TCP congestion control. Potentially useful
- for seedboxes, so needs to be improved. (QUBIC?) */
-struct AIMDController : public CwndController {
-
- AIMDController(SendController* orig, int cwnd=1) : CwndController(orig,cwnd) {}
- const char* type() const { return "AIMD"; }
- //void OnAckRcvd(bin64_t ackd) ;
-
-};
-
-
-#endif
}
virtual bin64_t Pick (bins& offer, uint64_t max_width, tint expires) {
- while (hint_out_.size() && hint_out_.front().time<NOW-TINT_SEC*3/2) {
+ while (hint_out_.size() && hint_out_.front().time<NOW-TINT_SEC*3/2) { // FIXME sec
ack_hint_out_.copy_range(file().ack_out(), hint_out_.front().bin);
hint_out_.pop_front();
}
Channel::Channel (FileTransfer* transfer, int socket, Address peer_addr) :
transfer_(transfer), peer_(peer_addr), peer_channel_id_(0), pex_out_(0),
socket_(socket==-1?sockets[0]:socket), // FIXME
- data_out_cap_(bin64_t::ALL), last_send_data_time_(0), last_recv_data_time_(0),
+ data_out_cap_(bin64_t::ALL), last_data_out_time_(0), last_data_in_time_(0),
own_id_mentioned_(false), next_send_time_(0), last_send_time_(0),
last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC),
- data_in_dbl_(bin64_t::NONE), hint_out_size_(0)
+ data_in_dbl_(bin64_t::NONE), hint_out_size_(0),
+ cwnd_(1), send_interval_(TINT_SEC), send_control_(PING_PONG_CONTROL),
+ sent_since_recv_(0), ack_rcvd_recent_(0), ack_not_rcvd_recent_(0),
+ last_loss_time_(0), owd_min_bin_(0), owd_min_bin_start_(NOW), owd_cur_bin_(0)
{
if (peer_==Address())
peer_ = tracker;
this->id = channels.size();
channels.push_back(this);
- cc_ = new PingPongController(this);
- dprintf("%s #%i init %s\n",tintstr(),id,peer_.str());
- Schedule(NOW); // FIXME ugly
+ transfer_->hs_in_.push_back(id);
+ for(int i=0; i<4; i++) {
+ owd_min_bins_[i] = TINT_NEVER;
+ owd_current_[i] = TINT_NEVER;
+ }
+ Reschedule();
+ dprintf("%s #%u init %s\n",tintstr(),id,peer_.str());
}
Channel::~Channel () {
channels[id] = NULL;
- delete cc_;
}
Channel (FileTransfer* file, int socket=-1, Address peer=Address());
~Channel();
-
- static void RecvDatagram (int socket);
+
+ typedef enum {
+ KEEP_ALIVE_CONTROL,
+ PING_PONG_CONTROL,
+ SLOW_START_CONTROL,
+ AIMD_CONTROL,
+ LEDBAT_CONTROL
+ } send_control_t;
+
+ static Channel*
+ RecvDatagram (int socket);
static void Loop (tint till);
void Recv (Datagram& dgram);
void OnAck (Datagram& dgram);
void OnTs (Datagram& dgram);
- bin64_t OnData (Datagram& dgram);
+ bin64_t OnData (Datagram& dgram);
void OnHint (Datagram& dgram);
void OnHash (Datagram& dgram);
void OnPex (Datagram& dgram);
void OnHandshake (Datagram& dgram);
void AddHandshake (Datagram& dgram);
- bin64_t AddData (Datagram& dgram);
+ bin64_t AddData (Datagram& dgram);
void AddAck (Datagram& dgram);
void AddTs (Datagram& dgram);
void AddHint (Datagram& dgram);
void AddPeakHashes (Datagram& dgram);
void AddPex (Datagram& dgram);
+ void BackOffOnLosses ();
+ tint SwitchSendControl (int control_mode);
+ tint NextSendTime ();
+ tint KeepAliveNextSendTime ();
+ tint PingPongNextSendTime ();
+ tint CwndRateNextSendTime ();
+ tint SlowStartNextSendTime ();
+ tint AimdNextSendTime ();
+ tint LedbatNextSendTime ();
+
+ static int MAX_REORDERING;
+ static tint TIMEOUT;
+ static tint MIN_DEV;
+ static tint MAX_SEND_INTERVAL;
+ static tint LEDBAT_TARGET;
+ static float LEDBAT_GAIN;
+ static tint LEDBAT_DELAY_BIN;
+
const std::string id_string () const;
/** A channel is "established" if had already sent and received packets. */
bool is_established () { return peer_channel_id_ && own_id_mentioned_; }
FileTransfer& transfer() { return *transfer_; }
HashTree& file () { return transfer_->file(); }
const Address& peer() const { return peer_; }
-
+ tint ack_timeout () {
+ return rtt_avg_ + std::max(dev_avg_,MIN_DEV)*4;
+ }
+
static int DecodeID(int scrambled);
static int EncodeID(int unscrambled);
static Channel* channel(int i) {
/** Hints sent (to detect and reschedule ignored hints). */
tbqueue hint_out_;
uint64_t hint_out_size_;
- /** The congestion control strategy. */
- SendController *cc_;
/** Types of messages the peer accepts. */
uint64_t cap_in_;
/** For repeats. */
tint rtt_avg_, dev_avg_, dip_avg_;
tint last_send_time_;
tint last_recv_time_;
- tint last_send_data_time_;
- tint last_recv_data_time_;
+ tint last_data_out_time_;
+ tint last_data_in_time_;
+ tint last_loss_time_;
tint next_send_time_;
tint peer_send_time_;
- static tbheap send_queue;
+ /** Congestion window; TODO: int, bytes. */
+ float cwnd_;
+ /** Data sending interval. */
+ tint send_interval_;
+ /** The congestion control strategy. */
+ int send_control_;
+ /** Datagrams (not data) sent since last recv. */
+ int sent_since_recv_;
+ /** Recent acknowlegements for data previously sent. */
+ int ack_rcvd_recent_;
+ /** Recent non-acknowlegements (losses) of data previously sent. */
+ int ack_not_rcvd_recent_;
+ /** LEDBAT one-way delay machinery */
+ tint owd_min_bins_[4];
+ int owd_min_bin_;
+ tint owd_min_bin_start_;
+ tint owd_current_[4];
+ int owd_cur_bin_;
int PeerBPS() const {
return TINT_SEC / dip_avg_ * 1024;
}
/** Get a request for one packet from the queue of peer's requests. */
- bin64_t DequeueHint();
+ bin64_t DequeueHint();
void CleanDataOut (bin64_t acks_pos=bin64_t::NONE);
void CleanStaleHintOut();
void CleanHintOut(bin64_t pos);
- void Schedule(tint send_time);
+ void Reschedule();
static PeerSelector* peer_selector;
- static int MAX_REORDERING;
- static tint TIMEOUT;
static SOCKET sockets[8];
static int socket_count;
static tint last_tick;
+ static tbheap send_queue;
static Address tracker;
static std::vector<Channel*> channels;
--- /dev/null
+/*
+ * send_control.cpp
+ * p2tp
+ *
+ * Created by Victor Grishchenko on 12/10/09.
+ * Copyright 2009 Delft University of Technology. All rights reserved.
+ *
+ */
+
+#include "p2tp.h"
+
+using namespace p2tp;
+using namespace std;
+
+tint Channel::MIN_DEV = 50*TINT_MSEC;
+tint Channel::MAX_SEND_INTERVAL = TINT_SEC*58;
+tint Channel::LEDBAT_TARGET = TINT_MSEC*25;
+float Channel::LEDBAT_GAIN = 1.0/LEDBAT_TARGET;
+tint Channel::LEDBAT_DELAY_BIN = TINT_SEC*30;
+
+
+tint Channel::NextSendTime () {
+ switch (send_control_) {
+ case KEEP_ALIVE_CONTROL: return KeepAliveNextSendTime();
+ case PING_PONG_CONTROL: return PingPongNextSendTime();
+ case SLOW_START_CONTROL: return SlowStartNextSendTime();
+ case AIMD_CONTROL: return AimdNextSendTime();
+ case LEDBAT_CONTROL: return LedbatNextSendTime();
+ default: assert(false);
+ }
+}
+
+tint Channel::SwitchSendControl (int control_mode) {
+ dprintf("%s #%u sendctrl %i->%i\n",tintstr(),id,send_control_,control_mode);
+ switch (control_mode) {
+ case KEEP_ALIVE_CONTROL:
+ send_interval_ = max(TINT_SEC/10,rtt_avg_);
+ dev_avg_ = max(TINT_SEC,rtt_avg_);
+ cwnd_ = 1;
+ break;
+ case PING_PONG_CONTROL:
+ dev_avg_ = max(TINT_SEC,rtt_avg_);
+ cwnd_ = 1;
+ break;
+ case SLOW_START_CONTROL:
+ break;
+ case AIMD_CONTROL:
+ break;
+ case LEDBAT_CONTROL:
+ break;
+ default:
+ assert(false);
+ }
+ send_control_ = control_mode;
+ return NextSendTime();
+}
+
+// TODO: transitions, consistently
+// TODO: may send data
+tint Channel::KeepAliveNextSendTime () {
+ if (sent_since_recv_>=3 && last_recv_time_<NOW-TINT_MIN)
+ return TINT_NEVER;
+ if (ack_rcvd_recent_)
+ return SwitchSendControl(SLOW_START_CONTROL);
+ send_interval_ <<= 1;
+ if (send_interval_>MAX_SEND_INTERVAL)
+ send_interval_ = MAX_SEND_INTERVAL;
+ return last_send_time_ + send_interval_;
+}
+
+tint Channel::PingPongNextSendTime () {
+ if (last_recv_time_ < last_send_time_-TINT_SEC*3) {
+ // FIXME keepalive <-> pingpong (peers, transition)
+ } // last_data_out_time_ < last_send_time_ - TINT_SEC...
+ if (false)
+ return SwitchSendControl(KEEP_ALIVE_CONTROL);
+ if (ack_rcvd_recent_)
+ return SwitchSendControl(SLOW_START_CONTROL);
+ if (last_recv_time_>last_send_time_)
+ return NOW;
+ else if (last_send_time_)
+ return last_send_time_ + ack_timeout();
+ else
+ return NOW;
+}
+
+tint Channel::CwndRateNextSendTime () {
+ send_interval_ = rtt_avg_/cwnd_;
+ if (data_out_.size()<cwnd_) {
+ return last_data_out_time_ + send_interval_;
+ } else {
+ tint next_timeout = data_out_.front().time + ack_timeout();
+ return last_data_out_time_ + next_timeout;
+ }
+}
+
+void Channel::BackOffOnLosses () {
+ ack_rcvd_recent_ = 0;
+ ack_not_rcvd_recent_ = 0;
+ if (last_loss_time_<NOW-rtt_avg_) {
+ cwnd_ /= 2;
+ last_loss_time_ = NOW;
+ }
+}
+
+tint Channel::SlowStartNextSendTime () {
+ if (ack_not_rcvd_recent_) {
+ BackOffOnLosses();
+ return SwitchSendControl(AIMD_CONTROL);
+ }
+ cwnd_+=ack_rcvd_recent_;
+ ack_rcvd_recent_=0;
+ return CwndRateNextSendTime();
+}
+
+tint Channel::AimdNextSendTime () {
+ if (ack_not_rcvd_recent_)
+ BackOffOnLosses();
+ cwnd_ += ack_rcvd_recent_/cwnd_;
+ ack_rcvd_recent_=0;
+ return CwndRateNextSendTime();
+}
+
+tint Channel::LedbatNextSendTime () {
+ tint owd_cur(TINT_NEVER), owd_min(TINT_NEVER);
+ for(int i=0; i<4; i++) {
+ if (owd_min>owd_min_bins_[i])
+ owd_min = owd_min_bins_[i];
+ if (owd_cur>owd_current_[i])
+ owd_cur = owd_current_[i];
+ }
+ if (ack_not_rcvd_recent_)
+ BackOffOnLosses();
+ ack_rcvd_recent_ = 0;
+ tint queueing_delay = owd_cur - owd_min;
+ tint off_target = LEDBAT_TARGET - queueing_delay;
+ cwnd_ += LEDBAT_GAIN * off_target / cwnd_;
+ return CwndRateNextSendTime();
+}
+
+
+
using namespace p2tp;
-using namespace std; // FIXME remove
+using namespace std;
/*
TODO 25 Oct 18:55
dgram.Push32((uint32_t)peak);
dgram.PushHash(file().peak_hash(i));
//DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
- dprintf("%s #%i +phash %s\n",tintstr(),id,peak.str());
+ dprintf("%s #%u +phash %s\n",tintstr(),id,peak.str());
}
}
dgram.Push32((uint32_t)uncle);
dgram.PushHash( file().hash(uncle) );
//DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
- dprintf("%s #%i +hash %s\n",tintstr(),id,uncle.str());
+ dprintf("%s #%u +hash %s\n",tintstr(),id,uncle.str());
pos = pos.parent();
}
}
-bin64_t Channel::DequeueHint () { // TODO: resilience
+bin64_t Channel::DequeueHint () {
+ if (hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {
+ uint64_t twist = peer_channel_id_; // got no hints, send something randomly
+ twist &= file().peak(0); // may make it semi-seq here
+ file().ack_out().twist(twist);
+ ack_in_.twist(twist);
+ bin64_t my_pick =
+ file().ack_out().find_filtered(ack_in_,bin64_t::ALL,bins::FILLED);
+ while (my_pick.width()>max(1,(int)cwnd_))
+ my_pick = my_pick.left();
+ file().ack_out().twist(0);
+ ack_in_.twist(0);
+ if (my_pick!=bin64_t::NONE) {
+ my_pick = my_pick.twisted(twist);
+ hint_in_.push_back(my_pick);
+ dprintf("%s #%u *hint %s\n",tintstr(),id,my_pick.str());
+ }
+ }
bin64_t send = bin64_t::NONE;
while (!hint_in_.empty() && send==bin64_t::NONE) {
bin64_t hint = hint_in_.front().bin;
tint time = hint_in_.front().time;
hint_in_.pop_front();
- //if (time < NOW-TINT_SEC*3/2 ) //NOW-8*rtt_avg_)
- // continue;
- // Totally flawed:
- // a. May empty the queue when you least expect
- // b. May lose parts of partially ACKd HINTs
- send = file().ack_out().find_filtered(ack_in_,hint,bins::FILLED);
- send = send.left_foot(); // single packet
- if (send!=bin64_t::NONE)
- while (send!=hint) {
- hint = hint.towards(send);
- hint_in_.push_front(hint.sibling());
- }
- }
- if (send==bin64_t::NONE) {
- send = file().ack_out().find_filtered(ack_in_,bin64_t::ALL,bins::FILLED);
- if (send!=bin64_t::NONE) // NEED FIXME: twist here!!!
- send = send.left_foot();
+ while (!hint.is_base()) { // FIXME optimize; possible attack
+ hint_in_.push_front(tintbin(time,hint.right()));
+ hint = hint.left();
+ }
+ //if (time < NOW-TINT_SEC*3/2 )
+ // continue; bad idea
+ if (ack_in_.get(hint)!=bins::FILLED)
+ send = hint;
}
uint64_t mass = 0;
for(int i=0; i<hint_in_.size(); i++)
mass += hint_in_[i].bin.width();
- dprintf("%s #%i dequeued %s [%lli]\n",tintstr(),id,send.str(),mass);
+ dprintf("%s #%u dequeued %s [%lli]\n",tintstr(),id,send.str(),mass);
return send;
}
dgram.Push8(P2TP_HASH);
dgram.Push32(bin64_t::ALL32);
dgram.PushHash(file().root_hash());
- dprintf("%s #%i +hash ALL %s\n",
+ dprintf("%s #%u +hash ALL %s\n",
tintstr(),id,file().root_hash().hex().c_str());
}
dgram.Push8(P2TP_HANDSHAKE);
int encoded = EncodeID(id);
dgram.Push32(encoded);
- dprintf("%s #%i +hs %i\n",tintstr(),id,encoded);
+ dprintf("%s #%u +hs %i\n",tintstr(),id,encoded);
ack_out_.clear();
AddAck(dgram);
}
AddHandshake(dgram);
AddAck(dgram);
}
- dprintf("%s #%i sent %ib %s\n",tintstr(),id,dgram.size(),peer().str());
- if (dgram.size()==4) // only the channel id; bare keep-alive
+ dprintf("%s #%u sent %ib %s:%u\n",
+ tintstr(),id,dgram.size(),peer().str(),peer_channel_id_);
+ if (dgram.size()==4) {// only the channel id; bare keep-alive
data = bin64_t::ALL;
- cc_->OnDataSent(data);
+ if (send_control_!=KEEP_ALIVE_CONTROL) // we did our best
+ SwitchSendControl(KEEP_ALIVE_CONTROL);
+ if (NOW<last_send_time_+MAX_SEND_INTERVAL) // no need for keepalive
+ return; // don't send empty dgram
+ }
if (dgram.Send()==-1)
print_error("can't send datagram");
+ last_send_time_ = NOW;
+ sent_since_recv_++;
}
void Channel::AddHint (Datagram& dgram) {
- tint timed_out = NOW - TINT_SEC*3/2;
+ tint plan_for = max(TINT_SEC,rtt_avg_*4);
+
+ tint timed_out = NOW - plan_for*2;
while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
hint_out_size_ -= hint_out_.front().bin.width();
hint_out_.pop_front();
}
- int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
+ /*int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
if (!peer_cwnd)
- peer_cwnd = 1;
- int peer_pps = TINT_SEC / dip_avg_; // data packets per sec
- if (!peer_pps)
- peer_pps = 1;
+ peer_cwnd = 1;*/
+ int plan_pck = std::max ( 1LL, plan_for / dip_avg_ );
- if ( hint_out_size_ < peer_pps ) { //4*peer_cwnd ) {
+ if ( hint_out_size_ < plan_pck ) {
- int diff = peer_pps - hint_out_size_;
- //if (diff>4 && diff>2*peer_cwnd)
- // diff >>= 1;
- bin64_t hint = transfer().picker().Pick(ack_in_,diff,rtt_avg_*8+TINT_MSEC*100);
+ int diff = plan_pck - hint_out_size_; // TODO: aggregate
+ bin64_t hint = transfer().picker().Pick(ack_in_,diff,NOW+plan_for*2);
if (hint!=bin64_t::NONE) {
dgram.Push8(P2TP_HINT);
dgram.Push32(hint);
- dprintf("%s #%i +hint %s [%lli]\n",tintstr(),id,hint.str(),hint_out_size_);
+ dprintf("%s #%u +hint %s [%lli]\n",tintstr(),id,hint.str(),hint_out_size_);
hint_out_.push_back(hint);
hint_out_size_ += hint.width();
} else
- dprintf("%s #%i .hint\n",tintstr(),id);
+ dprintf("%s #%u Xhint\n",tintstr(),id);
}
}
return bin64_t::NONE;
bin64_t tosend = bin64_t::NONE;
- if (cc_->MaySendData()) {
+ if (data_out_.size()<cwnd_ && last_data_out_time_<=NOW-send_interval_) {
tosend = DequeueHint();
if (tosend==bin64_t::NONE)
- dprintf("%s #%i out of hints #sendctrl\n",tintstr(),id);
+ dprintf("%s #%u out of hints #sendctrl\n",tintstr(),id);
} else
- dprintf("%s #%i no cwnd #sendctrl\n",tintstr(),id);
+ dprintf("%s #%u no cwnd #sendctrl\n",tintstr(),id);
- if (tosend==bin64_t::NONE && (last_send_data_time_>NOW-TINT_SEC || data_out_.empty()))
- return bin64_t::NONE; // once in a while, empty data is sent just to check rtt
+ if (tosend==bin64_t::NONE && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
+ return bin64_t::NONE; // once in a while, empty data is sent just to check rtt FIXME
if (tosend!=bin64_t::NONE) { // hashes
if (ack_in_.is_empty() && file().size())
dgram.Push(buf,r);
}
- last_send_data_time_ = NOW;
+ last_data_out_time_ = NOW;
data_out_.push_back(tosend);
- dprintf("%s #%i +data %s\n",tintstr(),id,tosend.str());
+ dprintf("%s #%u +data %s\n",tintstr(),id,tosend.str());
return tosend;
}
void Channel::AddTs (Datagram& dgram) {
dgram.Push8(P2TP_TS);
dgram.Push64(data_in_.time);
- dprintf("%s #%i +ts %s\n",tintstr(),id,tintstr(data_in_.time));
+ dprintf("%s #%u +ts %s\n",tintstr(),id,tintstr(data_in_.time));
}
dgram.Push32(pos);
//dgram.Push64(data_in_.time);
ack_out_.set(pos);
- dprintf("%s #%i +ack %s %s\n",tintstr(),id,pos.str(),tintstr(data_in_.time));
+ dprintf("%s #%u +ack %s %s\n",tintstr(),id,pos.str(),tintstr(data_in_.time));
data_in_ = tintbin(0,bin64_t::NONE);
if (pos.layer()>2)
data_in_dbl_ = pos;
ack_out_.set(ack);
dgram.Push8(P2TP_ACK);
dgram.Push32(ack);
- dprintf("%s #%i +ack %s\n",tintstr(),id,ack.str());
+ dprintf("%s #%u +ack %s\n",tintstr(),id,ack.str());
}
}
void Channel::Recv (Datagram& dgram) {
- dprintf("%s #%i recvd %i\n",tintstr(),id,dgram.size()+4);
+ dprintf("%s #%u recvd %i\n",tintstr(),id,dgram.size()+4);
+ peer_send_time_ = 0; // has scope of 1 datagram
if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
rtt_avg_ = NOW - last_send_time_;
dev_avg_ = rtt_avg_;
dip_avg_ = rtt_avg_;
- transfer().hs_in_.push_back(id);
- dprintf("%s #%i rtt init %lli\n",tintstr(),id,rtt_avg_);
+ dprintf("%s #%u rtt init %lli\n",tintstr(),id,rtt_avg_);
}
bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
while (dgram.size()) {
uint8_t type = dgram.Pull8();
switch (type) {
case P2TP_HANDSHAKE: OnHandshake(dgram); break;
- case P2TP_DATA: data=OnData(dgram); break;
- case P2TP_TS: OnTs(dgram); break;
- case P2TP_ACK: OnAck(dgram); break;
- case P2TP_HASH: OnHash(dgram); break;
- case P2TP_HINT: OnHint(dgram); break;
- case P2TP_PEX_ADD: OnPex(dgram); break;
+ case P2TP_DATA: data=OnData(dgram); break;
+ case P2TP_TS: OnTs(dgram); break;
+ case P2TP_ACK: OnAck(dgram); break;
+ case P2TP_HASH: OnHash(dgram); break;
+ case P2TP_HINT: OnHint(dgram); break;
+ case P2TP_PEX_ADD: OnPex(dgram); break;
default:
- eprintf("%s #%i ?msg id unknown %i\n",tintstr(),id,(int)type);
+ eprintf("%s #%u ?msg id unknown %i\n",tintstr(),id,(int)type);
return;
}
}
- cc_->OnDataRecvd(data);
last_recv_time_ = NOW;
+ sent_since_recv_ = 0;
}
Sha1Hash hash = dgram.PullHash();
file().OfferHash(pos,hash);
//DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
- dprintf("%s #%i -hash %s\n",tintstr(),id,pos.str());
+ dprintf("%s #%u -hash %s\n",tintstr(),id,pos.str());
}
uint8_t *data;
int length = dgram.Pull(&data,1024);
bool ok = (pos==bin64_t::NONE) || file().OfferData(pos, (char*)data, length) ;
- dprintf("%s #%i %cdata %s\n",tintstr(),id,ok?'-':'!',pos.str());
+ dprintf("%s #%u %cdata %s\n",tintstr(),id,ok?'-':'!',pos.str());
if (!ok)
return bin64_t::NONE;
data_in_ = tintbin(NOW,pos);
if (pos!=bin64_t::NONE) {
- if (last_recv_data_time_) {
- tint dip = NOW - last_recv_data_time_;
+ if (last_data_in_time_) {
+ tint dip = NOW - last_data_in_time_;
dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
}
- last_recv_data_time_ = NOW;
+ last_data_in_time_ = NOW;
}
CleanHintOut(pos);
return pos;
}
-void Channel::CleanDataOut (bin64_t ackd_pos) {
+void Channel::CleanDataOut (bin64_t ackd_pos) { // TODO: isn't it too long?
int max_ack_off = 0;
+ //FIXME do LEDBAT magic somewhere here
if (ackd_pos!=bin64_t::NONE) {
for (int i=0; i<8 && i<data_out_.size(); i++) {
tint rtt = NOW-data_out_[i].time;
rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
- dprintf("%s #%i rtt %lli dev %lli\n",tintstr(),id,rtt_avg_,dev_avg_);
+ dprintf("%s #%u rtt %lli dev %lli\n",tintstr(),id,rtt_avg_,dev_avg_);
bin64_t pos = data_out_[i].bin;
- cc_->OnAckRcvd(pos);
+ ack_rcvd_recent_++;
data_out_[i]=tintbin();
max_ack_off = i;
if (ackd_pos==pos)
max_ack_off--;
}
while (max_ack_off>MAX_REORDERING) {
- cc_->OnAckRcvd(bin64_t::NONE);
- dprintf("%s #%i Rdata %s\n",tintstr(),id,data_out_.front().bin.str());
+ ack_not_rcvd_recent_++;
+ dprintf("%s #%u Rdata %s\n",tintstr(),id,data_out_.front().bin.str());
data_out_.pop_front();
max_ack_off--;
data_out_cap_ = bin64_t::ALL;
tint timeout = NOW - rtt_avg_ - 4*std::max(dev_avg_,TINT_MSEC*50);
while (!data_out_.empty() && data_out_.front().time<timeout) {
if (data_out_.front().bin!=bin64_t::NONE && ack_in_.is_empty(data_out_.front().bin)) {
- cc_->OnAckRcvd(bin64_t::NONE);
+ ack_not_rcvd_recent_++;
data_out_cap_ = bin64_t::ALL;
- dprintf("%s #%i Tdata %s\n",tintstr(),id,data_out_.front().bin.str());
+ dprintf("%s #%u Tdata %s\n",tintstr(),id,data_out_.front().bin.str());
}
data_out_.pop_front();
}
eprintf("invalid ack: %s\n",ackd_pos.str());
return;
}
- dprintf("%s #%i -ack %s\n",tintstr(),id,ackd_pos.str());
+ dprintf("%s #%u -ack %s\n",tintstr(),id,ackd_pos.str());
ack_in_.set(ackd_pos);
CleanDataOut(ackd_pos);
}
void Channel::OnTs (Datagram& dgram) {
peer_send_time_ = dgram.Pull64();
- dprintf("%s #%i -ts %lli\n",tintstr(),id,peer_send_time_);
+ dprintf("%s #%u -ts %lli\n",tintstr(),id,peer_send_time_);
}
hint_in_.push_back(hint);
//ack_in_.set(hint,bins::EMPTY);
//RequeueSend(cc_->OnHintRecvd(hint));
- dprintf("%s #%i -hint %s\n",tintstr(),id,hint.str());
+ dprintf("%s #%u -hint %s\n",tintstr(),id,hint.str());
}
void Channel::OnHandshake (Datagram& dgram) {
peer_channel_id_ = dgram.Pull32();
- dprintf("%s #%i -hs %i\n",tintstr(),id,peer_channel_id_);
+ dprintf("%s #%u -hs %i\n",tintstr(),id,peer_channel_id_);
// FUTURE: channel forking
}
uint32_t ipv4 = dgram.Pull32();
uint16_t port = dgram.Pull16();
Address addr(ipv4,port);
- dprintf("%s #%i -pex %s\n",tintstr(),id,addr.str());
+ dprintf("%s #%u -pex %s\n",tintstr(),id,addr.str());
transfer().OnPexIn(addr);
}
dgram.Push8(P2TP_PEX_ADD);
dgram.Push32(a.ipv4());
dgram.Push16(a.port());
- dprintf("%s #%i +pex %s\n",tintstr(),id,a.str());
+ dprintf("%s #%u +pex %s\n",tintstr(),id,a.str());
}
-void Channel::RecvDatagram (int socket) {
+Channel* Channel::RecvDatagram (int socket) {
Datagram data(socket);
data.Recv();
Address& addr = data.addr;
-#define return_log(...) { eprintf(__VA_ARGS__); return; }
+#define return_log(...) { eprintf(__VA_ARGS__); return NULL; }
if (data.size()<4)
return_log("datagram shorter than 4 bytes %s\n",addr.str());
uint32_t mych = data.Pull32();
dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
if (channels[*i] && channels[*i]->peer_==data.addr &&
- channels[*i]->last_recv_time_>NOW-TINT_SEC*2)
+ channels[*i]->last_recv_time_>NOW-TINT_SEC*2)
return_log("have a channel already to %s\n",addr.str());
channel = new Channel(file, socket, data.address());
} else {
mych = DecodeID(mych);
if (mych>=channels.size())
- return_log("invalid channel #%i, %s\n",mych,addr.str());
+ return_log("invalid channel #%u, %s\n",mych,addr.str());
channel = channels[mych];
if (!channel)
- return_log ("channel #%i is already closed\n",mych,addr.str());
+ return_log ("channel #%u is already closed\n",mych,addr.str());
if (channel->peer() != addr)
- return_log ("invalid peer address #%i %s!=%s\n",mych,channel->peer().str(),addr.str());
+ return_log ("invalid peer address #%u %s!=%s\n",mych,channel->peer().str(),addr.str());
channel->own_id_mentioned_ = true;
}
//dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
channel->Recv(data);
+ return channel;
}
tint send_time(TINT_NEVER);
Channel* sender(NULL);
- while (!send_queue.is_empty()) {
+ while (!sender && !send_queue.is_empty()) { // dequeue
send_time = send_queue.peek().time;
sender = channel((int)send_queue.peek().bin);
- if (sender)
- if ( sender->next_send_time_==send_time ||
- sender->next_send_time_==TINT_NEVER )
- break;
- sender = NULL; // it was a stale entry
send_queue.pop();
+ if (sender && sender->next_send_time_!=send_time &&
+ sender->next_send_time_!=TINT_NEVER )
+ sender = NULL; // it was a stale entry
}
- if (send_time>limit)
- send_time = limit;
- if ( sender && sender->next_send_time_ <= NOW ) {
- dprintf("%s #%i sch_send %s\n",tintstr(),sender->id,
- tintstr(send_time));
- sender->Send();
- sender->last_send_time_ = NOW;
- // sender->RequeueSend(sender->cc_->NextSendTime()); goes to SendCtrl
- send_queue.pop();
- } else if ( send_time > NOW ) {
- tint towait = send_time - NOW;
+
+ if ( sender && send_time <= NOW ) { // it's time
+
+ if (sender->next_send_time_<NOW+TINT_MIN) { // either send
+ dprintf("%s #%u sch_send %s\n",tintstr(),sender->id,
+ tintstr(send_time));
+ sender->Send();
+ sender->Reschedule();
+ } else { // or close the channel
+ dprintf("%s #%u closed sendctrl\n",tintstr(),sender->id);
+ delete sender;
+ }
+
+ } else { // it's too early, wait
+
+ tint towait = min(limit,send_time) - NOW;
dprintf("%s waiting %lliusec\n",tintstr(),towait);
int rd = Datagram::Wait(socket_count,sockets,towait);
- if (rd!=INVALID_SOCKET)
- RecvDatagram(rd);
- } else { // FIXME FIXME FIXME REWRITE!!! if (sender->next_send_time_==TINT_NEVER) {
- if (sender) {
- dprintf("%s #%i closed sendctrl\n",tintstr(),sender->id);
- delete sender;
+ if (rd!=INVALID_SOCKET) { // in meantime, received something
+ Channel* receiver = RecvDatagram(rd);
+ if (receiver) // receiver's state may have changed
+ receiver->Reschedule();
}
- send_queue.pop();
+ if (sender) // get back to that later
+ send_queue.push(tintbin(send_time,sender->id));
+
}
} while (Datagram::Time()<limit);
-
+
}
-void Channel::Schedule (tint next_time) {
- next_send_time_ = next_time;
- if (next_time==TINT_NEVER)
- next_time = NOW + TINT_MIN; // 1min timeout
- send_queue.push(tintbin(next_time,id));
- dprintf("%s requeue #%i for %s\n",tintstr(),id,tintstr(next_time));
+void Channel::Reschedule () {
+ next_send_time_ = NextSendTime();
+ if (next_send_time_!=TINT_NEVER)
+ send_queue.push(tintbin(next_send_time_,id));
+ else
+ send_queue.push(tintbin(NOW+TINT_MIN,id));
+ dprintf("%s requeue #%u for %s\n",tintstr(),id,tintstr(next_send_time_));
}
EXPECT_FALSE(bin64_t(0,1).within(bin64_t::NONE));
EXPECT_TRUE(bin64_t(0,1).within(bin64_t::ALL));
+ EXPECT_EQ(0,bin64_t::none().width());
+ EXPECT_EQ(bin64_t::none(),bin64_t::none().twisted(123));
/*EXPECT_EQ(bin64_t::NONE.parent(),bin64_t::NONE);
EXPECT_EQ(bin64_t::NONE.left(),bin64_t::NONE);
EXPECT_EQ(bin64_t::NONE.right(),bin64_t::NONE);
}
-int FileTransfer::RevealChannel (int& pex_out_) {
+int FileTransfer::RevealChannel (int& pex_out_) { // FIXME brainfuck
pex_out_ -= hs_in_offset_;
if (pex_out_<0)
pex_out_ = 0;
while (pex_out_<hs_in_.size()) {
Channel* c = Channel::channels[hs_in_[pex_out_]];
if (c && c->transfer().fd()==this->fd()) {
- pex_out_ += hs_in_offset_ + 1;
- return c->id;
+ if (c->own_id_mentioned_) {
+ pex_out_ += hs_in_offset_ + 1;
+ return c->id;
+ } else
+ pex_out_++;
} else {
hs_in_[pex_out_] = hs_in_[0];
hs_in_.pop_front();