Use Linux-like indentation in mptp.c
[swifty.git] / src / libswift / channel.cpp
1 /*
2  *  channel.cpp
3  *  class representing a virtual connection to a peer. In addition,
4  *  it contains generic functions for socket management (see sock_open
5  *  class variable)
6  *
7  *  Created by Victor Grishchenko on 3/6/09.
8  *  Copyright 2009-2012 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved.
9  *
10  */
11
12 #include <cassert>
13 #include "compat.h"
14 //#include <glog/logging.h>
15 #include "swift.h"
16 #include "../kernel/mptp.h"
17
18 using namespace std;
19 using namespace swift;
20
21 /*
22  * Class variables
23  */
24
25 swift::tint now_t::now = Channel::Time();
26 tint Channel::start = now_t::now;
27 tint Channel::epoch = now_t::now/360000000LL*360000000LL; // make logs mergeable
28 uint64_t Channel::global_dgrams_up=0, Channel::global_dgrams_down=0,
29          Channel::global_raw_bytes_up=0, Channel::global_raw_bytes_down=0,
30          Channel::global_bytes_up=0, Channel::global_bytes_down=0,
31                  Channel::global_buffers_up=0, Channel::global_syscalls_up=0,
32                  Channel::global_buffers_down=0, Channel::global_syscalls_down=0;
33 sckrwecb_t Channel::sock_open[] = {};
34 int Channel::sock_count = 0;
35 swift::tint Channel::last_tick = 0;
36 int Channel::MAX_REORDERING = 4;
37 bool Channel::SELF_CONN_OK = false;
38 swift::tint Channel::TIMEOUT = TINT_SEC*60;
39 std::vector<Channel*> Channel::channels(1);
40 Address Channel::tracker;
41 //tbheap Channel::send_queue;
42 FILE* Channel::debug_file = NULL;
43 #include "ext/simple_selector.cpp"
44 PeerSelector* Channel::peer_selector = new SimpleSelector();
45 tint Channel::MIN_PEX_REQUEST_INTERVAL = TINT_SEC;
46
47
48 /*
49  * Instance methods
50  */
51
52 Channel::Channel    (FileTransfer* transfer, int socket, Address peer_addr) :
53         // Arno, 2011-10-03: Reordered to avoid g++ Wall warning
54         peer_(peer_addr), socket_(socket==INVALID_SOCKET?default_socket():socket), // FIXME
55     transfer_(transfer), peer_channel_id_(0), own_id_mentioned_(false),
56     data_in_(TINT_NEVER,bin_t::NONE), data_in_dbl_(bin_t::NONE),
57     data_out_cap_(bin_t::ALL),hint_out_size_(0),
58     // Gertjan fix 996e21e8abfc7d88db3f3f8158f2a2c4fc8a8d3f
59     // "Changed PEX rate limiting to per channel limiting"
60     last_pex_request_time_(0), next_pex_request_time_(0),
61     pex_request_outstanding_(false), useless_pex_count_(0),
62     pex_requested_(false),  // Ric: init var that wasn't initialiazed
63     //
64     rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC),
65     last_send_time_(0), last_recv_time_(0), last_data_out_time_(0), last_data_in_time_(0),
66     last_loss_time_(0), next_send_time_(0), cwnd_(1), cwnd_count1_(0), send_interval_(TINT_SEC),
67     send_control_(PING_PONG_CONTROL), sent_since_recv_(0),
68     lastrecvwaskeepalive_(false), lastsendwaskeepalive_(false), // Arno: nap bug fix
69     ack_rcvd_recent_(0),
70     ack_not_rcvd_recent_(0), owd_min_bin_(0), owd_min_bin_start_(NOW),
71     owd_cur_bin_(0), dgrams_sent_(0), dgrams_rcvd_(0),
72     raw_bytes_up_(0), raw_bytes_down_(0), bytes_up_(0), bytes_down_(0),
73     scheduled4close_(false)
74 {
75     if (peer_==Address())
76         peer_ = tracker;
77     this->id_ = channels.size();
78     channels.push_back(this);
79     transfer_->hs_in_.push_back(bin_t(id_));
80     for(int i=0; i<4; i++) {
81         owd_min_bins_[i] = TINT_NEVER;
82         owd_current_[i] = TINT_NEVER;
83     }
84     evsend_ptr_ = new struct event;
85     evtimer_assign(evsend_ptr_,evbase,&Channel::LibeventSendCallback,this);
86     evtimer_add(evsend_ptr_,tint2tv(next_send_time_));
87
88     // RATELIMIT
89         transfer->mychannels_.insert(this);
90
91         dprintf("%s #%u init channel %s\n",tintstr(),id_,peer_.str());
92         //fprintf(stderr,"new Channel %d %s\n", id_, peer_.str() );
93 }
94
95
96 Channel::~Channel () {
97         dprintf("%s #%u dealloc channel\n",tintstr(),id_);
98     channels[id_] = NULL;
99     ClearEvents();
100
101     // RATELIMIT
102     if (transfer_ != NULL)
103         transfer_->mychannels_.erase(this);
104 }
105
106
107 void Channel::ClearEvents()
108 {
109     if (evsend_ptr_ != NULL) {
110         if (evtimer_pending(evsend_ptr_,NULL))
111                 evtimer_del(evsend_ptr_);
112         delete evsend_ptr_;
113         evsend_ptr_ = NULL;
114     }
115 }
116
117
118
119
120 bool Channel::IsComplete() {
121         // Check if peak hash bins are filled.
122         if (file().peak_count() == 0)
123                 return false;
124
125     for(int i=0; i<file().peak_count(); i++) {
126         bin_t peak = file().peak(i);
127         if (!ack_in_.is_filled(peak))
128             return false;
129     }
130         return true;
131 }
132
133
134
135 uint16_t Channel::GetMyPort() {
136         struct sockaddr_in mysin = {};
137         socklen_t mysinlen = sizeof(mysin);
138         if (getsockname(socket_, (struct sockaddr *)&mysin, &mysinlen) < 0)
139         {
140                 print_error("error on getsockname");
141                 return 0;
142         }
143         else
144                 return ntohs(mysin.sin_port);
145 }
146
147 bool Channel::IsDiffSenderOrDuplicate(Address addr, uint32_t chid)
148 {
149     if (peer() != addr)
150     {
151         // Got message from different address than I send to
152         //
153                 if (!own_id_mentioned_ && addr.is_private()) {
154                         // Arno, 2012-02-27: Got HANDSHAKE reply from IANA private address,
155                         // check for duplicate connections:
156                         //
157                         // When two peers A and B are behind the same firewall, they will get
158                         // extB, resp. extA addresses from the tracker. They will both
159                         // connect to their counterpart but because the incoming packet
160                         // will be from the intNAT address the duplicates are not
161                         // recognized.
162                         //
163                         // Solution: when the second datagram comes in (HANDSHAKE reply),
164                         // see if you have had a first datagram from the same addr
165                         // (HANDSHAKE). If so, close the channel if his port number is
166                         // larger than yours (such that one channel remains).
167                         //
168                         recv_peer_ = addr;
169
170                         Channel *c = transfer().FindChannel(addr,this);
171                         if (c != NULL) {
172                                 // I already initiated a connection to this peer,
173                                 // this new incoming message would establish a duplicate.
174                                 // One must break the connection, decide using port
175                                 // number:
176                                 dprintf("%s #%u found duplicate channel to %s\n",
177                                                 tintstr(),chid,addr.str());
178
179                                 if (addr.port() > GetMyPort()) {
180                                         //Schedule4Close();
181                                         dprintf("%s #%u closing duplicate channel to %s\n",
182                                                         tintstr(),chid,addr.str());
183                                         return true;
184                                 }
185                         }
186                 }
187                 else
188                 {
189                         // Received HANDSHAKE reply from other address than I sent
190                         // HANDSHAKE to, and the address is not an IANA private
191                         // address (=no NAT in play), so close.
192                         //Schedule4Close();
193                         dprintf("%s #%u invalid peer address %s!=%s\n",
194                                         tintstr(),chid,peer().str(),addr.str());
195                         return true;
196                 }
197     }
198         return false;
199 }
200
201
202
203
204
205
206
207 /*
208  * Class methods
209  */
210 tint Channel::Time () {
211     //HiResTimeOfDay* tod = HiResTimeOfDay::Instance();
212     //tint ret = tod->getTimeUSec();
213     //DLOG(INFO)<<"now is "<<ret;
214     return now_t::now = usec_time();
215 }
216
217 // SOCKMGMT
218 evutil_socket_t Channel::Bind (Address address, sckrwecb_t callbacks) {
219     struct sockaddr_mptp *addr = address.addr;
220     evutil_socket_t fd;
221     int len = sizeof(struct sockaddr_mptp) + addr->count*sizeof(struct mptp_dest), sndbuf=1<<20, rcvbuf=1<<20;
222     #define dbnd_ensure(x) { if (!(x)) { \
223         print_error("binding fails"); close_socket(fd); return INVALID_SOCKET; } }
224     dbnd_ensure ( (fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_MPTP)) >= 0 );
225     dbnd_ensure( make_socket_nonblocking(fd) );  // FIXME may remove this
226     int enable = true;
227     dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF,
228                              (setsockoptptr_t)&sndbuf, sizeof(int)) == 0 );
229     dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF,
230                              (setsockoptptr_t)&rcvbuf, sizeof(int)) == 0 );
231     //setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (setsockoptptr_t)&enable, sizeof(int));
232     dbnd_ensure ( ::bind(fd, (sockaddr*)addr, len) == 0 );
233
234     callbacks.sock = fd;
235     sock_open[sock_count++] = callbacks;
236     return fd;
237 }
238
239 Address Channel::BoundAddress(evutil_socket_t sock) {
240
241     struct sockaddr_in myaddr;
242     socklen_t mylen = sizeof(myaddr);
243     int ret = getsockname(sock,(sockaddr*)&myaddr,&mylen);
244     if (ret >= 0) {
245                 return Address(myaddr);
246     }
247         else {
248                 return Address();
249         }
250 }
251
252
253 Address swift::BoundAddress(evutil_socket_t sock) {
254         return Channel::BoundAddress(sock);
255 }
256
257
258 int Channel::SendTo (evutil_socket_t sock, const Address& addr, struct evbuffer **evb) {
259
260         int count = addr.addr->count;
261         int addr_len = sizeof(struct sockaddr_mptp) + count * sizeof(struct mptp_dest);
262         struct iovec iov[count];
263         int lengths[count];
264         struct msghdr msg;
265         memset(&msg, 0, sizeof(msg));
266         memset(&iov, 0, sizeof(iov));
267         for (int i=0; i<count; ++i) {
268                 lengths[i] = evbuffer_get_length(evb[i]);
269                 iov[i].iov_base = evbuffer_pullup(evb[i], lengths[i]);
270                 iov[i].iov_len = lengths[i];
271         }
272         msg.msg_iov = iov;
273         msg.msg_iovlen = count;
274         msg.msg_name = addr.addr;
275         msg.msg_namelen = addr_len;
276         int r = sendmsg(sock, &msg, 0);
277     if (r<0) {
278         print_error("can't send");
279                 for (int i=0; i<count; ++i)
280                 evbuffer_drain(evb[i], lengths[i]); // Arno: behaviour is to pretend the packet got lost
281     }
282     else
283                 for (int i=0; i<count; ++i)
284                         evbuffer_drain(evb[i], addr.addr->dests[i].bytes);
285     global_dgrams_up+=count;
286         global_buffers_up+=count;
287         global_syscalls_up++;
288         for (int i=0; i<count; ++i)
289                 global_raw_bytes_up+=lengths[i];
290     Time();
291     return r;
292 }
293
294 int Channel::RecvFrom (evutil_socket_t sock, Address& addr, struct evbuffer **evb) {
295         int count = addr.addr->count;
296     socklen_t addrlen = sizeof(struct sockaddr_mptp) + count * sizeof(mptp_dest);
297     struct evbuffer_iovec vec[count];
298         for (int i=0; i<count; ++i) {
299                 if (evbuffer_reserve_space(evb[i], SWIFT_MAX_RECV_DGRAM_SIZE, &vec[i], 1) < 0) {
300                         print_error("error on evbuffer_reserve_space");
301                         return 0;
302                 }
303         }
304         struct iovec iov[count];
305         struct msghdr msg;
306         memset(&msg, 0, sizeof(msg));
307         memset(&iov, 0, sizeof(iov));
308         for (int i=0; i<count; ++i) {
309                 iov[i].iov_base = vec[i].iov_base;
310                 iov[i].iov_len = SWIFT_MAX_RECV_DGRAM_SIZE;
311         }
312         msg.msg_iov = iov;
313         msg.msg_iovlen = count;
314         msg.msg_name = addr.addr;
315         msg.msg_namelen = addrlen;
316         int length = recvmsg(sock, &msg, 0);
317     if (length<0) {
318         length = 0;
319
320         // Linux and Windows report "ICMP port unreachable" if the dest port could
321         // not be reached:
322         //    http://support.microsoft.com/kb/260018
323         //    http://www.faqs.org/faqs/unix-faq/socket/
324 #ifdef _WIN32
325         if (WSAGetLastError() == 10054) // Sometimes errno == 2 ?!
326 #else
327                 if (errno == ECONNREFUSED)
328 #endif
329                 {
330                 CloseChannelByAddress(addr);
331                 }
332         else
333                 print_error("error on recv");
334     }
335         length = 0;
336         for (int i=0; i<addr.addr->count; ++i) {
337                 length += addr.addr->dests[i].bytes;
338                 vec[i].iov_len = addr.addr->dests[i].bytes;
339                 if (evbuffer_commit_space(evb[i], &vec[i], 1) < 0)  {
340                         length = 0;
341                         print_error("error on evbuffer_commit_space");
342                 }
343         }
344     global_dgrams_down+=addr.addr->count;
345         global_buffers_down+=addr.addr->count;
346         global_syscalls_down++;
347     global_raw_bytes_down+=length;
348     Time();
349     return length;
350 }
351
352
353 void Channel::CloseSocket(evutil_socket_t sock) {
354     for(int i=0; i<sock_count; i++)
355         if (sock_open[i].sock==sock)
356             sock_open[i] = sock_open[--sock_count];
357     if (!close_socket(sock))
358         print_error("on closing a socket");
359 }
360
361 void Channel::Shutdown () {
362     while (sock_count--)
363         CloseSocket(sock_open[sock_count].sock);
364 }
365
366 void     swift::SetTracker(const Address& tracker) {
367     Channel::tracker = tracker;
368 }
369
370 int Channel::DecodeID(int scrambled) {
371     return scrambled ^ (int)start;
372 }
373 int Channel::EncodeID(int unscrambled) {
374     return unscrambled ^ (int)start;
375 }
376
377 /*
378  * class Address implementation
379  */
380
381 void Address::set_ipv4 (const char* ip_str) {
382     struct hostent *h = gethostbyname(ip_str);
383     if (h == NULL) {
384         print_error("cannot lookup address");
385         return;
386     } else {
387         addr->dests[0].addr = *(u_long *) h->h_addr_list[0];
388     }
389 }
390
391
392 Address::Address(const char* ip_port) {
393     clear();
394     if (strlen(ip_port)>=1024)
395         return;
396     char ipp[1024];
397     strncpy(ipp,ip_port,1024);
398     char* semi = strchr(ipp,':');
399     if (semi) {
400         *semi = 0;
401         set_ipv4(ipp);
402         set_port(semi+1);
403     } else {
404         if (strchr(ipp, '.')) {
405             set_ipv4(ipp);
406             set_port((uint16_t)0);
407         } else {
408             set_ipv4((uint32_t)INADDR_ANY);
409             set_port(ipp);
410         }
411     }
412 }
413
414
415 uint32_t Address::LOCALHOST = INADDR_LOOPBACK;
416
417
418 /*
419  * Utility methods 1
420  */
421
422
423 const char* swift::tintstr (tint time) {
424     if (time==0)
425         time = now_t::now;
426     static char ret_str[4][32]; // wow
427     static int i;
428     i = (i+1) & 3;
429     if (time==TINT_NEVER)
430         return "NEVER";
431     time -= Channel::epoch;
432     assert(time>=0);
433     int hours = time/TINT_HOUR;
434     time %= TINT_HOUR;
435     int mins = time/TINT_MIN;
436     time %= TINT_MIN;
437     int secs = time/TINT_SEC;
438     time %= TINT_SEC;
439     int msecs = time/TINT_MSEC;
440     time %= TINT_MSEC;
441     int usecs = time/TINT_uSEC;
442     sprintf(ret_str[i],"%i_%02i_%02i_%03i_%03i",hours,mins,secs,msecs,usecs);
443     return ret_str[i];
444 }
445
446
447 std::string swift::sock2str (struct sockaddr_in addr) {
448     char ipch[32];
449 #ifdef _WIN32
450     //Vista only: InetNtop(AF_INET,&(addr.sin_addr),ipch,32);
451     // IPv4 only:
452     struct in_addr inaddr;
453     memcpy(&inaddr, &(addr.sin_addr), sizeof(inaddr));
454     strncpy(ipch, inet_ntoa(inaddr),32);
455 #else
456     inet_ntop(AF_INET,&(addr.sin_addr),ipch,32);
457 #endif
458     sprintf(ipch+strlen(ipch),":%i",ntohs(addr.sin_port));
459     return std::string(ipch);
460 }
461
462
463 /*
464  * Swift top-level API implementation
465  */
466
467 int     swift::Listen (Address addr) {
468     sckrwecb_t cb;
469     cb.may_read = &Channel::LibeventReceiveCallback;
470     cb.sock = Channel::Bind(addr,cb);
471     // swift UDP receive
472     event_assign(&Channel::evrecv, Channel::evbase, cb.sock, EV_READ,
473                  cb.may_read, NULL);
474     event_add(&Channel::evrecv, NULL);
475     return cb.sock;
476 }
477
478 void    swift::Shutdown (int sock_des) {
479     Channel::Shutdown();
480 }
481
482 int      swift::Open (const char* filename, const Sha1Hash& hash, Address tracker, bool force_check_diskvshash, bool check_netwvshash, uint32_t chunk_size) {
483     FileTransfer* ft = new FileTransfer(filename, hash, force_check_diskvshash, check_netwvshash, chunk_size);
484     if (ft && ft->file().file_descriptor()) {
485
486         /*if (FileTransfer::files.size()<fdes)  // FIXME duplication
487             FileTransfer::files.resize(fdes);
488         FileTransfer::files[fdes] = ft;*/
489
490         // initiate tracker connections
491         // SWIFTPROC
492         ft->SetTracker(tracker);
493         ft->ConnectToTracker();
494
495         return ft->file().file_descriptor();
496     } else {
497         if (ft)
498             delete ft;
499         return -1;
500     }
501 }
502
503
504 void    swift::Close (int fd) {
505     if (fd<FileTransfer::files.size() && FileTransfer::files[fd])
506         delete FileTransfer::files[fd];
507 }
508
509
510 void    swift::AddPeer (Address address, const Sha1Hash& root) {
511     Channel::peer_selector->AddPeer(address,root);
512 }
513
514
515 uint64_t  swift::Size (int fdes) {
516     if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
517         return FileTransfer::files[fdes]->file().size();
518     else
519         return 0;
520 }
521
522
523 bool  swift::IsComplete (int fdes) {
524     if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
525         return FileTransfer::files[fdes]->file().is_complete();
526     else
527         return 0;
528 }
529
530
531 uint64_t  swift::Complete (int fdes) {
532     if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
533         return FileTransfer::files[fdes]->file().complete();
534     else
535         return 0;
536 }
537
538
539 uint64_t  swift::SeqComplete (int fdes) {
540     if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
541         return FileTransfer::files[fdes]->file().seq_complete();
542     else
543         return 0;
544 }
545
546
547 const Sha1Hash& swift::RootMerkleHash (int file) {
548     FileTransfer* trans = FileTransfer::file(file);
549     if (!trans)
550         return Sha1Hash::ZERO;
551     return trans->file().root_hash();
552 }
553
554
555 /** Returns the number of bytes in a chunk for this transmission */
556 uint32_t          swift::ChunkSize(int fdes)
557 {
558     if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
559         return FileTransfer::files[fdes]->file().chunk_size();
560     else
561         return 0;
562 }
563
564
565 // CHECKPOINT
566 void swift::Checkpoint(int transfer) {
567         // Save transfer's binmap for zero-hashcheck restart
568         FileTransfer *ft = FileTransfer::file(transfer);
569         if (ft == NULL)
570                 return;
571
572         std::string binmap_filename = ft->file().filename();
573         binmap_filename.append(".mbinmap");
574         //fprintf(stderr,"swift: checkpointing %s at %lli\n", binmap_filename.c_str(), Complete(transfer));
575         FILE *fp = fopen(binmap_filename.c_str(),"wb");
576         if (!fp) {
577                 print_error("cannot open mbinmap for writing");
578                 return;
579         }
580         if (ft->file().serialize(fp) < 0)
581                 print_error("writing to mbinmap");
582         fclose(fp);
583 }
584
585
586
587 /*
588  * Utility methods 2
589  */
590
591 int swift::evbuffer_add_string(struct evbuffer *evb, std::string str) {
592     return evbuffer_add(evb, str.c_str(), str.size());
593 }
594
595 int swift::evbuffer_add_8(struct evbuffer *evb, uint8_t b) {
596     return evbuffer_add(evb, &b, 1);
597 }
598
599 int swift::evbuffer_add_16be(struct evbuffer *evb, uint16_t w) {
600     uint16_t wbe = htons(w);
601     return evbuffer_add(evb, &wbe, 2);
602 }
603
604 int swift::evbuffer_add_32be(struct evbuffer *evb, uint32_t i) {
605     uint32_t ibe = htonl(i);
606     return evbuffer_add(evb, &ibe, 4);
607 }
608
609 int swift::evbuffer_add_64be(struct evbuffer *evb, uint64_t l) {
610     uint32_t lbe[2];
611     lbe[0] = htonl((uint32_t)(l>>32));
612     lbe[1] = htonl((uint32_t)(l&0xffffffff));
613     return evbuffer_add(evb, lbe, 8);
614 }
615
616 int swift::evbuffer_add_hash(struct evbuffer *evb, const Sha1Hash& hash)  {
617     return evbuffer_add(evb, hash.bits, Sha1Hash::SIZE);
618 }
619
620 uint8_t swift::evbuffer_remove_8(struct evbuffer *evb) {
621     uint8_t b;
622     if (evbuffer_remove(evb, &b, 1) < 1)
623         return 0;
624     return b;
625 }
626
627 uint16_t swift::evbuffer_remove_16be(struct evbuffer *evb) {
628     uint16_t wbe;
629     if (evbuffer_remove(evb, &wbe, 2) < 2)
630         return 0;
631     return ntohs(wbe);
632 }
633
634 uint32_t swift::evbuffer_remove_32be(struct evbuffer *evb) {
635     uint32_t ibe;
636     if (evbuffer_remove(evb, &ibe, 4) < 4)
637         return 0;
638     return ntohl(ibe);
639 }
640
641 uint64_t swift::evbuffer_remove_64be(struct evbuffer *evb) {
642     uint32_t lbe[2];
643     if (evbuffer_remove(evb, lbe, 8) < 8)
644         return 0;
645     uint64_t l = ntohl(lbe[0]);
646     l<<=32;
647     l |= ntohl(lbe[1]);
648     return l;
649 }
650
651 Sha1Hash swift::evbuffer_remove_hash(struct evbuffer* evb)  {
652     char bits[Sha1Hash::SIZE];
653     if (evbuffer_remove(evb, bits, Sha1Hash::SIZE) < Sha1Hash::SIZE)
654         return Sha1Hash::ZERO;
655     return Sha1Hash(false, bits);
656 }
657