/*
TODO 25 Oct 18:55
- - move hint_out_, piece picking to piece picker (needed e.g. for the case of channel drop)
- - ANY_LAYER
- range: ALL
- randomized testing of advanced ops (new testcase)
- - PeerCwnd()
- - bins hint_out_, tbqueue hint_out_ts_
-
*/
void Channel::AddPeakHashes (Datagram& dgram) {
AddHandshake(dgram);
AddAck(dgram);
}
- dprintf("%s #%i sent %ib %s\n",tintstr(),id,dgram.size(),peer().str().c_str());
+ dprintf("%s #%i sent %ib %s\n",tintstr(),id,dgram.size(),peer().str());
if (dgram.size()==4) // only the channel id; bare keep-alive
data = bin64_t::ALL;
cc_->OnDataSent(data);
}
-void Channel::CleanStaleHintOut () {
- tint timed_out = NOW - 8*rtt_avg_; // FIXME BULLSHIT (take rtt=0)
+void Channel::AddHint (Datagram& dgram) {
+
+ tint timed_out = NOW - TINT_SEC*3/2;
while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
- transfer().picker().Expired(hint_out_.front().bin);
+ hint_out_size_ -= hint_out_.front().bin.width();
hint_out_.pop_front();
}
-}
-
-
-void Channel::AddHint (Datagram& dgram) {
-
- CleanStaleHintOut();
-
- uint64_t hint_out_mass=0;
- for(int i=0; i<hint_out_.size(); i++)
- hint_out_mass += hint_out_[i].bin.width();
int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
if (!peer_cwnd)
peer_cwnd = 1;
- int peer_pps = TINT_SEC / dip_avg_;
+ int peer_pps = TINT_SEC / dip_avg_; // data packets per sec
if (!peer_pps)
peer_pps = 1;
- if ( hint_out_mass < peer_pps ) { //4*peer_cwnd ) {
+ if ( hint_out_size_ < peer_pps ) { //4*peer_cwnd ) {
- int diff = peer_pps - hint_out_mass;
+ int diff = peer_pps - hint_out_size_;
//if (diff>4 && diff>2*peer_cwnd)
// diff >>= 1;
bin64_t hint = transfer().picker().Pick(ack_in_,diff,rtt_avg_*8+TINT_MSEC*100);
if (hint!=bin64_t::NONE) {
dgram.Push8(P2TP_HINT);
dgram.Push32(hint);
- dprintf("%s #%i +hint %s [%lli]\n",tintstr(),id,hint.str(),hint_out_mass);
+ dprintf("%s #%i +hint %s [%lli]\n",tintstr(),id,hint.str(),hint_out_size_);
hint_out_.push_back(hint);
+ hint_out_size_ += hint.width();
} else
dprintf("%s #%i .hint\n",tintstr(),id);
}
-void Channel::CleanFulfilledHints (bin64_t pos) {
+void Channel::CleanHintOut (bin64_t pos) {
int hi = 0;
- while (hi<hint_out_.size() && hi<8 && !pos.within(hint_out_[hi].bin))
+ while (hi<hint_out_.size() && !pos.within(hint_out_[hi].bin))
hi++;
- if (hi<8 && hi<hint_out_.size()) {
- while (hi--) {
- transfer().picker().Expired(hint_out_.front().bin);
- hint_out_.pop_front();
- }
- while (hint_out_.front().bin!=pos) {
- tintbin f = hint_out_.front();
- f.bin = f.bin.towards(pos);
- hint_out_.front().bin = f.bin.sibling();
- hint_out_.push_front(f);
- }
+ if (hi==hint_out_.size())
+ return;
+ while (hi--) { // removing likely snubbed hints
+ hint_out_size_ -= hint_out_.front().bin.width();
hint_out_.pop_front();
}
- // every HINT ends up as either Expired or Received
- transfer().picker().Received(pos);
+ while (hint_out_.front().bin!=pos) {
+ tintbin f = hint_out_.front();
+ f.bin = f.bin.towards(pos);
+ hint_out_.front().bin = f.bin.sibling();
+ hint_out_.push_front(f);
+ }
+ hint_out_.pop_front();
+ hint_out_size_--;
}
}
last_recv_data_time_ = NOW;
}
- CleanFulfilledHints(pos);
+ CleanHintOut(pos);
return pos;
}
uint32_t ipv4 = dgram.Pull32();
uint16_t port = dgram.Pull16();
Address addr(ipv4,port);
- dprintf("%s #%i -pex %s\n",tintstr(),id,addr.str().c_str());
+ dprintf("%s #%i -pex %s\n",tintstr(),id,addr.str());
transfer().OnPexIn(addr);
}
dgram.Push8(P2TP_PEX_ADD);
dgram.Push32(a.ipv4());
dgram.Push16(a.port());
- dprintf("%s #%i +pex %s\n",tintstr(),id,a.str().c_str());
+ dprintf("%s #%i +pex %s\n",tintstr(),id,a.str());
}
void Channel::RecvDatagram (int socket) {
Datagram data(socket);
data.Recv();
+ Address& addr = data.addr;
+#define return_log(...) { eprintf(__VA_ARGS__); return; }
if (data.size()<4)
- RETLOG("datagram shorter than 4 bytes");
+ return_log("datagram shorter than 4 bytes %s\n",addr.str());
uint32_t mych = data.Pull32();
Sha1Hash hash;
Channel* channel = NULL;
if (!mych) { // handshake initiated
if (data.size()<1+4+1+4+Sha1Hash::SIZE)
- RETLOG ("incorrect size initial handshake packet");
+ return_log ("incorrect size %i initial handshake packet %s\n",data.size(),addr.str());
uint8_t hashid = data.Pull8();
if (hashid!=P2TP_HASH)
- RETLOG ("no hash in the initial handshake");
+ return_log ("no hash in the initial handshake %s\n",addr.str());
bin64_t pos = data.Pull32();
if (pos!=bin64_t::ALL)
- RETLOG ("that is not the root hash");
+ return_log ("that is not the root hash %s\n",addr.str());
hash = data.PullHash();
FileTransfer* file = FileTransfer::Find(hash);
if (!file)
- RETLOG ("hash unknown, no such file");
+ return_log ("hash %s unknown, no such file %s\n",hash.hex().c_str(),addr.str());
dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
if (channels[*i] && channels[*i]->peer_==data.addr &&
channels[*i]->last_recv_time_>NOW-TINT_SEC*2)
- RETLOG("have a channel already");
+ return_log("have a channel already to %s\n",addr.str());
channel = new Channel(file, socket, data.address());
} else {
mych = DecodeID(mych);
- if (mych>=channels.size()) {
- eprintf("invalid channel #%i\n",mych);
- return;
- }
+ if (mych>=channels.size())
+ return_log("invalid channel #%i, %s\n",mych,addr.str());
channel = channels[mych];
if (!channel)
- RETLOG ("channel is closed");
- if (channel->peer() != data.address())
- RETLOG ("invalid peer address");
+ return_log ("channel #%i is already closed\n",mych,addr.str());
+ if (channel->peer() != addr)
+ return_log ("invalid peer address #%i %s!=%s\n",mych,channel->peer().str(),addr.str());
channel->own_id_mentioned_ = true;
}
//dprintf("recvd %i bytes for %i\n",data.size(),channel->id);