Merge branch 'master' of github.com:gritzko/swift
authorVictor Grishchenko <victor.grishchenko@gmail.com>
Tue, 9 Feb 2010 18:36:15 +0000 (19:36 +0100)
committerVictor Grishchenko <victor.grishchenko@gmail.com>
Tue, 9 Feb 2010 18:36:15 +0000 (19:36 +0100)
doc/index.html
doc/style.css
ext/seq_picker.cpp
send_control.cpp
sendrecv.cpp
swift.cpp
swift.h
transfer.cpp

index c034b08..231af3d 100644 (file)
        
        <div id='motto'>Turn the Net into a single data Cloud!</div>
        
+    <div id='abstract'>
+    <b>Abstract</b>.
+    &nbsp;<i>swift</i> is a multiparty transport protocol; its mission is to
+    disseminate content among a swarm of peers. It is a sort of BitTorrent
+    at the transport layer. <a href="http://bittorrent.org">BitTorrent</a> can't
+    underlie a distributed filesystem or deliver Web pages; while
+    <a href="http://github.com/gritzko/swift/raw/master/doc/swift-protocol.txt">swift</a> can.
+    <a href="http://github.com/gritzko/swift">libswift</a>
+    is 4000 lines of cross-platform C++ code
+    licensed under LGPL; it runs on misc Unices, Mac OS X and Windows;
+    it uses UDP with <a href="http://tools.ietf.org/wg/ledbat/">LEDBAT</a> congestion control.
+    So far our speed record is mere 400Mbps, but we are working on that.
+    The library is delivered as a part of <a href="http://p2p-next.org">P2P-Next</a>,
+    funded by <a href="http://cordis.europa.eu/fp7/dc/index.cfm">EU FP7</a>.
+    </div>
+    
        <div>   <h2>Ideas</h2>
                <p>As <a href="http://en.wikipedia.org/wiki/Content-centric_networking">
                wise people say</a>, the Internet was initially built for
@@ -44,7 +60,8 @@
                transport layer</a>. Ultimately, it aims at the abstraction of the Internet
                as a single big data <a href="http://en.wikipedia.org/wiki/Cloud_computing">
                Cloud</a>. Such entities as storage, servers and connections are abstracted
-               away and are virtually invisible at the API layer. The data is received
+               away and are virtually invisible at the API layer. Given a hash,
+        the data is received
                from whatever source available and data integrity is checked 
                cryptographically with <a href="http://en.wikipedia.org/wiki/Hash_tree">
                Merkle hash trees</a>.</p>
index 1295156..9575936 100644 (file)
@@ -45,6 +45,14 @@ div#motto {
        text-align: right;
        font-style: italic;
        font-size: larger;
+    margin-bottom: 28pt;
+}
+
+div#abstract {
+    letter-spacing: 0.06em;
+    #font-size: larger;
+    font-style: italic;
+    font-family: Georgia;
 }
 
 div.fold>h2, div.fold>h3, div.fold>h4 {
index dfaa20b..f6d4bbe 100644 (file)
@@ -27,6 +27,7 @@ public:
     transfer_(file_to_pick_from), ack_hint_out_(), twist_(0) {
         ack_hint_out_.copy_range(file().ack_out(),bin64_t::ALL);
     }
+    virtual ~SeqPiecePicker() {}
     
     HashTree& file() { 
         return transfer_->file(); 
index 11b9110..609ddfc 100644 (file)
@@ -17,6 +17,7 @@ tint Channel::MAX_SEND_INTERVAL = TINT_SEC*58;
 tint Channel::LEDBAT_TARGET = TINT_MSEC*25;
 float Channel::LEDBAT_GAIN = 1.0/LEDBAT_TARGET;
 tint Channel::LEDBAT_DELAY_BIN = TINT_SEC*30;
+tint Channel::MAX_POSSIBLE_RTT = TINT_SEC*10;
 const char* Channel::SEND_CONTROL_MODES[] = {"keepalive", "pingpong",
     "slowstart", "standard_aimd", "ledbat"};
 
@@ -28,6 +29,7 @@ tint    Channel::NextSendTime () {
         case SLOW_START_CONTROL: return SlowStartNextSendTime();
         case AIMD_CONTROL:       return AimdNextSendTime();
         case LEDBAT_CONTROL:     return LedbatNextSendTime();
+        case CLOSE_CONTROL:      return TINT_NEVER;
         default:                 assert(false);
     }
 }
@@ -54,6 +56,8 @@ tint    Channel::SwitchSendControl (int control_mode) {
             break;
         case LEDBAT_CONTROL:
             break;
+        case CLOSE_CONTROL:
+            break;
         default: 
             assert(false);
     }
@@ -63,7 +67,7 @@ tint    Channel::SwitchSendControl (int control_mode) {
 
 tint    Channel::KeepAliveNextSendTime () {
     if (sent_since_recv_>=3 && last_recv_time_<NOW-TINT_MIN)
-        return TINT_NEVER;
+        return SwitchSendControl(CLOSE_CONTROL);
     if (ack_rcvd_recent_)
         return SwitchSendControl(SLOW_START_CONTROL);
     if (data_in_.time!=TINT_NEVER)
@@ -97,7 +101,8 @@ tint    Channel::CwndRateNextSendTime () {
     if (send_interval_>std::max(rtt_avg_,TINT_SEC)*4)
         return SwitchSendControl(KEEP_ALIVE_CONTROL);
     if (data_out_.size()<cwnd_) {
-        dprintf("%s #%u sendctrl next in %llius\n",tintstr(),id_,send_interval_);
+        dprintf("%s #%u sendctrl next in %llius (cwnd %f.2, data_out %i)\n",
+                tintstr(),id_,send_interval_,cwnd_,(int)data_out_.size());
         return last_data_out_time_ + send_interval_;
     } else {
         assert(data_out_.front().time!=TINT_NEVER);
@@ -154,6 +159,8 @@ tint Channel::LedbatNextSendTime () {
     tint queueing_delay = owd_cur - owd_min;
     tint off_target = LEDBAT_TARGET - queueing_delay;
     cwnd_ += LEDBAT_GAIN * off_target / cwnd_;
+    if (cwnd_<1)
+        cwnd_ = 1;
     dprintf("%s #%u sendctrl ledbat %lli-%lli => %3.2f\n",
             tintstr(),id_,owd_cur,owd_min,cwnd_);
     return CwndRateNextSendTime();
index 5312869..278fd71 100644 (file)
@@ -352,27 +352,33 @@ bin64_t Channel::OnData (Datagram& dgram) {
 void    Channel::CleanDataOut (bin64_t ackd_pos) { // TODO: isn't it too long?
     
     int max_ack_off = 0;
-    //FIXME do LEDBAT magic somewhere here
     
     if (ackd_pos!=bin64_t::NONE) {
-        for (int i=0; i<8 && i<data_out_.size(); i++) {
+        for (int i=0; i<data_out_.size(); i++) {
             if (data_out_[i]!=tintbin() && data_out_[i].bin.within(ackd_pos)) {
-                tint rtt = NOW-data_out_[i].time;
-                rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
-                dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
-                if (peer_send_time_) {
+                if (peer_send_time_)
+                    for(tbqueue::iterator j=data_out_tmo_.begin(); j!=data_out_tmo_.end(); j++)
+                        if (j->bin==data_out_[i].bin)
+                            peer_send_time_=0; // possibly retransmit
+                if (peer_send_time_) { // well, it is sorta ACK ACK
+                    tint rtt = NOW-data_out_[i].time;
+                    rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
+                    dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
+                    assert(data_out_[i].time!=TINT_NEVER);
                     tint owd = peer_send_time_ - data_out_[i].time;
                     owd_cur_bin_ = (owd_cur_bin_+1) & 3;
                     owd_current_[owd_cur_bin_] = owd;
-                    if (owd_min_bin_start_<NOW+TINT_SEC*30) {
+                    if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
                         owd_min_bin_start_ = NOW;
                         owd_min_bin_ = (owd_min_bin_+1) & 3;
                         owd_min_bins_[owd_min_bin_] = TINT_NEVER;
                     }
                     if (owd_min_bins_[owd_min_bin_]>owd)
                         owd_min_bins_[owd_min_bin_] = owd;
+                    peer_send_time_ = 0;
                 }
-                dprintf("%s #%u rtt %lli dev %lli\n",tintstr(),id_,rtt_avg_,dev_avg_);
+                dprintf("%s #%u rtt %lli dev %lli based on %s\n",
+                        tintstr(),id_,rtt_avg_,dev_avg_,data_out_[i].bin.str());
                 bin64_t pos = data_out_[i].bin;
                 ack_rcvd_recent_++;
                 data_out_[i]=tintbin();
@@ -394,6 +400,7 @@ void    Channel::CleanDataOut (bin64_t ackd_pos) { // TODO: isn't it too long?
             }
             while (max_ack_off>MAX_REORDERING) {
                 ack_not_rcvd_recent_++;
+                data_out_tmo_.push_back(data_out_.front().bin);
                 dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str());
                 data_out_.pop_front();
                 max_ack_off--;
@@ -406,12 +413,16 @@ void    Channel::CleanDataOut (bin64_t ackd_pos) { // TODO: isn't it too long?
         if (data_out_.front().bin!=bin64_t::NONE && ack_in_.is_empty(data_out_.front().bin)) {
             ack_not_rcvd_recent_++;
             data_out_cap_ = bin64_t::ALL;
+            data_out_tmo_.push_back(data_out_.front().bin);
             dprintf("%s #%u Tdata %s\n",tintstr(),id_,data_out_.front().bin.str());
         }
         data_out_.pop_front();
     }
     while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE)
         data_out_.pop_front();
+    assert(data_out_.empty() || data_out_.front().time!=TINT_NEVER);
+    while (!data_out_tmo_.empty() && data_out_tmo_.front().time<NOW-MAX_POSSIBLE_RTT)
+        data_out_tmo_.pop_front();
 
 }
 
@@ -452,8 +463,9 @@ void Channel::OnHandshake (Datagram& dgram) {
     if (!SELF_CONN_OK) {
         uint32_t try_id = DecodeID(peer_channel_id_);
         if (channel(try_id) && !channel(try_id)->peer_channel_id_) {
-            delete this;
-            return;
+            peer_channel_id_ = 0;
+            Close();
+            return; // this is a self-connection
         }
     }
     // FUTURE: channel forking
@@ -485,7 +497,7 @@ Channel*    Channel::RecvDatagram (int socket) {
     Datagram data(socket);
     data.Recv();
     const Address& addr = data.address();
-#define return_log(...) { eprintf(__VA_ARGS__); return NULL; }
+#define return_log(...) { printf(__VA_ARGS__); return NULL; }
     if (data.size()<4) 
         return_log("datagram shorter than 4 bytes %s\n",addr.str());
     uint32_t mych = data.Pull32();
@@ -493,32 +505,35 @@ Channel*    Channel::RecvDatagram (int socket) {
     Channel* channel = NULL;
     if (!mych) { // handshake initiated
         if (data.size()<1+4+1+4+Sha1Hash::SIZE) 
-            return_log ("incorrect size %i initial handshake packet %s\n",data.size(),addr.str());
+            return_log ("%s #0 incorrect size %i initial handshake packet %s\n",
+                        tintstr(),data.size(),addr.str());
         uint8_t hashid = data.Pull8();
         if (hashid!=SWIFT_HASH) 
-            return_log ("no hash in the initial handshake %s\n",addr.str());
+            return_log ("%s #0 no hash in the initial handshake %s\n",
+                        tintstr(),addr.str());
         bin64_t pos = data.Pull32();
         if (pos!=bin64_t::ALL) 
-            return_log ("that is not the root hash %s\n",addr.str());
+            return_log ("%s #0 that is not the root hash %s\n",tintstr(),addr.str());
         hash = data.PullHash();
         FileTransfer* file = FileTransfer::Find(hash);
         if (!file) 
-            return_log ("hash %s unknown, no such file %s\n",hash.hex().c_str(),addr.str());
+            return_log ("%s #0 hash %s unknown, no such file %s\n",tintstr(),hash.hex().c_str(),addr.str());
         dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
         for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
             if (channels[*i] && channels[*i]->peer_==data.address() && 
                 channels[*i]->last_recv_time_>NOW-TINT_SEC*2)
-                return_log("have a channel already to %s\n",addr.str());
+                return_log("%s #0 have a channel already to %s\n",tintstr(),addr.str());
         channel = new Channel(file, socket, data.address());
     } else {
         mych = DecodeID(mych);
         if (mych>=channels.size()) 
-            return_log("invalid channel #%u, %s\n",mych,addr.str());
+            return_log("%s invalid channel #%u, %s\n",tintstr(),mych,addr.str());
         channel = channels[mych];
         if (!channel) 
-            return_log ("channel #%u is already closed\n",mych,addr.str());
+            return_log ("%s #%u is already closed\n",tintstr(),mych,addr.str());
         if (channel->peer() != addr) 
-            return_log ("invalid peer address #%u %s!=%s\n",mych,channel->peer().str(),addr.str());
+            return_log ("%s #%u invalid peer address %s!=%s\n",
+                        tintstr(),mych,channel->peer().str(),addr.str());
         channel->own_id_mentioned_ = true;
     }
     //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
@@ -537,8 +552,8 @@ void    Channel::Loop (tint howlong) {
         Channel* sender(NULL);
         while (!sender && !send_queue.is_empty()) { // dequeue
             tintbin next = send_queue.pop();
-            send_time = next.time;
             sender = channel((int)next.bin);
+            send_time = next.time;
             if (sender && sender->next_send_time_!=send_time &&
                      sender->next_send_time_!=TINT_NEVER )
                 sender = NULL; // it was a stale entry
@@ -546,15 +561,10 @@ void    Channel::Loop (tint howlong) {
         
         if ( sender!=NULL && send_time<=NOW ) { // it's time
             
-            if (sender->next_send_time_<NOW+TINT_MIN) {  // either send
-                dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(),
-                        tintstr(send_time));
-                sender->Send();
-                sender->Reschedule();
-            } else { // or close the channel
-                dprintf("%s #%u closed sendctrl\n",tintstr(),sender->id());
-                delete sender;
-            }
+            dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(),
+                    tintstr(send_time));
+            sender->Send();
+            sender->Reschedule();
             
         } else {  // it's too early, wait
             
@@ -576,12 +586,19 @@ void    Channel::Loop (tint howlong) {
 }
 
  
+void Channel::Close () {
+    this->SwitchSendControl(CLOSE_CONTROL);
+}
+
+
 void Channel::Reschedule () {
     next_send_time_ = NextSendTime();
     if (next_send_time_!=TINT_NEVER) {
         assert(next_send_time_<NOW+TINT_MIN);
         send_queue.push(tintbin(next_send_time_,id_));
-    } else
-        send_queue.push(tintbin(NOW+TINT_MIN,id_));
-    dprintf("%s requeue #%u for %s\n",tintstr(),id_,tintstr(next_send_time_));
+        dprintf("%s requeue #%u for %s\n",tintstr(),id_,tintstr(next_send_time_));
+    } else {
+        dprintf("%s #%u closed\n",tintstr(),id_);
+        delete this;
+    }
 }
index ee9f13c..a97c33c 100644 (file)
--- a/swift.cpp
+++ b/swift.cpp
@@ -127,8 +127,7 @@ int      swift::Open (const char* filename, const Sha1Hash& hash) {
 
 
 void    swift::Close (int fd) {
-    // FIXME delete all channels
-    if (fd>FileTransfer::files.size() && FileTransfer::files[fd])
+    if (fd<FileTransfer::files.size() && FileTransfer::files[fd])
         delete FileTransfer::files[fd];
 }
 
diff --git a/swift.h b/swift.h
index d6d14ae..dd74f64 100644 (file)
--- a/swift.h
+++ b/swift.h
@@ -185,7 +185,7 @@ namespace swift {
         /** Channels working for this transfer. */
         binqueue        hs_in_;
         int             hs_in_offset_;
-        std::deque<Address>        pex_in_;
+        std::deque<Address> pex_in_;
 
         /** Messages we are accepting.    */
         uint64_t        cap_out_;
@@ -219,6 +219,7 @@ namespace swift {
          *  @param  expires     (not used currently) when to consider request expired
          *  @return             the bin number to request */
         virtual bin64_t Pick (binmap_t& offered, uint64_t max_width, tint expires) = 0;
+        virtual ~PiecePicker() {}
     };
 
 
@@ -252,7 +253,8 @@ namespace swift {
             PING_PONG_CONTROL,
             SLOW_START_CONTROL,
             AIMD_CONTROL,
-            LEDBAT_CONTROL
+            LEDBAT_CONTROL,
+            CLOSE_CONTROL
         } send_control_t;
         
         static const char* SEND_CONTROL_MODES[];
@@ -263,6 +265,7 @@ namespace swift {
 
         void        Recv (Datagram& dgram);
         void        Send ();
+        void        Close ();
 
         void        OnAck (Datagram& dgram);
         void        OnTs (Datagram& dgram);
@@ -298,6 +301,7 @@ namespace swift {
         static float LEDBAT_GAIN;
         static tint LEDBAT_DELAY_BIN;
         static bool SELF_CONN_OK;
+        static tint MAX_POSSIBLE_RTT;
         
         const std::string id_string () const;
         /** A channel is "established" if had already sent and received packets. */
@@ -306,7 +310,7 @@ namespace swift {
         HashTree&   file () { return transfer_->file(); }
         const Address& peer() const { return peer_; }
         tint ack_timeout () {
-            return rtt_avg_ + std::max(dev_avg_,MIN_DEV)*4;
+            return std::min(30*TINT_SEC,rtt_avg_ + std::max(dev_avg_,MIN_DEV)*4);
         }
         uint32_t    id () const { return id_; }
         
@@ -337,6 +341,8 @@ namespace swift {
         bin64_t     data_in_dbl_;
         /** The history of data sent and still unacknowledged. */
         tbqueue     data_out_;
+        /** Timeouted data (potentially to be retransmitted). */
+        tbqueue     data_out_tmo_;
         bin64_t     data_out_cap_;
         /** Index in the history array. */
         binmap_t        ack_out_;
index 70c7a0e..e769bec 100644 (file)
@@ -50,6 +50,7 @@ FileTransfer::~FileTransfer ()
 {
     Channel::CloseTransfer(this);
     files[fd()] = NULL;
+    delete picker_;
 }