X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?a=blobdiff_plain;f=src%2Flibswift%2Fswift.h;h=67092c20a2b653c1bc157d8e31f0c5ccccc5ea80;hb=HEAD;hp=d44b88268883a3a83fe93f9cfcecef7689416cd7;hpb=14bddef67019b28f0bf0e0afc95d4f97caafd242;p=swifty.git diff --git a/src/libswift/swift.h b/src/libswift/swift.h index d44b882..67092c2 100644 --- a/src/libswift/swift.h +++ b/src/libswift/swift.h @@ -54,6 +54,7 @@ #include #include #include +#include #include #include "compat.h" @@ -108,6 +109,11 @@ namespace swift { Address() { clear(); } + Address(const Address &b) { + clear(); + addr->dests[0].addr = b.addr->dests[0].addr; + addr->dests[0].port = b.addr->dests[0].port; + } Address(const char* ip, uint16_t port) { clear(); set_ipv4(ip); @@ -454,6 +460,7 @@ namespace swift { being transferred between two peers. As we don't need buffers and lots of other TCP stuff, sizeof(Channel+members) must be below 1K. Normally, API users do not deal with this class. */ + class MessageQueue; class Channel { #define DGRAM_MAX_SOCK_OPEN 128 @@ -479,8 +486,11 @@ namespace swift { static struct event evrecv; static const char* SEND_CONTROL_MODES[]; + static MessageQueue messageQueue; + static tint epoch, start; - static uint64_t global_dgrams_up, global_dgrams_down, global_raw_bytes_up, global_raw_bytes_down, global_bytes_up, global_bytes_down; + static uint64_t global_dgrams_up, global_dgrams_down, global_raw_bytes_up, global_raw_bytes_down, global_bytes_up, global_bytes_down, + global_buffers_up, global_syscalls_up, global_buffers_down, global_syscalls_down; static void CloseChannelByAddress(const Address &addr); // SOCKMGMT @@ -489,8 +499,8 @@ namespace swift { static void LibeventSendCallback(int fd, short event, void *arg); static void LibeventReceiveCallback(int fd, short event, void *arg); static void RecvDatagram (evutil_socket_t socket); // Called by LibeventReceiveCallback - static int RecvFrom(evutil_socket_t sock, Address& addr, struct evbuffer *evb); // Called by RecvDatagram - static int SendTo(evutil_socket_t sock, const Address& addr, struct evbuffer *evb); // Called by Channel::Send() + static int RecvFrom(evutil_socket_t sock, Address& addr, struct evbuffer **evb); // Called by RecvDatagram + static int SendTo(evutil_socket_t sock, const Address& addr, struct evbuffer **evb); // Called by Channel::Send() static evutil_socket_t Bind(Address address, sckrwecb_t callbacks=sckrwecb_t()); static Address BoundAddress(evutil_socket_t sock); static evutil_socket_t default_socket() @@ -516,7 +526,7 @@ namespace swift { void OnHandshake (struct evbuffer *evb); void OnRandomize (struct evbuffer *evb); //FRAGRAND void AddHandshake (struct evbuffer *evb); - bin_t AddData (struct evbuffer *evb); + bin_t AddData (struct evbuffer **evb); void AddAck (struct evbuffer *evb); void AddHave (struct evbuffer *evb); void AddHint (struct evbuffer *evb); @@ -584,6 +594,8 @@ namespace swift { void Schedule4Close() { scheduled4close_ = true; } bool IsScheduled4Close() { return scheduled4close_; } + void Sent(int bytes, evbuffer *evb, bool tofree); + protected: /** Channel id: index in the channel array. */ @@ -780,6 +792,78 @@ namespace swift { // Arno: Save transfer's binmap for zero-hashcheck restart void Checkpoint(int fdes); +#define MAX_QUEUE_LENGTH 1 +#define TIMER_USEC 10000 + + class MessageQueue + { + public: + class Entry + { + public: + Entry(evbuffer *ievb, const Address &iaddr, Channel *ichannel, bool itofree) + : + evb(ievb), + addr(iaddr), + channel(ichannel), + tofree(itofree) + { + } + + evbuffer *evb; + Address addr; + Channel *channel; + bool tofree; + }; + + typedef std::deque EntryList; + typedef std::map EntryLists; + + void AddBuffer(int sock, evbuffer *evb, const Address &addr, Channel *channel, bool tofree = true) + { + EntryList &list = lists[sock]; + list.push_back(Entry(evb, addr, channel, tofree)); + if (list.size() == MAX_QUEUE_LENGTH) + Flush(sock); + } + + void Flush(int sock) + { + EntryList &list = lists[sock]; + if (list.empty()) + return; + + Address addr; + free(addr.addr); + addr.addr = (struct sockaddr_mptp *) calloc(1, sizeof(struct sockaddr_mptp) + list.size() * sizeof(struct mptp_dest)); + addr.addr->count = list.size(); + evbuffer *evbs[list.size()]; + int i = 0; + for (EntryList::iterator it = list.begin(); it != list.end(); ++it, ++i) { + addr.addr->dests[i].addr = (*it).addr.addr->dests[0].addr; + addr.addr->dests[i].port = (*it).addr.addr->dests[0].port; + evbs[i] = (*it).evb; + } + + int r = Channel::SendTo(sock, addr, evbs); + if (r > 0) { + i = 0; + for (EntryList::iterator it = list.begin(); it != list.end(); ++it, ++i) + (*it).channel->Sent(evbuffer_get_length((*it).evb), (*it).evb, (*it).tofree); + } + list.clear(); + } + + void Flush() + { + for (EntryLists::iterator it = lists.begin(); it != lists.end(); ++it) + Flush(it->first); + } + + private: + EntryLists lists; + }; + } // namespace end // #define SWIFT_MUTE