3 * class representing a virtual connection to a peer. In addition,
4 * it contains generic functions for socket management (see sock_open
7 * Created by Victor Grishchenko on 3/6/09.
8 * Copyright 2009-2012 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved.
14 //#include <glog/logging.h>
16 #include "../kernel/mptp.h"
19 using namespace swift;
// ---- Static state shared by every Channel ----
// Cached "current time"; primed here by calling Channel::Time() once during
// static initialisation.
25 swift::tint now_t::now = Channel::Time();
// Time at which the channel layer was initialised. Also serves as the XOR key
// in Channel::EncodeID()/DecodeID().
26 tint Channel::start = now_t::now;
// Start time rounded down to a multiple of 360000000 ticks, giving separate
// processes a common zero point for log timestamps (tintstr() subtracts it).
27 tint Channel::epoch = now_t::now/360000000LL*360000000LL; // make logs mergeable
// Process-wide traffic statistics. The dgram, buffer, syscall and raw-byte
// counters are bumped in SendTo() and RecvFrom().
28 uint64_t Channel::global_dgrams_up=0, Channel::global_dgrams_down=0,
29 Channel::global_raw_bytes_up=0, Channel::global_raw_bytes_down=0,
30 Channel::global_bytes_up=0, Channel::global_bytes_down=0,
31 Channel::global_buffers_up=0, Channel::global_syscalls_up=0,
32 Channel::global_buffers_down=0, Channel::global_syscalls_down=0;
// Callback records of the sockets bound so far and the number of entries in
// use: Bind() appends an entry, CloseSocket() removes one.
33 sckrwecb_t Channel::sock_open[] = {};
34 int Channel::sock_count = 0;
35 swift::tint Channel::last_tick = 0;
// Tunables.
// NOTE(review): TIMEOUT is 60*TINT_SEC, i.e. one minute if TINT_SEC is one
// second -- confirm against the TINT_* definitions.
36 int Channel::MAX_REORDERING = 4;
37 bool Channel::SELF_CONN_OK = false;
38 swift::tint Channel::TIMEOUT = TINT_SEC*60;
// Registry of channels, indexed by channel id. It starts with one
// default-initialised (null) slot, so the first constructed Channel gets id 1
// (the constructor uses channels.size() as the new id).
39 std::vector<Channel*> Channel::channels(1);
// Tracker address; assigned by swift::SetTracker().
40 Address Channel::tracker;
41 //tbheap Channel::send_queue;
// No debug file is attached initially.
42 FILE* Channel::debug_file = NULL;
// NOTE(review): a .cpp file is included textually here, presumably so that
// SimpleSelector is defined in this translation unit before it is
// instantiated on the next line -- confirm.
43 #include "ext/simple_selector.cpp"
// Peer selection policy object; swift::AddPeer() forwards to it.
44 PeerSelector* Channel::peer_selector = new SimpleSelector();
// Minimum spacing between PEX requests (one TINT_SEC).
45 tint Channel::MIN_PEX_REQUEST_INTERVAL = TINT_SEC;
// Creates a channel for `transfer` towards `peer_addr` and registers it.
//   transfer  - the file transfer this channel belongs to.
//               NOTE(review): it is dereferenced unconditionally below, while
//               the destructor guards against transfer_ == NULL -- confirm
//               that callers never pass NULL.
//   socket    - socket to use; INVALID_SOCKET selects default_socket().
//   peer_addr - address of the remote peer.
52 Channel::Channel (FileTransfer* transfer, int socket, Address peer_addr) :
53 // Arno, 2011-10-03: Reordered to avoid g++ Wall warning
54 peer_(peer_addr), socket_(socket==INVALID_SOCKET?default_socket():socket), // FIXME
55 transfer_(transfer), peer_channel_id_(0), own_id_mentioned_(false),
56 data_in_(TINT_NEVER,bin_t::NONE), data_in_dbl_(bin_t::NONE),
57 data_out_cap_(bin_t::ALL),hint_out_size_(0),
58 // Gertjan fix 996e21e8abfc7d88db3f3f8158f2a2c4fc8a8d3f
59 // "Changed PEX rate limiting to per channel limiting"
60 last_pex_request_time_(0), next_pex_request_time_(0),
61 pex_request_outstanding_(false), useless_pex_count_(0),
62 pex_requested_(false), // Ric: init var that wasn't initialized
// Round-trip estimators start at one TINT_SEC; the channel starts in
// PING_PONG_CONTROL mode with a congestion window of 1.
64 rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC),
65 last_send_time_(0), last_recv_time_(0), last_data_out_time_(0), last_data_in_time_(0),
66 last_loss_time_(0), next_send_time_(0), cwnd_(1), cwnd_count1_(0), send_interval_(TINT_SEC),
67 send_control_(PING_PONG_CONTROL), sent_since_recv_(0),
68 lastrecvwaskeepalive_(false), lastsendwaskeepalive_(false), // Arno: nap bug fix
70 ack_not_rcvd_recent_(0), owd_min_bin_(0), owd_min_bin_start_(NOW),
71 owd_cur_bin_(0), dgrams_sent_(0), dgrams_rcvd_(0),
72 raw_bytes_up_(0), raw_bytes_down_(0), bytes_up_(0), bytes_down_(0),
73 scheduled4close_(false)
// The channel id is its index in the global registry.
77 this->id_ = channels.size();
78 channels.push_back(this);
// Record the new channel id in the transfer's hs_in_ list.
79 transfer_->hs_in_.push_back(bin_t(id_));
// Mark all four one-way-delay samples as "no measurement yet".
80 for(int i=0; i<4; i++) {
81 owd_min_bins_[i] = TINT_NEVER;
82 owd_current_[i] = TINT_NEVER;
// Create the send timer and arm it with next_send_time_ (initialised to 0
// above); it invokes LibeventSendCallback with this channel as argument.
84 evsend_ptr_ = new struct event;
85 evtimer_assign(evsend_ptr_,evbase,&Channel::LibeventSendCallback,this);
86 evtimer_add(evsend_ptr_,tint2tv(next_send_time_));
// Let the transfer know about this channel.
89 transfer->mychannels_.insert(this);
91 dprintf("%s #%u init channel %s\n",tintstr(),id_,peer_.str());
92 //fprintf(stderr,"new Channel %d %s\n", id_, peer_.str() );
// Destructor: logs the deallocation and detaches the channel from its
// transfer.
96 Channel::~Channel () {
97 dprintf("%s #%u dealloc channel\n",tintstr(),id_);
// Remove this channel from the owning transfer's channel set, if a transfer
// is attached.
102 if (transfer_ != NULL)
103 transfer_->mychannels_.erase(this);
// Cancels this channel's send timer when one exists and is still pending.
107 void Channel::ClearEvents()
109 if (evsend_ptr_ != NULL) {
// Only a pending timer needs to be removed from the event base.
110 if (evtimer_pending(evsend_ptr_,NULL))
111 evtimer_del(evsend_ptr_);
// Completeness test based on this channel's ack_in_ map: every peak bin of
// the file is checked with ack_in_.is_filled().
// NOTE(review): ack_in_ presumably holds the bins the peer has acknowledged,
// which would make this "does the peer have the whole file" -- confirm.
120 bool Channel::IsComplete() {
121 // Check if peak hash bins are filled.
// A file with no peaks is handled as a special case.
122 if (file().peak_count() == 0)
125 for(int i=0; i<file().peak_count(); i++) {
126 bin_t peak = file().peak(i);
127 if (!ack_in_.is_filled(peak))
// Returns the local port of this channel's socket in host byte order, as
// reported by getsockname(). A getsockname() failure is reported through
// print_error().
135 uint16_t Channel::GetMyPort() {
// Zero-initialised so the structure has a defined content even on failure.
136 struct sockaddr_in mysin = {};
137 socklen_t mysinlen = sizeof(mysin);
138 if (getsockname(socket_, (struct sockaddr *)&mysin, &mysinlen) < 0)
140 print_error("error on getsockname");
// sin_port is stored in network byte order.
144 return ntohs(mysin.sin_port);
// Examines a datagram whose sender differs from the address this channel
// sends to, or which may belong to a duplicate connection.
//   addr - source address of the incoming datagram
//   chid - channel id, used in the log messages below
147 bool Channel::IsDiffSenderOrDuplicate(Address addr, uint32_t chid)
151 // Got message from different address than I send to
// Private-address case: only considered while the peer has not yet mentioned
// our own channel id.
153 if (!own_id_mentioned_ && addr.is_private()) {
154 // Arno, 2012-02-27: Got HANDSHAKE reply from IANA private address,
155 // check for duplicate connections:
157 // When two peers A and B are behind the same firewall, they will get
158 // extB, resp. extA addresses from the tracker. They will both
159 // connect to their counterpart but because the incoming packet
160 // will be from the intNAT address the duplicates are not
163 // Solution: when the second datagram comes in (HANDSHAKE reply),
164 // see if you have had a first datagram from the same addr
165 // (HANDSHAKE). If so, close the channel if his port number is
166 // larger than yours (such that one channel remains).
// Look for another channel of the same transfer that already talks to addr
// (this channel itself is passed along so it can be told apart).
170 Channel *c = transfer().FindChannel(addr,this);
172 // I already initiated a connection to this peer,
173 // this new incoming message would establish a duplicate.
174 // One must break the connection, decide using port
176 dprintf("%s #%u found duplicate channel to %s\n",
177 tintstr(),chid,addr.str());
// Tie-break on port numbers: the remote port is compared with our local one.
179 if (addr.port() > GetMyPort()) {
181 dprintf("%s #%u closing duplicate channel to %s\n",
182 tintstr(),chid,addr.str());
189 // Received HANDSHAKE reply from other address than I sent
190 // HANDSHAKE to, and the address is not an IANA private
191 // address (=no NAT in play), so close.
193 dprintf("%s #%u invalid peer address %s!=%s\n",
194 tintstr(),chid,peer().str(),addr.str());
// Refreshes the cached clock: stores usec_time() in now_t::now and returns
// the new value.
210 tint Channel::Time () {
211 //HiResTimeOfDay* tod = HiResTimeOfDay::Instance();
212 //tint ret = tod->getTimeUSec();
213 //DLOG(INFO)<<"now is "<<ret;
214 return now_t::now = usec_time();
// Creates a non-blocking MPTP datagram socket, gives it 1 MiB (1<<20) send
// and receive buffers, binds it to `address` and records `callbacks` in the
// sock_open table.
//   address   - MPTP address to bind to
//   callbacks - callback record stored for the new socket
// If any step fails, "binding fails" is printed, the socket is closed and
// INVALID_SOCKET is returned (see dbnd_ensure below).
218 evutil_socket_t Channel::Bind (Address address, sckrwecb_t callbacks) {
219 struct sockaddr_mptp *addr = address.addr;
// len covers the sockaddr_mptp header plus one mptp_dest per destination.
221 int len = sizeof(struct sockaddr_mptp) + addr->count*sizeof(struct mptp_dest), sndbuf=1<<20, rcvbuf=1<<20;
// dbnd_ensure(x): bail out of Bind() unless x holds.
// NOTE(review): when socket() itself fails, the macro still calls
// close_socket() on the invalid descriptor.
222 #define dbnd_ensure(x) { if (!(x)) { \
223 print_error("binding fails"); close_socket(fd); return INVALID_SOCKET; } }
224 dbnd_ensure ( (fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_MPTP)) >= 0 );
225 dbnd_ensure( make_socket_nonblocking(fd) ); // FIXME may remove this
227 dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF,
228 (setsockoptptr_t)&sndbuf, sizeof(int)) == 0 );
229 dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF,
230 (setsockoptptr_t)&rcvbuf, sizeof(int)) == 0 );
231 //setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (setsockoptptr_t)&enable, sizeof(int));
232 dbnd_ensure ( ::bind(fd, (sockaddr*)addr, len) == 0 );
// Register the socket's callbacks.
// NOTE(review): no capacity check on sock_open is visible here -- confirm
// the table cannot overflow.
235 sock_open[sock_count++] = callbacks;
// Queries the local address a socket is bound to with getsockname() and
// wraps the result in an Address.
//   sock - the socket to inspect
239 Address Channel::BoundAddress(evutil_socket_t sock) {
241 struct sockaddr_in myaddr;
242 socklen_t mylen = sizeof(myaddr);
// ret holds the getsockname() status.
243 int ret = getsockname(sock,(sockaddr*)&myaddr,&mylen);
245 return Address(myaddr);
// Top-level API wrapper: forwards to Channel::BoundAddress(sock).
253 Address swift::BoundAddress(evutil_socket_t sock) {
254 return Channel::BoundAddress(sock);
// Sends one payload per destination listed in `addr` with a single sendmsg()
// call.
//   sock - socket to send on
//   addr - destination set; addr.addr->count is the number of destinations
//   evb  - array of addr.addr->count evbuffers; evb[i] is the payload for
//          destination i
// The buffers are drained both on the error path and on the normal path.
258 int Channel::SendTo (evutil_socket_t sock, const Address& addr, struct evbuffer **evb) {
260 int count = addr.addr->count;
// The address blob is the sockaddr_mptp header followed by `count` entries.
261 int addr_len = sizeof(struct sockaddr_mptp) + count * sizeof(struct mptp_dest);
// NOTE(review): a runtime-sized array is a compiler extension, not standard
// C++.
262 struct iovec iov[count];
265 memset(&msg, 0, sizeof(msg));
266 memset(&iov, 0, sizeof(iov));
// One iovec per destination; evbuffer_pullup() makes each payload contiguous
// so it can be referenced by a single iovec.
267 for (int i=0; i<count; ++i) {
268 lengths[i] = evbuffer_get_length(evb[i]);
269 iov[i].iov_base = evbuffer_pullup(evb[i], lengths[i]);
270 iov[i].iov_len = lengths[i];
273 msg.msg_iovlen = count;
274 msg.msg_name = addr.addr;
275 msg.msg_namelen = addr_len;
276 int r = sendmsg(sock, &msg, 0);
// Error path: report and discard the whole payload of every buffer.
278 print_error("can't send");
279 for (int i=0; i<count; ++i)
280 evbuffer_drain(evb[i], lengths[i]); // Arno: behaviour is to pretend the packet got lost
// Normal path: drain dests[i].bytes from each buffer.
// NOTE(review): presumably sendmsg() writes the per-destination byte count
// back into addr.addr->dests[i].bytes -- confirm against the MPTP kernel API.
283 for (int i=0; i<count; ++i)
284 evbuffer_drain(evb[i], addr.addr->dests[i].bytes);
// Statistics: `count` datagrams and buffers, one system call.
285 global_dgrams_up+=count;
286 global_buffers_up+=count;
287 global_syscalls_up++;
288 for (int i=0; i<count; ++i)
289 global_raw_bytes_up+=lengths[i];
// Receives up to addr.addr->count datagrams with a single recvmsg() call.
//   sock - socket to read from
//   addr - in: number of slots (addr.addr->count); it is also passed to
//          recvmsg() as the name buffer, and dests[i].bytes is read afterwards
//   evb  - array of evbuffers; received data is committed to evb[i]
294 int Channel::RecvFrom (evutil_socket_t sock, Address& addr, struct evbuffer **evb) {
295 int count = addr.addr->count;
296 socklen_t addrlen = sizeof(struct sockaddr_mptp) + count * sizeof(mptp_dest);
// NOTE(review): runtime-sized arrays (here and `iov` below) are a compiler
// extension, not standard C++.
297 struct evbuffer_iovec vec[count];
// Reserve SWIFT_MAX_RECV_DGRAM_SIZE bytes of contiguous space in every buffer
// so recvmsg() can write straight into them.
298 for (int i=0; i<count; ++i) {
299 if (evbuffer_reserve_space(evb[i], SWIFT_MAX_RECV_DGRAM_SIZE, &vec[i], 1) < 0) {
300 print_error("error on evbuffer_reserve_space");
304 struct iovec iov[count];
306 memset(&msg, 0, sizeof(msg));
307 memset(&iov, 0, sizeof(iov));
// Point each iovec at the space reserved above.
308 for (int i=0; i<count; ++i) {
309 iov[i].iov_base = vec[i].iov_base;
310 iov[i].iov_len = SWIFT_MAX_RECV_DGRAM_SIZE;
313 msg.msg_iovlen = count;
314 msg.msg_name = addr.addr;
315 msg.msg_namelen = addrlen;
316 int length = recvmsg(sock, &msg, 0);
320 // Linux and Windows report "ICMP port unreachable" if the dest port could
322 // http://support.microsoft.com/kb/260018
323 // http://www.faqs.org/faqs/unix-faq/socket/
// The checks below look for WSA error 10054 and for errno == ECONNREFUSED;
// CloseChannelByAddress(addr) is called in this error handling.
325 if (WSAGetLastError() == 10054) // Sometimes errno == 2 ?!
327 if (errno == ECONNREFUSED)
330 CloseChannelByAddress(addr);
333 print_error("error on recv");
// Commit to each buffer exactly dests[i].bytes bytes and add that count to
// `length`.
336 for (int i=0; i<addr.addr->count; ++i) {
337 length += addr.addr->dests[i].bytes;
338 vec[i].iov_len = addr.addr->dests[i].bytes;
339 if (evbuffer_commit_space(evb[i], &vec[i], 1) < 0) {
341 print_error("error on evbuffer_commit_space");
// Statistics: one system call, addr.addr->count datagrams and buffers.
344 global_dgrams_down+=addr.addr->count;
345 global_buffers_down+=addr.addr->count;
346 global_syscalls_down++;
347 global_raw_bytes_down+=length;
// Removes `sock` from the sock_open table and closes it.
//   sock - the socket to unregister and close
353 void Channel::CloseSocket(evutil_socket_t sock) {
// A matching entry is overwritten with the last used entry and the table
// shrinks by one.
// NOTE(review): the entry moved into slot i is not re-examined by this loop.
354 for(int i=0; i<sock_count; i++)
355 if (sock_open[i].sock==sock)
356 sock_open[i] = sock_open[--sock_count];
// print_error() is invoked when close_socket() yields a false/zero result.
357 if (!close_socket(sock))
358 print_error("on closing a socket");
// Closes registered sockets through CloseSocket().
// NOTE(review): sock_open[sock_count] is one past the last used entry unless
// sock_count has been decremented first -- confirm the indexing.
361 void Channel::Shutdown () {
363 CloseSocket(sock_open[sock_count].sock);
// Top-level API: stores `tracker` in the static Channel::tracker.
366 void swift::SetTracker(const Address& tracker) {
367 Channel::tracker = tracker;
// Recovers a channel id from its scrambled form by XOR-ing it with the start
// time truncated to int. XOR is its own inverse, so this is the same
// operation as EncodeID().
370 int Channel::DecodeID(int scrambled) {
371 return scrambled ^ (int)start;
// Scrambles a channel id by XOR-ing it with the start time truncated to int;
// DecodeID() applies the same XOR to undo it.
373 int Channel::EncodeID(int unscrambled) {
374 return unscrambled ^ (int)start;
378 * class Address implementation
// Sets the IPv4 address of the first destination from a string resolved with
// gethostbyname().
//   ip_str - string handed to gethostbyname()
381 void Address::set_ipv4 (const char* ip_str) {
382 struct hostent *h = gethostbyname(ip_str);
384 print_error("cannot lookup address");
// Use the first address returned by the resolver.
// NOTE(review): the cast reads sizeof(u_long) bytes, which is 8 on LP64
// platforms, whereas an IPv4 address is 4 bytes -- confirm this is safe.
387 addr->dests[0].addr = *(u_long *) h->h_addr_list[0];
// Builds an Address from a textual specification.
//   ip_port - the text to parse; a ':' is searched for as a separator
392 Address::Address(const char* ip_port) {
// Inputs of 1024 characters or more are treated specially, so the copy below
// always has room for the terminating NUL.
394 if (strlen(ip_port)>=1024)
397 strncpy(ipp,ip_port,1024);
// Locate the ':' separator, if any.
398 char* semi = strchr(ipp,':');
// A '.' in the text selects one branch; the visible statements set port 0 in
// one case and the wildcard address INADDR_ANY in the other.
// NOTE(review): presumably "address only" vs "port only" input -- confirm.
404 if (strchr(ipp, '.')) {
406 set_port((uint16_t)0);
408 set_ipv4((uint32_t)INADDR_ANY);
// Loopback address constant, initialised from INADDR_LOOPBACK.
415 uint32_t Address::LOCALHOST = INADDR_LOOPBACK;
// Formats a time value for log output.
//   time - the time to format; TINT_NEVER is handled as a special case
// The result points into static storage (one of four 32-byte buffers), so it
// is overwritten by later calls.
423 const char* swift::tintstr (tint time) {
426 static char ret_str[4][32]; // wow
429 if (time==TINT_NEVER)
// Timestamps are printed relative to Channel::epoch.
431 time -= Channel::epoch;
// Break the value down into hours, minutes, seconds, milliseconds and
// microseconds.
433 int hours = time/TINT_HOUR;
435 int mins = time/TINT_MIN;
437 int secs = time/TINT_SEC;
439 int msecs = time/TINT_MSEC;
441 int usecs = time/TINT_uSEC;
// Output layout: hours_mm_ss_mmm_uuu.
442 sprintf(ret_str[i],"%i_%02i_%02i_%03i_%03i",hours,mins,secs,msecs,usecs);
// Renders an IPv4 socket address as text, with ":<port>" appended to the
// address string.
//   addr - the socket address to print
447 std::string swift::sock2str (struct sockaddr_in addr) {
450 //Vista only: InetNtop(AF_INET,&(addr.sin_addr),ipch,32);
// Variant based on inet_ntoa().
452 struct in_addr inaddr;
453 memcpy(&inaddr, &(addr.sin_addr), sizeof(inaddr));
454 strncpy(ipch, inet_ntoa(inaddr),32);
// Variant based on inet_ntop().
// NOTE(review): presumably the two variants are alternatives selected per
// platform -- confirm.
456 inet_ntop(AF_INET,&(addr.sin_addr),ipch,32);
// Append the port, converted from network to host byte order.
458 sprintf(ipch+strlen(ipch),":%i",ntohs(addr.sin_port));
459 return std::string(ipch);
464 * Swift top-level API implementation
// Top-level API: binds a socket to `addr` and registers a read event for it.
//   addr - address to bind to
467 int swift::Listen (Address addr) {
// Incoming data is handled by Channel::LibeventReceiveCallback.
469 cb.may_read = &Channel::LibeventReceiveCallback;
470 cb.sock = Channel::Bind(addr,cb);
// Set up the shared Channel::evrecv event for EV_READ on the new socket and
// add it without a timeout.
472 event_assign(&Channel::evrecv, Channel::evbase, cb.sock, EV_READ,
474 event_add(&Channel::evrecv, NULL);
// Top-level API shutdown entry point.
// NOTE(review): the role of sock_des is not evident from the signature alone
// -- confirm.
478 void swift::Shutdown (int sock_des) {
// Top-level API: opens a transfer and, on success, returns the file
// descriptor of its underlying file.
// filename, hash, force_check_diskvshash, check_netwvshash and chunk_size are
// passed straight to the FileTransfer constructor; `tracker` is set on the
// new transfer before it connects.
482 int swift::Open (const char* filename, const Sha1Hash& hash, Address tracker, bool force_check_diskvshash, bool check_netwvshash, uint32_t chunk_size) {
483 FileTransfer* ft = new FileTransfer(filename, hash, force_check_diskvshash, check_netwvshash, chunk_size);
// NOTE(review): plain `new` does not return NULL, so the `ft &&` part is
// redundant; a descriptor value of 0 is treated as failure here.
484 if (ft && ft->file().file_descriptor()) {
486 /*if (FileTransfer::files.size()<fdes) // FIXME duplication
487 FileTransfer::files.resize(fdes);
488 FileTransfer::files[fdes] = ft;*/
490 // initiate tracker connections
492 ft->SetTracker(tracker);
493 ft->ConnectToTracker();
495 return ft->file().file_descriptor();
// Top-level API: destroys the transfer registered under descriptor `fd`, if
// there is one.
// NOTE(review): fd is a signed int compared against files.size(); assuming
// size() is unsigned, a negative fd converts to a large value and fails the
// bound check.
504 void swift::Close (int fd) {
505 if (fd<FileTransfer::files.size() && FileTransfer::files[fd])
506 delete FileTransfer::files[fd];
// Top-level API: hands a peer address for the swarm identified by `root` to
// the configured peer selector.
510 void swift::AddPeer (Address address, const Sha1Hash& root) {
511 Channel::peer_selector->AddPeer(address,root);
// Top-level API: for a registered transfer, returns file().size().
//   fdes - descriptor identifying the transfer in FileTransfer::files
515 uint64_t swift::Size (int fdes) {
// Only descriptors inside the table that map to a live transfer qualify.
516 if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
517 return FileTransfer::files[fdes]->file().size();
// Top-level API: for a registered transfer, returns file().is_complete().
//   fdes - descriptor identifying the transfer in FileTransfer::files
523 bool swift::IsComplete (int fdes) {
// Only descriptors inside the table that map to a live transfer qualify.
524 if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
525 return FileTransfer::files[fdes]->file().is_complete();
// Top-level API: for a registered transfer, returns file().complete().
//   fdes - descriptor identifying the transfer in FileTransfer::files
531 uint64_t swift::Complete (int fdes) {
// Only descriptors inside the table that map to a live transfer qualify.
532 if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
533 return FileTransfer::files[fdes]->file().complete();
// Top-level API: for a registered transfer, returns file().seq_complete().
//   fdes - descriptor identifying the transfer in FileTransfer::files
539 uint64_t swift::SeqComplete (int fdes) {
// Only descriptors inside the table that map to a live transfer qualify.
540 if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
541 return FileTransfer::files[fdes]->file().seq_complete();
// Top-level API: returns the root hash of the transfer looked up by `file`;
// Sha1Hash::ZERO is the fallback result.
//   file - descriptor passed to FileTransfer::file()
547 const Sha1Hash& swift::RootMerkleHash (int file) {
548 FileTransfer* trans = FileTransfer::file(file);
550 return Sha1Hash::ZERO;
551 return trans->file().root_hash();
555 /** Returns the number of bytes in a chunk for this transmission */
//   fdes - descriptor identifying the transfer in FileTransfer::files
556 uint32_t swift::ChunkSize(int fdes)
// Only descriptors inside the table that map to a live transfer qualify.
558 if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
559 return FileTransfer::files[fdes]->file().chunk_size();
// Top-level API: serialises the state of a transfer's file to
// "<filename>.mbinmap".
//   transfer - descriptor passed to FileTransfer::file()
566 void swift::Checkpoint(int transfer) {
567 // Save transfer's binmap for zero-hashcheck restart
568 FileTransfer *ft = FileTransfer::file(transfer);
// The checkpoint lives next to the data file, with ".mbinmap" appended.
572 std::string binmap_filename = ft->file().filename();
573 binmap_filename.append(".mbinmap");
574 //fprintf(stderr,"swift: checkpointing %s at %lli\n", binmap_filename.c_str(), Complete(transfer));
// Opened in binary write mode, so an existing checkpoint is replaced.
575 FILE *fp = fopen(binmap_filename.c_str(),"wb");
577 print_error("cannot open mbinmap for writing");
// A negative result from serialize() is reported as a write error.
580 if (ft->file().serialize(fp) < 0)
581 print_error("writing to mbinmap");
// Appends the str.size() bytes of `str` to `evb` (no terminating NUL is
// added) and returns the evbuffer_add() result.
591 int swift::evbuffer_add_string(struct evbuffer *evb, std::string str) {
592 return evbuffer_add(evb, str.c_str(), str.size());
// Appends the single byte `b` to `evb`; returns the evbuffer_add() result.
595 int swift::evbuffer_add_8(struct evbuffer *evb, uint8_t b) {
596 return evbuffer_add(evb, &b, 1);
// Appends the 16-bit value `w` to `evb` in network (big-endian) byte order;
// returns the evbuffer_add() result.
599 int swift::evbuffer_add_16be(struct evbuffer *evb, uint16_t w) {
600 uint16_t wbe = htons(w);
601 return evbuffer_add(evb, &wbe, 2);
// Appends the 32-bit value `i` to `evb` in network (big-endian) byte order;
// returns the evbuffer_add() result.
604 int swift::evbuffer_add_32be(struct evbuffer *evb, uint32_t i) {
605 uint32_t ibe = htonl(i);
606 return evbuffer_add(evb, &ibe, 4);
// Appends the 64-bit value `l` to `evb` in big-endian byte order; returns the
// evbuffer_add() result.
609 int swift::evbuffer_add_64be(struct evbuffer *evb, uint64_t l) {
// The value is written as two 32-bit words, most significant word first, each
// converted to network byte order.
611 lbe[0] = htonl((uint32_t)(l>>32));
612 lbe[1] = htonl((uint32_t)(l&0xffffffff));
613 return evbuffer_add(evb, lbe, 8);
// Appends the Sha1Hash::SIZE raw bytes of `hash` to `evb`; returns the
// evbuffer_add() result.
616 int swift::evbuffer_add_hash(struct evbuffer *evb, const Sha1Hash& hash) {
617 return evbuffer_add(evb, hash.bits, Sha1Hash::SIZE);
// Removes one byte from the front of `evb`.
620 uint8_t swift::evbuffer_remove_8(struct evbuffer *evb) {
// Fewer than 1 byte removed means the buffer was empty or the call failed.
622 if (evbuffer_remove(evb, &b, 1) < 1)
// Removes a 16-bit value (2 bytes) from the front of `evb`.
627 uint16_t swift::evbuffer_remove_16be(struct evbuffer *evb) {
// Fewer than 2 bytes removed means the buffer held too little data or the
// call failed.
629 if (evbuffer_remove(evb, &wbe, 2) < 2)
// Removes a 32-bit value (4 bytes) from the front of `evb`.
634 uint32_t swift::evbuffer_remove_32be(struct evbuffer *evb) {
// Fewer than 4 bytes removed means the buffer held too little data or the
// call failed.
636 if (evbuffer_remove(evb, &ibe, 4) < 4)
// Removes a 64-bit value (8 bytes) from the front of `evb`.
641 uint64_t swift::evbuffer_remove_64be(struct evbuffer *evb) {
// Fewer than 8 bytes removed means the buffer held too little data or the
// call failed.
643 if (evbuffer_remove(evb, lbe, 8) < 8)
// Start from the first 32-bit word, converted from network to host order.
645 uint64_t l = ntohl(lbe[0]);
// Removes Sha1Hash::SIZE bytes from the front of `evb` and builds a Sha1Hash
// from them. Returns Sha1Hash::ZERO when fewer bytes could be removed.
651 Sha1Hash swift::evbuffer_remove_hash(struct evbuffer* evb) {
652 char bits[Sha1Hash::SIZE];
653 if (evbuffer_remove(evb, bits, Sha1Hash::SIZE) < Sha1Hash::SIZE)
654 return Sha1Hash::ZERO;
// NOTE(review): the meaning of the `false` constructor argument is not
// visible here -- presumably it marks `bits` as raw binary rather than hex;
// confirm against the Sha1Hash declaration.
655 return Sha1Hash(false, bits);