First step for using multiple recvs from mptp.
[swifty.git] / src / libswift / channel.cpp
1 /*
2  *  channel.cpp
3  *  class representing a virtual connection to a peer. In addition,
4  *  it contains generic functions for socket management (see sock_open
5  *  class variable)
6  *
7  *  Created by Victor Grishchenko on 3/6/09.
8  *  Copyright 2009-2012 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved.
9  *
10  */
11
12 #include <cassert>
13 #include "compat.h"
14 //#include <glog/logging.h>
15 #include "swift.h"
16 #include "../kernel/mptp.h"
17
18 using namespace std;
19 using namespace swift;
20
21 /*
22  * Class variables
23  */
24
25 swift::tint now_t::now = Channel::Time();
26 tint Channel::start = now_t::now;
27 tint Channel::epoch = now_t::now/360000000LL*360000000LL; // make logs mergeable
28 uint64_t Channel::global_dgrams_up=0, Channel::global_dgrams_down=0,
29          Channel::global_raw_bytes_up=0, Channel::global_raw_bytes_down=0,
30          Channel::global_bytes_up=0, Channel::global_bytes_down=0;
31 sckrwecb_t Channel::sock_open[] = {};
32 int Channel::sock_count = 0;
33 swift::tint Channel::last_tick = 0;
34 int Channel::MAX_REORDERING = 4;
35 bool Channel::SELF_CONN_OK = false;
36 swift::tint Channel::TIMEOUT = TINT_SEC*60;
37 std::vector<Channel*> Channel::channels(1);
38 Address Channel::tracker;
39 //tbheap Channel::send_queue;
40 FILE* Channel::debug_file = NULL;
41 #include "ext/simple_selector.cpp"
42 PeerSelector* Channel::peer_selector = new SimpleSelector();
43 tint Channel::MIN_PEX_REQUEST_INTERVAL = TINT_SEC;
44
45
46 /*
47  * Instance methods
48  */
49
50 Channel::Channel    (FileTransfer* transfer, int socket, Address peer_addr) :
51         // Arno, 2011-10-03: Reordered to avoid g++ Wall warning
52         peer_(peer_addr), socket_(socket==INVALID_SOCKET?default_socket():socket), // FIXME
53     transfer_(transfer), peer_channel_id_(0), own_id_mentioned_(false),
54     data_in_(TINT_NEVER,bin_t::NONE), data_in_dbl_(bin_t::NONE),
55     data_out_cap_(bin_t::ALL),hint_out_size_(0),
56     // Gertjan fix 996e21e8abfc7d88db3f3f8158f2a2c4fc8a8d3f
57     // "Changed PEX rate limiting to per channel limiting"
58     last_pex_request_time_(0), next_pex_request_time_(0),
59     pex_request_outstanding_(false), useless_pex_count_(0),
60     pex_requested_(false),  // Ric: init var that wasn't initialiazed
61     //
62     rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC),
63     last_send_time_(0), last_recv_time_(0), last_data_out_time_(0), last_data_in_time_(0),
64     last_loss_time_(0), next_send_time_(0), cwnd_(1), cwnd_count1_(0), send_interval_(TINT_SEC),
65     send_control_(PING_PONG_CONTROL), sent_since_recv_(0),
66     lastrecvwaskeepalive_(false), lastsendwaskeepalive_(false), // Arno: nap bug fix
67     ack_rcvd_recent_(0),
68     ack_not_rcvd_recent_(0), owd_min_bin_(0), owd_min_bin_start_(NOW),
69     owd_cur_bin_(0), dgrams_sent_(0), dgrams_rcvd_(0),
70     raw_bytes_up_(0), raw_bytes_down_(0), bytes_up_(0), bytes_down_(0),
71     scheduled4close_(false)
72 {
73     if (peer_==Address())
74         peer_ = tracker;
75     this->id_ = channels.size();
76     channels.push_back(this);
77     transfer_->hs_in_.push_back(bin_t(id_));
78     for(int i=0; i<4; i++) {
79         owd_min_bins_[i] = TINT_NEVER;
80         owd_current_[i] = TINT_NEVER;
81     }
82     evsend_ptr_ = new struct event;
83     evtimer_assign(evsend_ptr_,evbase,&Channel::LibeventSendCallback,this);
84     evtimer_add(evsend_ptr_,tint2tv(next_send_time_));
85
86     // RATELIMIT
87         transfer->mychannels_.insert(this);
88
89         dprintf("%s #%u init channel %s\n",tintstr(),id_,peer_.str());
90         //fprintf(stderr,"new Channel %d %s\n", id_, peer_.str() );
91 }
92
93
94 Channel::~Channel () {
95         dprintf("%s #%u dealloc channel\n",tintstr(),id_);
96     channels[id_] = NULL;
97     ClearEvents();
98
99     // RATELIMIT
100     if (transfer_ != NULL)
101         transfer_->mychannels_.erase(this);
102 }
103
104
105 void Channel::ClearEvents()
106 {
107     if (evsend_ptr_ != NULL) {
108         if (evtimer_pending(evsend_ptr_,NULL))
109                 evtimer_del(evsend_ptr_);
110         delete evsend_ptr_;
111         evsend_ptr_ = NULL;
112     }
113 }
114
115
116
117
118 bool Channel::IsComplete() {
119         // Check if peak hash bins are filled.
120         if (file().peak_count() == 0)
121                 return false;
122
123     for(int i=0; i<file().peak_count(); i++) {
124         bin_t peak = file().peak(i);
125         if (!ack_in_.is_filled(peak))
126             return false;
127     }
128         return true;
129 }
130
131
132
133 uint16_t Channel::GetMyPort() {
134         struct sockaddr_in mysin = {};
135         socklen_t mysinlen = sizeof(mysin);
136         if (getsockname(socket_, (struct sockaddr *)&mysin, &mysinlen) < 0)
137         {
138                 print_error("error on getsockname");
139                 return 0;
140         }
141         else
142                 return ntohs(mysin.sin_port);
143 }
144
145 bool Channel::IsDiffSenderOrDuplicate(Address addr, uint32_t chid)
146 {
147     if (peer() != addr)
148     {
149         // Got message from different address than I send to
150         //
151                 if (!own_id_mentioned_ && addr.is_private()) {
152                         // Arno, 2012-02-27: Got HANDSHAKE reply from IANA private address,
153                         // check for duplicate connections:
154                         //
155                         // When two peers A and B are behind the same firewall, they will get
156                         // extB, resp. extA addresses from the tracker. They will both
157                         // connect to their counterpart but because the incoming packet
158                         // will be from the intNAT address the duplicates are not
159                         // recognized.
160                         //
161                         // Solution: when the second datagram comes in (HANDSHAKE reply),
162                         // see if you have had a first datagram from the same addr
163                         // (HANDSHAKE). If so, close the channel if his port number is
164                         // larger than yours (such that one channel remains).
165                         //
166                         recv_peer_ = addr;
167
168                         Channel *c = transfer().FindChannel(addr,this);
169                         if (c != NULL) {
170                                 // I already initiated a connection to this peer,
171                                 // this new incoming message would establish a duplicate.
172                                 // One must break the connection, decide using port
173                                 // number:
174                                 dprintf("%s #%u found duplicate channel to %s\n",
175                                                 tintstr(),chid,addr.str());
176
177                                 if (addr.port() > GetMyPort()) {
178                                         //Schedule4Close();
179                                         dprintf("%s #%u closing duplicate channel to %s\n",
180                                                         tintstr(),chid,addr.str());
181                                         return true;
182                                 }
183                         }
184                 }
185                 else
186                 {
187                         // Received HANDSHAKE reply from other address than I sent
188                         // HANDSHAKE to, and the address is not an IANA private
189                         // address (=no NAT in play), so close.
190                         //Schedule4Close();
191                         dprintf("%s #%u invalid peer address %s!=%s\n",
192                                         tintstr(),chid,peer().str(),addr.str());
193                         return true;
194                 }
195     }
196         return false;
197 }
198
199
200
201
202
203
204
205 /*
206  * Class methods
207  */
208 tint Channel::Time () {
209     //HiResTimeOfDay* tod = HiResTimeOfDay::Instance();
210     //tint ret = tod->getTimeUSec();
211     //DLOG(INFO)<<"now is "<<ret;
212     return now_t::now = usec_time();
213 }
214
215 // SOCKMGMT
216 evutil_socket_t Channel::Bind (Address address, sckrwecb_t callbacks) {
217     struct sockaddr_mptp *addr = address.addr;
218     evutil_socket_t fd;
219     int len = sizeof(struct sockaddr_mptp) + addr->count*sizeof(struct mptp_dest), sndbuf=1<<20, rcvbuf=1<<20;
220     #define dbnd_ensure(x) { if (!(x)) { \
221         print_error("binding fails"); close_socket(fd); return INVALID_SOCKET; } }
222     dbnd_ensure ( (fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_MPTP)) >= 0 );
223     dbnd_ensure( make_socket_nonblocking(fd) );  // FIXME may remove this
224     int enable = true;
225     dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF,
226                              (setsockoptptr_t)&sndbuf, sizeof(int)) == 0 );
227     dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF,
228                              (setsockoptptr_t)&rcvbuf, sizeof(int)) == 0 );
229     //setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (setsockoptptr_t)&enable, sizeof(int));
230     dbnd_ensure ( ::bind(fd, (sockaddr*)addr, len) == 0 );
231
232     callbacks.sock = fd;
233     sock_open[sock_count++] = callbacks;
234     return fd;
235 }
236
237 Address Channel::BoundAddress(evutil_socket_t sock) {
238
239     struct sockaddr_in myaddr;
240     socklen_t mylen = sizeof(myaddr);
241     int ret = getsockname(sock,(sockaddr*)&myaddr,&mylen);
242     if (ret >= 0) {
243                 return Address(myaddr);
244     }
245         else {
246                 return Address();
247         }
248 }
249
250
251 Address swift::BoundAddress(evutil_socket_t sock) {
252         return Channel::BoundAddress(sock);
253 }
254
255
256 int Channel::SendTo (evutil_socket_t sock, const Address& addr, struct evbuffer **evb) {
257
258         int count = addr.addr->count;
259         int addr_len = sizeof(struct sockaddr_mptp) + count * sizeof(struct mptp_dest);
260         struct iovec iov[count];
261         int lengths[count];
262         struct msghdr msg;
263         memset(&msg, 0, sizeof(msg));
264         memset(&iov, 0, sizeof(iov));
265         for (int i=0; i<count; ++i) {
266                 lengths[i] = evbuffer_get_length(evb[i]);
267                 iov[i].iov_base = evbuffer_pullup(evb[i], lengths[i]);
268                 iov[i].iov_len = lengths[i];
269         }
270         msg.msg_iov = iov;
271         msg.msg_iovlen = count;
272         msg.msg_name = addr.addr;
273         msg.msg_namelen = addr_len;
274         int r = sendmsg(sock, &msg, 0);
275     if (r<0) {
276         print_error("can't send");
277                 for (int i=0; i<count; ++i)
278                 evbuffer_drain(evb[i], lengths[i]); // Arno: behaviour is to pretend the packet got lost
279     }
280     else
281                 for (int i=0; i<count; ++i)
282                         evbuffer_drain(evb[i], addr.addr->dests[i].bytes);
283     global_dgrams_up+=count;
284         for (int i=0; i<count; ++i)
285                 global_raw_bytes_up+=lengths[i];
286     Time();
287     return r;
288 }
289
290 int Channel::RecvFrom (evutil_socket_t sock, Address& addr, struct evbuffer **evb) {
291         int count = addr.addr->count;
292     socklen_t addrlen = sizeof(struct sockaddr_mptp) + count * sizeof(mptp_dest);
293     struct evbuffer_iovec vec[count];
294         for (int i=0; i<count; ++i) {
295                 if (evbuffer_reserve_space(evb[i], SWIFT_MAX_RECV_DGRAM_SIZE, &vec[i], 1) < 0) {
296                         print_error("error on evbuffer_reserve_space");
297                         return 0;
298                 }
299         }
300         struct iovec iov[count];
301         struct msghdr msg;
302         memset(&msg, 0, sizeof(msg));
303         memset(&iov, 0, sizeof(iov));
304         for (int i=0; i<count; ++i) {
305                 iov[i].iov_base = vec[i].iov_base;
306                 iov[i].iov_len = SWIFT_MAX_RECV_DGRAM_SIZE;
307         }
308         msg.msg_iov = iov;
309         msg.msg_iovlen = count;
310         msg.msg_name = addr.addr;
311         msg.msg_namelen = addrlen;
312         int length = recvmsg(sock, &msg, 0);
313     if (length<0) {
314         length = 0;
315
316         // Linux and Windows report "ICMP port unreachable" if the dest port could
317         // not be reached:
318         //    http://support.microsoft.com/kb/260018
319         //    http://www.faqs.org/faqs/unix-faq/socket/
320 #ifdef _WIN32
321         if (WSAGetLastError() == 10054) // Sometimes errno == 2 ?!
322 #else
323                 if (errno == ECONNREFUSED)
324 #endif
325                 {
326                 CloseChannelByAddress(addr);
327                 }
328         else
329                 print_error("error on recv");
330     }
331         length = 0;
332         for (int i=0; i<addr.addr->count; ++i) {
333                 length += addr.addr->dests[i].bytes;
334                 vec[i].iov_len = addr.addr->dests[i].bytes;
335                 if (evbuffer_commit_space(evb[i], &vec[i], 1) < 0)  {
336                         length = 0;
337                         print_error("error on evbuffer_commit_space");
338                 }
339         }
340     global_dgrams_down+=addr.addr->count;
341     global_raw_bytes_down+=length;
342     Time();
343     return length;
344 }
345
346
347 void Channel::CloseSocket(evutil_socket_t sock) {
348     for(int i=0; i<sock_count; i++)
349         if (sock_open[i].sock==sock)
350             sock_open[i] = sock_open[--sock_count];
351     if (!close_socket(sock))
352         print_error("on closing a socket");
353 }
354
355 void Channel::Shutdown () {
356     while (sock_count--)
357         CloseSocket(sock_open[sock_count].sock);
358 }
359
360 void     swift::SetTracker(const Address& tracker) {
361     Channel::tracker = tracker;
362 }
363
364 int Channel::DecodeID(int scrambled) {
365     return scrambled ^ (int)start;
366 }
367 int Channel::EncodeID(int unscrambled) {
368     return unscrambled ^ (int)start;
369 }
370
371 /*
372  * class Address implementation
373  */
374
375 void Address::set_ipv4 (const char* ip_str) {
376     struct hostent *h = gethostbyname(ip_str);
377     if (h == NULL) {
378         print_error("cannot lookup address");
379         return;
380     } else {
381         addr->dests[0].addr = *(u_long *) h->h_addr_list[0];
382     }
383 }
384
385
386 Address::Address(const char* ip_port) {
387     clear();
388     if (strlen(ip_port)>=1024)
389         return;
390     char ipp[1024];
391     strncpy(ipp,ip_port,1024);
392     char* semi = strchr(ipp,':');
393     if (semi) {
394         *semi = 0;
395         set_ipv4(ipp);
396         set_port(semi+1);
397     } else {
398         if (strchr(ipp, '.')) {
399             set_ipv4(ipp);
400             set_port((uint16_t)0);
401         } else {
402             set_ipv4((uint32_t)INADDR_ANY);
403             set_port(ipp);
404         }
405     }
406 }
407
408
409 uint32_t Address::LOCALHOST = INADDR_LOOPBACK;
410
411
412 /*
413  * Utility methods 1
414  */
415
416
417 const char* swift::tintstr (tint time) {
418     if (time==0)
419         time = now_t::now;
420     static char ret_str[4][32]; // wow
421     static int i;
422     i = (i+1) & 3;
423     if (time==TINT_NEVER)
424         return "NEVER";
425     time -= Channel::epoch;
426     assert(time>=0);
427     int hours = time/TINT_HOUR;
428     time %= TINT_HOUR;
429     int mins = time/TINT_MIN;
430     time %= TINT_MIN;
431     int secs = time/TINT_SEC;
432     time %= TINT_SEC;
433     int msecs = time/TINT_MSEC;
434     time %= TINT_MSEC;
435     int usecs = time/TINT_uSEC;
436     sprintf(ret_str[i],"%i_%02i_%02i_%03i_%03i",hours,mins,secs,msecs,usecs);
437     return ret_str[i];
438 }
439
440
441 std::string swift::sock2str (struct sockaddr_in addr) {
442     char ipch[32];
443 #ifdef _WIN32
444     //Vista only: InetNtop(AF_INET,&(addr.sin_addr),ipch,32);
445     // IPv4 only:
446     struct in_addr inaddr;
447     memcpy(&inaddr, &(addr.sin_addr), sizeof(inaddr));
448     strncpy(ipch, inet_ntoa(inaddr),32);
449 #else
450     inet_ntop(AF_INET,&(addr.sin_addr),ipch,32);
451 #endif
452     sprintf(ipch+strlen(ipch),":%i",ntohs(addr.sin_port));
453     return std::string(ipch);
454 }
455
456
457 /*
458  * Swift top-level API implementation
459  */
460
461 int     swift::Listen (Address addr) {
462     sckrwecb_t cb;
463     cb.may_read = &Channel::LibeventReceiveCallback;
464     cb.sock = Channel::Bind(addr,cb);
465     // swift UDP receive
466     event_assign(&Channel::evrecv, Channel::evbase, cb.sock, EV_READ,
467                  cb.may_read, NULL);
468     event_add(&Channel::evrecv, NULL);
469     return cb.sock;
470 }
471
472 void    swift::Shutdown (int sock_des) {
473     Channel::Shutdown();
474 }
475
476 int      swift::Open (const char* filename, const Sha1Hash& hash, Address tracker, bool force_check_diskvshash, bool check_netwvshash, uint32_t chunk_size) {
477     FileTransfer* ft = new FileTransfer(filename, hash, force_check_diskvshash, check_netwvshash, chunk_size);
478     if (ft && ft->file().file_descriptor()) {
479
480         /*if (FileTransfer::files.size()<fdes)  // FIXME duplication
481             FileTransfer::files.resize(fdes);
482         FileTransfer::files[fdes] = ft;*/
483
484         // initiate tracker connections
485         // SWIFTPROC
486         ft->SetTracker(tracker);
487         ft->ConnectToTracker();
488
489         return ft->file().file_descriptor();
490     } else {
491         if (ft)
492             delete ft;
493         return -1;
494     }
495 }
496
497
498 void    swift::Close (int fd) {
499     if (fd<FileTransfer::files.size() && FileTransfer::files[fd])
500         delete FileTransfer::files[fd];
501 }
502
503
504 void    swift::AddPeer (Address address, const Sha1Hash& root) {
505     Channel::peer_selector->AddPeer(address,root);
506 }
507
508
509 uint64_t  swift::Size (int fdes) {
510     if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
511         return FileTransfer::files[fdes]->file().size();
512     else
513         return 0;
514 }
515
516
517 bool  swift::IsComplete (int fdes) {
518     if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
519         return FileTransfer::files[fdes]->file().is_complete();
520     else
521         return 0;
522 }
523
524
525 uint64_t  swift::Complete (int fdes) {
526     if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
527         return FileTransfer::files[fdes]->file().complete();
528     else
529         return 0;
530 }
531
532
533 uint64_t  swift::SeqComplete (int fdes) {
534     if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
535         return FileTransfer::files[fdes]->file().seq_complete();
536     else
537         return 0;
538 }
539
540
541 const Sha1Hash& swift::RootMerkleHash (int file) {
542     FileTransfer* trans = FileTransfer::file(file);
543     if (!trans)
544         return Sha1Hash::ZERO;
545     return trans->file().root_hash();
546 }
547
548
549 /** Returns the number of bytes in a chunk for this transmission */
550 uint32_t          swift::ChunkSize(int fdes)
551 {
552     if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
553         return FileTransfer::files[fdes]->file().chunk_size();
554     else
555         return 0;
556 }
557
558
559 // CHECKPOINT
560 void swift::Checkpoint(int transfer) {
561         // Save transfer's binmap for zero-hashcheck restart
562         FileTransfer *ft = FileTransfer::file(transfer);
563         if (ft == NULL)
564                 return;
565
566         std::string binmap_filename = ft->file().filename();
567         binmap_filename.append(".mbinmap");
568         //fprintf(stderr,"swift: checkpointing %s at %lli\n", binmap_filename.c_str(), Complete(transfer));
569         FILE *fp = fopen(binmap_filename.c_str(),"wb");
570         if (!fp) {
571                 print_error("cannot open mbinmap for writing");
572                 return;
573         }
574         if (ft->file().serialize(fp) < 0)
575                 print_error("writing to mbinmap");
576         fclose(fp);
577 }
578
579
580
581 /*
582  * Utility methods 2
583  */
584
585 int swift::evbuffer_add_string(struct evbuffer *evb, std::string str) {
586     return evbuffer_add(evb, str.c_str(), str.size());
587 }
588
589 int swift::evbuffer_add_8(struct evbuffer *evb, uint8_t b) {
590     return evbuffer_add(evb, &b, 1);
591 }
592
593 int swift::evbuffer_add_16be(struct evbuffer *evb, uint16_t w) {
594     uint16_t wbe = htons(w);
595     return evbuffer_add(evb, &wbe, 2);
596 }
597
598 int swift::evbuffer_add_32be(struct evbuffer *evb, uint32_t i) {
599     uint32_t ibe = htonl(i);
600     return evbuffer_add(evb, &ibe, 4);
601 }
602
603 int swift::evbuffer_add_64be(struct evbuffer *evb, uint64_t l) {
604     uint32_t lbe[2];
605     lbe[0] = htonl((uint32_t)(l>>32));
606     lbe[1] = htonl((uint32_t)(l&0xffffffff));
607     return evbuffer_add(evb, lbe, 8);
608 }
609
610 int swift::evbuffer_add_hash(struct evbuffer *evb, const Sha1Hash& hash)  {
611     return evbuffer_add(evb, hash.bits, Sha1Hash::SIZE);
612 }
613
614 uint8_t swift::evbuffer_remove_8(struct evbuffer *evb) {
615     uint8_t b;
616     if (evbuffer_remove(evb, &b, 1) < 1)
617         return 0;
618     return b;
619 }
620
621 uint16_t swift::evbuffer_remove_16be(struct evbuffer *evb) {
622     uint16_t wbe;
623     if (evbuffer_remove(evb, &wbe, 2) < 2)
624         return 0;
625     return ntohs(wbe);
626 }
627
628 uint32_t swift::evbuffer_remove_32be(struct evbuffer *evb) {
629     uint32_t ibe;
630     if (evbuffer_remove(evb, &ibe, 4) < 4)
631         return 0;
632     return ntohl(ibe);
633 }
634
635 uint64_t swift::evbuffer_remove_64be(struct evbuffer *evb) {
636     uint32_t lbe[2];
637     if (evbuffer_remove(evb, lbe, 8) < 8)
638         return 0;
639     uint64_t l = ntohl(lbe[0]);
640     l<<=32;
641     l |= ntohl(lbe[1]);
642     return l;
643 }
644
645 Sha1Hash swift::evbuffer_remove_hash(struct evbuffer* evb)  {
646     char bits[Sha1Hash::SIZE];
647     if (evbuffer_remove(evb, bits, Sha1Hash::SIZE) < Sha1Hash::SIZE)
648         return Sha1Hash::ZERO;
649     return Sha1Hash(false, bits);
650 }
651