it runs as a seeder, leecher or hasher, depending on arguments.
Added arguments for logging, waiting, etc etc
target = 'swift'
source = [ 'bin64.cpp','sha1.cpp','hashtree.cpp','datagram.cpp','bins.cpp',
- 'transfer.cpp', 'swift.cpp', 'sendrecv.cpp', 'send_control.cpp',
+ 'transfer.cpp', 'channel.cpp', 'sendrecv.cpp', 'send_control.cpp',
'compat/hirestimeofday.cpp', 'compat.cpp', 'compat/util.cpp']
env = Environment()
--- /dev/null
+/*
+ * swift.cpp
+ * serp++
+ *
+ * Created by Victor Grishchenko on 3/6/09.
+ * Copyright 2009 Delft University of Technology. All rights reserved.
+ *
+ */
+
+#include <stdlib.h>
+#include <fcntl.h>
+#ifndef _WIN32
+#include <sys/select.h>
+#include <sys/time.h>
+#include <sys/mman.h>
+#include <arpa/inet.h>
+#include <unistd.h>
+#endif
+#include <sys/stat.h>
+#include <string.h>
+
+//#include <glog/logging.h>
+#include "swift.h"
+#include "datagram.h"
+
+using namespace std;
+using namespace swift;
+
+swift::tint Channel::last_tick = 0;
+int Channel::MAX_REORDERING = 4;
+bool Channel::SELF_CONN_OK = false;
+swift::tint Channel::TIMEOUT = TINT_SEC*60;
+std::vector<Channel*> Channel::channels(1);
+SOCKET Channel::sockets[8] = {0,0,0,0,0,0,0,0};
+int Channel::socket_count = 0;
+Address Channel::tracker;
+tbheap Channel::send_queue;
+FILE* Channel::debug_file = NULL;
+#include "ext/simple_selector.cpp"
+PeerSelector* Channel::peer_selector = new SimpleSelector();
+
+Channel::Channel (FileTransfer* transfer, int socket, Address peer_addr) :
+ transfer_(transfer), peer_(peer_addr), peer_channel_id_(0), pex_out_(0),
+ socket_(socket==-1?sockets[0]:socket), // FIXME
+ data_out_cap_(bin64_t::ALL), last_data_out_time_(0), last_data_in_time_(0),
+ own_id_mentioned_(false), next_send_time_(0), last_send_time_(0),
+ last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC),
+ data_in_dbl_(bin64_t::NONE), hint_out_size_(0),
+ cwnd_(1), send_interval_(TINT_SEC), send_control_(PING_PONG_CONTROL),
+ sent_since_recv_(0), ack_rcvd_recent_(0), ack_not_rcvd_recent_(0),
+ last_loss_time_(0), owd_min_bin_(0), owd_min_bin_start_(NOW),
+ owd_cur_bin_(0), dgrams_sent_(0), dgrams_rcvd_(0),
+ data_in_(TINT_NEVER,bin64_t::NONE)
+{
+ if (peer_==Address())
+ peer_ = tracker;
+ this->id_ = channels.size();
+ channels.push_back(this);
+ transfer_->hs_in_.push_back(id_);
+ for(int i=0; i<4; i++) {
+ owd_min_bins_[i] = TINT_NEVER;
+ owd_current_[i] = TINT_NEVER;
+ }
+ Reschedule();
+ dprintf("%s #%u init %s\n",tintstr(),id_,peer_.str());
+}
+
+
+Channel::~Channel () {
+ channels[id_] = NULL;
+}
+
+
+void swift::SetTracker(const Address& tracker) {
+ Channel::tracker = tracker;
+}
+
+
+int Channel::DecodeID(int scrambled) {
+ return scrambled ^ (int)Datagram::start;
+}
+int Channel::EncodeID(int unscrambled) {
+ return unscrambled ^ (int)Datagram::start;
+}
+
+
+int swift::Listen (Address addr) {
+ int sock = Datagram::Bind(addr);
+ if (sock!=INVALID_SOCKET)
+ Channel::sockets[Channel::socket_count++] = sock;
+ return sock;
+}
+
+
+void swift::Shutdown (int sock_des) {
+ for(int i=0; i<Channel::socket_count; i++)
+ if (sock_des==-1 || Channel::sockets[i]==sock_des) {
+ Datagram::Close(Channel::sockets[i]);
+ Channel::sockets[i] = Channel::sockets[--Channel::socket_count];
+ }
+}
+
+
+void swift::Loop (tint till) {
+ Channel::Loop(till);
+}
+
+
+int swift::Open (const char* filename, const Sha1Hash& hash) {
+ FileTransfer* ft = new FileTransfer(filename, hash);
+ if (ft && ft->file().file_descriptor()) {
+
+ /*if (FileTransfer::files.size()<fdes) // FIXME duplication
+ FileTransfer::files.resize(fdes);
+ FileTransfer::files[fdes] = ft;*/
+
+ // initiate tracker connections
+ if (Channel::tracker!=Address())
+ new Channel(ft);
+
+ return ft->file().file_descriptor();
+ } else {
+ if (ft)
+ delete ft;
+ return -1;
+ }
+}
+
+
+void swift::Close (int fd) {
+ if (fd<FileTransfer::files.size() && FileTransfer::files[fd])
+ delete FileTransfer::files[fd];
+}
+
+
+void swift::AddPeer (Address address, const Sha1Hash& root) {
+ Channel::peer_selector->AddPeer(address,root);
+}
+
+
+uint64_t swift::Size (int fdes) {
+ if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
+ return FileTransfer::files[fdes]->file().size();
+ else
+ return 0;
+}
+
+
+bool swift::IsComplete (int fdes) {
+ if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
+ return FileTransfer::files[fdes]->file().is_complete();
+ else
+ return 0;
+}
+
+
+uint64_t swift::Complete (int fdes) {
+ if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
+ return FileTransfer::files[fdes]->file().complete();
+ else
+ return 0;
+}
+
+
+uint64_t swift::SeqComplete (int fdes) {
+ if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
+ return FileTransfer::files[fdes]->file().seq_complete();
+ else
+ return 0;
+}
+
+
+const Sha1Hash& swift::RootMerkleHash (int file) {
+ FileTransfer* trans = FileTransfer::file(file);
+ if (!trans)
+ return Sha1Hash::ZERO;
+ return trans->file().root_hash();
+}
+
+
+/** <h2> swift handshake </h2>
+ Basic rules:
+ <ul>
+ <li> to send a datagram, a channel must be created
+ (channels are cheap and easily recycled)
+ <li> a datagram must contain either the receiving
+ channel id (scrambled) or the root hash
+ </ul>
+ <b>Note:</b>
+ */
#define TINT_SEC ((tint)1000000)
#define TINT_MSEC ((tint)1000)
#define TINT_uSEC ((tint)1)
-#define TINT_NEVER ((tint)0x7fffffffffffffffLL)
+#define TINT_NEVER ((tint)0x3fffffffffffffffLL)
size_t file_size (int fd);
}
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
#endif
- dprintf("socket buffers: %i send %i recv\n",sndbuf,rcvbuf);
if (::bind(fd, (sockaddr*)&addr, len) != 0) {
print_error("bind fails");
return INVALID_SOCKET;
const char* tintstr(tint t=0);
std::string sock2str (struct sockaddr_in addr);
-#ifdef DEBUG
-#define dprintf(...) printf(__VA_ARGS__)
-#else
-#define dprintf(...) {}
-#endif
-#define eprintf(...) fprintf(stderr,__VA_ARGS__)
-//#define dprintf(...) {}
}
//uint32_t fp = for_root.fingerprint();
for(peer_queue_t::iterator i=peers.begin(); i!=peers.end(); i++)
if (i->second==for_root) {
- i->second = 0;
+ i->second = Sha1Hash::ZERO; // horror TODO rewrite
sockaddr_in ret = i->first;
- while (peers.begin()->second==0)
+ while (peers.begin()->second==Sha1Hash::ZERO)
peers.pop_front();
return ret;
}
int val;
for(int i=0; i<SIZE; i++) {
strncpy(hx,hash+i*2,2);
- sscanf(hx, "%x", &val);
+ if (sscanf(hx, "%x", &val)!=1) {
+ memset(bits,0,20);
+ return;
+ }
bits[i] = val;
}
assert(this->hex()==std::string(hash));
p = p.parent();
c--;
}
- //dprintf("p %lli %s\n",(uint64_t)p,hash.hex().c_str());
}
return hash;
}
tr.bytes {
background: #fed;
}
+
+img.thumb {
+ width: 150pt;
+}
+
+table#main tr td {
+ vertical-align: top;
+}
if (data_out_.size()<cwnd_ && last_data_out_time_<=NOW-send_interval_) {
tosend = DequeueHint();
if (tosend==bin64_t::NONE) {
- dprintf("%s #%u no idea what to send #sendctrl\n",tintstr(),id_);
+ dprintf("%s #%u sendctrl no idea what to send\n",tintstr(),id_);
if (send_control_!=KEEP_ALIVE_CONTROL)
SwitchSendControl(KEEP_ALIVE_CONTROL);
}
} else
- dprintf("%s #%u no cwnd #sendctrl\n",tintstr(),id_);
+ dprintf("%s #%u sendctrl no cwnd\n",tintstr(),id_);
if (tosend==bin64_t::NONE)// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
return bin64_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
rtt_avg_ = NOW - last_send_time_;
dev_avg_ = rtt_avg_;
dip_avg_ = rtt_avg_;
- dprintf("%s #%u rtt init %lli\n",tintstr(),id_,rtt_avg_);
+ dprintf("%s #%u sendctrl rtt init %lli\n",tintstr(),id_,rtt_avg_);
}
bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
while (dgram.size()) {
owd_min_bins_[owd_min_bin_] = owd;
peer_send_time_ = 0;
}
- dprintf("%s #%u rtt %lli dev %lli based on %s\n",
+ dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n",
tintstr(),id_,rtt_avg_,dev_avg_,data_out_[i].bin.str());
bin64_t pos = data_out_[i].bin;
ack_rcvd_recent_++;
} else { // it's too early, wait
tint towait = min(limit,send_time) - NOW;
- dprintf("%s waiting %lliusec\n",tintstr(),towait);
+ dprintf("%s #0 waiting %lliusec\n",tintstr(),towait);
int rd = Datagram::Wait(socket_count,sockets,towait);
if (rd!=INVALID_SOCKET) { // in meantime, received something
Channel* receiver = RecvDatagram(rd);
if (next_send_time_!=TINT_NEVER) {
assert(next_send_time_<NOW+TINT_MIN);
send_queue.push(tintbin(next_send_time_,id_));
- dprintf("%s requeue #%u for %s\n",tintstr(),id_,tintstr(next_send_time_));
+ dprintf("%s #%u requeue for %s\n",tintstr(),id_,tintstr(next_send_time_));
} else {
dprintf("%s #%u closed\n",tintstr(),id_);
delete this;
/*
* swift.cpp
- * serp++
+ * swift the multiparty transport protocol
*
- * Created by Victor Grishchenko on 3/6/09.
- * Copyright 2009 Delft University of Technology. All rights reserved.
+ * Created by Victor Grishchenko on 2/15/10.
+ * Copyright 2010 Delft University of Technology. All rights reserved.
*
*/
-
+#include <stdio.h>
#include <stdlib.h>
-#include <fcntl.h>
-#ifndef _WIN32
-#include <sys/select.h>
-#include <sys/time.h>
-#include <sys/mman.h>
-#include <arpa/inet.h>
-#include <unistd.h>
-#endif
-#include <sys/stat.h>
-#include <string.h>
-
-//#include <glog/logging.h>
+#include <getopt.h>
#include "swift.h"
-#include "datagram.h"
-using namespace std;
using namespace swift;
-swift::tint Channel::last_tick = 0;
-int Channel::MAX_REORDERING = 4;
-bool Channel::SELF_CONN_OK = false;
-swift::tint Channel::TIMEOUT = TINT_SEC*60;
-std::vector<Channel*> Channel::channels(1);
-SOCKET Channel::sockets[8] = {0,0,0,0,0,0,0,0};
-int Channel::socket_count = 0;
-Address Channel::tracker;
-tbheap Channel::send_queue;
-#include "ext/simple_selector.cpp"
-PeerSelector* Channel::peer_selector = new SimpleSelector();
-
-Channel::Channel (FileTransfer* transfer, int socket, Address peer_addr) :
- transfer_(transfer), peer_(peer_addr), peer_channel_id_(0), pex_out_(0),
- socket_(socket==-1?sockets[0]:socket), // FIXME
- data_out_cap_(bin64_t::ALL), last_data_out_time_(0), last_data_in_time_(0),
- own_id_mentioned_(false), next_send_time_(0), last_send_time_(0),
- last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC),
- data_in_dbl_(bin64_t::NONE), hint_out_size_(0),
- cwnd_(1), send_interval_(TINT_SEC), send_control_(PING_PONG_CONTROL),
- sent_since_recv_(0), ack_rcvd_recent_(0), ack_not_rcvd_recent_(0),
- last_loss_time_(0), owd_min_bin_(0), owd_min_bin_start_(NOW),
- owd_cur_bin_(0), dgrams_sent_(0), dgrams_rcvd_(0),
- data_in_(TINT_NEVER,bin64_t::NONE)
-{
- if (peer_==Address())
- peer_ = tracker;
- this->id_ = channels.size();
- channels.push_back(this);
- transfer_->hs_in_.push_back(id_);
- for(int i=0; i<4; i++) {
- owd_min_bins_[i] = TINT_NEVER;
- owd_current_[i] = TINT_NEVER;
- }
- Reschedule();
- dprintf("%s #%u init %s\n",tintstr(),id_,peer_.str());
-}
-
-
-Channel::~Channel () {
- channels[id_] = NULL;
-}
-
-
-void swift::SetTracker(const Address& tracker) {
- Channel::tracker = tracker;
-}
-
-
-int Channel::DecodeID(int scrambled) {
- return scrambled ^ (int)Datagram::start;
-}
-int Channel::EncodeID(int unscrambled) {
- return unscrambled ^ (int)Datagram::start;
-}
-
-
-int swift::Listen (Address addr) {
- int sock = Datagram::Bind(addr);
- if (sock!=INVALID_SOCKET)
- Channel::sockets[Channel::socket_count++] = sock;
- return sock;
-}
-
-
-void swift::Shutdown (int sock_des) {
- for(int i=0; i<Channel::socket_count; i++)
- if (sock_des==-1 || Channel::sockets[i]==sock_des) {
- Datagram::Close(Channel::sockets[i]);
- Channel::sockets[i] = Channel::sockets[--Channel::socket_count];
+#define quit(...) {fprintf(stderr,__VA_ARGS__); exit(1); }
+
+
+int main (int argc, char** argv) {
+
+ static struct option long_options[] =
+ {
+ {"hash", required_argument, 0, 'h'},
+ {"file", required_argument, 0, 'f'},
+ {"daemon", no_argument, 0, 'd'},
+ {"listen", required_argument, 0, 'l'},
+ {"tracker", required_argument, 0, 't'},
+ {"debug", no_argument, 0, 'D'},
+ {"progress",no_argument, 0, 'p'},
+ {"wait", optional_argument, 0, 'w'},
+ {0, 0, 0, 0}
+ };
+
+ Sha1Hash root_hash;
+ char* filename = 0;
+ bool daemonize = false, report_progress = false;
+ Address bindaddr;
+ Address tracker;
+ tint wait_time = 0;
+
+ int c;
+ while ( -1 != (c = getopt_long (argc, argv, ":h:f:dl:t:Dpw::", long_options, 0)) ) {
+
+ switch (c) {
+ case 'h':
+ if (strlen(optarg)!=40)
+ quit("SHA1 hash must be 40 hex symbols\n");
+ root_hash = Sha1Hash(optarg);
+ if (root_hash==Sha1Hash::ZERO)
+ quit("SHA1 hash must be 40 hex symbols\n");
+ break;
+ case 'f':
+ filename = strdup(optarg);
+ break;
+ case 'd':
+ daemonize = true;
+ break;
+ case 'l':
+ bindaddr = Address(optarg);
+ if (bindaddr==Address())
+ quit("address must be hostname:port, ip:port or just port\n");
+ break;
+ case 't':
+ tracker = Address(optarg);
+ if (tracker==Address())
+ quit("address must be hostname:port, ip:port or just port\n");
+ break;
+ case 'D':
+ Channel::debug_file = optarg ? fopen(optarg,"a") : stdout;
+ break;
+ case 'p':
+ report_progress = true;
+ break;
+ case 'w':
+ wait_time = TINT_NEVER;
+ if (optarg) {
+ char unit = 'u';
+ if (sscanf(optarg,"%lli%c",&wait_time,&unit)!=2)
+ quit("time format: 1234[umsMHD], e.g. 1M = one minute\n");
+ switch (unit) {
+ case 'D': wait_time *= 24;
+ case 'H': wait_time *= 60;
+ case 'M': wait_time *= 60;
+ case 's': wait_time *= 1000;
+ case 'm': wait_time *= 1000;
+ case 'u': break;
+ default: quit("time format: 1234[umsMHD], e.g. 1D = one day\n");
+ }
+ }
+ break;
}
-}
-
-
-void swift::Loop (tint till) {
- Channel::Loop(till);
-}
-
-
-int swift::Open (const char* filename, const Sha1Hash& hash) {
- FileTransfer* ft = new FileTransfer(filename, hash);
- if (ft && ft->file().file_descriptor()) {
-
- /*if (FileTransfer::files.size()<fdes) // FIXME duplication
- FileTransfer::files.resize(fdes);
- FileTransfer::files[fdes] = ft;*/
- // initiate tracker connections
- if (Channel::tracker!=Address())
- new Channel(ft);
-
- return ft->file().file_descriptor();
+ } // arguments parsed
+
+ LibraryInit();
+
+ int file = Open(filename,root_hash);
+ // FIXME open err
+ printf("Root hash: %s\n", RootMerkleHash(file).hex().c_str());
+
+ if (root_hash==Sha1Hash() && bindaddr==Address())
+ exit(0);
+
+ if (bindaddr!=Address()) { // seeding
+ if (Listen(bindaddr)<=0)
+ quit("cant listen to %s\n",bindaddr.str())
+ if (wait_time==0)
+ wait_time=TINT_NEVER;
} else {
- if (ft)
- delete ft;
- return -1;
+ int base = rand()%10000, i;
+ for (i=0; i<100 && Listen(Address(INADDR_ANY,i*7+base))<=0; i++);
+ if (i==100)
+ quit("cant listen to a port\n");
}
+
+
+ if (tracker!=Address())
+ SetTracker(tracker);
+
+ tint start_time = NOW;
+ tint end_time = TINT_NEVER;
+
+ while (NOW<end_time+wait_time){
+ if (end_time==TINT_NEVER && IsComplete(file))
+ end_time = NOW;
+ // and yes, I add up infinities and go away with that
+ tint towait = (end_time+wait_time)-NOW;
+ Loop(std::min(TINT_SEC,towait));
+ if (report_progress) {
+ fprintf(stderr,
+ "%s %lli of %lli (seq %lli) %lli dgram %lli bytes up, "\
+ "%lli dgram %lli bytes down\n",
+ IsComplete(file) ? "DONE" : "done",
+ Complete(file), Size(file), SeqComplete(file),
+ Datagram::dgrams_up, Datagram::bytes_up,
+ Datagram::dgrams_down, Datagram::bytes_down );
+ }
+ }
+
+ Close(file);
+
+ if (Channel::debug_file)
+ fclose(Channel::debug_file);
+
+ Shutdown();
+
+ return 0;
+
}
-
-void swift::Close (int fd) {
- if (fd<FileTransfer::files.size() && FileTransfer::files[fd])
- delete FileTransfer::files[fd];
-}
-
-
-void swift::AddPeer (Address address, const Sha1Hash& root) {
- Channel::peer_selector->AddPeer(address,root);
-}
-
-
-uint64_t swift::Size (int fdes) {
- if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
- return FileTransfer::files[fdes]->file().size();
- else
- return 0;
-}
-
-
-bool swift::IsComplete (int fdes) {
- if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
- return FileTransfer::files[fdes]->file().is_complete();
- else
- return 0;
-}
-
-
-uint64_t swift::Complete (int fdes) {
- if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
- return FileTransfer::files[fdes]->file().complete();
- else
- return 0;
-}
-
-
-uint64_t swift::SeqComplete (int fdes) {
- if (FileTransfer::files.size()>fdes && FileTransfer::files[fdes])
- return FileTransfer::files[fdes]->file().seq_complete();
- else
- return 0;
-}
-
-
-const Sha1Hash& swift::RootMerkleHash (int file) {
- FileTransfer* trans = FileTransfer::file(file);
- if (!trans)
- return Sha1Hash::ZERO;
- return trans->file().root_hash();
-}
-
-
-/** <h2> swift handshake </h2>
- Basic rules:
- <ul>
- <li> to send a datagram, a channel must be created
- (channels are cheap and easily recycled)
- <li> a datagram must contain either the receiving
- channel id (scrambled) or the root hash
- </ul>
- <b>Note:</b>
- */
static tint LEDBAT_DELAY_BIN;
static bool SELF_CONN_OK;
static tint MAX_POSSIBLE_RTT;
+ static FILE* debug_file;
const std::string id_string () const;
/** A channel is "established" if had already sent and received packets. */
} // namespace end
+#ifndef SWIFT_MUTE
+#define dprintf(...) { if (Channel::debug_file) fprintf(Channel::debug_file,__VA_ARGS__); }
+#else
+#define dprintf(...) {}
+#endif
+#define eprintf(...) fprintf(stderr,__VA_ARGS__)
#endif