http gateway in progress
authorVictor Grishchenko (Debian) <victor.grishchenko@gmail.com>
Fri, 5 Mar 2010 18:26:27 +0000 (19:26 +0100)
committerVictor Grishchenko (Debian) <victor.grishchenko@gmail.com>
Fri, 5 Mar 2010 18:26:27 +0000 (19:26 +0100)
ChangeLog [deleted file]
bin64.h
channel.cpp
compat.cpp
compat.h
datagram.cpp
datagram.h
hashtree.cpp
httpgw.cpp [new file with mode: 0644]
sendrecv.cpp
swift.h

diff --git a/ChangeLog b/ChangeLog
deleted file mode 100755 (executable)
index ad12850..0000000
--- a/ChangeLog
+++ /dev/null
@@ -1,7 +0,0 @@
-0.003 - This is not a release as well - 18 Oct 2009
-
-       - but at least, it compiles now
-
-0.002 - This is not a release - 7 Oct 2009
-
-       - it does not even compile, committed for reading purposes only
diff --git a/bin64.h b/bin64.h
index 6ddce3f..fb88c99 100644 (file)
--- a/bin64.h
+++ b/bin64.h
@@ -137,13 +137,13 @@ struct bin64_t {
     }
 
     /** Check whether this bin is the left sibling. */
-    bool is_left () const {
+    inline bool is_left () const {
         uint64_t tb = tail_bit();
         return !(v&(tb<<1));
     }
     
     /** Check whether this bin is the right sibling. */
-    bool is_right() const { return !is_left(); }
+    inline bool is_right() const { return !is_left(); }
 
     /** Get the leftmost basic bin within this bin. */
     bin64_t left_foot () const {
index 80dce25..f6d52b8 100644 (file)
@@ -31,7 +31,7 @@ int Channel::MAX_REORDERING = 4;
 bool Channel::SELF_CONN_OK = false;
 swift::tint Channel::TIMEOUT = TINT_SEC*60;
 std::vector<Channel*> Channel::channels(1);
-SOCKET Channel::sockets[8] = {0,0,0,0,0,0,0,0};
+socket_callbacks_t Channel::sockets[SWFT_MAX_SOCK_OPEN] = {};
 int Channel::socket_count = 0;
 Address Channel::tracker;
 tbheap Channel::send_queue;
@@ -41,7 +41,7 @@ PeerSelector* Channel::peer_selector = new SimpleSelector();
 
 Channel::Channel    (FileTransfer* transfer, int socket, Address peer_addr) :
     transfer_(transfer), peer_(peer_addr), peer_channel_id_(0), pex_out_(0),
-    socket_(socket==-1?sockets[0]:socket), // FIXME
+    socket_(socket==INVALID_SOCKET?sockets[0].sock:socket), // FIXME
     data_out_cap_(bin64_t::ALL), last_data_out_time_(0), last_data_in_time_(0),
     own_id_mentioned_(false), next_send_time_(0), last_send_time_(0),
     last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC),
@@ -86,16 +86,19 @@ int Channel::EncodeID(int unscrambled) {
 
 int     swift::Listen (Address addr) {
     int sock = Datagram::Bind(addr);
-    if (sock!=INVALID_SOCKET)
-        Channel::sockets[Channel::socket_count++] = sock;
+    if (sock!=INVALID_SOCKET) {
+        socket_callbacks_t cb(sock);
+        cb.may_read = &Channel::RecvDatagram;
+        Channel::sockets[Channel::socket_count++] = cb;
+    }
     return sock;
 }
 
 
 void    swift::Shutdown (int sock_des) {
     for(int i=0; i<Channel::socket_count; i++)
-        if (sock_des==-1 || Channel::sockets[i]==sock_des) {
-            Datagram::Close(Channel::sockets[i]);
+        if (sock_des==-1 || Channel::sockets[i].sock==sock_des) {
+            Datagram::Close(Channel::sockets[i].sock);
             Channel::sockets[i] = Channel::sockets[--Channel::socket_count];
         }
 }
@@ -106,6 +109,19 @@ void    swift::Loop (tint till) {
 }
 
 
+bool    swift::Listen3rdPartySocket (socket_callbacks_t cb) {
+    int i=0;
+    while (i<Channel::socket_count && Channel::sockets[i].sock!=cb.sock) i++;
+    if (i==Channel::socket_count)
+        if (i==SWFT_MAX_SOCK_OPEN)
+            return false;
+        else
+            Channel::socket_count++;
+    Channel::sockets[i]=cb;
+    return true;
+}
+
+
 int      swift::Open (const char* filename, const Sha1Hash& hash) {
     FileTransfer* ft = new FileTransfer(filename, hash);
     if (ft && ft->file().file_descriptor()) {
index 08dfc7d..7a5a654 100644 (file)
@@ -193,5 +193,23 @@ std::string gettmpdir(void)
 #endif
 }
 
+bool    make_socket_nonblocking(SOCKET fd) {
+#ifdef _WIN32
+    u_long enable = 1;
+    return 0==ioctlsocket(fd, FIONBIO, &enable);
+#else
+    int enable=1;
+    return 0==fcntl(fd, F_SETFL, O_NONBLOCK);
+#endif
+}
+
+bool    close_socket (SOCKET sock) {
+#ifdef _WIN32
+    return 0==closesocket(sock);
+#else
+    return 0==::close(sock);
+#endif
+}
+
 
 }
index 0b4db12..c506cbf 100644 (file)
--- a/compat.h
+++ b/compat.h
@@ -28,8 +28,17 @@ typedef unsigned __int64 uint64_t;
 #include <io.h>
 #else
 #include <sys/mman.h>
+#include <arpa/inet.h>
+#include <sys/select.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
 #endif
 
+#ifndef _WIN32
+typedef int SOCKET;
+#endif
+
+#include <unistd.h>
 #include <fcntl.h>
 #include <cstdio>
 #include <cstdlib>
@@ -96,6 +105,11 @@ std::string gettmpdir(void);
 
 tint    usec_time ();
 
+bool    make_socket_nonblocking(SOCKET s);
+
+bool    close_socket (SOCKET sock);
+
+
 };
 
 #endif
index e99d620..1388a1c 100644 (file)
@@ -92,16 +92,16 @@ int Datagram::Send () {
         perror("can't send");
     dgrams_up++;
     bytes_up+=size();
-       offset=0;
-       length=0;
-       Time();
-       return r;
+    offset=0;
+    length=0;
+    Time();
+    return r;
 }
 
 int Datagram::Recv () {
     socklen_t addrlen = sizeof(struct sockaddr_in);
     offset = 0;
-    length = recvfrom (sock, (char *)buf, MAXDGRAMSZ, 0,
+    length = recvfrom (sock, (char *)buf, MAXDGRAMSZ*2, 0,
                        (struct sockaddr*)&(addr.addr), &addrlen);
     if (length<0) {
         length = 0;
@@ -114,30 +114,39 @@ int Datagram::Recv () {
 }
 
 
-SOCKET Datagram::Wait (int sockcnt, SOCKET* sockets, tint usec) {
+void Datagram::Wait (int sockcnt, socket_callbacks_t* sockets, tint usec) {
     struct timeval timeout;
     timeout.tv_sec = usec/TINT_SEC;
     timeout.tv_usec = usec%TINT_SEC;
     int max_sock_fd = 0;
-    fd_set bases, err;
-    FD_ZERO(&bases);
-    FD_ZERO(&err);
+    fd_set rdfd, wrfd, errfd;
+    FD_ZERO(&rdfd);
+    FD_ZERO(&wrfd);
+    FD_ZERO(&errfd);
     for(int i=0; i<sockcnt; i++) {
-        FD_SET(sockets[i],&bases);
-        FD_SET(sockets[i],&err);
-        if (sockets[i]>max_sock_fd)
-            max_sock_fd = sockets[i];
+        if (sockets[i].may_read!=0)
+            FD_SET(sockets[i].sock,&rdfd);
+        if (sockets[i].may_write!=0)
+            FD_SET(sockets[i].sock,&wrfd);
+        if (sockets[i].on_error!=0)
+            FD_SET(sockets[i].sock,&errfd);
+        if (sockets[i].sock>max_sock_fd)
+            max_sock_fd = sockets[i].sock;
     }
-    int sel = select(max_sock_fd+1, &bases, NULL, &err, &timeout);
+    int sel = select(max_sock_fd+1, &rdfd, &wrfd, &errfd, &timeout);
     Time();
     if (sel>0) {
-        for (int i=0; i<=sockcnt; i++)
-            if (FD_ISSET(sockets[i],&bases))
-                return sockets[i];
+        for (int i=0; i<=sockcnt; i++) {
+            if (FD_ISSET(sockets[i].sock,&rdfd))
+                (*(sockets[i].may_read))(sockets[i].sock);
+            if (FD_ISSET(sockets[i].sock,&wrfd))
+                (*(sockets[i].may_write))(sockets[i].sock);
+            if (FD_ISSET(sockets[i].sock,&errfd))
+                (*(sockets[i].on_error))(sockets[i].sock);
+        }
     } else if (sel<0) {
         print_error("select fails");
     }
-    return INVALID_SOCKET;
 }
 
 tint Datagram::Time () {
@@ -151,49 +160,23 @@ SOCKET Datagram::Bind (Address addr_) {
     struct sockaddr_in addr = addr_;
     SOCKET fd;
     int len = sizeof(struct sockaddr_in), sndbuf=1<<20, rcvbuf=1<<20;
-    if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
-        print_error("socket() fails");
-        return INVALID_SOCKET;
-    }
+    #define dbnd_ensure(x) { if (!(x)) { print_error("binding fails"); close_socket(fd); return INVALID_SOCKET; } }
+    dbnd_ensure ( (fd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0 );
+    dbnd_ensure( make_socket_nonblocking(fd) );  // FIXME may remove this
 #ifdef _WIN32
-    u_long enable = 1;
-    ioctlsocket(fd, FIONBIO, &enable);
-    if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (const char *)&sndbuf, sizeof(int)) != 0 ) {
-        print_error("setsockopt fails");
-        return INVALID_SOCKET;
-    }
-       if ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (const char *)&rcvbuf, sizeof(int)) != 0 ) {
-        print_error("setsockopt2 fails");
-        return INVALID_SOCKET;
-    }
-    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&enable, sizeof(int));
+#define parptype (char*)
 #else
-    int enable=1;
-    if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1)
-        return INVALID_SOCKET;
-    if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(int)) < 0 ) {
-        print_error("setsockopt fails");
-        return INVALID_SOCKET;
-    }
-       if ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(int)) < 0 ) {
-        print_error("setsockopt2 fails");
-        return INVALID_SOCKET;
-    }
-    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
+#define parptype void*
 #endif
-    if (::bind(fd, (sockaddr*)&addr, len) != 0) {
-        print_error("bind fails");
-        return INVALID_SOCKET;
-    }
+    dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (parptype)&sndbuf, sizeof(int)) == 0 );
+    dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (parptype)&rcvbuf, sizeof(int)) == 0 );
+    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (parptype)&enable, sizeof(int));
+    dbnd_ensure ( ::bind(fd, (sockaddr*)&addr, len) == 0 );
     return fd;
 }
 
-void Datagram::Close (int sock) { // remove from fd_set
-#ifdef _WIN32
-    if (closesocket(sock)!=0)
-#else
-    if (::close(sock)!=0)
-#endif
+void Datagram::Close (SOCKET sock) {
+    if (!close_socket(sock))
         print_error("on closing a socket");
 }
 
index 6fb26d7..571e293 100644 (file)
@@ -9,24 +9,8 @@
 #ifndef DATAGRAM_H
 #define DATAGRAM_H
 
-#include <stdlib.h>
-#include <fcntl.h>
 #include <sys/stat.h>
 #include <string.h>
-#include <stdio.h>
-#include <string>
-#ifdef _WIN32
-    #include <winsock2.h>
-    #include "compat.h"
-#else
-    typedef int SOCKET;
-
-    #include <arpa/inet.h>
-    #include <sys/select.h>
-    #include <sys/socket.h>
-    #include <netinet/in.h>
-    #include <unistd.h>
-#endif
 #include "hashtree.h"
 #include "compat.h"
 
@@ -100,6 +84,17 @@ struct Address {
 };
 
 
+typedef void (*sock_cb_t) (SOCKET);
+struct socket_callbacks_t {
+    socket_callbacks_t (SOCKET s=0) : sock(s),
+        may_read(NULL), may_write(NULL), on_error(NULL) {}
+    SOCKET sock;
+    sock_cb_t   may_read;
+    sock_cb_t   may_write;
+    sock_cb_t   on_error;
+};
+
+
 /** UDP datagram class, a nice wrapping around sendto/recvfrom/select. 
     Reading/writing from/to a datagram is done in a FIFO (deque) fashion:
     written data is appended to the tail (push) while read data is
@@ -115,12 +110,16 @@ public:
 
     /** bind to the address */
     static SOCKET Bind(Address address);
+
     /** close the port */
     static void Close(int port);
+
     /** the current time */
     static tint Time();
+
     /** wait till one of the sockets has some io to do; usec is the timeout */
-    static SOCKET Wait (int sockcnt, SOCKET* sockets, tint usec=0);
+    static void Wait (int sockcnt, socket_callbacks_t* sockets, tint usec=0);
+
     static tint now, epoch, start;
     static uint64_t dgrams_up, dgrams_down, bytes_up, bytes_down;
 
index 3e772eb..825fe7c 100644 (file)
@@ -319,7 +319,7 @@ bool            HashTree::OfferData (bin64_t pos, const char* data, size_t lengt
 
     Sha1Hash data_hash(data,length);
     if (!OfferHash(pos, data_hash)) {
-        printf("invalid hash for %s: %s\n",pos.str(),data_hash.hex().c_str());
+        printf("invalid hash for %s: %s\n",pos.str(),data_hash.hex().c_str()); // FIXME
         return false;
     }
 
diff --git a/httpgw.cpp b/httpgw.cpp
new file mode 100644 (file)
index 0000000..bb03be3
--- /dev/null
@@ -0,0 +1,86 @@
+#include "swift.h"
+
+#define MAX_HTTP_CLIENT 128
+
+struct http_gw_t {
+    uint64_t offset;
+    uint64_t tosend;
+    int      transfer;
+    SOCKET   sink;
+} http_clients[MAX_HTTP_CLIENT];
+
+void HttpGwErrorCallback (SOCKET sink) {
+
+}
+
+void HttpGwMayWriteCallback (SOCKET sink) {
+    // if have data => write
+    // otherwise, change mask
+    if (not_enough_data)
+        swift::Listen3rdPartySocket(http_conns[httpc].sink,NULL,NULL,ErrorCallback);
+    if (all_done)
+        swift::Listen3rdPartySocket(http_conns[httpc].sink,NewRequestCallback,NULL,ErrorCallback);
+}
+
+
+void SwiftProgressCallback (int transfer, bin64_t bin) {
+    for (int httpc=0; httpc<conn_count; httpc++)
+        if (http_conns[httpc].transfer==transfer) {
+            // check mask
+            if (bin==http_conns[httpc].offset)
+                Listen3rdPartySocket(http_conns[httpc].sink,NULL,MayWriteCallback,ErrorCallback);
+        }
+}
+
+
+
+void HttpGwNewRequestCallback (SOCKET http_conn){
+    // read headers - the thrilling part
+    // we surely do not support pipelining => one requests at a time
+    fgets();
+    // HTTP request line
+    sscanf();
+    // HTTP header fields
+    sscanf();
+    // incomplete header => screw it
+    fprintf("400 Incomplete header\r\n");
+    close();
+    // initiate transmission
+    // write response header
+    http_clients[i].offset = 0;
+    http_clients[i].tosend = 10000;
+    http_clients[i].transfer = file;
+    http_clients[i].sink = conn;
+}
+
+
+// be liberal in what you do, be conservative in what you accept
+void HttpGwNewConnectionCallback (SOCKET serv) {
+    Address client_address;
+    SOCKET conn = accept (serv, & (client_address.addr), sizeof(struct sockaddr_in));
+    if (conn==INVALID_SOCKET) {
+        print_error("client conn fails");
+        return;
+    }
+    make_socket_nonblocking(conn);
+    // submit 3rd party socket to the swift loop
+    socket_callbacks_t install(conn,HttpGwNewRequestCallback,HttpGwMayWriteCallback,HttpGwErrorCallback);
+    swift::Listen3rdPartySocket(install);
+}
+
+
+void HttpGwError (SOCKET serv) {
+    print_error("error on http socket");
+}
+
+
+SOCKET InstallHTTPGateway (Address bind_to) {
+    SOCKET fd;
+    #define gw_ensure(x) { if (!(x)) { print_error("http binding fails"); close_socket(fd); return INVALID_SOCKET; } }
+    gw_ensure ( (fd=socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET );
+    gw_ensure ( 0==bind(fd, (sockaddr*)&(bind_to.addr), sizeof(struct sockaddr_in)) );
+    gw_ensure (make_socket_nonblocking(fd));
+    gw_ensure ( 0==listen(fd,8) );
+    socket_callbacks_t install(sock,HttpGwNewConnectionCallback,NULL,HttpGwError);
+    gw_ensure (swift::Listen3rdPartySocket(install));
+}
index aa676da..c30fc03 100644 (file)
@@ -135,6 +135,7 @@ void    Channel::Send () {
     last_send_time_ = NOW;
     sent_since_recv_++;
     dgrams_sent_++;
+    Reschedule();
 }
 
 
@@ -284,6 +285,7 @@ void    Channel::Recv (Datagram& dgram) {
     }
     last_recv_time_ = NOW;
     sent_since_recv_ = 0;
+    Reschedule();
 }
 
 
@@ -474,11 +476,11 @@ void    Channel::AddPex (Datagram& dgram) {
 }
 
 
-Channel*    Channel::RecvDatagram (int socket) {
+void    Channel::RecvDatagram (SOCKET socket) {
     Datagram data(socket);
     data.Recv();
     const Address& addr = data.address();
-#define return_log(...) { printf(__VA_ARGS__); return NULL; }
+#define return_log(...) { printf(__VA_ARGS__); }
     if (data.size()<4)
         return_log("datagram shorter than 4 bytes %s\n",addr.str());
     uint32_t mych = data.Pull32();
@@ -519,7 +521,6 @@ Channel*    Channel::RecvDatagram (int socket) {
     }
     //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
     channel->Recv(data);
-    return channel;
 }
 
 
@@ -545,18 +546,12 @@ void    Channel::Loop (tint howlong) {
             dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(),
                     tintstr(send_time));
             sender->Send();
-            sender->Reschedule();
 
         } else {  // it's too early, wait
 
             tint towait = min(limit,send_time) - NOW;
             dprintf("%s #0 waiting %lliusec\n",tintstr(),towait);
-            int rd = Datagram::Wait(socket_count,sockets,towait);
-            if (rd!=INVALID_SOCKET) { // in meantime, received something
-                Channel* receiver = RecvDatagram(rd);
-                if (receiver) // receiver's state may have changed
-                    receiver->Reschedule();
-            }
+            Datagram::Wait(socket_count,sockets,towait);
             if (sender)  // get back to that later
                 send_queue.push(tintbin(send_time,sender->id()));
 
diff --git a/swift.h b/swift.h
index be996c7..e6a8e57 100644 (file)
--- a/swift.h
+++ b/swift.h
@@ -239,7 +239,7 @@ namespace swift {
         Normally, API users do not deal with this class. */
     class Channel {
     public:
-        Channel    (FileTransfer* file, int socket=-1, Address peer=Address());
+        Channel    (FileTransfer* file, int socket=INVALID_SOCKET, Address peer=Address());
         ~Channel();
 
         typedef enum {
@@ -253,8 +253,7 @@ namespace swift {
 
         static const char* SEND_CONTROL_MODES[];
 
-        static Channel*
-                    RecvDatagram (int socket);
+        static void RecvDatagram (SOCKET socket);
         static void Loop (tint till);
 
         void        Recv (Datagram& dgram);
@@ -317,7 +316,7 @@ namespace swift {
             return i<channels.size()?channels[i]:NULL;
         }
         static void CloseTransfer (FileTransfer* trans);
-        static SOCKET default_socket() { return sockets[0]; }
+        static SOCKET default_socket() { return sockets[0].sock; }
 
     protected:
         /** Channel id: index in the channel array. */
@@ -397,7 +396,8 @@ namespace swift {
 
         static PeerSelector* peer_selector;
 
-        static SOCKET   sockets[8];
+        #define SWFT_MAX_SOCK_OPEN 128
+        static socket_callbacks_t sockets[SWFT_MAX_SOCK_OPEN];
         static int      socket_count;
         static tint     last_tick;
         static tbheap   send_queue;
@@ -410,6 +410,7 @@ namespace swift {
         friend void     AddPeer (Address address, const Sha1Hash& root);
         friend void     SetTracker(const Address& tracker);
         friend int      Open (const char*, const Sha1Hash&) ; // FIXME
+        friend bool     Listen3rdPartySocket (socket_callbacks_t);
 
     };
 
@@ -420,6 +421,7 @@ namespace swift {
     int     Listen (Address addr);
     /** Run send/receive loop for the specified amount of time. */
     void    Loop (tint till);
+    bool    Listen3rdPartySocket (socket_callbacks_t);
     /** Stop listening to a port. */
     void    Shutdown (int sock_des=-1);