target = 'p2tp'
source = [ 'bin64.cpp','hashtree.cpp','datagram.cpp','bins.cpp',
- 'transfer.cpp', 'p2tp.cpp', 'sendrecv.cpp',
+ 'transfer.cpp', 'p2tp.cpp', 'sendrecv.cpp', 'ext/send_control.cpp',
'compat/hirestimeofday.cpp', 'compat/util.cpp']
env = Environment()
Datagram::bytes_up=0, Datagram::bytes_down=0;
char* Datagram::TimeStr (tint time) {
- static char ret_str[128];
+ static char ret_str[32][4]; // wow
+ static int i;
+ i = (i+1) & 3;
if (time==0)
time = now;
time -= epoch;
int msecs = time/TINT_MSEC;
time %= TINT_MSEC;
int usecs = time/TINT_uSEC;
- sprintf(ret_str,"%i_%02i_%02i_%03i_%03i",hours,mins,secs,msecs,usecs);
- return ret_str;
+ sprintf(ret_str[i],"%i_%02i_%02i_%03i_%03i",hours,mins,secs,msecs,usecs);
+ return ret_str[i];
}
int Datagram::Send () {
* p2tp
*
* Created by Victor Grishchenko on 11/3/09.
- * Copyright 2009 Delft Technical University. All rights reserved.
+ * Copyright 2009 Delft University of Technology. All rights reserved.
*
*/
#include "p2tp.h"
+++ /dev/null
-/*
- * dummy_controller.cpp
- * p2tp
- *
- * Created by Victor Grishchenko on 10/16/09.
- * Copyright 2009 Delft Technical University. All rights reserved.
- *
- */
-#include "p2tp.h"
-
-using namespace p2tp;
-
-
-/** Congestion window evolution: always 1. */
-struct BasicController : public CongestionController {
-
- tbqueue data_out_;
- tint dev_avg, rtt_avg, diat_avg;
- tint last_send_time, last_recv_time, last_cwnd_mark;
- int cwnd, peer_cwnd, cwnd_rcvd;
- bin64_t last_recv_bin;
-
- BasicController (int chann_id) : CongestionController(chann_id),
- dev_avg(0), rtt_avg(TINT_SEC),
- last_send_time(0), last_recv_time(0), last_cwnd_mark(0),
- cwnd(1), peer_cwnd(1), cwnd_rcvd(0), diat_avg(TINT_SEC),
- last_recv_bin(bin64_t::NONE)
- { }
-
- tint RoundTripTime() {
- return rtt_avg;
- }
-
- tint RoundTripTimeoutTime() {
- return rtt_avg + dev_avg * 8 + TINT_MSEC;
- }
-
- int in_flight() const {return data_out_.size();}
-
- int PeerBPS() {
- //return (peer_cwnd<<10) * TINT_SEC / rtt_avg;
- return TINT_SEC * 1024 / diat_avg;
- }
-
- float PeerCWindow() {
- return peer_cwnd;
- }
-
- /** It is provided with an argument when sending time is not clear from the context. */
- tint get_send_time () {
- tint time = TINT_NEVER;
- if (cwnd) {
- // cwnd allows => schedule transmit
- // otherwise => schedule timeout; may schedule transmit later
- if (free_cwnd())
- time = last_send_time + RoundTripTime()/cwnd; // next send
- else
- time = last_send_time + RoundTripTimeoutTime(); // timeout
- } else {
- time = last_send_time + TINT_SEC*58;
- }
- return time;
- }
-
- int free_cwnd () {
- tint timeout = Datagram::now - RoundTripTimeoutTime();
- if (!data_out_.empty() && data_out_.front().time<=timeout) {
- data_out_.clear();
- cwnd >>= 1;
- if (!cwnd)
- cwnd = 1;
- dprintf("%s #%i loss cwnd:=%i\n",Datagram::TimeStr(),channel_id,cwnd);
- }
- return cwnd - data_out_.size();
- }
-
- tint OnDataSent(bin64_t b) {
- last_send_time = Datagram::now;
- if (b==bin64_t::ALL) { // nothing to send, absolutely
- data_out_.clear();
- cwnd >>= 1;
- if (!cwnd)
- cwnd = 1;
- } else if (b==bin64_t::NONE) { // sent some metadata
- cwnd = 1; // no more data => no need for cwnd
- data_out_.clear();
- data_out_.push_back(b);
- } else {
- data_out_.push_back(b);
- }
- dprintf("%s #%i cwnd %i infl %i peer_cwnd %i //%lli\n",
- Datagram::TimeStr(),channel_id,cwnd,in_flight(),peer_cwnd,
- (uint64_t)b);
- return get_send_time();
- }
-
-
- tint OnDataRecvd(bin64_t b) {
- if (last_recv_bin!=bin64_t::ALL && last_recv_bin!=bin64_t::NONE) {
- tint diat = Datagram::now - last_recv_time;
- diat_avg = ( diat_avg*3 + diat ) >> 2;
- }
- last_recv_bin = b;
- last_recv_time = Datagram::now;
- if (rtt_avg==TINT_SEC && last_send_time) {
- rtt_avg = Datagram::now - last_send_time;
- dev_avg = rtt_avg;
- }
- if (data_out_.size() && data_out_.front().bin==bin64_t::NONE)
- data_out_.pop_front();
- if (b==bin64_t::NONE) { // pong
- peer_cwnd = 1;
- if (!cwnd)
- cwnd = 1;
- return Datagram::now;
- } else if (b==bin64_t::ALL) { // the peer has nothing to send
- //peer_cwnd = 0;
- peer_cwnd = 1;
- return get_send_time();
- } else {
- //if (!peer_cwnd)
- // peer_cwnd = 1;
- cwnd_rcvd++;
- if (last_cwnd_mark+rtt_avg<Datagram::now) {
- last_cwnd_mark = Datagram::now;
- peer_cwnd = cwnd_rcvd;
- cwnd_rcvd = 0;
- dprintf("%s #%i peer_cwnd %i\n",
- Datagram::TimeStr(),channel_id,peer_cwnd);
- }
- return Datagram::now; // at least, send an ACK
- }
- }
-
- tint OnAckRcvd(bin64_t ackd, tint when) { // FIXME::: NO ACKS => NO RTT
- tbqueue tmp;
- for (int i=0; data_out_.size() && i<6; i++) {
- tintbin x = data_out_.front();
- data_out_.pop_front();
- if (x.bin.within(ackd)) {
- // van Jacobson's rtt
- tint rtt = Datagram::now-x.time;
- rtt_avg = (rtt_avg*3 + rtt) >> 2;
- dev_avg = ( dev_avg*3 + abs(rtt-rtt_avg) ) >> 2;
- dprintf("%s #%i rtt %lli dev %lli\n",
- Datagram::TimeStr(),channel_id,rtt_avg,dev_avg);
- // insert AIMD (2) here
- cwnd++;
- break;
- } else {
- tmp.push_back(x);
- }
- }
- while (tmp.size()) {
- data_out_.push_front(tmp.back());
- tmp.pop_back();
- }
-
- return get_send_time();
- }
-
- ~BasicController() {
- }
-
-};
-
-
-/*
-
- /** A packet was sent; in case it had data, b is the bin. *
-void OnDataSent(bin64_t b) {
- if (b==bin64_t::NONE) {
- if (free_cwnd()>0) { // nothing to send; suspend
- cwnd = 0;
- in_flight = 0;
- set_send_time(last_send_time+KEEPALIVE);
- } else if (cwnd==0) { // suspended; keepalives only
- set_send_time(last_send_time+KEEPALIVE);
- } else { // probably, packet loss => stall
- tint timeout = last_send_time + rtt_avg + (dev_avg<<2);
- if (timeout<=Datagram::now) { // loss
- if (timeout+2*rtt_avg>Datagram::now) {
- in_flight = 0;
- set_send_time(Datagram::now);
- } else { // too bad
- set_send_time(TINT_NEVER);
- }
- } else
- set_send_time(timeout);
- }
- } else { // HANDSHAKE goes here with b==ALL
- in_flight++;
- set_send_time(Datagram::now + rtt_avg/cwnd);
- }
- last_bin_sent = b;
- last_send_time = Datagram::now;
-}
-
-void OnDataRecvd(bin64_t b) {
- last_recv_time = Datagram::now;
- set_send_time(Datagram::now); // to send a reply if needed
-}
-
-void OnAckRcvd(const tintbin& ack) {
- last_recv_time = Datagram::now;
- if (ack==bin64_t::NONE || last_bin_sent!=ack.bin)
- return;
- last_bin_sent = bin64_t::NONE;
- in_flight--;
- tint nst = last_send_time + ( cwnd ? rtt_avg/cwnd : KEEPALIVE );
- if (nst<Datagram::now)
- nst = Datagram::now; // in case we were waiting for free cwnd space
- set_send_time(nst);
-} // TODO: dont distinguish last send time and last data sent time
-// SOLUTION: once free_cwnd==0 => don't invoke OnDataSent
-// TODO: once it's time, but free_cwnd=0 => need to set timeout
-
- */
--- /dev/null
+/*
+ * send_control.cpp
+ * p2tp
+ *
+ * Created by Victor Grishchenko on 11/4/09.
+ * Copyright 2009 Delft University of Technology. All rights reserved.
+ *
+ */
+#include "p2tp.h"
+
+
+using namespace p2tp;
+
+
+void SendController::Swap (SendController* newctrl) {
+ dprintf("%s #%i sendctrl %s->%s\n",Datagram::TimeStr(),ch_->id,type(),newctrl->type());
+ assert(this==ch_->cc_);
+ ch_->cc_ = newctrl;
+ delete this;
+}
+
+
+bool PingPongController::MaySendData(){
+ return ch_->data_out_.empty();
+}
+
+tint PingPongController::NextSendTime () {
+ return ch_->last_send_time_ + ch_->rtt_avg_ + ch_->dev_avg_*4; // remind on timeout
+}
+
+void PingPongController::OnDataSent(bin64_t b) {
+ if ( (ch_->last_recv_time_ && ch_->last_recv_time_<Datagram::now-TINT_SEC*3) || //no reply
+ (b==bin64_t::ALL && MaySendData()) ) // nothing to send
+ Swap(new KeepAliveController(this));
+}
+
+void PingPongController::OnDataRecvd(bin64_t b) {
+}
+
+void PingPongController::OnAckRcvd(bin64_t ackd) {
+ if (ch_->data_out_.empty())
+ Swap(new SlowStartController(this));
+}
+
+
+
+bool KeepAliveController::MaySendData() {
+ return true;
+}
+
+tint KeepAliveController::NextSendTime () {
+ return ch_->last_send_time_ + TINT_SEC*58;
+}
+
+void KeepAliveController::OnDataSent(bin64_t b) {
+ if (b!=bin64_t::ALL)
+ Swap(new PingPongController(this));
+}
+
+void KeepAliveController::OnDataRecvd(bin64_t b) {
+}
+
+void KeepAliveController::OnAckRcvd(bin64_t ackd) {
+}
+
+
+
+bool CwndController::MaySendData() {
+ return ch_->data_out_.size() < cwnd_ && Datagram::now >= NextSendTime();
+}
+
+tint CwndController::NextSendTime () {
+ if (ch_->data_out_.size() < cwnd_)
+ return ch_->last_send_time_ + ch_->rtt_avg_ / cwnd_;
+ else
+ return ch_->last_send_time_ + ch_->rtt_avg_ + ch_->dev_avg_ * 4 ;
+}
+
+void CwndController::OnDataSent(bin64_t b) {
+ if (b==bin64_t::ALL || b==bin64_t::NONE) {
+ if (MaySendData())
+ Swap(new PingPongController(this));
+ }
+}
+
+void CwndController::OnDataRecvd(bin64_t b) {
+}
+
+void CwndController::OnAckRcvd(bin64_t ackd) {
+ if (ackd==bin64_t::NONE) {
+ cwnd_ /= 2;
+ } else {
+ if (cwnd_<1)
+ cwnd_ *= 2;
+ else
+ cwnd_ += 1/cwnd_;
+ }
+}
+
+
+void SlowStartController::OnAckRcvd (bin64_t pos) {
+ if (pos!=bin64_t::NONE)
+ cwnd_ += 1;
+ else
+ cwnd_ /= 2;
+}
+
\ No newline at end of file
--- /dev/null
+/*
+ * send_control.h
+ * p2tp
+ *
+ * Created by Victor Grishchenko on 11/4/09.
+ * Copyright 2009 Delft University of Technology. All rights reserved.
+ *
+ */
+// included into p2tp.h
+#ifndef P2TP_SEND_CONTROL
+#define P2TP_SEND_CONTROL
+
+class Channel;
+
+struct SendController {
+
+ Channel* ch_;
+
+ SendController (Channel* ch) : ch_(ch) {}
+
+ SendController(SendController* orig) : ch_(orig->ch_) { }
+
+ void Swap (SendController* replacement);
+
+ virtual const char* type() const = 0;
+
+ virtual bool MaySendData() = 0;
+ virtual tint NextSendTime () = 0;
+
+ /** A datagram was sent to the peer.
+ * @param data the bin number for the data sent; bin64_t::NONE if only
+ metadata was sent; bin64_t::ALL if datagram was empty */
+ virtual void OnDataSent(bin64_t data) = 0;
+
+ /** A datagram was received from the peer.
+ @param data follows the same conventions as data in OnDataSent() */
+ virtual void OnDataRecvd(bin64_t data) = 0;
+
+ /** An acknowledgement on OUR data message was receiveed from the peer.
+ @param ackd bin number for the data sent; bin64_t::NONE if no
+ acknowledgement was received (timeout event) */
+ virtual void OnAckRcvd(bin64_t ackd) = 0;
+
+ virtual ~SendController() {}
+};
+
+
+struct PingPongController : public SendController {
+
+ int fails_;
+
+ PingPongController (SendController* orig) : SendController(orig), fails_(0) {}
+ PingPongController (Channel* ch) : fails_(0), SendController(ch) {}
+ const char* type() const { return "PingPong"; }
+ bool MaySendData();
+ tint NextSendTime ();
+ void OnDataSent(bin64_t b);
+ void OnDataRecvd(bin64_t b);
+ void OnAckRcvd(bin64_t ackd) ;
+ ~PingPongController() {}
+
+};
+
+
+struct KeepAliveController : public SendController {
+
+ KeepAliveController(SendController* prev) : SendController(prev){}
+ const char* type() const { return "KeepAlive"; }
+ bool MaySendData();
+ tint NextSendTime () ;
+ void OnDataSent(bin64_t b) ;
+ void OnDataRecvd(bin64_t b) ;
+ void OnAckRcvd(bin64_t ackd) ;
+
+};
+
+
+struct CwndController : public SendController {
+
+ float cwnd_;
+
+ CwndController(SendController* orig, int cwnd=1) :
+ SendController(orig), cwnd_(cwnd) { }
+
+ bool MaySendData() ;
+ tint NextSendTime () ;
+ void OnDataSent(bin64_t b) ;
+ void OnDataRecvd(bin64_t b) ;
+ void OnAckRcvd(bin64_t ackd) ;
+
+};
+
+
+struct SlowStartController : public CwndController {
+
+ SlowStartController(SendController* orig, int cwnd=1) : CwndController(orig,cwnd) {}
+ const char* type() const { return "SlowStart"; }
+ void OnAckRcvd(bin64_t ackd) ;
+
+};
+
+#endif
\ No newline at end of file
int Channel::socket_count = 0;
Address Channel::tracker;
tbqueue Channel::send_queue;
-#include "ext/dummy_controller.cpp"
#include "ext/simple_selector.cpp"
PeerSelector* Channel::peer_selector = new SimpleSelector();
Channel::Channel (FileTransfer* file, int socket, Address peer_addr) :
file_(file), peer_(peer_addr), peer_channel_id_(0), pex_out_(0),
socket_(socket==-1?sockets[0]:socket), // FIXME
- own_id_mentioned_(false), next_send_time_(0)
+ own_id_mentioned_(false), next_send_time_(0), last_send_time_(0),
+ last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC)
{
if (peer_==Address())
peer_ = tracker;
this->id = channels.size();
channels.push_back(this);
- cc_ = new BasicController(id);
+ cc_ = new PingPongController(this);
RequeueSend(Datagram::now);
}
P2TP_HANDSHAKE = 0,
P2TP_DATA = 1,
P2TP_ACK = 2,
- P2TP_ACK_TS = 8,
+ P2TP_TS = 8,
P2TP_HINT = 3,
P2TP_HASH = 4,
P2TP_PEX_ADD = 5,
P2TP_PEX_RM = 6,
P2TP_MESSAGE_COUNT = 7
} messageid_t;
-
+
class PiecePicker;
class CongestionController;
class PeerSelector;
friend void Close (int fd) ;
};
- struct CongestionController {
- int channel_id;
- CongestionController (int chann_id) : channel_id(chann_id) {}
- virtual int free_cwnd() = 0;
- virtual tint RoundTripTime() = 0;
- virtual tint RoundTripTimeoutTime() = 0;
- virtual int PeerBPS() = 0;
- virtual float PeerCWindow() = 0;
- virtual tint OnDataSent(bin64_t b) = 0;
- virtual tint OnDataRecvd(bin64_t b) = 0;
- virtual tint OnAckRcvd(bin64_t ackd, tint peer_time=0) = 0;
- //virtual tint OnHintRecvd (bin64_t hint) = 0;
- virtual ~CongestionController() {}
- };
+
+#include "ext/send_control.h"
+
class PiecePicker {
public:
virtual void Received (bin64_t b) = 0;
};
+
class PeerSelector {
public:
virtual void AddPeer (const Datagram::Address& addr, const Sha1Hash& root) = 0;
virtual Datagram::Address GetPeer (const Sha1Hash& for_root) = 0;
};
+
class DataStorer {
public:
DataStorer (const Sha1Hash& id, size_t size);
lots of other TCP stuff, sizeof(Channel+members) must be below 1K.
(There was a seductive idea to remove channels, just put the root
hash or a fragment of it into every datagram.) */
- class Channel {
- public:
+ struct Channel { // normally, API users do not deal with the structure
+
Channel (FileTransfer* file, int socket=-1, Address peer=Address());
~Channel();
void Send ();
void OnAck (Datagram& dgram);
- void OnAckTs (Datagram& dgram);
+ void OnTs (Datagram& dgram);
bin64_t OnData (Datagram& dgram);
void OnHint (Datagram& dgram);
void OnHash (Datagram& dgram);
void AddHandshake (Datagram& dgram);
bin64_t AddData (Datagram& dgram);
void AddAck (Datagram& dgram);
+ void AddTs (Datagram& dgram);
void AddHint (Datagram& dgram);
void AddUncleHashes (Datagram& dgram, bin64_t pos);
void AddPeakHashes (Datagram& dgram);
bool is_established () { return peer_channel_id_ && own_id_mentioned_; }
FileTransfer& file() { return *file_; }
const Address& peer() const { return peer_; }
-
+
static int DecodeID(int scrambled);
static int EncodeID(int unscrambled);
static Channel* channel(int i) {
return i<channels.size()?channels[i]:NULL;
}
- private:
/** Channel id: index in the channel array. */
uint32_t id;
bins ack_in_;
/** Last data received; needs to be acked immediately. */
tintbin data_in_;
+ /** The history of data sent and still unacknowledged. */
+ tbqueue data_out_;
/** Index in the history array. */
bins ack_out_;
/** Transmit schedule: in most cases filled with the peer's hints */
/** Hints sent (to detect and reschedule ignored hints). */
tbqueue hint_out_;
/** The congestion control strategy. */
- CongestionController *cc_;
+ SendController *cc_;
/** Types of messages the peer accepts. */
uint64_t cap_in_;
/** For repeats. */
//tint last_send_time, last_recv_time;
/** PEX progress */
int pex_out_;
-
+ /** Smoothed averages for RTT, RTT deviation and data interarrival periods. */
+ tint rtt_avg_, dev_avg_, dip_avg_;
+ tint last_send_time_;
+ tint last_recv_time_;
tint next_send_time_;
+ tint peer_send_time_;
static tbqueue send_queue;
- void RequeueSend (tint next_time);
- /** Get a rewuest for one packet from the queue of peer's requests. */
+ void RequeueSend (tint next_time);
+ int PeerBPS() const {
+ return TINT_SEC / dip_avg_ * 1024;
+ }
+ /** Get a request for one packet from the queue of peer's requests. */
bin64_t DequeueHint();
+ void ClearStaleDataOut ();
//void CleanStaleHints();
static PeerSelector* peer_selector;
friend int Open (const char*, const Sha1Hash&) ; // FIXME
friend class FileTransfer; // FIXME!!!
+ friend class SendController; // FIXME!!!
};
#include <glog/logging.h>
#include "p2tp.h"
-#include "ext/dummy_controller.cpp"
-using namespace std;
using namespace p2tp;
+using namespace std; // FIXME remove
/*
TODO 25 Oct 18:55
}
+void Channel::ClearStaleDataOut() {
+ int oldsize = data_out_.size();
+ while ( data_out_.size() && data_out_.front().time <
+ Datagram::now - rtt_avg_ - dev_avg_*4 )
+ data_out_.pop_front();
+ if (data_out_.size()!=oldsize)
+ cc_->OnAckRcvd(bin64_t::NONE);
+}
+
+
void Channel::Send () {
Datagram dgram(socket_,peer());
dgram.Push32(peer_channel_id_);
if ( is_established() ) {
AddAck(dgram);
AddHint(dgram);
- if (cc_->free_cwnd())
+ ClearStaleDataOut();
+ if (cc_->MaySendData())
data = AddData(dgram);
else
dprintf("%s #%i no cwnd\n",Datagram::TimeStr(),id);
PCHECK( dgram.Send() != -1 )<<"error sending";
if (dgram.size()==4) // only the channel id; bare keep-alive
data = bin64_t::ALL;
- RequeueSend(cc_->OnDataSent(data));
+ cc_->OnDataSent(data);
+ last_send_time_ = Datagram::now;
+ RequeueSend(cc_->NextSendTime());
}
uint64_t hinted = 0;
for(tbqueue::iterator i=hint_out_.begin(); i!=hint_out_.end(); i++)
hinted += i->bin.width();
- int bps = cc_->PeerBPS();
+ int bps = PeerBPS();
dprintf("%s #%i hinted %lli peer_bps %i\n",Datagram::TimeStr(),id,hinted,bps);
//float peer_cwnd = cc_->PeerBPS() * cc_->RoundTripTime() / TINT_SEC;
dgram.Push32(tosend);
dgram.Push(buf,r);
dprintf("%s #%i +data (%lli)\n",Datagram::TimeStr(),id,tosend.base_offset());
+ data_out_.push_back(tosend);
return tosend;
}
+void Channel::AddTs (Datagram& dgram) {
+ dgram.Push8(P2TP_TS);
+ dgram.Push64(data_in_.time);
+ dprintf("%s #%i +ts %lli\n",Datagram::TimeStr(),id,data_in_.time);
+}
+
+
void Channel::AddAck (Datagram& dgram) {
if (data_in_.bin!=bin64_t::NONE) {
+ AddTs(dgram);
bin64_t pos = data_in_.bin;
- dgram.Push8(P2TP_ACK_TS);
+ dgram.Push8(P2TP_ACK);
dgram.Push32(pos);
- dgram.Push64(data_in_.time);
+ //dgram.Push64(data_in_.time);
ack_out_.set(pos);
- dprintf("%s #%i +ackts (%i,%lli) %s\n",Datagram::TimeStr(),id,
+ dprintf("%s #%i +ack (%i,%lli) %s\n",Datagram::TimeStr(),id,
pos.layer(),pos.offset(),Datagram::TimeStr(data_in_.time));
data_in_ = tintbin(0,bin64_t::NONE);
}
switch (type) {
case P2TP_HANDSHAKE: OnHandshake(dgram); break;
case P2TP_DATA: data=OnData(dgram); break;
- case P2TP_ACK_TS: OnAckTs(dgram); break;
+ case P2TP_TS: OnTs(dgram); break;
case P2TP_ACK: OnAck(dgram); break;
case P2TP_HASH: OnHash(dgram); break;
case P2TP_HINT: OnHint(dgram); break;
return;
}
}
- RequeueSend(cc_->OnDataRecvd(data));
+ cc_->OnDataRecvd(data);
+ last_recv_time_ = Datagram::now;
+ if (data!=bin64_t::ALL)
+ RequeueSend(Datagram::now);
}
dprintf("%s #%i %cdata (%lli)\n",Datagram::TimeStr(),id,ok?'-':'!',pos.offset());
if (ok) {
data_in_ = tintbin(Datagram::now,pos);
+ if (last_recv_time_) {
+ tint dip = Datagram::now - last_recv_time_;
+ dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
+ }
return pos;
} else
return bin64_t::NONE;
void Channel::OnAck (Datagram& dgram) {
- // note: no bound checking
- bin64_t pos = dgram.Pull32();
- dprintf("%s #%i -ack (%i,%lli)\n",Datagram::TimeStr(),id,pos.layer(),pos.offset());
- ack_in_.set(pos);
- RequeueSend(cc_->OnAckRcvd(pos,0));
+ bin64_t ackd_pos = dgram.Pull32();
+ if (ackd_pos.base_offset()>file().size())
+ return;
+ dprintf("%s #%i -ack (%i,%lli)\n",Datagram::TimeStr(),id,ackd_pos.layer(),ackd_pos.offset());
+ for (int i=0; i<8 && i<data_out_.size(); i++)
+ if (data_out_[i].bin.within(ackd_pos)) {
+ tintbin x = data_out_[i];
+ data_out_[i].bin = bin64_t::ALL;
+ tint rtt = Datagram::now-x.time;
+ rtt_avg_ = (rtt_avg_*3 + rtt) >> 2;
+ dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
+ dprintf("%s #%i rtt %lli dev %lli\n",
+ Datagram::TimeStr(),id,rtt_avg_,dev_avg_);
+ cc_->OnAckRcvd(x.bin);
+ }
+ while (data_out_.size() && data_out_.front().bin==bin64_t::ALL)
+ data_out_.pop_front();
+ ack_in_.set(ackd_pos);
}
-void Channel::OnAckTs (Datagram& dgram) {
+/*void Channel::OnAckTs (Datagram& dgram) { // FIXME: OnTs
bin64_t pos = dgram.Pull32();
tint ts = dgram.Pull64();
// TODO sanity check
dprintf("%s #%i -ackts (%i,%lli) %s\n",
Datagram::TimeStr(),id,pos.layer(),pos.offset(),Datagram::TimeStr(ts));
ack_in_.set(pos);
- RequeueSend(cc_->OnAckRcvd(pos,ts));
+ cc_->OnAckRcvd(pos,ts);
+}*/
+
+void Channel::OnTs (Datagram& dgram) {
+ peer_send_time_ = dgram.Pull64();
+ dprintf("%s #%i -ts %lli\n",Datagram::TimeStr(),id,peer_send_time_);
}
void Channel::OnHandshake (Datagram& dgram) {
- if (!peer_channel_id_)
- cc_->OnAckRcvd(bin64_t::ALL);
peer_channel_id_ = dgram.Pull32();
dprintf("%s #%i -hs %i\n",Datagram::TimeStr(),id,peer_channel_id_);
// FUTURE: channel forking
unlink("doc/sofi-copy.jpg");
struct stat st;
ASSERT_EQ(0,stat("doc/sofi.jpg", &st));
- int size = st.st_size, sizek = (st.st_size>>10) + (st.st_size%1024?1:0) ;
+ int size = st.st_size;//, sizek = (st.st_size>>10) + (st.st_size%1024?1:0) ;
/*int f = open("big_test_file",O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
int size = 60<<10; //rand()%(1<<19) + (1<<19);