/*
- * datasendrecv.cpp
- * serp++
+ * sendrecv.cpp
+ * most of the swift's state machine
*
* Created by Victor Grishchenko on 3/6/09.
* Copyright 2009 Delft University of Technology. All rights reserved.
*
*/
-#include <algorithm>
-#include <glog/logging.h>
-#include "p2tp.h"
+#include "swift.h"
+#include <algorithm> // kill it
-
-using namespace p2tp;
-using namespace std; // FIXME remove
+using namespace swift;
+using namespace std;
/*
TODO 25 Oct 18:55
- - move hint_out_, piece picking to piece picker (needed e.g. for the case of channel drop)
- - ANY_LAYER
- range: ALL
- randomized testing of advanced ops (new testcase)
- - PeerCwnd()
- - bins hint_out_, tbqueue hint_out_ts_
-
*/
-void Channel::AddPeakHashes (Datagram& dgram) {
- for(int i=0; i<file().peak_count(); i++) {
+void Channel::AddPeakHashes (Datagram& dgram) {
+ for(int i=0; i<file().peak_count(); i++) {
bin64_t peak = file().peak(i);
- dgram.Push8(P2TP_HASH);
- dgram.Push32((uint32_t)peak);
- dgram.PushHash(file().peak_hash(i));
- //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
- dprintf("%s #%i +phash (%i,%lli)\n",Datagram::TimeStr(),id,peak.layer(),peak.offset());
- }
+ dgram.Push8(SWIFT_HASH);
+ dgram.Push32((uint32_t)peak);
+ dgram.PushHash(file().peak_hash(i));
+ dprintf("%s #%u +phash %s\n",tintstr(),id_,peak.str());
+ }
}
-void Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
+void Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
bin64_t peak = file().peak_for(pos);
- while (pos!=peak && ack_in_.get(pos.parent())==bins::EMPTY) {
+ while (pos!=peak && ((NOW&3)==3 || !data_out_cap_.within(pos.parent())) &&
+ ack_in_.get(pos.parent())==binmap_t::EMPTY ) {
bin64_t uncle = pos.sibling();
- dgram.Push8(P2TP_HASH);
- dgram.Push32((uint32_t)uncle);
- dgram.PushHash( file().hash(uncle) );
- //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
- dprintf("%s #%i +hash (%i,%lli)\n",Datagram::TimeStr(),id,uncle.layer(),uncle.offset());
+ dgram.Push8(SWIFT_HASH);
+ dgram.Push32((uint32_t)uncle);
+ dgram.PushHash( file().hash(uncle) );
+ dprintf("%s #%u +hash %s\n",tintstr(),id_,uncle.str());
pos = pos.parent();
}
}
-bin64_t Channel::DequeueHint () { // TODO: resilience
+bin64_t Channel::ImposeHint () {
+ uint64_t twist = peer_channel_id_; // got no hints, send something randomly
+ twist &= file().peak(0); // FIXME may make it semi-seq here
+ file().ack_out().twist(twist);
+ ack_in_.twist(twist);
+ bin64_t my_pick =
+ file().ack_out().find_filtered(ack_in_,bin64_t::ALL,binmap_t::FILLED);
+ while (my_pick.width()>max(1,(int)cwnd_))
+ my_pick = my_pick.left();
+ file().ack_out().twist(0);
+ ack_in_.twist(0);
+ return my_pick.twisted(twist);
+}
+
+
+bin64_t Channel::DequeueHint () {
+ if (hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {
+ bin64_t my_pick = ImposeHint(); // FIXME move to the loop
+ if (my_pick!=bin64_t::NONE) {
+ hint_in_.push_back(my_pick);
+ dprintf("%s #%u *hint %s\n",tintstr(),id_,my_pick.str());
+ }
+ }
bin64_t send = bin64_t::NONE;
while (!hint_in_.empty() && send==bin64_t::NONE) {
- bin64_t hint = hint_in_.front();
+ bin64_t hint = hint_in_.front().bin;
+ tint time = hint_in_.front().time;
hint_in_.pop_front();
- send = file().ack_out().find_filtered
- (ack_in_,hint,0,bins::FILLED);
- dprintf("%s #%i may_send %lli\n",Datagram::TimeStr(),id,send.base_offset());
- if (send!=bin64_t::NONE)
- while (send!=hint) {
- hint = hint.towards(send);
- hint_in_.push_front(hint.sibling());
- }
+ while (!hint.is_base()) { // FIXME optimize; possible attack
+ hint_in_.push_front(tintbin(time,hint.right()));
+ hint = hint.left();
+ }
+ //if (time < NOW-TINT_SEC*3/2 )
+ // continue; bad idea
+ if (ack_in_.get(hint)!=binmap_t::FILLED)
+ send = hint;
}
+ uint64_t mass = 0;
+ for(int i=0; i<hint_in_.size(); i++)
+ mass += hint_in_[i].bin.width();
+ dprintf("%s #%u dequeued %s [%lli]\n",tintstr(),id_,send.str(),mass);
return send;
}
-/*void Channel::CleanStaleHints () {
- while ( !hint_out.empty() && file().ack_out().get(hint_out.front().bin)==bins::FILLED )
- hint_out.pop_front(); // FIXME must normally clear fulfilled entries
- tint timed_out = Datagram::now - cc_->RoundTripTime()*8;
- while ( !hint_out.empty() && hint_out.front().time < timed_out ) {
- file().picker()->Snubbed(hint_out.front().bin);
- hint_out.pop_front();
- }
-}*/
-
-
-void Channel::AddHandshake (Datagram& dgram) {
- if (!peer_channel_id_) { // initiating
- dgram.Push8(P2TP_HASH);
- dgram.Push32(bin64_t::ALL32);
- dgram.PushHash(file().root_hash());
- dprintf("%s #%i +hash ALL %s\n",
- Datagram::TimeStr(),id,file().root_hash().hex().c_str());
- }
- dgram.Push8(P2TP_HANDSHAKE);
- dgram.Push32(EncodeID(id));
- dprintf("%s #%i +hs\n",Datagram::TimeStr(),id);
- AddAck(dgram);
- ack_out_.clear();
-}
-
-
-void Channel::ClearStaleDataOut() {
- int oldsize = data_out_.size();
- while ( data_out_.size() && data_out_.front().time <
- Datagram::now - rtt_avg_ - dev_avg_*4 )
- data_out_.pop_front();
- if (data_out_.size()!=oldsize)
- cc_->OnAckRcvd(bin64_t::NONE);
+void Channel::AddHandshake (Datagram& dgram) {
+ if (!peer_channel_id_) { // initiating
+ dgram.Push8(SWIFT_HASH);
+ dgram.Push32(bin64_t::ALL32);
+ dgram.PushHash(file().root_hash());
+ dprintf("%s #%u +hash ALL %s\n",
+ tintstr(),id_,file().root_hash().hex().c_str());
+ }
+ dgram.Push8(SWIFT_HANDSHAKE);
+ int encoded = EncodeID(id_);
+ dgram.Push32(encoded);
+ dprintf("%s #%u +hs %x\n",tintstr(),id_,encoded);
+ have_out_.clear();
+ AddHave(dgram);
}
-void Channel::Send () {
+void Channel::Send () {
Datagram dgram(socket_,peer());
dgram.Push32(peer_channel_id_);
bin64_t data = bin64_t::NONE;
if ( is_established() ) {
+ // FIXME: seeder check
+ AddHave(dgram);
AddAck(dgram);
- AddHint(dgram);
- ClearStaleDataOut();
- if (cc_->MaySendData())
- data = AddData(dgram);
- else
- dprintf("%s #%i no cwnd\n",Datagram::TimeStr(),id);
+ if (!file().is_complete())
+ AddHint(dgram);
+ AddPex(dgram);
+ TimeoutDataOut();
+ data = AddData(dgram);
} else {
AddHandshake(dgram);
+ AddHave(dgram);
AddAck(dgram);
}
- dprintf("%s #%i sent %ib %s\n",Datagram::TimeStr(),id,dgram.size(),peer().str().c_str());
- PCHECK( dgram.Send() != -1 )<<"error sending";
- if (dgram.size()==4) // only the channel id; bare keep-alive
+ dprintf("%s #%u sent %ib %s:%x\n",
+ tintstr(),id_,dgram.size(),peer().str(),peer_channel_id_);
+ if (dgram.size()==4) {// only the channel id; bare keep-alive
data = bin64_t::ALL;
- cc_->OnDataSent(data);
- last_send_time_ = Datagram::now;
- RequeueSend(cc_->NextSendTime());
+ }
+ if (dgram.Send()==-1)
+ print_error("can't send datagram");
+ last_send_time_ = NOW;
+ sent_since_recv_++;
+ dgrams_sent_++;
+ Reschedule();
}
-void Channel::AddHint (Datagram& dgram) {
+void Channel::AddHint (Datagram& dgram) {
- while (!hint_out_.empty() &&
- (hint_out_.front().time<Datagram::now-TINT_SEC ||
- file().ack_out().get(hint_out_.front().bin)==bins::FILLED ) ) {
- file().picker().Expired(hint_out_.front().bin);
+ tint plan_for = max(TINT_SEC,rtt_avg_*4);
+
+ tint timed_out = NOW - plan_for*2;
+ while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
+ hint_out_size_ -= hint_out_.front().bin.width();
hint_out_.pop_front();
}
- uint64_t hinted = 0;
- for(tbqueue::iterator i=hint_out_.begin(); i!=hint_out_.end(); i++)
- hinted += i->bin.width();
- int bps = PeerBPS();
- dprintf("%s #%i hinted %lli peer_bps %i\n",Datagram::TimeStr(),id,hinted,bps);
- //float peer_cwnd = cc_->PeerBPS() * cc_->RoundTripTime() / TINT_SEC;
-
- if ( bps > hinted*1024 ) { //hinted*1024 < peer_cwnd*4 ) {
-
- uint8_t layer = 2; // actually, enough
- bin64_t hint = file().picker().Pick(ack_in_,layer);
- // FIXME FIXME FIXME: any layer
- if (hint==bin64_t::NONE)
- hint = file().picker().Pick(ack_in_,0);
-
+
+ int plan_pck = max ( (tint)1, plan_for / dip_avg_ );
+
+ if ( hint_out_size_ < plan_pck ) {
+
+ int diff = plan_pck - hint_out_size_; // TODO: aggregate
+ bin64_t hint = transfer().picker().Pick(ack_in_,diff,NOW+plan_for*2);
+
if (hint!=bin64_t::NONE) {
- hint_out_.push_back(hint);
- dgram.Push8(P2TP_HINT);
+ dgram.Push8(SWIFT_HINT);
dgram.Push32(hint);
- dprintf("%s #%i +hint (%i,%lli)\n",Datagram::TimeStr(),id,hint.layer(),hint.offset());
- }
-
+ dprintf("%s #%u +hint %s [%lli]\n",tintstr(),id_,hint.str(),hint_out_size_);
+ hint_out_.push_back(hint);
+ hint_out_size_ += hint.width();
+ } else
+ dprintf("%s #%u Xhint\n",tintstr(),id_);
+
}
}
-bin64_t Channel::AddData (Datagram& dgram) {
- if (!file().size()) // know nothing
- return bin64_t::NONE;
- bin64_t tosend = DequeueHint();
- if (tosend==bin64_t::NONE)
+bin64_t Channel::AddData (Datagram& dgram) {
+
+ if (!file().size()) // know nothing
return bin64_t::NONE;
+
+ bin64_t tosend = bin64_t::NONE;
+ tint luft = send_interval_>>4; // may wake up a bit earlier
+ if (data_out_.size()<cwnd_ &&
+ last_data_out_time_+send_interval_<=NOW+luft) {
+ tosend = DequeueHint();
+ if (tosend==bin64_t::NONE) {
+ dprintf("%s #%u sendctrl no idea what to send\n",tintstr(),id_);
+ if (send_control_!=KEEP_ALIVE_CONTROL)
+ SwitchSendControl(KEEP_ALIVE_CONTROL);
+ }
+ } else
+ dprintf("%s #%u sendctrl wait cwnd %f data_out %i next %s\n",
+ tintstr(),id_,cwnd_,(int)data_out_.size(),tintstr(last_data_out_time_+NOW-send_interval_));
+
+ if (tosend==bin64_t::NONE)// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
+ return bin64_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
+
if (ack_in_.is_empty() && file().size())
AddPeakHashes(dgram);
AddUncleHashes(dgram,tosend);
+ if (!ack_in_.is_empty()) // TODO: cwnd_>1
+ data_out_cap_ = tosend;
+
+ if (dgram.size()>254) {
+ dgram.Send(); // kind of fragmentation
+ dgram.Push32(peer_channel_id_);
+ }
+
+ dgram.Push8(SWIFT_DATA);
+ dgram.Push32(tosend.to32());
+
uint8_t buf[1024];
- size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10);
- // TODO: ??? corrupted data, retries
+ size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10);
+ // TODO: corrupted data, retries, caching
if (r<0) {
- PLOG(ERROR)<<"error on reading";
- return 0;
+ print_error("error on reading");
+ return bin64_t::NONE;
}
assert(dgram.space()>=r+4+1);
- dgram.Push8(P2TP_DATA);
- dgram.Push32(tosend);
dgram.Push(buf,r);
- dprintf("%s #%i +data (%lli)\n",Datagram::TimeStr(),id,tosend.base_offset());
+
+ last_data_out_time_ = NOW;
data_out_.push_back(tosend);
- return tosend;
+ dprintf("%s #%u +data %s\n",tintstr(),id_,tosend.str());
+
+ return tosend;
}
-void Channel::AddTs (Datagram& dgram) {
- dgram.Push8(P2TP_TS);
- dgram.Push64(data_in_.time);
- dprintf("%s #%i +ts %lli\n",Datagram::TimeStr(),id,data_in_.time);
+void Channel::AddAck (Datagram& dgram) {
+ if (data_in_==tintbin())
+ return;
+ dgram.Push8(SWIFT_ACK);
+ dgram.Push32(data_in_.bin.to32()); // FIXME not cover
+ dgram.Push64(data_in_.time); // FIXME 32
+ have_out_.set(data_in_.bin);
+ dprintf("%s #%u +ack %s %s\n",
+ tintstr(),id_,data_in_.bin.str(),tintstr(data_in_.time));
+ if (data_in_.bin.layer()>2)
+ data_in_dbl_ = data_in_.bin;
+ data_in_ = tintbin();
}
-void Channel::AddAck (Datagram& dgram) {
- if (data_in_.bin!=bin64_t::NONE) {
- AddTs(dgram);
- bin64_t pos = data_in_.bin;
- dgram.Push8(P2TP_ACK);
- dgram.Push32(pos);
- //dgram.Push64(data_in_.time);
- ack_out_.set(pos);
- dprintf("%s #%i +ack (%i,%lli) %s\n",Datagram::TimeStr(),id,
- pos.layer(),pos.offset(),Datagram::TimeStr(data_in_.time));
- data_in_ = tintbin(0,bin64_t::NONE);
- }
+void Channel::AddHave (Datagram& dgram) {
+ if (data_in_dbl_!=bin64_t::NONE) { // TODO: do redundancy better
+ dgram.Push8(SWIFT_HAVE);
+ dgram.Push32(data_in_dbl_.to32());
+ data_in_dbl_=bin64_t::NONE;
+ }
for(int count=0; count<4; count++) {
- bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, 0, bins::FILLED);
- // TODO bins::ANY_LAYER
+ bin64_t ack = file().ack_out().find_filtered // FIXME: do rotating queue
+ (have_out_, bin64_t::ALL, binmap_t::FILLED);
if (ack==bin64_t::NONE)
break;
- while (file().ack_out().get(ack.parent())==bins::FILLED)
- ack = ack.parent();
- ack_out_.set(ack);
- dgram.Push8(P2TP_ACK);
- dgram.Push32(ack);
- dprintf("%s #%i +ack (%i,%lli)\n",Datagram::TimeStr(),id,ack.layer(),ack.offset());
+ ack = file().ack_out().cover(ack);
+ have_out_.set(ack);
+ dgram.Push8(SWIFT_HAVE);
+ dgram.Push32(ack.to32());
+ dprintf("%s #%u +have %s\n",tintstr(),id_,ack.str());
}
}
-void Channel::Recv (Datagram& dgram) {
+void Channel::Recv (Datagram& dgram) {
+ dprintf("%s #%u recvd %ib\n",tintstr(),id_,dgram.size()+4);
+ dgrams_rcvd_++;
+ if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
+ rtt_avg_ = NOW - last_send_time_;
+ dev_avg_ = rtt_avg_;
+ dip_avg_ = rtt_avg_;
+ dprintf("%s #%u sendctrl rtt init %lli\n",tintstr(),id_,rtt_avg_);
+ }
bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
- while (dgram.size()) {
- uint8_t type = dgram.Pull8();
- switch (type) {
- case P2TP_HANDSHAKE: OnHandshake(dgram); break;
- case P2TP_DATA: data=OnData(dgram); break;
- case P2TP_TS: OnTs(dgram); break;
- case P2TP_ACK: OnAck(dgram); break;
- case P2TP_HASH: OnHash(dgram); break;
- case P2TP_HINT: OnHint(dgram); break;
- case P2TP_PEX_ADD: OnPex(dgram); break;
- default:
- //LOG(ERROR) << this->id_string() << " malformed datagram";
- return;
- }
- }
- cc_->OnDataRecvd(data);
- last_recv_time_ = Datagram::now;
- if (data!=bin64_t::ALL)
- RequeueSend(Datagram::now);
+ while (dgram.size()) {
+ uint8_t type = dgram.Pull8();
+ switch (type) {
+ case SWIFT_HANDSHAKE: OnHandshake(dgram); break;
+ case SWIFT_DATA: data=OnData(dgram); break;
+ case SWIFT_HAVE: OnHave(dgram); break;
+ case SWIFT_ACK: OnAck(dgram); break;
+ case SWIFT_HASH: OnHash(dgram); break;
+ case SWIFT_HINT: OnHint(dgram); break;
+ case SWIFT_PEX_ADD: OnPex(dgram); break;
+ default:
+ eprintf("%s #%u ?msg id unknown %i\n",tintstr(),id_,(int)type);
+ return;
+ }
+ }
+ last_recv_time_ = NOW;
+ sent_since_recv_ = 0;
+ Reschedule();
}
-void Channel::OnHash (Datagram& dgram) {
- bin64_t pos = dgram.Pull32();
- Sha1Hash hash = dgram.PullHash();
- file().OfferHash(pos,hash);
- //DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
- dprintf("%s #%i -hash (%i,%lli)\n",Datagram::TimeStr(),id,pos.layer(),pos.offset());
+void Channel::OnHash (Datagram& dgram) {
+ bin64_t pos = dgram.Pull32();
+ Sha1Hash hash = dgram.PullHash();
+ file().OfferHash(pos,hash);
+ dprintf("%s #%u -hash %s\n",tintstr(),id_,pos.str());
}
-bin64_t Channel::OnData (Datagram& dgram) {
- bin64_t pos = dgram.Pull32();
+void Channel::CleanHintOut (bin64_t pos) {
+ int hi = 0;
+ while (hi<hint_out_.size() && !pos.within(hint_out_[hi].bin))
+ hi++;
+ if (hi==hint_out_.size())
+ return; // something not hinted or hinted in far past
+ while (hi--) { // removing likely snubbed hints
+ hint_out_size_ -= hint_out_.front().bin.width();
+ hint_out_.pop_front();
+ }
+ while (hint_out_.front().bin!=pos) {
+ tintbin f = hint_out_.front();
+ f.bin = f.bin.towards(pos);
+ hint_out_.front().bin = f.bin.sibling();
+ hint_out_.push_front(f);
+ }
+ hint_out_.pop_front();
+ hint_out_size_--;
+}
+
+
+bin64_t Channel::OnData (Datagram& dgram) { // TODO: HAVE NONE for corrupted data
+ bin64_t pos = dgram.Pull32();
uint8_t *data;
int length = dgram.Pull(&data,1024);
- bool ok = file().OfferData(pos, data, length) ;
- dprintf("%s #%i %cdata (%lli)\n",Datagram::TimeStr(),id,ok?'-':'!',pos.offset());
- if (ok) {
- data_in_ = tintbin(Datagram::now,pos);
- if (last_recv_time_) {
- tint dip = Datagram::now - last_recv_time_;
+ bool ok = (pos==bin64_t::NONE) ||
+ (!file().ack_out().get(pos) && file().OfferData(pos, (char*)data, length) );
+ dprintf("%s #%u %cdata %s\n",tintstr(),id_,ok?'-':'!',pos.str());
+ data_in_ = tintbin(NOW,bin64_t::NONE);
+ if (!ok)
+ return bin64_t::NONE;
+ bin64_t cover = transfer().ack_out().cover(pos);
+ for(int i=0; i<transfer().cb_installed; i++)
+ if (cover.layer()>=transfer().cb_agg[i])
+ transfer().callbacks[i](transfer().fd(),cover); // FIXME
+ data_in_.bin = pos;
+ if (pos!=bin64_t::NONE) {
+ if (last_data_in_time_) {
+ tint dip = NOW - last_data_in_time_;
dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
}
- return pos;
- } else
- return bin64_t::NONE;
+ last_data_in_time_ = NOW;
+ }
+ CleanHintOut(pos);
+ return pos;
}
-void Channel::OnAck (Datagram& dgram) {
- bin64_t ackd_pos = dgram.Pull32();
- if (ackd_pos.base_offset()>file().size())
+void Channel::OnAck (Datagram& dgram) {
+ bin64_t ackd_pos = dgram.Pull32();
+ tint peer_time = dgram.Pull64(); // FIXME 32
+ // FIXME FIXME: wrap around here
+ if (ackd_pos==bin64_t::NONE)
+ return; // likely, brocken packet / insufficient hashes
+ if (file().size() && ackd_pos.base_offset()>=file().packet_size()) {
+ eprintf("invalid ack: %s\n",ackd_pos.str());
return;
- dprintf("%s #%i -ack (%i,%lli)\n",Datagram::TimeStr(),id,ackd_pos.layer(),ackd_pos.offset());
- for (int i=0; i<8 && i<data_out_.size(); i++)
- if (data_out_[i].bin.within(ackd_pos)) {
- tintbin x = data_out_[i];
- data_out_[i].bin = bin64_t::ALL;
- tint rtt = Datagram::now-x.time;
- rtt_avg_ = (rtt_avg_*3 + rtt) >> 2;
- dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
- dprintf("%s #%i rtt %lli dev %lli\n",
- Datagram::TimeStr(),id,rtt_avg_,dev_avg_);
- cc_->OnAckRcvd(x.bin);
+ }
+ ack_in_.set(ackd_pos);
+ int di = 0, ri = 0;
+ // find an entry for the send (data out) event
+ while ( di<data_out_.size() && ( data_out_[di]==tintbin() ||
+ !data_out_[di].bin.within(ackd_pos) ) )
+ di++;
+ // FUTURE: delayed acks
+ // rule out retransmits
+ while ( ri<data_out_tmo_.size() && !data_out_tmo_[ri].bin.within(ackd_pos) )
+ ri++;
+ dprintf("%s #%u %cack %s %lli\n",tintstr(),id_,
+ di==data_out_.size()?'?':'-',ackd_pos.str(),peer_time);
+ if (di!=data_out_.size() && ri==data_out_tmo_.size()) { // not a retransmit
+ // round trip time calculations
+ tint rtt = NOW-data_out_[di].time;
+ rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
+ dev_avg_ = ( dev_avg_*3 + ::abs(rtt-rtt_avg_) ) >> 2;
+ assert(data_out_[di].time!=TINT_NEVER);
+ // one-way delay calculations
+ tint owd = peer_time - data_out_[di].time;
+ owd_cur_bin_ = 0;//(owd_cur_bin_+1) & 3;
+ owd_current_[owd_cur_bin_] = owd;
+ if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
+ owd_min_bin_start_ = NOW;
+ owd_min_bin_ = (owd_min_bin_+1) & 3;
+ owd_min_bins_[owd_min_bin_] = TINT_NEVER;
+ }
+ if (owd_min_bins_[owd_min_bin_]>owd)
+ owd_min_bins_[owd_min_bin_] = owd;
+ dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n",
+ tintstr(),id_,rtt_avg_,dev_avg_,data_out_[di].bin.str());
+ ack_rcvd_recent_++;
+ // early loss detection by packet reordering
+ for (int re=0; re<di-MAX_REORDERING; re++) {
+ if (data_out_[re]==tintbin())
+ continue;
+ ack_not_rcvd_recent_++;
+ data_out_tmo_.push_back(data_out_[re].bin);
+ dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str());
+ data_out_cap_ = bin64_t::ALL;
+ data_out_[re] = tintbin();
}
- while (data_out_.size() && data_out_.front().bin==bin64_t::ALL)
+ }
+ if (di!=data_out_.size())
+ data_out_[di]=tintbin();
+ // clear zeroed items
+ while (!data_out_.empty() && ( data_out_.front()==tintbin() ||
+ ack_in_.is_filled(data_out_.front().bin) ) )
data_out_.pop_front();
- ack_in_.set(ackd_pos);
+ assert(data_out_.empty() || data_out_.front().time!=TINT_NEVER);
}
-/*void Channel::OnAckTs (Datagram& dgram) { // FIXME: OnTs
- bin64_t pos = dgram.Pull32();
- tint ts = dgram.Pull64();
- // TODO sanity check
- dprintf("%s #%i -ackts (%i,%lli) %s\n",
- Datagram::TimeStr(),id,pos.layer(),pos.offset(),Datagram::TimeStr(ts));
- ack_in_.set(pos);
- cc_->OnAckRcvd(pos,ts);
-}*/
+void Channel::TimeoutDataOut ( ) {
+ // losses: timeouted packets
+ tint timeout = NOW - ack_timeout();
+ while (!data_out_.empty() &&
+ ( data_out_.front().time<timeout || data_out_.front()==tintbin() ) ) {
+ if (data_out_.front()!=tintbin() && ack_in_.is_empty(data_out_.front().bin)) {
+ ack_not_rcvd_recent_++;
+ data_out_cap_ = bin64_t::ALL;
+ data_out_tmo_.push_back(data_out_.front().bin);
+ dprintf("%s #%u Tdata %s\n",tintstr(),id_,data_out_.front().bin.str());
+ }
+ data_out_.pop_front();
+ }
+ // clear retransmit queue of older items
+ while (!data_out_tmo_.empty() && data_out_tmo_.front().time<NOW-MAX_POSSIBLE_RTT)
+ data_out_tmo_.pop_front();
+}
+
-void Channel::OnTs (Datagram& dgram) {
- peer_send_time_ = dgram.Pull64();
- dprintf("%s #%i -ts %lli\n",Datagram::TimeStr(),id,peer_send_time_);
+void Channel::OnHave (Datagram& dgram) {
+ bin64_t ackd_pos = dgram.Pull32();
+ if (ackd_pos==bin64_t::NONE)
+ return; // wow, peer has hashes
+ ack_in_.set(ackd_pos);
+ dprintf("%s #%u -have %s\n",tintstr(),id_,ackd_pos.str());
}
-void Channel::OnHint (Datagram& dgram) {
- bin64_t hint = dgram.Pull32();
- hint_in_.push_back(hint);
- //RequeueSend(cc_->OnHintRecvd(hint));
- dprintf("%s #%i -hint (%i,%lli)\n",Datagram::TimeStr(),id,hint.layer(),hint.offset());
+void Channel::OnHint (Datagram& dgram) {
+ bin64_t hint = dgram.Pull32();
+ // FIXME: wake up here
+ hint_in_.push_back(hint);
+ dprintf("%s #%u -hint %s\n",tintstr(),id_,hint.str());
}
void Channel::OnHandshake (Datagram& dgram) {
peer_channel_id_ = dgram.Pull32();
- dprintf("%s #%i -hs %i\n",Datagram::TimeStr(),id,peer_channel_id_);
+ dprintf("%s #%u -hs %x\n",tintstr(),id_,peer_channel_id_);
+ // self-connection check
+ if (!SELF_CONN_OK) {
+ uint32_t try_id = DecodeID(peer_channel_id_);
+ if (channel(try_id) && !channel(try_id)->peer_channel_id_) {
+ peer_channel_id_ = 0;
+ Close();
+ return; // this is a self-connection
+ }
+ }
// FUTURE: channel forking
}
uint32_t ipv4 = dgram.Pull32();
uint16_t port = dgram.Pull16();
Address addr(ipv4,port);
- //if (peer_selector)
- // peer_selector->AddPeer(Datagram::Address(addr,port),file().root_hash());
- file_->pex_in_.push_back(addr);
- if (file_->pex_in_.size()>1000)
- file_->pex_in_.pop_front();
- static int ENOUGH_PEERS_THRESHOLD = 20;
- if (file_->channel_count()<ENOUGH_PEERS_THRESHOLD) {
- int i = 0, chno;
- while ( (chno=file_->RevealChannel(i)) != -1 ) {
- if (channels[i]->peer()==addr)
- break;
- }
- if (chno==-1)
- new Channel(file_,socket_,addr);
- }
+ dprintf("%s #%u -pex %s\n",tintstr(),id_,addr.str());
+ transfer().OnPexIn(addr);
}
void Channel::AddPex (Datagram& dgram) {
- int ch = file().RevealChannel(this->pex_out_);
- if (ch==-1)
+ int chid = transfer().RevealChannel(pex_out_);
+ if (chid==-1 || chid==id_)
return;
- Address a = channels[ch]->peer();
- dgram.Push8(P2TP_PEX_ADD);
+ Address a = channels[chid]->peer();
+ dgram.Push8(SWIFT_PEX_ADD);
dgram.Push32(a.ipv4());
dgram.Push16(a.port());
+ dprintf("%s #%u +pex %s\n",tintstr(),id_,a.str());
}
-void Channel::Recv (int socket) {
- Datagram data(socket);
- data.Recv();
- if (data.size()<4)
- RETLOG("datagram shorter than 4 bytes");
- uint32_t mych = data.Pull32();
- Sha1Hash hash;
- Channel* channel;
- if (!mych) { // handshake initiated
- if (data.size()<1+4+1+4+Sha1Hash::SIZE)
- RETLOG ("incorrect size initial handshake packet");
- uint8_t hashid = data.Pull8();
- if (hashid!=P2TP_HASH)
- RETLOG ("no hash in the initial handshake");
- bin pos = data.Pull32();
- if (pos!=bin64_t::ALL32)
- RETLOG ("that is not the root hash");
- hash = data.PullHash();
- FileTransfer* file = FileTransfer::Find(hash);
- if (!file)
- RETLOG ("hash unknown, no such file");
- dprintf("%s #0 -hash ALL %s\n",Datagram::TimeStr(),hash.hex().c_str());
- channel = new Channel(file, socket, data.address());
- } else {
- mych = DecodeID(mych);
- if (mych>=channels.size()) {
- eprintf("invalid channel #%i\n",mych);
- return;
- }
- channel = channels[mych];
- if (!channel)
- RETLOG ("channel is closed");
- if (channel->peer() != data.address())
- RETLOG ("invalid peer address");
+void Channel::RecvDatagram (SOCKET socket) {
+ Datagram data(socket);
+ data.Recv();
+ const Address& addr = data.address();
+#define return_log(...) { fprintf(stderr,__VA_ARGS__); return; }
+ if (data.size()<4)
+ return_log("datagram shorter than 4 bytes %s\n",addr.str());
+ uint32_t mych = data.Pull32();
+ Sha1Hash hash;
+ Channel* channel = NULL;
+ if (mych==0) { // handshake initiated
+ if (data.size()<1+4+1+4+Sha1Hash::SIZE)
+ return_log ("%s #0 incorrect size %i initial handshake packet %s\n",
+ tintstr(),data.size(),addr.str());
+ uint8_t hashid = data.Pull8();
+ if (hashid!=SWIFT_HASH)
+ return_log ("%s #0 no hash in the initial handshake %s\n",
+ tintstr(),addr.str());
+ bin64_t pos = data.Pull32();
+ if (pos!=bin64_t::ALL)
+ return_log ("%s #0 that is not the root hash %s\n",tintstr(),addr.str());
+ hash = data.PullHash();
+ FileTransfer* file = FileTransfer::Find(hash);
+ if (!file)
+ return_log ("%s #0 hash %s unknown, no such file %s\n",tintstr(),hash.hex().c_str(),addr.str());
+ dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
+ for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
+ if (channels[*i] && channels[*i]->peer_==data.address() &&
+ channels[*i]->last_recv_time_>NOW-TINT_SEC*2)
+ return_log("%s #0 have a channel already to %s\n",tintstr(),addr.str());
+ channel = new Channel(file, socket, data.address());
+ } else {
+ mych = DecodeID(mych);
+ if (mych>=channels.size())
+ return_log("%s invalid channel #%u, %s\n",tintstr(),mych,addr.str());
+ channel = channels[mych];
+ if (!channel)
+ return_log ("%s #%u is already closed\n",tintstr(),mych,addr.str());
+ if (channel->peer() != addr)
+ return_log ("%s #%u invalid peer address %s!=%s\n",
+ tintstr(),mych,channel->peer().str(),addr.str());
channel->own_id_mentioned_ = true;
- }
+ }
+ //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
channel->Recv(data);
}
-bool tblater (const tintbin& a, const tintbin& b) {
- return a.time > b.time;
-}
-
-
-void Channel::RequeueSend (tint next_time) {
- if (next_time==next_send_time_)
- return;
- next_send_time_ = next_time;
- send_queue.push_back(tintbin(next_time,id));
- push_heap(send_queue.begin(),send_queue.end(),tblater);
- dprintf("%s requeue #%i for %s\n",Datagram::TimeStr(),id,Datagram::TimeStr(next_time));
-}
-
+void Channel::Loop (tint howlong) {
-void Channel::Loop (tint howlong) {
-
tint limit = Datagram::Time() + howlong;
-
+
do {
tint send_time(TINT_NEVER);
Channel* sender(NULL);
- while (!send_queue.empty()) {
- send_time = send_queue.front().time;
- sender = channel((int)send_queue.front().bin);
- if (sender && sender->next_send_time_==send_time)
- break;
- sender = NULL; // it was a stale entry
- pop_heap(send_queue.begin(), send_queue.end(), tblater);
- send_queue.pop_back();
+ while (!sender && !send_queue.is_empty()) { // dequeue
+ tintbin next = send_queue.pop();
+ sender = channel((int)next.bin);
+ send_time = next.time;
+ if (sender && sender->next_send_time_!=send_time &&
+ sender->next_send_time_!=TINT_NEVER )
+ sender = NULL; // it was a stale entry
}
- if (send_time>limit)
- send_time = limit;
- if (sender && send_time<=Datagram::now) {
- dprintf("%s #%i sch_send %s\n",Datagram::TimeStr(),sender->id,
- Datagram::TimeStr(send_time));
+
+ if ( sender!=NULL && send_time<=NOW ) { // it's time
+
+ dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(),
+ tintstr(send_time));
sender->Send();
- pop_heap(send_queue.begin(), send_queue.end(), tblater);
- send_queue.pop_back();
- } else {
- tint towait = send_time - Datagram::now;
- dprintf("%s waiting %lliusec\n",Datagram::TimeStr(),towait);
- int rd = Datagram::Wait(socket_count,sockets,towait);
- if (rd!=-1)
- Recv(rd);
+
+ } else { // it's too early, wait
+
+ tint towait = min(limit,send_time) - NOW;
+ dprintf("%s #0 waiting %lliusec\n",tintstr(),towait);
+ Datagram::Wait(towait);
+ if (sender) // get back to that later
+ send_queue.push(tintbin(send_time,sender->id()));
+
}
-
- } while (Datagram::Time()<limit);
-
+
+ } while (NOW<limit);
+
}
+void Channel::Close () {
+ this->SwitchSendControl(CLOSE_CONTROL);
+}
-/*
-
- tint untiltime = Datagram::Time()+time;
- if (send_queue.empty())
- dprintf("%s empty send_queue\n", Datagram::TimeStr());
-
- while ( Datagram::now <= untiltime && !send_queue.empty() ) {
-
- // BUG BUG BUG no scheduled sends => just listen
-
- tintbin next_send = send_queue.front();
- tint wake_on = next_send.time;
- Channel* sender = channel(next_send.bin);
-
- // BUG BUG BUG filter stale timeouts here
-
- //if (wake_on<=untiltime) {
- pop_heap(send_queue.begin(), send_queue.end(), tblater);
- send_queue.pop_back();
- //}// else
- //sender = 0; // BUG will never wake up
-
- if (sender->next_send_time_!=next_send.time)
- continue;
-
- if (wake_on<Datagram::now)
- wake_on = Datagram::now;
- if (wake_on>untiltime)
- wake_on = untiltime;
- tint towait = min(wake_on-Datagram::now,TINT_SEC);
- dprintf("%s waiting %lliusec\n",Datagram::TimeStr(),towait);
- int rd = Datagram::Wait(socket_count,sockets,towait);
- if (rd!=-1)
- Recv(rd);
- // BUG WRONG BUG WRONG another may need to send
- if (sender) {
- dprintf("%s #%i sch_send %s\n",Datagram::TimeStr(),sender->id,
- Datagram::TimeStr(next_send.time));
- sender->Send();
- // if (sender->cc_->next_send_time==TINT_NEVER)
- }
-
- }
-
- */
+
+void Channel::Reschedule () {
+ next_send_time_ = NextSendTime();
+ if (next_send_time_!=TINT_NEVER) {
+ assert(next_send_time_<NOW+TINT_MIN);
+ send_queue.push(tintbin(next_send_time_,id_));
+ dprintf("%s #%u requeue for %s\n",tintstr(),id_,tintstr(next_send_time_));
+ } else {
+ dprintf("%s #%u closed\n",tintstr(),id_);
+ delete this;
+ }
+}