From c37efed023f38f6e59659e1fa3e72f29f46d590f Mon Sep 17 00:00:00 2001 From: "Victor Grishchenko (Debian)" Date: Fri, 5 Mar 2010 19:26:27 +0100 Subject: [PATCH] http gateway in progress --- ChangeLog | 7 ---- bin64.h | 4 +-- channel.cpp | 28 ++++++++++++---- compat.cpp | 18 ++++++++++ compat.h | 14 ++++++++ datagram.cpp | 93 +++++++++++++++++++++------------------------------- datagram.h | 33 +++++++++---------- hashtree.cpp | 2 +- httpgw.cpp | 86 ++++++++++++++++++++++++++++++++++++++++++++++++ sendrecv.cpp | 15 +++------ swift.h | 12 ++++--- 11 files changed, 209 insertions(+), 103 deletions(-) delete mode 100755 ChangeLog create mode 100644 httpgw.cpp diff --git a/ChangeLog b/ChangeLog deleted file mode 100755 index ad12850..0000000 --- a/ChangeLog +++ /dev/null @@ -1,7 +0,0 @@ -0.003 - This is not a release as well - 18 Oct 2009 - - - but at least, it compiles now - -0.002 - This is not a release - 7 Oct 2009 - - - it does not even compile, committed for reading purposes only diff --git a/bin64.h b/bin64.h index 6ddce3f..fb88c99 100644 --- a/bin64.h +++ b/bin64.h @@ -137,13 +137,13 @@ struct bin64_t { } /** Check whether this bin is the left sibling. */ - bool is_left () const { + inline bool is_left () const { uint64_t tb = tail_bit(); return !(v&(tb<<1)); } /** Check whether this bin is the right sibling. */ - bool is_right() const { return !is_left(); } + inline bool is_right() const { return !is_left(); } /** Get the leftmost basic bin within this bin. */ bin64_t left_foot () const { diff --git a/channel.cpp b/channel.cpp index 80dce25..f6d52b8 100644 --- a/channel.cpp +++ b/channel.cpp @@ -31,7 +31,7 @@ int Channel::MAX_REORDERING = 4; bool Channel::SELF_CONN_OK = false; swift::tint Channel::TIMEOUT = TINT_SEC*60; std::vector Channel::channels(1); -SOCKET Channel::sockets[8] = {0,0,0,0,0,0,0,0}; +socket_callbacks_t Channel::sockets[SWFT_MAX_SOCK_OPEN] = {}; int Channel::socket_count = 0; Address Channel::tracker; tbheap Channel::send_queue; @@ -41,7 +41,7 @@ PeerSelector* Channel::peer_selector = new SimpleSelector(); Channel::Channel (FileTransfer* transfer, int socket, Address peer_addr) : transfer_(transfer), peer_(peer_addr), peer_channel_id_(0), pex_out_(0), - socket_(socket==-1?sockets[0]:socket), // FIXME + socket_(socket==INVALID_SOCKET?sockets[0].sock:socket), // FIXME data_out_cap_(bin64_t::ALL), last_data_out_time_(0), last_data_in_time_(0), own_id_mentioned_(false), next_send_time_(0), last_send_time_(0), last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC), @@ -86,16 +86,19 @@ int Channel::EncodeID(int unscrambled) { int swift::Listen (Address addr) { int sock = Datagram::Bind(addr); - if (sock!=INVALID_SOCKET) - Channel::sockets[Channel::socket_count++] = sock; + if (sock!=INVALID_SOCKET) { + socket_callbacks_t cb(sock); + cb.may_read = &Channel::RecvDatagram; + Channel::sockets[Channel::socket_count++] = cb; + } return sock; } void swift::Shutdown (int sock_des) { for(int i=0; ifile().file_descriptor()) { diff --git a/compat.cpp b/compat.cpp index 08dfc7d..7a5a654 100644 --- a/compat.cpp +++ b/compat.cpp @@ -193,5 +193,23 @@ std::string gettmpdir(void) #endif } +bool make_socket_nonblocking(SOCKET fd) { +#ifdef _WIN32 + u_long enable = 1; + return 0==ioctlsocket(fd, FIONBIO, &enable); +#else + int enable=1; + return 0==fcntl(fd, F_SETFL, O_NONBLOCK); +#endif +} + +bool close_socket (SOCKET sock) { +#ifdef _WIN32 + return 0==closesocket(sock); +#else + return 0==::close(sock); +#endif +} + } diff --git a/compat.h b/compat.h index 0b4db12..c506cbf 100644 --- a/compat.h +++ b/compat.h @@ -28,8 +28,17 @@ typedef unsigned __int64 uint64_t; #include #else #include +#include +#include +#include +#include #endif +#ifndef _WIN32 +typedef int SOCKET; +#endif + +#include #include #include #include @@ -96,6 +105,11 @@ std::string gettmpdir(void); tint usec_time (); +bool make_socket_nonblocking(SOCKET s); + +bool close_socket (SOCKET sock); + + }; #endif diff --git a/datagram.cpp b/datagram.cpp index e99d620..1388a1c 100644 --- a/datagram.cpp +++ b/datagram.cpp @@ -92,16 +92,16 @@ int Datagram::Send () { perror("can't send"); dgrams_up++; bytes_up+=size(); - offset=0; - length=0; - Time(); - return r; + offset=0; + length=0; + Time(); + return r; } int Datagram::Recv () { socklen_t addrlen = sizeof(struct sockaddr_in); offset = 0; - length = recvfrom (sock, (char *)buf, MAXDGRAMSZ, 0, + length = recvfrom (sock, (char *)buf, MAXDGRAMSZ*2, 0, (struct sockaddr*)&(addr.addr), &addrlen); if (length<0) { length = 0; @@ -114,30 +114,39 @@ int Datagram::Recv () { } -SOCKET Datagram::Wait (int sockcnt, SOCKET* sockets, tint usec) { +void Datagram::Wait (int sockcnt, socket_callbacks_t* sockets, tint usec) { struct timeval timeout; timeout.tv_sec = usec/TINT_SEC; timeout.tv_usec = usec%TINT_SEC; int max_sock_fd = 0; - fd_set bases, err; - FD_ZERO(&bases); - FD_ZERO(&err); + fd_set rdfd, wrfd, errfd; + FD_ZERO(&rdfd); + FD_ZERO(&wrfd); + FD_ZERO(&errfd); for(int i=0; imax_sock_fd) - max_sock_fd = sockets[i]; + if (sockets[i].may_read!=0) + FD_SET(sockets[i].sock,&rdfd); + if (sockets[i].may_write!=0) + FD_SET(sockets[i].sock,&wrfd); + if (sockets[i].on_error!=0) + FD_SET(sockets[i].sock,&errfd); + if (sockets[i].sock>max_sock_fd) + max_sock_fd = sockets[i].sock; } - int sel = select(max_sock_fd+1, &bases, NULL, &err, &timeout); + int sel = select(max_sock_fd+1, &rdfd, &wrfd, &errfd, &timeout); Time(); if (sel>0) { - for (int i=0; i<=sockcnt; i++) - if (FD_ISSET(sockets[i],&bases)) - return sockets[i]; + for (int i=0; i<=sockcnt; i++) { + if (FD_ISSET(sockets[i].sock,&rdfd)) + (*(sockets[i].may_read))(sockets[i].sock); + if (FD_ISSET(sockets[i].sock,&wrfd)) + (*(sockets[i].may_write))(sockets[i].sock); + if (FD_ISSET(sockets[i].sock,&errfd)) + (*(sockets[i].on_error))(sockets[i].sock); + } } else if (sel<0) { print_error("select fails"); } - return INVALID_SOCKET; } tint Datagram::Time () { @@ -151,49 +160,23 @@ SOCKET Datagram::Bind (Address addr_) { struct sockaddr_in addr = addr_; SOCKET fd; int len = sizeof(struct sockaddr_in), sndbuf=1<<20, rcvbuf=1<<20; - if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { - print_error("socket() fails"); - return INVALID_SOCKET; - } + #define dbnd_ensure(x) { if (!(x)) { print_error("binding fails"); close_socket(fd); return INVALID_SOCKET; } } + dbnd_ensure ( (fd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0 ); + dbnd_ensure( make_socket_nonblocking(fd) ); // FIXME may remove this #ifdef _WIN32 - u_long enable = 1; - ioctlsocket(fd, FIONBIO, &enable); - if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (const char *)&sndbuf, sizeof(int)) != 0 ) { - print_error("setsockopt fails"); - return INVALID_SOCKET; - } - if ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (const char *)&rcvbuf, sizeof(int)) != 0 ) { - print_error("setsockopt2 fails"); - return INVALID_SOCKET; - } - setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&enable, sizeof(int)); +#define parptype (char*) #else - int enable=1; - if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1) - return INVALID_SOCKET; - if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(int)) < 0 ) { - print_error("setsockopt fails"); - return INVALID_SOCKET; - } - if ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(int)) < 0 ) { - print_error("setsockopt2 fails"); - return INVALID_SOCKET; - } - setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)); +#define parptype void* #endif - if (::bind(fd, (sockaddr*)&addr, len) != 0) { - print_error("bind fails"); - return INVALID_SOCKET; - } + dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (parptype)&sndbuf, sizeof(int)) == 0 ); + dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (parptype)&rcvbuf, sizeof(int)) == 0 ); + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (parptype)&enable, sizeof(int)); + dbnd_ensure ( ::bind(fd, (sockaddr*)&addr, len) == 0 ); return fd; } -void Datagram::Close (int sock) { // remove from fd_set -#ifdef _WIN32 - if (closesocket(sock)!=0) -#else - if (::close(sock)!=0) -#endif +void Datagram::Close (SOCKET sock) { + if (!close_socket(sock)) print_error("on closing a socket"); } diff --git a/datagram.h b/datagram.h index 6fb26d7..571e293 100644 --- a/datagram.h +++ b/datagram.h @@ -9,24 +9,8 @@ #ifndef DATAGRAM_H #define DATAGRAM_H -#include -#include #include #include -#include -#include -#ifdef _WIN32 - #include - #include "compat.h" -#else - typedef int SOCKET; - - #include - #include - #include - #include - #include -#endif #include "hashtree.h" #include "compat.h" @@ -100,6 +84,17 @@ struct Address { }; +typedef void (*sock_cb_t) (SOCKET); +struct socket_callbacks_t { + socket_callbacks_t (SOCKET s=0) : sock(s), + may_read(NULL), may_write(NULL), on_error(NULL) {} + SOCKET sock; + sock_cb_t may_read; + sock_cb_t may_write; + sock_cb_t on_error; +}; + + /** UDP datagram class, a nice wrapping around sendto/recvfrom/select. Reading/writing from/to a datagram is done in a FIFO (deque) fashion: written data is appended to the tail (push) while read data is @@ -115,12 +110,16 @@ public: /** bind to the address */ static SOCKET Bind(Address address); + /** close the port */ static void Close(int port); + /** the current time */ static tint Time(); + /** wait till one of the sockets has some io to do; usec is the timeout */ - static SOCKET Wait (int sockcnt, SOCKET* sockets, tint usec=0); + static void Wait (int sockcnt, socket_callbacks_t* sockets, tint usec=0); + static tint now, epoch, start; static uint64_t dgrams_up, dgrams_down, bytes_up, bytes_down; diff --git a/hashtree.cpp b/hashtree.cpp index 3e772eb..825fe7c 100644 --- a/hashtree.cpp +++ b/hashtree.cpp @@ -319,7 +319,7 @@ bool HashTree::OfferData (bin64_t pos, const char* data, size_t lengt Sha1Hash data_hash(data,length); if (!OfferHash(pos, data_hash)) { - printf("invalid hash for %s: %s\n",pos.str(),data_hash.hex().c_str()); + printf("invalid hash for %s: %s\n",pos.str(),data_hash.hex().c_str()); // FIXME return false; } diff --git a/httpgw.cpp b/httpgw.cpp new file mode 100644 index 0000000..bb03be3 --- /dev/null +++ b/httpgw.cpp @@ -0,0 +1,86 @@ +#include "swift.h" + +#define MAX_HTTP_CLIENT 128 + +struct http_gw_t { + uint64_t offset; + uint64_t tosend; + int transfer; + SOCKET sink; +} http_clients[MAX_HTTP_CLIENT]; + +void HttpGwErrorCallback (SOCKET sink) { + +} + +void HttpGwMayWriteCallback (SOCKET sink) { + // if have data => write + // otherwise, change mask + if (not_enough_data) + swift::Listen3rdPartySocket(http_conns[httpc].sink,NULL,NULL,ErrorCallback); + if (all_done) + swift::Listen3rdPartySocket(http_conns[httpc].sink,NewRequestCallback,NULL,ErrorCallback); +} + + +void SwiftProgressCallback (int transfer, bin64_t bin) { + for (int httpc=0; httpc one requests at a time + fgets(); + // HTTP request line + sscanf(); + // HTTP header fields + sscanf(); + // incomplete header => screw it + fprintf("400 Incomplete header\r\n"); + close(); + // initiate transmission + // write response header + http_clients[i].offset = 0; + http_clients[i].tosend = 10000; + http_clients[i].transfer = file; + http_clients[i].sink = conn; +} + + +// be liberal in what you do, be conservative in what you accept +void HttpGwNewConnectionCallback (SOCKET serv) { + Address client_address; + SOCKET conn = accept (serv, & (client_address.addr), sizeof(struct sockaddr_in)); + if (conn==INVALID_SOCKET) { + print_error("client conn fails"); + return; + } + make_socket_nonblocking(conn); + // submit 3rd party socket to the swift loop + socket_callbacks_t install(conn,HttpGwNewRequestCallback,HttpGwMayWriteCallback,HttpGwErrorCallback); + swift::Listen3rdPartySocket(install); +} + + +void HttpGwError (SOCKET serv) { + print_error("error on http socket"); +} + + +SOCKET InstallHTTPGateway (Address bind_to) { + SOCKET fd; + #define gw_ensure(x) { if (!(x)) { print_error("http binding fails"); close_socket(fd); return INVALID_SOCKET; } } + gw_ensure ( (fd=socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET ); + gw_ensure ( 0==bind(fd, (sockaddr*)&(bind_to.addr), sizeof(struct sockaddr_in)) ); + gw_ensure (make_socket_nonblocking(fd)); + gw_ensure ( 0==listen(fd,8) ); + socket_callbacks_t install(sock,HttpGwNewConnectionCallback,NULL,HttpGwError); + gw_ensure (swift::Listen3rdPartySocket(install)); +} diff --git a/sendrecv.cpp b/sendrecv.cpp index aa676da..c30fc03 100644 --- a/sendrecv.cpp +++ b/sendrecv.cpp @@ -135,6 +135,7 @@ void Channel::Send () { last_send_time_ = NOW; sent_since_recv_++; dgrams_sent_++; + Reschedule(); } @@ -284,6 +285,7 @@ void Channel::Recv (Datagram& dgram) { } last_recv_time_ = NOW; sent_since_recv_ = 0; + Reschedule(); } @@ -474,11 +476,11 @@ void Channel::AddPex (Datagram& dgram) { } -Channel* Channel::RecvDatagram (int socket) { +void Channel::RecvDatagram (SOCKET socket) { Datagram data(socket); data.Recv(); const Address& addr = data.address(); -#define return_log(...) { printf(__VA_ARGS__); return NULL; } +#define return_log(...) { printf(__VA_ARGS__); } if (data.size()<4) return_log("datagram shorter than 4 bytes %s\n",addr.str()); uint32_t mych = data.Pull32(); @@ -519,7 +521,6 @@ Channel* Channel::RecvDatagram (int socket) { } //dprintf("recvd %i bytes for %i\n",data.size(),channel->id); channel->Recv(data); - return channel; } @@ -545,18 +546,12 @@ void Channel::Loop (tint howlong) { dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(), tintstr(send_time)); sender->Send(); - sender->Reschedule(); } else { // it's too early, wait tint towait = min(limit,send_time) - NOW; dprintf("%s #0 waiting %lliusec\n",tintstr(),towait); - int rd = Datagram::Wait(socket_count,sockets,towait); - if (rd!=INVALID_SOCKET) { // in meantime, received something - Channel* receiver = RecvDatagram(rd); - if (receiver) // receiver's state may have changed - receiver->Reschedule(); - } + Datagram::Wait(socket_count,sockets,towait); if (sender) // get back to that later send_queue.push(tintbin(send_time,sender->id())); diff --git a/swift.h b/swift.h index be996c7..e6a8e57 100644 --- a/swift.h +++ b/swift.h @@ -239,7 +239,7 @@ namespace swift { Normally, API users do not deal with this class. */ class Channel { public: - Channel (FileTransfer* file, int socket=-1, Address peer=Address()); + Channel (FileTransfer* file, int socket=INVALID_SOCKET, Address peer=Address()); ~Channel(); typedef enum { @@ -253,8 +253,7 @@ namespace swift { static const char* SEND_CONTROL_MODES[]; - static Channel* - RecvDatagram (int socket); + static void RecvDatagram (SOCKET socket); static void Loop (tint till); void Recv (Datagram& dgram); @@ -317,7 +316,7 @@ namespace swift { return i