dgram.Push8(SWIFT_HASH);
dgram.Push32((uint32_t)peak);
dgram.PushHash(file().peak_hash(i));
- //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
dprintf("%s #%u +phash %s\n",tintstr(),id_,peak.str());
}
}
dgram.Push8(SWIFT_HASH);
dgram.Push32((uint32_t)uncle);
dgram.PushHash( file().hash(uncle) );
- //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
dprintf("%s #%u +hash %s\n",tintstr(),id_,uncle.str());
pos = pos.parent();
}
bin64_t data = bin64_t::NONE;
if ( is_established() ) {
// FIXME: seeder check
+ AddHave(dgram);
AddAck(dgram);
if (!file().is_complete())
AddHint(dgram);
data = AddData(dgram);
} else {
AddHandshake(dgram);
+ AddHave(dgram);
AddAck(dgram);
}
dprintf("%s #%u sent %ib %s:%x\n",
last_send_time_ = NOW;
sent_since_recv_++;
dgrams_sent_++;
+ Reschedule();
}
}
} else
dprintf("%s #%u sendctrl wait cwnd %f data_out %i next %s\n",
- tintstr(),id_,cwnd_,data_out_.size(),tintstr(last_data_out_time_+NOW-send_interval_));
+ tintstr(),id_,cwnd_,(int)data_out_.size(),tintstr(last_data_out_time_+NOW-send_interval_));
if (tosend==bin64_t::NONE)// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
return bin64_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
void Channel::Recv (Datagram& dgram) {
- dprintf("%s #%u recvd %i\n",tintstr(),id_,dgram.size()+4);
+ dprintf("%s #%u recvd %ib\n",tintstr(),id_,dgram.size()+4);
dgrams_rcvd_++;
if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
rtt_avg_ = NOW - last_send_time_;
}
last_recv_time_ = NOW;
sent_since_recv_ = 0;
+ Reschedule();
}
bin64_t pos = dgram.Pull32();
uint8_t *data;
int length = dgram.Pull(&data,1024);
- bool ok = (pos==bin64_t::NONE) || file().OfferData(pos, (char*)data, length) ;
+ bool ok = (pos==bin64_t::NONE) ||
+ (!file().ack_out().get(pos) && file().OfferData(pos, (char*)data, length) );
dprintf("%s #%u %cdata %s\n",tintstr(),id_,ok?'-':'!',pos.str());
data_in_ = tintbin(NOW,bin64_t::NONE);
if (!ok)
return bin64_t::NONE;
+ bin64_t cover = transfer().ack_out().cover(pos);
+ for(int i=0; i<transfer().cb_installed; i++)
+ if (cover.layer()>=transfer().cb_agg[i])
+ transfer().callbacks[i](transfer().fd(),cover); // FIXME
data_in_.bin = pos;
if (pos!=bin64_t::NONE) {
if (last_data_in_time_) {
void Channel::OnAck (Datagram& dgram) {
bin64_t ackd_pos = dgram.Pull32();
- tint peer_time_ = dgram.Pull64(); // FIXME 32
+ tint peer_time = dgram.Pull64(); // FIXME 32
// FIXME FIXME: wrap around here
if (ackd_pos==bin64_t::NONE)
return; // likely, brocken packet / insufficient hashes
while ( ri<data_out_tmo_.size() && !data_out_tmo_[ri].bin.within(ackd_pos) )
ri++;
dprintf("%s #%u %cack %s %lli\n",tintstr(),id_,
- di==data_out_.size()?'?':'-',ackd_pos.str(),peer_time_);
+ di==data_out_.size()?'?':'-',ackd_pos.str(),peer_time);
if (di!=data_out_.size() && ri==data_out_tmo_.size()) { // not a retransmit
// round trip time calculations
tint rtt = NOW-data_out_[di].time;
rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
- dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
+ dev_avg_ = ( dev_avg_*3 + ::abs(rtt-rtt_avg_) ) >> 2;
assert(data_out_[di].time!=TINT_NEVER);
// one-way delay calculations
- tint owd = peer_time_ - data_out_[di].time;
- owd_cur_bin_ = (owd_cur_bin_+1) & 3;
+ tint owd = peer_time - data_out_[di].time;
+ owd_cur_bin_ = 0;//(owd_cur_bin_+1) & 3;
owd_current_[owd_cur_bin_] = owd;
if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
owd_min_bin_start_ = NOW;
dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n",
tintstr(),id_,rtt_avg_,dev_avg_,data_out_[di].bin.str());
ack_rcvd_recent_++;
- data_out_[di]=tintbin();
- }
- // early loss detection by packet reordering
- for (int re=0; re<di-MAX_REORDERING; re++) {
- if (data_out_[re]==tintbin())
- continue;
- ack_not_rcvd_recent_++;
- data_out_tmo_.push_back(data_out_.front().bin);
- dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str());
- data_out_cap_ = bin64_t::ALL;
- data_out_[ri] = tintbin();
+ // early loss detection by packet reordering
+ for (int re=0; re<di-MAX_REORDERING; re++) {
+ if (data_out_[re]==tintbin())
+ continue;
+ ack_not_rcvd_recent_++;
+ data_out_tmo_.push_back(data_out_[re].bin);
+ dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str());
+ data_out_cap_ = bin64_t::ALL;
+ data_out_[re] = tintbin();
+ }
}
+ if (di!=data_out_.size())
+ data_out_[di]=tintbin();
// clear zeroed items
while (!data_out_.empty() && ( data_out_.front()==tintbin() ||
ack_in_.is_filled(data_out_.front().bin) ) )
void Channel::TimeoutDataOut ( ) {
// losses: timeouted packets
tint timeout = NOW - ack_timeout();
- for (int i=0; i<data_out_.size() && data_out_[i].time<timeout; i++) {
+ while (!data_out_.empty() &&
+ ( data_out_.front().time<timeout || data_out_.front()==tintbin() ) ) {
if (data_out_.front()!=tintbin() && ack_in_.is_empty(data_out_.front().bin)) {
ack_not_rcvd_recent_++;
data_out_cap_ = bin64_t::ALL;
}
-Channel* Channel::RecvDatagram (int socket) {
+void Channel::RecvDatagram (SOCKET socket) {
Datagram data(socket);
data.Recv();
const Address& addr = data.address();
-#define return_log(...) { printf(__VA_ARGS__); return NULL; }
+#define return_log(...) { fprintf(stderr,__VA_ARGS__); return; }
if (data.size()<4)
return_log("datagram shorter than 4 bytes %s\n",addr.str());
uint32_t mych = data.Pull32();
Sha1Hash hash;
Channel* channel = NULL;
- if (!mych) { // handshake initiated
+ if (mych==0) { // handshake initiated
if (data.size()<1+4+1+4+Sha1Hash::SIZE)
return_log ("%s #0 incorrect size %i initial handshake packet %s\n",
tintstr(),data.size(),addr.str());
}
//dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
channel->Recv(data);
- return channel;
}
dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(),
tintstr(send_time));
sender->Send();
- sender->Reschedule();
} else { // it's too early, wait
tint towait = min(limit,send_time) - NOW;
dprintf("%s #0 waiting %lliusec\n",tintstr(),towait);
- int rd = Datagram::Wait(socket_count,sockets,towait);
- if (rd!=INVALID_SOCKET) { // in meantime, received something
- Channel* receiver = RecvDatagram(rd);
- if (receiver) // receiver's state may have changed
- receiver->Reschedule();
- }
+ Datagram::Wait(towait);
if (sender) // get back to that later
send_queue.push(tintbin(send_time,sender->id()));
void Channel::Reschedule () {
- TimeoutDataOut(); // precaution to know free cwnd
next_send_time_ = NextSendTime();
if (next_send_time_!=TINT_NEVER) {
assert(next_send_time_<NOW+TINT_MIN);