77f8b6b531404fda6fb7f2c43040d031d054ffe4
[swift-upb.git] / swift.h
1 /*
2  *  swift.h
3  *  the main header file for libswift, normally you should only read this one
4  *
5  *  Created by Victor Grishchenko on 3/6/09.
6  *  Copyright 2009 Delft University of Technology. All rights reserved.
7  *
8  */
9 /*
10
11 The swift protocol
12
13 Messages
14
15  HANDSHAKE    00, channelid
16  Communicates the channel id of the sender. The
17  initial handshake packet also has the root hash
18  (a HASH message).
19
20  DATA        01, bin_32, buffer
21  1K of data.
22
23  ACK        02, bin_32, timestamp_32
24  HAVE       03, bin_32
25  Confirms successfull delivery of data. Used for
26  congestion control, as well.
27
28  HINT        08, bin_32
29  Practical value of "hints" is to avoid overlap, mostly.
30  Hints might be lost in the network or ignored.
31  Peer might send out data without a hint.
32  Hint which was not responded (by DATA) in some RTTs
33  is considered to be ignored.
34  As peers cant pick randomly kilobyte here and there,
35  they send out "long hints" for non-base bins.
36
37  HASH        04, bin_32, sha1hash
38  SHA1 hash tree hashes for data verification. The
39  connection to a fresh peer starts with bootstrapping
40  him with peak hashes. Later, before sending out
41  any data, a peer sends the necessary uncle hashes.
42
43  PEX+/PEX-    05/06, ipv4 addr, port
44  Peer exchange messages; reports all connected and
45  disconected peers. Might has special meaning (as
46  in the case with swarm supervisors).
47
48 */
49 #ifndef SWIFT_H
50 #define SWIFT_H
51
52 #include <deque>
53 #include <vector>
54 #include <algorithm>
55 #include <string>
56 #include "bin64.h"
57 #include "bins.h"
58 #include "datagram.h"
59 #include "hashtree.h"
60
61 namespace swift {
62
63     #define NOW Datagram::now
64
65     /** tintbin is basically a pair<tint,bin64_t> plus some nice operators.
66         Most frequently used in different queues (acknowledgements, requests,
67         etc). */
68     struct tintbin {
69         tint    time;
70         bin64_t bin;
71         tintbin(const tintbin& b) : time(b.time), bin(b.bin) {}
72         tintbin() : time(TINT_NEVER), bin(bin64_t::NONE) {}
73         tintbin(tint time_, bin64_t bin_) : time(time_), bin(bin_) {}
74         tintbin(bin64_t bin_) : time(NOW), bin(bin_) {}
75         bool operator < (const tintbin& b) const
76             { return time > b.time; }
77         bool operator == (const tintbin& b) const
78             { return time==b.time && bin==b.bin; }
79         bool operator != (const tintbin& b) const
80             { return !(*this==b); }
81     };
82
83     typedef std::deque<tintbin> tbqueue;
84     typedef std::deque<bin64_t> binqueue;
85     typedef Address   Address;
86
87     /** A heap (priority queue) for timestamped bin numbers (tintbins). */
88     class tbheap {
89         tbqueue data_;
90     public:
91         int size () const { return data_.size(); }
92         bool is_empty () const { return data_.empty(); }
93         tintbin         pop() {
94             tintbin ret = data_.front();
95             std::pop_heap(data_.begin(),data_.end());
96             data_.pop_back();
97             return ret;
98         }
99         void            push(const tintbin& tb) {
100             data_.push_back(tb);
101             push_heap(data_.begin(),data_.end());
102         }
103         const tintbin&  peek() const {
104             return data_.front();
105         }
106     };
107
108     /** swift protocol message types; these are used on the wire. */
109     typedef enum {
110         SWIFT_HANDSHAKE = 0,
111         SWIFT_DATA = 1,
112         SWIFT_ACK = 2,
113         SWIFT_HAVE = 3,
114         SWIFT_HASH = 4,
115         SWIFT_PEX_ADD = 5,
116         SWIFT_PEX_RM = 6,
117         SWIFT_SIGNED_HASH = 7,
118         SWIFT_HINT = 8,
119         SWIFT_MSGTYPE_RCVD = 9,
120         SWIFT_MESSAGE_COUNT = 10
121     } messageid_t;
122
123     class PiecePicker;
124     class CongestionController;
125     class PeerSelector;
126     struct TransferProgressCallback {
127         typedef void (*callback_t) (int transfer, bin64_t bin);
128         /** The function to invoke. */
129         callback_t  cb;
130         /** aggregation level (do not report smaller events). */
131         uint8_t     agg;
132         TransferProgressCallback(callback_t callback) : cb(callback), agg(0) {}
133         TransferProgressCallback() : cb(NULL), agg(0) {}
134     };
135
136
137     /** A class representing single file transfer. */
138     class    FileTransfer {
139
140     public:
141
142         /** A constructor. Open/submit/retrieve a file.
143          *  @param file_name    the name of the file
144          *  @param root_hash    the root hash of the file; zero hash if the file
145                                 is newly submitted */
146         FileTransfer(const char *file_name, const Sha1Hash& root_hash=Sha1Hash::ZERO);
147
148         /**    Close everything. */
149         ~FileTransfer();
150
151
152         /** While we need to feed ACKs to every peer, we try (1) avoid
153             unnecessary duplication and (2) keep minimum state. Thus,
154             we use a rotating queue of bin completion events. */
155         //bin64_t         RevealAck (uint64_t& offset);
156         /** Rotating queue read for channels of this transmission. */
157         int             RevealChannel (int& i);
158
159         /** Find transfer by the root hash. */
160         static FileTransfer* Find (const Sha1Hash& hash);
161         /** Find transfer by the file descriptor. */
162         static FileTransfer* file (int fd) {
163             return fd<files.size() ? files[fd] : NULL;
164         }
165
166         /** The binmap for data already retrieved and checked. */
167         binmap_t&           ack_out ()  { return file_.ack_out(); }
168         /** Piece picking strategy used by this transfer. */
169         PiecePicker&    picker () { return *picker_; }
170         /** The number of channels working for this transfer. */
171         int             channel_count () const { return hs_in_.size(); }
172         /** Hash tree checked file; all the hashes and data are kept here. */
173         HashTree&       file() { return file_; }
174         /** File descriptor for the data file. */
175         int             fd () const { return file_.file_descriptor(); }
176         /** Root SHA1 hash of the transfer (and the data file). */
177         const Sha1Hash& root_hash () const { return file_.root_hash(); }
178
179     private:
180
181         static std::vector<FileTransfer*> files;
182
183         HashTree        file_;
184
185         /** Piece picker strategy. */
186         PiecePicker*    picker_;
187
188         /** Channels working for this transfer. */
189         binqueue        hs_in_;
190         int             hs_in_offset_;
191         std::deque<Address> pex_in_;
192
193         /** Messages we are accepting.    */
194         uint64_t        cap_out_;
195
196         tint            init_time_;
197
198         #define SWFT_MAX_TRANSFER_CB 8
199         TransferProgressCallback    callbacks[SWFT_MAX_TRANSFER_CB];
200         int             cb_installed;
201
202     public:
203         void            OnDataIn (bin64_t pos);
204         void            OnPexIn (const Address& addr);
205
206         friend class Channel;
207         friend uint64_t  Size (int fdes);
208         friend bool      IsComplete (int fdes);
209         friend uint64_t  Complete (int fdes);
210         friend uint64_t  SeqComplete (int fdes);
211         friend int     Open (const char* filename, const Sha1Hash& hash) ;
212         friend void    Close (int fd) ;
213         friend void AddProgressCallback (int transfer,TransferProgressCallback cb);
214         friend void RemoveProgressCallback (int transfer,TransferProgressCallback cb);
215         friend void ExternallyRetrieved (int transfer,bin64_t piece);
216     };
217
218
219     /** PiecePicker implements some strategy of choosing (picking) what
220         to request next, given the possible range of choices:
221         data acknowledged by the peer minus data already retrieved.
222         May pick sequentially, do rarest first or in some other way. */
223     class PiecePicker {
224     public:
225         virtual void Randomize (uint64_t twist) = 0;
226         /** The piece picking method itself.
227          *  @param  offered     the daata acknowledged by the peer
228          *  @param  max_width   maximum number of packets to ask for
229          *  @param  expires     (not used currently) when to consider request expired
230          *  @return             the bin number to request */
231         virtual bin64_t Pick (binmap_t& offered, uint64_t max_width, tint expires) = 0;
232         virtual void LimitRange (bin64_t range) = 0;
233         virtual ~PiecePicker() {}
234     };
235
236
237     class PeerSelector {
238     public:
239         virtual void AddPeer (const Address& addr, const Sha1Hash& root) = 0;
240         virtual Address GetPeer (const Sha1Hash& for_root) = 0;
241     };
242
243
244     class DataStorer {
245     public:
246         DataStorer (const Sha1Hash& id, size_t size);
247         virtual size_t    ReadData (bin64_t pos,uint8_t** buf) = 0;
248         virtual size_t    WriteData (bin64_t pos, uint8_t* buf, size_t len) = 0;
249     };
250
251
252     /**    swift channel's "control block"; channels loosely correspond to TCP
253         connections or FTP sessions; one channel is created for one file
254         being transferred between two peers. As we don't need buffers and
255         lots of other TCP stuff, sizeof(Channel+members) must be below 1K.
256         Normally, API users do not deal with this class. */
257     class Channel {
258     public:
259         Channel    (FileTransfer* file, int socket=INVALID_SOCKET, Address peer=Address());
260         ~Channel();
261
262         typedef enum {
263             KEEP_ALIVE_CONTROL,
264             PING_PONG_CONTROL,
265             SLOW_START_CONTROL,
266             AIMD_CONTROL,
267             LEDBAT_CONTROL,
268             CLOSE_CONTROL
269         } send_control_t;
270
271         static const char* SEND_CONTROL_MODES[];
272
273         static void RecvDatagram (SOCKET socket);
274         static void Loop (tint till);
275
276         void        Recv (Datagram& dgram);
277         void        Send ();
278         void        Close ();
279
280         void        OnAck (Datagram& dgram);
281         void        OnHave (Datagram& dgram);
282         bin64_t     OnData (Datagram& dgram);
283         void        OnHint (Datagram& dgram);
284         void        OnHash (Datagram& dgram);
285         void        OnPex (Datagram& dgram);
286         void        OnHandshake (Datagram& dgram);
287         void        AddHandshake (Datagram& dgram);
288         bin64_t     AddData (Datagram& dgram);
289         void        AddAck (Datagram& dgram);
290         void        AddHave (Datagram& dgram);
291         void        AddHint (Datagram& dgram);
292         void        AddUncleHashes (Datagram& dgram, bin64_t pos);
293         void        AddPeakHashes (Datagram& dgram);
294         void        AddPex (Datagram& dgram);
295
296         void        BackOffOnLosses (float ratio=0.5);
297         tint        SwitchSendControl (int control_mode);
298         tint        NextSendTime ();
299         tint        KeepAliveNextSendTime ();
300         tint        PingPongNextSendTime ();
301         tint        CwndRateNextSendTime ();
302         tint        SlowStartNextSendTime ();
303         tint        AimdNextSendTime ();
304         tint        LedbatNextSendTime ();
305
306         static int  MAX_REORDERING;
307         static tint TIMEOUT;
308         static tint MIN_DEV;
309         static tint MAX_SEND_INTERVAL;
310         static tint LEDBAT_TARGET;
311         static float LEDBAT_GAIN;
312         static tint LEDBAT_DELAY_BIN;
313         static bool SELF_CONN_OK;
314         static tint MAX_POSSIBLE_RTT;
315         static FILE* debug_file;
316
317         const std::string id_string () const;
318         /** A channel is "established" if had already sent and received packets. */
319         bool        is_established () { return peer_channel_id_ && own_id_mentioned_; }
320         FileTransfer& transfer() { return *transfer_; }
321         HashTree&   file () { return transfer_->file(); }
322         const Address& peer() const { return peer_; }
323         tint ack_timeout () {
324                         tint dev = dev_avg_ < MIN_DEV ? MIN_DEV : dev_avg_;
325                         tint tmo = rtt_avg_ + dev * 4;
326                         return tmo < 30*TINT_SEC ? tmo : 30*TINT_SEC;
327         }
328         uint32_t    id () const { return id_; }
329
330         static int  DecodeID(int scrambled);
331         static int  EncodeID(int unscrambled);
332         static Channel* channel(int i) {
333             return i<channels.size()?channels[i]:NULL;
334         }
335         static void CloseTransfer (FileTransfer* trans);
336
337     protected:
338         /** Channel id: index in the channel array. */
339         uint32_t    id_;
340         /**    Socket address of the peer. */
341         Address     peer_;
342         /**    The UDP socket fd. */
343         SOCKET      socket_;
344         /**    Descriptor of the file in question. */
345         FileTransfer*    transfer_;
346         /**    Peer channel id; zero if we are trying to open a channel. */
347         uint32_t    peer_channel_id_;
348         bool        own_id_mentioned_;
349         /**    Peer's progress, based on acknowledgements. */
350         binmap_t        ack_in_;
351         /**    Last data received; needs to be acked immediately. */
352         tintbin     data_in_;
353         bin64_t     data_in_dbl_;
354         /** The history of data sent and still unacknowledged. */
355         tbqueue     data_out_;
356         /** Timeouted data (potentially to be retransmitted). */
357         tbqueue     data_out_tmo_;
358         bin64_t     data_out_cap_;
359         /** Index in the history array. */
360         binmap_t        have_out_;
361         /**    Transmit schedule: in most cases filled with the peer's hints */
362         tbqueue     hint_in_;
363         /** Hints sent (to detect and reschedule ignored hints). */
364         tbqueue     hint_out_;
365         uint64_t    hint_out_size_;
366         /** Types of messages the peer accepts. */
367         uint64_t    cap_in_;
368         /** For repeats. */
369         //tint        last_send_time, last_recv_time;
370         /** PEX progress */
371         int         pex_out_;
372         /** Smoothed averages for RTT, RTT deviation and data interarrival periods. */
373         tint        rtt_avg_, dev_avg_, dip_avg_;
374         tint        last_send_time_;
375         tint        last_recv_time_;
376         tint        last_data_out_time_;
377         tint        last_data_in_time_;
378         tint        last_loss_time_;
379         tint        next_send_time_;
380         /** Congestion window; TODO: int, bytes. */
381         float       cwnd_;
382         /** Data sending interval. */
383         tint        send_interval_;
384         /** The congestion control strategy. */
385         int         send_control_;
386         /** Datagrams (not data) sent since last recv.    */
387         int         sent_since_recv_;
388         /** Recent acknowlegements for data previously sent.    */
389         int         ack_rcvd_recent_;
390         /** Recent non-acknowlegements (losses) of data previously sent.    */
391         int         ack_not_rcvd_recent_;
392         /** LEDBAT one-way delay machinery */
393         tint        owd_min_bins_[4];
394         int         owd_min_bin_;
395         tint        owd_min_bin_start_;
396         tint        owd_current_[4];
397         int         owd_cur_bin_;
398         /** Stats */
399         int         dgrams_sent_;
400         int         dgrams_rcvd_;
401
402         int         PeerBPS() const {
403             return TINT_SEC / dip_avg_ * 1024;
404         }
405         /** Get a request for one packet from the queue of peer's requests. */
406         bin64_t     DequeueHint();
407         bin64_t     ImposeHint();
408         void        TimeoutDataOut ();
409         void        CleanStaleHintOut();
410         void        CleanHintOut(bin64_t pos);
411         void        Reschedule();
412
413         static PeerSelector* peer_selector;
414
415         static tint     last_tick;
416         static tbheap   send_queue;
417
418         static Address  tracker;
419         static std::vector<Channel*> channels;
420
421         friend int      Listen (Address addr);
422         friend void     Shutdown (int sock_des);
423         friend void     AddPeer (Address address, const Sha1Hash& root);
424         friend void     SetTracker(const Address& tracker);
425         friend int      Open (const char*, const Sha1Hash&) ; // FIXME
426
427     };
428
429
430
431     /*************** The top-level API ****************/
432     /** Start listening a port. Returns socket descriptor. */
433     int     Listen (Address addr);
434     /** Run send/receive loop for the specified amount of time. */
435     void    Loop (tint till);
436     bool    Listen3rdPartySocket (sckrwecb_t);
437     /** Stop listening to a port. */
438     void    Shutdown (int sock_des=-1);
439
440     /** Open a file, start a transmission; fill it with content for a given root hash;
441         in case the hash is omitted, the file is a fresh submit. */
442     int     Open (const char* filename, const Sha1Hash& hash=Sha1Hash::ZERO) ;
443     /** Get the root hash for the transmission. */
444     const Sha1Hash& RootMerkleHash (int file) ;
445     /** Close a file and a transmission. */
446     void    Close (int fd) ;
447     /** Add a possible peer which participares in a given transmission. In the case
448         root hash is zero, the peer might be talked to regarding any transmission
449         (likely, a tracker, cache or an archive). */
450     void    AddPeer (Address address, const Sha1Hash& root=Sha1Hash::ZERO);
451
452     void    SetTracker(const Address& tracker);
453
454     /** Returns size of the file in bytes, 0 if unknown. Might be rounded up to a kilobyte
455         before the transmission is complete. */
456     uint64_t  Size (int fdes);
457     /** Returns the amount of retrieved and verified data, in bytes.
458         A 100% complete transmission has Size()==Complete(). */
459     uint64_t  Complete (int fdes);
460     bool      IsComplete (int fdes);
461     /** Returns the number of bytes that are complete sequentially, starting from the
462         beginning, till the first not-yet-retrieved packet. */
463     uint64_t  SeqComplete (int fdes);
464     /***/
465     int       Find (Sha1Hash hash);
466
467     void AddProgressCallback (int transfer,TransferProgressCallback cb);
468     void RemoveProgressCallback (int transfer,TransferProgressCallback cb);
469     void ExternallyRetrieved (int transfer,bin64_t piece);
470
471     //uint32_t Width (const tbinvec& v);
472
473
474     /** Must be called by any client using the library */
475     void LibraryInit(void);
476
477
478 } // namespace end
479
480 #ifndef SWIFT_MUTE
481 #define dprintf(...) { if (Channel::debug_file) fprintf(Channel::debug_file,__VA_ARGS__); }
482 #else
483 #define dprintf(...) {}
484 #endif
485 #define eprintf(...) fprintf(stderr,__VA_ARGS__)
486
487 #endif