Towards ACK : HAVE split.
[swift-upb.git] / swift.h
1 /*
2  *  swift.h
3  *  the main header file for libswift, normally you should only read this one
4  *
5  *  Created by Victor Grishchenko on 3/6/09.
6  *  Copyright 2009 Delft University of Technology. All rights reserved.
7  *
8  */
9 /*
10
11 The swift protocol
12
13 Messages
14
15  HANDSHAKE    00, channelid
16  Communicates the channel id of the sender. The
17  initial handshake packet also has the root hash
18  (a HASH message).
19
20  DATA        01, bin_32, buffer
21  1K of data.
22
23  ACK        02, bin_32, timestamp_32
24  HAVE       03, bin_32
25  Confirms successfull delivery of data. Used for
26  congestion control, as well.
27
28  HINT        08, bin_32
29  Practical value of "hints" is to avoid overlap, mostly.
30  Hints might be lost in the network or ignored.
31  Peer might send out data without a hint.
32  Hint which was not responded (by DATA) in some RTTs
33  is considered to be ignored.
34  As peers cant pick randomly kilobyte here and there,
35  they send out "long hints" for non-base bins.
36
37  HASH        04, bin_32, sha1hash
38  SHA1 hash tree hashes for data verification. The
39  connection to a fresh peer starts with bootstrapping
40  him with peak hashes. Later, before sending out
41  any data, a peer sends the necessary uncle hashes.
42
43  PEX+/PEX-    05/06, ipv4 addr, port
44  Peer exchange messages; reports all connected and
45  disconected peers. Might has special meaning (as
46  in the case with swarm supervisors).
47
48 */
49 #ifndef SWIFT_H
50 #define SWIFT_H
51
52 #include <deque>
53 #include <vector>
54 #include <algorithm>
55 #include <string>
56 #include "bin64.h"
57 #include "bins.h"
58 #include "datagram.h"
59 #include "hashtree.h"
60
61 namespace swift {
62
63     #define NOW Datagram::now
64
65     /** tintbin is basically a pair<tint,bin64_t> plus some nice operators.
66         Most frequently used in different queues (acknowledgements, requests,
67         etc). */
68     struct tintbin {
69         tint    time;
70         bin64_t bin;
71         tintbin(const tintbin& b) : time(b.time), bin(b.bin) {}
72         tintbin() : time(TINT_NEVER), bin(bin64_t::NONE) {}
73         tintbin(tint time_, bin64_t bin_) : time(time_), bin(bin_) {}
74         tintbin(bin64_t bin_) : time(NOW), bin(bin_) {}
75         bool operator < (const tintbin& b) const
76             { return time > b.time; }
77         bool operator == (const tintbin& b) const
78             { return time==b.time && bin==b.bin; }
79         bool operator != (const tintbin& b) const
80             { return !(*this==b); }
81     };
82
83     typedef std::deque<tintbin> tbqueue;
84     typedef std::deque<bin64_t> binqueue;
85     typedef Address   Address;
86
87     /** A heap (priority queue) for timestamped bin numbers (tintbins). */
88     class tbheap {
89         tbqueue data_;
90     public:
91         int size () const { return data_.size(); }
92         bool is_empty () const { return data_.empty(); }
93         tintbin         pop() {
94             tintbin ret = data_.front();
95             std::pop_heap(data_.begin(),data_.end());
96             data_.pop_back();
97             return ret;
98         }
99         void            push(const tintbin& tb) {
100             data_.push_back(tb);
101             push_heap(data_.begin(),data_.end());
102         }
103         const tintbin&  peek() const {
104             return data_.front();
105         }
106     };
107
108     /** swift protocol message types; these are used on the wire. */
109     typedef enum {
110         SWIFT_HANDSHAKE = 0,
111         SWIFT_DATA = 1,
112         SWIFT_ACK = 2,
113         SWIFT_HAVE = 3,
114         SWIFT_HASH = 4,
115         SWIFT_PEX_ADD = 5,
116         SWIFT_PEX_RM = 6,
117         SWIFT_SIGNED_HASH = 7,
118         SWIFT_HINT = 8,
119         SWIFT_MSGTYPE_RCVD = 9,
120         SWIFT_MESSAGE_COUNT = 10
121     } messageid_t;
122
123     class PiecePicker;
124     class CongestionController;
125     class PeerSelector;
126
127
128     /** A class representing single file transfer. */
129     class    FileTransfer {
130
131     public:
132
133         /** A constructor. Open/submit/retrieve a file.
134          *  @param file_name    the name of the file
135          *  @param root_hash    the root hash of the file; zero hash if the file
136                                 is newly submitted */
137         FileTransfer(const char *file_name, const Sha1Hash& root_hash=Sha1Hash::ZERO);
138
139         /**    Close everything. */
140         ~FileTransfer();
141
142
143         /** While we need to feed ACKs to every peer, we try (1) avoid
144             unnecessary duplication and (2) keep minimum state. Thus,
145             we use a rotating queue of bin completion events. */
146         //bin64_t         RevealAck (uint64_t& offset);
147         /** Rotating queue read for channels of this transmission. */
148         int             RevealChannel (int& i);
149
150         /** Find transfer by the root hash. */
151         static FileTransfer* Find (const Sha1Hash& hash);
152         /** Find transfer by the file descriptor. */
153         static FileTransfer* file (int fd) {
154             return fd<files.size() ? files[fd] : NULL;
155         }
156
157         /** The binmap for data already retrieved and checked. */
158         binmap_t&           ack_out ()  { return file_.ack_out(); }
159         /** Piece picking strategy used by this transfer. */
160         PiecePicker&    picker () { return *picker_; }
161         /** The number of channels working for this transfer. */
162         int             channel_count () const { return hs_in_.size(); }
163         /** Hash tree checked file; all the hashes and data are kept here. */
164         HashTree&       file() { return file_; }
165         /** File descriptor for the data file. */
166         int             fd () const { return file_.file_descriptor(); }
167         /** Root SHA1 hash of the transfer (and the data file). */
168         const Sha1Hash& root_hash () const { return file_.root_hash(); }
169
170     private:
171
172         static std::vector<FileTransfer*> files;
173
174         HashTree        file_;
175
176         /** Piece picker strategy. */
177         PiecePicker*    picker_;
178
179         /** Channels working for this transfer. */
180         binqueue        hs_in_;
181         int             hs_in_offset_;
182         std::deque<Address> pex_in_;
183
184         /** Messages we are accepting.    */
185         uint64_t        cap_out_;
186
187         tint            init_time_;
188
189     public:
190         void            OnDataIn (bin64_t pos);
191         void            OnPexIn (const Address& addr);
192
193         friend class Channel;
194         friend uint64_t  Size (int fdes);
195         friend bool      IsComplete (int fdes);
196         friend uint64_t  Complete (int fdes);
197         friend uint64_t  SeqComplete (int fdes);
198         friend int     Open (const char* filename, const Sha1Hash& hash) ;
199         friend void    Close (int fd) ;
200     };
201
202
203     /** PiecePicker implements some strategy of choosing (picking) what
204         to request next, given the possible range of choices:
205         data acknowledged by the peer minus data already retrieved.
206         May pick sequentially, do rarest first or in some other way. */
207     class PiecePicker {
208     public:
209         virtual void Randomize (uint64_t twist) = 0;
210         /** The piece picking method itself.
211          *  @param  offered     the daata acknowledged by the peer
212          *  @param  max_width   maximum number of packets to ask for
213          *  @param  expires     (not used currently) when to consider request expired
214          *  @return             the bin number to request */
215         virtual bin64_t Pick (binmap_t& offered, uint64_t max_width, tint expires) = 0;
216         virtual ~PiecePicker() {}
217     };
218
219
220     class PeerSelector {
221     public:
222         virtual void AddPeer (const Address& addr, const Sha1Hash& root) = 0;
223         virtual Address GetPeer (const Sha1Hash& for_root) = 0;
224     };
225
226
227     class DataStorer {
228     public:
229         DataStorer (const Sha1Hash& id, size_t size);
230         virtual size_t    ReadData (bin64_t pos,uint8_t** buf) = 0;
231         virtual size_t    WriteData (bin64_t pos, uint8_t* buf, size_t len) = 0;
232     };
233
234
235     /**    swift channel's "control block"; channels loosely correspond to TCP
236         connections or FTP sessions; one channel is created for one file
237         being transferred between two peers. As we don't need buffers and
238         lots of other TCP stuff, sizeof(Channel+members) must be below 1K.
239         Normally, API users do not deal with this class. */
240     class Channel {
241     public:
242         Channel    (FileTransfer* file, int socket=-1, Address peer=Address());
243         ~Channel();
244
245         typedef enum {
246             KEEP_ALIVE_CONTROL,
247             PING_PONG_CONTROL,
248             SLOW_START_CONTROL,
249             AIMD_CONTROL,
250             LEDBAT_CONTROL,
251             CLOSE_CONTROL
252         } send_control_t;
253
254         static const char* SEND_CONTROL_MODES[];
255
256         static Channel*
257                     RecvDatagram (int socket);
258         static void Loop (tint till);
259
260         void        Recv (Datagram& dgram);
261         void        Send ();
262         void        Close ();
263
264         void        OnAck (Datagram& dgram);
265         void        OnHave (Datagram& dgram);
266         bin64_t     OnData (Datagram& dgram);
267         void        OnHint (Datagram& dgram);
268         void        OnHash (Datagram& dgram);
269         void        OnPex (Datagram& dgram);
270         void        OnHandshake (Datagram& dgram);
271         void        AddHandshake (Datagram& dgram);
272         bin64_t     AddData (Datagram& dgram);
273         void        AddAck (Datagram& dgram);
274         void        AddHave (Datagram& dgram);
275         void        AddHint (Datagram& dgram);
276         void        AddUncleHashes (Datagram& dgram, bin64_t pos);
277         void        AddPeakHashes (Datagram& dgram);
278         void        AddPex (Datagram& dgram);
279
280         void        BackOffOnLosses (float ratio=0.5);
281         tint        SwitchSendControl (int control_mode);
282         tint        NextSendTime ();
283         tint        KeepAliveNextSendTime ();
284         tint        PingPongNextSendTime ();
285         tint        CwndRateNextSendTime ();
286         tint        SlowStartNextSendTime ();
287         tint        AimdNextSendTime ();
288         tint        LedbatNextSendTime ();
289
290         static int  MAX_REORDERING;
291         static tint TIMEOUT;
292         static tint MIN_DEV;
293         static tint MAX_SEND_INTERVAL;
294         static tint LEDBAT_TARGET;
295         static float LEDBAT_GAIN;
296         static tint LEDBAT_DELAY_BIN;
297         static bool SELF_CONN_OK;
298         static tint MAX_POSSIBLE_RTT;
299         static FILE* debug_file;
300
301         const std::string id_string () const;
302         /** A channel is "established" if had already sent and received packets. */
303         bool        is_established () { return peer_channel_id_ && own_id_mentioned_; }
304         FileTransfer& transfer() { return *transfer_; }
305         HashTree&   file () { return transfer_->file(); }
306         const Address& peer() const { return peer_; }
307         tint ack_timeout () {
308                         tint dev = dev_avg_ < MIN_DEV ? MIN_DEV : dev_avg_;
309                         tint tmo = rtt_avg_ + dev * 4;
310                         return tmo < 30*TINT_SEC ? tmo : 30*TINT_SEC;
311         }
312         uint32_t    id () const { return id_; }
313
314         static int  DecodeID(int scrambled);
315         static int  EncodeID(int unscrambled);
316         static Channel* channel(int i) {
317             return i<channels.size()?channels[i]:NULL;
318         }
319         static void CloseTransfer (FileTransfer* trans);
320         static SOCKET default_socket() { return sockets[0]; }
321
322     protected:
323         /** Channel id: index in the channel array. */
324         uint32_t    id_;
325         /**    Socket address of the peer. */
326         Address     peer_;
327         /**    The UDP socket fd. */
328         SOCKET      socket_;
329         /**    Descriptor of the file in question. */
330         FileTransfer*    transfer_;
331         /**    Peer channel id; zero if we are trying to open a channel. */
332         uint32_t    peer_channel_id_;
333         bool        own_id_mentioned_;
334         /**    Peer's progress, based on acknowledgements. */
335         binmap_t        ack_in_;
336         /**    Last data received; needs to be acked immediately. */
337         tintbin     data_in_;
338         bin64_t     data_in_dbl_;
339         /** The history of data sent and still unacknowledged. */
340         tbqueue     data_out_;
341         /** Timeouted data (potentially to be retransmitted). */
342         tbqueue     data_out_tmo_;
343         bin64_t     data_out_cap_;
344         /** Index in the history array. */
345         binmap_t        ack_out_;
346         /**    Transmit schedule: in most cases filled with the peer's hints */
347         tbqueue     hint_in_;
348         /** Hints sent (to detect and reschedule ignored hints). */
349         tbqueue     hint_out_;
350         uint64_t    hint_out_size_;
351         /** Types of messages the peer accepts. */
352         uint64_t    cap_in_;
353         /** For repeats. */
354         //tint        last_send_time, last_recv_time;
355         /** PEX progress */
356         int         pex_out_;
357         /** Smoothed averages for RTT, RTT deviation and data interarrival periods. */
358         tint        rtt_avg_, dev_avg_, dip_avg_;
359         tint        last_send_time_;
360         tint        last_recv_time_;
361         tint        last_data_out_time_;
362         tint        last_data_in_time_;
363         tint        last_loss_time_;
364         tint        next_send_time_;
365         /** Congestion window; TODO: int, bytes. */
366         float       cwnd_;
367         /** Data sending interval. */
368         tint        send_interval_;
369         /** The congestion control strategy. */
370         int         send_control_;
371         /** Datagrams (not data) sent since last recv.    */
372         int         sent_since_recv_;
373         /** Recent acknowlegements for data previously sent.    */
374         int         ack_rcvd_recent_;
375         /** Recent non-acknowlegements (losses) of data previously sent.    */
376         int         ack_not_rcvd_recent_;
377         /** LEDBAT one-way delay machinery */
378         tint        owd_min_bins_[4];
379         int         owd_min_bin_;
380         tint        owd_min_bin_start_;
381         tint        owd_current_[4];
382         int         owd_cur_bin_;
383         /** Stats */
384         int         dgrams_sent_;
385         int         dgrams_rcvd_;
386
387         int         PeerBPS() const {
388             return TINT_SEC / dip_avg_ * 1024;
389         }
390         /** Get a request for one packet from the queue of peer's requests. */
391         bin64_t     DequeueHint();
392         bin64_t     ImposeHint();
393         void        TimeoutDataOut ();
394         void        CleanStaleHintOut();
395         void        CleanHintOut(bin64_t pos);
396         void        Reschedule();
397
398         static PeerSelector* peer_selector;
399
400         static SOCKET   sockets[8];
401         static int      socket_count;
402         static tint     last_tick;
403         static tbheap   send_queue;
404
405         static Address  tracker;
406         static std::vector<Channel*> channels;
407
408         friend int      Listen (Address addr);
409         friend void     Shutdown (int sock_des);
410         friend void     AddPeer (Address address, const Sha1Hash& root);
411         friend void     SetTracker(const Address& tracker);
412         friend int      Open (const char*, const Sha1Hash&) ; // FIXME
413
414     };
415
416
417
418     /*************** The top-level API ****************/
419     /** Start listening a port. Returns socket descriptor. */
420     int     Listen (Address addr);
421     /** Run send/receive loop for the specified amount of time. */
422     void    Loop (tint till);
423     /** Stop listening to a port. */
424     void    Shutdown (int sock_des=-1);
425
426     /** Open a file, start a transmission; fill it with content for a given root hash;
427         in case the hash is omitted, the file is a fresh submit. */
428     int     Open (const char* filename, const Sha1Hash& hash=Sha1Hash::ZERO) ;
429     /** Get the root hash for the transmission. */
430     const Sha1Hash& RootMerkleHash (int file) ;
431     /** Close a file and a transmission. */
432     void    Close (int fd) ;
433     /** Add a possible peer which participares in a given transmission. In the case
434         root hash is zero, the peer might be talked to regarding any transmission
435         (likely, a tracker, cache or an archive). */
436     void    AddPeer (Address address, const Sha1Hash& root=Sha1Hash::ZERO);
437
438     void    SetTracker(const Address& tracker);
439
440     /** Returns size of the file in bytes, 0 if unknown. Might be rounded up to a kilobyte
441         before the transmission is complete. */
442     uint64_t  Size (int fdes);
443     /** Returns the amount of retrieved and verified data, in bytes.
444         A 100% complete transmission has Size()==Complete(). */
445     uint64_t  Complete (int fdes);
446     bool      IsComplete (int fdes);
447     /** Returns the number of bytes that are complete sequentially, starting from the
448         beginning, till the first not-yet-retrieved packet. */
449     uint64_t  SeqComplete (int fdes);
450
451
452     //uint32_t Width (const tbinvec& v);
453
454
455     /** Must be called by any client using the library */
456     void LibraryInit(void);
457
458
459 } // namespace end
460
461 #ifndef SWIFT_MUTE
462 #define dprintf(...) { if (Channel::debug_file) fprintf(Channel::debug_file,__VA_ARGS__); }
463 #else
464 #define dprintf(...) {}
465 #endif
466 #define eprintf(...) fprintf(stderr,__VA_ARGS__)
467
468 #endif