+++ /dev/null
-0.003 - This is not a release as well - 18 Oct 2009
-
- - but at least, it compiles now
-
-0.002 - This is not a release - 7 Oct 2009
-
- - it does not even compile, committed for reading purposes only
}
/** Check whether this bin is the left sibling. */
- bool is_left () const {
+ inline bool is_left () const {
uint64_t tb = tail_bit();
return !(v&(tb<<1));
}
/** Check whether this bin is the right sibling. */
- bool is_right() const { return !is_left(); }
+ inline bool is_right() const { return !is_left(); }
/** Get the leftmost basic bin within this bin. */
bin64_t left_foot () const {
bool Channel::SELF_CONN_OK = false;
swift::tint Channel::TIMEOUT = TINT_SEC*60;
std::vector<Channel*> Channel::channels(1);
-SOCKET Channel::sockets[8] = {0,0,0,0,0,0,0,0};
+socket_callbacks_t Channel::sockets[SWFT_MAX_SOCK_OPEN] = {};
int Channel::socket_count = 0;
Address Channel::tracker;
tbheap Channel::send_queue;
Channel::Channel (FileTransfer* transfer, int socket, Address peer_addr) :
transfer_(transfer), peer_(peer_addr), peer_channel_id_(0), pex_out_(0),
- socket_(socket==-1?sockets[0]:socket), // FIXME
+ socket_(socket==INVALID_SOCKET?sockets[0].sock:socket), // FIXME
data_out_cap_(bin64_t::ALL), last_data_out_time_(0), last_data_in_time_(0),
own_id_mentioned_(false), next_send_time_(0), last_send_time_(0),
last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC),
int swift::Listen (Address addr) {
int sock = Datagram::Bind(addr);
- if (sock!=INVALID_SOCKET)
- Channel::sockets[Channel::socket_count++] = sock;
+ if (sock!=INVALID_SOCKET) {
+ socket_callbacks_t cb(sock);
+ cb.may_read = &Channel::RecvDatagram;
+ Channel::sockets[Channel::socket_count++] = cb;
+ }
return sock;
}
void swift::Shutdown (int sock_des) {
for(int i=0; i<Channel::socket_count; i++)
- if (sock_des==-1 || Channel::sockets[i]==sock_des) {
- Datagram::Close(Channel::sockets[i]);
+ if (sock_des==-1 || Channel::sockets[i].sock==sock_des) {
+ Datagram::Close(Channel::sockets[i].sock);
Channel::sockets[i] = Channel::sockets[--Channel::socket_count];
}
}
}
+bool swift::Listen3rdPartySocket (socket_callbacks_t cb) {
+ int i=0;
+ while (i<Channel::socket_count && Channel::sockets[i].sock!=cb.sock) i++;
+ if (i==Channel::socket_count)
+ if (i==SWFT_MAX_SOCK_OPEN)
+ return false;
+ else
+ Channel::socket_count++;
+ Channel::sockets[i]=cb;
+ return true;
+}
+
+
int swift::Open (const char* filename, const Sha1Hash& hash) {
FileTransfer* ft = new FileTransfer(filename, hash);
if (ft && ft->file().file_descriptor()) {
#endif
}
+bool make_socket_nonblocking(SOCKET fd) {
+#ifdef _WIN32
+ u_long enable = 1;
+ return 0==ioctlsocket(fd, FIONBIO, &enable);
+#else
+ int enable=1;
+ return 0==fcntl(fd, F_SETFL, O_NONBLOCK);
+#endif
+}
+
+bool close_socket (SOCKET sock) {
+#ifdef _WIN32
+ return 0==closesocket(sock);
+#else
+ return 0==::close(sock);
+#endif
+}
+
}
#include <io.h>
#else
#include <sys/mman.h>
+#include <arpa/inet.h>
+#include <sys/select.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
#endif
+#ifndef _WIN32
+typedef int SOCKET;
+#endif
+
+#include <unistd.h>
#include <fcntl.h>
#include <cstdio>
#include <cstdlib>
tint usec_time ();
+bool make_socket_nonblocking(SOCKET s);
+
+bool close_socket (SOCKET sock);
+
+
};
#endif
perror("can't send");
dgrams_up++;
bytes_up+=size();
- offset=0;
- length=0;
- Time();
- return r;
+ offset=0;
+ length=0;
+ Time();
+ return r;
}
int Datagram::Recv () {
socklen_t addrlen = sizeof(struct sockaddr_in);
offset = 0;
- length = recvfrom (sock, (char *)buf, MAXDGRAMSZ, 0,
+ length = recvfrom (sock, (char *)buf, MAXDGRAMSZ*2, 0,
(struct sockaddr*)&(addr.addr), &addrlen);
if (length<0) {
length = 0;
}
-SOCKET Datagram::Wait (int sockcnt, SOCKET* sockets, tint usec) {
+void Datagram::Wait (int sockcnt, socket_callbacks_t* sockets, tint usec) {
struct timeval timeout;
timeout.tv_sec = usec/TINT_SEC;
timeout.tv_usec = usec%TINT_SEC;
int max_sock_fd = 0;
- fd_set bases, err;
- FD_ZERO(&bases);
- FD_ZERO(&err);
+ fd_set rdfd, wrfd, errfd;
+ FD_ZERO(&rdfd);
+ FD_ZERO(&wrfd);
+ FD_ZERO(&errfd);
for(int i=0; i<sockcnt; i++) {
- FD_SET(sockets[i],&bases);
- FD_SET(sockets[i],&err);
- if (sockets[i]>max_sock_fd)
- max_sock_fd = sockets[i];
+ if (sockets[i].may_read!=0)
+ FD_SET(sockets[i].sock,&rdfd);
+ if (sockets[i].may_write!=0)
+ FD_SET(sockets[i].sock,&wrfd);
+ if (sockets[i].on_error!=0)
+ FD_SET(sockets[i].sock,&errfd);
+ if (sockets[i].sock>max_sock_fd)
+ max_sock_fd = sockets[i].sock;
}
- int sel = select(max_sock_fd+1, &bases, NULL, &err, &timeout);
+ int sel = select(max_sock_fd+1, &rdfd, &wrfd, &errfd, &timeout);
Time();
if (sel>0) {
- for (int i=0; i<=sockcnt; i++)
- if (FD_ISSET(sockets[i],&bases))
- return sockets[i];
+ for (int i=0; i<=sockcnt; i++) {
+ if (FD_ISSET(sockets[i].sock,&rdfd))
+ (*(sockets[i].may_read))(sockets[i].sock);
+ if (FD_ISSET(sockets[i].sock,&wrfd))
+ (*(sockets[i].may_write))(sockets[i].sock);
+ if (FD_ISSET(sockets[i].sock,&errfd))
+ (*(sockets[i].on_error))(sockets[i].sock);
+ }
} else if (sel<0) {
print_error("select fails");
}
- return INVALID_SOCKET;
}
tint Datagram::Time () {
struct sockaddr_in addr = addr_;
SOCKET fd;
int len = sizeof(struct sockaddr_in), sndbuf=1<<20, rcvbuf=1<<20;
- if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
- print_error("socket() fails");
- return INVALID_SOCKET;
- }
+ #define dbnd_ensure(x) { if (!(x)) { print_error("binding fails"); close_socket(fd); return INVALID_SOCKET; } }
+ dbnd_ensure ( (fd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0 );
+ dbnd_ensure( make_socket_nonblocking(fd) ); // FIXME may remove this
#ifdef _WIN32
- u_long enable = 1;
- ioctlsocket(fd, FIONBIO, &enable);
- if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (const char *)&sndbuf, sizeof(int)) != 0 ) {
- print_error("setsockopt fails");
- return INVALID_SOCKET;
- }
- if ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (const char *)&rcvbuf, sizeof(int)) != 0 ) {
- print_error("setsockopt2 fails");
- return INVALID_SOCKET;
- }
- setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&enable, sizeof(int));
+#define parptype (char*)
#else
- int enable=1;
- if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1)
- return INVALID_SOCKET;
- if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(int)) < 0 ) {
- print_error("setsockopt fails");
- return INVALID_SOCKET;
- }
- if ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(int)) < 0 ) {
- print_error("setsockopt2 fails");
- return INVALID_SOCKET;
- }
- setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
+#define parptype void*
#endif
- if (::bind(fd, (sockaddr*)&addr, len) != 0) {
- print_error("bind fails");
- return INVALID_SOCKET;
- }
+ dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (parptype)&sndbuf, sizeof(int)) == 0 );
+ dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (parptype)&rcvbuf, sizeof(int)) == 0 );
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (parptype)&enable, sizeof(int));
+ dbnd_ensure ( ::bind(fd, (sockaddr*)&addr, len) == 0 );
return fd;
}
-void Datagram::Close (int sock) { // remove from fd_set
-#ifdef _WIN32
- if (closesocket(sock)!=0)
-#else
- if (::close(sock)!=0)
-#endif
+void Datagram::Close (SOCKET sock) {
+ if (!close_socket(sock))
print_error("on closing a socket");
}
#ifndef DATAGRAM_H
#define DATAGRAM_H
-#include <stdlib.h>
-#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
-#include <stdio.h>
-#include <string>
-#ifdef _WIN32
- #include <winsock2.h>
- #include "compat.h"
-#else
- typedef int SOCKET;
-
- #include <arpa/inet.h>
- #include <sys/select.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <unistd.h>
-#endif
#include "hashtree.h"
#include "compat.h"
};
+typedef void (*sock_cb_t) (SOCKET);
+struct socket_callbacks_t {
+ socket_callbacks_t (SOCKET s=0) : sock(s),
+ may_read(NULL), may_write(NULL), on_error(NULL) {}
+ SOCKET sock;
+ sock_cb_t may_read;
+ sock_cb_t may_write;
+ sock_cb_t on_error;
+};
+
+
/** UDP datagram class, a nice wrapping around sendto/recvfrom/select.
Reading/writing from/to a datagram is done in a FIFO (deque) fashion:
written data is appended to the tail (push) while read data is
/** bind to the address */
static SOCKET Bind(Address address);
+
/** close the port */
static void Close(int port);
+
/** the current time */
static tint Time();
+
/** wait till one of the sockets has some io to do; usec is the timeout */
- static SOCKET Wait (int sockcnt, SOCKET* sockets, tint usec=0);
+ static void Wait (int sockcnt, socket_callbacks_t* sockets, tint usec=0);
+
static tint now, epoch, start;
static uint64_t dgrams_up, dgrams_down, bytes_up, bytes_down;
Sha1Hash data_hash(data,length);
if (!OfferHash(pos, data_hash)) {
- printf("invalid hash for %s: %s\n",pos.str(),data_hash.hex().c_str());
+ printf("invalid hash for %s: %s\n",pos.str(),data_hash.hex().c_str()); // FIXME
return false;
}
--- /dev/null
+#include "swift.h"
+
+#define MAX_HTTP_CLIENT 128
+
+struct http_gw_t {
+ uint64_t offset;
+ uint64_t tosend;
+ int transfer;
+ SOCKET sink;
+} http_clients[MAX_HTTP_CLIENT];
+
+void HttpGwErrorCallback (SOCKET sink) {
+
+}
+
+void HttpGwMayWriteCallback (SOCKET sink) {
+ // if have data => write
+ // otherwise, change mask
+ if (not_enough_data)
+ swift::Listen3rdPartySocket(http_conns[httpc].sink,NULL,NULL,ErrorCallback);
+ if (all_done)
+ swift::Listen3rdPartySocket(http_conns[httpc].sink,NewRequestCallback,NULL,ErrorCallback);
+}
+
+
+void SwiftProgressCallback (int transfer, bin64_t bin) {
+ for (int httpc=0; httpc<conn_count; httpc++)
+ if (http_conns[httpc].transfer==transfer) {
+ // check mask
+ if (bin==http_conns[httpc].offset)
+ Listen3rdPartySocket(http_conns[httpc].sink,NULL,MayWriteCallback,ErrorCallback);
+ }
+}
+
+
+
+void HttpGwNewRequestCallback (SOCKET http_conn){
+ // read headers - the thrilling part
+ // we surely do not support pipelining => one requests at a time
+ fgets();
+ // HTTP request line
+ sscanf();
+ // HTTP header fields
+ sscanf();
+ // incomplete header => screw it
+ fprintf("400 Incomplete header\r\n");
+ close();
+ // initiate transmission
+ // write response header
+ http_clients[i].offset = 0;
+ http_clients[i].tosend = 10000;
+ http_clients[i].transfer = file;
+ http_clients[i].sink = conn;
+}
+
+
+// be liberal in what you do, be conservative in what you accept
+void HttpGwNewConnectionCallback (SOCKET serv) {
+ Address client_address;
+ SOCKET conn = accept (serv, & (client_address.addr), sizeof(struct sockaddr_in));
+ if (conn==INVALID_SOCKET) {
+ print_error("client conn fails");
+ return;
+ }
+ make_socket_nonblocking(conn);
+ // submit 3rd party socket to the swift loop
+ socket_callbacks_t install(conn,HttpGwNewRequestCallback,HttpGwMayWriteCallback,HttpGwErrorCallback);
+ swift::Listen3rdPartySocket(install);
+}
+
+
+void HttpGwError (SOCKET serv) {
+ print_error("error on http socket");
+}
+
+
+SOCKET InstallHTTPGateway (Address bind_to) {
+ SOCKET fd;
+ #define gw_ensure(x) { if (!(x)) { print_error("http binding fails"); close_socket(fd); return INVALID_SOCKET; } }
+ gw_ensure ( (fd=socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET );
+ gw_ensure ( 0==bind(fd, (sockaddr*)&(bind_to.addr), sizeof(struct sockaddr_in)) );
+ gw_ensure (make_socket_nonblocking(fd));
+ gw_ensure ( 0==listen(fd,8) );
+ socket_callbacks_t install(sock,HttpGwNewConnectionCallback,NULL,HttpGwError);
+ gw_ensure (swift::Listen3rdPartySocket(install));
+}
last_send_time_ = NOW;
sent_since_recv_++;
dgrams_sent_++;
+ Reschedule();
}
}
last_recv_time_ = NOW;
sent_since_recv_ = 0;
+ Reschedule();
}
}
-Channel* Channel::RecvDatagram (int socket) {
+void Channel::RecvDatagram (SOCKET socket) {
Datagram data(socket);
data.Recv();
const Address& addr = data.address();
-#define return_log(...) { printf(__VA_ARGS__); return NULL; }
+#define return_log(...) { printf(__VA_ARGS__); }
if (data.size()<4)
return_log("datagram shorter than 4 bytes %s\n",addr.str());
uint32_t mych = data.Pull32();
}
//dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
channel->Recv(data);
- return channel;
}
dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(),
tintstr(send_time));
sender->Send();
- sender->Reschedule();
} else { // it's too early, wait
tint towait = min(limit,send_time) - NOW;
dprintf("%s #0 waiting %lliusec\n",tintstr(),towait);
- int rd = Datagram::Wait(socket_count,sockets,towait);
- if (rd!=INVALID_SOCKET) { // in meantime, received something
- Channel* receiver = RecvDatagram(rd);
- if (receiver) // receiver's state may have changed
- receiver->Reschedule();
- }
+ Datagram::Wait(socket_count,sockets,towait);
if (sender) // get back to that later
send_queue.push(tintbin(send_time,sender->id()));
Normally, API users do not deal with this class. */
class Channel {
public:
- Channel (FileTransfer* file, int socket=-1, Address peer=Address());
+ Channel (FileTransfer* file, int socket=INVALID_SOCKET, Address peer=Address());
~Channel();
typedef enum {
static const char* SEND_CONTROL_MODES[];
- static Channel*
- RecvDatagram (int socket);
+ static void RecvDatagram (SOCKET socket);
static void Loop (tint till);
void Recv (Datagram& dgram);
return i<channels.size()?channels[i]:NULL;
}
static void CloseTransfer (FileTransfer* trans);
- static SOCKET default_socket() { return sockets[0]; }
+ static SOCKET default_socket() { return sockets[0].sock; }
protected:
/** Channel id: index in the channel array. */
static PeerSelector* peer_selector;
- static SOCKET sockets[8];
+ #define SWFT_MAX_SOCK_OPEN 128
+ static socket_callbacks_t sockets[SWFT_MAX_SOCK_OPEN];
static int socket_count;
static tint last_tick;
static tbheap send_queue;
friend void AddPeer (Address address, const Sha1Hash& root);
friend void SetTracker(const Address& tracker);
friend int Open (const char*, const Sha1Hash&) ; // FIXME
+ friend bool Listen3rdPartySocket (socket_callbacks_t);
};
int Listen (Address addr);
/** Run send/receive loop for the specified amount of time. */
void Loop (tint till);
+ bool Listen3rdPartySocket (socket_callbacks_t);
/** Stop listening to a port. */
void Shutdown (int sock_des=-1);