5 * Created by Victor Grishchenko on 3/6/09.
6 * Copyright 2009 Delft University of Technology. All rights reserved.
10 #include "compat/util.h"
19 - randomized testing of advanced ops (new testcase)
// Appends one P2TP_HASH message per file peak: the 1-byte message type,
// the peak bin cast to 32 bits, then the peak's hash from
// file().peak_hash(i). AddData calls this while ack_in_ is still empty,
// presumably so the peer can anchor later uncle hashes to the peaks.
22 void Channel::AddPeakHashes (Datagram& dgram) {
23 for(int i=0; i<file().peak_count(); i++) {
24 bin64_t peak = file().peak(i);
25 dgram.Push8(P2TP_HASH);
26 dgram.Push32((uint32_t)peak);
27 dgram.PushHash(file().peak_hash(i));
28 //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
29 dprintf("%s #%u +phash %s\n",tintstr(),id,peak.str());
// Appends P2TP_HASH messages for the "uncle" hashes of pos. Each message is
// the 1-byte type, the sibling's bin cast to 32 bits, and that sibling's
// hash (file().hash(uncle)).
// The loop presumably walks pos up towards its peak (file().peak_for(pos)).
// It stops at the peak, or when the peer's ack_in_ shows pos's parent as
// non-EMPTY (presumably the peer already has that part of the tree).
// It also stops once the parent lies within data_out_cap_, except when
// (NOW&3)==3. That exception is apparently a cheap pseudo-random override,
// so those hashes are still re-sent now and then.
34 void Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
35 bin64_t peak = file().peak_for(pos);
36 while (pos!=peak && ((NOW&3)==3 || !data_out_cap_.within(pos.parent())) &&
37 ack_in_.get(pos.parent())==bins::EMPTY ) {
38 bin64_t uncle = pos.sibling();
39 dgram.Push8(P2TP_HASH);
40 dgram.Push32((uint32_t)uncle);
41 dgram.PushHash( file().hash(uncle) );
42 //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
43 dprintf("%s #%u +hash %s\n",tintstr(),id,uncle.str());
// Picks the next bin to send to the peer; `send` stays bin64_t::NONE if
// nothing qualifies.
// Self-generated hint: when the peer sent no hints but was heard from
// recently (last_recv_time_ within rtt_avg_ + TINT_SEC of now), a bin is
// chosen locally, in these steps:
//   - The file's ack_out() bins are temporarily twisted by a mask built from
//     peer_channel_id_ ANDed with peak(0). Presumably this lets different
//     peers get different picks.
//   - find_filtered() looks for a locally FILLED bin, filtered against
//     ack_in_ (presumably: one the peer does not hold yet).
//   - The pick is narrowed via left() until its width is at most max(1, cwnd_).
//   - The twist is undone, the pick is mapped back with twisted(twist), and
//     it is queued like a peer hint.
// Queued hints are then consumed front-first. A non-base hint is split, with
// its right half pushed back to the front under the same timestamp. A base
// bin the peer has not acked as FILLED becomes the candidate to send.
// Finally the total width of the remaining hints ("mass") is logged.
49 bin64_t Channel::DequeueHint () {
50 if (hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {
51 uint64_t twist = peer_channel_id_; // got no hints, send something randomly
52 twist &= file().peak(0); // may make it semi-seq here
53 file().ack_out().twist(twist);
56 file().ack_out().find_filtered(ack_in_,bin64_t::ALL,bins::FILLED);
57 while (my_pick.width()>max(1,(int)cwnd_))
58 my_pick = my_pick.left();
59 file().ack_out().twist(0);
61 if (my_pick!=bin64_t::NONE) {
62 my_pick = my_pick.twisted(twist);
63 hint_in_.push_back(my_pick);
64 dprintf("%s #%u *hint %s\n",tintstr(),id,my_pick.str());
67 bin64_t send = bin64_t::NONE;
68 while (!hint_in_.empty() && send==bin64_t::NONE) {
69 bin64_t hint = hint_in_.front().bin;
70 tint time = hint_in_.front().time;
// Split wide hints down to single packets; right halves are re-queued in front.
72 while (!hint.is_base()) { // FIXME optimize; possible attack
73 hint_in_.push_front(tintbin(time,hint.right()));
76 //if (time < NOW-TINT_SEC*3/2 )
78 if (ack_in_.get(hint)!=bins::FILLED)
82 for(int i=0; i<hint_in_.size(); i++)
83 mass += hint_in_[i].bin.width();
84 dprintf("%s #%u dequeued %s [%lli]\n",tintstr(),id,send.str(),mass);
// Appends the handshake.
// When peer_channel_id_ is still 0 (this side initiates), it first adds a
// P2TP_HASH message for bin64_t::ALL32 carrying the file's root hash. The
// receiving RecvDatagram checks for exactly this prefix to locate the file.
// It then adds a P2TP_HANDSHAKE message carrying this channel's own id,
// encoded with EncodeID(id).
89 void Channel::AddHandshake (Datagram& dgram) {
90 if (!peer_channel_id_) { // initiating
91 dgram.Push8(P2TP_HASH);
92 dgram.Push32(bin64_t::ALL32);
93 dgram.PushHash(file().root_hash());
94 dprintf("%s #%u +hash ALL %s\n",
95 tintstr(),id,file().root_hash().hex().c_str());
97 dgram.Push8(P2TP_HANDSHAKE);
98 int encoded = EncodeID(id);
99 dgram.Push32(encoded);
100 dprintf("%s #%u +hs %x\n",tintstr(),id,encoded);
// Builds and sends one datagram to peer(). Every datagram starts with the
// peer's 32-bit channel id.
// Once the channel is established, data is appended via AddData. Some other
// message, presumably a hint, is added only while the local file is
// incomplete.
// A datagram that is still exactly 4 bytes carries no messages and acts as
// a bare keep-alive.
// Send failures are reported via print_error. last_send_time_ is then
// stamped with NOW (Recv uses it to seed the RTT estimate).
106 void Channel::Send () {
107 Datagram dgram(socket_,peer());
108 dgram.Push32(peer_channel_id_);
109 bin64_t data = bin64_t::NONE;
110 if ( is_established() ) {
111 // FIXME: seeder check
113 if (!file().is_complete())
117 data = AddData(dgram);
122 dprintf("%s #%u sent %ib %s:%x\n",
123 tintstr(),id,dgram.size(),peer().str(),peer_channel_id_);
124 if (dgram.size()==4) {// only the channel id; bare keep-alive
126 //dprintf("%s #%u considering keepalive %i %f %s\n",
127 // tintstr(),id,(int)data_out_.size(),cwnd_,SEND_CONTROL_MODES[send_control_]);
128 //if (data_out_.size()<cwnd_ && send_control_!=KEEP_ALIVE_CONTROL) {
130 // SwitchSendControl(KEEP_ALIVE_CONTROL);
132 // cwnd_ = cwnd_/2.0;
134 //if (data_out_.empty() && send_control_!=KEEP_ALIVE_CONTROL)
135 // SwitchSendControl(KEEP_ALIVE_CONTROL);// we did our best
136 //if (NOW<last_send_time_+MAX_SEND_INTERVAL) // no need for keepalive
137 // return; // don't send empty dgram
139 if (dgram.Send()==-1)
140 print_error("can't send datagram");
141 last_send_time_ = NOW;
// Requests data from the peer with a P2TP_HINT message.
// plan_for is the planning horizon: max(TINT_SEC, 4 x rtt_avg_).
// Outgoing hints older than 2 x plan_for are treated as expired and dropped.
// Their widths are subtracted from hint_out_size_, the total width still
// requested.
// plan_pck estimates how many packets arrive within plan_for at the average
// inter-arrival gap dip_avg_ (updated in OnData), with a minimum of 1.
// If less than that is outstanding, the picker is asked for up to the
// difference, with deadline NOW + 2 x plan_for. A successful pick is
// recorded in hint_out_ and hint_out_size_; otherwise "Xhint" is logged.
// NOTE(review): plan_pck divides by dip_avg_; confirm it can never be 0 here
// (its initial value is set elsewhere).
147 void Channel::AddHint (Datagram& dgram) {
149 tint plan_for = max(TINT_SEC,rtt_avg_*4);
151 tint timed_out = NOW - plan_for*2;
152 while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
153 hint_out_size_ -= hint_out_.front().bin.width();
154 hint_out_.pop_front();
157 /*int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
160 int plan_pck = std::max ( 1LL, plan_for / dip_avg_ );
162 if ( hint_out_size_ < plan_pck ) {
164 int diff = plan_pck - hint_out_size_; // TODO: aggregate
165 bin64_t hint = transfer().picker().Pick(ack_in_,diff,NOW+plan_for*2);
167 if (hint!=bin64_t::NONE) {
168 dgram.Push8(P2TP_HINT);
170 dprintf("%s #%u +hint %s [%lli]\n",tintstr(),id,hint.str(),hint_out_size_);
171 hint_out_.push_back(hint);
172 hint_out_size_ += hint.width();
174 dprintf("%s #%u Xhint\n",tintstr(),id);
// Tries to append one P2TP_DATA message, plus the hashes the peer needs to
// verify it, to dgram. Returns bin64_t::NONE on every early-exit path:
//   - file size unknown;
//   - no cwnd room or send_interval_ not yet elapsed;
//   - no hint to serve (this also switches to KEEP_ALIVE_CONTROL);
//   - read error.
// Peak hashes are added while the peer has acked nothing yet; uncle hashes
// for the chosen bin are always added.
// If the hashes pushed the datagram past 254 bytes, the datagram is sent
// on its own and a fresh one is started with the peer's channel id.
// NOTE(review): that assumes Datagram::Send() resets the buffer; confirm.
// The payload is read with pread at tosend.base_offset()<<10, i.e. in
// 1024-byte packets.
// FIX: the read result was held in a size_t. pread signals failure with -1,
// which an unsigned variable turns into a huge positive count. Any
// negative-value error check was therefore dead, and the assert/push would
// see SIZE_MAX. It is now a signed int (the payload is at most 1024 bytes).
180 bin64_t Channel::AddData (Datagram& dgram) {
182 if (!file().size()) // know nothing
183 return bin64_t::NONE;
185 bin64_t tosend = bin64_t::NONE;
186 if (data_out_.size()<cwnd_ && last_data_out_time_<=NOW-send_interval_) {
187 tosend = DequeueHint();
188 if (tosend==bin64_t::NONE) {
189 dprintf("%s #%u no idea what to send #sendctrl\n",tintstr(),id);
190 if (send_control_!=KEEP_ALIVE_CONTROL)
191 SwitchSendControl(KEEP_ALIVE_CONTROL);
194 dprintf("%s #%u no cwnd #sendctrl\n",tintstr(),id);
196 if (tosend==bin64_t::NONE)// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
197 return bin64_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
199 if (ack_in_.is_empty() && file().size())
200 AddPeakHashes(dgram);
201 AddUncleHashes(dgram,tosend);
202 if (!ack_in_.is_empty()) // TODO: cwnd_>1
203 data_out_cap_ = tosend;
205 if (dgram.size()>254) {
206 dgram.Send(); // kind of fragmentation
207 dgram.Push32(peer_channel_id_);
210 dgram.Push8(P2TP_DATA);
211 dgram.Push32(tosend.to32());
// Signed on purpose: pread returns -1 on failure.
214 int r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10);
215 // TODO: corrupted data, retries, caching
217 print_error("error on reading");
218 return bin64_t::NONE;
220 assert(dgram.space()>=r+4+1);
223 last_data_out_time_ = NOW;
224 data_out_.push_back(tosend);
225 dprintf("%s #%u +data %s\n",tintstr(),id,tosend.str());
// Appends a P2TP_TS message carrying data_in_.time as 64 bits. data_in_.time
// is presumably the local arrival time of the last data packet.
// On the peer, OnTs stores it in peer_send_time_, and CleanDataOut uses it
// to derive one-way-delay samples.
231 void Channel::AddTs (Datagram& dgram) {
232 dgram.Push8(P2TP_TS);
233 dgram.Push64(data_in_.time);
234 dprintf("%s #%u +ts %s\n",tintstr(),id,tintstr(data_in_.time));
// Appends P2TP_ACK messages, each a 1-byte type plus a 32-bit bin:
//  1. the pending data_in_dbl_ bin, if any (sent once, then cleared);
//  2. for the last received data (data_in_.time != TINT_NEVER), the bin
//     returned by file().ack_out().cover(data_in_.bin), presumably the
//     largest completed bin containing it; data_in_ is reset afterwards;
//  3. up to four more bins. Each is found by find_filtered() on the file's
//     ack_out() against this channel's ack_out_ (presumably: completed
//     locally but not yet acknowledged here) and widened with cover().
238 void Channel::AddAck (Datagram& dgram) {
239 if (data_in_dbl_!=bin64_t::NONE) { // TODO: do redundancy better
240 dgram.Push8(P2TP_ACK);
241 dgram.Push32(data_in_dbl_.to32());
242 data_in_dbl_=bin64_t::NONE;
244 if (data_in_.time!=TINT_NEVER) { // TODO: ACK NONE for corrupted data
246 bin64_t pos = file().ack_out().cover(data_in_.bin);
247 dgram.Push8(P2TP_ACK);
248 dgram.Push32(pos.to32());
249 //dgram.Push64(data_in_.time);
251 dprintf("%s #%u +ack %s %s\n",tintstr(),id,pos.str(),tintstr(data_in_.time));
252 data_in_ = tintbin(TINT_NEVER,bin64_t::NONE);
256 for(int count=0; count<4; count++) {
257 bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, bins::FILLED);
258 if (ack==bin64_t::NONE)
260 ack = file().ack_out().cover(ack);
262 dgram.Push8(P2TP_ACK);
263 dgram.Push32(ack.to32());
264 dprintf("%s #%u +ack %s\n",tintstr(),id,ack.str());
// Parses every message in an incoming datagram and dispatches it by its
// 1-byte type. Unknown types are logged via eprintf.
// The logged size adds back the 4-byte channel id that RecvDatagram has
// already pulled.
// peer_send_time_ is cleared first because a TS message only applies to the
// datagram it arrived in.
// RTT seeding: if we have sent something and rtt_avg_/dev_avg_ still equal
// TINT_SEC/0 (presumably their initial values), rtt_avg_ is seeded from the
// time since our last send.
// An empty (keep-alive) datagram starts with data = bin64_t::ALL, otherwise
// with NONE. Finally last_recv_time_ is stamped and sent_since_recv_ reset.
269 void Channel::Recv (Datagram& dgram) {
270 dprintf("%s #%u recvd %i\n",tintstr(),id,dgram.size()+4);
271 peer_send_time_ = 0; // has scope of 1 datagram
273 if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
274 rtt_avg_ = NOW - last_send_time_;
277 dprintf("%s #%u rtt init %lli\n",tintstr(),id,rtt_avg_);
279 bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
280 while (dgram.size()) {
281 uint8_t type = dgram.Pull8();
283 case P2TP_HANDSHAKE: OnHandshake(dgram); break;
284 case P2TP_DATA: data=OnData(dgram); break;
285 case P2TP_TS: OnTs(dgram); break;
286 case P2TP_ACK: OnAck(dgram); break;
287 case P2TP_HASH: OnHash(dgram); break;
288 case P2TP_HINT: OnHint(dgram); break;
289 case P2TP_PEX_ADD: OnPex(dgram); break;
291 eprintf("%s #%u ?msg id unknown %i\n",tintstr(),id,(int)type);
295 last_recv_time_ = NOW;
296 sent_since_recv_ = 0;
// Handles a P2TP_HASH message: a 32-bit bin followed by a hash, both handed
// to file().OfferHash() (which presumably verifies and stores it).
300 void Channel::OnHash (Datagram& dgram) {
301 bin64_t pos = dgram.Pull32();
302 Sha1Hash hash = dgram.PullHash();
303 file().OfferHash(pos,hash);
304 //DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
305 dprintf("%s #%u -hash %s\n",tintstr(),id,pos.str());
// Removes pos from the outgoing-hint queue hint_out_, presumably once data
// for pos has arrived.
// hi indexes the first hint whose bin contains pos. If there is none,
// nothing changes.
// Hints queued before it are dropped as likely snubbed, and their widths
// are subtracted from hint_out_size_.
// The matching hint is then split until the front entry is exactly pos.
// Each step takes the front's child towards pos (towards()), leaves the
// other child in the front slot, and pushes the child towards pos in front
// of it. Both halves keep the original timestamp, so the summed width is
// unchanged.
// Finally the front entry, now pos, is popped.
309 void Channel::CleanHintOut (bin64_t pos) {
311 while (hi<hint_out_.size() && !pos.within(hint_out_[hi].bin))
313 if (hi==hint_out_.size())
314 return; // something not hinted or hinted in far past
315 while (hi--) { // removing likely snubbed hints
316 hint_out_size_ -= hint_out_.front().bin.width();
317 hint_out_.pop_front();
319 while (hint_out_.front().bin!=pos) {
320 tintbin f = hint_out_.front();
321 f.bin = f.bin.towards(pos);
322 hint_out_.front().bin = f.bin.sibling();
323 hint_out_.push_front(f);
325 hint_out_.pop_front();
// Handles a P2TP_DATA message: a 32-bit bin followed by up to 1024 payload
// bytes.
// The payload is offered to file().OfferData(). A NONE bin counts as ok
// without offering anything. The log marks accepted data with '-' and
// rejected data with '!'.
// On the early-return path (presumably taken when !ok), data_in_ is stamped
// with NOW and a NONE bin.
// For a real bin, dip_avg_ (the average gap between data arrivals) is
// updated as an EWMA giving the new gap weight 1/4, and last_data_in_time_
// is set to NOW.
330 bin64_t Channel::OnData (Datagram& dgram) {
331 bin64_t pos = dgram.Pull32();
333 int length = dgram.Pull(&data,1024);
334 bool ok = (pos==bin64_t::NONE) || file().OfferData(pos, (char*)data, length) ;
335 dprintf("%s #%u %cdata %s\n",tintstr(),id,ok?'-':'!',pos.str());
336 data_in_ = tintbin(NOW,bin64_t::NONE);
338 return bin64_t::NONE;
340 if (pos!=bin64_t::NONE) {
341 if (last_data_in_time_) {
342 tint dip = NOW - last_data_in_time_;
343 dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
345 last_data_in_time_ = NOW;
// Processes an ACK for ackd_pos against the in-flight queue data_out_.
// For each of the first 8 in-flight entries covered by ackd_pos:
//   - rtt_avg_ is updated as an EWMA with the new sample weighted 1/8;
//   - dev_avg_ is updated as an EWMA (weight 1/4) of |rtt - rtt_avg_|;
//   - when the same datagram carried a TS message (peer_send_time_ != 0), a
//     one-way-delay sample is recorded: the peer's timestamp minus our send
//     time, so it includes any clock offset between the two hosts;
//   - the entry is blanked with tintbin().
// OWD bookkeeping: owd_current_ is a 4-slot ring of recent samples.
// owd_min_bins_ is a 4-slot ring of minima, each slot covering an interval
// of TINT_SEC*30, in the spirit of LEDBAT's base-delay history.
// After that, blanked entries (presumably bin NONE) are popped from the
// front.
// Loss detection: when max_ack_off (presumably how far behind the ACK the
// front entries are) exceeds MAX_REORDERING, the unacked front entries are
// counted as lost.
// Timeout: entries older than rtt_avg_ + 4 x max(dev_avg_, 50 ms) are timed
// out. Both kinds of loss bump ack_not_rcvd_recent_ and reset
// data_out_cap_ to bin64_t::ALL.
// FIX: the rotation test was `owd_min_bin_start_<NOW+TINT_SEC*30`. The start
// time is never in the future, so that test is always true: every sample
// opened a fresh min-bin and the minima degenerated into the last four
// samples. The test now rotates only after 30 s have elapsed.
352 void Channel::CleanDataOut (bin64_t ackd_pos) { // TODO: isn't it too long?
355 //FIXME do LEDBAT magic somewhere here
357 if (ackd_pos!=bin64_t::NONE) {
358 for (int i=0; i<8 && i<data_out_.size(); i++) {
359 if (data_out_[i]!=tintbin() && data_out_[i].bin.within(ackd_pos)) {
360 tint rtt = NOW-data_out_[i].time;
361 rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
362 dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
363 if (peer_send_time_) {
364 tint owd = peer_send_time_ - data_out_[i].time;
365 owd_cur_bin_ = (owd_cur_bin_+1) & 3;
366 owd_current_[owd_cur_bin_] = owd;
// Open a new min-bin only once the current one is older than 30 s.
367 if (owd_min_bin_start_+TINT_SEC*30<NOW) {
368 owd_min_bin_start_ = NOW;
369 owd_min_bin_ = (owd_min_bin_+1) & 3;
370 owd_min_bins_[owd_min_bin_] = TINT_NEVER;
372 if (owd_min_bins_[owd_min_bin_]>owd)
373 owd_min_bins_[owd_min_bin_] = owd;
375 dprintf("%s #%u rtt %lli dev %lli\n",tintstr(),id,rtt_avg_,dev_avg_);
376 bin64_t pos = data_out_[i].bin;
378 data_out_[i]=tintbin();
384 while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE) {
385 data_out_.pop_front();
388 static const int MAX_REORDERING = 2; // the triple-ACK principle
389 if (max_ack_off>MAX_REORDERING) {
390 while (max_ack_off && (data_out_.front().bin==bin64_t::NONE
391 || ack_in_.is_filled(data_out_.front().bin)) ) {
392 data_out_.pop_front();
395 while (max_ack_off>MAX_REORDERING) {
396 ack_not_rcvd_recent_++;
397 dprintf("%s #%u Rdata %s\n",tintstr(),id,data_out_.front().bin.str());
398 data_out_.pop_front();
400 data_out_cap_ = bin64_t::ALL;
// Retransmission timeout: smoothed RTT plus four deviations (floor 50 ms).
404 tint timeout = NOW - rtt_avg_ - 4*std::max(dev_avg_,TINT_MSEC*50);
405 while (!data_out_.empty() && data_out_.front().time<timeout) {
406 if (data_out_.front().bin!=bin64_t::NONE && ack_in_.is_empty(data_out_.front().bin)) {
407 ack_not_rcvd_recent_++;
408 data_out_cap_ = bin64_t::ALL;
409 dprintf("%s #%u Tdata %s\n",tintstr(),id,data_out_.front().bin.str());
411 data_out_.pop_front();
413 while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE)
414 data_out_.pop_front();
// Handles a P2TP_ACK message for one 32-bit bin.
// NONE bins are ignored.
// When the file size is known, a bin whose base_offset() is at or beyond
// file().packet_size() is reported as invalid (and presumably rejected).
// Otherwise the bin is marked in ack_in_ (what the peer holds), and
// CleanDataOut() updates the in-flight queue and the RTT estimates.
419 void Channel::OnAck (Datagram& dgram) {
420 bin64_t ackd_pos = dgram.Pull32();
421 if (ackd_pos==bin64_t::NONE)
422 return; // likely a broken packet / insufficient hashes
423 if (file().size() && ackd_pos.base_offset()>=file().packet_size()) {
424 eprintf("invalid ack: %s\n",ackd_pos.str());
427 dprintf("%s #%u -ack %s\n",tintstr(),id,ackd_pos.str());
428 ack_in_.set(ackd_pos);
429 CleanDataOut(ackd_pos); // FIXME do AFTER all ACKs
// Handles a P2TP_TS message by storing its 64-bit timestamp in
// peer_send_time_. Recv clears that field at the start of every datagram,
// and CleanDataOut reads it to compute one-way-delay samples.
433 void Channel::OnTs (Datagram& dgram) {
434 peer_send_time_ = dgram.Pull64();
435 dprintf("%s #%u -ts %lli\n",tintstr(),id,peer_send_time_);
// Handles a P2TP_HINT message: queues the requested 32-bit bin in hint_in_,
// where DequeueHint() consumes it.
439 void Channel::OnHint (Datagram& dgram) {
440 bin64_t hint = dgram.Pull32();
441 hint_in_.push_back(hint);
442 //ack_in_.set(hint,bins::EMPTY);
443 //RequeueSend(cc_->OnHintRecvd(hint));
444 dprintf("%s #%u -hint %s\n",tintstr(),id,hint.str());
// Handles a P2TP_HANDSHAKE message by recording the peer's 32-bit channel id.
// Self-connection check: the id is decoded with DecodeID(). If it names one
// of our own channels that has no peer id yet, this host has presumably
// connected to itself.
448 void Channel::OnHandshake (Datagram& dgram) {
449 peer_channel_id_ = dgram.Pull32();
450 dprintf("%s #%u -hs %x\n",tintstr(),id,peer_channel_id_);
451 // self-connection check
453 uint32_t try_id = DecodeID(peer_channel_id_);
454 if (channel(try_id) && !channel(try_id)->peer_channel_id_) {
459 // FUTURE: channel forking
// Handles a P2TP_PEX_ADD (peer exchange) message. It carries a 32-bit IPv4
// address and a 16-bit port, which are passed to the transfer via
// OnPexIn().
463 void Channel::OnPex (Datagram& dgram) {
464 uint32_t ipv4 = dgram.Pull32();
465 uint16_t port = dgram.Pull16();
466 Address addr(ipv4,port);
467 dprintf("%s #%u -pex %s\n",tintstr(),id,addr.str());
468 transfer().OnPexIn(addr);
// Appends a P2TP_PEX_ADD message advertising the peer address of the
// channel chosen by transfer().RevealChannel(pex_out_), presumably another
// channel of the same transfer. Nothing is advertised when RevealChannel
// returns -1 or this channel's own id.
// Wire layout: 1-byte type, 32-bit IPv4 address, 16-bit port. These are the
// same fields OnPex pulls.
472 void Channel::AddPex (Datagram& dgram) {
473 int chid = transfer().RevealChannel(pex_out_);
474 if (chid==-1 || chid==id)
476 Address a = channels[chid]->peer();
477 dgram.Push8(P2TP_PEX_ADD);
478 dgram.Push32(a.ipv4());
479 dgram.Push16(a.port());
480 dprintf("%s #%u +pex %s\n",tintstr(),id,a.str());
// Reads one datagram from socket and routes it to a channel. When the
// datagram is rejected it returns NULL via return_log, which also logs the
// reason with eprintf.
// The first 32 bits are the local channel id (mych):
//  - 0 means a new handshake. The payload must start with a P2TP_HASH message
//    for bin64_t::ALL carrying the root hash of a known file; this is the
//    layout AddHandshake writes. No channel in the file's hs_in_ may be to
//    the same address with traffic in the last 2 s. Then a new Channel is
//    created.
//  - Otherwise the id is decoded with DecodeID() and bounds-checked against
//    channels. The datagram must come from that channel's peer address, and
//    own_id_mentioned_ is then set.
// FIX: the "already closed" message passed addr.str() with no matching %s,
// so the offending address was never logged.
484 Channel* Channel::RecvDatagram (int socket) {
485 Datagram data(socket);
487 Address& addr = data.addr;
488 #define return_log(...) { eprintf(__VA_ARGS__); return NULL; }
490 return_log("datagram shorter than 4 bytes %s\n",addr.str());
491 uint32_t mych = data.Pull32();
493 Channel* channel = NULL;
494 if (!mych) { // handshake initiated
// Minimum size: HASH type + 32-bit bin + HANDSHAKE type + 32-bit id + root hash.
495 if (data.size()<1+4+1+4+Sha1Hash::SIZE)
496 return_log ("incorrect size %i initial handshake packet %s\n",data.size(),addr.str());
497 uint8_t hashid = data.Pull8();
498 if (hashid!=P2TP_HASH)
499 return_log ("no hash in the initial handshake %s\n",addr.str());
500 bin64_t pos = data.Pull32();
501 if (pos!=bin64_t::ALL)
502 return_log ("that is not the root hash %s\n",addr.str());
503 hash = data.PullHash();
504 FileTransfer* file = FileTransfer::Find(hash);
506 return_log ("hash %s unknown, no such file %s\n",hash.hex().c_str(),addr.str());
507 dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
// Refuse a duplicate handshake from an address that already has a live channel.
508 for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
509 if (channels[*i] && channels[*i]->peer_==data.addr &&
510 channels[*i]->last_recv_time_>NOW-TINT_SEC*2)
511 return_log("have a channel already to %s\n",addr.str());
512 channel = new Channel(file, socket, data.address());
514 mych = DecodeID(mych);
515 if (mych>=channels.size())
516 return_log("invalid channel #%u, %s\n",mych,addr.str());
517 channel = channels[mych];
519 return_log ("channel #%u is already closed %s\n",mych,addr.str());
520 if (channel->peer() != addr)
521 return_log ("invalid peer address #%u %s!=%s\n",mych,channel->peer().str(),addr.str());
522 channel->own_id_mentioned_ = true;
524 //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
// Event loop, presumably running until limit (Datagram::Time() + howlong).
// It alternates between due sends and socket reads.
// send_queue holds (time, channel id) entries. An entry is stale, and
// skipped, when the channel's next_send_time_ has since moved to a
// different real time. Entries for a TINT_NEVER channel are kept, so that
// channel can be closed below.
// Due sender: if its next_send_time_ is still below NOW+TINT_MIN, it is
// serviced and rescheduled; otherwise the channel is closed.
// Nothing due: wait on all sockets until the earlier of limit and the next
// send time. Any received datagram goes to RecvDatagram, the receiving
// channel is rescheduled, and the pending sender is requeued.
530 void Channel::Loop (tint howlong) {
532 tint limit = Datagram::Time() + howlong;
536 tint send_time(TINT_NEVER);
537 Channel* sender(NULL);
538 while (!sender && !send_queue.is_empty()) { // dequeue
539 tintbin next = send_queue.pop();
540 send_time = next.time;
541 sender = channel((int)next.bin);
542 if (sender && sender->next_send_time_!=send_time &&
543 sender->next_send_time_!=TINT_NEVER )
544 sender = NULL; // it was a stale entry
547 if ( sender!=NULL && send_time<=NOW ) { // it's time
549 if (sender->next_send_time_<NOW+TINT_MIN) { // either send
550 dprintf("%s #%u sch_send %s\n",tintstr(),sender->id,
553 sender->Reschedule();
554 } else { // or close the channel
555 dprintf("%s #%u closed sendctrl\n",tintstr(),sender->id);
559 } else { // it's too early, wait
561 tint towait = min(limit,send_time) - NOW;
562 dprintf("%s waiting %lliusec\n",tintstr(),towait);
563 int rd = Datagram::Wait(socket_count,sockets,towait);
564 if (rd!=INVALID_SOCKET) { // in meantime, received something
565 Channel* receiver = RecvDatagram(rd);
566 if (receiver) // receiver's state may have changed
567 receiver->Reschedule();
569 if (sender) // get back to that later
570 send_queue.push(tintbin(send_time,sender->id));
// Recomputes next_send_time_ via NextSendTime() and queues this channel in
// send_queue.
// A real send time must lie within TINT_MIN of now (asserted) and is queued
// as-is.
// For TINT_NEVER the channel is presumably queued for NOW+TINT_MIN instead.
// There Loop finds next_send_time_ not below NOW+TINT_MIN and closes the
// channel, assuming TINT_NEVER compares greater than any real time.
579 void Channel::Reschedule () {
580 next_send_time_ = NextSendTime();
581 if (next_send_time_!=TINT_NEVER) {
582 assert(next_send_time_<NOW+TINT_MIN);
583 send_queue.push(tintbin(next_send_time_,id));
585 send_queue.push(tintbin(NOW+TINT_MIN,id));
586 dprintf("%s requeue #%u for %s\n",tintstr(),id,tintstr(next_send_time_));