35f443558da29184b3a3e212b73c7949b4d42210
[swifty.git] / src / libswift / channel.cpp
1 /*
2  *  channel.cpp
3  *  class representing a virtual connection to a peer. In addition,
4  *  it contains generic functions for socket management (see sock_open
5  *  class variable)
6  *
7  *  Created by Victor Grishchenko on 3/6/09.
8  *  Copyright 2009-2012 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved.
9  *
10  */
11
12 #include <cassert>
13 #include "compat.h"
14 //#include <glog/logging.h>
15 #include "swift.h"
16
17 using namespace std;
18 using namespace swift;
19
20 /*
21  * Class variables
22  */
23
24 swift::tint now_t::now = Channel::Time();
25 tint Channel::start = now_t::now;
26 tint Channel::epoch = now_t::now/360000000LL*360000000LL; // make logs mergeable
27 uint64_t Channel::global_dgrams_up=0, Channel::global_dgrams_down=0,
28          Channel::global_raw_bytes_up=0, Channel::global_raw_bytes_down=0,
29          Channel::global_bytes_up=0, Channel::global_bytes_down=0;
30 sckrwecb_t Channel::sock_open[] = {};
31 int Channel::sock_count = 0;
32 swift::tint Channel::last_tick = 0;
33 int Channel::MAX_REORDERING = 4;
34 bool Channel::SELF_CONN_OK = false;
35 swift::tint Channel::TIMEOUT = TINT_SEC*60;
36 std::vector<Channel*> Channel::channels(1);
37 Address Channel::tracker;
38 //tbheap Channel::send_queue;
39 FILE* Channel::debug_file = NULL;
40 #include "ext/simple_selector.cpp"
41 PeerSelector* Channel::peer_selector = new SimpleSelector();
42 tint Channel::MIN_PEX_REQUEST_INTERVAL = TINT_SEC;
43
44
45 /*
46  * Instance methods
47  */
48
49 Channel::Channel    (FileTransfer* transfer, int socket, Address peer_addr) :
50         // Arno, 2011-10-03: Reordered to avoid g++ Wall warning
51         peer_(peer_addr), socket_(socket==INVALID_SOCKET?default_socket():socket), // FIXME
52     transfer_(transfer), peer_channel_id_(0), own_id_mentioned_(false),
53     data_in_(TINT_NEVER,bin_t::NONE), data_in_dbl_(bin_t::NONE),
54     data_out_cap_(bin_t::ALL),hint_out_size_(0),
55     // Gertjan fix 996e21e8abfc7d88db3f3f8158f2a2c4fc8a8d3f
56     // "Changed PEX rate limiting to per channel limiting"
57     last_pex_request_time_(0), next_pex_request_time_(0),
58     pex_request_outstanding_(false), useless_pex_count_(0),
59     pex_requested_(false),  // Ric: init var that wasn't initialiazed
60     //
61     rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC),
62     last_send_time_(0), last_recv_time_(0), last_data_out_time_(0), last_data_in_time_(0),
63     last_loss_time_(0), next_send_time_(0), cwnd_(1), cwnd_count1_(0), send_interval_(TINT_SEC),
64     send_control_(PING_PONG_CONTROL), sent_since_recv_(0),
65     lastrecvwaskeepalive_(false), lastsendwaskeepalive_(false), // Arno: nap bug fix
66     ack_rcvd_recent_(0),
67     ack_not_rcvd_recent_(0), owd_min_bin_(0), owd_min_bin_start_(NOW),
68     owd_cur_bin_(0), dgrams_sent_(0), dgrams_rcvd_(0),
69     raw_bytes_up_(0), raw_bytes_down_(0), bytes_up_(0), bytes_down_(0),
70     scheduled4close_(false)
71 {
72     if (peer_==Address())
73         peer_ = tracker;
74     this->id_ = channels.size();
75     channels.push_back(this);
76     transfer_->hs_in_.push_back(bin_t(id_));
77     for(int i=0; i<4; i++) {
78         owd_min_bins_[i] = TINT_NEVER;
79         owd_current_[i] = TINT_NEVER;
80     }
81     evsend_ptr_ = new struct event;
82     evtimer_assign(evsend_ptr_,evbase,&Channel::LibeventSendCallback,this);
83     evtimer_add(evsend_ptr_,tint2tv(next_send_time_));
84
85     // RATELIMIT
86         transfer->mychannels_.insert(this);
87
88         dprintf("%s #%u init channel %s\n",tintstr(),id_,peer_.str());
89         //fprintf(stderr,"new Channel %d %s\n", id_, peer_.str() );
90 }
91
92
93 Channel::~Channel () {
94         dprintf("%s #%u dealloc channel\n",tintstr(),id_);
95     channels[id_] = NULL;
96     ClearEvents();
97
98     // RATELIMIT
99     if (transfer_ != NULL)
100         transfer_->mychannels_.erase(this);
101 }
102
103
104 void Channel::ClearEvents()
105 {
106     if (evsend_ptr_ != NULL) {
107         if (evtimer_pending(evsend_ptr_,NULL))
108                 evtimer_del(evsend_ptr_);
109         delete evsend_ptr_;
110         evsend_ptr_ = NULL;
111     }
112 }
113
114
115
116
117 bool Channel::IsComplete() {
118         // Check if peak hash bins are filled.
119         if (file().peak_count() == 0)
120                 return false;
121
122     for(int i=0; i<file().peak_count(); i++) {
123         bin_t peak = file().peak(i);
124         if (!ack_in_.is_filled(peak))
125             return false;
126     }
127         return true;
128 }
129
130
131
132 uint16_t Channel::GetMyPort() {
133         struct sockaddr_in mysin = {};
134         socklen_t mysinlen = sizeof(mysin);
135         if (getsockname(socket_, (struct sockaddr *)&mysin, &mysinlen) < 0)
136         {
137                 print_error("error on getsockname");
138                 return 0;
139         }
140         else
141                 return ntohs(mysin.sin_port);
142 }
143
144 bool Channel::IsDiffSenderOrDuplicate(Address addr, uint32_t chid)
145 {
146     if (peer() != addr)
147     {
148         // Got message from different address than I send to
149         //
150                 if (!own_id_mentioned_ && addr.is_private()) {
151                         // Arno, 2012-02-27: Got HANDSHAKE reply from IANA private address,
152                         // check for duplicate connections:
153                         //
154                         // When two peers A and B are behind the same firewall, they will get
155                         // extB, resp. extA addresses from the tracker. They will both
156                         // connect to their counterpart but because the incoming packet
157                         // will be from the intNAT address the duplicates are not
158                         // recognized.
159                         //
160                         // Solution: when the second datagram comes in (HANDSHAKE reply),
161                         // see if you have had a first datagram from the same addr
162                         // (HANDSHAKE). If so, close the channel if his port number is
163                         // larger than yours (such that one channel remains).
164                         //
165                         recv_peer_ = addr;
166
167                         Channel *c = transfer().FindChannel(addr,this);
168                         if (c != NULL) {
169                                 // I already initiated a connection to this peer,
170                                 // this new incoming message would establish a duplicate.
171                                 // One must break the connection, decide using port
172                                 // number:
173                                 dprintf("%s #%u found duplicate channel to %s\n",
174                                                 tintstr(),chid,addr.str());
175
176                                 if (addr.port() > GetMyPort()) {
177                                         //Schedule4Close();
178                                         dprintf("%s #%u closing duplicate channel to %s\n",
179                                                         tintstr(),chid,addr.str());
180                                         return true;
181                                 }
182                         }
183                 }
184                 else
185                 {
186                         // Received HANDSHAKE reply from other address than I sent
187                         // HANDSHAKE to, and the address is not an IANA private
188                         // address (=no NAT in play), so close.
189                         //Schedule4Close();
190                         dprintf("%s #%u invalid peer address %s!=%s\n",
191                                         tintstr(),chid,peer().str(),addr.str());
192                         return true;
193                 }
194     }
195         return false;
196 }
197
198
199
200
201
202
203
204 /*
205  * Class methods
206  */
207 tint Channel::Time () {
208     //HiResTimeOfDay* tod = HiResTimeOfDay::Instance();
209     //tint ret = tod->getTimeUSec();
210     //DLOG(INFO)<<"now is "<<ret;
211     return now_t::now = usec_time();
212 }
213
214 // SOCKMGMT
215 evutil_socket_t Channel::Bind (Address address, sckrwecb_t callbacks) {
216     struct sockaddr_in addr = address;
217     evutil_socket_t fd;
218     int len = sizeof(struct sockaddr_in), sndbuf=1<<20, rcvbuf=1<<20;
219     #define dbnd_ensure(x) { if (!(x)) { \
220         print_error("binding fails"); close_socket(fd); return INVALID_SOCKET; } }
221     dbnd_ensure ( (fd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0 );
222     dbnd_ensure( make_socket_nonblocking(fd) );  // FIXME may remove this
223     int enable = true;
224     dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF,
225                              (setsockoptptr_t)&sndbuf, sizeof(int)) == 0 );
226     dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF,
227                              (setsockoptptr_t)&rcvbuf, sizeof(int)) == 0 );
228     //setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (setsockoptptr_t)&enable, sizeof(int));
229     dbnd_ensure ( ::bind(fd, (sockaddr*)&addr, len) == 0 );
230
231     callbacks.sock = fd;
232     sock_open[sock_count++] = callbacks;
233     return fd;
234 }
235
236 Address Channel::BoundAddress(evutil_socket_t sock) {
237
238     struct sockaddr_in myaddr;
239     socklen_t mylen = sizeof(myaddr);
240     int ret = getsockname(sock,(sockaddr*)&myaddr,&mylen);
241     if (ret >= 0) {
242                 return Address(myaddr);
243     }
244         else {
245                 return Address();
246         }
247 }
248
249
250 Address swift::BoundAddress(evutil_socket_t sock) {
251         return Channel::BoundAddress(sock);
252 }
253
254
255 int Channel::SendTo (evutil_socket_t sock, const Address& addr, struct evbuffer *evb) {
256
257     int length = evbuffer_get_length(evb);
258     int r = sendto(sock,(const char *)evbuffer_pullup(evb, length),length,0,
259                    (struct sockaddr*)&(addr.addr),sizeof(struct sockaddr_in));
260     if (r<0) {
261         print_error("can't send");
262         evbuffer_drain(evb, length); // Arno: behaviour is to pretend the packet got lost
263     }
264     else
265         evbuffer_drain(evb,r);
266     global_dgrams_up++;
267     global_raw_bytes_up+=length;
268     Time();
269     return r;
270 }
271
272 int Channel::RecvFrom (evutil_socket_t sock, Address& addr, struct evbuffer *evb) {
273     socklen_t addrlen = sizeof(struct sockaddr_in);
274     struct evbuffer_iovec vec;
275     if (evbuffer_reserve_space(evb, SWIFT_MAX_RECV_DGRAM_SIZE, &vec, 1) < 0) {
276         print_error("error on evbuffer_reserve_space");
277         return 0;
278     }
279     int length = recvfrom (sock, (char *)vec.iov_base, SWIFT_MAX_RECV_DGRAM_SIZE, 0,
280                            (struct sockaddr*)&(addr.addr), &addrlen);
281     if (length<0) {
282         length = 0;
283
284         // Linux and Windows report "ICMP port unreachable" if the dest port could
285         // not be reached:
286         //    http://support.microsoft.com/kb/260018
287         //    http://www.faqs.org/faqs/unix-faq/socket/
288 #ifdef _WIN32
289         if (WSAGetLastError() == 10054) // Sometimes errno == 2 ?!
290 #else
291                 if (errno == ECONNREFUSED)
292 #endif
293                 {
294                 CloseChannelByAddress(addr);
295                 }
296         else
297                 print_error("error on recv");
298     }
299     vec.iov_len = length;
300     if (evbuffer_commit_space(evb, &vec, 1) < 0)  {
301         length = 0;
302         print_error("error on evbuffer_commit_space");
303     }
304     global_dgrams_down++;
305     global_raw_bytes_down+=length;
306     Time();
307     return length;
308 }
309
310
311 void Channel::CloseSocket(evutil_socket_t sock) {
312     for(int i=0; i<sock_count; i++)
313         if (sock_open[i].sock==sock)
314             sock_open[i] = sock_open[--sock_count];
315     if (!close_socket(sock))
316         print_error("on closing a socket");
317 }
318
319 void Channel::Shutdown () {
320     while (sock_count--)
321         CloseSocket(sock_open[sock_count].sock);
322 }
323
324 void     swift::SetTracker(const Address& tracker) {
325     Channel::tracker = tracker;
326 }
327
328 int Channel::DecodeID(int scrambled) {
329     return scrambled ^ (int)start;
330 }
331 int Channel::EncodeID(int unscrambled) {
332     return unscrambled ^ (int)start;
333 }
334
335 /*
336  * class Address implementation
337  */
338
339 void Address::set_ipv4 (const char* ip_str) {
340     struct hostent *h = gethostbyname(ip_str);
341     if (h == NULL) {
342         print_error("cannot lookup address");
343         return;
344     } else {
345         addr.sin_addr.s_addr = *(u_long *) h->h_addr_list[0];
346     }
347 }
348
349
350 Address::Address(const char* ip_port) {
351     clear();
352     if (strlen(ip_port)>=1024)
353         return;
354     char ipp[1024];
355     strncpy(ipp,ip_port,1024);
356     char* semi = strchr(ipp,':');
357     if (semi) {
358         *semi = 0;
359         set_ipv4(ipp);
360         set_port(semi+1);
361     } else {
362         if (strchr(ipp, '.')) {
363             set_ipv4(ipp);
364             set_port((uint16_t)0);
365         } else {
366             set_ipv4((uint32_t)INADDR_ANY);
367             set_port(ipp);
368         }
369     }
370 }
371
372
373 uint32_t Address::LOCALHOST = INADDR_LOOPBACK;
374
375
376 /*
377  * Utility methods 1
378  */
379
380
381 const char* swift::tintstr (tint time) {
382     if (time==0)
383         time = now_t::now;
384     static char ret_str[4][32]; // wow
385     static int i;
386     i = (i+1) & 3;
387     if (time==TINT_NEVER)
388         return "NEVER";
389     time -= Channel::epoch;
390     assert(time>=0);
391     int hours = time/TINT_HOUR;
392     time %= TINT_HOUR;
393     int mins = time/TINT_MIN;
394     time %= TINT_MIN;
395     int secs = time/TINT_SEC;
396     time %= TINT_SEC;
397     int msecs = time/TINT_MSEC;
398     time %= TINT_MSEC;
399     int usecs = time/TINT_uSEC;
400     sprintf(ret_str[i],"%i_%02i_%02i_%03i_%03i",hours,mins,secs,msecs,usecs);
401     return ret_str[i];
402 }
403
404
405 std::string swift::sock2str (struct sockaddr_in addr) {
406     char ipch[32];
407 #ifdef _WIN32
408     //Vista only: InetNtop(AF_INET,&(addr.sin_addr),ipch,32);
409     // IPv4 only:
410     struct in_addr inaddr;
411     memcpy(&inaddr, &(addr.sin_addr), sizeof(inaddr));
412     strncpy(ipch, inet_ntoa(inaddr),32);
413 #else
414     inet_ntop(AF_INET,&(addr.sin_addr),ipch,32);
415 #endif
416     sprintf(ipch+strlen(ipch),":%i",ntohs(addr.sin_port));
417     return std::string(ipch);
418 }
419
420
421 /*
422  * Swift top-level API implementation
423  */
424
425 int     swift::Listen (Address addr) {
426     sckrwecb_t cb;
427     cb.may_read = &Channel::LibeventReceiveCallback;
428     cb.sock = Channel::Bind(addr,cb);
429     // swift UDP receive
430     event_assign(&Channel::evrecv, Channel::evbase, cb.sock, EV_READ,
431                  cb.may_read, NULL);
432     event_add(&Channel::evrecv, NULL);
433     return cb.sock;
434 }
435
436 void    swift::Shutdown (int sock_des) {
437     Channel::Shutdown();
438 }
439
440 int      swift::Open (const char* filename, const Sha1Hash& hash, Address tracker, bool force_check_diskvshash, bool check_netwvshash, uint32_t chunk_size) {
441     FileTransfer* ft = new FileTransfer(filename, hash, force_check_diskvshash, check_netwvshash, chunk_size);
442     if (ft && ft->file().file_descriptor()) {
443
444         /*if (FileTransfer::files.size()<fdes)  // FIXME duplication
445             FileTransfer::files.resize(fdes);
446         FileTransfer::files[fdes] = ft;*/
447
448         // initiate tracker connections
449         // SWIFTPROC
450         ft->SetTracker(tracker);
451         ft->ConnectToTracker();
452
453         return ft->file().file_descriptor();
454     } else {
455         if (ft)
456             delete ft;
457         return -1;
458     }
459 }
460
461
462 void    swift::Close (int fd) {
463     if (fd<FileTransfer::files.size() && FileTransfer::files[fd])
464         delete FileTransfer::files[fd];
465 }
466
467
468 void    swift::AddPeer (Address address, const Sha1Hash& root) {
469     Channel::peer_selector->AddPeer(address,root);
470 }
471
472
473 uint64_t  swift::Size (int fdes) {
474     if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
475         return FileTransfer::files[fdes]->file().size();
476     else
477         return 0;
478 }
479
480
481 bool  swift::IsComplete (int fdes) {
482     if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
483         return FileTransfer::files[fdes]->file().is_complete();
484     else
485         return 0;
486 }
487
488
489 uint64_t  swift::Complete (int fdes) {
490     if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
491         return FileTransfer::files[fdes]->file().complete();
492     else
493         return 0;
494 }
495
496
497 uint64_t  swift::SeqComplete (int fdes) {
498     if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
499         return FileTransfer::files[fdes]->file().seq_complete();
500     else
501         return 0;
502 }
503
504
505 const Sha1Hash& swift::RootMerkleHash (int file) {
506     FileTransfer* trans = FileTransfer::file(file);
507     if (!trans)
508         return Sha1Hash::ZERO;
509     return trans->file().root_hash();
510 }
511
512
513 /** Returns the number of bytes in a chunk for this transmission */
514 uint32_t          swift::ChunkSize(int fdes)
515 {
516     if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
517         return FileTransfer::files[fdes]->file().chunk_size();
518     else
519         return 0;
520 }
521
522
523 // CHECKPOINT
524 void swift::Checkpoint(int transfer) {
525         // Save transfer's binmap for zero-hashcheck restart
526         FileTransfer *ft = FileTransfer::file(transfer);
527         if (ft == NULL)
528                 return;
529
530         std::string binmap_filename = ft->file().filename();
531         binmap_filename.append(".mbinmap");
532         //fprintf(stderr,"swift: checkpointing %s at %lli\n", binmap_filename.c_str(), Complete(transfer));
533         FILE *fp = fopen(binmap_filename.c_str(),"wb");
534         if (!fp) {
535                 print_error("cannot open mbinmap for writing");
536                 return;
537         }
538         if (ft->file().serialize(fp) < 0)
539                 print_error("writing to mbinmap");
540         fclose(fp);
541 }
542
543
544
545 /*
546  * Utility methods 2
547  */
548
549 int swift::evbuffer_add_string(struct evbuffer *evb, std::string str) {
550     return evbuffer_add(evb, str.c_str(), str.size());
551 }
552
553 int swift::evbuffer_add_8(struct evbuffer *evb, uint8_t b) {
554     return evbuffer_add(evb, &b, 1);
555 }
556
557 int swift::evbuffer_add_16be(struct evbuffer *evb, uint16_t w) {
558     uint16_t wbe = htons(w);
559     return evbuffer_add(evb, &wbe, 2);
560 }
561
562 int swift::evbuffer_add_32be(struct evbuffer *evb, uint32_t i) {
563     uint32_t ibe = htonl(i);
564     return evbuffer_add(evb, &ibe, 4);
565 }
566
567 int swift::evbuffer_add_64be(struct evbuffer *evb, uint64_t l) {
568     uint32_t lbe[2];
569     lbe[0] = htonl((uint32_t)(l>>32));
570     lbe[1] = htonl((uint32_t)(l&0xffffffff));
571     return evbuffer_add(evb, lbe, 8);
572 }
573
574 int swift::evbuffer_add_hash(struct evbuffer *evb, const Sha1Hash& hash)  {
575     return evbuffer_add(evb, hash.bits, Sha1Hash::SIZE);
576 }
577
578 uint8_t swift::evbuffer_remove_8(struct evbuffer *evb) {
579     uint8_t b;
580     if (evbuffer_remove(evb, &b, 1) < 1)
581         return 0;
582     return b;
583 }
584
585 uint16_t swift::evbuffer_remove_16be(struct evbuffer *evb) {
586     uint16_t wbe;
587     if (evbuffer_remove(evb, &wbe, 2) < 2)
588         return 0;
589     return ntohs(wbe);
590 }
591
592 uint32_t swift::evbuffer_remove_32be(struct evbuffer *evb) {
593     uint32_t ibe;
594     if (evbuffer_remove(evb, &ibe, 4) < 4)
595         return 0;
596     return ntohl(ibe);
597 }
598
599 uint64_t swift::evbuffer_remove_64be(struct evbuffer *evb) {
600     uint32_t lbe[2];
601     if (evbuffer_remove(evb, lbe, 8) < 8)
602         return 0;
603     uint64_t l = ntohl(lbe[0]);
604     l<<=32;
605     l |= ntohl(lbe[1]);
606     return l;
607 }
608
609 Sha1Hash swift::evbuffer_remove_hash(struct evbuffer* evb)  {
610     char bits[Sha1Hash::SIZE];
611     if (evbuffer_remove(evb, bits, Sha1Hash::SIZE) < Sha1Hash::SIZE)
612         return Sha1Hash::ZERO;
613     return Sha1Hash(false, bits);
614 }
615