5 * Created by Victor Grishchenko on 3/6/09.
6 * Copyright 2009 Delft University of Technology. All rights reserved.
10 #include "compat/util.h"
14 using namespace std; // FIXME remove
18 - move hint_out_, piece picking to piece picker (needed e.g. for the case of channel drop)
21 - randomized testing of advanced ops (new testcase)
23 - bins hint_out_, tbqueue hint_out_ts_
// Appends one HASH message per peak of the file to the outgoing datagram.
// For every index below file().peak_count() it pushes the P2TP_HASH type
// byte, the peak bin cast to uint32_t, and file().peak_hash(i), then logs
// the peak through dprintf.
27 void Channel::AddPeakHashes (Datagram& dgram) {
28 for(int i=0; i<file().peak_count(); i++) {
29 bin64_t peak = file().peak(i);
30 dgram.Push8(P2TP_HASH);
// The 64-bit bin is sent as a 32-bit value.
31 dgram.Push32((uint32_t)peak);
32 dgram.PushHash(file().peak_hash(i));
33 //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
34 dprintf("%s #%i +phash %s\n",tintstr(),id,peak.str());
// Appends HASH messages for the siblings ("uncles") needed alongside `pos`.
// `peak` is the peak covering `pos` (file().peak_for(pos)). On each loop
// iteration the sibling of the current `pos` is pushed as P2TP_HASH + 32-bit
// bin + file().hash(uncle) and logged.
// The loop continues only while all of the following hold:
//   - pos has not reached the peak,
//   - either (NOW&7)==7, or pos.parent() is not within data_out_cap_,
//   - ack_in_ reports pos.parent() as bins::EMPTY.
// NOTE(review): the statement that advances `pos` is not visible in this
// listing; presumably pos = pos.parent() -- confirm.
39 void Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
40 bin64_t peak = file().peak_for(pos);
41 while (pos!=peak && ((NOW&7)==7 || !data_out_cap_.within(pos.parent())) &&
42 ack_in_.get(pos.parent())==bins::EMPTY) {
43 bin64_t uncle = pos.sibling();
44 dgram.Push8(P2TP_HASH);
45 dgram.Push32((uint32_t)uncle);
46 dgram.PushHash( file().hash(uncle) );
47 //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
48 dprintf("%s #%i +hash %s\n",tintstr(),id,uncle.str());
// Picks the next single-packet bin to send from the queue of hints received
// from the peer (hint_in_). `send` starts as bin64_t::NONE and the loop runs
// while hints remain and nothing has been chosen.
// NOTE(review): several statements of this function are not visible in this
// listing (the queue pop, the action taken for an aged hint, the header of
// the splitting loop, the declaration of `mass`, and the return). The notes
// below describe only what the visible lines do.
54 bin64_t Channel::DequeueHint () { // TODO: resilience
55 bin64_t send = bin64_t::NONE;
56 while (!hint_in_.empty() && send==bin64_t::NONE) {
57 bin64_t hint = hint_in_.front().bin;
58 tint time = hint_in_.front().time;
// Age test: the hint is older than 1.5 seconds (TINT_SEC*3/2).
60 if (time < NOW-TINT_SEC*3/2 ) //NOW-8*rtt_avg_)
// Look up a bin inside `hint` via the file's ack_out() map filtered by
// ack_in_ (presumably data we hold that the peer has not acknowledged --
// confirm against bins::find_filtered), then narrow it with left_foot().
62 send = file().ack_out().find_filtered(ack_in_,hint,bins::FILLED);
63 send = send.left_foot(); // single packet
64 if (send!=bin64_t::NONE)
// Split the hint: step towards `send` and put the sibling half back at the
// front of the queue.
66 hint = hint.towards(send);
67 hint_in_.push_front(hint.sibling());
// Sum the widths of all hints still queued; used only in the log line.
71 for(int i=0; i<hint_in_.size(); i++)
72 mass += hint_in_[i].bin.width();
73 dprintf("%s #%i dequeued %s [%lli]\n",tintstr(),id,send.str(),mass);
// Appends the handshake messages to the datagram.
// When peer_channel_id_ is zero (we are initiating, per the inline comment)
// a HASH message carrying bin64_t::ALL32 and the file's root hash is pushed
// and logged. A HANDSHAKE message carrying EncodeID(id) is pushed and logged.
// NOTE(review): the line closing the `if` is not visible in this listing;
// presumably the HANDSHAKE message is sent in both cases -- confirm.
78 void Channel::AddHandshake (Datagram& dgram) {
79 if (!peer_channel_id_) { // initiating
80 dgram.Push8(P2TP_HASH);
81 dgram.Push32(bin64_t::ALL32);
82 dgram.PushHash(file().root_hash());
83 dprintf("%s #%i +hash ALL %s\n",
84 tintstr(),id,file().root_hash().hex().c_str());
86 dgram.Push8(P2TP_HANDSHAKE);
87 dgram.Push32(EncodeID(id));
88 dprintf("%s #%i +hs\n",tintstr(),id);
// Drops timed-out entries from the front of data_out_ (the queue of sent,
// not yet acknowledged data).
// An entry is dropped while it is older than `timeout` and ack_in_ still
// reports its bin as bins::EMPTY. If the queue size changed,
// cc_->OnAckRcvd(bin64_t::NONE) is called and data_out_cap_ is reset to
// bin64_t::ALL. Finally, leading entries that are blank (equal to tintbin())
// or whose bin ack_in_ reports as FILLED are popped.
94 void Channel::ClearStaleDataOut() {
95 int oldsize = data_out_.size();
// Cut-off time: NOW minus the larger of (rtt_avg_ - 4*dev_avg_) and 500 ms.
// NOTE(review): retransmission timeouts conventionally ADD the deviation
// term (RFC 6298: SRTT + 4*RTTVAR); confirm the minus sign is intended.
96 tint timeout = NOW - max( rtt_avg_-dev_avg_*4, 500*TINT_MSEC );
97 while ( data_out_.size() && data_out_.front().time < timeout &&
98 ack_in_.get(data_out_.front().bin)==bins::EMPTY ) {
99 dprintf("%s #%i Tdata %s\n",tintstr(),id,data_out_.front().bin.str());
100 data_out_.pop_front();
102 if (data_out_.size()!=oldsize) {
103 cc_->OnAckRcvd(bin64_t::NONE);
104 data_out_cap_ = bin64_t::ALL;
106 while (data_out_.size() && (data_out_.front()==tintbin() || ack_in_.get(data_out_.front().bin)==bins::FILLED))
107 data_out_.pop_front();
// Builds one datagram for the peer and sends it.
// The datagram starts with the 32-bit peer_channel_id_. For an established
// channel, data is added through AddData(); the bin it returns is later
// passed to cc_->OnDataSent(). The datagram size and peer address are
// logged, and a failed dgram.Send() (return value -1) is reported with
// print_error.
// NOTE(review): most of the message-assembly calls, the non-established
// branch, and the action taken for a 4-byte datagram are not visible in this
// listing.
111 void Channel::Send () {
112 Datagram dgram(socket_,peer());
113 dgram.Push32(peer_channel_id_);
114 bin64_t data = bin64_t::NONE;
115 if ( is_established() ) {
116 // FIXME: seeder check
118 if (!file().is_complete())
122 data = AddData(dgram);
127 dprintf("%s #%i sent %ib %s\n",tintstr(),id,dgram.size(),peer().str().c_str());
128 if (dgram.size()==4) // only the channel id; bare keep-alive
130 cc_->OnDataSent(data);
131 if (dgram.Send()==-1)
132 print_error("can't send datagram");
// Expires hints we sent (hint_out_) that were issued more than 8*rtt_avg_
// ago. Each expired hint is reported to the piece picker through
// picker().Expired() before being popped from the front of the queue.
136 void Channel::CleanStaleHintOut () {
137 tint timed_out = NOW - 8*rtt_avg_; // FIXME BULLSHIT (take rtt=0)
138 while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) {
139 transfer().picker().Expired(hint_out_.front().bin);
140 hint_out_.pop_front();
// Requests more data from the peer by appending a HINT message when the
// amount already requested is below an estimate of the peer's send rate.
// hint_out_mass is the summed width of all outstanding hints. If it is
// below peer_pps, the piece picker is asked for a bin given ack_in_, the
// shortfall `diff`, and rtt_avg_*8 + 100 ms. A non-NONE pick is sent as
// P2TP_HINT, logged, and appended to hint_out_; otherwise ".hint" is logged.
// NOTE(review): the push of the hint bin itself and the `else` keyword are
// not visible in this listing.
145 void Channel::AddHint (Datagram& dgram) {
149 uint64_t hint_out_mass=0;
150 for(int i=0; i<hint_out_.size(); i++)
151 hint_out_mass += hint_out_[i].bin.width();
// NOTE(review): both divisions below divide by dip_avg_; no guard against
// dip_avg_ == 0 is visible here -- confirm its initial value is non-zero.
153 int peer_cwnd = (int)(rtt_avg_ / dip_avg_);
// Estimated peer rate: TINT_SEC divided by the average inter-arrival gap.
156 int peer_pps = TINT_SEC / dip_avg_;
160 if ( hint_out_mass < peer_pps ) { //4*peer_cwnd ) {
162 int diff = peer_pps - hint_out_mass;
163 //if (diff>4 && diff>2*peer_cwnd)
165 bin64_t hint = transfer().picker().Pick(ack_in_,diff,rtt_avg_*8+TINT_MSEC*100);
167 if (hint!=bin64_t::NONE) {
168 dgram.Push8(P2TP_HINT);
170 dprintf("%s #%i +hint %s [%lli]\n",tintstr(),id,hint.str(),hint_out_mass);
171 hint_out_.push_back(hint);
173 dprintf("%s #%i .hint\n",tintstr(),id);
// Appends a DATA message (preceded by the hashes it needs) to the datagram.
// Returns bin64_t::NONE when the file size is unknown, when there is nothing
// to send and no probe is due, or after a read error is reported.
// NOTE(review): the final return and several short statements (the `else`,
// the buffer declaration, the read-error test, the payload push) are not
// visible in this listing.
179 bin64_t Channel::AddData (Datagram& dgram) {
181 if (!file().size()) // know nothing
182 return bin64_t::NONE;
184 bin64_t tosend = bin64_t::NONE;
// Only dequeue a hint when congestion control allows sending data.
185 if (cc_->MaySendData()) {
186 tosend = DequeueHint();
187 if (tosend==bin64_t::NONE)
188 dprintf("%s #%i out of hints #sendctrl\n",tintstr(),id);
190 dprintf("%s #%i no cwnd #sendctrl\n",tintstr(),id);
// Nothing to send: give up unless data was last sent at least TINT_SEC ago
// and data_out_ is non-empty, in which case an empty DATA goes out.
192 if (tosend==bin64_t::NONE && (last_send_data_time_>NOW-TINT_SEC || data_out_.empty()))
193 return bin64_t::NONE; // once in a while, empty data is sent just to check rtt
195 if (tosend!=bin64_t::NONE) { // hashes
// Peak hashes are added only while ack_in_ is still empty.
196 if (ack_in_.is_empty() && file().size())
197 AddPeakHashes(dgram);
198 AddUncleHashes(dgram,tosend);
199 data_out_cap_ = tosend;
202 dgram.Push8(P2TP_DATA);
203 dgram.Push32(tosend.to32());
205 if (tosend!=bin64_t::NONE) { // data
// Reads up to 1024 bytes at byte offset base_offset()*1024 (the <<10).
// NOTE(review): `r` is size_t (unsigned) while pread returns ssize_t and
// signals failure with -1; a `< 0` test on `r` could never be true. The
// actual error test is on a line not visible here -- confirm.
207 size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10);
208 // TODO: corrupted data, retries, caching
210 print_error("error on reading");
211 return bin64_t::NONE;
// Payload plus 4+1 further bytes must fit in the remaining datagram space.
213 assert(dgram.space()>=r+4+1);
// Record the send time and queue the bin as awaiting acknowledgement.
217 last_send_data_time_ = NOW;
218 data_out_.push_back(tosend);
219 dprintf("%s #%i +data %s\n",tintstr(),id,tosend.str());
// Appends a TS message: the P2TP_TS type byte followed by the 64-bit
// data_in_.time, and logs that time.
225 void Channel::AddTs (Datagram& dgram) {
226 dgram.Push8(P2TP_TS);
227 dgram.Push64(data_in_.time);
228 dprintf("%s #%i +ts %s\n",tintstr(),id,tintstr(data_in_.time));
// Appends ACK messages to the datagram, in three steps:
//   1. if data_in_dbl_ is set, acknowledge it and reset it to NONE;
//   2. if data_in_.bin is set, acknowledge the bin returned by
//      file().ack_out().cover() for it and reset data_in_;
//   3. up to four more times, look up a bin with find_filtered() over the
//      file's ack_out() map and this channel's ack_out_, widen it with
//      cover(), and acknowledge it.
// NOTE(review): the pushes of the bin values in steps 2 and 3, the exit taken
// when find_filtered() yields NONE, and any update of ack_out_ are not
// visible in this listing.
232 void Channel::AddAck (Datagram& dgram) {
233 if (data_in_dbl_!=bin64_t::NONE) {
234 dgram.Push8(P2TP_ACK);
235 dgram.Push32(data_in_dbl_);
236 data_in_dbl_=bin64_t::NONE;
238 if (data_in_.bin!=bin64_t::NONE) {
240 bin64_t pos = file().ack_out().cover(data_in_.bin);
241 dgram.Push8(P2TP_ACK);
243 //dgram.Push64(data_in_.time);
245 dprintf("%s #%i +ack %s %s\n",tintstr(),id,pos.str(),tintstr(data_in_.time));
246 data_in_ = tintbin(0,bin64_t::NONE);
250 for(int count=0; count<4; count++) {
// Presumably selects a bin filled in the file's map but not yet in this
// channel's ack_out_ -- confirm against bins::find_filtered.
251 bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, bins::FILLED);
252 if (ack==bin64_t::NONE)
254 ack = file().ack_out().cover(ack);
256 dgram.Push8(P2TP_ACK);
258 dprintf("%s #%i +ack %s\n",tintstr(),id,ack.str());
// Processes one datagram received on this channel: parses its messages one
// by one and dispatches each to the matching On* handler, then notifies
// congestion control and records the receive time.
// NOTE(review): the `switch` header, the `default:` label and the closing
// braces are not visible in this listing.
263 void Channel::Recv (Datagram& dgram) {
// The logged size adds 4 to dgram.size() (presumably the channel id that was
// already pulled off by the caller -- confirm).
264 dprintf("%s #%i recvd %i\n",tintstr(),id,dgram.size()+4);
// First RTT sample: taken when something has been sent and rtt_avg_/dev_avg_
// still equal TINT_SEC and 0 (presumably their initial values -- confirm).
265 if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
266 rtt_avg_ = NOW - last_send_time_;
269 transfer().hs_in_.push_back(id);
270 dprintf("%s #%i rtt init %lli\n",tintstr(),id,rtt_avg_);
// An empty datagram is reported as bin64_t::ALL; otherwise NONE unless a
// DATA message replaces it with OnData()'s result.
272 bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
273 while (dgram.size()) {
274 uint8_t type = dgram.Pull8();
276 case P2TP_HANDSHAKE: OnHandshake(dgram); break;
277 case P2TP_DATA: data=OnData(dgram); break;
278 case P2TP_TS: OnTs(dgram); break;
279 case P2TP_ACK: OnAck(dgram); break;
280 case P2TP_HASH: OnHash(dgram); break;
281 case P2TP_HINT: OnHint(dgram); break;
282 case P2TP_PEX_ADD: OnPex(dgram); break;
284 eprintf("%s #%i ?msg id unknown %i\n",tintstr(),id,(int)type);
288 cc_->OnDataRecvd(data);
289 last_recv_time_ = NOW;
// Handles a HASH message: pulls a 32-bit bin and a SHA-1 hash from the
// datagram, offers the pair to the file via OfferHash(), and logs the bin.
293 void Channel::OnHash (Datagram& dgram) {
294 bin64_t pos = dgram.Pull32();
295 Sha1Hash hash = dgram.PullHash();
296 file().OfferHash(pos,hash);
297 //DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
298 dprintf("%s #%i -hash %s\n",tintstr(),id,pos.str());
// Updates the outgoing-hint queue (hint_out_) after data for `pos` arrived,
// and tells the piece picker that `pos` was received.
// NOTE(review): the declaration and increment of `hi`, and the header of the
// loop around the Expired()/pop_front() pair, are not visible in this
// listing; the notes below cover only the visible lines.
302 void Channel::CleanFulfilledHints (bin64_t pos) {
// Scan at most the first 8 hints for one whose bin contains `pos`.
304 while (hi<hint_out_.size() && hi<8 && !pos.within(hint_out_[hi].bin))
306 if (hi<8 && hi<hint_out_.size()) {
// Hints taken off the front here are reported to the picker as expired
// (presumably those queued ahead of the matching one -- confirm).
308 transfer().picker().Expired(hint_out_.front().bin);
309 hint_out_.pop_front();
// Split the front hint towards `pos`: the front entry keeps the sibling half
// and the half containing `pos` is pushed in front of it, until the front
// entry is exactly `pos`.
311 while (hint_out_.front().bin!=pos) {
312 tintbin f = hint_out_.front();
313 f.bin = f.bin.towards(pos);
314 hint_out_.front().bin = f.bin.sibling();
315 hint_out_.push_front(f);
317 hint_out_.pop_front();
319 // every HINT ends up as either Expired or Received
320 transfer().picker().Received(pos);
// Handles a DATA message: pulls a 32-bit bin and up to 1024 bytes of payload
// and offers them to the file.
// Returns bin64_t::NONE on the visible return path.
// NOTE(review): the payload buffer declaration, the condition guarding the
// visible return (presumably `!ok`), and the final return are not visible in
// this listing.
324 bin64_t Channel::OnData (Datagram& dgram) {
325 bin64_t pos = dgram.Pull32();
327 int length = dgram.Pull(&data,1024);
// A NONE position is accepted without calling OfferData().
328 bool ok = (pos==bin64_t::NONE) || file().OfferData(pos, (char*)data, length) ;
// Logged with '-' when accepted and '!' when rejected.
329 dprintf("%s #%i %cdata %s\n",tintstr(),id,ok?'-':'!',pos.str());
331 return bin64_t::NONE;
// Remember what arrived and when.
332 data_in_ = tintbin(NOW,pos);
333 if (pos!=bin64_t::NONE) {
334 if (last_recv_data_time_) {
// Inter-arrival gap folded into a running average: (3*old + new) / 4.
335 tint dip = NOW - last_recv_data_time_;
336 dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
338 last_recv_data_time_ = NOW;
340 CleanFulfilledHints(pos);
// Processes an acknowledgement of `ackd_pos` against the queue of sent data.
// Only the first 8 entries of data_out_ are examined. Every non-blank entry
// whose bin lies within `ackd_pos` yields an RTT sample, is reported to
// congestion control via cc_->OnAckRcvd(), and is blanked (set to tintbin()).
// Afterwards, leading entries that are blank or whose bin ack_in_ reports as
// FILLED are popped.
345 void Channel::CleanFulfilledDataOut (bin64_t ackd_pos) {
346 for (int i=0; i<8 && i<data_out_.size(); i++)
347 if (data_out_[i]!=tintbin() && data_out_[i].bin.within(ackd_pos)) {
348 tint rtt = NOW-data_out_[i].time;
// Running averages: rtt_avg_ = (7*old + rtt) / 8, then
// dev_avg_ = (3*old + |rtt - rtt_avg_|) / 4 using the updated rtt_avg_.
349 rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
// NOTE(review): the argument to abs() is 64-bit (it is logged with %lli);
// confirm that an overload taking a 64-bit integer is selected rather than
// the int version.
350 dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
351 dprintf("%s #%i rtt %lli dev %lli\n",
352 tintstr(),id,rtt_avg_,dev_avg_);
353 cc_->OnAckRcvd(data_out_[i].bin);
354 data_out_[i]=tintbin();
356 while ( data_out_.size() && ( data_out_.front()==tintbin() ||
357 ack_in_.get(data_out_.front().bin)==bins::FILLED ) )
358 data_out_.pop_front();
// Handles an ACK message: pulls the acknowledged 32-bit bin, logs it, marks
// it in ack_in_, and calls CleanFulfilledDataOut() for it.
// An ack is reported as invalid when it is not NONE, the file size is known,
// and its base_offset() is at or beyond file().packet_size().
// NOTE(review): the statement following the "invalid ack" log (presumably a
// return) is not visible in this listing.
362 void Channel::OnAck (Datagram& dgram) {
363 bin64_t ackd_pos = dgram.Pull32();
364 if (ackd_pos!=bin64_t::NONE && file().size() && ackd_pos.base_offset()>=file().packet_size()) {
365 eprintf("invalid ack: %s\n",ackd_pos.str());
368 dprintf("%s #%i -ack %s\n",tintstr(),id,ackd_pos.str());
369 ack_in_.set(ackd_pos);
370 CleanFulfilledDataOut(ackd_pos);
// Handles a TS message: stores the 64-bit value pulled from the datagram in
// peer_send_time_ and logs it.
374 void Channel::OnTs (Datagram& dgram) {
375 peer_send_time_ = dgram.Pull64();
376 dprintf("%s #%i -ts %lli\n",tintstr(),id,peer_send_time_);
// Handles a HINT message: pulls the 32-bit bin requested by the peer,
// appends it to the incoming-hint queue hint_in_, and logs it.
380 void Channel::OnHint (Datagram& dgram) {
381 bin64_t hint = dgram.Pull32();
382 hint_in_.push_back(hint);
383 //ack_in_.set(hint,bins::EMPTY);
384 //RequeueSend(cc_->OnHintRecvd(hint));
385 dprintf("%s #%i -hint %s\n",tintstr(),id,hint.str());
// Handles a HANDSHAKE message: stores the 32-bit value pulled from the
// datagram as peer_channel_id_ and logs it.
389 void Channel::OnHandshake (Datagram& dgram) {
390 peer_channel_id_ = dgram.Pull32();
391 dprintf("%s #%i -hs %i\n",tintstr(),id,peer_channel_id_);
392 // FUTURE: channel forking
// Handles a PEX_ADD (peer exchange) message: pulls a 32-bit IPv4 address and
// a 16-bit port, builds an Address from them, logs it, and hands it to the
// transfer via OnPexIn().
396 void Channel::OnPex (Datagram& dgram) {
397 uint32_t ipv4 = dgram.Pull32();
398 uint16_t port = dgram.Pull16();
399 Address addr(ipv4,port);
400 dprintf("%s #%i -pex %s\n",tintstr(),id,addr.str().c_str());
401 transfer().OnPexIn(addr);
// Appends a PEX_ADD message advertising another channel's peer address.
// The channel to reveal comes from transfer().RevealChannel(pex_out_). Its
// peer address is pushed as P2TP_PEX_ADD + 32-bit IPv4 + 16-bit port and
// logged.
// NOTE(review): the statement taken when chid is -1 or equals this channel's
// own id (presumably a return) is not visible in this listing.
405 void Channel::AddPex (Datagram& dgram) {
406 int chid = transfer().RevealChannel(pex_out_);
407 if (chid==-1 || chid==id)
409 Address a = channels[chid]->peer();
410 dgram.Push8(P2TP_PEX_ADD);
411 dgram.Push32(a.ipv4());
412 dgram.Push16(a.port());
413 dprintf("%s #%i +pex %s\n",tintstr(),id,a.str().c_str());
// Receives one datagram from `socket` and routes it to a channel.
// The first 32 bits are the destination channel id (`mych`):
//   - zero means the peer is initiating a handshake: the packet must begin
//     with a HASH message for the root bin, the hash must match a known
//     FileTransfer, and a new Channel is created unless a recent channel to
//     the same address already exists;
//   - otherwise the id is decoded and validated against `channels`, and the
//     sender address must match the channel's peer.
// Rejections are logged through RETLOG / eprintf.
// NOTE(review): the receive call, the size test guarding the first RETLOG,
// the declaration of `hash`, the null checks before two RETLOGs, the `else`,
// and the final hand-off to the channel are not visible in this listing.
417 void Channel::RecvDatagram (int socket) {
418 Datagram data(socket);
421 RETLOG("datagram shorter than 4 bytes");
422 uint32_t mych = data.Pull32();
424 Channel* channel = NULL;
425 if (!mych) { // handshake initiated
// Minimum size; presumably HASH type byte + 32-bit bin + hash, plus
// HANDSHAKE type byte + 32-bit id -- confirm.
426 if (data.size()<1+4+1+4+Sha1Hash::SIZE)
427 RETLOG ("incorrect size initial handshake packet");
428 uint8_t hashid = data.Pull8();
429 if (hashid!=P2TP_HASH)
430 RETLOG ("no hash in the initial handshake");
431 bin64_t pos = data.Pull32();
// NOTE(review): compares a bin built from a 32-bit value against
// bin64_t::ALL; confirm the 32-bit "all" encoding (bin64_t::ALL32) converts
// to bin64_t::ALL.
432 if (pos!=bin64_t::ALL)
433 RETLOG ("that is not the root hash");
434 hash = data.PullHash();
435 FileTransfer* file = FileTransfer::Find(hash);
437 RETLOG ("hash unknown, no such file");
438 dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
// Refuse a duplicate: a channel from this transfer's hs_in_ list with the
// same peer address that received something within the last 2 seconds.
// NOTE(review): this line uses `data.addr` while the rest of the function
// uses `data.address()` -- confirm both members exist.
439 for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
440 if (channels[*i] && channels[*i]->peer_==data.addr &&
441 channels[*i]->last_recv_time_>NOW-TINT_SEC*2)
442 RETLOG("have a channel already");
443 channel = new Channel(file, socket, data.address());
// Existing channel: decode the id and bounds-check it against `channels`.
445 mych = DecodeID(mych);
446 if (mych>=channels.size()) {
447 eprintf("invalid channel #%i\n",mych);
450 channel = channels[mych];
452 RETLOG ("channel is closed");
453 if (channel->peer() != data.address())
454 RETLOG ("invalid peer address");
455 channel->own_id_mentioned_ = true;
457 //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
// Main send/receive loop; runs until Datagram::Time() reaches the time at
// entry plus `howlong`.
// Each pass takes the earliest entry from send_queue, discards it if stale,
// and then either lets the due channel send, waits on the sockets until the
// next send time, or handles a channel whose send control has closed.
// NOTE(review): this listing omits many statements (the `do {`, queue pops,
// the break out of the inner loop, the Send()/RecvDatagram() calls and the
// channel teardown), so the notes below cover only the visible lines.
462 void Channel::Loop (tint howlong) {
464 tint limit = Datagram::Time() + howlong;
468 tint send_time(TINT_NEVER);
469 Channel* sender(NULL);
470 while (!send_queue.is_empty()) {
471 send_time = send_queue.peek().time;
472 sender = channel((int)send_queue.peek().bin);
// A queue entry is current when its time equals the channel's
// next_send_time_, or when that field is TINT_NEVER.
474 if ( sender->next_send_time_==send_time ||
475 sender->next_send_time_==TINT_NEVER )
477 sender = NULL; // it was a stale entry
// Case 1: a channel is due to send now.
482 if ( sender && sender->next_send_time_ <= NOW ) {
483 dprintf("%s #%i sch_send %s\n",tintstr(),sender->id,
486 sender->last_send_time_ = NOW;
487 // sender->RequeueSend(sender->cc_->NextSendTime()); goes to SendCtrl
// Case 2: nothing is due yet; wait on the sockets for up to `towait`.
489 } else if ( send_time > NOW ) {
490 tint towait = send_time - NOW;
491 dprintf("%s waiting %lliusec\n",tintstr(),towait);
492 int rd = Datagram::Wait(socket_count,sockets,towait);
493 if (rd!=INVALID_SOCKET)
// Case 3: remaining case, logged as the channel's send control being closed.
495 } else { // FIXME FIXME FIXME REWRITE!!! if (sender->next_send_time_==TINT_NEVER) {
497 dprintf("%s #%i closed sendctrl\n",tintstr(),sender->id);
503 } while (Datagram::Time()<limit);
// Records when this channel should send next and queues it accordingly.
// next_send_time_ keeps the requested value, including TINT_NEVER. For
// TINT_NEVER the queue entry is placed at NOW + TINT_MIN instead (a one
// minute timeout, per the inline comment). The entry pushed to send_queue
// pairs that time with this channel's id.
508 void Channel::Schedule (tint next_time) {
509 next_send_time_ = next_time;
510 if (next_time==TINT_NEVER)
511 next_time = NOW + TINT_MIN; // 1min timeout
512 send_queue.push(tintbin(next_time,id));
513 dprintf("%s requeue #%i for %s\n",tintstr(),id,tintstr(next_time));