bool Channel::SELF_CONN_OK = false;
swift::tint Channel::TIMEOUT = TINT_SEC*60;
std::vector<Channel*> Channel::channels(1);
-socket_callbacks_t Channel::sockets[SWFT_MAX_SOCK_OPEN] = {};
-int Channel::socket_count = 0;
Address Channel::tracker;
tbheap Channel::send_queue;
FILE* Channel::debug_file = NULL;
Channel::Channel (FileTransfer* transfer, int socket, Address peer_addr) :
transfer_(transfer), peer_(peer_addr), peer_channel_id_(0), pex_out_(0),
- socket_(socket==INVALID_SOCKET?sockets[0].sock:socket), // FIXME
+ socket_(socket==INVALID_SOCKET?Datagram::default_socket():socket), // FIXME
data_out_cap_(bin64_t::ALL), last_data_out_time_(0), last_data_in_time_(0),
own_id_mentioned_(false), next_send_time_(0), last_send_time_(0),
last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC),
int swift::Listen (Address addr) {
- int sock = Datagram::Bind(addr);
- if (sock!=INVALID_SOCKET) {
- socket_callbacks_t cb(sock);
- cb.may_read = &Channel::RecvDatagram;
- Channel::sockets[Channel::socket_count++] = cb;
- }
- return sock;
+ sckrwecb_t cb;
+ cb.may_read = &Channel::RecvDatagram;
+ cb.sock = Datagram::Bind(addr,cb);
+ return cb.sock;
}
void swift::Shutdown (int sock_des) {
- for(int i=0; i<Channel::socket_count; i++)
- if (sock_des==-1 || Channel::sockets[i].sock==sock_des) {
- Datagram::Close(Channel::sockets[i].sock);
- Channel::sockets[i] = Channel::sockets[--Channel::socket_count];
- }
+ Datagram::Shutdown();
}
}
-bool swift::Listen3rdPartySocket (socket_callbacks_t cb) {
- int i=0;
- while (i<Channel::socket_count && Channel::sockets[i].sock!=cb.sock) i++;
- if (i==Channel::socket_count)
- if (i==SWFT_MAX_SOCK_OPEN)
- return false;
- else
- Channel::socket_count++;
- Channel::sockets[i]=cb;
- if (!cb.may_read && !cb.may_write && !cb.on_error)
- Channel::sockets[i] = Channel::sockets[--Channel::socket_count];
- return true;
-}
-
int swift::Open (const char* filename, const Sha1Hash& hash) {
FileTransfer* ft = new FileTransfer(filename, hash);
uint32_t Address::LOCALHOST = INADDR_LOOPBACK;
uint64_t Datagram::dgrams_up=0, Datagram::dgrams_down=0,
Datagram::bytes_up=0, Datagram::bytes_down=0;
+sckrwecb_t Datagram::sock_open[] = {};
+int Datagram::sock_count = 0;
const char* tintstr (tint time) {
if (time==0)
}
}
+
+bool Datagram::Listen3rdPartySocket (sckrwecb_t cb) {
+ int i=0;
+ while (i<sock_count && sock_open[i].sock!=cb.sock) i++;
+ if (i==sock_count)
+ if (i==DGRAM_MAX_SOCK_OPEN)
+ return false;
+ else
+ sock_count++;
+ sock_open[i]=cb;
+ //if (!cb.may_read && !cb.may_write && !cb.on_error)
+ // sock_open[i] = sock_open[--sock_count];
+ return true;
+}
+
+
+void Datagram::Shutdown () {
+ while (sock_count--)
+ Close(sock_open[sock_count].sock);
+}
+
int Datagram::Send () {
int r = sendto(sock,(const char *)buf+offset,length-offset,0,
}
-void Datagram::Wait (int sockcnt, socket_callbacks_t* sockets, tint usec) {
+SOCKET Datagram::Wait (tint usec) {
struct timeval timeout;
timeout.tv_sec = usec/TINT_SEC;
timeout.tv_usec = usec%TINT_SEC;
FD_ZERO(&rdfd);
FD_ZERO(&wrfd);
FD_ZERO(&errfd);
- for(int i=0; i<sockcnt; i++) {
- if (sockets[i].may_read!=0)
- FD_SET(sockets[i].sock,&rdfd);
- if (sockets[i].may_write!=0)
- FD_SET(sockets[i].sock,&wrfd);
- if (sockets[i].on_error!=0)
- FD_SET(sockets[i].sock,&errfd);
- if (sockets[i].sock>max_sock_fd)
- max_sock_fd = sockets[i].sock;
+ for(int i=0; i<sock_count; i++) {
+ if (sock_open[i].may_read!=0)
+ FD_SET(sock_open[i].sock,&rdfd);
+ if (sock_open[i].may_write!=0)
+ FD_SET(sock_open[i].sock,&wrfd);
+ if (sock_open[i].on_error!=0)
+ FD_SET(sock_open[i].sock,&errfd);
+ if (sock_open[i].sock>max_sock_fd)
+ max_sock_fd = sock_open[i].sock;
}
- int sel = select(max_sock_fd+1, &rdfd, &wrfd, &errfd, &timeout);
+ SOCKET sel = select(max_sock_fd+1, &rdfd, &wrfd, &errfd, &timeout);
Time();
if (sel>0) {
- for (int i=0; i<=sockcnt; i++) {
- socket_callbacks_t& sct = sockets[i];
+ for (int i=0; i<=sock_count; i++) {
+ sckrwecb_t& sct = sock_open[i];
if (sct.may_read && FD_ISSET(sct.sock,&rdfd))
(*(sct.may_read))(sct.sock);
- if (sct.may_write && FD_ISSET(sockets[i].sock,&wrfd))
+ if (sct.may_write && FD_ISSET(sct.sock,&wrfd))
(*(sct.may_write))(sct.sock);
- if (sct.on_error && FD_ISSET(sockets[i].sock,&errfd))
+ if (sct.on_error && FD_ISSET(sct.sock,&errfd))
(*(sct.on_error))(sct.sock);
}
} else if (sel<0) {
print_error("select fails");
}
+ return sel;
}
tint Datagram::Time () {
return now = usec_time();
}
-SOCKET Datagram::Bind (Address addr_) {
- struct sockaddr_in addr = addr_;
+SOCKET Datagram::Bind (Address address, sckrwecb_t callbacks) {
+ struct sockaddr_in addr = address;
SOCKET fd;
int len = sizeof(struct sockaddr_in), sndbuf=1<<20, rcvbuf=1<<20;
- #define dbnd_ensure(x) { if (!(x)) { print_error("binding fails"); close_socket(fd); return INVALID_SOCKET; } }
+ #define dbnd_ensure(x) { if (!(x)) { \
+ print_error("binding fails"); close_socket(fd); return INVALID_SOCKET; } }
dbnd_ensure ( (fd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0 );
dbnd_ensure( make_socket_nonblocking(fd) ); // FIXME may remove this
int enable = true;
- dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (setsockoptptr_t)&sndbuf, sizeof(int)) == 0 );
- dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (setsockoptptr_t)&rcvbuf, sizeof(int)) == 0 );
+ dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF,
+ (setsockoptptr_t)&sndbuf, sizeof(int)) == 0 );
+ dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF,
+ (setsockoptptr_t)&rcvbuf, sizeof(int)) == 0 );
//setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (setsockoptptr_t)&enable, sizeof(int));
dbnd_ensure ( ::bind(fd, (sockaddr*)&addr, len) == 0 );
+ callbacks.sock = fd;
+ Datagram::sock_open[Datagram::sock_count++] = callbacks;
return fd;
}
void Datagram::Close (SOCKET sock) {
+ for(int i=0; i<Datagram::sock_count; i++)
+ if (Datagram::sock_open[i].sock==sock)
+ Datagram::sock_open[i] = Datagram::sock_open[--Datagram::sock_count];
if (!close_socket(sock))
print_error("on closing a socket");
}
};
-typedef void (*sock_cb_t) (SOCKET);
-struct socket_callbacks_t {
- socket_callbacks_t (SOCKET s=0, sock_cb_t mr=NULL, sock_cb_t mw=NULL, sock_cb_t oe=NULL) :
+typedef void (*sockcb_t) (SOCKET);
+struct sckrwecb_t {
+ sckrwecb_t (SOCKET s=0, sockcb_t mr=NULL, sockcb_t mw=NULL, sockcb_t oe=NULL) :
sock(s), may_read(mr), may_write(mw), on_error(oe) {}
SOCKET sock;
- sock_cb_t may_read;
- sock_cb_t may_write;
- sock_cb_t on_error;
+ sockcb_t may_read;
+ sockcb_t may_write;
+ sockcb_t on_error;
};
int offset, length;
uint8_t buf[MAXDGRAMSZ*2];
+#define DGRAM_MAX_SOCK_OPEN 128
+ static int sock_count;
+ static sckrwecb_t sock_open[DGRAM_MAX_SOCK_OPEN];
+
public:
/** bind to the address */
- static SOCKET Bind(Address address);
+ static SOCKET Bind(Address address, sckrwecb_t callbacks=sckrwecb_t());
/** close the port */
- static void Close(int port);
+ static void Close(SOCKET sock);
/** the current time */
static tint Time();
/** wait till one of the sockets has some io to do; usec is the timeout */
- static void Wait (int sockcnt, socket_callbacks_t* sockets, tint usec=0);
+ static SOCKET Wait (tint usec);
+
+ static bool Listen3rdPartySocket (sckrwecb_t cb) ;
+
+ static void Shutdown ();
+
+ static SOCKET default_socket()
+ { return sock_count ? sock_open[0].sock : INVALID_SOCKET; }
static tint now, epoch, start;
static uint64_t dgrams_up, dgrams_down, bytes_up, bytes_down;
*req = http_requests[--http_gw_reqs_open];
}
swift::close_socket(sock);
- swift::Listen3rdPartySocket(socket_callbacks_t(sock));
+ swift::Datagram::Listen3rdPartySocket(sckrwecb_t(sock));
}
} else {
if (req->tosend==0) { // done; wait for new request
dprintf("%s @%i done\n",tintstr(),req->id);
- socket_callbacks_t wait_new_req(req->sink,HttpGwNewRequestCallback,NULL,HttpGwCloseConnection);
- swift::Listen3rdPartySocket (wait_new_req);
+ sckrwecb_t wait_new_req(req->sink,HttpGwNewRequestCallback,NULL,HttpGwCloseConnection);
+ swift::Datagram::Listen3rdPartySocket (wait_new_req);
} else { // wait for data
dprintf("%s @%i waiting for data\n",tintstr(),req->id);
- socket_callbacks_t wait_swift_data(req->sink,NULL,NULL,HttpGwCloseConnection);
- swift::Listen3rdPartySocket(wait_swift_data);
+ sckrwecb_t wait_swift_data(req->sink,NULL,NULL,HttpGwCloseConnection);
+ swift::Datagram::Listen3rdPartySocket(wait_swift_data);
}
}
}
if (http_requests[httpc].transfer==transfer)
if ( (bin.base_offset()<<10) == http_requests[httpc].offset ) {
dprintf("%s @%i progress: %s\n",tintstr(),http_requests[httpc].id,bin.str());
- socket_callbacks_t maywrite_callbacks
+ sckrwecb_t maywrite_callbacks
(http_requests[httpc].sink,NULL,HttpGwMayWriteCallback,HttpGwCloseConnection);
- Listen3rdPartySocket (maywrite_callbacks);
+ Datagram::Listen3rdPartySocket (maywrite_callbacks);
}
}
HttpGwFirstProgressCallback(file,bin64_t(0,0));
} else {
swift::AddProgressCallback(file,&HttpGwFirstProgressCallback);
- socket_callbacks_t install (http_conn,NULL,NULL,HttpGwCloseConnection);
- swift::Listen3rdPartySocket(install);
+ sckrwecb_t install (http_conn,NULL,NULL,HttpGwCloseConnection);
+ swift::Datagram::Listen3rdPartySocket(install);
}
}
}
make_socket_nonblocking(conn);
// submit 3rd party socket to the swift loop
- socket_callbacks_t install
+ sckrwecb_t install
(conn,HttpGwNewRequestCallback,NULL,HttpGwCloseConnection);
- swift::Listen3rdPartySocket(install);
+ swift::Datagram::Listen3rdPartySocket(install);
}
print_error("httpgw is dead");
dprintf("%s @0 closed http gateway\n",tintstr());
close_socket(s);
- swift::Listen3rdPartySocket(socket_callbacks_t(s));
+ swift::Datagram::Listen3rdPartySocket(sckrwecb_t(s));
}
gw_ensure ( 0==bind(fd, (sockaddr*)&(bind_to.addr), sizeof(struct sockaddr_in)) );
gw_ensure (make_socket_nonblocking(fd));
gw_ensure ( 0==listen(fd,8) );
- socket_callbacks_t install_http(fd,HttpGwNewConnectionCallback,NULL,HttpGwError);
- gw_ensure (swift::Listen3rdPartySocket(install_http));
+ sckrwecb_t install_http(fd,HttpGwNewConnectionCallback,NULL,HttpGwError);
+ gw_ensure (swift::Datagram::Listen3rdPartySocket(install_http));
dprintf("%s @0 installed http gateway on %s\n",tintstr(),bind_to.str());
+ return fd;
}
tint towait = min(limit,send_time) - NOW;
dprintf("%s #0 waiting %lliusec\n",tintstr(),towait);
- Datagram::Wait(socket_count,sockets,towait);
+ Datagram::Wait(towait);
if (sender) // get back to that later
send_queue.push(tintbin(send_time,sender->id()));
return i<channels.size()?channels[i]:NULL;
}
static void CloseTransfer (FileTransfer* trans);
- static SOCKET default_socket() { return sockets[0].sock; }
protected:
/** Channel id: index in the channel array. */
static PeerSelector* peer_selector;
- #define SWFT_MAX_SOCK_OPEN 128
- static socket_callbacks_t sockets[SWFT_MAX_SOCK_OPEN];
- static int socket_count;
static tint last_tick;
static tbheap send_queue;
friend void AddPeer (Address address, const Sha1Hash& root);
friend void SetTracker(const Address& tracker);
friend int Open (const char*, const Sha1Hash&) ; // FIXME
- friend bool Listen3rdPartySocket (socket_callbacks_t);
};
int Listen (Address addr);
/** Run send/receive loop for the specified amount of time. */
void Loop (tint till);
- bool Listen3rdPartySocket (socket_callbacks_t);
+ bool Listen3rdPartySocket (sckrwecb_t);
/** Stop listening to a port. */
void Shutdown (int sock_des=-1);
ASSERT_EQ(datalen,d.Send());
SOCKET socks[1] = {socket};
// Arno: timeout 0 gives undeterministic behaviour on win32
- SOCKET waitsocket = Datagram::Wait(1,socks,1000000);
+ SOCKET waitsocket = Datagram::Wait(1000000);
ASSERT_EQ(socket,waitsocket);
Datagram rcv(waitsocket);
ASSERT_EQ(datalen,rcv.Recv());
SOCKET socks[2] = {sock1,sock2};
// Arno: timeout 0 gives undeterministic behaviour on win32
- EXPECT_EQ(sock2,Datagram::Wait(2,socks,1000000));
+ EXPECT_EQ(sock2,Datagram::Wait(1000000));
Datagram recv(sock2);
recv.Recv();
uint32_t test = recv.Pull32();
return; // already connected
}
if (hs_in_.size()<20) {
- new Channel(this,Channel::default_socket(),addr);
+ new Channel(this,Datagram::default_socket(),addr);
} else {
pex_in_.push_back(addr);
if (pex_in_.size()>1000)