httpgw finally builds
authorVictor Grishchenko (Debian) <victor.grishchenko@gmail.com>
Tue, 9 Mar 2010 13:19:26 +0000 (14:19 +0100)
committerVictor Grishchenko (Debian) <victor.grishchenko@gmail.com>
Tue, 9 Mar 2010 13:19:26 +0000 (14:19 +0100)
datagram.cpp
datagram.h
httpgw.cpp
swift.cpp
swift.h
transfer.cpp

index 1388a1c..9491d56 100644 (file)
@@ -168,6 +168,7 @@ SOCKET Datagram::Bind (Address addr_) {
 #else
 #define parptype void*
 #endif
+    int enable = true;
     dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (parptype)&sndbuf, sizeof(int)) == 0 );
     dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (parptype)&rcvbuf, sizeof(int)) == 0 );
     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (parptype)&enable, sizeof(int));
index 571e293..eb836d7 100644 (file)
@@ -86,8 +86,8 @@ struct Address {
 
 typedef void (*sock_cb_t) (SOCKET);
 struct socket_callbacks_t {
-    socket_callbacks_t (SOCKET s=0) : sock(s),
-        may_read(NULL), may_write(NULL), on_error(NULL) {}
+    socket_callbacks_t (SOCKET s=0, sock_cb_t mr=NULL, sock_cb_t mw=NULL, sock_cb_t oe=NULL) :
+        sock(s), may_read(mr), may_write(mw), on_error(oe) {}
     SOCKET sock;
     sock_cb_t   may_read;
     sock_cb_t   may_write;
index af99412..cd1a52b 100644 (file)
@@ -1,9 +1,15 @@
 #include "swift.h"
 
+using namespace swift;
+
 #define HTTPGW_MAX_CLIENT 128
+
 enum {
-    HTTPGW_RANGE,
-    HTTPGW_MAX_HEADER
+    HTTPGW_RANGE=0,
+    HTTPGW_MAX_HEADER=1
+};
+char * HTTPGW_HEADERS[HTTPGW_MAX_HEADER] = {
+    "Content-Range"
 };
 
 
@@ -12,17 +18,19 @@ struct http_gw_t {
     uint64_t tosend;
     int      transfer;
     SOCKET   sink;
-    char*    headers[HTTP_MAX_HEADER];
-} http_requests[HTTP_MAX_CLIENT];
+    char*    headers[HTTPGW_MAX_HEADER];
+} http_requests[HTTPGW_MAX_CLIENT];
 
 
 int http_gw_reqs_open = 0;
 
+void HttpGwNewRequestCallback (SOCKET http_conn);
+void HttpGwNewRequestCallback (SOCKET http_conn);
 
 http_gw_t* HttpGwFindRequest (SOCKET sock) {
     for(int i=0; i<http_gw_reqs_open; i++)
         if (http_requests[i].sink==sock)
-            return http_requests[i];
+            return http_requests+i;
     return NULL;
 }
 
@@ -37,16 +45,16 @@ void HttpGwCloseConnection (SOCKET sock) {
             }
         *req = http_requests[http_gw_reqs_open--];
     }
-    close_socket(sock);
+    swift::close_socket(sock);
 }
  
 
 void HttpGwMayWriteCallback (SOCKET sink) {
     http_gw_t* req = HttpGwFindRequest(sink);
-    uint64_t complete = swift::SeqComplete(http_requests[reqi].transfer);
+    uint64_t complete = swift::SeqComplete(req->transfer);
     if (complete>req->offset) { // send data
-        char buf[1LL<<12];
-        uint64_t tosend = std::min(1LL<<12,complete-req->offset);
+        char buf[1<<12];
+        uint64_t tosend = std::min((uint64_t)1<<12,complete-req->offset);
         size_t rd = read(req->transfer,buf,tosend); // hope it is cached
         if (rd<0) {
             HttpGwCloseConnection(sink);
@@ -60,33 +68,47 @@ void HttpGwMayWriteCallback (SOCKET sink) {
         req->offset += wn;
         req->tosend -= wn;
     } else {
-        if (swift::IsComplete(http_requests[reqi].transfer))  // done; wait for new request
-            swift::Listen3rdPartySocket
-                (http_conns[httpc].sink,NewRequestCallback,NULL,ErrorCallback);
-        else  // wait for data
-            swift::Listen3rdPartySocket(request.sink,NULL,NULL,ErrorCallback);
+        if (swift::IsComplete(req->transfer)) { // done; wait for new request
+            socket_callbacks_t wait_new_req(req->sink,HttpGwNewRequestCallback,NULL,HttpGwCloseConnection);
+            swift::Listen3rdPartySocket (wait_new_req);
+        } else { // wait for data
+            socket_callbacks_t wait_swift_data(req->sink,NULL,NULL,HttpGwCloseConnection);
+            swift::Listen3rdPartySocket(wait_swift_data);
+        }
     }
 }
 
 
-void SwiftProgressCallback (int transfer, bin64_t bin) {
-    for (int httpc=0; httpc<conn_count; httpc++)
-        if (http_conns[httpc].transfer==transfer) {
-            if (bin.offset()<<10==http_conns[httpc].offset)
-                swift::Listen3rdPartySocket
-                (http_conns[httpc].sink,NULL,MayWriteCallback,ErrorCallback);
-        }
+void HttpGwSwiftProgressCallback (int transfer, bin64_t bin) {
+    for (int httpc=0; httpc<http_gw_reqs_open; httpc++)
+        if (http_requests[httpc].transfer==transfer)
+            if ( (bin.offset()<<10) == http_requests[httpc].offset ) {
+                socket_callbacks_t maywrite_callbacks
+                        (http_requests[httpc].sink,NULL,HttpGwMayWriteCallback,HttpGwCloseConnection);
+                Listen3rdPartySocket (maywrite_callbacks);
+            }
+}
+
+
+void HttpGwFirstProgressCallback (int transfer, bin64_t bin) {
+    printf("200 OK\r\n");
+    printf("Content-Length: value\r\n");
+    swift::RemoveProgressCallback(transfer,&HttpGwFirstProgressCallback);
+    swift::AddProgressCallback(transfer,&HttpGwSwiftProgressCallback);
+    HttpGwSwiftProgressCallback(transfer,bin);
 }
 
 
 void HttpGwNewRequestCallback (SOCKET http_conn){
+    http_gw_t* req = http_requests + http_gw_reqs_open++;
+    req->sink = http_conn;
     // read headers - the thrilling part
     // we surely do not support pipelining => one request at a time
     #define HTTPGW_MAX_REQ_SIZE 1024
     char buf[HTTPGW_MAX_REQ_SIZE+1];
     int rd = recv(http_conn,buf,HTTPGW_MAX_REQ_SIZE,0);
     if (rd<=0) { // if conn is closed by the peer, rd==0
-        HttpGwCloseRequest(http_conn);
+        HttpGwCloseConnection(http_conn);
         return;
     }
     buf[rd] = 0;
@@ -94,7 +116,7 @@ void HttpGwNewRequestCallback (SOCKET http_conn){
     char* reqline = strtok(buf,"\r\n");
     char method[16], url[512], version[16], crlf[5];
     if (4!=sscanf(reqline,"%16s %512s %16s%4[\n\r]",method,url,version,crlf)) {
-        HttpGwCloseRequest(http_conn);
+        HttpGwCloseConnection(http_conn);
         return;
     }
     // HTTP header fields
@@ -102,32 +124,40 @@ void HttpGwNewRequestCallback (SOCKET http_conn){
     while (headerline=strtok(NULL,"\n\r")) {
         char header[128], value[256];
         if (3!=sscanf(headerline,"%120[^: \r\n]: %250[^\r\n]%4[\r\n]",header,value,crlf)) {
-            HttpGwCloseRequest(http_conn);
+            HttpGwCloseConnection(http_conn);
             return;
         }
-        for(int i=0; i<HTTPGW_HEADER_COUNT; i++)
-            if (0==strcasecmp(HTTPGW_HEADERS[i],header) && !http_requests[reqi])
-                http_requests[reqi] = strdup(value);
+        for(int i=0; i<HTTPGW_MAX_HEADER; i++)
+            if (0==strcasecmp(HTTPGW_HEADERS[i],header) && !req->headers[i])
+                req->headers[i] = strdup(value);
     }
-    // initiate transmission
     // parse URL
+    char * hashch=strtok(url,"/"), hash[41];
+    while (hashch && (1!=sscanf(hashch,"%40[0123456789abcdefABCDEF]",hash) || strlen(hash)!=40))
+        hashch = strtok(NULL,"/");
+    if (strlen(hash)!=40) {
+        HttpGwCloseConnection(http_conn);
+        return;
+    }
+    // initiate transmission
+    int file = swift::Open(hash,hash);
     // find/create transfer
-    SwiftProgressCallback;
+    swift::AddProgressCallback(file,&HttpGwFirstProgressCallback);
     // write response header
-    sprintf("200 OK\r\n");
-    sprintf("Header: value\r\n");
-    http_clients[i].offset = 0;
-    http_clients[i].tosend = 10000;
-    http_clients[i].transfer = file;
-    http_clients[i].sink = conn;
+    req->offset = 0;
+    req->tosend = 10000;
+    req->transfer = file;
+    socket_callbacks_t install (http_conn,NULL,NULL,HttpGwCloseConnection);
+    swift::Listen3rdPartySocket(install);
 }
 
 
 // be liberal in what you do, be conservative in what you accept
 void HttpGwNewConnectionCallback (SOCKET serv) {
     Address client_address;
+    socklen_t len;
     SOCKET conn = accept 
-        (serv, & (client_address.addr), sizeof(struct sockaddr_in));
+        (serv, (sockaddr*) & (client_address.addr), &len);
     if (conn==INVALID_SOCKET) {
         print_error("client conn fails");
         return;
@@ -135,13 +165,13 @@ void HttpGwNewConnectionCallback (SOCKET serv) {
     make_socket_nonblocking(conn);
     // submit 3rd party socket to the swift loop
     socket_callbacks_t install
-        (conn,HttpGwNewRequestCallback,HttpGwMayWriteCallback,HttpGwErrorCallback);
+        (conn,HttpGwNewRequestCallback,NULL,HttpGwCloseConnection);
     swift::Listen3rdPartySocket(install);
 }
 
 
-void HttpGwError (SOCKET serv) {
-    print_error("error on http socket");
+void HttpGwError (SOCKET s) {
+    print_error("everything fucked up");
 }
 
 
@@ -154,6 +184,6 @@ SOCKET InstallHTTPGateway (Address bind_to) {
     gw_ensure ( 0==bind(fd, (sockaddr*)&(bind_to.addr), sizeof(struct sockaddr_in)) );
     gw_ensure (make_socket_nonblocking(fd));
     gw_ensure ( 0==listen(fd,8) );
-    socket_callbacks_t install(sock,HttpGwNewConnectionCallback,NULL,HttpGwError);
+    socket_callbacks_t install(fd,HttpGwNewConnectionCallback,NULL,HttpGwError);
     gw_ensure (swift::Listen3rdPartySocket(install));
 }
index f3a77a5..81cb4b4 100644 (file)
--- a/swift.cpp
+++ b/swift.cpp
@@ -11,6 +11,8 @@
 #include "compat.h"
 #include "swift.h"
 
+#include "httpgw.cpp"
+
 using namespace swift;
 
 #define quit(...) {fprintf(stderr,__VA_ARGS__); exit(1); }
@@ -27,6 +29,7 @@ int main (int argc, char** argv) {
         {"tracker", required_argument, 0, 't'},
         {"debug",   no_argument, 0, 'D'},
         {"progress",no_argument, 0, 'p'},
+        {"http",    optional_argument, 0, 'g'},
         {"wait",    optional_argument, 0, 'w'},
         {0, 0, 0, 0}
     };
@@ -36,12 +39,13 @@ int main (int argc, char** argv) {
     bool daemonize = false, report_progress = false;
     Address bindaddr;
     Address tracker;
+    Address http_gw;
     tint wait_time = 0;
     
     LibraryInit();
     
     int c;
-    while ( -1 != (c = getopt_long (argc, argv, ":h:f:dl:t:Dpw::", long_options, 0)) ) {
+    while ( -1 != (c = getopt_long (argc, argv, ":h:f:dl:t:Dpg::w::", long_options, 0)) ) {
         
         switch (c) {
             case 'h':
@@ -74,6 +78,8 @@ int main (int argc, char** argv) {
             case 'p':
                 report_progress = true;
                 break;
+            case 'g':
+                http_gw = optarg ? Address(optarg) : Address(8080);
             case 'w':
                 wait_time = TINT_NEVER;
                 if (optarg) {
@@ -109,12 +115,16 @@ int main (int argc, char** argv) {
     
     if (tracker!=Address())
         SetTracker(tracker);
+
+
+    if (http_gw!=Address())
+        InstallHTTPGateway(http_gw);
     
-       int file = Open(filename,root_hash);
+    int file = Open(filename,root_hash);
     // FIXME open err 
     printf("Root hash: %s\n", RootMerkleHash(file).hex().c_str());
 
-       if (root_hash==Sha1Hash() && bindaddr==Address() && tracker==Address())
+    if (root_hash==Sha1Hash() && bindaddr==Address() && tracker==Address())
         exit(0);
 
     tint start_time = NOW;
@@ -137,12 +147,12 @@ int main (int argc, char** argv) {
         }
     }
     
-       Close(file);
+    Close(file);
     
     if (Channel::debug_file)
         fclose(Channel::debug_file);
     
-       Shutdown();
+    Shutdown();
     
     return 0;
     
diff --git a/swift.h b/swift.h
index e6a8e57..12bf058 100644 (file)
--- a/swift.h
+++ b/swift.h
@@ -123,6 +123,7 @@ namespace swift {
     class PiecePicker;
     class CongestionController;
     class PeerSelector;
+    typedef void (*TransferProgressCallback) (int transfer, bin64_t bin);
 
 
     /** A class representing single file transfer. */
@@ -186,6 +187,10 @@ namespace swift {
 
         tint            init_time_;
 
+        #define SWFT_MAX_TRANSFER_CB 8
+        TransferProgressCallback    callbacks[SWFT_MAX_TRANSFER_CB];
+        int             cb_installed;
+
     public:
         void            OnDataIn (bin64_t pos);
         void            OnPexIn (const Address& addr);
@@ -197,6 +202,8 @@ namespace swift {
         friend uint64_t  SeqComplete (int fdes);
         friend int     Open (const char* filename, const Sha1Hash& hash) ;
         friend void    Close (int fd) ;
+        friend void AddProgressCallback (int transfer,TransferProgressCallback cb);
+        friend void RemoveProgressCallback (int transfer,TransferProgressCallback cb);
     };
 
 
@@ -450,6 +457,8 @@ namespace swift {
         beginning, till the first not-yet-retrieved packet. */
     uint64_t  SeqComplete (int fdes);
 
+    void AddProgressCallback (int transfer,TransferProgressCallback cb);
+    void RemoveProgressCallback (int transfer,TransferProgressCallback cb);
 
     //uint32_t Width (const tbinvec& v);
 
index c8953bb..c885846 100644 (file)
@@ -22,7 +22,7 @@ std::vector<FileTransfer*> FileTransfer::files(20);
 // FIXME: separate Bootstrap() and Download(), then Size(), Progress(), SeqProgress()
 
 FileTransfer::FileTransfer (const char* filename, const Sha1Hash& _root_hash) :
-    file_(filename,_root_hash), hs_in_offset_(0)
+    file_(filename,_root_hash), hs_in_offset_(0), cb_installed(0)
 {
     if (files.size()<fd()+1)
         files.resize(fd()+1);
@@ -40,6 +40,24 @@ void    Channel::CloseTransfer (FileTransfer* trans) {
 }
 
 
+void swift::AddProgressCallback (int transfer, TransferProgressCallback cb) {
+    FileTransfer* trans = FileTransfer::file(transfer);
+    if (!trans)
+        return;
+    trans->callbacks[trans->cb_installed++] = cb;
+}
+
+
+void swift::RemoveProgressCallback (int transfer, TransferProgressCallback cb) {
+    FileTransfer* trans = FileTransfer::file(transfer);
+    if (!trans)
+        return;
+    for(int i=0; i<trans->cb_installed; i++)
+        if (trans->callbacks[i]==cb)
+            trans->callbacks[i]=trans->callbacks[--trans->cb_installed];
+}
+
+
 FileTransfer::~FileTransfer ()
 {
     Channel::CloseTransfer(this);