moved sockets to swift::Datagram
authorVictor Grishchenko <victor.grishchenko@gmail.com>
Thu, 20 May 2010 13:38:26 +0000 (15:38 +0200)
committerVictor Grishchenko <victor.grishchenko@gmail.com>
Thu, 20 May 2010 13:38:26 +0000 (15:38 +0200)
channel.cpp
datagram.cpp
datagram.h
httpgw.cpp
sendrecv.cpp
swift.h
tests/dgramtest.cpp
transfer.cpp

index d8838e1..5899257 100644 (file)
@@ -31,8 +31,6 @@ int Channel::MAX_REORDERING = 4;
 bool Channel::SELF_CONN_OK = false;
 swift::tint Channel::TIMEOUT = TINT_SEC*60;
 std::vector<Channel*> Channel::channels(1);
-socket_callbacks_t Channel::sockets[SWFT_MAX_SOCK_OPEN] = {};
-int Channel::socket_count = 0;
 Address Channel::tracker;
 tbheap Channel::send_queue;
 FILE* Channel::debug_file = NULL;
@@ -41,7 +39,7 @@ PeerSelector* Channel::peer_selector = new SimpleSelector();
 
 Channel::Channel    (FileTransfer* transfer, int socket, Address peer_addr) :
     transfer_(transfer), peer_(peer_addr), peer_channel_id_(0), pex_out_(0),
-    socket_(socket==INVALID_SOCKET?sockets[0].sock:socket), // FIXME
+    socket_(socket==INVALID_SOCKET?Datagram::default_socket():socket), // FIXME
     data_out_cap_(bin64_t::ALL), last_data_out_time_(0), last_data_in_time_(0),
     own_id_mentioned_(false), next_send_time_(0), last_send_time_(0),
     last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC),
@@ -85,22 +83,15 @@ int Channel::EncodeID(int unscrambled) {
 
 
 int     swift::Listen (Address addr) {
-    int sock = Datagram::Bind(addr);
-    if (sock!=INVALID_SOCKET) {
-        socket_callbacks_t cb(sock);
-        cb.may_read = &Channel::RecvDatagram;
-        Channel::sockets[Channel::socket_count++] = cb;
-    }
-    return sock;
+    sckrwecb_t cb;
+    cb.may_read = &Channel::RecvDatagram;
+    cb.sock = Datagram::Bind(addr,cb);
+    return cb.sock;
 }
 
 
 void    swift::Shutdown (int sock_des) {
-    for(int i=0; i<Channel::socket_count; i++)
-        if (sock_des==-1 || Channel::sockets[i].sock==sock_des) {
-            Datagram::Close(Channel::sockets[i].sock);
-            Channel::sockets[i] = Channel::sockets[--Channel::socket_count];
-        }
+    Datagram::Shutdown();
 }
 
 
@@ -109,20 +100,6 @@ void    swift::Loop (tint till) {
 }
 
 
-bool    swift::Listen3rdPartySocket (socket_callbacks_t cb) {
-    int i=0;
-    while (i<Channel::socket_count && Channel::sockets[i].sock!=cb.sock) i++;
-    if (i==Channel::socket_count)
-        if (i==SWFT_MAX_SOCK_OPEN)
-            return false;
-        else
-            Channel::socket_count++;
-    Channel::sockets[i]=cb;
-    if (!cb.may_read && !cb.may_write && !cb.on_error)
-        Channel::sockets[i] = Channel::sockets[--Channel::socket_count];
-    return true;
-}
-
 
 int      swift::Open (const char* filename, const Sha1Hash& hash) {
     FileTransfer* ft = new FileTransfer(filename, hash);
index 62764cd..ace88b1 100644 (file)
@@ -27,6 +27,8 @@ tint Datagram::epoch = now/360000000LL*360000000LL; // make logs mergeable
 uint32_t Address::LOCALHOST = INADDR_LOOPBACK;
 uint64_t Datagram::dgrams_up=0, Datagram::dgrams_down=0,
          Datagram::bytes_up=0, Datagram::bytes_down=0;
+sckrwecb_t Datagram::sock_open[] = {};
+int Datagram::sock_count = 0;
 
 const char* tintstr (tint time) {
     if (time==0)
@@ -84,6 +86,27 @@ Address::Address(const char* ip_port) {
     }
 }
 
+    
+bool    Datagram::Listen3rdPartySocket (sckrwecb_t cb) {
+    int i=0;
+    while (i<sock_count && sock_open[i].sock!=cb.sock) i++;
+    if (i==sock_count)
+        if (i==DGRAM_MAX_SOCK_OPEN)
+            return false;
+        else
+            sock_count++;
+    sock_open[i]=cb;
+    //if (!cb.may_read && !cb.may_write && !cb.on_error)
+    //    sock_open[i] = sock_open[--sock_count];
+    return true;
+}
+
+    
+void Datagram::Shutdown () {
+    while (sock_count--)
+        Close(sock_open[sock_count].sock);
+}
+    
 
 int Datagram::Send () {
     int r = sendto(sock,(const char *)buf+offset,length-offset,0,
@@ -114,7 +137,7 @@ int Datagram::Recv () {
 }
 
 
-void Datagram::Wait (int sockcnt, socket_callbacks_t* sockets, tint usec) {
+SOCKET Datagram::Wait (tint usec) {
     struct timeval timeout;
     timeout.tv_sec = usec/TINT_SEC;
     timeout.tv_usec = usec%TINT_SEC;
@@ -123,31 +146,32 @@ void Datagram::Wait (int sockcnt, socket_callbacks_t* sockets, tint usec) {
     FD_ZERO(&rdfd);
     FD_ZERO(&wrfd);
     FD_ZERO(&errfd);
-    for(int i=0; i<sockcnt; i++) {
-        if (sockets[i].may_read!=0)
-            FD_SET(sockets[i].sock,&rdfd);
-        if (sockets[i].may_write!=0)
-            FD_SET(sockets[i].sock,&wrfd);
-        if (sockets[i].on_error!=0)
-            FD_SET(sockets[i].sock,&errfd);
-        if (sockets[i].sock>max_sock_fd)
-            max_sock_fd = sockets[i].sock;
+    for(int i=0; i<sock_count; i++) {
+        if (sock_open[i].may_read!=0)
+            FD_SET(sock_open[i].sock,&rdfd);
+        if (sock_open[i].may_write!=0)
+            FD_SET(sock_open[i].sock,&wrfd);
+        if (sock_open[i].on_error!=0)
+            FD_SET(sock_open[i].sock,&errfd);
+        if (sock_open[i].sock>max_sock_fd)
+            max_sock_fd = sock_open[i].sock;
     }
-    int sel = select(max_sock_fd+1, &rdfd, &wrfd, &errfd, &timeout);
+    SOCKET sel = select(max_sock_fd+1, &rdfd, &wrfd, &errfd, &timeout);
     Time();
     if (sel>0) {
-        for (int i=0; i<=sockcnt; i++) {
-            socket_callbacks_t& sct = sockets[i];
+        for (int i=0; i<=sock_count; i++) {
+            sckrwecb_t& sct = sock_open[i];
             if (sct.may_read && FD_ISSET(sct.sock,&rdfd))
                 (*(sct.may_read))(sct.sock);
-            if (sct.may_write && FD_ISSET(sockets[i].sock,&wrfd))
+            if (sct.may_write && FD_ISSET(sct.sock,&wrfd))
                 (*(sct.may_write))(sct.sock);
-            if (sct.on_error && FD_ISSET(sockets[i].sock,&errfd))
+            if (sct.on_error && FD_ISSET(sct.sock,&errfd))
                 (*(sct.on_error))(sct.sock);
         }
     } else if (sel<0) {
         print_error("select fails");
     }
+    return sel;
 }
 
 tint Datagram::Time () {
@@ -157,22 +181,30 @@ tint Datagram::Time () {
     return now = usec_time();
 }
 
-SOCKET Datagram::Bind (Address addr_) {
-    struct sockaddr_in addr = addr_;
+SOCKET Datagram::Bind (Address address, sckrwecb_t callbacks) {
+    struct sockaddr_in addr = address;
     SOCKET fd;
     int len = sizeof(struct sockaddr_in), sndbuf=1<<20, rcvbuf=1<<20;
-    #define dbnd_ensure(x) { if (!(x)) { print_error("binding fails"); close_socket(fd); return INVALID_SOCKET; } }
+    #define dbnd_ensure(x) { if (!(x)) { \
+        print_error("binding fails"); close_socket(fd); return INVALID_SOCKET; } }
     dbnd_ensure ( (fd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0 );
     dbnd_ensure( make_socket_nonblocking(fd) );  // FIXME may remove this
     int enable = true;
-    dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (setsockoptptr_t)&sndbuf, sizeof(int)) == 0 );
-    dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (setsockoptptr_t)&rcvbuf, sizeof(int)) == 0 );
+    dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF, 
+                             (setsockoptptr_t)&sndbuf, sizeof(int)) == 0 );
+    dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, 
+                             (setsockoptptr_t)&rcvbuf, sizeof(int)) == 0 );
     //setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (setsockoptptr_t)&enable, sizeof(int));
     dbnd_ensure ( ::bind(fd, (sockaddr*)&addr, len) == 0 );
+    callbacks.sock = fd;
+    Datagram::sock_open[Datagram::sock_count++] = callbacks;
     return fd;
 }
 
 void Datagram::Close (SOCKET sock) {
+    for(int i=0; i<Datagram::sock_count; i++)
+        if (Datagram::sock_open[i].sock==sock)
+            Datagram::sock_open[i] = Datagram::sock_open[--Datagram::sock_count];
     if (!close_socket(sock))
         print_error("on closing a socket");
 }
index eb836d7..e250fab 100644 (file)
@@ -84,14 +84,14 @@ struct Address {
 };
 
 
-typedef void (*sock_cb_t) (SOCKET);
-struct socket_callbacks_t {
-    socket_callbacks_t (SOCKET s=0, sock_cb_t mr=NULL, sock_cb_t mw=NULL, sock_cb_t oe=NULL) :
+typedef void (*sockcb_t) (SOCKET);
+struct sckrwecb_t {
+    sckrwecb_t (SOCKET s=0, sockcb_t mr=NULL, sockcb_t mw=NULL, sockcb_t oe=NULL) :
         sock(s), may_read(mr), may_write(mw), on_error(oe) {}
     SOCKET sock;
-    sock_cb_t   may_read;
-    sock_cb_t   may_write;
-    sock_cb_t   on_error;
+    sockcb_t   may_read;
+    sockcb_t   may_write;
+    sockcb_t   on_error;
 };
 
 
@@ -106,19 +106,30 @@ class Datagram {
     int offset, length;
     uint8_t    buf[MAXDGRAMSZ*2];
 
+#define DGRAM_MAX_SOCK_OPEN 128
+    static int sock_count;
+    static sckrwecb_t sock_open[DGRAM_MAX_SOCK_OPEN];
+    
 public:
 
     /** bind to the address */
-    static SOCKET Bind(Address address);
+    static SOCKET Bind(Address address, sckrwecb_t callbacks=sckrwecb_t());
 
     /** close the port */
-    static void Close(int port);
+    static void Close(SOCKET sock);
 
     /** the current time */
     static tint Time();
 
     /** wait till one of the sockets has some io to do; usec is the timeout */
-    static void Wait (int sockcnt, socket_callbacks_t* sockets, tint usec=0);
+    static SOCKET Wait (tint usec);
+    
+    static bool Listen3rdPartySocket (sckrwecb_t cb) ;
+    
+    static void Shutdown ();
+    
+    static SOCKET default_socket() 
+        { return sock_count ? sock_open[0].sock : INVALID_SOCKET; }
 
     static tint now, epoch, start;
     static uint64_t dgrams_up, dgrams_down, bytes_up, bytes_down;
index 9738cc1..103ac4b 100644 (file)
@@ -49,7 +49,7 @@ void HttpGwCloseConnection (SOCKET sock) {
         *req = http_requests[--http_gw_reqs_open];
     }
     swift::close_socket(sock);
-    swift::Listen3rdPartySocket(socket_callbacks_t(sock));
+    swift::Datagram::Listen3rdPartySocket(sckrwecb_t(sock));
 }
 
 
@@ -76,12 +76,12 @@ void HttpGwMayWriteCallback (SOCKET sink) {
     } else {
         if (req->tosend==0) { // done; wait for new request
             dprintf("%s @%i done\n",tintstr(),req->id);
-            socket_callbacks_t wait_new_req(req->sink,HttpGwNewRequestCallback,NULL,HttpGwCloseConnection);
-            swift::Listen3rdPartySocket (wait_new_req);
+            sckrwecb_t wait_new_req(req->sink,HttpGwNewRequestCallback,NULL,HttpGwCloseConnection);
+            swift::Datagram::Listen3rdPartySocket (wait_new_req);
         } else { // wait for data
             dprintf("%s @%i waiting for data\n",tintstr(),req->id);
-            socket_callbacks_t wait_swift_data(req->sink,NULL,NULL,HttpGwCloseConnection);
-            swift::Listen3rdPartySocket(wait_swift_data);
+            sckrwecb_t wait_swift_data(req->sink,NULL,NULL,HttpGwCloseConnection);
+            swift::Datagram::Listen3rdPartySocket(wait_swift_data);
         }
     }
 }
@@ -92,9 +92,9 @@ void HttpGwSwiftProgressCallback (int transfer, bin64_t bin) {
         if (http_requests[httpc].transfer==transfer)
             if ( (bin.base_offset()<<10) == http_requests[httpc].offset ) {
                 dprintf("%s @%i progress: %s\n",tintstr(),http_requests[httpc].id,bin.str());
-                socket_callbacks_t maywrite_callbacks
+                sckrwecb_t maywrite_callbacks
                         (http_requests[httpc].sink,NULL,HttpGwMayWriteCallback,HttpGwCloseConnection);
-                Listen3rdPartySocket (maywrite_callbacks);
+                Datagram::Listen3rdPartySocket (maywrite_callbacks);
             }
 }
 
@@ -182,8 +182,8 @@ void HttpGwNewRequestCallback (SOCKET http_conn){
         HttpGwFirstProgressCallback(file,bin64_t(0,0));
     } else {
         swift::AddProgressCallback(file,&HttpGwFirstProgressCallback);
-        socket_callbacks_t install (http_conn,NULL,NULL,HttpGwCloseConnection);
-        swift::Listen3rdPartySocket(install);
+        sckrwecb_t install (http_conn,NULL,NULL,HttpGwCloseConnection);
+        swift::Datagram::Listen3rdPartySocket(install);
     }
 }
 
@@ -199,9 +199,9 @@ void HttpGwNewConnectionCallback (SOCKET serv) {
     }
     make_socket_nonblocking(conn);
     // submit 3rd party socket to the swift loop
-    socket_callbacks_t install
+    sckrwecb_t install
         (conn,HttpGwNewRequestCallback,NULL,HttpGwCloseConnection);
-    swift::Listen3rdPartySocket(install);
+    swift::Datagram::Listen3rdPartySocket(install);
 }
 
 
@@ -209,7 +209,7 @@ void HttpGwError (SOCKET s) {
     print_error("httpgw is dead");
     dprintf("%s @0 closed http gateway\n",tintstr());
     close_socket(s);
-    swift::Listen3rdPartySocket(socket_callbacks_t(s));
+    swift::Datagram::Listen3rdPartySocket(sckrwecb_t(s));
 }
 
 
@@ -231,7 +231,8 @@ SOCKET InstallHTTPGateway (Address bind_to) {
     gw_ensure ( 0==bind(fd, (sockaddr*)&(bind_to.addr), sizeof(struct sockaddr_in)) );
     gw_ensure (make_socket_nonblocking(fd));
     gw_ensure ( 0==listen(fd,8) );
-    socket_callbacks_t install_http(fd,HttpGwNewConnectionCallback,NULL,HttpGwError);
-    gw_ensure (swift::Listen3rdPartySocket(install_http));
+    sckrwecb_t install_http(fd,HttpGwNewConnectionCallback,NULL,HttpGwError);
+    gw_ensure (swift::Datagram::Listen3rdPartySocket(install_http));
     dprintf("%s @0 installed http gateway on %s\n",tintstr(),bind_to.str());
+    return fd;
 }
index 3c6414f..bed623e 100644 (file)
@@ -551,7 +551,7 @@ void    Channel::Loop (tint howlong) {
 
             tint towait = min(limit,send_time) - NOW;
             dprintf("%s #0 waiting %lliusec\n",tintstr(),towait);
-            Datagram::Wait(socket_count,sockets,towait);
+            Datagram::Wait(towait);
             if (sender)  // get back to that later
                 send_queue.push(tintbin(send_time,sender->id()));
 
diff --git a/swift.h b/swift.h
index 40dada3..7ed26e5 100644 (file)
--- a/swift.h
+++ b/swift.h
@@ -323,7 +323,6 @@ namespace swift {
             return i<channels.size()?channels[i]:NULL;
         }
         static void CloseTransfer (FileTransfer* trans);
-        static SOCKET default_socket() { return sockets[0].sock; }
 
     protected:
         /** Channel id: index in the channel array. */
@@ -403,9 +402,6 @@ namespace swift {
 
         static PeerSelector* peer_selector;
 
-        #define SWFT_MAX_SOCK_OPEN 128
-        static socket_callbacks_t sockets[SWFT_MAX_SOCK_OPEN];
-        static int      socket_count;
         static tint     last_tick;
         static tbheap   send_queue;
 
@@ -417,7 +413,6 @@ namespace swift {
         friend void     AddPeer (Address address, const Sha1Hash& root);
         friend void     SetTracker(const Address& tracker);
         friend int      Open (const char*, const Sha1Hash&) ; // FIXME
-        friend bool     Listen3rdPartySocket (socket_callbacks_t);
 
     };
 
@@ -428,7 +423,7 @@ namespace swift {
     int     Listen (Address addr);
     /** Run send/receive loop for the specified amount of time. */
     void    Loop (tint till);
-    bool    Listen3rdPartySocket (socket_callbacks_t);
+    bool    Listen3rdPartySocket (sckrwecb_t);
     /** Stop listening to a port. */
     void    Shutdown (int sock_des=-1);
 
index eb7443c..a940293 100644 (file)
@@ -52,7 +52,7 @@ TEST(Datagram, BinaryTest) {
        ASSERT_EQ(datalen,d.Send());
     SOCKET socks[1] = {socket};
     // Arno: timeout 0 gives undeterministic behaviour on win32
-       SOCKET waitsocket = Datagram::Wait(1,socks,1000000);
+       SOCKET waitsocket = Datagram::Wait(1000000);
        ASSERT_EQ(socket,waitsocket);
        Datagram rcv(waitsocket);
        ASSERT_EQ(datalen,rcv.Recv());
@@ -91,7 +91,7 @@ TEST(Datagram,TwoPortTest) {
 
     SOCKET socks[2] = {sock1,sock2};
     // Arno: timeout 0 gives undeterministic behaviour on win32
-       EXPECT_EQ(sock2,Datagram::Wait(2,socks,1000000));
+       EXPECT_EQ(sock2,Datagram::Wait(1000000));
        Datagram recv(sock2);
        recv.Recv();
        uint32_t test = recv.Pull32();
index a67a61b..296f743 100644 (file)
@@ -90,7 +90,7 @@ void            FileTransfer::OnPexIn (const Address& addr) {
             return; // already connected
     }
     if (hs_in_.size()<20) {
-        new Channel(this,Channel::default_socket(),addr);
+        new Channel(this,Datagram::default_socket(),addr);
     } else {
         pex_in_.push_back(addr);
         if (pex_in_.size()>1000)