-CPPFLAGS=-O2 -I.
+CPPFLAGS=-g -I.
all: swift
-swift: swift.o sha1.o compat.o sendrecv.o send_control.o hashtree.o bin64.o bins.o channel.o datagram.o transfer.o
+swift: swift.o sha1.o compat.o sendrecv.o send_control.o hashtree.o bin64.o bins.o channel.o datagram.o transfer.o httpgw.o
g++ -I. *.o -o swift
else
Channel::socket_count++;
Channel::sockets[i]=cb;
+ if (!cb.may_read && !cb.may_write && !cb.on_error)
+ Channel::sockets[i] = Channel::sockets[--Channel::socket_count];
return true;
}
#define S_IROTH _S_IREAD
#endif
+#ifdef _WIN32
+#define setsockoptptr_t (char*)
+#else
+#define setsockoptptr_t void*
+#endif
+
+
namespace swift {
/** tint is the time integer type; microsecond-precise. */
#define dbnd_ensure(x) { if (!(x)) { print_error("binding fails"); close_socket(fd); return INVALID_SOCKET; } }
dbnd_ensure ( (fd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0 );
dbnd_ensure( make_socket_nonblocking(fd) ); // FIXME may remove this
-#ifdef _WIN32
-#define parptype (char*)
-#else
-#define parptype void*
-#endif
int enable = true;
- dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (parptype)&sndbuf, sizeof(int)) == 0 );
- dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (parptype)&rcvbuf, sizeof(int)) == 0 );
- setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (parptype)&enable, sizeof(int));
+ dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (setsockoptptr_t)&sndbuf, sizeof(int)) == 0 );
+ dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (setsockoptptr_t)&rcvbuf, sizeof(int)) == 0 );
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (setsockoptptr_t)&enable, sizeof(int));
dbnd_ensure ( ::bind(fd, (sockaddr*)&addr, len) == 0 );
return fd;
}
HTTPGW_RANGE=0,
HTTPGW_MAX_HEADER=1
};
-char * HTTPGW_HEADERS[HTTPGW_MAX_HEADER] = {
+const char * HTTPGW_HEADERS[HTTPGW_MAX_HEADER] = {
"Content-Range"
};
struct http_gw_t {
+ int id;
uint64_t offset;
uint64_t tosend;
int transfer;
int http_gw_reqs_open = 0;
+int http_gw_reqs_count = 0;
void HttpGwNewRequestCallback (SOCKET http_conn);
void HttpGwNewRequestCallback (SOCKET http_conn);
free(req->headers[i]);
req->headers[i] = NULL;
}
- *req = http_requests[http_gw_reqs_open--];
+ *req = http_requests[--http_gw_reqs_open];
}
swift::close_socket(sock);
+ swift::Listen3rdPartySocket(socket_callbacks_t(sock));
}
if (complete>req->offset) { // send data
char buf[1<<12];
uint64_t tosend = std::min((uint64_t)1<<12,complete-req->offset);
- size_t rd = read(req->transfer,buf,tosend); // hope it is cached
+ size_t rd = pread(req->transfer,buf,tosend,req->offset); // hope it is cached
if (rd<0) {
HttpGwCloseConnection(sink);
return;
void HttpGwFirstProgressCallback (int transfer, bin64_t bin) {
- printf("200 OK\r\n");
- printf("Content-Length: value\r\n");
+ if (bin!=bin64_t(0,0)) // need the first packet
+ return;
swift::RemoveProgressCallback(transfer,&HttpGwFirstProgressCallback);
swift::AddProgressCallback(transfer,&HttpGwSwiftProgressCallback);
+ for (int httpc=0; httpc<http_gw_reqs_open; httpc++) {
+ http_gw_t * req = http_requests + httpc;
+ if (req->transfer==transfer) {
+ uint64_t file_size = swift::Size(transfer);
+ char response[1024];
+ sprintf(response,
+ "HTTP/1.1 200 OK\r\n"\
+ "Connection: close\r\n"\
+ "Content-Type: text/plain; charset=iso-8859-1\r\n"\
+ "Content-Length: %lli\r\n"\
+ "\r\n",
+ file_size);
+ send(req->sink,response,strlen(response),0);
+ req->tosend = file_size;
+ }
+ }
HttpGwSwiftProgressCallback(transfer,bin);
}
void HttpGwNewRequestCallback (SOCKET http_conn){
http_gw_t* req = http_requests + http_gw_reqs_open++;
+ req->id = ++http_gw_reqs_count;
req->sink = http_conn;
+ req->offset = 0;
+ req->tosend = 0;
// read headers - the thrilling part
// we surely do not support pipelining => one request at a time
#define HTTPGW_MAX_REQ_SIZE 1024
// HTTP request line
char* reqline = strtok(buf,"\r\n");
char method[16], url[512], version[16], crlf[5];
- if (4!=sscanf(reqline,"%16s %512s %16s%4[\n\r]",method,url,version,crlf)) {
+ if (3!=sscanf(reqline,"%16s %512s %16s",method,url,version)) {
HttpGwCloseConnection(http_conn);
return;
}
char* headerline;
while (headerline=strtok(NULL,"\n\r")) {
char header[128], value[256];
- if (3!=sscanf(headerline,"%120[^: \r\n]: %250[^\r\n]%4[\r\n]",header,value,crlf)) {
+ if (2!=sscanf(headerline,"%120[^: ]: %250[^\r\n]",header,value)) {
HttpGwCloseConnection(http_conn);
return;
}
return;
}
// initiate transmission
- int file = swift::Open(hash,hash);
- // find/create transfer
- swift::AddProgressCallback(file,&HttpGwFirstProgressCallback);
- // write response header
- req->offset = 0;
- req->tosend = 10000;
+ int file = swift::Open(hash,Sha1Hash(true,hash));
req->transfer = file;
- socket_callbacks_t install (http_conn,NULL,NULL,HttpGwCloseConnection);
- swift::Listen3rdPartySocket(install);
+ if (swift::Size(file)) {
+ HttpGwFirstProgressCallback(file,bin64_t(0,0));
+ } else {
+ swift::AddProgressCallback(file,&HttpGwFirstProgressCallback);
+ socket_callbacks_t install (http_conn,NULL,NULL,HttpGwCloseConnection); // FIXME: if conn is closed / no data arrives
+ swift::Listen3rdPartySocket(install);
+ }
}
void HttpGwNewConnectionCallback (SOCKET serv) {
Address client_address;
socklen_t len;
- SOCKET conn = accept
- (serv, (sockaddr*) & (client_address.addr), &len);
+ SOCKET conn = accept (serv, (sockaddr*) & (client_address.addr), &len);
if (conn==INVALID_SOCKET) {
print_error("client conn fails");
return;
void HttpGwError (SOCKET s) {
- print_error("everything fucked up");
+ print_error("httpgw is dead");
+ dprintf("%s @0 closed http gateway\n",tintstr());
+ close_socket(s);
+ swift::Listen3rdPartySocket(socket_callbacks_t(s));
}
print_error("http binding fails"); close_socket(fd); \
return INVALID_SOCKET; } }
gw_ensure ( (fd=socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET );
+ int enable = true;
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (setsockoptptr_t)&enable, sizeof(int));
gw_ensure ( 0==bind(fd, (sockaddr*)&(bind_to.addr), sizeof(struct sockaddr_in)) );
gw_ensure (make_socket_nonblocking(fd));
gw_ensure ( 0==listen(fd,8) );
- socket_callbacks_t install(fd,HttpGwNewConnectionCallback,NULL,HttpGwError);
- gw_ensure (swift::Listen3rdPartySocket(install));
+ socket_callbacks_t install_http(fd,HttpGwNewConnectionCallback,NULL,HttpGwError);
+ gw_ensure (swift::Listen3rdPartySocket(install_http));
+ dprintf("%s @0 installed http gateway on %s\n",tintstr(),bind_to.str());
}
dgram.Push8(SWIFT_HASH);
dgram.Push32((uint32_t)peak);
dgram.PushHash(file().peak_hash(i));
- //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
dprintf("%s #%u +phash %s\n",tintstr(),id_,peak.str());
}
}
dgram.Push8(SWIFT_HASH);
dgram.Push32((uint32_t)uncle);
dgram.PushHash( file().hash(uncle) );
- //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
dprintf("%s #%u +hash %s\n",tintstr(),id_,uncle.str());
pos = pos.parent();
}
data_in_ = tintbin(NOW,bin64_t::NONE);
if (!ok)
return bin64_t::NONE;
+ for(int i=0; i<transfer().cb_installed; i++)
+ transfer().callbacks[i](transfer().fd(),pos); // FIXME FIXME FIXME
data_in_.bin = pos;
if (pos!=bin64_t::NONE) {
if (last_data_in_time_) {
#include "compat.h"
#include "swift.h"
-#include "httpgw.cpp"
-
using namespace swift;
#define quit(...) {fprintf(stderr,__VA_ARGS__); exit(1); }
+SOCKET InstallHTTPGateway (Address addr);
int main (int argc, char** argv) {
static struct option long_options[] =
{
- {"hash", required_argument, 0, 'h'},
- {"file", required_argument, 0, 'f'},
+ {"hash", required_argument, 0, 'h'},
+ {"file", required_argument, 0, 'f'},
{"daemon", no_argument, 0, 'd'},
{"listen", required_argument, 0, 'l'},
{"tracker", required_argument, 0, 't'},
Address bindaddr;
Address tracker;
Address http_gw;
- tint wait_time = 0;
+ tint wait_time = -1;
LibraryInit();
bindaddr = Address(optarg);
if (bindaddr==Address())
quit("address must be hostname:port, ip:port or just port\n");
+ if (wait_time==-1)
+ wait_time = TINT_NEVER; // seed
break;
case 't':
tracker = Address(optarg);
report_progress = true;
break;
case 'g':
- http_gw = optarg ? Address(optarg) : Address(8080);
+ http_gw = optarg ? Address(optarg) : Address(Address::LOCALHOST,8080);
+ if (wait_time==-1)
+ wait_time = TINT_NEVER; // seed
+ break;
case 'w':
wait_time = TINT_NEVER;
if (optarg) {
} // arguments parsed
+
if (bindaddr!=Address()) { // seeding
if (Listen(bindaddr)<=0)
quit("cant listen to %s\n",bindaddr.str())
- if (wait_time==0)
- wait_time=TINT_NEVER;
- } else if (tracker!=Address()) { // leeching
- int base = rand()%10000, i;
- for (i=0; i<100 && Listen(Address((uint32_t)INADDR_ANY,i*7+base))<=0; i++);
- if (i==100)
- quit("cant listen to a port\n");
+ } else if (tracker!=Address() || http_gw!=Address()) { // leeching
+ for (int i=0; i<=10; i++) {
+ bindaddr = Address((uint32_t)INADDR_ANY,1024+rand()%10000);
+ if (Listen(bindaddr)>0)
+ break;
+ if (i==10)
+ quit("cant listen on %s\n",bindaddr.str());
+ }
}
if (tracker!=Address())
SetTracker(tracker);
-
if (http_gw!=Address())
InstallHTTPGateway(http_gw);
-
- int file = Open(filename,root_hash);
- // FIXME open err
- printf("Root hash: %s\n", RootMerkleHash(file).hex().c_str());
- if (root_hash==Sha1Hash() && bindaddr==Address() && tracker==Address())
- exit(0);
+ if (root_hash!=Sha1Hash::ZERO && !filename)
+ filename = strdup(root_hash.hex().c_str());
+
+ int file = -1;
+ if (filename) {
+ file = Open(filename,root_hash);
+ if (file<=0)
+ quit("cannot open file %s",filename);
+ printf("Root hash: %s\n", RootMerkleHash(file).hex().c_str());
+ }
+
+ if (bindaddr==Address() && file==-1) {
+ fprintf(stderr,"Usage:\n");
+ fprintf(stderr," -h, --hash\troot Merkle hash for the transmission\n");
+ fprintf(stderr," -f, --file\tname of file to use (root hash by default)\n");
+ fprintf(stderr," -l, --listen\t[ip:|host:]port to listen to (default: random)\n");
+ fprintf(stderr," -t, --tracker\t[ip:|host:]port of the tracker (default: none)\n");
+ fprintf(stderr," -D, --debug\tfile name for debugging logs (default: stdout)\n");
+ fprintf(stderr," -p, --progress\treport transfer progress\n");
+ fprintf(stderr," -g, --http\t[ip:|host:]port to bind HTTP gateway to (default localhost:8080)\n");
+ fprintf(stderr," -w, --wait\tlimit running time, e.g. 1[DHMs] (default: infinite with -l, -g)\n");
+ }
tint start_time = NOW;
- tint end_time = TINT_NEVER;
- while (NOW<end_time+wait_time){
- if (end_time==TINT_NEVER && IsComplete(file))
- end_time = NOW;
- // and yes, I add up infinities and go away with that
- tint towait = (end_time+wait_time)-NOW;
- Loop(TINT_SEC<towait?TINT_SEC:towait);
- if (report_progress) {
+ while ( bindaddr!=Address() &&
+ ( ( file>=0 && !IsComplete(file) ) ||
+ ( start_time+wait_time > NOW ) ) ) {
+ swift::Loop(TINT_SEC);
+ if (report_progress && file>=0) {
fprintf(stderr,
"%s %lli of %lli (seq %lli) %lli dgram %lli bytes up, "\
"%lli dgram %lli bytes down\n",
}
}
- Close(file);
+ if (file!=-1)
+ Close(file);
if (Channel::debug_file)
fclose(Channel::debug_file);
- Shutdown();
+ swift::Shutdown();
return 0;