httpgw needs polishing, MIME types
authorVictor Grishchenko (Debian) <victor.grishchenko@gmail.com>
Tue, 9 Mar 2010 20:04:56 +0000 (21:04 +0100)
committerVictor Grishchenko (Debian) <victor.grishchenko@gmail.com>
Tue, 9 Mar 2010 20:04:56 +0000 (21:04 +0100)
Makefile
channel.cpp
compat.h
datagram.cpp
httpgw.cpp
sendrecv.cpp
swift.cpp

index 03bdd83..8efc4ab 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,7 @@
-CPPFLAGS=-O2 -I.
+CPPFLAGS=-g -I.
 
 all: swift
 
-swift: swift.o sha1.o compat.o sendrecv.o send_control.o hashtree.o bin64.o bins.o channel.o datagram.o transfer.o
+swift: swift.o sha1.o compat.o sendrecv.o send_control.o hashtree.o bin64.o bins.o channel.o datagram.o transfer.o httpgw.o
        g++ -I. *.o -o swift
 
index f6d52b8..d8838e1 100644 (file)
@@ -118,6 +118,8 @@ bool    swift::Listen3rdPartySocket (socket_callbacks_t cb) {
         else
             Channel::socket_count++;
     Channel::sockets[i]=cb;
+    if (!cb.may_read && !cb.may_write && !cb.on_error)
+        Channel::sockets[i] = Channel::sockets[--Channel::socket_count];
     return true;
 }
 
index c506cbf..b08f338 100644 (file)
--- a/compat.h
+++ b/compat.h
@@ -66,6 +66,13 @@ typedef int SOCKET;
 #define S_IROTH _S_IREAD
 #endif
 
+#ifdef _WIN32
+#define setsockoptptr_t (char*)
+#else
+#define setsockoptptr_t void*
+#endif
+
+
 namespace swift {
 
 /** tint is the time integer type; microsecond-precise. */
index 9491d56..97391b3 100644 (file)
@@ -163,15 +163,10 @@ SOCKET Datagram::Bind (Address addr_) {
     #define dbnd_ensure(x) { if (!(x)) { print_error("binding fails"); close_socket(fd); return INVALID_SOCKET; } }
     dbnd_ensure ( (fd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0 );
     dbnd_ensure( make_socket_nonblocking(fd) );  // FIXME may remove this
-#ifdef _WIN32
-#define parptype (char*)
-#else
-#define parptype void*
-#endif
     int enable = true;
-    dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (parptype)&sndbuf, sizeof(int)) == 0 );
-    dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (parptype)&rcvbuf, sizeof(int)) == 0 );
-    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (parptype)&enable, sizeof(int));
+    dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (setsockoptptr_t)&sndbuf, sizeof(int)) == 0 );
+    dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (setsockoptptr_t)&rcvbuf, sizeof(int)) == 0 );
+    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (setsockoptptr_t)&enable, sizeof(int));
     dbnd_ensure ( ::bind(fd, (sockaddr*)&addr, len) == 0 );
     return fd;
 }
index cd1a52b..2f90271 100644 (file)
@@ -8,12 +8,13 @@ enum {
     HTTPGW_RANGE=0,
     HTTPGW_MAX_HEADER=1
 };
-char * HTTPGW_HEADERS[HTTPGW_MAX_HEADER] = {
+const char * HTTPGW_HEADERS[HTTPGW_MAX_HEADER] = {
     "Content-Range"
 };
 
 
 struct http_gw_t {
+    int      id;
     uint64_t offset;
     uint64_t tosend;
     int      transfer;
@@ -23,6 +24,7 @@ struct http_gw_t {
 
 
 int http_gw_reqs_open = 0;
+int http_gw_reqs_count = 0;
 
 void HttpGwNewRequestCallback (SOCKET http_conn);
 void HttpGwNewRequestCallback (SOCKET http_conn);
@@ -43,9 +45,10 @@ void HttpGwCloseConnection (SOCKET sock) {
                 free(req->headers[i]);
                 req->headers[i] = NULL;
             }
-        *req = http_requests[http_gw_reqs_open--];
+        *req = http_requests[--http_gw_reqs_open];
     }
     swift::close_socket(sock);
+    swift::Listen3rdPartySocket(socket_callbacks_t(sock));
 }
  
 
@@ -55,7 +58,7 @@ void HttpGwMayWriteCallback (SOCKET sink) {
     if (complete>req->offset) { // send data
         char buf[1<<12];
         uint64_t tosend = std::min((uint64_t)1<<12,complete-req->offset);
-        size_t rd = read(req->transfer,buf,tosend); // hope it is cached
+        size_t rd = pread(req->transfer,buf,tosend,req->offset); // hope it is cached
         if (rd<0) {
             HttpGwCloseConnection(sink);
             return;
@@ -91,17 +94,36 @@ void HttpGwSwiftProgressCallback (int transfer, bin64_t bin) {
 
 
 void HttpGwFirstProgressCallback (int transfer, bin64_t bin) {
-    printf("200 OK\r\n");
-    printf("Content-Length: value\r\n");
+    if (bin!=bin64_t(0,0)) // need the first packet
+        return;
     swift::RemoveProgressCallback(transfer,&HttpGwFirstProgressCallback);
     swift::AddProgressCallback(transfer,&HttpGwSwiftProgressCallback);
+    for (int httpc=0; httpc<http_gw_reqs_open; httpc++) {
+        http_gw_t * req = http_requests + httpc;
+        if (req->transfer==transfer) {
+            uint64_t file_size = swift::Size(transfer);
+            char response[1024];
+            sprintf(response,
+                "HTTP/1.1 200 OK\r\n"\
+                "Connection: close\r\n"\
+                "Content-Type: text/plain; charset=iso-8859-1\r\n"\
+                "Content-Length: %lli\r\n"\
+                "\r\n",
+                file_size);
+            send(req->sink,response,strlen(response),0);
+            req->tosend = file_size;
+        }
+    }
     HttpGwSwiftProgressCallback(transfer,bin);
 }
 
 
 void HttpGwNewRequestCallback (SOCKET http_conn){
     http_gw_t* req = http_requests + http_gw_reqs_open++;
+    req->id = ++http_gw_reqs_count;
     req->sink = http_conn;
+    req->offset = 0;
+    req->tosend = 0;
     // read headers - the thrilling part
     // we surely do not support pipelining => one request at a time
     #define HTTPGW_MAX_REQ_SIZE 1024
@@ -115,7 +137,7 @@ void HttpGwNewRequestCallback (SOCKET http_conn){
     // HTTP request line
     char* reqline = strtok(buf,"\r\n");
     char method[16], url[512], version[16], crlf[5];
-    if (4!=sscanf(reqline,"%16s %512s %16s%4[\n\r]",method,url,version,crlf)) {
+    if (3!=sscanf(reqline,"%16s %512s %16s",method,url,version)) {
         HttpGwCloseConnection(http_conn);
         return;
     }
@@ -123,7 +145,7 @@ void HttpGwNewRequestCallback (SOCKET http_conn){
     char* headerline;
     while (headerline=strtok(NULL,"\n\r")) {
         char header[128], value[256];
-        if (3!=sscanf(headerline,"%120[^: \r\n]: %250[^\r\n]%4[\r\n]",header,value,crlf)) {
+        if (2!=sscanf(headerline,"%120[^: ]: %250[^\r\n]",header,value)) {
             HttpGwCloseConnection(http_conn);
             return;
         }
@@ -140,15 +162,15 @@ void HttpGwNewRequestCallback (SOCKET http_conn){
         return;
     }
     // initiate transmission
-    int file = swift::Open(hash,hash);
-    // find/create transfer
-    swift::AddProgressCallback(file,&HttpGwFirstProgressCallback);
-    // write response header
-    req->offset = 0;
-    req->tosend = 10000;
+    int file = swift::Open(hash,Sha1Hash(true,hash));
     req->transfer = file;
-    socket_callbacks_t install (http_conn,NULL,NULL,HttpGwCloseConnection);
-    swift::Listen3rdPartySocket(install);
+    if (swift::Size(file)) {
+        HttpGwFirstProgressCallback(file,bin64_t(0,0));
+    } else {
+        swift::AddProgressCallback(file,&HttpGwFirstProgressCallback);
+        socket_callbacks_t install (http_conn,NULL,NULL,HttpGwCloseConnection); // FIXME: if conn is closed / no data arrives
+        swift::Listen3rdPartySocket(install);
+    }
 }
 
 
@@ -156,8 +178,7 @@ void HttpGwNewRequestCallback (SOCKET http_conn){
 void HttpGwNewConnectionCallback (SOCKET serv) {
     Address client_address;
     socklen_t len;
-    SOCKET conn = accept 
-        (serv, (sockaddr*) & (client_address.addr), &len);
+    SOCKET conn = accept (serv, (sockaddr*) & (client_address.addr), &len);
     if (conn==INVALID_SOCKET) {
         print_error("client conn fails");
         return;
@@ -171,7 +192,10 @@ void HttpGwNewConnectionCallback (SOCKET serv) {
 
 
 void HttpGwError (SOCKET s) {
-    print_error("everything fucked up");
+    print_error("httpgw is dead");
+    dprintf("%s @0 closed http gateway\n",tintstr());
+    close_socket(s);
+    swift::Listen3rdPartySocket(socket_callbacks_t(s));
 }
 
 
@@ -181,9 +205,12 @@ SOCKET InstallHTTPGateway (Address bind_to) {
     print_error("http binding fails"); close_socket(fd); \
     return INVALID_SOCKET; } }
     gw_ensure ( (fd=socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET );
+    int enable = true;
+    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (setsockoptptr_t)&enable, sizeof(int));
     gw_ensure ( 0==bind(fd, (sockaddr*)&(bind_to.addr), sizeof(struct sockaddr_in)) );
     gw_ensure (make_socket_nonblocking(fd));
     gw_ensure ( 0==listen(fd,8) );
-    socket_callbacks_t install(fd,HttpGwNewConnectionCallback,NULL,HttpGwError);
-    gw_ensure (swift::Listen3rdPartySocket(install));
+    socket_callbacks_t install_http(fd,HttpGwNewConnectionCallback,NULL,HttpGwError);
+    gw_ensure (swift::Listen3rdPartySocket(install_http));
+    dprintf("%s @0 installed http gateway on %s\n",tintstr(),bind_to.str());
 }
index c30fc03..29c15c5 100644 (file)
@@ -24,7 +24,6 @@ void    Channel::AddPeakHashes (Datagram& dgram) {
         dgram.Push8(SWIFT_HASH);
         dgram.Push32((uint32_t)peak);
         dgram.PushHash(file().peak_hash(i));
-        //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
         dprintf("%s #%u +phash %s\n",tintstr(),id_,peak.str());
     }
 }
@@ -38,7 +37,6 @@ void    Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
         dgram.Push8(SWIFT_HASH);
         dgram.Push32((uint32_t)uncle);
         dgram.PushHash( file().hash(uncle) );
-        //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
         dprintf("%s #%u +hash %s\n",tintstr(),id_,uncle.str());
         pos = pos.parent();
     }
@@ -327,6 +325,8 @@ bin64_t Channel::OnData (Datagram& dgram) {  // TODO: HAVE NONE for corrupted da
     data_in_ = tintbin(NOW,bin64_t::NONE);
     if (!ok)
         return bin64_t::NONE;
+    for(int i=0; i<transfer().cb_installed; i++)
+        transfer().callbacks[i](transfer().fd(),pos);  // FIXME FIXME FIXME
     data_in_.bin = pos;
     if (pos!=bin64_t::NONE) {
         if (last_data_in_time_) {
index 81cb4b4..211448a 100644 (file)
--- a/swift.cpp
+++ b/swift.cpp
 #include "compat.h"
 #include "swift.h"
 
-#include "httpgw.cpp"
-
 using namespace swift;
 
 #define quit(...) {fprintf(stderr,__VA_ARGS__); exit(1); }
+SOCKET InstallHTTPGateway (Address addr);
 
 
 int main (int argc, char** argv) {
     
     static struct option long_options[] =
     {
-        {"hash",    required_argument,       0, 'h'},
-        {"file",    required_argument,       0, 'f'},
+        {"hash",    required_argument, 0, 'h'},
+        {"file",    required_argument, 0, 'f'},
         {"daemon",  no_argument, 0, 'd'},
         {"listen",  required_argument, 0, 'l'},
         {"tracker", required_argument, 0, 't'},
@@ -40,7 +39,7 @@ int main (int argc, char** argv) {
     Address bindaddr;
     Address tracker;
     Address http_gw;
-    tint wait_time = 0;
+    tint wait_time = -1;
     
     LibraryInit();
     
@@ -65,6 +64,8 @@ int main (int argc, char** argv) {
                 bindaddr = Address(optarg);
                 if (bindaddr==Address())
                     quit("address must be hostname:port, ip:port or just port\n");
+                if (wait_time==-1)
+                    wait_time = TINT_NEVER; // seed
                 break;
             case 't':
                 tracker = Address(optarg);
@@ -79,7 +80,10 @@ int main (int argc, char** argv) {
                 report_progress = true;
                 break;
             case 'g':
-                http_gw = optarg ? Address(optarg) : Address(8080);
+                http_gw = optarg ? Address(optarg) : Address(Address::LOCALHOST,8080);
+                if (wait_time==-1)
+                    wait_time = TINT_NEVER; // seed
+                break;
             case 'w':
                 wait_time = TINT_NEVER;
                 if (optarg) {
@@ -101,42 +105,56 @@ int main (int argc, char** argv) {
 
     }   // arguments parsed
     
+
     if (bindaddr!=Address()) { // seeding
         if (Listen(bindaddr)<=0)
             quit("cant listen to %s\n",bindaddr.str())
-        if (wait_time==0)
-            wait_time=TINT_NEVER;
-    } else if (tracker!=Address()) { // leeching
-        int base = rand()%10000, i;
-        for (i=0; i<100 && Listen(Address((uint32_t)INADDR_ANY,i*7+base))<=0; i++);
-        if (i==100)
-            quit("cant listen to a port\n");
+    } else if (tracker!=Address() || http_gw!=Address()) { // leeching
+        for (int i=0; i<=10; i++) {
+            bindaddr = Address((uint32_t)INADDR_ANY,1024+rand()%10000);
+            if (Listen(bindaddr)>0)
+                break;
+            if (i==10)
+                quit("cant listen on %s\n",bindaddr.str());
+        }
     }
     
     if (tracker!=Address())
         SetTracker(tracker);
 
-
     if (http_gw!=Address())
         InstallHTTPGateway(http_gw);
-    
-    int file = Open(filename,root_hash);
-    // FIXME open err 
-    printf("Root hash: %s\n", RootMerkleHash(file).hex().c_str());
 
-    if (root_hash==Sha1Hash() && bindaddr==Address() && tracker==Address())
-        exit(0);
+    if (root_hash!=Sha1Hash::ZERO && !filename)
+        filename = strdup(root_hash.hex().c_str());
+
+    int file = -1;
+    if (filename) {
+        file = Open(filename,root_hash);
+        if (file<=0)
+            quit("cannot open file %s",filename);
+        printf("Root hash: %s\n", RootMerkleHash(file).hex().c_str());
+    }
+
+    if (bindaddr==Address() && file==-1) {
+        fprintf(stderr,"Usage:\n");
+        fprintf(stderr,"  -h, --hash\troot Merkle hash for the transmission\n");
+        fprintf(stderr,"  -f, --file\tname of file to use (root hash by default)\n");
+        fprintf(stderr,"  -l, --listen\t[ip:|host:]port to listen to (default: random)\n");
+        fprintf(stderr,"  -t, --tracker\t[ip:|host:]port of the tracker (default: none)\n");
+        fprintf(stderr,"  -D, --debug\tfile name for debugging logs (default: stdout)\n");
+        fprintf(stderr,"  -p, --progress\treport transfer progress\n");
+        fprintf(stderr,"  -g, --http\t[ip:|host:]port to bind HTTP gateway to (default localhost:8080)\n");
+        fprintf(stderr,"  -w, --wait\tlimit running time, e.g. 1[DHMs] (default: infinite with -l, -g)\n");
+    }
 
     tint start_time = NOW;
-    tint end_time = TINT_NEVER;
     
-    while (NOW<end_time+wait_time){
-        if (end_time==TINT_NEVER && IsComplete(file))
-            end_time = NOW;
-        // and yes, I add up infinities and go away with that
-        tint towait = (end_time+wait_time)-NOW;
-               Loop(TINT_SEC<towait?TINT_SEC:towait);
-        if (report_progress) {
+    while ( bindaddr!=Address() &&
+            ( ( file>=0 && !IsComplete(file) ) ||
+              ( start_time+wait_time > NOW ) )   ) {
+        swift::Loop(TINT_SEC);
+        if (report_progress && file>=0) {
             fprintf(stderr,
                     "%s %lli of %lli (seq %lli) %lli dgram %lli bytes up, "\
                     "%lli dgram %lli bytes down\n",
@@ -147,12 +165,13 @@ int main (int argc, char** argv) {
         }
     }
     
-    Close(file);
+    if (file!=-1)
+        Close(file);
     
     if (Channel::debug_file)
         fclose(Channel::debug_file);
     
-    Shutdown();
+    swift::Shutdown();
     
     return 0;