}
bool within (bin64_t maybe_asc) {
+ if (maybe_asc==bin64_t::NONE)
+ return false;
uint64_t short_tail = maybe_asc.tail_bits();
if (tail_bits()>short_tail)
return false;
bool is_right() const { return !is_left(); }
bin64_t left_foot () const {
+ if (v==NONE)
+ return NONE;
return bin64_t(0,base_offset());
}
}
-bin64_t bins::find (const bin64_t range, const uint8_t layer, fill_t seek) {
+bin64_t bins::find (const bin64_t range, fill_t seek) {
iterator i(this,range,true);
fill_t stop = seek==EMPTY ? FILLED : EMPTY;
while (true) {
- while ( i.layer()>layer && (i.deep() || *i!=stop) )
+ while ( i.deep() || (*i!=stop && *i!=seek) )
i.left();
- if (i.layer()==layer && !i.deep() && *i==seek)
+ if (!i.deep() && *i==seek)
return i.bin();
while (i.bin().is_right() && i.bin()!=range)
i.parent();
bin64_t bins::find_filtered
- (bins& filter, bin64_t range, const uint8_t layer, fill_t seek)
+ (bins& filter, bin64_t range, fill_t seek)
{
if (range==bin64_t::ALL)
range = bin64_t ( height>filter.height ? height : filter.height, 0 );
iterator ti(this,range,true), fi(&filter,range,true);
fill_t stop = seek==EMPTY ? FILLED : EMPTY;
while (true) {
- while ( ti.layer()>layer ) {
- bool go = fi.deep() ?
+ while (
+ fi.deep() ?
(ti.deep() || *ti!=stop) :
- (ti.deep() ? *fi!=FILLED : (*ti^stop)&~*fi );
- if (go) {
- ti.left(); fi.left();
- } else
- break;
+ (ti.deep() ? *fi!=FILLED : ( ((*ti^stop)&~*fi) && (*ti!=seek || *fi!=EMPTY) ) )
+ )
+ {
+ ti.left(); fi.left();
}
- //while ( i.bin().layer()>layer && (i.deep() || *i!=stop || j.deep() || *j!=FILLED) )
- // i.left(), j.left(); // TODO may optimize a lot here
- if (ti.layer()==layer && !ti.deep() && *ti==seek && *fi==EMPTY)
+ if (!ti.deep() && *ti==seek && !fi.deep() && *fi==EMPTY)
return ti.bin();
while (ti.bin().is_right() && ti.bin()!=range)
ti.parent(), fi.parent();
void copy_range (bins& origin, bin64_t range);
- bin64_t find (const bin64_t range, const uint8_t layer, fill_t seek=EMPTY) ;
+ bin64_t find (const bin64_t range, fill_t seek=EMPTY) ;
bin64_t find_filtered
- (bins& filter, bin64_t range, const uint8_t layer, fill_t seek=EMPTY) ;
+ (bins& filter, bin64_t range, fill_t seek=EMPTY) ;
void remove (bins& b);
cells[(half>>1)|0xf] &= ~(1<<(half&0x1f));
}
- void extend_range();
+ void extend_range();
uint16_t alloc_cell ();
void free_cell (uint16_t cell);
}
-
+KeepAliveController::KeepAliveController(SendController* prev, tint delay) :
+SendController(prev), delay_(delay) {
+ ch_->dev_avg_ = TINT_SEC; // without active measurement, rtt is unreliable
+}
+
bool KeepAliveController::MaySendData() {
return true;
}
}
void KeepAliveController::OnDataSent(bin64_t b) {
- delay_ *= 2;
+ delay_ = (NOW - std::max(ch_->last_send_time_,ch_->last_recv_time_)) * 3 / 2;
if (delay_>TINT_SEC*58)
delay_ = TINT_SEC*58;
if (b!=bin64_t::ALL && b!=bin64_t::NONE)
CwndController::CwndController(SendController* orig, int cwnd) :
SendController(orig), cwnd_(cwnd), last_change_(0) {
- ch_->rtt_avg_ = TINT_SEC; // cannot trust the past value
- ch_->dev_avg_ = 0;
}
bool CwndController::MaySendData() {
dprintf("%s #%i sendctrl may send %i < %f & %s (rtt %lli)\n",tintstr(),
ch_->id,(int)ch_->data_out_.size(),cwnd_,tintstr(NextSendTime()),
ch_->rtt_avg_);
- return ch_->data_out_.size() < cwnd_ && NOW >= NextSendTime();
+ return ch_->data_out_.empty() ||
+ (ch_->data_out_.size() < cwnd_ && NOW >= NextSendTime());
}
tint CwndController::NextSendTime () {
tint delay_;
- KeepAliveController(SendController* prev, tint delay=0) :
- SendController(prev), delay_(delay) {}
+ KeepAliveController(SendController* prev, tint delay=0) ;
const char* type() const { return "KeepAlive"; }
bool MaySendData();
tint NextSendTime () ;
bins ack_hint_out_;
FileTransfer* transfer_;
uint64_t twist_;
+ tbheap hint_out_; // FIXME since I use fixed 1.5 sec expiration, may replace for a queue
public:
twist_ = twist;
}
- virtual bin64_t Pick (bins& offer, uint8_t layer) {
+ virtual bin64_t Pick (bins& offer, uint64_t max_width, tint expires) {
+ while (hint_out_.size() && hint_out_.peek().time<NOW)
+ ack_hint_out_.copy_range(file().ack_out(), hint_out_.pop().bin);
//dprintf("twist is %lli\n",twist_);
- if (!file().size())
- return bin64_t(0,0); // a hack to get peak hashes; FIXME
+ if (!file().size()) {
+ return bin64_t(0,0); // whoever sends it first
+ }
twist_ &= (file().peak(0)) & ((1<<6)-1);
if (twist_) {
offer.twist(twist_);
ack_hint_out_.twist(twist_);
}
- bin64_t hint = offer.find_filtered
- (ack_hint_out_,bin64_t::ALL,layer,bins::FILLED);
+ bin64_t hint = offer.find_filtered (ack_hint_out_,bin64_t::ALL,bins::FILLED);
if (twist_) {
hint = hint.twisted(twist_);
offer.twist(0);
}
if (hint==bin64_t::NONE)
return hint; // TODO: end-game mode
- while (hint.layer()>layer)
+ while (hint.width()>max_width)
hint = hint.left();
assert(ack_hint_out_.get(hint)==bins::EMPTY);
- if (file().ack_out().get(hint)!=bins::EMPTY) {
+ if (hint.offset() && file().ack_out().get(hint)!=bins::EMPTY) { // FIXME DEBUG remove
eprintf("bogus hint: (%i,%lli)\n",(int)hint.layer(),hint.offset());
exit(1);
}
ack_hint_out_.set(hint);
+ hint_out_.push(tintbin(expires,hint));
return hint;
- /*for (int l=layer; l>=0; l--) {
- for(int i=0; i<file_->peak_count(); i++) {
- bin64_t pick = may_pick.find(file_->peak(i),l,bins::FILLED);
- if (pick!=bin64_t::NONE)
- return pick;
- }
- }
- return bin64_t::NONE;*/
- }
-
- virtual void Received (bin64_t b) {
- ack_hint_out_.set(b,bins::FILLED);
- }
-
- virtual void Expired (bin64_t b) {
- ack_hint_out_.copy_range(file().ack_out(),b);
}
};
socket_(socket==-1?sockets[0]:socket), // FIXME
data_out_cap_(bin64_t::ALL), last_data_time_(0),
own_id_mentioned_(false), next_send_time_(0), last_send_time_(0),
- last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC)
+ last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC),
+ hint_out_(0), hint_out_mark_(), hint_out_am_(0)
{
if (peer_==Address())
peer_ = tracker;
Channel::~Channel () {
channels[id] = NULL;
- for(int i=0; i<hint_out_.size(); i++)
- transfer().picker().Expired(hint_out_[i].bin);
delete cc_;
}
#else
#include <stdint.h>
#endif
-#include <vector>
#include <deque>
+#include <vector>
+#include <algorithm>
#include <string>
#include "bin64.h"
#include "bins.h"
tintbin() : time(0), bin(bin64_t::NONE) {}
tintbin(tint time_, bin64_t bin_) : time(time_), bin(bin_) {}
tintbin(bin64_t bin_) : time(NOW), bin(bin_) {}
+ bool operator < (const tintbin& b) const
+ { return time > b.time; }
+ bool operator == (const tintbin& b) const
+ { return time==b.time && bin==b.bin; }
};
typedef std::deque<tintbin> tbqueue;
typedef std::deque<bin64_t> binqueue;
typedef Address Address;
+ class tbheap {
+ tbqueue data_;
+ public:
+ int size () const { return data_.size(); }
+ bool is_empty () const { return data_.empty(); }
+ tintbin pop() {
+ tintbin ret = data_.front();
+ std::pop_heap(data_.begin(),data_.end());
+ data_.pop_back();
+ return ret;
+ }
+ void push(const tintbin& tb) {
+ data_.push_back(tb);
+ push_heap(data_.begin(),data_.end());
+ }
+ const tintbin& peek() const {
+ return data_.front();
+ }
+ };
+
typedef enum {
P2TP_HANDSHAKE = 0,
P2TP_DATA = 1,
class PiecePicker {
public:
virtual void Randomize (uint64_t twist) = 0;
- virtual bin64_t Pick (bins& offered, uint8_t layer) = 0;
- virtual void Expired (bin64_t b) = 0;
- virtual void Received (bin64_t b) = 0;
+ virtual bin64_t Pick (bins& offered, uint64_t max_width, tint expires) = 0;
};
Channel (FileTransfer* file, int socket=-1, Address peer=Address());
~Channel();
- static void Recv (int socket);
+ static void RecvDatagram (int socket);
static void Loop (tint till);
void Recv (Datagram& dgram);
/** Index in the history array. */
bins ack_out_;
/** Transmit schedule: in most cases filled with the peer's hints */
- tbqueue hint_in_;
+ tbqueue hint_in_;
/** Hints sent (to detect and reschedule ignored hints). */
- tbqueue hint_out_;
+ // tbqueue hint_out_;
+ uint64_t hint_out_;
+ tintbin hint_out_mark_;
+ uint64_t hint_out_am_;
/** The congestion control strategy. */
SendController *cc_;
/** Types of messages the peer accepts. */
* Copyright 2009 Delft University of Technology. All rights reserved.
*
*/
-#include <algorithm>
-//#include <glog/logging.h>
#include "p2tp.h"
#include "compat/util.h"
bin64_t hint = hint_in_.front().bin;
tint time = hint_in_.front().time;
hint_in_.pop_front();
- if (time < NOW-8*rtt_avg_)
+ if (time < NOW-2*TINT_SEC ) //NOW-8*rtt_avg_)
continue;
- send = file().ack_out().find_filtered
- (ack_in_,hint,0,bins::FILLED);
+ send = file().ack_out().find_filtered(ack_in_,hint,bins::FILLED);
+ send = send.left_foot(); // single packet
dprintf("%s #%i dequeued %lli\n",tintstr(),id,send.base_offset());
if (send!=bin64_t::NONE)
while (send!=hint) {
void Channel::AddHint (Datagram& dgram) {
- while (!hint_out_.empty()) {
- tintbin f = hint_out_.front();
- if (f.time<NOW-rtt_avg_*8) {
- hint_out_.pop_front();
- dprintf("%s #%i !hint (%i,%lli)\n",
- tintstr(),id,(int)f.bin.layer(),f.bin.offset());
- transfer().picker().Expired(f.bin);
- } else {
- int status = file().ack_out().get(f.bin);
- if (status==bins::EMPTY) {
- break;
- } else if (status==bins::FILLED) {
- hint_out_.pop_front();
- transfer().picker().Expired(f.bin);
- } else { // mixed
- hint_out_.front().bin = f.bin.right();
- f.bin = f.bin.left();
- hint_out_.push_front(f);
- } // FIXME: simplify this mess
- }
- }
- /*while (!hint_out_.empty() &&
- (hint_out_.front().time<NOW-TINT_SEC ||
- file().ack_out().get(hint_out_.front().bin)==bins::FILLED ) ) {
- file().picker().Expired(hint_out_.front().bin);
- hint_out_.pop_front();
- }*/
- uint64_t hinted = 0;
- for(tbqueue::iterator i=hint_out_.begin(); i!=hint_out_.end(); i++)
- hinted += i->bin.width();
- //int bps = PeerBPS();
- //double kbps = max(4,TINT_SEC / dip_avg_);
- double peer_cwnd = rtt_avg_ / dip_avg_;
- if (peer_cwnd<1)
+ int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
+ if (!peer_cwnd)
peer_cwnd = 1;
- dprintf("%s #%i hinted %lli peer_cwnd %lli/%lli=%f\n",
- tintstr(),id,hinted,rtt_avg_,dip_avg_,((float)rtt_avg_/dip_avg_));
-
- if ( 4*peer_cwnd > hinted ) { //hinted*1024 < peer_cwnd*4 ) {
+ int peer_pps = TINT_SEC / dip_avg_;
+ if (!peer_pps)
+ peer_pps = 1;
+ dprintf("%s #%i hint_out_ %lli+%lli mark (%i,%lli) peer_cwnd %lli/%lli=%f\n",
+ tintstr(),id,hint_out_,hint_out_am_,(int)hint_out_mark_.bin.layer(),
+ hint_out_mark_.bin.offset(),
+ rtt_avg_,dip_avg_,((float)rtt_avg_/dip_avg_));
+
+ if ( hint_out_mark_.time < NOW - TINT_SEC*2 ) { //NOW-rtt_avg_*8-dev_avg_) {
+ hint_out_mark_.bin=bin64_t::NONE;
+ hint_out_ = hint_out_am_;
+ hint_out_am_ = 0;
+ }
+
+ if ( peer_pps > hint_out_+hint_out_am_ ) { //4*peer_cwnd
- uint8_t layer = 2; // actually, enough
- bin64_t hint = transfer().picker().Pick(ack_in_,layer);
- // FIXME FIXME FIXME: any layer
- if (hint==bin64_t::NONE)
- hint = transfer().picker().Pick(ack_in_,0);
+ int diff = peer_pps - hint_out_ - hint_out_am_; // 4*peer_cwnd
+ if (diff>4 && diff>2*peer_cwnd)
+ diff >>= 1;
+ bin64_t hint = transfer().picker().Pick(ack_in_,diff,NOW+TINT_SEC*3/2); //rtt_avg_*8+TINT_MSEC*10
if (hint!=bin64_t::NONE) {
- hint_out_.push_back(hint);
dgram.Push8(P2TP_HINT);
dgram.Push32(hint);
dprintf("%s #%i +hint (%i,%lli)\n",tintstr(),id,hint.layer(),hint.offset());
+ if (hint_out_mark_.bin==bin64_t::NONE)
+ hint_out_mark_ = hint;
+ hint_out_am_ += hint.width();
+ //hint_out_ += hint.width();
}
}
data_in_ = tintbin(0,bin64_t::NONE);
}
for(int count=0; count<4; count++) {
- bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, 0, bins::FILLED);
- // TODO bins::ANY_LAYER
+ bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, bins::FILLED);
if (ack==bin64_t::NONE)
break;
ack = file().ack_out().cover(ack);
cc_->OnDataRecvd(data);
last_recv_time_ = NOW;
if (data!=bin64_t::ALL && next_send_time_>NOW+TINT_MSEC)
- Send(); //RequeueSend(NOW);
+ Send();
}
int length = dgram.Pull(&data,1024);
bool ok = file().OfferData(pos, (char*)data, length) ;
dprintf("%s #%i %cdata (%lli)\n",tintstr(),id,ok?'-':'!',pos.offset());
- if (ok) {
- data_in_ = tintbin(NOW,pos);
- if (last_data_time_) {
- tint dip = NOW - last_data_time_;
- dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
- }
- last_data_time_ = NOW;
- transfer().picker().Received(pos); // so dirty; FIXME FIXME FIXME
- return pos;
- } else
+ if (!ok)
return bin64_t::NONE;
+ data_in_ = tintbin(NOW,pos);
+ if (last_data_time_) {
+ tint dip = NOW - last_data_time_;
+ dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
+ }
+ last_data_time_ = NOW;
+ if (pos.within(hint_out_mark_.bin)) {
+ hint_out_mark_.bin = bin64_t::NONE;
+ hint_out_ = hint_out_am_;
+ hint_out_am_ = 0;
+ }
+ if (hint_out_)
+ hint_out_--;
+ else if (hint_out_am_) // probably, the marking HINT was lost or whatever
+ hint_out_am_--;
+ return pos;
}
void Channel::OnHint (Datagram& dgram) {
bin64_t hint = dgram.Pull32();
hint_in_.push_back(hint);
- ack_in_.set(hint,bins::EMPTY);
+ //ack_in_.set(hint,bins::EMPTY);
//RequeueSend(cc_->OnHintRecvd(hint));
dprintf("%s #%i -hint (%i,%lli)\n",tintstr(),id,hint.layer(),hint.offset());
}
}
-void Channel::Recv (int socket) {
+void Channel::RecvDatagram (int socket) {
Datagram data(socket);
data.Recv();
if (data.size()<4)
dprintf("%s waiting %lliusec\n",tintstr(),towait);
int rd = Datagram::Wait(socket_count,sockets,towait);
if (rd!=INVALID_SOCKET)
- Recv(rd);
+ RecvDatagram(rd);
} else { //if (sender->next_send_time_==TINT_NEVER) {
dprintf("%s #%i closed sendctrl\n",tintstr(),sender->id);
delete sender;
TEST(Bin64Test,Overflows) {
+ EXPECT_FALSE(bin64_t(0,1).within(bin64_t::NONE));
+ EXPECT_TRUE(bin64_t(0,1).within(bin64_t::ALL));
/*EXPECT_EQ(bin64_t::NONE.parent(),bin64_t::NONE);
EXPECT_EQ(bin64_t::NONE.left(),bin64_t::NONE);
EXPECT_EQ(bin64_t::NONE.right(),bin64_t::NONE);
hole.set(bin64_t(4,0),bins::FILLED);
hole.set(bin64_t(1,1),bins::EMPTY);
hole.set(bin64_t(0,7),bins::EMPTY);
- bin64_t f = hole.find(bin64_t(4,0),0);
+ bin64_t f = hole.find(bin64_t(4,0)).left_foot();
EXPECT_EQ(bin64_t(0,2),f);
}
filter.set(bin64_t(1,4));
filter.set(bin64_t(0,13));
- bin64_t x = data.find_filtered(filter,bin64_t(4,0),0);
+ bin64_t x = data.find_filtered(filter,bin64_t(4,0)).left_foot();
EXPECT_EQ(bin64_t(0,12),x);
}
for(int j=1; j<1024; j+=2)
filter.set(bin64_t(0,j));
filter.set(bin64_t(0,501),bins::EMPTY);
- EXPECT_EQ(bin64_t(0,501),data.find_filtered(filter,bin64_t(10,0),0));
+ EXPECT_EQ(bin64_t(0,501),data.find_filtered(filter,bin64_t(10,0)).left_foot());
data.set(bin64_t(0,501));
- EXPECT_EQ(bin64_t::NONE,data.find_filtered(filter,bin64_t(10,0),0));
+ EXPECT_EQ(bin64_t::NONE,data.find_filtered(filter,bin64_t(10,0)).left_foot());
}
b.twist(1<<3);
EXPECT_EQ(bins::FILLED,b.get(bin64_t(3,3)));
EXPECT_EQ(bins::EMPTY,b.get(bin64_t(3,2)));
- bin64_t tw = b.find(bin64_t(5,0),3,bins::FILLED);
+ bin64_t tw = b.find(bin64_t(5,0),bins::FILLED);
+ while (tw.width()>(1<<3))
+ tw = tw.left();
tw = tw.twisted(1<<3);
EXPECT_EQ(bin64_t(3,2),tw);
b.twist(0);
Sha1Hash A,B,C,D,E,AB,CD,ABCD,E0,E000,ABCDE000,ROOT;
+TEST(TransferTest,TBHeap) {
+ tbheap tbh;
+ ASSERT_TRUE(tbh.is_empty());
+ tbh.push(tintbin(3,bin64_t::NONE));
+ tbh.push(tintbin(1,bin64_t::NONE));
+ ASSERT_EQ(2,tbh.size());
+ tbh.push(tintbin(2,bin64_t::ALL));
+ ASSERT_EQ(1,tbh.pop().time);
+ ASSERT_EQ(bin64_t::ALL,tbh.peek().bin);
+ ASSERT_EQ(2,tbh.pop().time);
+ ASSERT_EQ(3,tbh.pop().time);
+}
+
+
TEST(TransferTest,TransferFile) {
AB = Sha1Hash(A,B);
EXPECT_EQ(2,leech->packets_complete());
EXPECT_EQ(bin64_t(2,0),leech->peak(0));
}
- bin64_t next = leech_transfer->picker().Pick(seed->ack_out(),0);
+ bin64_t next = leech_transfer->picker().Pick(seed->ack_out(),1,TINT_NEVER);
ASSERT_NE(bin64_t::NONE,next);
ASSERT_TRUE(next.base_offset()<5);
uint8_t buf[1024]; //size_t len = seed->storer->ReadData(next,&buf);