5 * Created by Victor Grishchenko on 3/6/09.
6 * Copyright 2009 Delft University of Technology. All rights reserved.
10 #include "compat/util.h"
14 using namespace std; // FIXME remove
19 - randomized testing of advanced ops (new testcase)
22 void Channel::AddPeakHashes (Datagram& dgram) {
23 for(int i=0; i<file().peak_count(); i++) {
24 bin64_t peak = file().peak(i);
25 dgram.Push8(P2TP_HASH);
26 dgram.Push32((uint32_t)peak);
27 dgram.PushHash(file().peak_hash(i));
28 //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
29 dprintf("%s #%i +phash %s\n",tintstr(),id,peak.str());
34 void Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
35 bin64_t peak = file().peak_for(pos);
36 while (pos!=peak && ((NOW&3)==3 || !data_out_cap_.within(pos.parent())) &&
37 ack_in_.get(pos.parent())==bins::EMPTY) {
38 bin64_t uncle = pos.sibling();
39 dgram.Push8(P2TP_HASH);
40 dgram.Push32((uint32_t)uncle);
41 dgram.PushHash( file().hash(uncle) );
42 //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
43 dprintf("%s #%i +hash %s\n",tintstr(),id,uncle.str());
// Chooses the next bin to send to the peer.
// Hints queued in hint_in_ (filled by OnHint) are examined from the front;
// for each one, file().ack_out().find_filtered(ack_in_, hint, bins::FILLED)
// proposes a candidate, which left_foot() narrows further. If no queued hint
// yields a candidate, the same search is repeated over bin64_t::ALL.
// NOTE(review): presumably the result is `send` (AddData assigns the return
// value to `tosend`) - confirm.
49 bin64_t Channel::DequeueHint () { // TODO: resilience
50 bin64_t send = bin64_t::NONE;
// Keep consuming hints until one produces something to send.
51 while (!hint_in_.empty() && send==bin64_t::NONE) {
52 bin64_t hint = hint_in_.front().bin;
53 tint time = hint_in_.front().time;
55 //if (time < NOW-TINT_SEC*3/2 ) //NOW-8*rtt_avg_)
58 // a. May empty the queue when you least expect
59 // b. May lose parts of partially ACKd HINTs
// Presumably: a bin inside `hint` that is FILLED in our ack_out() map but
// not yet acknowledged in ack_in_ - confirm find_filtered semantics.
60 send = file().ack_out().find_filtered(ack_in_,hint,bins::FILLED);
61 send = send.left_foot(); // single packet
62 if (send!=bin64_t::NONE)
// Narrow the hint one step towards the chosen bin and put the sibling of
// the narrowed hint back at the front of the queue.
64 hint = hint.towards(send);
65 hint_in_.push_front(hint.sibling());
// Fallback: no hint produced a candidate, so search the whole range.
68 if (send==bin64_t::NONE) {
69 send = file().ack_out().find_filtered(ack_in_,bin64_t::ALL,bins::FILLED);
70 if (send!=bin64_t::NONE)
71 send = send.left_foot();
// `mass` accumulates the width() of every hint still queued; it is used
// only in the log line below.
74 for(int i=0; i<hint_in_.size(); i++)
75 mass += hint_in_[i].bin.width();
76 dprintf("%s #%i dequeued %s [%lli]\n",tintstr(),id,send.str(),mass);
// Appends handshake messages to the datagram.
// When peer_channel_id_ is zero (this side is initiating), the file's root
// hash is pushed first as a P2TP_HASH message for bin64_t::ALL32, so the
// peer can tell which file is meant. A P2TP_HANDSHAKE message carrying
// EncodeID(id) is pushed as well.
81 void Channel::AddHandshake (Datagram& dgram) {
82 if (!peer_channel_id_) { // initiating
83 dgram.Push8(P2TP_HASH);
84 dgram.Push32(bin64_t::ALL32);
85 dgram.PushHash(file().root_hash());
86 dprintf("%s #%i +hash ALL %s\n",
87 tintstr(),id,file().root_hash().hex().c_str());
// Announce our own channel id in encoded form.
89 dgram.Push8(P2TP_HANDSHAKE);
90 int encoded = EncodeID(id);
91 dgram.Push32(encoded);
92 dprintf("%s #%i +hs %i\n",tintstr(),id,encoded);
// Assembles one datagram addressed to this channel's peer and sends it.
// The datagram always begins with peer_channel_id_ (32 bits). On an
// established channel AddData() may append a data message; the bin it
// returns is reported to the congestion controller via cc_->OnDataSent().
// A Datagram::Send() result of -1 is reported through print_error.
98 void Channel::Send () {
99 Datagram dgram(socket_,peer());
100 dgram.Push32(peer_channel_id_);
// Bin of the data message added to this datagram, NONE if there is none.
101 bin64_t data = bin64_t::NONE;
102 if ( is_established() ) {
103 // FIXME: seeder check
105 if (!file().is_complete())
109 data = AddData(dgram);
114 dprintf("%s #%i sent %ib %s\n",tintstr(),id,dgram.size(),peer().str());
// A size of 4 means nothing was appended after the channel id.
115 if (dgram.size()==4) // only the channel id; bare keep-alive
117 cc_->OnDataSent(data);
118 if (dgram.Send()==-1)
119 print_error("can't send datagram");
// Possibly appends a P2TP_HINT message asking the peer for more data.
// Outstanding hints older than TINT_SEC*3/2 are dropped from hint_out_
// first. If the total width of outstanding hints (hint_out_size_) is below
// the estimated peer rate peer_pps, the piece picker is asked for a new bin
// covering the difference; a non-NONE result is pushed and recorded in
// hint_out_.
// NOTE(review): both divisions below use dip_avg_ as the divisor and would
// fault if it were 0 - confirm its initial value.
123 void Channel::AddHint (Datagram& dgram) {
// Expire hints the peer has not served in time.
125 tint timed_out = NOW - TINT_SEC*3/2;
126 while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
127 hint_out_size_ -= hint_out_.front().bin.width();
128 hint_out_.pop_front();
// rtt_avg_/dip_avg_: round-trip time divided by the averaged gap between
// received data messages (dip_avg_ is maintained in OnData).
131 int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
134 int peer_pps = TINT_SEC / dip_avg_; // data packets per sec
138 if ( hint_out_size_ < peer_pps ) { //4*peer_cwnd ) {
// How much more needs to be requested to reach peer_pps.
140 int diff = peer_pps - hint_out_size_;
141 //if (diff>4 && diff>2*peer_cwnd)
143 bin64_t hint = transfer().picker().Pick(ack_in_,diff,rtt_avg_*8+TINT_MSEC*100);
145 if (hint!=bin64_t::NONE) {
146 dgram.Push8(P2TP_HINT);
148 dprintf("%s #%i +hint %s [%lli]\n",tintstr(),id,hint.str(),hint_out_size_);
// Track the hint so it can be expired or cleaned up later.
149 hint_out_.push_back(hint);
150 hint_out_size_ += hint.width();
152 dprintf("%s #%i .hint\n",tintstr(),id);
// Appends a P2TP_DATA message, preceded by the hashes the peer needs, to
// the datagram.
// Returns bin64_t::NONE when the file size is still unknown, when there is
// nothing to send and no empty probe is due, or after a read error has been
// reported.
// NOTE(review): presumably returns `tosend` otherwise (Send() forwards the
// return value to cc_->OnDataSent) - confirm.
158 bin64_t Channel::AddData (Datagram& dgram) {
160 if (!file().size()) // know nothing
161 return bin64_t::NONE;
163 bin64_t tosend = bin64_t::NONE;
// Only pick a bin when the congestion controller allows sending data.
164 if (cc_->MaySendData()) {
165 tosend = DequeueHint();
166 if (tosend==bin64_t::NONE)
167 dprintf("%s #%i out of hints #sendctrl\n",tintstr(),id);
169 dprintf("%s #%i no cwnd #sendctrl\n",tintstr(),id);
// With nothing to send, an empty data message still goes out only if data
// was last sent at least TINT_SEC ago and data_out_ is not empty.
171 if (tosend==bin64_t::NONE && (last_send_data_time_>NOW-TINT_SEC || data_out_.empty()))
172 return bin64_t::NONE; // once in a while, empty data is sent just to check rtt
174 if (tosend!=bin64_t::NONE) { // hashes
// Peak hashes are sent only while nothing has been acknowledged yet.
175 if (ack_in_.is_empty() && file().size())
176 AddPeakHashes(dgram);
177 AddUncleHashes(dgram,tosend);
178 if (!ack_in_.is_empty()) // TODO: cwnd_>1
179 data_out_cap_ = tosend;
// If the hashes already take more than 254 bytes, send them on their own
// and start again with the channel id.
182 if (dgram.size()>254) {
183 dgram.Send(); // kind of fragmentation
184 dgram.Push32(peer_channel_id_);
187 dgram.Push8(P2TP_DATA);
188 dgram.Push32(tosend.to32());
190 if (tosend!=bin64_t::NONE) { // data
// Reads up to 1024 bytes at byte offset base_offset()*1024.
// NOTE(review): POSIX pread returns ssize_t (-1 on error) but the result is
// stored in an unsigned size_t; a check of the form r<0 could never be
// true - confirm how the error test is written.
192 size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10);
193 // TODO: corrupted data, retries, caching
195 print_error("error on reading");
196 return bin64_t::NONE;
// The datagram must still have room for r bytes plus 4+1 bytes.
198 assert(dgram.space()>=r+4+1);
// Remember when and what was sent; CleanDataOut uses data_out_ entries for
// RTT estimation and loss detection.
202 last_send_data_time_ = NOW;
203 data_out_.push_back(tosend);
204 dprintf("%s #%i +data %s\n",tintstr(),id,tosend.str());
210 void Channel::AddTs (Datagram& dgram) {
211 dgram.Push8(P2TP_TS);
212 dgram.Push64(data_in_.time);
213 dprintf("%s #%i +ts %s\n",tintstr(),id,tintstr(data_in_.time));
// Appends P2TP_ACK messages to the datagram, from three sources:
//   1. data_in_dbl_, if set; it is pushed and then reset to NONE;
//   2. the bin of the most recently received data (data_in_.bin), widened
//      with file().ack_out().cover(); data_in_ is reset afterwards;
//   3. up to four further bins found by
//      file().ack_out().find_filtered(ack_out_, ALL, FILLED), each widened
//      with cover().
217 void Channel::AddAck (Datagram& dgram) {
218 if (data_in_dbl_!=bin64_t::NONE) {
219 dgram.Push8(P2TP_ACK);
220 dgram.Push32(data_in_dbl_);
221 data_in_dbl_=bin64_t::NONE;
223 if (data_in_.bin!=bin64_t::NONE) {
// Acknowledge the covering bin rather than the single received bin.
225 bin64_t pos = file().ack_out().cover(data_in_.bin);
226 dgram.Push8(P2TP_ACK);
228 //dgram.Push64(data_in_.time);
230 dprintf("%s #%i +ack %s %s\n",tintstr(),id,pos.str(),tintstr(data_in_.time));
// Mark the received-data slot as consumed.
231 data_in_ = tintbin(0,bin64_t::NONE);
// At most four additional acks per datagram.
235 for(int count=0; count<4; count++) {
// Presumably: a bin FILLED in the file's ack_out() that this channel's
// ack_out_ does not yet record as acknowledged - confirm find_filtered
// semantics.
236 bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, bins::FILLED);
237 if (ack==bin64_t::NONE)
239 ack = file().ack_out().cover(ack);
241 dgram.Push8(P2TP_ACK);
243 dprintf("%s #%i +ack %s\n",tintstr(),id,ack.str());
// Processes a datagram received on this channel.
// The first reply after a send initialises rtt_avg_ while it still holds its
// starting values (rtt_avg_==TINT_SEC, dev_avg_==0). The messages in the
// datagram are then read one by one: a type byte is pulled and the matching
// On*() handler consumes the payload. Finally the congestion controller is
// informed through cc_->OnDataRecvd() and last_recv_time_ is updated.
248 void Channel::Recv (Datagram& dgram) {
// The +4 accounts for bytes not included in dgram.size() here; RecvDatagram
// pulls a 32-bit channel id before dispatching.
249 dprintf("%s #%i recvd %i\n",tintstr(),id,dgram.size()+4);
250 if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
// First RTT sample: time elapsed since our last send.
251 rtt_avg_ = NOW - last_send_time_;
254 transfer().hs_in_.push_back(id);
255 dprintf("%s #%i rtt init %lli\n",tintstr(),id,rtt_avg_);
// An empty datagram is reported as ALL; otherwise NONE unless a data
// message overwrites it below.
257 bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
258 while (dgram.size()) {
259 uint8_t type = dgram.Pull8();
// Dispatch on the message type byte.
261 case P2TP_HANDSHAKE: OnHandshake(dgram); break;
262 case P2TP_DATA: data=OnData(dgram); break;
263 case P2TP_TS: OnTs(dgram); break;
264 case P2TP_ACK: OnAck(dgram); break;
265 case P2TP_HASH: OnHash(dgram); break;
266 case P2TP_HINT: OnHint(dgram); break;
267 case P2TP_PEX_ADD: OnPex(dgram); break;
269 eprintf("%s #%i ?msg id unknown %i\n",tintstr(),id,(int)type);
273 cc_->OnDataRecvd(data);
274 last_recv_time_ = NOW;
278 void Channel::OnHash (Datagram& dgram) {
279 bin64_t pos = dgram.Pull32();
280 Sha1Hash hash = dgram.PullHash();
281 file().OfferHash(pos,hash);
282 //DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
283 dprintf("%s #%i -hash %s\n",tintstr(),id,pos.str());
// Removes bin `pos` from the queue of outstanding hints (hint_out_).
// The queue is scanned for the first hint that contains `pos`; if there is
// none the function returns. All hints in front of the match are dropped,
// then the matching hint is split repeatedly until its front piece equals
// `pos`, which is popped while the remaining pieces stay queued.
287 void Channel::CleanHintOut (bin64_t pos) {
// Find the index `hi` of the first hint that `pos` lies within.
289 while (hi<hint_out_.size() && !pos.within(hint_out_[hi].bin))
291 if (hi==hint_out_.size())
292 return; // something not hinted or hinted in far past
// Drop the `hi` hints queued before the match, adjusting the total width.
293 while (hi--) { // removing likely snubbed hints
294 hint_out_size_ -= hint_out_.front().bin.width();
295 hint_out_.pop_front();
// Split the front hint: the half towards `pos` is pushed in front, the
// sibling half keeps the original queue entry.
297 while (hint_out_.front().bin!=pos) {
298 tintbin f = hint_out_.front();
299 f.bin = f.bin.towards(pos);
300 hint_out_.front().bin = f.bin.sibling();
301 hint_out_.push_front(f);
// The front entry is now exactly `pos`; remove it.
303 hint_out_.pop_front();
// Handles an incoming data message: pulls the 32-bit bin and up to 1024
// bytes of payload, and offers the payload to the file with OfferData unless
// the bin is NONE. On the path that continues past the NONE return below,
// data_in_ records the bin with the current time, and for a real bin the
// averaged gap between data arrivals (dip_avg_) is updated.
// NOTE(review): presumably returns `pos` on success (Recv stores the return
// value and passes it to cc_->OnDataRecvd) - confirm.
308 bin64_t Channel::OnData (Datagram& dgram) {
309 bin64_t pos = dgram.Pull32();
311 int length = dgram.Pull(&data,1024);
// A NONE bin counts as ok without touching the file.
312 bool ok = (pos==bin64_t::NONE) || file().OfferData(pos, (char*)data, length) ;
// Logged as "-data" when ok and "!data" otherwise.
313 dprintf("%s #%i %cdata %s\n",tintstr(),id,ok?'-':'!',pos.str());
315 return bin64_t::NONE;
// Remember what arrived and when; AddAck and AddTs read data_in_.
316 data_in_ = tintbin(NOW,pos);
317 if (pos!=bin64_t::NONE) {
318 if (last_recv_data_time_) {
// Weighted average: three quarters old value, one quarter new sample.
319 tint dip = NOW - last_recv_data_time_;
320 dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
322 last_recv_data_time_ = NOW;
// Updates the record of in-flight data (data_out_) after an acknowledgement
// and detects losses.
//   - Entries among the first 8 that lie within `ackd_pos` yield an RTT
//     sample, update rtt_avg_ and dev_avg_, and are blanked.
//   - Blank entries at the front of the queue are popped.
//   - When max_ack_off exceeds MAX_REORDERING, entries at the front are
//     treated as lost: cc_->OnAckRcvd(NONE) is called and data_out_cap_ is
//     reset to ALL.
//   - Entries older than rtt_avg_ + 4*max(dev_avg_, 50*TINT_MSEC) that are
//     still unacknowledged in ack_in_ are treated as timed out in the same
//     way.
329 void Channel::CleanDataOut (bin64_t ackd_pos) {
333 if (ackd_pos!=bin64_t::NONE) {
// Only the first 8 in-flight entries are examined.
334 for (int i=0; i<8 && i<data_out_.size(); i++) {
335 if (data_out_[i]!=tintbin() && data_out_[i].bin.within(ackd_pos)) {
336 tint rtt = NOW-data_out_[i].time;
// Smoothed RTT with weight 1/8 for the new sample; deviation with weight 1/4.
337 rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
// NOTE(review): if tint is a 64-bit type, confirm that this resolves to a
// 64-bit abs overload rather than the C abs(int).
338 dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
339 dprintf("%s #%i rtt %lli dev %lli\n",tintstr(),id,rtt_avg_,dev_avg_);
340 bin64_t pos = data_out_[i].bin;
// Blank the entry in place; it is removed once it reaches the front.
342 data_out_[i]=tintbin();
348 while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE) {
349 data_out_.pop_front();
352 static const int MAX_REORDERING = 2; // the triple-ACK principle
353 if (max_ack_off>MAX_REORDERING) {
// Skip front entries that are blank or already filled in ack_in_.
354 while (max_ack_off && (data_out_.front().bin==bin64_t::NONE
355 || ack_in_.is_filled(data_out_.front().bin)) ) {
356 data_out_.pop_front();
// What remains in front beyond the reordering allowance is reported to the
// congestion controller as lost ("Rdata" in the log).
359 while (max_ack_off>MAX_REORDERING) {
360 cc_->OnAckRcvd(bin64_t::NONE);
361 dprintf("%s #%i Rdata %s\n",tintstr(),id,data_out_.front().bin.str());
362 data_out_.pop_front();
364 data_out_cap_ = bin64_t::ALL;
// Entries sent before this instant have timed out.
368 tint timeout = NOW - rtt_avg_ - 4*std::max(dev_avg_,TINT_MSEC*50);
369 while (!data_out_.empty() && data_out_.front().time<timeout) {
370 if (data_out_.front().bin!=bin64_t::NONE && ack_in_.is_empty(data_out_.front().bin)) {
// Timed out and never acknowledged ("Tdata" in the log).
371 cc_->OnAckRcvd(bin64_t::NONE);
372 data_out_cap_ = bin64_t::ALL;
373 dprintf("%s #%i Tdata %s\n",tintstr(),id,data_out_.front().bin.str());
375 data_out_.pop_front();
// Final sweep of blank entries at the front.
377 while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE)
378 data_out_.pop_front();
// Handles an incoming acknowledgement: pulls the 32-bit bin, rejects it with
// an "invalid ack" message when it lies beyond the known file, otherwise
// marks it in ack_in_ and lets CleanDataOut update the in-flight records.
383 void Channel::OnAck (Datagram& dgram) {
384 bin64_t ackd_pos = dgram.Pull32();
// The range check applies only once the file size is known.
385 if (ackd_pos!=bin64_t::NONE && file().size() && ackd_pos.base_offset()>=file().packet_size()) {
386 eprintf("invalid ack: %s\n",ackd_pos.str());
389 dprintf("%s #%i -ack %s\n",tintstr(),id,ackd_pos.str());
390 ack_in_.set(ackd_pos);
391 CleanDataOut(ackd_pos);
395 void Channel::OnTs (Datagram& dgram) {
396 peer_send_time_ = dgram.Pull64();
397 dprintf("%s #%i -ts %lli\n",tintstr(),id,peer_send_time_);
401 void Channel::OnHint (Datagram& dgram) {
402 bin64_t hint = dgram.Pull32();
403 hint_in_.push_back(hint);
404 //ack_in_.set(hint,bins::EMPTY);
405 //RequeueSend(cc_->OnHintRecvd(hint));
406 dprintf("%s #%i -hint %s\n",tintstr(),id,hint.str());
410 void Channel::OnHandshake (Datagram& dgram) {
411 peer_channel_id_ = dgram.Pull32();
412 dprintf("%s #%i -hs %i\n",tintstr(),id,peer_channel_id_);
413 // FUTURE: channel forking
417 void Channel::OnPex (Datagram& dgram) {
418 uint32_t ipv4 = dgram.Pull32();
419 uint16_t port = dgram.Pull16();
420 Address addr(ipv4,port);
421 dprintf("%s #%i -pex %s\n",tintstr(),id,addr.str());
422 transfer().OnPexIn(addr);
426 void Channel::AddPex (Datagram& dgram) {
427 int chid = transfer().RevealChannel(pex_out_);
428 if (chid==-1 || chid==id)
430 Address a = channels[chid]->peer();
431 dgram.Push8(P2TP_PEX_ADD);
432 dgram.Push32(a.ipv4());
433 dgram.Push16(a.port());
434 dprintf("%s #%i +pex %s\n",tintstr(),id,a.str());
// Reads one datagram from `socket` and routes it to the right channel.
// The first 32 bits are the receiver's channel id:
//   - zero means the sender is initiating a handshake: the datagram must
//     carry a P2TP_HASH message for bin ALL with the root hash of a file
//     known to FileTransfer::Find, and no recent channel to the same address
//     may exist; a new Channel is then created;
//   - otherwise the id is decoded and validated against `channels`, and the
//     sender address must match the channel's peer.
// Every rejection is logged through the local return_log macro, which also
// returns from the function.
438 void Channel::RecvDatagram (int socket) {
439 Datagram data(socket);
441 Address& addr = data.addr;
// Logs with eprintf and returns from RecvDatagram.
// NOTE(review): confirm the macro is #undef'd after use so it does not leak
// into the rest of the translation unit.
442 #define return_log(...) { eprintf(__VA_ARGS__); return; }
444 return_log("datagram shorter than 4 bytes %s\n",addr.str());
445 uint32_t mych = data.Pull32();
447 Channel* channel = NULL;
448 if (!mych) { // handshake initiated
// Minimum size: 1+4 bytes and a hash for the hash message, then 1+4 bytes.
449 if (data.size()<1+4+1+4+Sha1Hash::SIZE)
450 return_log ("incorrect size %i initial handshake packet %s\n",data.size(),addr.str());
451 uint8_t hashid = data.Pull8();
452 if (hashid!=P2TP_HASH)
453 return_log ("no hash in the initial handshake %s\n",addr.str());
454 bin64_t pos = data.Pull32();
455 if (pos!=bin64_t::ALL)
456 return_log ("that is not the root hash %s\n",addr.str());
457 hash = data.PullHash();
// Look up the transfer this root hash belongs to.
458 FileTransfer* file = FileTransfer::Find(hash);
460 return_log ("hash %s unknown, no such file %s\n",hash.hex().c_str(),addr.str());
461 dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
// Refuse a second channel to an address heard from within the last
// 2*TINT_SEC.
462 for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
463 if (channels[*i] && channels[*i]->peer_==data.addr &&
464 channels[*i]->last_recv_time_>NOW-TINT_SEC*2)
465 return_log("have a channel already to %s\n",addr.str());
466 channel = new Channel(file, socket, data.address());
// Existing channel: decode the id and validate it.
468 mych = DecodeID(mych);
469 if (mych>=channels.size())
470 return_log("invalid channel #%i, %s\n",mych,addr.str());
471 channel = channels[mych];
// NOTE(review): the format string below has one conversion (%i) but two
// arguments are passed; addr.str() is never printed - confirm whether a
// trailing %s was intended.
473 return_log ("channel #%i is already closed\n",mych,addr.str());
474 if (channel->peer() != addr)
475 return_log ("invalid peer address #%i %s!=%s\n",mych,channel->peer().str(),addr.str());
// The peer has shown that it knows our channel id.
476 channel->own_id_mentioned_ = true;
478 //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
// Main scheduling loop; runs until Datagram::Time() reaches the start time
// plus `howlong`.
// Each pass takes the earliest entry of send_queue, ignoring stale entries
// whose time no longer matches the channel's next_send_time_. Then:
//   - if that channel is due (next_send_time_ <= NOW) it is served and its
//     last_send_time_ is updated;
//   - else if the earliest send time lies in the future, the loop waits on
//     the sockets for at most that long;
//   - otherwise the channel's send control is logged as closed.
483 void Channel::Loop (tint howlong) {
485 tint limit = Datagram::Time() + howlong;
489 tint send_time(TINT_NEVER);
490 Channel* sender(NULL);
// Find the next live queue entry.
491 while (!send_queue.is_empty()) {
492 send_time = send_queue.peek().time;
// The queue entry's bin field carries the channel id (see Schedule).
493 sender = channel((int)send_queue.peek().bin);
// The entry is live if it matches the channel's current schedule, or if
// the channel is scheduled for TINT_NEVER (Schedule queues a timeout entry
// for that case).
495 if ( sender->next_send_time_==send_time ||
496 sender->next_send_time_==TINT_NEVER )
498 sender = NULL; // it was a stale entry
503 if ( sender && sender->next_send_time_ <= NOW ) {
504 dprintf("%s #%i sch_send %s\n",tintstr(),sender->id,
507 sender->last_send_time_ = NOW;
508 // sender->RequeueSend(sender->cc_->NextSendTime()); goes to SendCtrl
510 } else if ( send_time > NOW ) {
// Nothing is due yet: block on the sockets until the next send time.
511 tint towait = send_time - NOW;
512 dprintf("%s waiting %lliusec\n",tintstr(),towait);
513 int rd = Datagram::Wait(socket_count,sockets,towait);
514 if (rd!=INVALID_SOCKET)
516 } else { // FIXME FIXME FIXME REWRITE!!! if (sender->next_send_time_==TINT_NEVER) {
// NOTE(review): `sender` may be NULL on this branch (the first condition
// above also fails when sender is NULL) - confirm that sender->id cannot be
// reached with a NULL pointer.
518 dprintf("%s #%i closed sendctrl\n",tintstr(),sender->id);
524 } while (Datagram::Time()<limit);
// Schedules this channel's next send.
// next_send_time_ records the requested time as given. The entry pushed onto
// send_queue pairs the time with this channel's id; when the requested time
// is TINT_NEVER the queued entry uses NOW + TINT_MIN instead, so the channel
// is still visited after that timeout.
529 void Channel::Schedule (tint next_time) {
530 next_send_time_ = next_time;
// Only the queued time is substituted; next_send_time_ stays TINT_NEVER.
531 if (next_time==TINT_NEVER)
532 next_time = NOW + TINT_MIN; // 1min timeout
533 send_queue.push(tintbin(next_time,id));
534 dprintf("%s requeue #%i for %s\n",tintstr(),id,tintstr(next_time));