From 41816c4e73a12147ee2e83cdb869c9c76cf876ca Mon Sep 17 00:00:00 2001 From: Adrian Bondrescu Date: Mon, 6 Aug 2012 23:36:30 +0300 Subject: [PATCH] Multiple sends in one system call for swift. --- src/libswift/sendrecv.cpp | 29 +++++++------- src/libswift/swift.cpp | 11 +++++- src/libswift/swift.h | 81 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 107 insertions(+), 14 deletions(-) diff --git a/src/libswift/sendrecv.cpp b/src/libswift/sendrecv.cpp index 710e327..1281192 100644 --- a/src/libswift/sendrecv.cpp +++ b/src/libswift/sendrecv.cpp @@ -18,6 +18,7 @@ using namespace swift; using namespace std; struct event_base *Channel::evbase; +MessageQueue Channel::messageQueue; struct event Channel::evrecv; #define DEBUGTRAFFIC 0 @@ -200,16 +201,20 @@ void Channel::Send () { dprintf("%s #%u sent %ib %s:%x\n", tintstr(),id_,(int)evbuffer_get_length(evb),peer().str(), peer_channel_id_); - int r = SendTo(socket_,peer(),&evb); - if (r==-1) - print_error("can't send datagram"); - else - raw_bytes_up_ += r; - last_send_time_ = NOW; - sent_since_recv_++; - dgrams_sent_++; - evbuffer_free(evb); - Reschedule(); + + messageQueue.AddBuffer(socket_, evb, peer(), this); +} + +void Channel::Sent(int bytes, evbuffer *evb, bool tofree) +{ + raw_bytes_up_ += bytes; + if (tofree) { + last_send_time_ = NOW; + sent_since_recv_++; + dgrams_sent_++; + evbuffer_free(evb); + Reschedule(); + } } void Channel::AddHint (struct evbuffer *evb) { @@ -338,9 +343,7 @@ bin_t Channel::AddData (struct evbuffer *evb) { dprintf("%s #%u fsent %ib %s:%x\n", tintstr(),id_,(int)evbuffer_get_length(evb),peer().str(), peer_channel_id_); - int ret = Channel::SendTo(socket_,peer(),&evb); // kind of fragmentation - if (ret > 0) - raw_bytes_up_ += ret; + messageQueue.AddBuffer(socket_, evb, peer(), this, false); evbuffer_add_32be(evb, peer_channel_id_); } diff --git a/src/libswift/swift.cpp b/src/libswift/swift.cpp index c4e5b3b..9a7bd89 100644 --- a/src/libswift/swift.cpp +++ b/src/libswift/swift.cpp @@ -27,6 +27,7 @@ int OpenSwiftDirectory(const TCHAR* dirname, Address tracker, bool force_check_d void ReportCallback(int fd, short event, void *arg); void EndCallback(int fd, short event, void *arg); void RescanDirCallback(int fd, short event, void *arg); +void TimerCallback(int fd, short event, void *arg); // Gateway stuff @@ -39,7 +40,7 @@ void CmdGwUpdateDLStatesCallback(); // Global variables -struct event evreport, evrescan, evend; +struct event evreport, evrescan, evend, evtimer; int single_fd = -1; bool file_enable_checkpoint = false; bool file_checkpointed = false; @@ -328,6 +329,9 @@ int main (int argc, char** argv) evtimer_add(&evend, tint2tv(wait_time)); } + evtimer_assign(&evtimer, Channel::evbase, TimerCallback, NULL); + evtimer_add(&evtimer, tint2tv(TIMER_USEC)); + // Enter mainloop, if daemonizing if (wait_time == TINT_NEVER || (long)wait_time > 0) { // Arno: always, for statsgw, rate control, etc. @@ -567,6 +571,11 @@ void ReportCallback(int fd, short event, void *arg) { evtimer_add(&evreport, tint2tv(TINT_SEC)); } +void TimerCallback(int fd, short event, void *arg) { + Channel::messageQueue.Flush(); + evtimer_add(&evtimer, tint2tv(TIMER_USEC)); +} + void EndCallback(int fd, short event, void *arg) { // Called when wait timer expires == fixed time daemon event_base_loopexit(Channel::evbase, NULL); diff --git a/src/libswift/swift.h b/src/libswift/swift.h index 723cf15..3fac390 100644 --- a/src/libswift/swift.h +++ b/src/libswift/swift.h @@ -54,6 +54,7 @@ #include #include #include +#include #include #include "compat.h" @@ -459,6 +460,7 @@ namespace swift { being transferred between two peers. As we don't need buffers and lots of other TCP stuff, sizeof(Channel+members) must be below 1K. Normally, API users do not deal with this class. */ + class MessageQueue; class Channel { #define DGRAM_MAX_SOCK_OPEN 128 @@ -484,6 +486,8 @@ namespace swift { static struct event evrecv; static const char* SEND_CONTROL_MODES[]; + static MessageQueue messageQueue; + static tint epoch, start; static uint64_t global_dgrams_up, global_dgrams_down, global_raw_bytes_up, global_raw_bytes_down, global_bytes_up, global_bytes_down; static void CloseChannelByAddress(const Address &addr); @@ -589,6 +593,8 @@ namespace swift { void Schedule4Close() { scheduled4close_ = true; } bool IsScheduled4Close() { return scheduled4close_; } + void Sent(int bytes, evbuffer *evb, bool tofree); + protected: /** Channel id: index in the channel array. */ @@ -785,6 +791,81 @@ namespace swift { // Arno: Save transfer's binmap for zero-hashcheck restart void Checkpoint(int fdes); +#define MAX_QUEUE_LENGTH 10 +#define TIMER_USEC 100000 + + class MessageQueue + { + public: + class Entry + { + public: + Entry(evbuffer *ievb, const Address &iaddr, Channel *ichannel, bool itofree) + : + evb(ievb), + addr(iaddr), + channel(ichannel), + tofree(itofree) + { + } + + evbuffer *evb; + Address addr; + Channel *channel; + bool tofree; + }; + + typedef std::deque EntryList; + typedef std::map EntryLists; + + void AddBuffer(int sock, evbuffer *evb, const Address &addr, Channel *channel, bool tofree = true) + { + EntryList &list = lists[sock]; + list.push_back(Entry(evb, addr, channel, tofree)); + if (list.size() == MAX_QUEUE_LENGTH) + Flush(sock); + } + + void Flush(int sock) + { + EntryList &list = lists[sock]; + if (list.empty()) + return; + + Address addr; + free(addr.addr); + addr.addr = (struct sockaddr_mptp *) calloc(1, sizeof(struct sockaddr_mptp) + list.size() * sizeof(struct mptp_dest)); + addr.addr->count = list.size(); + evbuffer *evbs[list.size()]; + int i = 0; + for (EntryList::iterator it = list.begin(); it != list.end(); ++it, ++i) { + addr.addr->dests[i].addr = (*it).addr.addr->dests[0].addr; + addr.addr->dests[i].port = (*it).addr.addr->dests[0].port; + evbs[i] = (*it).evb; + } + + int r = Channel::SendTo(sock, addr, evbs); + printf("Sent %d buffers\n", list.size()); + if (r > 0) { + i = 0; + for (EntryList::iterator it = list.begin(); it != list.end(); ++it, ++i) { + (*it).channel->Sent(evbuffer_get_length((*it).evb), (*it).evb, (*it).tofree); + printf("Sent %d bytes\n", addr.addr->dests[i].bytes); + } + } + list.clear(); + } + + void Flush() + { + for (EntryLists::iterator it = lists.begin(); it != lists.end(); ++it) + Flush(it->first); + } + + private: + EntryLists lists; + }; + } // namespace end // #define SWIFT_MUTE -- 2.20.1