X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?a=blobdiff_plain;ds=sidebyside;f=src%2Flibswift%2Fswift.h;fp=src%2Flibswift%2Fswift.h;h=3fac3904ed94e262d96e1963d90d8b7f7031d5f7;hb=41816c4e73a12147ee2e83cdb869c9c76cf876ca;hp=723cf150485739e65d98a7631b38a3314050d2bc;hpb=da188550ca4b16d1f49b0e59938ff93612a6cdbb;p=swifty.git diff --git a/src/libswift/swift.h b/src/libswift/swift.h index 723cf15..3fac390 100644 --- a/src/libswift/swift.h +++ b/src/libswift/swift.h @@ -54,6 +54,7 @@ #include #include #include +#include #include #include "compat.h" @@ -459,6 +460,7 @@ namespace swift { being transferred between two peers. As we don't need buffers and lots of other TCP stuff, sizeof(Channel+members) must be below 1K. Normally, API users do not deal with this class. */ + class MessageQueue; class Channel { #define DGRAM_MAX_SOCK_OPEN 128 @@ -484,6 +486,8 @@ namespace swift { static struct event evrecv; static const char* SEND_CONTROL_MODES[]; + static MessageQueue messageQueue; + static tint epoch, start; static uint64_t global_dgrams_up, global_dgrams_down, global_raw_bytes_up, global_raw_bytes_down, global_bytes_up, global_bytes_down; static void CloseChannelByAddress(const Address &addr); @@ -589,6 +593,8 @@ namespace swift { void Schedule4Close() { scheduled4close_ = true; } bool IsScheduled4Close() { return scheduled4close_; } + void Sent(int bytes, evbuffer *evb, bool tofree); + protected: /** Channel id: index in the channel array. */ @@ -785,6 +791,81 @@ namespace swift { // Arno: Save transfer's binmap for zero-hashcheck restart void Checkpoint(int fdes); +#define MAX_QUEUE_LENGTH 10 +#define TIMER_USEC 100000 + + class MessageQueue + { + public: + class Entry + { + public: + Entry(evbuffer *ievb, const Address &iaddr, Channel *ichannel, bool itofree) + : + evb(ievb), + addr(iaddr), + channel(ichannel), + tofree(itofree) + { + } + + evbuffer *evb; + Address addr; + Channel *channel; + bool tofree; + }; + + typedef std::deque EntryList; + typedef std::map EntryLists; + + void AddBuffer(int sock, evbuffer *evb, const Address &addr, Channel *channel, bool tofree = true) + { + EntryList &list = lists[sock]; + list.push_back(Entry(evb, addr, channel, tofree)); + if (list.size() == MAX_QUEUE_LENGTH) + Flush(sock); + } + + void Flush(int sock) + { + EntryList &list = lists[sock]; + if (list.empty()) + return; + + Address addr; + free(addr.addr); + addr.addr = (struct sockaddr_mptp *) calloc(1, sizeof(struct sockaddr_mptp) + list.size() * sizeof(struct mptp_dest)); + addr.addr->count = list.size(); + evbuffer *evbs[list.size()]; + int i = 0; + for (EntryList::iterator it = list.begin(); it != list.end(); ++it, ++i) { + addr.addr->dests[i].addr = (*it).addr.addr->dests[0].addr; + addr.addr->dests[i].port = (*it).addr.addr->dests[0].port; + evbs[i] = (*it).evb; + } + + int r = Channel::SendTo(sock, addr, evbs); + printf("Sent %d buffers\n", list.size()); + if (r > 0) { + i = 0; + for (EntryList::iterator it = list.begin(); it != list.end(); ++it, ++i) { + (*it).channel->Sent(evbuffer_get_length((*it).evb), (*it).evb, (*it).tofree); + printf("Sent %d bytes\n", addr.addr->dests[i].bytes); + } + } + list.clear(); + } + + void Flush() + { + for (EntryLists::iterator it = lists.begin(); it != lists.end(); ++it) + Flush(it->first); + } + + private: + EntryLists lists; + }; + } // namespace end // #define SWIFT_MUTE