From: Victor Grishchenko Date: Thu, 20 May 2010 13:38:26 +0000 (+0200) Subject: moved sockets to swift::Datagram X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?a=commitdiff_plain;h=0399e6bd16f3cb98ac1d29120f5d34f004dccd13;p=swift-upb.git moved sockets to swift::Datagram --- diff --git a/channel.cpp b/channel.cpp index d8838e1..5899257 100644 --- a/channel.cpp +++ b/channel.cpp @@ -31,8 +31,6 @@ int Channel::MAX_REORDERING = 4; bool Channel::SELF_CONN_OK = false; swift::tint Channel::TIMEOUT = TINT_SEC*60; std::vector Channel::channels(1); -socket_callbacks_t Channel::sockets[SWFT_MAX_SOCK_OPEN] = {}; -int Channel::socket_count = 0; Address Channel::tracker; tbheap Channel::send_queue; FILE* Channel::debug_file = NULL; @@ -41,7 +39,7 @@ PeerSelector* Channel::peer_selector = new SimpleSelector(); Channel::Channel (FileTransfer* transfer, int socket, Address peer_addr) : transfer_(transfer), peer_(peer_addr), peer_channel_id_(0), pex_out_(0), - socket_(socket==INVALID_SOCKET?sockets[0].sock:socket), // FIXME + socket_(socket==INVALID_SOCKET?Datagram::default_socket():socket), // FIXME data_out_cap_(bin64_t::ALL), last_data_out_time_(0), last_data_in_time_(0), own_id_mentioned_(false), next_send_time_(0), last_send_time_(0), last_recv_time_(0), rtt_avg_(TINT_SEC), dev_avg_(0), dip_avg_(TINT_SEC), @@ -85,22 +83,15 @@ int Channel::EncodeID(int unscrambled) { int swift::Listen (Address addr) { - int sock = Datagram::Bind(addr); - if (sock!=INVALID_SOCKET) { - socket_callbacks_t cb(sock); - cb.may_read = &Channel::RecvDatagram; - Channel::sockets[Channel::socket_count++] = cb; - } - return sock; + sckrwecb_t cb; + cb.may_read = &Channel::RecvDatagram; + cb.sock = Datagram::Bind(addr,cb); + return cb.sock; } void swift::Shutdown (int sock_des) { - for(int i=0; imax_sock_fd) - max_sock_fd = sockets[i].sock; + for(int i=0; imax_sock_fd) + max_sock_fd = sock_open[i].sock; } - int sel = select(max_sock_fd+1, &rdfd, &wrfd, &errfd, &timeout); + SOCKET sel = select(max_sock_fd+1, &rdfd, &wrfd, &errfd, &timeout); Time(); if (sel>0) { - for (int i=0; i<=sockcnt; i++) { - socket_callbacks_t& sct = sockets[i]; + for (int i=0; i<=sock_count; i++) { + sckrwecb_t& sct = sock_open[i]; if (sct.may_read && FD_ISSET(sct.sock,&rdfd)) (*(sct.may_read))(sct.sock); - if (sct.may_write && FD_ISSET(sockets[i].sock,&wrfd)) + if (sct.may_write && FD_ISSET(sct.sock,&wrfd)) (*(sct.may_write))(sct.sock); - if (sct.on_error && FD_ISSET(sockets[i].sock,&errfd)) + if (sct.on_error && FD_ISSET(sct.sock,&errfd)) (*(sct.on_error))(sct.sock); } } else if (sel<0) { print_error("select fails"); } + return sel; } tint Datagram::Time () { @@ -157,22 +181,30 @@ tint Datagram::Time () { return now = usec_time(); } -SOCKET Datagram::Bind (Address addr_) { - struct sockaddr_in addr = addr_; +SOCKET Datagram::Bind (Address address, sckrwecb_t callbacks) { + struct sockaddr_in addr = address; SOCKET fd; int len = sizeof(struct sockaddr_in), sndbuf=1<<20, rcvbuf=1<<20; - #define dbnd_ensure(x) { if (!(x)) { print_error("binding fails"); close_socket(fd); return INVALID_SOCKET; } } + #define dbnd_ensure(x) { if (!(x)) { \ + print_error("binding fails"); close_socket(fd); return INVALID_SOCKET; } } dbnd_ensure ( (fd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0 ); dbnd_ensure( make_socket_nonblocking(fd) ); // FIXME may remove this int enable = true; - dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (setsockoptptr_t)&sndbuf, sizeof(int)) == 0 ); - dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (setsockoptptr_t)&rcvbuf, sizeof(int)) == 0 ); + dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF, + (setsockoptptr_t)&sndbuf, sizeof(int)) == 0 ); + dbnd_ensure ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, + (setsockoptptr_t)&rcvbuf, sizeof(int)) == 0 ); //setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (setsockoptptr_t)&enable, sizeof(int)); dbnd_ensure ( ::bind(fd, (sockaddr*)&addr, len) == 0 ); + callbacks.sock = fd; + Datagram::sock_open[Datagram::sock_count++] = callbacks; return fd; } void Datagram::Close (SOCKET sock) { + for(int i=0; itosend==0) { // done; wait for new request dprintf("%s @%i done\n",tintstr(),req->id); - socket_callbacks_t wait_new_req(req->sink,HttpGwNewRequestCallback,NULL,HttpGwCloseConnection); - swift::Listen3rdPartySocket (wait_new_req); + sckrwecb_t wait_new_req(req->sink,HttpGwNewRequestCallback,NULL,HttpGwCloseConnection); + swift::Datagram::Listen3rdPartySocket (wait_new_req); } else { // wait for data dprintf("%s @%i waiting for data\n",tintstr(),req->id); - socket_callbacks_t wait_swift_data(req->sink,NULL,NULL,HttpGwCloseConnection); - swift::Listen3rdPartySocket(wait_swift_data); + sckrwecb_t wait_swift_data(req->sink,NULL,NULL,HttpGwCloseConnection); + swift::Datagram::Listen3rdPartySocket(wait_swift_data); } } } @@ -92,9 +92,9 @@ void HttpGwSwiftProgressCallback (int transfer, bin64_t bin) { if (http_requests[httpc].transfer==transfer) if ( (bin.base_offset()<<10) == http_requests[httpc].offset ) { dprintf("%s @%i progress: %s\n",tintstr(),http_requests[httpc].id,bin.str()); - socket_callbacks_t maywrite_callbacks + sckrwecb_t maywrite_callbacks (http_requests[httpc].sink,NULL,HttpGwMayWriteCallback,HttpGwCloseConnection); - Listen3rdPartySocket (maywrite_callbacks); + Datagram::Listen3rdPartySocket (maywrite_callbacks); } } @@ -182,8 +182,8 @@ void HttpGwNewRequestCallback (SOCKET http_conn){ HttpGwFirstProgressCallback(file,bin64_t(0,0)); } else { swift::AddProgressCallback(file,&HttpGwFirstProgressCallback); - socket_callbacks_t install (http_conn,NULL,NULL,HttpGwCloseConnection); - swift::Listen3rdPartySocket(install); + sckrwecb_t install (http_conn,NULL,NULL,HttpGwCloseConnection); + swift::Datagram::Listen3rdPartySocket(install); } } @@ -199,9 +199,9 @@ void HttpGwNewConnectionCallback (SOCKET serv) { } make_socket_nonblocking(conn); // submit 3rd party socket to the swift loop - socket_callbacks_t install + sckrwecb_t install (conn,HttpGwNewRequestCallback,NULL,HttpGwCloseConnection); - swift::Listen3rdPartySocket(install); + swift::Datagram::Listen3rdPartySocket(install); } @@ -209,7 +209,7 @@ void HttpGwError (SOCKET s) { print_error("httpgw is dead"); dprintf("%s @0 closed http gateway\n",tintstr()); close_socket(s); - swift::Listen3rdPartySocket(socket_callbacks_t(s)); + swift::Datagram::Listen3rdPartySocket(sckrwecb_t(s)); } @@ -231,7 +231,8 @@ SOCKET InstallHTTPGateway (Address bind_to) { gw_ensure ( 0==bind(fd, (sockaddr*)&(bind_to.addr), sizeof(struct sockaddr_in)) ); gw_ensure (make_socket_nonblocking(fd)); gw_ensure ( 0==listen(fd,8) ); - socket_callbacks_t install_http(fd,HttpGwNewConnectionCallback,NULL,HttpGwError); - gw_ensure (swift::Listen3rdPartySocket(install_http)); + sckrwecb_t install_http(fd,HttpGwNewConnectionCallback,NULL,HttpGwError); + gw_ensure (swift::Datagram::Listen3rdPartySocket(install_http)); dprintf("%s @0 installed http gateway on %s\n",tintstr(),bind_to.str()); + return fd; } diff --git a/sendrecv.cpp b/sendrecv.cpp index 3c6414f..bed623e 100644 --- a/sendrecv.cpp +++ b/sendrecv.cpp @@ -551,7 +551,7 @@ void Channel::Loop (tint howlong) { tint towait = min(limit,send_time) - NOW; dprintf("%s #0 waiting %lliusec\n",tintstr(),towait); - Datagram::Wait(socket_count,sockets,towait); + Datagram::Wait(towait); if (sender) // get back to that later send_queue.push(tintbin(send_time,sender->id())); diff --git a/swift.h b/swift.h index 40dada3..7ed26e5 100644 --- a/swift.h +++ b/swift.h @@ -323,7 +323,6 @@ namespace swift { return i1000)