Multiple sends in one system call for swift.
authorAdrian Bondrescu <adi.bondrescu@gmail.com>
Mon, 6 Aug 2012 20:36:30 +0000 (23:36 +0300)
committerAdrian Bondrescu <adi.bondrescu@gmail.com>
Mon, 6 Aug 2012 20:36:30 +0000 (23:36 +0300)
src/libswift/sendrecv.cpp
src/libswift/swift.cpp
src/libswift/swift.h

index 710e327..1281192 100644 (file)
@@ -18,6 +18,7 @@ using namespace swift;
 using namespace std;
 
 struct event_base *Channel::evbase;
+MessageQueue Channel::messageQueue;
 struct event Channel::evrecv;
 
 #define DEBUGTRAFFIC   0
@@ -200,16 +201,20 @@ void    Channel::Send () {
     dprintf("%s #%u sent %ib %s:%x\n",
             tintstr(),id_,(int)evbuffer_get_length(evb),peer().str(),
             peer_channel_id_);
-    int r = SendTo(socket_,peer(),&evb);
-    if (r==-1)
-        print_error("can't send datagram");
-    else
-       raw_bytes_up_ += r;
-    last_send_time_ = NOW;
-    sent_since_recv_++;
-    dgrams_sent_++;
-    evbuffer_free(evb);
-    Reschedule();
+
+       messageQueue.AddBuffer(socket_, evb, peer(), this); 
+}
+
+void Channel::Sent(int bytes, evbuffer *evb, bool tofree)
+{
+       raw_bytes_up_ += bytes;
+       if (tofree) {
+               last_send_time_ = NOW;
+               sent_since_recv_++;
+               dgrams_sent_++;
+               evbuffer_free(evb);
+               Reschedule();
+       }
 }
 
 void    Channel::AddHint (struct evbuffer *evb) {
@@ -338,9 +343,7 @@ bin_t        Channel::AddData (struct evbuffer *evb) {
         dprintf("%s #%u fsent %ib %s:%x\n",
                 tintstr(),id_,(int)evbuffer_get_length(evb),peer().str(),
                 peer_channel_id_);
-       int ret = Channel::SendTo(socket_,peer(),&evb); // kind of fragmentation
-       if (ret > 0)
-               raw_bytes_up_ += ret;
+               messageQueue.AddBuffer(socket_, evb, peer(), this, false);
         evbuffer_add_32be(evb, peer_channel_id_);
     }
 
index c4e5b3b..9a7bd89 100644 (file)
@@ -27,6 +27,7 @@ int OpenSwiftDirectory(const TCHAR* dirname, Address tracker, bool force_check_d
 void ReportCallback(int fd, short event, void *arg);
 void EndCallback(int fd, short event, void *arg);
 void RescanDirCallback(int fd, short event, void *arg);
+void TimerCallback(int fd, short event, void *arg);
 
 
 // Gateway stuff
@@ -39,7 +40,7 @@ void CmdGwUpdateDLStatesCallback();
 
 
 // Global variables
-struct event evreport, evrescan, evend;
+struct event evreport, evrescan, evend, evtimer;
 int single_fd = -1;
 bool file_enable_checkpoint = false;
 bool file_checkpointed = false;
@@ -328,6 +329,9 @@ int main (int argc, char** argv)
        evtimer_add(&evend, tint2tv(wait_time));
     }
 
+       evtimer_assign(&evtimer, Channel::evbase, TimerCallback, NULL);
+       evtimer_add(&evtimer, tint2tv(TIMER_USEC));
+
     // Enter mainloop, if daemonizing
     if (wait_time == TINT_NEVER || (long)wait_time > 0) {
                // Arno: always, for statsgw, rate control, etc.
@@ -567,6 +571,11 @@ void ReportCallback(int fd, short event, void *arg) {
        evtimer_add(&evreport, tint2tv(TINT_SEC));
 }
 
+void TimerCallback(int fd, short event, void *arg) {
+       Channel::messageQueue.Flush();
+       evtimer_add(&evtimer, tint2tv(TIMER_USEC));
+}
+
 void EndCallback(int fd, short event, void *arg) {
        // Called when wait timer expires == fixed time daemon
     event_base_loopexit(Channel::evbase, NULL);
index 723cf15..3fac390 100644 (file)
@@ -54,6 +54,7 @@
 #include <set>
 #include <algorithm>
 #include <string>
+#include <map>
 #include <math.h>
 
 #include "compat.h"
@@ -459,6 +460,7 @@ namespace swift {
           being transferred between two peers. As we don't need buffers and
           lots of other TCP stuff, sizeof(Channel+members) must be below 1K.
           Normally, API users do not deal with this class. */
+       class MessageQueue;
     class Channel {
 
 #define DGRAM_MAX_SOCK_OPEN 128
@@ -484,6 +486,8 @@ namespace swift {
         static struct event evrecv;
         static const char* SEND_CONTROL_MODES[];
 
+               static MessageQueue messageQueue;
+
            static tint epoch, start;
            static uint64_t global_dgrams_up, global_dgrams_down, global_raw_bytes_up, global_raw_bytes_down, global_bytes_up, global_bytes_down;
         static void CloseChannelByAddress(const Address &addr);
@@ -589,6 +593,8 @@ namespace swift {
         void           Schedule4Close() { scheduled4close_ = true; }
         bool           IsScheduled4Close() { return scheduled4close_; }
 
+               void Sent(int bytes, evbuffer *evb, bool tofree);
+
 
     protected:
         /** Channel id: index in the channel array. */
@@ -785,6 +791,81 @@ namespace swift {
     // Arno: Save transfer's binmap for zero-hashcheck restart
     void Checkpoint(int fdes);
 
+#define MAX_QUEUE_LENGTH 10
+#define TIMER_USEC 100000
+
+       class MessageQueue
+       {
+       public:
+               class Entry
+               {
+               public:
+                       Entry(evbuffer *ievb, const Address &iaddr, Channel *ichannel, bool itofree)
+                               :
+                                       evb(ievb),
+                                       addr(iaddr),
+                                       channel(ichannel),
+                                       tofree(itofree)
+                       {
+                       }
+
+                       evbuffer *evb;
+                       Address addr;
+                       Channel *channel;
+                       bool tofree;
+               };
+
+               typedef std::deque<Entry> EntryList;
+               typedef std::map<int, EntryList> EntryLists;
+       
+               void AddBuffer(int sock, evbuffer *evb, const Address &addr, Channel *channel, bool tofree = true)
+               {
+                       EntryList &list = lists[sock];
+                       list.push_back(Entry(evb, addr, channel, tofree));
+                       if (list.size() == MAX_QUEUE_LENGTH)
+                               Flush(sock);
+               }
+
+               void Flush(int sock)
+               {
+                       EntryList &list = lists[sock];
+                       if (list.empty())
+                               return;
+
+                       Address addr;
+                       free(addr.addr);
+                       addr.addr = (struct sockaddr_mptp *) calloc(1, sizeof(struct sockaddr_mptp) + list.size() * sizeof(struct mptp_dest));
+                       addr.addr->count = list.size();
+                       evbuffer *evbs[list.size()];
+                       int i = 0;
+                       for (EntryList::iterator it = list.begin(); it != list.end(); ++it, ++i) {
+                               addr.addr->dests[i].addr = (*it).addr.addr->dests[0].addr;
+                               addr.addr->dests[i].port = (*it).addr.addr->dests[0].port;
+                               evbs[i] = (*it).evb;
+                       }
+
+                       int r = Channel::SendTo(sock, addr, evbs);
+                       printf("Sent %d buffers\n", list.size());
+                       if (r > 0) {
+                               i = 0;
+                               for (EntryList::iterator it = list.begin(); it != list.end(); ++it, ++i) {
+                                       (*it).channel->Sent(evbuffer_get_length((*it).evb), (*it).evb, (*it).tofree);
+                                       printf("Sent %d bytes\n", addr.addr->dests[i].bytes);
+                               }
+                       }
+                       list.clear();
+               }
+
+               void Flush() 
+               { 
+                       for (EntryLists::iterator it = lists.begin(); it != lists.end(); ++it)
+                               Flush(it->first);
+               }
+
+       private:
+               EntryLists lists;
+       };
+
 } // namespace end
 
 // #define SWIFT_MUTE