X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?a=blobdiff_plain;f=src%2Flibswift%2Fsendrecv.cpp;fp=src%2Flibswift%2Fsendrecv.cpp;h=e64d3eea324f9812f5bf9f8c4b15d9e1b7533ff6;hb=45963a7511531cd1656ad5d3847d2dafd015c54d;hp=0000000000000000000000000000000000000000;hpb=d069796805ad79542fd7e4406d1e9c6d2d8c2ef7;p=swifty.git diff --git a/src/libswift/sendrecv.cpp b/src/libswift/sendrecv.cpp new file mode 100644 index 0000000..e64d3ee --- /dev/null +++ b/src/libswift/sendrecv.cpp @@ -0,0 +1,1089 @@ +/* + * sendrecv.cpp + * most of the swift's state machine + * + * Created by Victor Grishchenko on 3/6/09. + * Copyright 2009-2012 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved. + * + */ +#include "bin_utils.h" +#include "swift.h" +#include // kill it +#include +#include +#include +#include "compat.h" + +using namespace swift; +using namespace std; + +struct event_base *Channel::evbase; +struct event Channel::evrecv; + +#define DEBUGTRAFFIC 0 + +/** Arno: Victor's design allows a sender to choose some data to push to + * a receiver, if that receiver is not HINTing at data. Should be disabled + * when the receiver has a download rate limit. + */ +#define ENABLE_SENDERSIZE_PUSH 0 + + +/** Arno, 2011-11-24: When rate limit is on and the download is in progress + * we send HINTs for 2 chunks at the moment. This constant can be used to + * get greater granularity. Set to 0 for original behaviour. + */ +#define HINT_GRANULARITY 16 // chunks + +/* + TODO 25 Oct 18:55 + - range: ALL + - randomized testing of advanced ops (new testcase) + */ + +void Channel::AddPeakHashes (struct evbuffer *evb) { + for(int i=0; imax(1,(int)cwnd_)) + my_pick = my_pick.left(); + + return my_pick.twisted(twist); +} + + +bin_t Channel::DequeueHint (bool *retransmitptr) { + bin_t send = bin_t::NONE; + + // Arno, 2012-01-23: Extra protection against channel loss, don't send DATA + if (last_recv_time_ < NOW-(3*TINT_SEC)) + return bin_t::NONE; + + // Arno, 2012-01-18: Reenable Victor's retransmit + if (!data_out_tmo_.empty()) + { + tintbin tb = data_out_tmo_.front(); + send = tb.bin; + data_out_tmo_.pop_front(); + *retransmitptr = true; + } + else + *retransmitptr = false; + + if (ENABLE_SENDERSIZE_PUSH && send.is_none() && hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) { + bin_t my_pick = ImposeHint(); // FIXME move to the loop + if (!my_pick.is_none()) { + hint_in_.push_back(my_pick); + char bin_name_buf[32]; + dprintf("%s #%u *hint %s\n",tintstr(),id_,my_pick.str(bin_name_buf)); + } + } + + while (!hint_in_.empty() && send.is_none()) { + bin_t hint = hint_in_.front().bin; + tint time = hint_in_.front().time; + hint_in_.pop_front(); + while (!hint.is_base()) { // FIXME optimize; possible attack + hint_in_.push_front(tintbin(time,hint.right())); + hint = hint.left(); + } + //if (time < NOW-TINT_SEC*3/2 ) + // continue; bad idea + if (!ack_in_.is_filled(hint)) + send = hint; + } + uint64_t mass = 0; + // Arno, 2012-03-09: Is mucho expensive on busy server. + //for(int i=0; i transfer().GetMaxSpeed(DDIR_DOWNLOAD)) { + if (DEBUGTRAFFIC) + fprintf(stderr,"hint: forbidden#"); + return; + } + + + // 1. Calc max of what we are allowed to request, uncongested bandwidth wise + tint plan_for = max(TINT_SEC,rtt_avg_*4); + + tint timed_out = NOW - plan_for*2; + while ( !hint_out_.empty() && hint_out_.front().time < timed_out ) { + hint_out_size_ -= hint_out_.front().bin.base_length(); + hint_out_.pop_front(); + } + + int first_plan_pck = max ( (tint)1, plan_for / dip_avg_ ); + + // Riccardo, 2012-04-04: Actually allowed is max minus what we already asked for + int queue_allowed_hints = max(0,first_plan_pck-(int)hint_out_size_); + + + // RATELIMIT + // 2. Calc max of what is allowed by the rate limiter + int rate_allowed_hints = LONG_MAX; + if (transfer().GetMaxSpeed(DDIR_DOWNLOAD) < DBL_MAX) + { + uint64_t rough_global_hint_out_size = 0; // rough estimate, as hint_out_ clean up is not done for all channels + std::set::iterator iter; + for (iter=transfer().mychannels_.begin(); iter!=transfer().mychannels_.end(); iter++) + { + Channel *c = *iter; + if (c != NULL) + rough_global_hint_out_size += c->hint_out_size_; + } + + // Policy: this channel is allowed to hint at the limit - global_hinted_at + // Handle MaxSpeed = unlimited + double rate_hints_limit_float = transfer().GetMaxSpeed(DDIR_DOWNLOAD)/((double)file().chunk_size()); + + int rate_hints_limit = (int)min((double)LONG_MAX,rate_hints_limit_float); + + // Actually allowed is max minus what we already asked for, globally (=all channels) + rate_allowed_hints = max(0,rate_hints_limit-(int)rough_global_hint_out_size); + } + + // 3. Take the smallest allowance from rate and queue limit + uint64_t plan_pck = (uint64_t)min(rate_allowed_hints,queue_allowed_hints); + + // 4. Ask allowance in blocks of chunks to get pipelining going from serving peer. + if (hint_out_size_ == 0 || plan_pck > HINT_GRANULARITY) + { + bin_t hint = transfer().picker().Pick(ack_in_,plan_pck,NOW+plan_for*2); + if (!hint.is_none()) { + if (DEBUGTRAFFIC) + { + char binstr[32]; + fprintf(stderr,"hint c%d: ask %s\n", id(), hint.str(binstr) ); + } + evbuffer_add_8(evb, SWIFT_HINT); + evbuffer_add_32be(evb, bin_toUInt32(hint)); + char bin_name_buf[32]; + dprintf("%s #%u +hint %s [%lli]\n",tintstr(),id_,hint.str(bin_name_buf),hint_out_size_); + hint_out_.push_back(hint); + hint_out_size_ += hint.base_length(); + //fprintf(stderr,"send c%d: HINTLEN %i\n", id(), hint.base_length()); + //fprintf(stderr,"HL %i ", hint.base_length()); + } + else + dprintf("%s #%u Xhint\n",tintstr(),id_); + + } +} + + +bin_t Channel::AddData (struct evbuffer *evb) { + // RATELIMIT + if (transfer().GetCurrentSpeed(DDIR_UPLOAD) > transfer().GetMaxSpeed(DDIR_UPLOAD)) { + transfer().OnSendNoData(); + return bin_t::NONE; + } + + if (!file().size()) // know nothing + return bin_t::NONE; + + bin_t tosend = bin_t::NONE; + bool isretransmit = false; + tint luft = send_interval_>>4; // may wake up a bit earlier + if (data_out_.size()NOW-TINT_SEC || data_out_.empty())) + return bin_t::NONE; // once in a while, empty data is sent just to check rtt FIXED + + if (ack_in_.is_empty() && file().size()) + AddPeakHashes(evb); + + //NETWVSHASH + if (file().get_check_netwvshash()) + AddUncleHashes(evb,tosend); + + if (!ack_in_.is_empty()) // TODO: cwnd_>1 + data_out_cap_ = tosend; + + // Arno, 2011-11-03: May happen when first data packet is sent to empty + // leech, then peak + uncle hashes may be so big that they don't fit in eth + // frame with DATA. Send 2 datagrams then, one with peaks so they have + // a better chance of arriving. Optimistic violation of atomic datagram + // principle. + if (file().chunk_size() == SWIFT_DEFAULT_CHUNK_SIZE && evbuffer_get_length(evb) > SWIFT_MAX_NONDATA_DGRAM_SIZE) { + dprintf("%s #%u fsent %ib %s:%x\n", + tintstr(),id_,(int)evbuffer_get_length(evb),peer().str(), + peer_channel_id_); + int ret = Channel::SendTo(socket_,peer(),evb); // kind of fragmentation + if (ret > 0) + raw_bytes_up_ += ret; + evbuffer_add_32be(evb, peer_channel_id_); + } + + if (file().chunk_size() != SWIFT_DEFAULT_CHUNK_SIZE && isretransmit) { + /* FRAGRAND + * Arno, 2012-01-17: We observe strange behaviour when using + * fragmented UDP packets. When ULANC sends a specific datagram ("995"), + * the 2nd IP packet carrying it gets lost structurally. When + * downloading from the same asset hosted on a Linux 32-bit machine + * using a Win7 32-bit client (behind a NAT), one specific full + * datagram never gets delivered (6970 one before do). A workaround + * is to add some random data to the datagram. Hence we introduce + * the SWIFT_RANDOMIZE message, that is added to the datagram carrying + * the DATA on a retransmit. + */ + char binstr[32]; + fprintf(stderr,"AddData: retransmit of randomized chunk %s\n",tosend.str(binstr) ); + evbuffer_add_8(evb, SWIFT_RANDOMIZE); + evbuffer_add_32be(evb, (int)rand() ); + } + + evbuffer_add_8(evb, SWIFT_DATA); + evbuffer_add_32be(evb, bin_toUInt32(tosend)); + + struct evbuffer_iovec vec; + if (evbuffer_reserve_space(evb, file().chunk_size(), &vec, 1) < 0) { + print_error("error on evbuffer_reserve_space"); + return bin_t::NONE; + } + size_t r = pread(file().file_descriptor(),(char *)vec.iov_base, + file().chunk_size(),tosend.base_offset()*file().chunk_size()); + // TODO: corrupted data, retries, caching + if (r<0) { + print_error("error on reading"); + vec.iov_len = 0; + evbuffer_commit_space(evb, &vec, 1); + return bin_t::NONE; + } + // assert(dgram.space()>=r+4+1); + vec.iov_len = r; + if (evbuffer_commit_space(evb, &vec, 1) < 0) { + print_error("error on evbuffer_commit_space"); + return bin_t::NONE; + } + + last_data_out_time_ = NOW; + data_out_.push_back(tosend); + bytes_up_ += r; + global_bytes_up += r; + + char bin_name_buf[32]; + dprintf("%s #%u +data %s\n",tintstr(),id_,tosend.str(bin_name_buf)); + + // RATELIMIT + // ARNOSMPTODO: count overhead bytes too? Move to Send() then. + transfer_->OnSendData(file().chunk_size()); + + return tosend; +} + + +void Channel::AddAck (struct evbuffer *evb) { + if (data_in_==tintbin()) + //if (data_in_.bin==bin64_t::NONE) + return; + // sometimes, we send a HAVE (e.g. in case the peer did repetitive send) + evbuffer_add_8(evb, data_in_.time==TINT_NEVER?SWIFT_HAVE:SWIFT_ACK); + evbuffer_add_32be(evb, bin_toUInt32(data_in_.bin)); + if (data_in_.time!=TINT_NEVER) + evbuffer_add_64be(evb, data_in_.time); + + + if (DEBUGTRAFFIC) + fprintf(stderr,"send c%d: ACK %i\n", id(), bin_toUInt32(data_in_.bin)); + + have_out_.set(data_in_.bin); + char bin_name_buf[32]; + dprintf("%s #%u +ack %s %s\n", + tintstr(),id_,data_in_.bin.str(bin_name_buf),tintstr(data_in_.time)); + if (data_in_.bin.layer()>2) + data_in_dbl_ = data_in_.bin; + + //fprintf(stderr,"data_in_ c%d\n", id() ); + data_in_ = tintbin(); + //data_in_ = tintbin(NOW,bin64_t::NONE); +} + + +void Channel::AddHave (struct evbuffer *evb) { + if (!data_in_dbl_.is_none()) { // TODO: do redundancy better + evbuffer_add_8(evb, SWIFT_HAVE); + evbuffer_add_32be(evb, bin_toUInt32(data_in_dbl_)); + data_in_dbl_=bin_t::NONE; + } + if (DEBUGTRAFFIC) + fprintf(stderr,"send c%d: HAVE ",id() ); + for(int count=0; count<4; count++) { + bin_t ack = binmap_t::find_complement(have_out_, file().ack_out(), 0); // FIXME: do rotating queue + if (ack.is_none()) + break; + ack = file().ack_out().cover(ack); + have_out_.set(ack); + evbuffer_add_8(evb, SWIFT_HAVE); + evbuffer_add_32be(evb, bin_toUInt32(ack)); + + if (DEBUGTRAFFIC) + fprintf(stderr," %i", bin_toUInt32(ack)); + + char bin_name_buf[32]; + dprintf("%s #%u +have %s\n",tintstr(),id_,ack.str(bin_name_buf)); + } + if (DEBUGTRAFFIC) + fprintf(stderr,"\n"); + +} + + +void Channel::Recv (struct evbuffer *evb) { + dprintf("%s #%u recvd %ib\n",tintstr(),id_,(int)evbuffer_get_length(evb)+4); + dgrams_rcvd_++; + + lastrecvwaskeepalive_ = (evbuffer_get_length(evb) == 0); + if (lastrecvwaskeepalive_) + // Update speed measurements such that they decrease when DL stops + transfer().OnRecvData(0); + + if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) { + rtt_avg_ = NOW - last_send_time_; + dev_avg_ = rtt_avg_; + dip_avg_ = rtt_avg_; + dprintf("%s #%u sendctrl rtt init %lli\n",tintstr(),id_,rtt_avg_); + } + + bin_t data = evbuffer_get_length(evb) ? bin_t::NONE : bin_t::ALL; + + if (DEBUGTRAFFIC) + fprintf(stderr,"recv c%d: size %d ", id(), evbuffer_get_length(evb)); + + while (evbuffer_get_length(evb)) { + uint8_t type = evbuffer_remove_8(evb); + + if (DEBUGTRAFFIC) + fprintf(stderr," %d", type); + + switch (type) { + case SWIFT_HANDSHAKE: OnHandshake(evb); break; + case SWIFT_DATA: data=OnData(evb); break; + case SWIFT_HAVE: OnHave(evb); break; + case SWIFT_ACK: OnAck(evb); break; + case SWIFT_HASH: OnHash(evb); break; + case SWIFT_HINT: OnHint(evb); break; + case SWIFT_PEX_ADD: OnPex(evb); break; + case SWIFT_PEX_REQ: OnPexReq(); break; + case SWIFT_RANDOMIZE: OnRandomize(evb); break; //FRAGRAND + default: + dprintf("%s #%u ?msg id unknown %i\n",tintstr(),id_,(int)type); + return; + } + } + if (DEBUGTRAFFIC) + { + fprintf(stderr,"\n"); + } + + + last_recv_time_ = NOW; + sent_since_recv_ = 0; + Reschedule(); +} + +/* + * Arno: FAXME: HASH+DATA should be handled as a transaction: only when the + * hashes check out should they be stored in the hashtree, otherwise revert. + */ +void Channel::OnHash (struct evbuffer *evb) { + bin_t pos = bin_fromUInt32(evbuffer_remove_32be(evb)); + Sha1Hash hash = evbuffer_remove_hash(evb); + file().OfferHash(pos,hash); + char bin_name_buf[32]; + dprintf("%s #%u -hash %s\n",tintstr(),id_,pos.str(bin_name_buf)); + + //fprintf(stderr,"HASH %lli hex %s\n",pos.toUInt(), hash.hex().c_str() ); +} + + +void Channel::CleanHintOut (bin_t pos) { + int hi = 0; + while (hi file().chunk_size()) { + dprintf("%s #%u !data chunk size mismatch %s: exp %lu got " PRISIZET "\n",tintstr(),id_,pos.str(bin_name_buf), file().chunk_size(), evbuffer_get_length(evb)); + fprintf(stderr,"WARNING: chunk size mismatch: exp %lu got " PRISIZET "\n",file().chunk_size(), evbuffer_get_length(evb)); + } + + int length = (evbuffer_get_length(evb) < file().chunk_size()) ? evbuffer_get_length(evb) : file().chunk_size(); + if (!file().ack_out().is_empty(pos)) { + // Arno, 2012-01-24: print message for duplicate + dprintf("%s #%u Ddata %s\n",tintstr(),id_,pos.str(bin_name_buf)); + evbuffer_drain(evb, length); + data_in_ = tintbin(TINT_NEVER,transfer().ack_out().cover(pos)); + + // Arno, 2012-01-24: Make sure data interarrival periods don't get + // screwed up because of these (ignored) duplicates. + UpdateDIP(pos); + return bin_t::NONE; + } + uint8_t *data = evbuffer_pullup(evb, length); + data_in_ = tintbin(NOW,bin_t::NONE); + if (!file().OfferData(pos, (char*)data, length)) { + evbuffer_drain(evb, length); + char bin_name_buf[32]; + dprintf("%s #%u !data %s\n",tintstr(),id_,pos.str(bin_name_buf)); + return bin_t::NONE; + } + evbuffer_drain(evb, length); + dprintf("%s #%u -data %s\n",tintstr(),id_,pos.str(bin_name_buf)); + + if (DEBUGTRAFFIC) + fprintf(stderr,"$ "); + + bin_t cover = transfer().ack_out().cover(pos); + for(int i=0; i=transfer().cb_agg[i]) + transfer().callbacks[i](transfer().fd(),cover); // FIXME + if (cover.layer() >= 5) // Arno: tested with 32K, presently = 2 ** 5 * chunk_size CHUNKSIZE + transfer().OnRecvData( pow((double)2,(double)5)*((double)file().chunk_size()) ); + data_in_.bin = pos; + + UpdateDIP(pos); + CleanHintOut(pos); + bytes_down_ += length; + global_bytes_down += length; + return pos; +} + + +void Channel::UpdateDIP(bin_t pos) +{ + if (!pos.is_none()) { + if (last_data_in_time_) { + tint dip = NOW - last_data_in_time_; + dip_avg_ = ( dip_avg_*3 + dip ) >> 2; + } + last_data_in_time_ = NOW; + } +} + + +void Channel::OnAck (struct evbuffer *evb) { + bin_t ackd_pos = bin_fromUInt32(evbuffer_remove_32be(evb)); + tint peer_time = evbuffer_remove_64be(evb); // FIXME 32 + // FIXME FIXME: wrap around here + if (ackd_pos.is_none()) + return; // likely, broken chunk/ insufficient hashes + if (file().size() && ackd_pos.base_offset()>=file().size_in_chunks()) { + char bin_name_buf[32]; + eprintf("invalid ack: %s\n",ackd_pos.str(bin_name_buf)); + return; + } + ack_in_.set(ackd_pos); + + //fprintf(stderr,"OnAck: got bin %s is_complete %d\n", ackd_pos.str(), (int)ack_in_.is_complete_arno( file().ack_out().get_height() )); + + int di = 0, ri = 0; + // find an entry for the send (data out) event + while ( di> 3; + dev_avg_ = ( dev_avg_*3 + tintabs(rtt-rtt_avg_) ) >> 2; + assert(data_out_[di].time!=TINT_NEVER); + // one-way delay calculations + tint owd = peer_time - data_out_[di].time; + owd_cur_bin_ = 0;//(owd_cur_bin_+1) & 3; + owd_current_[owd_cur_bin_] = owd; + if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) { + owd_min_bin_start_ = NOW; + owd_min_bin_ = (owd_min_bin_+1) & 3; + owd_min_bins_[owd_min_bin_] = TINT_NEVER; + } + if (owd_min_bins_[owd_min_bin_]>owd) + owd_min_bins_[owd_min_bin_] = owd; + dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n", + tintstr(),id_,rtt_avg_,dev_avg_,data_out_[di].bin.str(bin_name_buf)); + ack_rcvd_recent_++; + // early loss detection by packet reordering + for (int re=0; re 0) + { + transfer().availability().setSize(file().size_in_chunks()); + } + // Ric: update the availability if needed + transfer().availability().set(id_, ack_in_, ackd_pos); + } + + ack_in_.set(ackd_pos); + char bin_name_buf[32]; + dprintf("%s #%u -have %s\n",tintstr(),id_,ackd_pos.str(bin_name_buf)); + + //fprintf(stderr,"OnHave: got bin %s is_complete %d\n", ackd_pos.str(), IsComplete() ); + +} + + +void Channel::OnHint (struct evbuffer *evb) { + bin_t hint = bin_fromUInt32(evbuffer_remove_32be(evb)); + // FIXME: wake up here + hint_in_.push_back(hint); + char bin_name_buf[32]; + dprintf("%s #%u -hint %s\n",tintstr(),id_,hint.str(bin_name_buf)); +} + + +void Channel::OnHandshake (struct evbuffer *evb) { + + uint32_t pcid = evbuffer_remove_32be(evb); + dprintf("%s #%u -hs %x\n",tintstr(),id_,pcid); + + if (is_established() && pcid == 0) { + // Arno: received explicit close + peer_channel_id_ = 0; // == established -> false + Close(); + return; + } + + peer_channel_id_ = pcid; + // self-connection check + if (!SELF_CONN_OK) { + uint32_t try_id = DecodeID(peer_channel_id_); + if (channel(try_id) && !channel(try_id)->peer_channel_id_) { + peer_channel_id_ = 0; + Close(); + return; // this is a self-connection + } + } + + // FUTURE: channel forking + if (is_established()) + dprintf("%s #%u established %s\n", tintstr(), id_, peer().str()); +} + + +void Channel::OnPex (struct evbuffer *evb) { + uint32_t ipv4 = evbuffer_remove_32be(evb); + uint16_t port = evbuffer_remove_16be(evb); + Address addr(ipv4,port); + dprintf("%s #%u -pex %s\n",tintstr(),id_,addr.str()); + if (transfer().OnPexIn(addr)) + useless_pex_count_ = 0; + else + { + dprintf("%s #%u already channel to %s\n", tintstr(),id_,addr.str()); + useless_pex_count_++; + } + pex_request_outstanding_ = false; +} + + +//FRAGRAND +void Channel::OnRandomize (struct evbuffer *evb) { + dprintf("%s #%u -rand\n",tintstr(),id_ ); + // Payload is 4 random bytes + uint32_t r = evbuffer_remove_32be(evb); +} + + +void Channel::AddPex (struct evbuffer *evb) { + // Gertjan fix: Reverse PEX + // PEX messages sent to facilitate NAT/FW puncturing get priority + if (!reverse_pex_out_.empty()) { + do { + tintbin pex_peer = reverse_pex_out_.front(); + reverse_pex_out_.pop_front(); + if (channels[(int) pex_peer.bin.toUInt()] == NULL) + continue; + Address a = channels[(int) pex_peer.bin.toUInt()]->peer(); + // Arno, 2012-02-28: Don't send private addresses to non-private peers. + if (!a.is_private() || (a.is_private() && peer().is_private())) + { + evbuffer_add_8(evb, SWIFT_PEX_ADD); + evbuffer_add_32be(evb, a.ipv4()); + evbuffer_add_16be(evb, a.port()); + dprintf("%s #%u +pex (reverse) %s\n",tintstr(),id_,a.str()); + } + } while (!reverse_pex_out_.empty() && (SWIFT_MAX_NONDATA_DGRAM_SIZE-evbuffer_get_length(evb)) >= 7); + + // Arno: 2012-02-23: Don't think this is right. Bit of DoS thing, + // that you only get back the addr of people that got your addr. + // Disable for now. + //return; + } + + if (!pex_requested_) + return; + + // Arno, 2012-02-28: Don't send private addresses to non-private peers. + int chid = 0, tries=0; + Address a; + while (true) + { + // Arno, 2011-10-03: Choosing Gertjan's RandomChannel over RevealChannel here. + chid = transfer().RandomChannel(id_); + if (chid==-1 || chid==id_ || tries > 5) { + pex_requested_ = false; + return; + } + a = channels[chid]->peer(); + if (!a.is_private() || (a.is_private() && peer().is_private())) + break; + tries++; + } + + evbuffer_add_8(evb, SWIFT_PEX_ADD); + evbuffer_add_32be(evb, a.ipv4()); + evbuffer_add_16be(evb, a.port()); + dprintf("%s #%u +pex %s\n",tintstr(),id_,a.str()); + + pex_requested_ = false; + /* Ensure that we don't add the same id to the reverse_pex_out_ queue + more than once. */ + for (tbqueue::iterator i = channels[chid]->reverse_pex_out_.begin(); + i != channels[chid]->reverse_pex_out_.end(); i++) + if ((int) (i->bin.toUInt()) == id_) + return; + + dprintf("%s #%u adding pex for channel %u at time %s\n", tintstr(), chid, + id_, tintstr(NOW + 2 * TINT_SEC)); + // Arno, 2011-10-03: should really be a queue of (tint,channel id(= uint32_t)) pairs. + channels[chid]->reverse_pex_out_.push_back(tintbin(NOW + 2 * TINT_SEC, bin_t(id_))); + if (channels[chid]->send_control_ == KEEP_ALIVE_CONTROL && + channels[chid]->next_send_time_ > NOW + 2 * TINT_SEC) + channels[chid]->Reschedule(); +} + +void Channel::OnPexReq(void) { + dprintf("%s #%u -pex req\n", tintstr(), id_); + if (NOW > MIN_PEX_REQUEST_INTERVAL + last_pex_request_time_) + pex_requested_ = true; +} + +void Channel::AddPexReq(struct evbuffer *evb) { + // Rate limit the number of PEX requests + if (NOW < next_pex_request_time_) + return; + + // If no answer has been received from a previous request, count it as useless + if (pex_request_outstanding_) + useless_pex_count_++; + + pex_request_outstanding_ = false; + + // Initiate at most SWIFT_MAX_CONNECTIONS connections + if (transfer().hs_in_.size() >= SWIFT_MAX_CONNECTIONS || + // Check whether this channel has been providing useful peer information + useless_pex_count_ > 2) + { + // Arno, 2012-02-23: Fix: Code doesn't recover from useless_pex_count_ > 2, + // let's just try again in 30s + useless_pex_count_ = 0; + next_pex_request_time_ = NOW + 30 * TINT_SEC; + + return; + } + + dprintf("%s #%u +pex req\n", tintstr(), id_); + evbuffer_add_8(evb, SWIFT_PEX_REQ); + /* Add a little more than the minimum interval, such that the other party is + less likely to drop it due to too high rate */ + next_pex_request_time_ = NOW + MIN_PEX_REQUEST_INTERVAL * 1.1; + pex_request_outstanding_ = true; +} + + + +/* + * Channel class methods + */ + +void Channel::LibeventReceiveCallback(evutil_socket_t fd, short event, void *arg) { + // Called by libevent when a datagram is received on the socket + Time(); + RecvDatagram(fd); + event_add(&evrecv, NULL); +} + +void Channel::RecvDatagram (evutil_socket_t socket) { + struct evbuffer *evb = evbuffer_new(); + Address addr; + RecvFrom(socket, addr, evb); + size_t evboriglen = evbuffer_get_length(evb); +#define return_log(...) { fprintf(stderr,__VA_ARGS__); evbuffer_free(evb); return; } + if (evbuffer_get_length(evb)<4) + return_log("socket layer weird: datagram shorter than 4 bytes from %s (prob ICMP unreach)\n",addr.str()); + uint32_t mych = evbuffer_remove_32be(evb); + Sha1Hash hash; + Channel* channel = NULL; + if (mych==0) { // peer initiates handshake + if (evbuffer_get_length(evb)<1+4+1+4+Sha1Hash::SIZE) + return_log ("%s #0 incorrect size %i initial handshake packet %s\n", + tintstr(),(int)evbuffer_get_length(evb),addr.str()); + uint8_t hashid = evbuffer_remove_8(evb); + if (hashid!=SWIFT_HASH) + return_log ("%s #0 no hash in the initial handshake %s\n", + tintstr(),addr.str()); + bin_t pos = bin_fromUInt32(evbuffer_remove_32be(evb)); + if (!pos.is_all()) + return_log ("%s #0 that is not the root hash %s\n",tintstr(),addr.str()); + hash = evbuffer_remove_hash(evb); + FileTransfer* ft = FileTransfer::Find(hash); + if (!ft) + return_log ("%s #0 hash %s unknown, requested by %s\n",tintstr(),hash.hex().c_str(),addr.str()); + dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str()); + + // Arno, 2012-02-27: Check for duplicate channel + Channel* existchannel = ft->FindChannel(addr,NULL); + if (existchannel) + { + // Arno: 2011-10-13: Ignore if established, otherwise consider + // it a concurrent connection attempt. + if (existchannel->is_established()) { + // ARNOTODO: Read complete handshake here so we know whether + // attempt is to new channel or to existing. Currently read + // in OnHandshake() + // + return_log("%s #0 have a channel already to %s\n",tintstr(),addr.str()); + } else { + channel = existchannel; + //fprintf(stderr,"Channel::RecvDatagram: HANDSHAKE: reuse channel %s\n", channel->peer_.str() ); + } + } + if (channel == NULL) { + //fprintf(stderr,"Channel::RecvDatagram: HANDSHAKE: create new channel %s\n", addr.str() ); + channel = new Channel(ft, socket, addr); + } + //fprintf(stderr,"CHANNEL INCOMING DEF hass %s is id %d\n",hash.hex().c_str(),channel->id()); + + } else { // peer responds to my handshake (and other messages) + mych = DecodeID(mych); + if (mych>=channels.size()) + return_log("%s invalid channel #%u, %s\n",tintstr(),mych,addr.str()); + channel = channels[mych]; + if (!channel) + return_log ("%s #%u is already closed\n",tintstr(),mych); + if (channel->IsDiffSenderOrDuplicate(addr,mych)) { + channel->Schedule4Close(); + return; + } + channel->own_id_mentioned_ = true; + } + channel->raw_bytes_down_ += evboriglen; + //dprintf("recvd %i bytes for %i\n",data.size(),channel->id); + bool wasestablished = channel->is_established(); + + dprintf("%s #%u peer %s recv_peer %s addr %s\n", tintstr(),mych, channel->peer().str(), channel->recv_peer().str(), addr.str() ); + + channel->Recv(evb); + + evbuffer_free(evb); + //SAFECLOSE + if (wasestablished && !channel->is_established()) { + // Arno, 2012-01-26: Received an explict close, clean up channel, safely. + channel->Schedule4Close(); + } +} + + + +/* + * Channel instance methods + */ + +void Channel::CloseChannelByAddress(const Address &addr) +{ + // fprintf(stderr,"CloseChannelByAddress: address is %s\n", addr.str() ); + std::vector::iterator iter; + for (iter = channels.begin(); iter != channels.end(); iter++) + { + Channel *c = *iter; + if (c != NULL && c->peer_ == addr) + { + // ARNOSMPTODO: will do another send attempt before not being + // Rescheduled. + c->peer_channel_id_ = 0; // established->false, do no more sending + c->Schedule4Close(); + break; + } + } +} + + +void Channel::Close () { + + this->SwitchSendControl(CLOSE_CONTROL); + + if (is_established()) + this->Send(); // Arno: send explicit close + + if (ENABLE_VOD_PIECEPICKER) { + // Ric: remove it's binmap from the availability + transfer().availability().remove(id_, ack_in_); + } + + // SAFECLOSE + // Arno: ensure LibeventSendCallback is no longer called with ptr to this Channel + ClearEvents(); +} + + +void Channel::Reschedule () { + + // Arno: CAREFUL: direct send depends on diff between next_send_time_ and + // NOW to be 0, so any calls to Time in between may put things off. Sigh. + Time(); + next_send_time_ = NextSendTime(); + if (next_send_time_!=TINT_NEVER) { + + assert(next_send_time_Schedule4Close(); + } +} + + +/* + * Channel class methods + */ +void Channel::LibeventSendCallback(int fd, short event, void *arg) { + + // Called by libevent when it is the requested send time. + Time(); + Channel * sender = (Channel*) arg; + if (NOWnext_send_time_-TINT_MSEC) + dprintf("%s #%u suspicious send %s<%s\n",tintstr(), + sender->id(),tintstr(NOW),tintstr(sender->next_send_time_)); + if (sender->next_send_time_ != TINT_NEVER) + sender->Send(); +} +