5 * Created by Victor Grishchenko on 3/6/09.
6 * Copyright 2009 Delft University of Technology. All rights reserved.
10 //#include <glog/logging.h>
12 #include "compat/util.h"
16 using namespace std; // FIXME remove
20 - move hint_out_, piece picking to piece picker (needed e.g. for the case of channel drop)
23 - randomized testing of advanced ops (new testcase)
25 - bins hint_out_, tbqueue hint_out_ts_
// Appends one P2TP_HASH message to dgram for each of the file's peaks
// (indices 0 .. file().peak_count()-1).
// Per peak the datagram receives: an 8-bit P2TP_HASH tag, the peak bin cast
// to a 32-bit value, and the hash returned by file().peak_hash(i).
29 void Channel::AddPeakHashes (Datagram& dgram) {
30 for(int i=0; i<file().peak_count(); i++) {
31 bin64_t peak = file().peak(i);
32 dgram.Push8(P2TP_HASH);
// the bin is explicitly narrowed to 32 bits before being pushed
33 dgram.Push32((uint32_t)peak);
34 dgram.PushHash(file().peak_hash(i));
35 //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
// debug trace: channel id plus the peak's layer and offset
36 dprintf("%s #%i +phash (%i,%lli)\n",tintstr(),id,peak.layer(),peak.offset());
// Appends P2TP_HASH messages carrying the hash of pos.sibling() ("uncle" hashes)
// to dgram, starting at bin pos.
// The loop runs only while all three hold: pos is not the peak returned by
// file().peak_for(pos), pos.parent() is not within data_out_cap_, and
// ack_in_ reports pos.parent() as bins::EMPTY.
// NOTE(review): the statement that advances pos is not visible in this copy of
// the function - presumably pos moves to pos.parent() at the end of each
// iteration; confirm against the complete file.
41 void Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
42 bin64_t peak = file().peak_for(pos);
43 while (pos!=peak && !data_out_cap_.within(pos.parent()) &&
44 ack_in_.get(pos.parent())==bins::EMPTY) {
45 bin64_t uncle = pos.sibling();
// message layout: 8-bit tag, bin cast to 32 bits, hash of that bin
46 dgram.Push8(P2TP_HASH);
47 dgram.Push32((uint32_t)uncle);
48 dgram.PushHash( file().hash(uncle) );
49 //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
50 dprintf("%s #%i +hash (%i,%lli)\n",tintstr(),id,uncle.layer(),uncle.offset());
// Works through hint_in_ from the front until a bin to send has been found
// (send != bin64_t::NONE) or the queue is empty.
// For each hint it asks file().ack_out().find_filtered(ack_in_, hint, 0,
// bins::FILLED) for a candidate bin and logs the candidate's base offset.
// NOTE(review): several statements are not visible in this copy (the removal of
// the front entry, the action taken for a stale hint, the loop header around the
// towards()/sibling() lines and the return). Presumably stale hints are skipped
// and the function returns send - confirm against the complete file.
56 bin64_t Channel::DequeueHint () { // TODO: resilience
57 bin64_t send = bin64_t::NONE;
58 while (!hint_in_.empty() && send==bin64_t::NONE) {
59 bin64_t hint = hint_in_.front().bin;
60 tint time = hint_in_.front().time;
// a hint older than eight average round-trip times is treated specially
// (the statement guarded by this test is not visible here)
62 if (time < NOW-8*rtt_avg_)
64 send = file().ack_out().find_filtered
65 (ack_in_,hint,0,bins::FILLED);
// note: this trace runs even when send is bin64_t::NONE
66 dprintf("%s #%i dequeued %lli\n",tintstr(),id,send.base_offset());
67 if (send!=bin64_t::NONE)
// step from hint towards the chosen bin and put the sibling of that step back
// at the front of the queue, so the part of the hint not being sent stays queued
69 hint = hint.towards(send);
70 hint_in_.push_front(hint.sibling());
77 /*void Channel::CleanStaleHints () {
78 while ( !hint_out.empty() && file().ack_out().get(hint_out.front().bin)==bins::FILLED )
79 hint_out.pop_front(); // FIXME must normally clear fulfilled entries
80 tint timed_out = NOW - cc_->RoundTripTime()*8;
81 while ( !hint_out.empty() && hint_out.front().time < timed_out ) {
82 file().picker()->Snubbed(hint_out.front().bin);
// Appends the handshake messages for this channel to dgram.
// When peer_channel_id_ is zero (this side is initiating), a P2TP_HASH message
// for bin64_t::ALL32 carrying file().root_hash() is written first.
// A P2TP_HANDSHAKE message carrying EncodeID(id) is then written.
// NOTE(review): the closing brace of the if-block is not visible in this copy;
// the handshake push appears to be unconditional - confirm.
88 void Channel::AddHandshake (Datagram& dgram) {
89 if (!peer_channel_id_) { // initiating
90 dgram.Push8(P2TP_HASH);
91 dgram.Push32(bin64_t::ALL32);
92 dgram.PushHash(file().root_hash());
93 dprintf("%s #%i +hash ALL %s\n",
94 tintstr(),id,file().root_hash().hex().c_str());
// our own channel id, encoded, so the peer can address this channel
96 dgram.Push8(P2TP_HANDSHAKE);
97 dgram.Push32(EncodeID(id));
98 dprintf("%s #%i +hs\n",tintstr(),id);
// Prunes the front of data_out_, the queue of sent-but-unacknowledged data.
// 1. Entries sent before NOW - rtt_avg_ - 4*dev_avg_ are dropped.
// 2. If that removed anything, the congestion controller is notified with
//    cc_->OnAckRcvd(bin64_t::NONE) and data_out_cap_ is reset to bin64_t::ALL.
// 3. Leading entries whose bin ack_in_ reports as bins::FILLED are dropped.
// NOTE(review): oldsize is an int compared against data_out_.size(); if size()
// returns an unsigned type this is a signed/unsigned comparison.
104 void Channel::ClearStaleDataOut() {
105 int oldsize = data_out_.size();
// timeout threshold: average RTT plus four times the average deviation
106 while ( data_out_.size() && data_out_.front().time <
107 NOW - rtt_avg_ - dev_avg_*4 )
108 data_out_.pop_front();
109 if (data_out_.size()!=oldsize) {
110 cc_->OnAckRcvd(bin64_t::NONE);
111 data_out_cap_ = bin64_t::ALL;
// drop entries the peer has already acknowledged
113 while (data_out_.size() && ack_in_.get(data_out_.front().bin)==bins::FILLED)
114 data_out_.pop_front();
// Builds one datagram for the peer and sends it.
// The datagram always starts with the 32-bit peer_channel_id_. On an
// established channel, data is added through AddData() when the congestion
// controller allows it (cc_->MaySendData()); otherwise "no cwnd" is traced.
// After building, cc_->OnDataSent(data) is called and a failed
// dgram.Send() (return value -1) is reported through print_error().
// NOTE(review): many statements are not visible in this copy - most of the
// established-channel branch, the non-established branch and the action taken
// for a bare keep-alive. Read the comments below as covering the visible lines only.
118 void Channel::Send () {
119 Datagram dgram(socket_,peer());
120 dgram.Push32(peer_channel_id_);
// stays NONE unless AddData() actually places a data message in the datagram
121 bin64_t data = bin64_t::NONE;
122 if ( is_established() ) {
123 // FIXME: seeder check
125 if (!file().is_complete())
129 if (cc_->MaySendData())
130 data = AddData(dgram);
132 dprintf("%s #%i no cwnd\n",tintstr(),id);
137 dprintf("%s #%i sent %ib %s\n",tintstr(),id,dgram.size(),peer().str().c_str());
138 if (dgram.size()==4) // only the channel id; bare keep-alive
140 cc_->OnDataSent(data);
141 if (dgram.Send()==-1)
142 print_error("can't send datagram");
// Maintains hint_out_ (the hints sent to the peer) and, if the peer is
// under-requested, appends one new P2TP_HINT message to dgram.
// Phase 1 - clean the front of hint_out_:
//   - entries older than NOW - 8*rtt_avg_ are removed and reported to the
//     picker through Expired();
//   - entries whose bin is bins::FILLED in file().ack_out() are removed and
//     also reported through Expired();
//   - entries that are neither EMPTY nor FILLED are split into left and right halves.
// Phase 2 - sum the widths of all hinted bins and compare with an estimate of
//   the peer's window (rtt_avg_ / dip_avg_); if 4*peer_cwnd exceeds the
//   hinted amount, pick a new bin and send it as a hint.
// NOTE(review): some statements are not visible in this copy (the action for
// status==EMPTY, the declaration of hinted, the push of the hint bin itself).
146 void Channel::AddHint (Datagram& dgram) {
148 while (!hint_out_.empty()) {
149 tintbin f = hint_out_.front();
// expired hint: older than eight average round-trip times
150 if (f.time<NOW-rtt_avg_*8) {
151 hint_out_.pop_front();
152 dprintf("%s #%i !hint (%i,%lli)\n",
153 tintstr(),id,(int)f.bin.layer(),f.bin.offset());
154 transfer().picker().Expired(f.bin);
156 int status = file().ack_out().get(f.bin);
157 if (status==bins::EMPTY) {
159 } else if (status==bins::FILLED) {
// hint fully satisfied: drop it and tell the picker
160 hint_out_.pop_front();
161 transfer().picker().Expired(f.bin);
// mixed status: replace the front entry by its right half and push the
// left half (same timestamp) in front of it
163 hint_out_.front().bin = f.bin.right();
164 f.bin = f.bin.left();
165 hint_out_.push_front(f);
166 } // FIXME: simplify this mess
169 /*while (!hint_out_.empty() &&
170 (hint_out_.front().time<NOW-TINT_SEC ||
171 file().ack_out().get(hint_out_.front().bin)==bins::FILLED ) ) {
172 file().picker().Expired(hint_out_.front().bin);
173 hint_out_.pop_front();
// total width of everything currently hinted to the peer
176 for(tbqueue::iterator i=hint_out_.begin(); i!=hint_out_.end(); i++)
177 hinted += i->bin.width();
178 //int bps = PeerBPS();
179 //double kbps = max(4,TINT_SEC / dip_avg_);
// NOTE(review): rtt_avg_ and dip_avg_ are updated with >> elsewhere in this
// file, so both are integers: this division truncates before the conversion
// to double, and it divides by zero if dip_avg_ is 0.
180 double peer_cwnd = rtt_avg_ / dip_avg_;
183 dprintf("%s #%i hinted %lli peer_cwnd %lli/%lli=%f\n",
184 tintstr(),id,hinted,rtt_avg_,dip_avg_,((float)rtt_avg_/dip_avg_));
186 if ( 4*peer_cwnd > hinted ) { //hinted*1024 < peer_cwnd*4 ) {
// try layer 2 first, then fall back to layer 0 if nothing was picked
188 uint8_t layer = 2; // actually, enough
189 bin64_t hint = transfer().picker().Pick(ack_in_,layer);
190 // FIXME FIXME FIXME: any layer
191 if (hint==bin64_t::NONE)
192 hint = transfer().picker().Pick(ack_in_,0);
194 if (hint!=bin64_t::NONE) {
// remember the hint locally and announce it to the peer
195 hint_out_.push_back(hint);
196 dgram.Push8(P2TP_HINT);
198 dprintf("%s #%i +hint (%i,%lli)\n",tintstr(),id,hint.layer(),hint.offset());
// Appends one P2TP_DATA message (plus the hashes it needs) to dgram.
// Returns bin64_t::NONE when the file size is unknown, when DequeueHint()
// yields nothing to send, or when reading the file fails.
// Sequence: choose the bin through DequeueHint(); send peak hashes if ack_in_ is
// still empty; send uncle hashes for the bin; read up to 1024 bytes from the
// file at byte offset tosend.base_offset()<<10; write the P2TP_DATA tag and
// the bin; record the bin in data_out_ and data_out_cap_.
// NOTE(review): some statements are not visible in this copy (the buffer
// declaration, the read-error test, the push of the payload, the final return);
// presumably the function returns tosend on success - confirm.
205 bin64_t Channel::AddData (Datagram& dgram) {
206 if (!file().size()) // know nothing
207 return bin64_t::NONE;
208 bin64_t tosend = DequeueHint();
209 if (tosend==bin64_t::NONE) {
210 dprintf("%s #%i out of hints #sendctrl\n",tintstr(),id);
211 return bin64_t::NONE;
// the peer has acknowledged nothing yet, so send it the peak hashes
213 if (ack_in_.is_empty() && file().size())
214 AddPeakHashes(dgram);
215 AddUncleHashes(dgram,tosend);
// NOTE(review): pread() returns ssize_t and signals failure with -1; storing the
// result in size_t turns -1 into a huge positive value, so a test of the form
// r<0 could never fire. The error test itself is not visible here - confirm.
217 size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10);
218 // TODO: ??? corrupted data, retries
220 print_error("error on reading");
221 return bin64_t::NONE;
// payload plus 4-byte bin plus 1-byte tag must fit in the datagram
223 assert(dgram.space()>=r+4+1);
224 dgram.Push8(P2TP_DATA);
225 dgram.Push32(tosend);
227 dprintf("%s #%i +data (%lli)\n",tintstr(),id,tosend.base_offset());
228 data_out_.push_back(tosend);
229 data_out_cap_ = tosend;
230 // FIXME BUG this makes data_out_ all stale ack_in_.set(tosend);
// Appends a P2TP_TS message to dgram: an 8-bit tag followed by the 64-bit
// value data_in_.time, which OnData() sets to NOW when data arrives.
235 void Channel::AddTs (Datagram& dgram) {
236 dgram.Push8(P2TP_TS);
237 dgram.Push64(data_in_.time);
238 dprintf("%s #%i +ts %lli\n",tintstr(),id,data_in_.time);
// Appends P2TP_ACK messages to dgram.
// First, if a data bin is pending in data_in_, it is acknowledged and
// data_in_ is reset to (0, bin64_t::NONE).
// Then, in up to four rounds, file().ack_out().find_filtered(ack_out_,
// bin64_t::ALL, 0, bins::FILLED) is asked for a further bin; a bin found is
// widened with file().ack_out().cover() and acknowledged as well.
// NOTE(review): some statements are not visible in this copy (the pushes of the
// bin values, the action when no bin is found, any update of ack_out_);
// presumably the loop stops at the first NONE - confirm.
242 void Channel::AddAck (Datagram& dgram) {
243 if (data_in_.bin!=bin64_t::NONE) {
245 bin64_t pos = data_in_.bin;
246 dgram.Push8(P2TP_ACK);
248 //dgram.Push64(data_in_.time);
250 dprintf("%s #%i +ack (%i,%lli) %s\n",tintstr(),id,
251 pos.layer(),pos.offset(),tintstr(data_in_.time));
// mark the pending data as acknowledged
252 data_in_ = tintbin(0,bin64_t::NONE);
254 for(int count=0; count<4; count++) {
255 bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, 0, bins::FILLED);
256 // TODO bins::ANY_LAYER
257 if (ack==bin64_t::NONE)
259 ack = file().ack_out().cover(ack);
261 dgram.Push8(P2TP_ACK);
263 dprintf("%s #%i +ack (%i,%lli)\n",tintstr(),id,ack.layer(),ack.offset());
// Processes a datagram that has been routed to this channel.
// 1. If something was sent before and the RTT estimate still holds the values
//    rtt_avg_==TINT_SEC and dev_avg_==0, the first RTT sample is taken as
//    NOW - last_send_time_ and the channel id is appended to transfer().hs_in_.
//    (Presumably those are the constructor's initial values - confirm.)
// 2. Messages are pulled one by one: an 8-bit type followed by a type-specific
//    body handled by the matching On...() method.
// 3. The congestion controller is told what data arrived, last_recv_time_ is
//    updated, and a reply is sent immediately when the next scheduled send is
//    more than TINT_MSEC away and the datagram was not empty.
// NOTE(review): some statements are not visible in this copy (the switch header,
// the default case, part of the RTT initialisation).
268 void Channel::Recv (Datagram& dgram) {
269 if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
270 rtt_avg_ = NOW - last_send_time_;
273 transfer().hs_in_.push_back(id);
274 dprintf("%s #%i rtt init %lli\n",tintstr(),id,rtt_avg_);
// an empty datagram is marked with ALL; otherwise data starts as NONE and is
// overwritten by OnData() when a P2TP_DATA message is present
276 bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
277 while (dgram.size()) {
278 uint8_t type = dgram.Pull8();
280 case P2TP_HANDSHAKE: OnHandshake(dgram); break;
281 case P2TP_DATA: data=OnData(dgram); break;
282 case P2TP_TS: OnTs(dgram); break;
283 case P2TP_ACK: OnAck(dgram); break;
284 case P2TP_HASH: OnHash(dgram); break;
285 case P2TP_HINT: OnHint(dgram); break;
286 case P2TP_PEX_ADD: OnPex(dgram); break;
288 //LOG(ERROR) << this->id_string() << " malformed datagram";
292 cc_->OnDataRecvd(data);
293 last_recv_time_ = NOW;
// reply right away unless the datagram was empty or a send is already due soon
294 if (data!=bin64_t::ALL && next_send_time_>NOW+TINT_MSEC)
295 Send(); //RequeueSend(NOW);
// Handles a P2TP_HASH message: pulls a 32-bit bin and a hash from dgram and
// passes both to file().OfferHash(). The result of OfferHash() is not used here.
299 void Channel::OnHash (Datagram& dgram) {
300 bin64_t pos = dgram.Pull32();
301 Sha1Hash hash = dgram.PullHash();
302 file().OfferHash(pos,hash);
303 //DLOG(INFO)<<"#"<<id<<" .HASH"<<(int)pos;
304 dprintf("%s #%i -hash (%i,%lli)\n",tintstr(),id,pos.layer(),pos.offset());
// Handles a P2TP_DATA message: pulls the 32-bit bin and up to 1024 bytes of
// payload from dgram and offers them to the file through file().OfferData().
// The trace line prints '-' when OfferData() accepted the data and '!' otherwise.
// Visible follow-up work: data_in_ is set to (NOW, pos) so the data can be
// acknowledged, dip_avg_ (a running average of the time between receptions) is
// updated, and the picker is told through Received(pos).
// NOTE(review): some statements are not visible in this copy (the declaration of
// data, the test on ok, the success return). Presumably the follow-up work only
// runs when ok is true and the function then returns pos, with the visible
// return of NONE covering the rejected case - confirm against the complete file.
308 bin64_t Channel::OnData (Datagram& dgram) {
309 bin64_t pos = dgram.Pull32();
311 int length = dgram.Pull(&data,1024);
312 bool ok = file().OfferData(pos, (char*)data, length) ;
313 dprintf("%s #%i %cdata (%lli)\n",tintstr(),id,ok?'-':'!',pos.offset());
315 data_in_ = tintbin(NOW,pos);
316 if (last_recv_time_) {
317 tint dip = NOW - last_recv_time_; // FIXME: was it an ACK?
// running average: three quarters old value, one quarter new sample
318 dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
320 transfer().picker().Received(pos); // so dirty; FIXME FIXME FIXME
323 return bin64_t::NONE;
// Handles a P2TP_ACK message.
// Pulls the acknowledged bin; when the file size is known, a bin whose base
// offset is at or beyond file().packet_size() is reported as invalid.
// For each of the first (at most eight) entries of data_out_ that lie within
// the acknowledged bin, an RTT sample is folded into rtt_avg_ and dev_avg_
// and the congestion controller is notified. The bin is then set in ack_in_
// and acknowledged entries are removed from the front of data_out_.
// NOTE(review): the statement following the "invalid ack" message is not visible
// in this copy; presumably the function returns there - confirm.
327 void Channel::OnAck (Datagram& dgram) {
328 bin64_t ackd_pos = dgram.Pull32();
329 if (file().size() && ackd_pos.base_offset()>=file().packet_size()) {
330 eprintf("invalid ack: (%i,%lli)\n",ackd_pos.layer(),ackd_pos.offset());
333 dprintf("%s #%i -ack (%i,%lli)\n",tintstr(),id,ackd_pos.layer(),ackd_pos.offset());
// NOTE(review): int i is compared with data_out_.size(); signed/unsigned
// comparison if size() returns an unsigned type.
334 for (int i=0; i<8 && i<data_out_.size(); i++)
335 if (data_out_[i].bin.within(ackd_pos)) {
336 tint rtt = NOW-data_out_[i].time;
// running averages: three quarters old value, one quarter new sample
337 rtt_avg_ = (rtt_avg_*3 + rtt) >> 2;
// NOTE(review): the %lli formats suggest tint is 64-bit; make sure this call
// resolves to a 64-bit abs overload and not to abs(int) - confirm.
338 dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
339 dprintf("%s #%i rtt %lli dev %lli\n",
340 tintstr(),id,rtt_avg_,dev_avg_);
341 cc_->OnAckRcvd(data_out_[i].bin); // may be invoked twice FIXME FIXME FIXME
343 ack_in_.set(ackd_pos);
// drop everything at the front that the peer has now acknowledged
344 while (data_out_.size() && ack_in_.get(data_out_.front().bin)==bins::FILLED)
345 data_out_.pop_front();
349 /*void Channel::OnAckTs (Datagram& dgram) { // FIXME: OnTs
350 bin64_t pos = dgram.Pull32();
351 tint ts = dgram.Pull64();
353 dprintf("%s #%i -ackts (%i,%lli) %s\n",
354 tintstr(),id,pos.layer(),pos.offset(),tintstr(ts));
356 cc_->OnAckRcvd(pos,ts);
// Handles a P2TP_TS message: stores the 64-bit value pulled from dgram in
// peer_send_time_ and traces it.
359 void Channel::OnTs (Datagram& dgram) {
360 peer_send_time_ = dgram.Pull64();
361 dprintf("%s #%i -ts %lli\n",tintstr(),id,peer_send_time_);
// Handles a P2TP_HINT message: pulls the hinted 32-bit bin, appends it to
// hint_in_ (the queue DequeueHint() reads from) and marks the bin as
// bins::EMPTY in ack_in_.
365 void Channel::OnHint (Datagram& dgram) {
366 bin64_t hint = dgram.Pull32();
367 hint_in_.push_back(hint);
// the hinted bin is recorded as EMPTY in ack_in_, overriding any earlier state
368 ack_in_.set(hint,bins::EMPTY);
369 //RequeueSend(cc_->OnHintRecvd(hint));
370 dprintf("%s #%i -hint (%i,%lli)\n",tintstr(),id,hint.layer(),hint.offset());
// Handles a P2TP_HANDSHAKE message: stores the 32-bit value pulled from dgram
// in peer_channel_id_, the value Send() writes at the start of each datagram.
374 void Channel::OnHandshake (Datagram& dgram) {
375 peer_channel_id_ = dgram.Pull32();
376 dprintf("%s #%i -hs %i\n",tintstr(),id,peer_channel_id_);
377 // FUTURE: channel forking
// Handles a P2TP_PEX_ADD message: pulls a 32-bit IPv4 address and a 16-bit port
// from dgram, wraps them in an Address and passes it to transfer().OnPexIn().
381 void Channel::OnPex (Datagram& dgram) {
382 uint32_t ipv4 = dgram.Pull32();
383 uint16_t port = dgram.Pull16();
384 Address addr(ipv4,port);
385 dprintf("%s #%i -pex %s\n",tintstr(),id,addr.str().c_str());
386 transfer().OnPexIn(addr);
// Appends a P2TP_PEX_ADD message to dgram that advertises another channel's peer.
// The channel is chosen by transfer().RevealChannel(pex_out_); its peer address
// is written as a 32-bit IPv4 address followed by a 16-bit port.
// NOTE(review): the statement guarded by the chid test is not visible in this
// copy; presumably the function returns when there is nothing to reveal (-1)
// or the result is this channel itself - confirm.
390 void Channel::AddPex (Datagram& dgram) {
391 int chid = transfer().RevealChannel(pex_out_);
392 if (chid==-1 || chid==id)
394 Address a = channels[chid]->peer();
395 dgram.Push8(P2TP_PEX_ADD);
396 dgram.Push32(a.ipv4());
397 dgram.Push16(a.port());
398 dprintf("%s #%i +pex %s\n",tintstr(),id,a.str().c_str());
// Receives one datagram from socket and routes it to a channel.
// The first 32 bits of the datagram select the channel:
//   - zero means the peer is initiating a handshake. The datagram must then be
//     large enough and carry a P2TP_HASH message for bin64_t::ALL32; the hash
//     is looked up with FileTransfer::Find(), a duplicate channel from the same
//     peer is rejected, and a new Channel is created.
//   - any other value is decoded with DecodeID() and used as an index into
//     channels; the channel must exist and its peer address must match the
//     datagram's sender.
// Every rejection goes through RETLOG with a message naming the reason.
// NOTE(review): some statements are not visible in this copy (the actual receive
// call, the length test, the declaration of hash, the null tests, the else
// keyword and the final hand-over to the channel).
402 void Channel::Recv (int socket) {
403 Datagram data(socket);
406 RETLOG("datagram shorter than 4 bytes");
407 uint32_t mych = data.Pull32();
409 Channel* channel = NULL;
410 if (!mych) { // handshake initiated
// minimum size: HASH tag + bin + hash, then HANDSHAKE tag + channel id
411 if (data.size()<1+4+1+4+Sha1Hash::SIZE)
412 RETLOG ("incorrect size initial handshake packet");
413 uint8_t hashid = data.Pull8();
414 if (hashid!=P2TP_HASH)
415 RETLOG ("no hash in the initial handshake");
416 bin64_t pos = data.Pull32();
417 if (pos!=bin64_t::ALL32)
418 RETLOG ("that is not the root hash");
419 hash = data.PullHash();
420 FileTransfer* file = FileTransfer::Find(hash);
422 RETLOG ("hash unknown, no such file");
423 dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
// refuse a second channel from a peer that already has one for this transfer
// NOTE(review): this line reads data.addr while the lines below call
// data.address(); confirm both refer to the sender's address.
424 for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
425 if (channels[*i] && channels[*i]->peer_==data.addr)
426 RETLOG("have a channel already");
// NOTE(review): ownership of the new Channel is not visible here; presumably
// the constructor registers it in channels - confirm.
427 channel = new Channel(file, socket, data.address());
429 mych = DecodeID(mych);
430 if (mych>=channels.size()) {
431 eprintf("invalid channel #%i\n",mych);
434 channel = channels[mych];
436 RETLOG ("channel is closed");
437 if (channel->peer() != data.address())
438 RETLOG ("invalid peer address");
// the peer addressed this channel by our id
439 channel->own_id_mentioned_ = true;
441 dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
// Ordering predicate for tintbin: true when a's time is later than b's.
// It is passed to push_heap/pop_heap on send_queue in this file; with this
// "greater than" comparison the heap's front is the entry with the earliest time.
446 bool tblater (const tintbin& a, const tintbin& b) {
447 return a.time > b.time;
// Schedules this channel's next send for next_time.
// next_send_time_ is updated and an entry (time, channel id) is put on the
// send_queue heap, ordered by tblater. A next_time of TINT_NEVER is queued
// as NOW+TINT_MIN instead.
// NOTE(review): two statements are not fully visible in this copy - the action
// when next_time is unchanged (presumably an early return) and the start of
// the statement that appends the new entry to send_queue; confirm.
451 void Channel::RequeueSend (tint next_time) {
452 if (next_time==next_send_time_)
454 next_send_time_ = next_time;
456 (tintbin(next_time==TINT_NEVER?NOW+TINT_MIN:next_time,id));
// restore heap order after the append
457 push_heap(send_queue.begin(),send_queue.end(),tblater);
458 dprintf("%s requeue #%i for %s\n",tintstr(),id,tintstr(next_time));
// Main send/receive loop; runs until Datagram::Time() reaches the start time
// plus howlong.
// Each round:
//   1. Looks at the front of the send_queue heap and discards stale entries.
//      An entry is current when the channel's next_send_time_ equals the
//      queued time or is TINT_NEVER.
//   2. If the chosen channel is due (next_send_time_ <= NOW), it is served:
//      last_send_time_ is stamped, the channel is requeued with the congestion
//      controller's NextSendTime(), and the heap front is popped.
//   3. Otherwise, if the queued time lies in the future, waits on the sockets
//      for at most that long.
//   4. Otherwise the channel is treated as closed and its entry is popped.
// NOTE(review): some statements are not visible in this copy (the opening of the
// do-loop, the null test on sender, the break out of the inner loop, the
// actual Send() call, the receive call after Wait(), the clean-up of a closed
// channel, and anything after the final while).
462 void Channel::Loop (tint howlong) {
464 tint limit = Datagram::Time() + howlong;
468 tint send_time(TINT_NEVER);
469 Channel* sender(NULL);
470 while (!send_queue.empty()) {
471 send_time = send_queue.front().time;
// the bin field of a queue entry carries the channel id (see RequeueSend)
472 sender = channel((int)send_queue.front().bin);
474 if ( sender->next_send_time_==send_time ||
475 sender->next_send_time_==TINT_NEVER )
477 sender = NULL; // it was a stale entry
478 pop_heap(send_queue.begin(), send_queue.end(), tblater);
479 send_queue.pop_back();
483 if ( sender && sender->next_send_time_ <= NOW ) {
484 dprintf("%s #%i sch_send %s\n",tintstr(),sender->id,
487 sender->last_send_time_ = NOW;
// NOTE(review): RequeueSend() pushes a new heap entry before the pop below;
// the pop removes whatever is at the front at that point - confirm this is
// always the entry that was just served.
488 sender->RequeueSend(sender->cc_->NextSendTime());
489 pop_heap(send_queue.begin(), send_queue.end(), tblater);
490 send_queue.pop_back();
491 } else if ( send_time > NOW ) {
// nothing is due yet: block on the sockets until the next scheduled send
492 tint towait = send_time - NOW;
493 dprintf("%s waiting %lliusec\n",tintstr(),towait);
494 int rd = Datagram::Wait(socket_count,sockets,towait);
495 if (rd!=INVALID_SOCKET)
497 } else { //if (sender->next_send_time_==TINT_NEVER) {
498 dprintf("%s #%i closed sendctrl\n",tintstr(),sender->id);
500 pop_heap(send_queue.begin(), send_queue.end(), tblater);
501 send_queue.pop_back();
504 } while (Datagram::Time()<limit);