From a6055049df22e8ae3ab26f0ab72cebc579984382 Mon Sep 17 00:00:00 2001 From: victor Date: Fri, 13 Nov 2009 10:17:54 +0000 Subject: [PATCH] hint_out_ queue went to piece picker git-svn-id: https://ttuki.vtt.fi/svn/p2p-next/TUD/p2tp/trunk@546 e16421f0-f15b-0410-abcd-98678b794739 --- bin64.h | 4 ++ bins.cpp | 25 ++++----- bins.h | 6 +-- ext/send_control.cpp | 13 +++-- ext/send_control.h | 3 +- ext/seq_picker.cpp | 34 ++++-------- p2tp.cpp | 5 +- p2tp.h | 40 +++++++++++--- sendrecv.cpp | 115 ++++++++++++++++++----------------------- tests/bin64test.cpp | 2 + tests/binstest2.cpp | 12 +++-- tests/transfertest.cpp | 16 +++++- 12 files changed, 147 insertions(+), 128 deletions(-) diff --git a/bin64.h b/bin64.h index 988de0b..8b8befd 100644 --- a/bin64.h +++ b/bin64.h @@ -91,6 +91,8 @@ struct bin64_t { } bool within (bin64_t maybe_asc) { + if (maybe_asc==bin64_t::NONE) + return false; uint64_t short_tail = maybe_asc.tail_bits(); if (tail_bits()>short_tail) return false; @@ -123,6 +125,8 @@ struct bin64_t { bool is_right() const { return !is_left(); } bin64_t left_foot () const { + if (v==NONE) + return NONE; return bin64_t(0,base_offset()); } diff --git a/bins.cpp b/bins.cpp index c745b2d..2718115 100644 --- a/bins.cpp +++ b/bins.cpp @@ -231,13 +231,13 @@ void iterator::parent () { } -bin64_t bins::find (const bin64_t range, const uint8_t layer, fill_t seek) { +bin64_t bins::find (const bin64_t range, fill_t seek) { iterator i(this,range,true); fill_t stop = seek==EMPTY ? FILLED : EMPTY; while (true) { - while ( i.layer()>layer && (i.deep() || *i!=stop) ) + while ( i.deep() || (*i!=stop && *i!=seek) ) i.left(); - if (i.layer()==layer && !i.deep() && *i==seek) + if (!i.deep() && *i==seek) return i.bin(); while (i.bin().is_right() && i.bin()!=range) i.parent(); @@ -359,25 +359,22 @@ bin64_t bins::cover(bin64_t val) { bin64_t bins::find_filtered - (bins& filter, bin64_t range, const uint8_t layer, fill_t seek) + (bins& filter, bin64_t range, fill_t seek) { if (range==bin64_t::ALL) range = bin64_t ( height>filter.height ? height : filter.height, 0 ); iterator ti(this,range,true), fi(&filter,range,true); fill_t stop = seek==EMPTY ? FILLED : EMPTY; while (true) { - while ( ti.layer()>layer ) { - bool go = fi.deep() ? + while ( + fi.deep() ? (ti.deep() || *ti!=stop) : - (ti.deep() ? *fi!=FILLED : (*ti^stop)&~*fi ); - if (go) { - ti.left(); fi.left(); - } else - break; + (ti.deep() ? *fi!=FILLED : ( ((*ti^stop)&~*fi) && (*ti!=seek || *fi!=EMPTY) ) ) + ) + { + ti.left(); fi.left(); } - //while ( i.bin().layer()>layer && (i.deep() || *i!=stop || j.deep() || *j!=FILLED) ) - // i.left(), j.left(); // TODO may optimize a lot here - if (ti.layer()==layer && !ti.deep() && *ti==seek && *fi==EMPTY) + if (!ti.deep() && *ti==seek && !fi.deep() && *fi==EMPTY) return ti.bin(); while (ti.bin().is_right() && ti.bin()!=range) ti.parent(), fi.parent(); diff --git a/bins.h b/bins.h index 349519d..3f8494f 100644 --- a/bins.h +++ b/bins.h @@ -27,10 +27,10 @@ public: void copy_range (bins& origin, bin64_t range); - bin64_t find (const bin64_t range, const uint8_t layer, fill_t seek=EMPTY) ; + bin64_t find (const bin64_t range, fill_t seek=EMPTY) ; bin64_t find_filtered - (bins& filter, bin64_t range, const uint8_t layer, fill_t seek=EMPTY) ; + (bins& filter, bin64_t range, fill_t seek=EMPTY) ; void remove (bins& b); @@ -87,7 +87,7 @@ private: cells[(half>>1)|0xf] &= ~(1<<(half&0x1f)); } - void extend_range(); + void extend_range(); uint16_t alloc_cell (); void free_cell (uint16_t cell); diff --git a/ext/send_control.cpp b/ext/send_control.cpp index f4484cf..5d5d066 100644 --- a/ext/send_control.cpp +++ b/ext/send_control.cpp @@ -46,7 +46,11 @@ void PingPongController::OnAckRcvd(bin64_t ackd) { } - +KeepAliveController::KeepAliveController(SendController* prev, tint delay) : +SendController(prev), delay_(delay) { + ch_->dev_avg_ = TINT_SEC; // without active measurement, rtt is unreliable +} + bool KeepAliveController::MaySendData() { return true; } @@ -60,7 +64,7 @@ tint KeepAliveController::NextSendTime () { } void KeepAliveController::OnDataSent(bin64_t b) { - delay_ *= 2; + delay_ = (NOW - std::max(ch_->last_send_time_,ch_->last_recv_time_)) * 3 / 2; if (delay_>TINT_SEC*58) delay_ = TINT_SEC*58; if (b!=bin64_t::ALL && b!=bin64_t::NONE) @@ -76,15 +80,14 @@ void KeepAliveController::OnAckRcvd(bin64_t ackd) { CwndController::CwndController(SendController* orig, int cwnd) : SendController(orig), cwnd_(cwnd), last_change_(0) { - ch_->rtt_avg_ = TINT_SEC; // cannot trust the past value - ch_->dev_avg_ = 0; } bool CwndController::MaySendData() { dprintf("%s #%i sendctrl may send %i < %f & %s (rtt %lli)\n",tintstr(), ch_->id,(int)ch_->data_out_.size(),cwnd_,tintstr(NextSendTime()), ch_->rtt_avg_); - return ch_->data_out_.size() < cwnd_ && NOW >= NextSendTime(); + return ch_->data_out_.empty() || + (ch_->data_out_.size() < cwnd_ && NOW >= NextSendTime()); } tint CwndController::NextSendTime () { diff --git a/ext/send_control.h b/ext/send_control.h index baf12cd..67ae52f 100644 --- a/ext/send_control.h +++ b/ext/send_control.h @@ -68,8 +68,7 @@ struct KeepAliveController : public SendController { tint delay_; - KeepAliveController(SendController* prev, tint delay=0) : - SendController(prev), delay_(delay) {} + KeepAliveController(SendController* prev, tint delay=0) ; const char* type() const { return "KeepAlive"; } bool MaySendData(); tint NextSendTime () ; diff --git a/ext/seq_picker.cpp b/ext/seq_picker.cpp index 58ca215..c97a2b0 100644 --- a/ext/seq_picker.cpp +++ b/ext/seq_picker.cpp @@ -16,6 +16,7 @@ class SeqPiecePicker : public PiecePicker { bins ack_hint_out_; FileTransfer* transfer_; uint64_t twist_; + tbheap hint_out_; // FIXME since I use fixed 1.5 sec expiration, may replace for a queue public: @@ -32,17 +33,19 @@ public: twist_ = twist; } - virtual bin64_t Pick (bins& offer, uint8_t layer) { + virtual bin64_t Pick (bins& offer, uint64_t max_width, tint expires) { + while (hint_out_.size() && hint_out_.peek().timelayer) + while (hint.width()>max_width) hint = hint.left(); assert(ack_hint_out_.get(hint)==bins::EMPTY); - if (file().ack_out().get(hint)!=bins::EMPTY) { + if (hint.offset() && file().ack_out().get(hint)!=bins::EMPTY) { // FIXME DEBUG remove eprintf("bogus hint: (%i,%lli)\n",(int)hint.layer(),hint.offset()); exit(1); } ack_hint_out_.set(hint); + hint_out_.push(tintbin(expires,hint)); return hint; - /*for (int l=layer; l>=0; l--) { - for(int i=0; ipeak_count(); i++) { - bin64_t pick = may_pick.find(file_->peak(i),l,bins::FILLED); - if (pick!=bin64_t::NONE) - return pick; - } - } - return bin64_t::NONE;*/ - } - - virtual void Received (bin64_t b) { - ack_hint_out_.set(b,bins::FILLED); - } - - virtual void Expired (bin64_t b) { - ack_hint_out_.copy_range(file().ack_out(),b); } }; diff --git a/p2tp.cpp b/p2tp.cpp index c0c7016..3ef544a 100644 --- a/p2tp.cpp +++ b/p2tp.cpp @@ -42,7 +42,8 @@ Channel::Channel (FileTransfer* transfer, int socket, Address peer_addr) : socket_(socket==-1?sockets[0]:socket), // FIXME data_out_cap_(bin64_t::ALL), last_data_time_(0), own_id_mentioned_(false), next_send_time_(0), last_send_time_(0), - last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC) + last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC), + hint_out_(0), hint_out_mark_(), hint_out_am_(0) { if (peer_==Address()) peer_ = tracker; @@ -55,8 +56,6 @@ Channel::Channel (FileTransfer* transfer, int socket, Address peer_addr) : Channel::~Channel () { channels[id] = NULL; - for(int i=0; i #endif -#include #include +#include +#include #include #include "bin64.h" #include "bins.h" @@ -72,12 +73,36 @@ namespace p2tp { tintbin() : time(0), bin(bin64_t::NONE) {} tintbin(tint time_, bin64_t bin_) : time(time_), bin(bin_) {} tintbin(bin64_t bin_) : time(NOW), bin(bin_) {} + bool operator < (const tintbin& b) const + { return time > b.time; } + bool operator == (const tintbin& b) const + { return time==b.time && bin==b.bin; } }; typedef std::deque tbqueue; typedef std::deque binqueue; typedef Address Address; + class tbheap { + tbqueue data_; + public: + int size () const { return data_.size(); } + bool is_empty () const { return data_.empty(); } + tintbin pop() { + tintbin ret = data_.front(); + std::pop_heap(data_.begin(),data_.end()); + data_.pop_back(); + return ret; + } + void push(const tintbin& tb) { + data_.push_back(tb); + push_heap(data_.begin(),data_.end()); + } + const tintbin& peek() const { + return data_.front(); + } + }; + typedef enum { P2TP_HANDSHAKE = 0, P2TP_DATA = 1, @@ -162,9 +187,7 @@ namespace p2tp { class PiecePicker { public: virtual void Randomize (uint64_t twist) = 0; - virtual bin64_t Pick (bins& offered, uint8_t layer) = 0; - virtual void Expired (bin64_t b) = 0; - virtual void Received (bin64_t b) = 0; + virtual bin64_t Pick (bins& offered, uint64_t max_width, tint expires) = 0; }; @@ -194,7 +217,7 @@ namespace p2tp { Channel (FileTransfer* file, int socket=-1, Address peer=Address()); ~Channel(); - static void Recv (int socket); + static void RecvDatagram (int socket); static void Loop (tint till); void Recv (Datagram& dgram); @@ -251,9 +274,12 @@ namespace p2tp { /** Index in the history array. */ bins ack_out_; /** Transmit schedule: in most cases filled with the peer's hints */ - tbqueue hint_in_; + tbqueue hint_in_; /** Hints sent (to detect and reschedule ignored hints). */ - tbqueue hint_out_; + // tbqueue hint_out_; + uint64_t hint_out_; + tintbin hint_out_mark_; + uint64_t hint_out_am_; /** The congestion control strategy. */ SendController *cc_; /** Types of messages the peer accepts. */ diff --git a/sendrecv.cpp b/sendrecv.cpp index 5cdf7e1..721e22a 100644 --- a/sendrecv.cpp +++ b/sendrecv.cpp @@ -6,8 +6,6 @@ * Copyright 2009 Delft University of Technology. All rights reserved. * */ -#include -//#include #include "p2tp.h" #include "compat/util.h" @@ -59,10 +57,10 @@ bin64_t Channel::DequeueHint () { // TODO: resilience bin64_t hint = hint_in_.front().bin; tint time = hint_in_.front().time; hint_in_.pop_front(); - if (time < NOW-8*rtt_avg_) + if (time < NOW-2*TINT_SEC ) //NOW-8*rtt_avg_) continue; - send = file().ack_out().find_filtered - (ack_in_,hint,0,bins::FILLED); + send = file().ack_out().find_filtered(ack_in_,hint,bins::FILLED); + send = send.left_foot(); // single packet dprintf("%s #%i dequeued %lli\n",tintstr(),id,send.base_offset()); if (send!=bin64_t::NONE) while (send!=hint) { @@ -145,57 +143,38 @@ void Channel::Send () { void Channel::AddHint (Datagram& dgram) { - while (!hint_out_.empty()) { - tintbin f = hint_out_.front(); - if (f.timebin.width(); - //int bps = PeerBPS(); - //double kbps = max(4,TINT_SEC / dip_avg_); - double peer_cwnd = rtt_avg_ / dip_avg_; - if (peer_cwnd<1) + int peer_cwnd = (int)(rtt_avg_ / dip_avg_); + if (!peer_cwnd) peer_cwnd = 1; - dprintf("%s #%i hinted %lli peer_cwnd %lli/%lli=%f\n", - tintstr(),id,hinted,rtt_avg_,dip_avg_,((float)rtt_avg_/dip_avg_)); - - if ( 4*peer_cwnd > hinted ) { //hinted*1024 < peer_cwnd*4 ) { + int peer_pps = TINT_SEC / dip_avg_; + if (!peer_pps) + peer_pps = 1; + dprintf("%s #%i hint_out_ %lli+%lli mark (%i,%lli) peer_cwnd %lli/%lli=%f\n", + tintstr(),id,hint_out_,hint_out_am_,(int)hint_out_mark_.bin.layer(), + hint_out_mark_.bin.offset(), + rtt_avg_,dip_avg_,((float)rtt_avg_/dip_avg_)); + + if ( hint_out_mark_.time < NOW - TINT_SEC*2 ) { //NOW-rtt_avg_*8-dev_avg_) { + hint_out_mark_.bin=bin64_t::NONE; + hint_out_ = hint_out_am_; + hint_out_am_ = 0; + } + + if ( peer_pps > hint_out_+hint_out_am_ ) { //4*peer_cwnd - uint8_t layer = 2; // actually, enough - bin64_t hint = transfer().picker().Pick(ack_in_,layer); - // FIXME FIXME FIXME: any layer - if (hint==bin64_t::NONE) - hint = transfer().picker().Pick(ack_in_,0); + int diff = peer_pps - hint_out_ - hint_out_am_; // 4*peer_cwnd + if (diff>4 && diff>2*peer_cwnd) + diff >>= 1; + bin64_t hint = transfer().picker().Pick(ack_in_,diff,NOW+TINT_SEC*3/2); //rtt_avg_*8+TINT_MSEC*10 if (hint!=bin64_t::NONE) { - hint_out_.push_back(hint); dgram.Push8(P2TP_HINT); dgram.Push32(hint); dprintf("%s #%i +hint (%i,%lli)\n",tintstr(),id,hint.layer(),hint.offset()); + if (hint_out_mark_.bin==bin64_t::NONE) + hint_out_mark_ = hint; + hint_out_am_ += hint.width(); + //hint_out_ += hint.width(); } } @@ -252,8 +231,7 @@ void Channel::AddAck (Datagram& dgram) { data_in_ = tintbin(0,bin64_t::NONE); } for(int count=0; count<4; count++) { - bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, 0, bins::FILLED); - // TODO bins::ANY_LAYER + bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, bins::FILLED); if (ack==bin64_t::NONE) break; ack = file().ack_out().cover(ack); @@ -292,7 +270,7 @@ void Channel::Recv (Datagram& dgram) { cc_->OnDataRecvd(data); last_recv_time_ = NOW; if (data!=bin64_t::ALL && next_send_time_>NOW+TINT_MSEC) - Send(); //RequeueSend(NOW); + Send(); } @@ -311,17 +289,24 @@ bin64_t Channel::OnData (Datagram& dgram) { int length = dgram.Pull(&data,1024); bool ok = file().OfferData(pos, (char*)data, length) ; dprintf("%s #%i %cdata (%lli)\n",tintstr(),id,ok?'-':'!',pos.offset()); - if (ok) { - data_in_ = tintbin(NOW,pos); - if (last_data_time_) { - tint dip = NOW - last_data_time_; - dip_avg_ = ( dip_avg_*3 + dip ) >> 2; - } - last_data_time_ = NOW; - transfer().picker().Received(pos); // so dirty; FIXME FIXME FIXME - return pos; - } else + if (!ok) return bin64_t::NONE; + data_in_ = tintbin(NOW,pos); + if (last_data_time_) { + tint dip = NOW - last_data_time_; + dip_avg_ = ( dip_avg_*3 + dip ) >> 2; + } + last_data_time_ = NOW; + if (pos.within(hint_out_mark_.bin)) { + hint_out_mark_.bin = bin64_t::NONE; + hint_out_ = hint_out_am_; + hint_out_am_ = 0; + } + if (hint_out_) + hint_out_--; + else if (hint_out_am_) // probably, the marking HINT was lost or whatever + hint_out_am_--; + return pos; } @@ -366,7 +351,7 @@ void Channel::OnTs (Datagram& dgram) { void Channel::OnHint (Datagram& dgram) { bin64_t hint = dgram.Pull32(); hint_in_.push_back(hint); - ack_in_.set(hint,bins::EMPTY); + //ack_in_.set(hint,bins::EMPTY); //RequeueSend(cc_->OnHintRecvd(hint)); dprintf("%s #%i -hint (%i,%lli)\n",tintstr(),id,hint.layer(),hint.offset()); } @@ -400,7 +385,7 @@ void Channel::AddPex (Datagram& dgram) { } -void Channel::Recv (int socket) { +void Channel::RecvDatagram (int socket) { Datagram data(socket); data.Recv(); if (data.size()<4) @@ -494,7 +479,7 @@ void Channel::Loop (tint howlong) { dprintf("%s waiting %lliusec\n",tintstr(),towait); int rd = Datagram::Wait(socket_count,sockets,towait); if (rd!=INVALID_SOCKET) - Recv(rd); + RecvDatagram(rd); } else { //if (sender->next_send_time_==TINT_NEVER) { dprintf("%s #%i closed sendctrl\n",tintstr(),sender->id); delete sender; diff --git a/tests/bin64test.cpp b/tests/bin64test.cpp index 157c447..864c9f2 100644 --- a/tests/bin64test.cpp +++ b/tests/bin64test.cpp @@ -37,6 +37,8 @@ TEST(Bin64Test,Navigation) { TEST(Bin64Test,Overflows) { + EXPECT_FALSE(bin64_t(0,1).within(bin64_t::NONE)); + EXPECT_TRUE(bin64_t(0,1).within(bin64_t::ALL)); /*EXPECT_EQ(bin64_t::NONE.parent(),bin64_t::NONE); EXPECT_EQ(bin64_t::NONE.left(),bin64_t::NONE); EXPECT_EQ(bin64_t::NONE.right(),bin64_t::NONE); diff --git a/tests/binstest2.cpp b/tests/binstest2.cpp index f50e1ee..07a750f 100755 --- a/tests/binstest2.cpp +++ b/tests/binstest2.cpp @@ -120,7 +120,7 @@ TEST(BinsTest,Find){ hole.set(bin64_t(4,0),bins::FILLED); hole.set(bin64_t(1,1),bins::EMPTY); hole.set(bin64_t(0,7),bins::EMPTY); - bin64_t f = hole.find(bin64_t(4,0),0); + bin64_t f = hole.find(bin64_t(4,0)).left_foot(); EXPECT_EQ(bin64_t(0,2),f); } @@ -222,7 +222,7 @@ TEST(BinsTest,FindFiltered) { filter.set(bin64_t(1,4)); filter.set(bin64_t(0,13)); - bin64_t x = data.find_filtered(filter,bin64_t(4,0),0); + bin64_t x = data.find_filtered(filter,bin64_t(4,0)).left_foot(); EXPECT_EQ(bin64_t(0,12),x); } @@ -250,9 +250,9 @@ TEST(BinsTest,FindFiltered2) { for(int j=1; j<1024; j+=2) filter.set(bin64_t(0,j)); filter.set(bin64_t(0,501),bins::EMPTY); - EXPECT_EQ(bin64_t(0,501),data.find_filtered(filter,bin64_t(10,0),0)); + EXPECT_EQ(bin64_t(0,501),data.find_filtered(filter,bin64_t(10,0)).left_foot()); data.set(bin64_t(0,501)); - EXPECT_EQ(bin64_t::NONE,data.find_filtered(filter,bin64_t(10,0),0)); + EXPECT_EQ(bin64_t::NONE,data.find_filtered(filter,bin64_t(10,0)).left_foot()); } @@ -297,7 +297,9 @@ TEST(BinsTest,Twist) { b.twist(1<<3); EXPECT_EQ(bins::FILLED,b.get(bin64_t(3,3))); EXPECT_EQ(bins::EMPTY,b.get(bin64_t(3,2))); - bin64_t tw = b.find(bin64_t(5,0),3,bins::FILLED); + bin64_t tw = b.find(bin64_t(5,0),bins::FILLED); + while (tw.width()>(1<<3)) + tw = tw.left(); tw = tw.twisted(1<<3); EXPECT_EQ(bin64_t(3,2),tw); b.twist(0); diff --git a/tests/transfertest.cpp b/tests/transfertest.cpp index 1c04c46..5625300 100644 --- a/tests/transfertest.cpp +++ b/tests/transfertest.cpp @@ -19,6 +19,20 @@ const char* BTF = "test_file"; Sha1Hash A,B,C,D,E,AB,CD,ABCD,E0,E000,ABCDE000,ROOT; +TEST(TransferTest,TBHeap) { + tbheap tbh; + ASSERT_TRUE(tbh.is_empty()); + tbh.push(tintbin(3,bin64_t::NONE)); + tbh.push(tintbin(1,bin64_t::NONE)); + ASSERT_EQ(2,tbh.size()); + tbh.push(tintbin(2,bin64_t::ALL)); + ASSERT_EQ(1,tbh.pop().time); + ASSERT_EQ(bin64_t::ALL,tbh.peek().bin); + ASSERT_EQ(2,tbh.pop().time); + ASSERT_EQ(3,tbh.pop().time); +} + + TEST(TransferTest,TransferFile) { AB = Sha1Hash(A,B); @@ -77,7 +91,7 @@ TEST(TransferTest,TransferFile) { EXPECT_EQ(2,leech->packets_complete()); EXPECT_EQ(bin64_t(2,0),leech->peak(0)); } - bin64_t next = leech_transfer->picker().Pick(seed->ack_out(),0); + bin64_t next = leech_transfer->picker().Pick(seed->ack_out(),1,TINT_NEVER); ASSERT_NE(bin64_t::NONE,next); ASSERT_TRUE(next.base_offset()<5); uint8_t buf[1024]; //size_t len = seed->storer->ReadData(next,&buf); -- 2.20.1