3fac3904ed94e262d96e1963d90d8b7f7031d5f7
[swifty.git] / src / libswift / swift.h
1 /*
2  *  swift.h
3  *  the main header file for libswift, normally you should only read this one
4  *
5  *  Created by Victor Grishchenko on 3/6/09.
6  *  Copyright 2009-2012 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved.
7  *
8  */
9 /*
10
11   The swift protocol
12
13   Messages
14
15   HANDSHAKE    00, channelid
16   Communicates the channel id of the sender. The
17   initial handshake packet also has the root hash
18   (a HASH message).
19
20   DATA        01, bin_32, buffer
21   1K of data.
22
23   ACK        02, bin_32, timestamp_32
24   HAVE       03, bin_32
25   Confirms successfull delivery of data. Used for
26   congestion control, as well.
27
28   HINT        08, bin_32
29   Practical value of "hints" is to avoid overlap, mostly.
30   Hints might be lost in the network or ignored.
31   Peer might send out data without a hint.
32   Hint which was not responded (by DATA) in some RTTs
33   is considered to be ignored.
34   As peers cant pick randomly kilobyte here and there,
35   they send out "long hints" for non-base bins.
36
37   HASH        04, bin_32, sha1hash
38   SHA1 hash tree hashes for data verification. The
39   connection to a fresh peer starts with bootstrapping
40   him with peak hashes. Later, before sending out
41   any data, a peer sends the necessary uncle hashes.
42
43   PEX+/PEX-    05/06, ipv4 addr, port
44   Peer exchange messages; reports all connected and
45   disconected peers. Might has special meaning (as
46   in the case with swarm supervisors).
47
48 */
49 #ifndef SWIFT_H
50 #define SWIFT_H
51
52 #include <deque>
53 #include <vector>
54 #include <set>
55 #include <algorithm>
56 #include <string>
57 #include <map>
58 #include <math.h>
59
60 #include "compat.h"
61 #include <event2/event.h>
62 #include <event2/event_struct.h>
63 #include <event2/buffer.h>
64 #include "bin.h"
65 #include "binmap.h"
66 #include "hashtree.h"
67 #include "avgspeed.h"
68 #include "availability.h"
69 #include "../kernel/mptp.h"
70
71 namespace swift {
72
73 #define SWIFT_MAX_UDP_OVER_ETH_PAYLOAD          (1500-20-8)
74 // Arno: Maximum size of non-DATA messages in a UDP packet we send.
75 #define SWIFT_MAX_NONDATA_DGRAM_SIZE            (SWIFT_MAX_UDP_OVER_ETH_PAYLOAD-SWIFT_DEFAULT_CHUNK_SIZE-1-4)
76 // Arno: Maximum size of a UDP packet we send. Note: depends on CHUNKSIZE 8192
77 #define SWIFT_MAX_SEND_DGRAM_SIZE                       (SWIFT_MAX_NONDATA_DGRAM_SIZE+1+4+8192)
78 // Arno: Maximum size of a UDP packet we are willing to accept. Note: depends on CHUNKSIZE 8192
79 #define SWIFT_MAX_RECV_DGRAM_SIZE                       (SWIFT_MAX_SEND_DGRAM_SIZE*2)
80
81 #define layer2bytes(ln,cs)      (uint64_t)( ((double)cs)*pow(2.0,(double)ln))
82 #define bytes2layer(bn,cs)  (int)log2(  ((double)bn)/((double)cs) )
83
84 // Arno, 2011-12-22: Enable Riccardo's VodPiecePicker
85 #define ENABLE_VOD_PIECEPICKER          1
86
87
88 /** IPv4 address, just a nice wrapping around struct sockaddr_in. */
89     struct Address {
90         struct sockaddr_mptp *addr;
91         static uint32_t LOCALHOST;
92         void set_port (uint16_t port) {
93             addr->dests[0].port = htons(port);
94         }
95         void set_port (const char* port_str) {
96             int p;
97             if (sscanf(port_str,"%i",&p))
98                 set_port(p);
99         }
100         void set_ipv4 (uint32_t ipv4) {
101             addr->dests[0].addr = htonl(ipv4);
102         }
103         void set_ipv4 (const char* ipv4_str) ;
104         //{    inet_aton(ipv4_str,&(addr.sin_addr));    }
105         void clear () {
106                 addr = (struct sockaddr_mptp *)calloc(1, sizeof(struct sockaddr_mptp) + sizeof(struct mptp_dest));
107                 addr->count = 1;
108         }
109         Address() {
110             clear();
111         }
112         Address(const Address &b) {
113             clear();
114                 addr->dests[0].addr = b.addr->dests[0].addr;
115                 addr->dests[0].port = b.addr->dests[0].port;
116         }
117         Address(const char* ip, uint16_t port)  {
118             clear();
119             set_ipv4(ip);
120             set_port(port);
121         }
122         Address(const char* ip_port);
123         Address(uint16_t port) {
124             clear();
125             set_ipv4((uint32_t)INADDR_ANY);
126             set_port(port);
127         }
128         Address(uint32_t ipv4addr, uint16_t port) {
129             clear();
130             set_ipv4(ipv4addr);
131             set_port(port);
132         }
133         Address(const struct sockaddr_in& address) {
134                 clear();
135                 addr->dests[0].addr = address.sin_addr.s_addr;
136                 addr->dests[0].port = address.sin_port;
137         }
138         ~Address(){
139                 free(addr);
140         }
141         uint32_t ipv4 () const { return ntohl(addr->dests[0].addr); }
142         uint16_t port () const { return ntohs(addr->dests[0].port); }
143         Address& operator = (const Address& b) {
144                 if (this != &b) {
145                         free(addr);
146                         clear();
147                         addr->dests[0].addr = b.addr->dests[0].addr;
148                         addr->dests[0].port = b.addr->dests[0].port;
149                 }
150                 return *this;
151         }
152         bool operator == (const Address& b) const {
153             return addr->count == b.addr->count &&
154                 addr->dests[0].port==b.addr->dests[0].port &&
155                 addr->dests[0].addr==b.addr->dests[0].addr;
156         }
157         const char* str () const {
158                 // Arno, 2011-10-04: not thread safe, replace.
159             static char rs[4][32];
160             static int i;
161             i = (i+1) & 3;
162             sprintf(rs[i],"%i.%i.%i.%i:%i",ipv4()>>24,(ipv4()>>16)&0xff,
163                     (ipv4()>>8)&0xff,ipv4()&0xff,port());
164             return rs[i];
165         }
166         const char* ipv4str () const {
167                 // Arno, 2011-10-04: not thread safe, replace.
168             static char rs[4][32];
169             static int i;
170             i = (i+1) & 3;
171             sprintf(rs[i],"%i.%i.%i.%i",ipv4()>>24,(ipv4()>>16)&0xff,
172                     (ipv4()>>8)&0xff,ipv4()&0xff);
173             return rs[i];
174         }
175         bool operator != (const Address& b) const { return !(*this==b); }
176         bool is_private() const {
177                 // TODO IPv6
178                 uint32_t no = ipv4(); uint8_t no0 = no>>24,no1 = (no>>16)&0xff;
179                 if (no0 == 10) return true;
180                 else if (no0 == 172 && no1 >= 16 && no1 <= 31) return true;
181                 else if (no0 == 192 && no1 == 168) return true;
182                 else return false;
183         }
184     };
185
186 // Arno, 2011-10-03: Use libevent callback functions, no on_error?
187 #define sockcb_t                event_callback_fn
188     struct sckrwecb_t {
189         sckrwecb_t (evutil_socket_t s=0, sockcb_t mr=NULL, sockcb_t mw=NULL,
190                     sockcb_t oe=NULL) :
191             sock(s), may_read(mr), may_write(mw), on_error(oe) {}
192         evutil_socket_t sock;
193         sockcb_t   may_read;
194         sockcb_t   may_write;
195         sockcb_t   on_error;
196     };
197
198     struct now_t  {
199         static tint now;
200     };
201
202 #define NOW now_t::now
203
204     /** tintbin is basically a pair<tint,bin64_t> plus some nice operators.
205         Most frequently used in different queues (acknowledgements, requests,
206         etc). */
207     struct tintbin {
208         tint    time;
209         bin_t bin;
210         tintbin(const tintbin& b) : time(b.time), bin(b.bin) {}
211         tintbin() : time(TINT_NEVER), bin(bin_t::NONE) {}
212         tintbin(tint time_, bin_t bin_) : time(time_), bin(bin_) {}
213         tintbin(bin_t bin_) : time(NOW), bin(bin_) {}
214         bool operator < (const tintbin& b) const
215                         { return time > b.time; }
216         bool operator == (const tintbin& b) const
217                         { return time==b.time && bin==b.bin; }
218         bool operator != (const tintbin& b) const
219                         { return !(*this==b); }
220     };
221
222     typedef std::deque<tintbin> tbqueue;
223     typedef std::deque<bin_t> binqueue;
224     typedef Address   Address;
225
226     /** A heap (priority queue) for timestamped bin numbers (tintbins). */
227     class tbheap {
228         tbqueue data_;
229     public:
230         int size () const { return data_.size(); }
231         bool is_empty () const { return data_.empty(); }
232         tintbin         pop() {
233             tintbin ret = data_.front();
234             std::pop_heap(data_.begin(),data_.end());
235             data_.pop_back();
236             return ret;
237         }
238         void            push(const tintbin& tb) {
239             data_.push_back(tb);
240             push_heap(data_.begin(),data_.end());
241         }
242         const tintbin&  peek() const {
243             return data_.front();
244         }
245     };
246
247     /** swift protocol message types; these are used on the wire. */
248     typedef enum {
249         SWIFT_HANDSHAKE = 0,
250         SWIFT_DATA = 1,
251         SWIFT_ACK = 2,
252         SWIFT_HAVE = 3,
253         SWIFT_HASH = 4,
254         SWIFT_PEX_ADD = 5,
255         SWIFT_PEX_REQ = 6,
256         SWIFT_SIGNED_HASH = 7,
257         SWIFT_HINT = 8,
258         SWIFT_MSGTYPE_RCVD = 9,
259         SWIFT_RANDOMIZE = 10, //FRAGRAND
260         SWIFT_VERSION = 11, // Arno, 2011-10-19: TODO to match RFC-rev-03
261         SWIFT_MESSAGE_COUNT = 12
262     } messageid_t;
263
264     typedef enum {
265         DDIR_UPLOAD,
266         DDIR_DOWNLOAD
267     } data_direction_t;
268
269     class PiecePicker;
270     //class CongestionController; // Arno: Currently part of Channel. See ::NextSendTime
271     class PeerSelector;
272     class Channel;
273     typedef void (*ProgressCallback) (int transfer, bin_t bin);
274
275     /** A class representing single file transfer. */
276     class    FileTransfer {
277
278     public:
279
280         /** A constructor. Open/submit/retrieve a file.
281          *  @param file_name    the name of the file
282          *  @param root_hash    the root hash of the file; zero hash if the file
283                  *                          is newly submitted
284                  */
285         FileTransfer(const char *file_name, const Sha1Hash& root_hash=Sha1Hash::ZERO,bool force_check_diskvshash=true,bool check_netwvshash=true,uint32_t chunk_size=SWIFT_DEFAULT_CHUNK_SIZE);
286
287         /**    Close everything. */
288         ~FileTransfer();
289
290
291         /** While we need to feed ACKs to every peer, we try (1) avoid
292             unnecessary duplication and (2) keep minimum state. Thus,
293             we use a rotating queue of bin completion events. */
294         //bin64_t         RevealAck (uint64_t& offset);
295         /** Rotating queue read for channels of this transmission. */
296         // Jori
297         int             RevealChannel (int& i);
298         // Gertjan
299         int             RandomChannel (int own_id);
300
301
302         /** Find transfer by the root hash. */
303         static FileTransfer* Find (const Sha1Hash& hash);
304         /** Find transfer by the file descriptor. */
305         static FileTransfer* file (int fd) {
306             return fd<files.size() ? files[fd] : NULL;
307         }
308
309         /** The binmap for data already retrieved and checked. */
310         binmap_t&           ack_out ()  { return file_.ack_out(); }
311         /** Piece picking strategy used by this transfer. */
312         PiecePicker&    picker () { return *picker_; }
313         /** The number of channels working for this transfer. */
314         int             channel_count () const { return hs_in_.size(); }
315         /** Hash tree checked file; all the hashes and data are kept here. */
316         HashTree&       file() { return file_; }
317         /** File descriptor for the data file. */
318         int             fd () const { return file_.file_descriptor(); }
319         /** Root SHA1 hash of the transfer (and the data file). */
320         const Sha1Hash& root_hash () const { return file_.root_hash(); }
321         /** Ric: the availability in the swarm */
322         Availability&   availability() { return *availability_; }
323
324                 // RATELIMIT
325         /** Arno: Call when n bytes are received. */
326         void                    OnRecvData(int n);
327         /** Arno: Call when n bytes are sent. */
328         void                    OnSendData(int n);
329         /** Arno: Call when no bytes are sent due to rate limiting. */
330         void                    OnSendNoData();
331         /** Arno: Return current speed for the given direction in bytes/s */
332                 double                  GetCurrentSpeed(data_direction_t ddir);
333                 /** Arno: Return maximum speed for the given direction in bytes/s */
334                 double                  GetMaxSpeed(data_direction_t ddir);
335                 /** Arno: Set maximum speed for the given direction in bytes/s */
336                 void                    SetMaxSpeed(data_direction_t ddir, double m);
337                 /** Arno: Return the number of non-seeders current channeled with. */
338                 uint32_t                GetNumLeechers();
339                 /** Arno: Return the number of seeders current channeled with. */
340                 uint32_t                GetNumSeeders();
341                 /** Arno: Return the set of Channels for this transfer. MORESTATS */
342                 std::set<Channel *> GetChannels() { return mychannels_; }
343
344                 /** Arno: set the tracker for this transfer. Reseting it won't kill
345                  * any existing connections.
346                  */
347                 void SetTracker(Address tracker) { tracker_ = tracker; }
348
349                 /** Arno: (Re)Connect to tracker for this transfer, or global Channel::tracker if not set */
350                 void ConnectToTracker();
351
352                 /** Arno: Reconnect to the tracker if no established peers and
353                  * exp backoff allows it.
354                  */
355                 void ReConnectToTrackerIfAllowed(bool hasestablishedpeers);
356
357                 /** Arno: Return the Channel to peer "addr" that is not equal to "notc". */
358                 Channel * FindChannel(const Address &addr, Channel *notc);
359
360                 // SAFECLOSE
361                 static void LibeventCleanCallback(int fd, short event, void *arg);
362     protected:
363
364         HashTree        file_;
365
366         /** Piece picker strategy. */
367         PiecePicker*    picker_;
368
369         /** Channels working for this transfer. */
370         binqueue        hs_in_;                 // Arno, 2011-10-03: Should really be queue of channel ID (=uint32_t)
371
372         /** Messages we are accepting.    */
373         uint64_t        cap_out_;
374
375         tint            init_time_;
376         
377         // Ric: PPPLUG
378         /** Availability in the swarm */
379         Availability*   availability_;
380
381 #define SWFT_MAX_TRANSFER_CB 8
382         ProgressCallback callbacks[SWFT_MAX_TRANSFER_CB];
383         uint8_t         cb_agg[SWFT_MAX_TRANSFER_CB];
384         int             cb_installed;
385
386                 // RATELIMIT
387         std::set<Channel *>     mychannels_; // Arno, 2012-01-31: May be duplicate of hs_in_
388         MovingAverageSpeed      cur_speed_[2];
389         double                          max_speed_[2];
390         int                                     speedzerocount_;
391
392         // SAFECLOSE
393         struct event            evclean_;
394
395         Address                         tracker_; // Tracker for this transfer
396         tint                            tracker_retry_interval_;
397         tint                            tracker_retry_time_;
398
399     public:
400         void            OnDataIn (bin_t pos);
401         // Gertjan fix: return bool
402         bool            OnPexIn (const Address& addr);
403
404         static std::vector<FileTransfer*> files;
405
406
407         friend class Channel;
408         // Ric: maybe not really needed
409         friend class Availability;
410         friend uint64_t  Size (int fdes);
411         friend bool      IsComplete (int fdes);
412         friend uint64_t  Complete (int fdes);
413         friend uint64_t  SeqComplete (int fdes);
414         friend int     Open (const char* filename, const Sha1Hash& hash, Address tracker, bool force_check_diskvshash, bool check_netwvshash, uint32_t chunk_size);
415         friend void    Close (int fd) ;
416         friend void AddProgressCallback (int transfer,ProgressCallback cb,uint8_t agg);
417         friend void RemoveProgressCallback (int transfer,ProgressCallback cb);
418         friend void ExternallyRetrieved (int transfer,bin_t piece);
419     };
420
421
422     /** PiecePicker implements some strategy of choosing (picking) what
423         to request next, given the possible range of choices:
424         data acknowledged by the peer minus data already retrieved.
425         May pick sequentially, do rarest first or in some other way. */
426     class PiecePicker {
427     public:
428         virtual void Randomize (uint64_t twist) = 0;
429         /** The piece picking method itself.
430          *  @param  offered     the data acknowledged by the peer
431          *  @param  max_width   maximum number of packets to ask for
432          *  @param  expires     (not used currently) when to consider request expired
433          *  @return             the bin number to request */
434         virtual bin_t Pick (binmap_t& offered, uint64_t max_width, tint expires) = 0;
435         virtual void LimitRange (bin_t range) = 0;
436         /** updates the playback position for streaming piece picking.
437          *  @param  amount              amount to increment in bin unit size (1KB default) */
438         virtual void updatePlaybackPos (int amount=1) = 0;
439         virtual ~PiecePicker() {}
440     };
441
442
443     class PeerSelector { // Arno: partically unused
444     public:
445         virtual void AddPeer (const Address& addr, const Sha1Hash& root) = 0;
446         virtual Address GetPeer (const Sha1Hash& for_root) = 0;
447     };
448
449
450     /* class DataStorer { // Arno: never implemented
451     public:
452         DataStorer (const Sha1Hash& id, size_t size);
453         virtual size_t    ReadData (bin_t pos,uint8_t** buf) = 0;
454         virtual size_t    WriteData (bin_t pos, uint8_t* buf, size_t len) = 0;
455     }; */
456
457
458     /**    swift channel's "control block"; channels loosely correspond to TCP
459            connections or FTP sessions; one channel is created for one file
460            being transferred between two peers. As we don't need buffers and
461            lots of other TCP stuff, sizeof(Channel+members) must be below 1K.
462            Normally, API users do not deal with this class. */
463         class MessageQueue;
464     class Channel {
465
466 #define DGRAM_MAX_SOCK_OPEN 128
467         static int sock_count;
468         static sckrwecb_t sock_open[DGRAM_MAX_SOCK_OPEN];
469
470     public:
471         Channel    (FileTransfer* file, int socket=INVALID_SOCKET, Address peer=Address());
472         ~Channel();
473
474         typedef enum {
475             KEEP_ALIVE_CONTROL,
476             PING_PONG_CONTROL,
477             SLOW_START_CONTROL,
478             AIMD_CONTROL,
479             LEDBAT_CONTROL,
480             CLOSE_CONTROL
481         } send_control_t;
482
483         static Address  tracker; // Global tracker for all transfers
484         struct event *evsend_ptr_; // Arno: timer per channel // SAFECLOSE
485         static struct event_base *evbase;
486         static struct event evrecv;
487         static const char* SEND_CONTROL_MODES[];
488
489                 static MessageQueue messageQueue;
490
491             static tint epoch, start;
492             static uint64_t global_dgrams_up, global_dgrams_down, global_raw_bytes_up, global_raw_bytes_down, global_bytes_up, global_bytes_down;
493         static void CloseChannelByAddress(const Address &addr);
494
495         // SOCKMGMT
496         // Arno: channel is also a "singleton" class that manages all sockets
497         // for a swift process
498         static void LibeventSendCallback(int fd, short event, void *arg);
499         static void LibeventReceiveCallback(int fd, short event, void *arg);
500         static void RecvDatagram (evutil_socket_t socket); // Called by LibeventReceiveCallback
501             static int RecvFrom(evutil_socket_t sock, Address& addr, struct evbuffer **evb); // Called by RecvDatagram
502             static int SendTo(evutil_socket_t sock, const Address& addr, struct evbuffer **evb); // Called by Channel::Send()
503             static evutil_socket_t Bind(Address address, sckrwecb_t callbacks=sckrwecb_t());
504             static Address BoundAddress(evutil_socket_t sock);
505             static evutil_socket_t default_socket()
506             { return sock_count ? sock_open[0].sock : INVALID_SOCKET; }
507
508             /** close the port */
509             static void CloseSocket(evutil_socket_t sock);
510             static void Shutdown ();
511             /** the current time */
512             static tint Time();
513
514             // Arno: Per instance methods
515         void        Recv (struct evbuffer *evb);
516         void        Send ();  // Called by LibeventSendCallback
517         void        Close ();
518
519         void        OnAck (struct evbuffer *evb);
520         void        OnHave (struct evbuffer *evb);
521         bin_t       OnData (struct evbuffer *evb);
522         void        OnHint (struct evbuffer *evb);
523         void        OnHash (struct evbuffer *evb);
524         void        OnPex (struct evbuffer *evb);
525         void        OnHandshake (struct evbuffer *evb);
526         void        OnRandomize (struct evbuffer *evb); //FRAGRAND
527         void        AddHandshake (struct evbuffer *evb);
528         bin_t       AddData (struct evbuffer *evb);
529         void        AddAck (struct evbuffer *evb);
530         void        AddHave (struct evbuffer *evb);
531         void        AddHint (struct evbuffer *evb);
532         void        AddUncleHashes (struct evbuffer *evb, bin_t pos);
533         void        AddPeakHashes (struct evbuffer *evb);
534         void        AddPex (struct evbuffer *evb);
535         void        OnPexReq(void);
536         void        AddPexReq(struct evbuffer *evb);
537         void        BackOffOnLosses (float ratio=0.5);
538         tint        SwitchSendControl (int control_mode);
539         tint        NextSendTime ();
540         tint        KeepAliveNextSendTime ();
541         tint        PingPongNextSendTime ();
542         tint        CwndRateNextSendTime ();
543         tint        SlowStartNextSendTime ();
544         tint        AimdNextSendTime ();
545         tint        LedbatNextSendTime ();
546         /** Arno: return true if this peer has complete file. May be fuzzy if Peak Hashes not in */
547         bool            IsComplete();
548         /** Arno: return (UDP) port for this channel */
549         uint16_t        GetMyPort();
550         bool            IsDiffSenderOrDuplicate(Address addr, uint32_t chid);
551
552         static int  MAX_REORDERING;
553         static tint TIMEOUT;
554         static tint MIN_DEV;
555         static tint MAX_SEND_INTERVAL;
556         static tint LEDBAT_TARGET;
557         static float LEDBAT_GAIN;
558         static tint LEDBAT_DELAY_BIN;
559         static bool SELF_CONN_OK;
560         static tint MAX_POSSIBLE_RTT;
561         static tint MIN_PEX_REQUEST_INTERVAL;
562         static FILE* debug_file;
563
564         const std::string id_string () const;
565         /** A channel is "established" if had already sent and received packets. */
566         bool        is_established () { return peer_channel_id_ && own_id_mentioned_; }
567         FileTransfer& transfer() { return *transfer_; }
568         HashTree&   file () { return transfer_->file(); }
569         const Address& peer() const { return peer_; }
570         const Address& recv_peer() const { return recv_peer_; }
571         tint ack_timeout () {
572                 tint dev = dev_avg_ < MIN_DEV ? MIN_DEV : dev_avg_;
573                 tint tmo = rtt_avg_ + dev * 4;
574                 return tmo < 30*TINT_SEC ? tmo : 30*TINT_SEC;
575         }
576         uint32_t    id () const { return id_; }
577
578         // MORESTATS
579         uint64_t raw_bytes_up() { return raw_bytes_up_; }
580         uint64_t raw_bytes_down() { return raw_bytes_down_; }
581         uint64_t bytes_up() { return bytes_up_; }
582         uint64_t bytes_down() { return bytes_down_; }
583
584         static int  DecodeID(int scrambled);
585         static int  EncodeID(int unscrambled);
586         static Channel* channel(int i) {
587             return i<channels.size()?channels[i]:NULL;
588         }
589         static void CloseTransfer (FileTransfer* trans);
590
591         // SAFECLOSE
592         void            ClearEvents();
593         void            Schedule4Close() { scheduled4close_ = true; }
594         bool            IsScheduled4Close() { return scheduled4close_; }
595
596                 void Sent(int bytes, evbuffer *evb, bool tofree);
597
598
599     protected:
600         /** Channel id: index in the channel array. */
601         uint32_t    id_;
602         /**    Socket address of the peer. */
603         Address     peer_;
604         /**    The UDP socket fd. */
605         evutil_socket_t      socket_;
606         /**    Descriptor of the file in question. */
607         FileTransfer*    transfer_;
608         /**    Peer channel id; zero if we are trying to open a channel. */
609         uint32_t    peer_channel_id_;
610         bool        own_id_mentioned_;
611         /**    Peer's progress, based on acknowledgements. */
612         binmap_t    ack_in_;
613         /**    Last data received; needs to be acked immediately. */
614         tintbin     data_in_;
615         bin_t       data_in_dbl_;
616         /** The history of data sent and still unacknowledged. */
617         tbqueue     data_out_;
618         /** Timeouted data (potentially to be retransmitted). */
619         tbqueue     data_out_tmo_;
620         bin_t       data_out_cap_;
621         /** Index in the history array. */
622         binmap_t    have_out_;
623         /**    Transmit schedule: in most cases filled with the peer's hints */
624         tbqueue     hint_in_;
625         /** Hints sent (to detect and reschedule ignored hints). */
626         tbqueue     hint_out_;
627         uint64_t    hint_out_size_;
628         /** Types of messages the peer accepts. */
629         uint64_t    cap_in_;
630         /** For repeats. */
631         //tint        last_send_time, last_recv_time;
632         /** PEX progress */
633         bool        pex_requested_;
634         tint        last_pex_request_time_;
635         tint        next_pex_request_time_;
636         bool        pex_request_outstanding_;
637         tbqueue     reverse_pex_out_;           // Arno, 2011-10-03: should really be a queue of (tint,channel id(= uint32_t)) pairs.
638         int         useless_pex_count_;
639         /** Smoothed averages for RTT, RTT deviation and data interarrival periods. */
640         tint        rtt_avg_, dev_avg_, dip_avg_;
641         tint        last_send_time_;
642         tint        last_recv_time_;
643         tint        last_data_out_time_;
644         tint        last_data_in_time_;
645         tint        last_loss_time_;
646         tint        next_send_time_;
647         /** Congestion window; TODO: int, bytes. */
648         float       cwnd_;
649         int         cwnd_count1_;
650         /** Data sending interval. */
651         tint        send_interval_;
652         /** The congestion control strategy. */
653         int         send_control_;
654         /** Datagrams (not data) sent since last recv.    */
655         int         sent_since_recv_;
656
657         /** Arno: Fix for KEEP_ALIVE_CONTROL */
658         bool            lastrecvwaskeepalive_;
659         bool            lastsendwaskeepalive_;
660
661         /** Recent acknowlegements for data previously sent.    */
662         int         ack_rcvd_recent_;
663         /** Recent non-acknowlegements (losses) of data previously sent.    */
664         int         ack_not_rcvd_recent_;
665         /** LEDBAT one-way delay machinery */
666         tint        owd_min_bins_[4];
667         int         owd_min_bin_;
668         tint        owd_min_bin_start_;
669         tint        owd_current_[4];
670         int         owd_cur_bin_;
671         /** Stats */
672         int         dgrams_sent_;
673         int         dgrams_rcvd_;
674         // Arno, 2011-11-28: for detailed, per-peer stats. MORESTATS
675         uint64_t raw_bytes_up_, raw_bytes_down_, bytes_up_, bytes_down_;
676
677         // SAFECLOSE
678         bool            scheduled4close_;
679         /** Arno: Socket address of the peer where packets are received from,
680          * when an IANA private address, otherwise 0.
681          * May not be equal to peer_. 2PEERSBEHINDSAMENAT */
682         Address     recv_peer_;
683
684         int         PeerBPS() const {
685             return TINT_SEC / dip_avg_ * 1024;
686         }
687         /** Get a request for one packet from the queue of peer's requests. */
688         bin_t       DequeueHint(bool *retransmitptr);
689         bin_t       ImposeHint();
690         void        TimeoutDataOut ();
691         void        CleanStaleHintOut();
692         void        CleanHintOut(bin_t pos);
693         void        Reschedule();
694         void            UpdateDIP(bin_t pos); // RETRANSMIT
695
696
697         static PeerSelector* peer_selector;
698
699         static tint     last_tick;
700         //static tbheap   send_queue;
701
702         static std::vector<Channel*> channels;
703
704         friend int      Listen (Address addr);
705         friend void     Shutdown (int sock_des);
706         friend void     AddPeer (Address address, const Sha1Hash& root);
707         friend void     SetTracker(const Address& tracker);
708         friend int      Open (const char*, const Sha1Hash&, Address tracker, bool force_check_diskvshash, bool check_netwvshash, uint32_t chunk_size) ; // FIXME
709     };
710
711
712
713     /*************** The top-level API ****************/
714     /** Start listening a port. Returns socket descriptor. */
715     int     Listen (Address addr);
716     /** Stop listening to a port. */
717     void    Shutdown (int sock_des=-1);
718
719     /** Open a file, start a transmission; fill it with content for a given
720         root hash and tracker (optional). If "force_check_diskvshash" is true, the
721         hashtree state will be (re)constructed from the file on disk (if any).
722         If not, open will try to reconstruct the hashtree state from
723         the .mhash and .mbinmap files on disk. .mhash files are created
724         automatically, .mbinmap files must be written by checkpointing the
725         transfer by calling FileTransfer::serialize(). If the reconstruction
726         fails, it will hashcheck anyway. Roothash is optional for new files or
727         files already hashchecked and checkpointed. If "check_netwvshash" is
728         false, no uncle hashes will be sent and no data will be verified against
729         then on receipt. In this mode, checking disk contents against hashes
730         no longer works on restarts, unless checkpoints are used.
731         */
732     int     Open (const char* filename, const Sha1Hash& hash=Sha1Hash::ZERO,Address tracker=Address(), bool force_check_diskvshash=true, bool check_netwvshash=true, uint32_t chunk_size=SWIFT_DEFAULT_CHUNK_SIZE);
733     /** Get the root hash for the transmission. */
734     const Sha1Hash& RootMerkleHash (int file) ;
735     /** Close a file and a transmission. */
736     void    Close (int fd) ;
737     /** Add a possible peer which participares in a given transmission. In the case
738         root hash is zero, the peer might be talked to regarding any transmission
739         (likely, a tracker, cache or an archive). */
740     void    AddPeer (Address address, const Sha1Hash& root=Sha1Hash::ZERO);
741
742     void    SetTracker(const Address& tracker);
743     /** Set the default tracker that is used when Open is not passed a tracker
744         address. */
745
746     /** Returns size of the file in bytes, 0 if unknown. Might be rounded up to a kilobyte
747         before the transmission is complete. */
748     uint64_t  Size (int fdes);
749     /** Returns the amount of retrieved and verified data, in bytes.
750         A 100% complete transmission has Size()==Complete(). */
751     uint64_t  Complete (int fdes);
752     bool      IsComplete (int fdes);
753     /** Returns the number of bytes that are complete sequentially, starting from the
754         beginning, till the first not-yet-retrieved packet. */
755     uint64_t  SeqComplete (int fdes);
756     /***/
757     int       Find (Sha1Hash hash);
758     /** Returns the number of bytes in a chunk for this transmission */
759     uint32_t      ChunkSize(int fdes);
760
761     /** Get the address bound to the socket descriptor returned by Listen() */
762     Address BoundAddress(evutil_socket_t sock);
763
764     void AddProgressCallback (int transfer,ProgressCallback cb,uint8_t agg);
765     void RemoveProgressCallback (int transfer,ProgressCallback cb);
766     void ExternallyRetrieved (int transfer,bin_t piece);
767
768
769     /** Must be called by any client using the library */
770     void LibraryInit(void);
771
772     int evbuffer_add_string(struct evbuffer *evb, std::string str);
773     int evbuffer_add_8(struct evbuffer *evb, uint8_t b);
774     int evbuffer_add_16be(struct evbuffer *evb, uint16_t w);
775     int evbuffer_add_32be(struct evbuffer *evb, uint32_t i);
776     int evbuffer_add_64be(struct evbuffer *evb, uint64_t l);
777     int evbuffer_add_hash(struct evbuffer *evb, const Sha1Hash& hash);
778
779     uint8_t evbuffer_remove_8(struct evbuffer *evb);
780     uint16_t evbuffer_remove_16be(struct evbuffer *evb);
781     uint32_t evbuffer_remove_32be(struct evbuffer *evb);
782     uint64_t evbuffer_remove_64be(struct evbuffer *evb);
783     Sha1Hash evbuffer_remove_hash(struct evbuffer* evb);
784
785     const char* tintstr(tint t=0);
786     std::string sock2str (struct sockaddr_in addr);
787  #define SWIFT_MAX_CONNECTIONS 20
788
789     void nat_test_update(void);
790
791     // Arno: Save transfer's binmap for zero-hashcheck restart
792     void Checkpoint(int fdes);
793
794 #define MAX_QUEUE_LENGTH 10
795 #define TIMER_USEC 100000
796
797         class MessageQueue
798         {
799         public:
800                 class Entry
801                 {
802                 public:
803                         Entry(evbuffer *ievb, const Address &iaddr, Channel *ichannel, bool itofree)
804                                 :
805                                         evb(ievb),
806                                         addr(iaddr),
807                                         channel(ichannel),
808                                         tofree(itofree)
809                         {
810                         }
811
812                         evbuffer *evb;
813                         Address addr;
814                         Channel *channel;
815                         bool tofree;
816                 };
817
818                 typedef std::deque<Entry> EntryList;
819                 typedef std::map<int, EntryList> EntryLists;
820         
821                 void AddBuffer(int sock, evbuffer *evb, const Address &addr, Channel *channel, bool tofree = true)
822                 {
823                         EntryList &list = lists[sock];
824                         list.push_back(Entry(evb, addr, channel, tofree));
825                         if (list.size() == MAX_QUEUE_LENGTH)
826                                 Flush(sock);
827                 }
828
829                 void Flush(int sock)
830                 {
831                         EntryList &list = lists[sock];
832                         if (list.empty())
833                                 return;
834
835                         Address addr;
836                         free(addr.addr);
837                         addr.addr = (struct sockaddr_mptp *) calloc(1, sizeof(struct sockaddr_mptp) + list.size() * sizeof(struct mptp_dest));
838                         addr.addr->count = list.size();
839                         evbuffer *evbs[list.size()];
840                         int i = 0;
841                         for (EntryList::iterator it = list.begin(); it != list.end(); ++it, ++i) {
842                                 addr.addr->dests[i].addr = (*it).addr.addr->dests[0].addr;
843                                 addr.addr->dests[i].port = (*it).addr.addr->dests[0].port;
844                                 evbs[i] = (*it).evb;
845                         }
846
847                         int r = Channel::SendTo(sock, addr, evbs);
848                         printf("Sent %d buffers\n", list.size());
849                         if (r > 0) {
850                                 i = 0;
851                                 for (EntryList::iterator it = list.begin(); it != list.end(); ++it, ++i) {
852                                         (*it).channel->Sent(evbuffer_get_length((*it).evb), (*it).evb, (*it).tofree);
853                                         printf("Sent %d bytes\n", addr.addr->dests[i].bytes);
854                                 }
855                         }
856                         list.clear();
857                 }
858
859                 void Flush() 
860                 { 
861                         for (EntryLists::iterator it = lists.begin(); it != lists.end(); ++it)
862                                 Flush(it->first);
863                 }
864
865         private:
866                 EntryLists lists;
867         };
868
869 } // namespace end
870
871 // #define SWIFT_MUTE
872
873 #ifndef SWIFT_MUTE
874 #define dprintf(...) do { if (Channel::debug_file) fprintf(Channel::debug_file,__VA_ARGS__); } while (0)
875 #define dflush() fflush(Channel::debug_file)
876 #else
877 #define dprintf(...) do {} while(0)
878 #define dflush() do {} while(0)
879 #endif
880 #define eprintf(...) fprintf(stderr,__VA_ARGS__)
881
882 #endif