}
+bin64_t Channel::ImposeHint () {
+ uint64_t twist = peer_channel_id_; // got no hints, send something randomly
+ twist &= file().peak(0); // FIXME may make it semi-seq here
+ file().ack_out().twist(twist);
+ ack_in_.twist(twist);
+ bin64_t my_pick =
+ file().ack_out().find_filtered(ack_in_,bin64_t::ALL,binmap_t::FILLED);
+ while (my_pick.width()>max(1,(int)cwnd_))
+ my_pick = my_pick.left();
+ file().ack_out().twist(0);
+ ack_in_.twist(0);
+ return my_pick.twisted(twist);
+}
+
+
bin64_t Channel::DequeueHint () {
- if (hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {
- uint64_t twist = peer_channel_id_; // got no hints, send something randomly
- twist &= file().peak(0); // may make it semi-seq here
- file().ack_out().twist(twist);
- ack_in_.twist(twist);
- bin64_t my_pick =
- file().ack_out().find_filtered(ack_in_,bin64_t::ALL,binmap_t::FILLED);
- while (my_pick.width()>max(1,(int)cwnd_))
- my_pick = my_pick.left();
- file().ack_out().twist(0);
- ack_in_.twist(0);
+ if (hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {
+ bin64_t my_pick = ImposeHint(); // FIXME move to the loop
if (my_pick!=bin64_t::NONE) {
- my_pick = my_pick.twisted(twist);
hint_in_.push_back(my_pick);
dprintf("%s #%u *hint %s\n",tintstr(),id_,my_pick.str());
}
}
//if (time < NOW-TINT_SEC*3/2 )
// continue; bad idea
- if (ack_in_.get(hint)!=binmap_t::FILLED)
+ if (ack_in_.get(hint)!=binmap_t::FILLED)
send = hint;
}
uint64_t mass = 0;
if (!file().is_complete())
AddHint(dgram);
AddPex(dgram);
- CleanDataOut();
+ TimeoutDataOut();
data = AddData(dgram);
} else {
AddHandshake(dgram);
tintstr(),id_,dgram.size(),peer().str(),peer_channel_id_);
if (dgram.size()==4) {// only the channel id; bare keep-alive
data = bin64_t::ALL;
- //dprintf("%s #%u considering keepalive %i %f %s\n",
- // tintstr(),id_,(int)data_out_.size(),cwnd_,SEND_CONTROL_MODES[send_control_]);
- //if (data_out_.size()<cwnd_ && send_control_!=KEEP_ALIVE_CONTROL) {
- //if ( cwnd_ < 1 )
- // SwitchSendControl(KEEP_ALIVE_CONTROL);
- //else
- // cwnd_ = cwnd_/2.0;
- //}
- //if (data_out_.empty() && send_control_!=KEEP_ALIVE_CONTROL)
- // SwitchSendControl(KEEP_ALIVE_CONTROL);// we did our best
- //if (NOW<last_send_time_+MAX_SEND_INTERVAL) // no need for keepalive
- // return; // don't send empty dgram
}
if (dgram.Send()==-1)
print_error("can't send datagram");
void Channel::AddHint (Datagram& dgram) {
tint plan_for = max(TINT_SEC,rtt_avg_*4);
-
+
tint timed_out = NOW - plan_for*2;
while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
hint_out_size_ -= hint_out_.front().bin.width();
hint_out_.pop_front();
}
-
- /*int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
- if (!peer_cwnd)
- peer_cwnd = 1;*/
+
int plan_pck = max ( (tint)1, plan_for / dip_avg_ );
-
+
if ( hint_out_size_ < plan_pck ) {
-
+
int diff = plan_pck - hint_out_size_; // TODO: aggregate
bin64_t hint = transfer().picker().Pick(ack_in_,diff,NOW+plan_for*2);
-
+
if (hint!=bin64_t::NONE) {
dgram.Push8(SWIFT_HINT);
dgram.Push32(hint);
hint_out_size_ += hint.width();
} else
dprintf("%s #%u Xhint\n",tintstr(),id_);
-
+
}
}
bin64_t Channel::AddData (Datagram& dgram) {
-
+
if (!file().size()) // know nothing
return bin64_t::NONE;
-
+
bin64_t tosend = bin64_t::NONE;
tint luft = send_interval_>>4; // may wake up a bit earlier
if (data_out_.size()<cwnd_ &&
} else
dprintf("%s #%u sendctrl wait cwnd %f data_out %i next %s\n",
tintstr(),id_,cwnd_,data_out_.size(),tintstr(last_data_out_time_+NOW-send_interval_));
-
- if (tosend==bin64_t::NONE)// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
+
+ if (tosend==bin64_t::NONE)// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
return bin64_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
-
+
if (ack_in_.is_empty() && file().size())
AddPeakHashes(dgram);
AddUncleHashes(dgram,tosend);
dgram.Send(); // kind of fragmentation
dgram.Push32(peer_channel_id_);
}
-
+
dgram.Push8(SWIFT_DATA);
dgram.Push32(tosend.to32());
-
+
uint8_t buf[1024];
- size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10);
+ size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10);
// TODO: corrupted data, retries, caching
if (r<0) {
print_error("error on reading");
}
assert(dgram.space()>=r+4+1);
dgram.Push(buf,r);
-
+
last_data_out_time_ = NOW;
data_out_.push_back(tosend);
dprintf("%s #%u +data %s\n",tintstr(),id_,tosend.str());
-
+
return tosend;
}
-void Channel::AddTs (Datagram& dgram) {
- dgram.Push8(SWIFT_TS);
- dgram.Push64(data_in_.time);
- dprintf("%s #%u +ts %s\n",tintstr(),id_,tintstr(data_in_.time));
+void Channel::AddAck (Datagram& dgram) {
+ if (data_in_==tintbin())
+ return;
+ dgram.Push8(SWIFT_ACK);
+ dgram.Push32(data_in_.bin.to32());
+ dgram.Push32(data_in_.time);
+ ack_out_.set(data_in_.bin);
+ dprintf("%s #%u +ack %s %s\n",
+ tintstr(),id_,data_in_.bin.str(),tintstr(data_in_.time));
+ if (data_in_.bin.layer()>2)
+ data_in_dbl_ = data_in_.bin;
+ data_in_ = tintbin();
}
-void Channel::AddAck (Datagram& dgram) {
+void Channel::AddHave (Datagram& dgram) {
if (data_in_dbl_!=bin64_t::NONE) { // TODO: do redundancy better
- dgram.Push8(SWIFT_ACK);
+ dgram.Push8(SWIFT_HAVE);
dgram.Push32(data_in_dbl_.to32());
data_in_dbl_=bin64_t::NONE;
}
- if (data_in_.time!=TINT_NEVER) { // TODO: ACK NONE for corrupted data
- AddTs(dgram);
- bin64_t pos = data_in_.bin; // be precise file().ack_out().cover(data_in_.bin);
- dgram.Push8(SWIFT_ACK);
- dgram.Push32(pos.to32());
- //dgram.Push64(data_in_.time);
- ack_out_.set(pos);
- dprintf("%s #%u +ack %s %s\n",tintstr(),id_,pos.str(),tintstr(data_in_.time));
- data_in_ = tintbin(TINT_NEVER,bin64_t::NONE);
- if (pos.layer()>2)
- data_in_dbl_ = pos;
- }
for(int count=0; count<4; count++) {
- bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, binmap_t::FILLED);
+ bin64_t ack = file().ack_out().find_filtered // FIXME: do rotating queue
+ (ack_out_, bin64_t::ALL, binmap_t::FILLED);
if (ack==bin64_t::NONE)
break;
ack = file().ack_out().cover(ack);
ack_out_.set(ack);
- dgram.Push8(SWIFT_ACK);
+ dgram.Push8(SWIFT_HAVE);
dgram.Push32(ack.to32());
- dprintf("%s #%u +ack %s\n",tintstr(),id_,ack.str());
+ dprintf("%s #%u +have %s\n",tintstr(),id_,ack.str());
}
}
void Channel::Recv (Datagram& dgram) {
dprintf("%s #%u recvd %i\n",tintstr(),id_,dgram.size()+4);
- peer_send_time_ = 0; // has scope of 1 datagram
dgrams_rcvd_++;
if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
rtt_avg_ = NOW - last_send_time_;
switch (type) {
case SWIFT_HANDSHAKE: OnHandshake(dgram); break;
case SWIFT_DATA: data=OnData(dgram); break;
- case SWIFT_TS: OnTs(dgram); break;
+ case SWIFT_HAVE: OnHave(dgram); break;
case SWIFT_ACK: OnAck(dgram); break;
case SWIFT_HASH: OnHash(dgram); break;
case SWIFT_HINT: OnHint(dgram); break;
bin64_t pos = dgram.Pull32();
Sha1Hash hash = dgram.PullHash();
file().OfferHash(pos,hash);
- //DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
dprintf("%s #%u -hash %s\n",tintstr(),id_,pos.str());
}
}
-bin64_t Channel::OnData (Datagram& dgram) {
+bin64_t Channel::OnData (Datagram& dgram) { // TODO: HAVE NONE for corrupted data
bin64_t pos = dgram.Pull32();
uint8_t *data;
int length = dgram.Pull(&data,1024);
bool ok = (pos==bin64_t::NONE) || file().OfferData(pos, (char*)data, length) ;
dprintf("%s #%u %cdata %s\n",tintstr(),id_,ok?'-':'!',pos.str());
data_in_ = tintbin(NOW,bin64_t::NONE);
- if (!ok)
+ if (!ok)
return bin64_t::NONE;
data_in_.bin = pos;
if (pos!=bin64_t::NONE) {
}
last_data_in_time_ = NOW;
}
- CleanHintOut(pos);
+ CleanHintOut(pos);
return pos;
}
-void Channel::CleanDataOut (bin64_t ackd_pos) { // TODO: isn't it too long?
-
- int max_ack_off = 0;
-
- if (ackd_pos!=bin64_t::NONE) {
- for (int i=0; i<data_out_.size(); i++) {
- if (data_out_[i]!=tintbin() && data_out_[i].bin.within(ackd_pos)) {
- if (peer_send_time_)
- for(tbqueue::iterator j=data_out_tmo_.begin(); j!=data_out_tmo_.end(); j++)
- if (j->bin==data_out_[i].bin)
- peer_send_time_=0; // possibly retransmit
- if (peer_send_time_) { // well, it is sorta ACK ACK
- tint rtt = NOW-data_out_[i].time;
- rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
- dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
- assert(data_out_[i].time!=TINT_NEVER);
- tint owd = peer_send_time_ - data_out_[i].time;
- owd_cur_bin_ = (owd_cur_bin_+1) & 3;
- owd_current_[owd_cur_bin_] = owd;
- if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
- owd_min_bin_start_ = NOW;
- owd_min_bin_ = (owd_min_bin_+1) & 3;
- owd_min_bins_[owd_min_bin_] = TINT_NEVER;
- }
- if (owd_min_bins_[owd_min_bin_]>owd)
- owd_min_bins_[owd_min_bin_] = owd;
- peer_send_time_ = 0;
- }
- dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n",
- tintstr(),id_,rtt_avg_,dev_avg_,data_out_[i].bin.str());
- bin64_t pos = data_out_[i].bin;
- ack_rcvd_recent_++;
- data_out_[i]=tintbin();
- max_ack_off = i;
- if (ackd_pos==pos)
- break;
- }
- }
- while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE) {
- data_out_.pop_front();
- max_ack_off--;
- }
- static const int MAX_REORDERING = 2; // the triple-ACK principle
- if (max_ack_off>MAX_REORDERING) {
- while (max_ack_off && (data_out_.front().bin==bin64_t::NONE
- || ack_in_.is_filled(data_out_.front().bin)) ) {
- data_out_.pop_front();
- max_ack_off--;
- }
- while (max_ack_off>MAX_REORDERING) {
- ack_not_rcvd_recent_++;
- data_out_tmo_.push_back(data_out_.front().bin);
- dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str());
- data_out_.pop_front();
- max_ack_off--;
- data_out_cap_ = bin64_t::ALL;
- }
+void Channel::OnAck (Datagram& dgram) {
+ bin64_t ackd_pos = dgram.Pull32();
+ tint peer_time_ = dgram.Pull64();
+ if (ackd_pos==bin64_t::NONE)
+ return; // likely, brocken packet / insufficient hashes
+ if (file().size() && ackd_pos.base_offset()>=file().packet_size()) {
+ eprintf("invalid ack: %s\n",ackd_pos.str());
+ return;
+ }
+ ack_in_.set(ackd_pos);
+ int di = 0, ri = 0;
+ // find an entry for the send (data out) event
+ while ( di<data_out_.size() && ( data_out_[di]==tintbin() ||
+ !data_out_[di].bin.within(ackd_pos) ) )
+ di++;
+ // FUTURE: delayed acks
+ // rule out retransmits
+ while ( ri<data_out_tmo_.size() && !data_out_tmo_[ri].bin.within(ackd_pos) )
+ ri++;
+ dprintf("%s #%u %cack %s %s\n",tintstr(),id_,
+ di==data_out_.size()?'?':'-',ackd_pos.str(),tintstr(peer_time_));
+ if (ri==data_out_tmo_.size()) { // not a retransmit
+ // round trip time calculations
+ tint rtt = NOW-data_out_[di].time;
+ rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
+ dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
+ assert(data_out_[di].time!=TINT_NEVER);
+ // one-way delay calculations
+ tint owd = peer_time_ - data_out_[di].time;
+ owd_cur_bin_ = (owd_cur_bin_+1) & 3;
+ owd_current_[owd_cur_bin_] = owd;
+ if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
+ owd_min_bin_start_ = NOW;
+ owd_min_bin_ = (owd_min_bin_+1) & 3;
+ owd_min_bins_[owd_min_bin_] = TINT_NEVER;
}
- peer_send_time_ = 0;
+ if (owd_min_bins_[owd_min_bin_]>owd)
+ owd_min_bins_[owd_min_bin_] = owd;
+ dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n",
+ tintstr(),id_,rtt_avg_,dev_avg_,data_out_[di].bin.str());
+ ack_rcvd_recent_++;
+ data_out_[di]=tintbin();
+ }
+ // early loss detection by packet reordering
+ for (int re=0; re<di-MAX_REORDERING; re++) {
+ if (data_out_[re]==tintbin())
+ continue;
+ ack_not_rcvd_recent_++;
+ data_out_tmo_.push_back(data_out_.front().bin);
+ dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str());
+ data_out_cap_ = bin64_t::ALL;
+ data_out_[ri] = tintbin();
}
- tint timeout = NOW - rtt_avg_ - 4*max(dev_avg_,TINT_MSEC*50);
- while (!data_out_.empty() && data_out_.front().time<timeout) {
- if (data_out_.front().bin!=bin64_t::NONE && ack_in_.is_empty(data_out_.front().bin)) {
+ // clear zeroed items
+ while (!data_out_.empty() && ( data_out_.front()==tintbin() ||
+ ack_in_.is_filled(data_out_.front().bin) ) )
+ data_out_.pop_front();
+ assert(data_out_.empty() || data_out_.front().time!=TINT_NEVER);
+}
+
+
+void Channel::TimeoutDataOut ( ) {
+ // losses: timeouted packets
+ tint timeout = NOW - ack_timeout();
+ for (int i=0; i<data_out_.size() && data_out_[i].time<timeout; i++) {
+ if (data_out_.front()!=tintbin() && ack_in_.is_empty(data_out_.front().bin)) {
ack_not_rcvd_recent_++;
data_out_cap_ = bin64_t::ALL;
data_out_tmo_.push_back(data_out_.front().bin);
}
data_out_.pop_front();
}
- while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE)
- data_out_.pop_front();
- assert(data_out_.empty() || data_out_.front().time!=TINT_NEVER);
+ // clear retransmit queue of older items
while (!data_out_tmo_.empty() && data_out_tmo_.front().time<NOW-MAX_POSSIBLE_RTT)
data_out_tmo_.pop_front();
-
}
-void Channel::OnAck (Datagram& dgram) {
+void Channel::OnHave (Datagram& dgram) {
bin64_t ackd_pos = dgram.Pull32();
if (ackd_pos==bin64_t::NONE)
- return; // likely, brocken packet / insufficient hashes
- if (file().size() && ackd_pos.base_offset()>=file().packet_size()) {
- eprintf("invalid ack: %s\n",ackd_pos.str());
- return;
- }
- dprintf("%s #%u -ack %s\n",tintstr(),id_,ackd_pos.str());
+ return; // wow, peer has hashes
ack_in_.set(ackd_pos);
- CleanDataOut(ackd_pos); // FIXME do AFTER all ACKs
-}
-
-
-void Channel::OnTs (Datagram& dgram) {
- peer_send_time_ = dgram.Pull64();
- dprintf("%s #%u -ts %lli\n",tintstr(),id_,peer_send_time_);
+ dprintf("%s #%u -have %s\n",tintstr(),id_,ackd_pos.str());
}
void Channel::OnHint (Datagram& dgram) {
bin64_t hint = dgram.Pull32();
+ // FIXME: wake up here
hint_in_.push_back(hint);
- //ack_in_.set(hint,binmap_t::EMPTY);
- //RequeueSend(cc_->OnHintRecvd(hint));
dprintf("%s #%u -hint %s\n",tintstr(),id_,hint.str());
}
data.Recv();
const Address& addr = data.address();
#define return_log(...) { printf(__VA_ARGS__); return NULL; }
- if (data.size()<4)
+ if (data.size()<4)
return_log("datagram shorter than 4 bytes %s\n",addr.str());
uint32_t mych = data.Pull32();
Sha1Hash hash;
Channel* channel = NULL;
if (!mych) { // handshake initiated
- if (data.size()<1+4+1+4+Sha1Hash::SIZE)
+ if (data.size()<1+4+1+4+Sha1Hash::SIZE)
return_log ("%s #0 incorrect size %i initial handshake packet %s\n",
tintstr(),data.size(),addr.str());
uint8_t hashid = data.Pull8();
- if (hashid!=SWIFT_HASH)
+ if (hashid!=SWIFT_HASH)
return_log ("%s #0 no hash in the initial handshake %s\n",
tintstr(),addr.str());
bin64_t pos = data.Pull32();
- if (pos!=bin64_t::ALL)
+ if (pos!=bin64_t::ALL)
return_log ("%s #0 that is not the root hash %s\n",tintstr(),addr.str());
hash = data.PullHash();
FileTransfer* file = FileTransfer::Find(hash);
- if (!file)
+ if (!file)
return_log ("%s #0 hash %s unknown, no such file %s\n",tintstr(),hash.hex().c_str(),addr.str());
dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
- if (channels[*i] && channels[*i]->peer_==data.address() &&
+ if (channels[*i] && channels[*i]->peer_==data.address() &&
channels[*i]->last_recv_time_>NOW-TINT_SEC*2)
return_log("%s #0 have a channel already to %s\n",tintstr(),addr.str());
channel = new Channel(file, socket, data.address());
} else {
mych = DecodeID(mych);
- if (mych>=channels.size())
+ if (mych>=channels.size())
return_log("%s invalid channel #%u, %s\n",tintstr(),mych,addr.str());
channel = channels[mych];
- if (!channel)
+ if (!channel)
return_log ("%s #%u is already closed\n",tintstr(),mych,addr.str());
- if (channel->peer() != addr)
+ if (channel->peer() != addr)
return_log ("%s #%u invalid peer address %s!=%s\n",
tintstr(),mych,channel->peer().str(),addr.str());
channel->own_id_mentioned_ = true;
}
-void Channel::Loop (tint howlong) {
-
+void Channel::Loop (tint howlong) {
+
tint limit = Datagram::Time() + howlong;
-
+
do {
tint send_time(TINT_NEVER);
sender->next_send_time_!=TINT_NEVER )
sender = NULL; // it was a stale entry
}
-
+
if ( sender!=NULL && send_time<=NOW ) { // it's time
-
+
dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(),
tintstr(send_time));
sender->Send();
sender->Reschedule();
-
+
} else { // it's too early, wait
-
+
tint towait = min(limit,send_time) - NOW;
dprintf("%s #0 waiting %lliusec\n",tintstr(),towait);
int rd = Datagram::Wait(socket_count,sockets,towait);
}
if (sender) // get back to that later
send_queue.push(tintbin(send_time,sender->id()));
-
+
}
-
+
} while (NOW<limit);
-
+
}
-
+
void Channel::Close () {
this->SwitchSendControl(CLOSE_CONTROL);
}
void Channel::Reschedule () {
+ TimeoutDataOut(); // precaution to know free cwnd
next_send_time_ = NextSendTime();
if (next_send_time_!=TINT_NEVER) {
assert(next_send_time_<NOW+TINT_MIN);
DATA 01, bin_32, buffer
1K of data.
- ACK 02, bin_32
- ACKTS 08, bin_32, timestamp_32
+ ACK 02, bin_32, timestamp_32
+ HAVE 03, bin_32
Confirms successfull delivery of data. Used for
congestion control, as well.
- HINT 03, bin_32
+ HINT 08, bin_32
Practical value of "hints" is to avoid overlap, mostly.
Hints might be lost in the network or ignored.
Peer might send out data without a hint.
namespace swift {
#define NOW Datagram::now
-
+
/** tintbin is basically a pair<tint,bin64_t> plus some nice operators.
- Most frequently used in different queues (acknowledgements, requests,
+ Most frequently used in different queues (acknowledgements, requests,
etc). */
struct tintbin {
tint time;
tintbin() : time(TINT_NEVER), bin(bin64_t::NONE) {}
tintbin(tint time_, bin64_t bin_) : time(time_), bin(bin_) {}
tintbin(bin64_t bin_) : time(NOW), bin(bin_) {}
- bool operator < (const tintbin& b) const
+ bool operator < (const tintbin& b) const
{ return time > b.time; }
bool operator == (const tintbin& b) const
{ return time==b.time && bin==b.bin; }
SWIFT_HANDSHAKE = 0,
SWIFT_DATA = 1,
SWIFT_ACK = 2,
- SWIFT_TS = 8,
- SWIFT_HINT = 3,
+ SWIFT_HAVE = 3,
SWIFT_HASH = 4,
SWIFT_PEX_ADD = 5,
SWIFT_PEX_RM = 6,
SWIFT_SIGNED_HASH = 7,
- SWIFT_MSGTYPE_SENT = 8,
+ SWIFT_HINT = 8,
SWIFT_MSGTYPE_RCVD = 9,
SWIFT_MESSAGE_COUNT = 10
} messageid_t;
public:
- /** A constructor. Open/submit/retrieve a file.
- * @param file_name the name of the file
+ /** A constructor. Open/submit/retrieve a file.
+ * @param file_name the name of the file
* @param root_hash the root hash of the file; zero hash if the file
is newly submitted */
FileTransfer(const char *file_name, const Sha1Hash& root_hash=Sha1Hash::ZERO);
/** Messages we are accepting. */
uint64_t cap_out_;
-
+
tint init_time_;
public:
public:
virtual void Randomize (uint64_t twist) = 0;
/** The piece picking method itself.
- * @param offered the daata acknowledged by the peer
+ * @param offered the daata acknowledged by the peer
* @param max_width maximum number of packets to ask for
* @param expires (not used currently) when to consider request expired
* @return the bin number to request */
being transferred between two peers. As we don't need buffers and
lots of other TCP stuff, sizeof(Channel+members) must be below 1K.
Normally, API users do not deal with this class. */
- class Channel {
+ class Channel {
public:
Channel (FileTransfer* file, int socket=-1, Address peer=Address());
~Channel();
-
+
typedef enum {
KEEP_ALIVE_CONTROL,
PING_PONG_CONTROL,
LEDBAT_CONTROL,
CLOSE_CONTROL
} send_control_t;
-
+
static const char* SEND_CONTROL_MODES[];
static Channel*
void Close ();
void OnAck (Datagram& dgram);
- void OnTs (Datagram& dgram);
+ void OnHave (Datagram& dgram);
bin64_t OnData (Datagram& dgram);
void OnHint (Datagram& dgram);
void OnHash (Datagram& dgram);
void AddHandshake (Datagram& dgram);
bin64_t AddData (Datagram& dgram);
void AddAck (Datagram& dgram);
- void AddTs (Datagram& dgram);
+ void AddHave (Datagram& dgram);
void AddHint (Datagram& dgram);
void AddUncleHashes (Datagram& dgram, bin64_t pos);
void AddPeakHashes (Datagram& dgram);
tint SlowStartNextSendTime ();
tint AimdNextSendTime ();
tint LedbatNextSendTime ();
-
+
static int MAX_REORDERING;
static tint TIMEOUT;
static tint MIN_DEV;
static bool SELF_CONN_OK;
static tint MAX_POSSIBLE_RTT;
static FILE* debug_file;
-
+
const std::string id_string () const;
/** A channel is "established" if had already sent and received packets. */
bool is_established () { return peer_channel_id_ && own_id_mentioned_; }
tint ack_timeout () {
tint dev = dev_avg_ < MIN_DEV ? MIN_DEV : dev_avg_;
tint tmo = rtt_avg_ + dev * 4;
- return tmo < 30*TINT_SEC ? tmo : 30*TINT_SEC;
+ return tmo < 30*TINT_SEC ? tmo : 30*TINT_SEC;
}
uint32_t id () const { return id_; }
-
+
static int DecodeID(int scrambled);
static int EncodeID(int unscrambled);
static Channel* channel(int i) {
tint last_data_in_time_;
tint last_loss_time_;
tint next_send_time_;
- tint peer_send_time_;
/** Congestion window; TODO: int, bytes. */
float cwnd_;
/** Data sending interval. */
}
/** Get a request for one packet from the queue of peer's requests. */
bin64_t DequeueHint();
- void CleanDataOut (bin64_t acks_pos=bin64_t::NONE);
+ bin64_t ImposeHint();
+ void TimeoutDataOut ();
void CleanStaleHintOut();
void CleanHintOut(bin64_t pos);
void Reschedule();
static SOCKET sockets[8];
static int socket_count;
static tint last_tick;
- static tbheap send_queue;
+ static tbheap send_queue;
static Address tracker;
static std::vector<Channel*> channels;