<div id='motto'>Turn the Net into a single data Cloud!</div>
+ <div id='abstract'>
+ <b>Abstract</b>.
+ <i>swift</i> is a multiparty transport protocol; its mission is to
+ disseminate content among a swarm of peers. It is a sort of BitTorrent
+ at the transport layer. <a href="http://bittorrent.org">BitTorrent</a> can't
+ underlie a distributed filesystem or deliver Web pages, while
+ <a href="http://github.com/gritzko/swift/raw/master/doc/swift-protocol.txt">swift</a> can.
+ <a href="http://github.com/gritzko/swift">libswift</a>
+ is 4000 lines of cross-platform C++ code
+ licensed under the LGPL; it runs on various Unices, Mac OS X and Windows;
+ it uses UDP with <a href="http://tools.ietf.org/wg/ledbat/">LEDBAT</a> congestion control.
+ So far our speed record is a mere 400Mbps, but we are working on that.
+ The library is delivered as a part of <a href="http://p2p-next.org">P2P-Next</a>,
+ funded by <a href="http://cordis.europa.eu/fp7/dc/index.cfm">EU FP7</a>.
+ </div>
+
<div> <h2>Ideas</h2>
<p>As <a href="http://en.wikipedia.org/wiki/Content-centric_networking">
wise people say</a>, the Internet was initially built for
transport layer</a>. Ultimately, it aims at the abstraction of the Internet
as a single big data <a href="http://en.wikipedia.org/wiki/Cloud_computing">
Cloud</a>. Such entities as storage, servers and connections are abstracted
- away and are virtually invisible at the API layer. The data is received
+ away and are virtually invisible at the API layer. Given a hash,
+ the data is received
from whatever source is available, and data integrity is checked
cryptographically with <a href="http://en.wikipedia.org/wiki/Hash_tree">
Merkle hash trees</a>.</p>
tint Channel::LEDBAT_TARGET = TINT_MSEC*25;
float Channel::LEDBAT_GAIN = 1.0/LEDBAT_TARGET;
tint Channel::LEDBAT_DELAY_BIN = TINT_SEC*30;
+tint Channel::MAX_POSSIBLE_RTT = TINT_SEC*10;
const char* Channel::SEND_CONTROL_MODES[] = {"keepalive", "pingpong",
"slowstart", "standard_aimd", "ledbat"};
case SLOW_START_CONTROL: return SlowStartNextSendTime();
case AIMD_CONTROL: return AimdNextSendTime();
case LEDBAT_CONTROL: return LedbatNextSendTime();
+ case CLOSE_CONTROL: return TINT_NEVER;
default: assert(false);
}
}
break;
case LEDBAT_CONTROL:
break;
+ case CLOSE_CONTROL:
+ break;
default:
assert(false);
}
tint Channel::KeepAliveNextSendTime () {
if (sent_since_recv_>=3 && last_recv_time_<NOW-TINT_MIN)
- return TINT_NEVER;
+ return SwitchSendControl(CLOSE_CONTROL);
if (ack_rcvd_recent_)
return SwitchSendControl(SLOW_START_CONTROL);
if (data_in_.time!=TINT_NEVER)
if (send_interval_>std::max(rtt_avg_,TINT_SEC)*4)
return SwitchSendControl(KEEP_ALIVE_CONTROL);
if (data_out_.size()<cwnd_) {
- dprintf("%s #%u sendctrl next in %llius\n",tintstr(),id_,send_interval_);
+ dprintf("%s #%u sendctrl next in %llius (cwnd %.2f, data_out %i)\n",
+ tintstr(),id_,send_interval_,cwnd_,(int)data_out_.size()); // debug trace: send interval, congestion window (two decimals), count of sent-but-unacknowledged entries
return last_data_out_time_ + send_interval_;
} else {
assert(data_out_.front().time!=TINT_NEVER);
tint queueing_delay = owd_cur - owd_min;
tint off_target = LEDBAT_TARGET - queueing_delay;
cwnd_ += LEDBAT_GAIN * off_target / cwnd_;
+ if (cwnd_<1)
+ cwnd_ = 1;
dprintf("%s #%u sendctrl ledbat %lli-%lli => %3.2f\n",
tintstr(),id_,owd_cur,owd_min,cwnd_);
return CwndRateNextSendTime();
void Channel::CleanDataOut (bin64_t ackd_pos) { // TODO: isn't it too long?
int max_ack_off = 0;
- //FIXME do LEDBAT magic somewhere here
if (ackd_pos!=bin64_t::NONE) {
- for (int i=0; i<8 && i<data_out_.size(); i++) {
+ for (int i=0; i<data_out_.size(); i++) {
if (data_out_[i]!=tintbin() && data_out_[i].bin.within(ackd_pos)) {
- tint rtt = NOW-data_out_[i].time;
- rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
- dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
- if (peer_send_time_) {
+ if (peer_send_time_)
+ for(tbqueue::iterator j=data_out_tmo_.begin(); j!=data_out_tmo_.end(); j++)
+ if (j->bin==data_out_[i].bin)
+ peer_send_time_=0; // possibly retransmit
+ if (peer_send_time_) { // well, it is sorta ACK ACK
+ tint rtt = NOW-data_out_[i].time;
+ rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
+ dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
+ assert(data_out_[i].time!=TINT_NEVER);
tint owd = peer_send_time_ - data_out_[i].time;
owd_cur_bin_ = (owd_cur_bin_+1) & 3;
owd_current_[owd_cur_bin_] = owd;
- if (owd_min_bin_start_<NOW+TINT_SEC*30) {
+ if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
owd_min_bin_start_ = NOW;
owd_min_bin_ = (owd_min_bin_+1) & 3;
owd_min_bins_[owd_min_bin_] = TINT_NEVER;
}
if (owd_min_bins_[owd_min_bin_]>owd)
owd_min_bins_[owd_min_bin_] = owd;
+ peer_send_time_ = 0;
}
- dprintf("%s #%u rtt %lli dev %lli\n",tintstr(),id_,rtt_avg_,dev_avg_);
+ dprintf("%s #%u rtt %lli dev %lli based on %s\n",
+ tintstr(),id_,rtt_avg_,dev_avg_,data_out_[i].bin.str());
bin64_t pos = data_out_[i].bin;
ack_rcvd_recent_++;
data_out_[i]=tintbin();
}
while (max_ack_off>MAX_REORDERING) {
ack_not_rcvd_recent_++;
+ data_out_tmo_.push_back(data_out_.front().bin);
dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str());
data_out_.pop_front();
max_ack_off--;
if (data_out_.front().bin!=bin64_t::NONE && ack_in_.is_empty(data_out_.front().bin)) {
ack_not_rcvd_recent_++;
data_out_cap_ = bin64_t::ALL;
+ data_out_tmo_.push_back(data_out_.front().bin);
dprintf("%s #%u Tdata %s\n",tintstr(),id_,data_out_.front().bin.str());
}
data_out_.pop_front();
}
while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE)
data_out_.pop_front();
+ assert(data_out_.empty() || data_out_.front().time!=TINT_NEVER);
+ while (!data_out_tmo_.empty() && data_out_tmo_.front().time<NOW-MAX_POSSIBLE_RTT)
+ data_out_tmo_.pop_front();
}
if (!SELF_CONN_OK) {
uint32_t try_id = DecodeID(peer_channel_id_);
if (channel(try_id) && !channel(try_id)->peer_channel_id_) {
- delete this;
- return;
+ peer_channel_id_ = 0;
+ Close();
+ return; // this is a self-connection
}
}
// FUTURE: channel forking
Datagram data(socket);
data.Recv();
const Address& addr = data.address();
-#define return_log(...) { eprintf(__VA_ARGS__); return NULL; }
+#define return_log(...) { printf(__VA_ARGS__); return NULL; }
if (data.size()<4)
return_log("datagram shorter than 4 bytes %s\n",addr.str());
uint32_t mych = data.Pull32();
Channel* channel = NULL;
if (!mych) { // handshake initiated
if (data.size()<1+4+1+4+Sha1Hash::SIZE)
- return_log ("incorrect size %i initial handshake packet %s\n",data.size(),addr.str());
+ return_log ("%s #0 incorrect size %i initial handshake packet %s\n",
+ tintstr(),data.size(),addr.str());
uint8_t hashid = data.Pull8();
if (hashid!=SWIFT_HASH)
- return_log ("no hash in the initial handshake %s\n",addr.str());
+ return_log ("%s #0 no hash in the initial handshake %s\n",
+ tintstr(),addr.str());
bin64_t pos = data.Pull32();
if (pos!=bin64_t::ALL)
- return_log ("that is not the root hash %s\n",addr.str());
+ return_log ("%s #0 that is not the root hash %s\n",tintstr(),addr.str());
hash = data.PullHash();
FileTransfer* file = FileTransfer::Find(hash);
if (!file)
- return_log ("hash %s unknown, no such file %s\n",hash.hex().c_str(),addr.str());
+ return_log ("%s #0 hash %s unknown, no such file %s\n",tintstr(),hash.hex().c_str(),addr.str());
dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str());
for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
if (channels[*i] && channels[*i]->peer_==data.address() &&
channels[*i]->last_recv_time_>NOW-TINT_SEC*2)
- return_log("have a channel already to %s\n",addr.str());
+ return_log("%s #0 have a channel already to %s\n",tintstr(),addr.str());
channel = new Channel(file, socket, data.address());
} else {
mych = DecodeID(mych);
if (mych>=channels.size())
- return_log("invalid channel #%u, %s\n",mych,addr.str());
+ return_log("%s invalid channel #%u, %s\n",tintstr(),mych,addr.str());
channel = channels[mych];
if (!channel)
- return_log ("channel #%u is already closed\n",mych,addr.str());
+ return_log ("%s #%u is already closed %s\n",tintstr(),mych,addr.str()); // include the sender address, like the sibling messages; every argument now has a conversion
if (channel->peer() != addr)
- return_log ("invalid peer address #%u %s!=%s\n",mych,channel->peer().str(),addr.str());
+ return_log ("%s #%u invalid peer address %s!=%s\n",
+ tintstr(),mych,channel->peer().str(),addr.str());
channel->own_id_mentioned_ = true;
}
//dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
Channel* sender(NULL);
while (!sender && !send_queue.is_empty()) { // dequeue
tintbin next = send_queue.pop();
- send_time = next.time;
sender = channel((int)next.bin);
+ send_time = next.time;
if (sender && sender->next_send_time_!=send_time &&
sender->next_send_time_!=TINT_NEVER )
sender = NULL; // it was a stale entry
if ( sender!=NULL && send_time<=NOW ) { // it's time
- if (sender->next_send_time_<NOW+TINT_MIN) { // either send
- dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(),
- tintstr(send_time));
- sender->Send();
- sender->Reschedule();
- } else { // or close the channel
- dprintf("%s #%u closed sendctrl\n",tintstr(),sender->id());
- delete sender;
- }
+ dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(),
+ tintstr(send_time));
+ sender->Send();
+ sender->Reschedule();
} else { // it's too early, wait
}
+void Channel::Close () { // requests shutdown: switches the send-control mode to CLOSE_CONTROL
+ this->SwitchSendControl(CLOSE_CONTROL);
+}
+
+
// Recomputes next_send_time_ via NextSendTime() and either requeues this
// channel in send_queue or, when the result is TINT_NEVER, deletes the channel.
void Channel::Reschedule () {
next_send_time_ = NextSendTime();
if (next_send_time_!=TINT_NEVER) {
assert(next_send_time_<NOW+TINT_MIN); // sanity check: a scheduled send must lie less than TINT_MIN past NOW
send_queue.push(tintbin(next_send_time_,id_));
- } else
- send_queue.push(tintbin(NOW+TINT_MIN,id_));
- dprintf("%s requeue #%u for %s\n",tintstr(),id_,tintstr(next_send_time_));
+ dprintf("%s requeue #%u for %s\n",tintstr(),id_,tintstr(next_send_time_));
+ } else { // NextSendTime() returned TINT_NEVER (e.g. in CLOSE_CONTROL mode)
+ dprintf("%s #%u closed\n",tintstr(),id_);
+ delete this; // the object destroys itself here; callers must not touch it after Reschedule() returns
+ }
}
/** Channels working for this transfer. */
binqueue hs_in_;
int hs_in_offset_;
- std::deque<Address> pex_in_;
+ std::deque<Address> pex_in_;
/** Messages we are accepting. */
uint64_t cap_out_;
* @param expires (not used currently) when to consider request expired
* @return the bin number to request */
virtual bin64_t Pick (binmap_t& offered, uint64_t max_width, tint expires) = 0;
+ virtual ~PiecePicker() {}
};
PING_PONG_CONTROL,
SLOW_START_CONTROL,
AIMD_CONTROL,
- LEDBAT_CONTROL
+ LEDBAT_CONTROL,
+ CLOSE_CONTROL
} send_control_t;
static const char* SEND_CONTROL_MODES[];
void Recv (Datagram& dgram);
void Send ();
+ void Close ();
void OnAck (Datagram& dgram);
void OnTs (Datagram& dgram);
static float LEDBAT_GAIN;
static tint LEDBAT_DELAY_BIN;
static bool SELF_CONN_OK;
+ static tint MAX_POSSIBLE_RTT;
const std::string id_string () const;
/** A channel is "established" if had already sent and received packets. */
HashTree& file () { return transfer_->file(); }
const Address& peer() const { return peer_; }
tint ack_timeout () { // computed from the averaged RTT (rtt_avg_) plus four times the deviation, with the deviation floored at MIN_DEV
- return rtt_avg_ + std::max(dev_avg_,MIN_DEV)*4;
+ return std::min(30*TINT_SEC,rtt_avg_ + std::max(dev_avg_,MIN_DEV)*4); // result is capped at 30*TINT_SEC
}
uint32_t id () const { return id_; }
bin64_t data_in_dbl_;
/** The history of data sent and still unacknowledged. */
tbqueue data_out_;
+ /** Timed-out data (potentially to be retransmitted). */
+ tbqueue data_out_tmo_;
bin64_t data_out_cap_;
/** Index in the history array. */
binmap_t ack_out_;