Multiple sends in one system call for swift.
[swifty.git] / src / libswift / swift.h
index 723cf15..3fac390 100644 (file)
@@ -54,6 +54,7 @@
 #include <set>
 #include <algorithm>
 #include <string>
+#include <map>
 #include <math.h>
 
 #include "compat.h"
@@ -459,6 +460,7 @@ namespace swift {
           being transferred between two peers. As we don't need buffers and
           lots of other TCP stuff, sizeof(Channel+members) must be below 1K.
           Normally, API users do not deal with this class. */
+       class MessageQueue;
     class Channel {
 
 #define DGRAM_MAX_SOCK_OPEN 128
@@ -484,6 +486,8 @@ namespace swift {
         static struct event evrecv;
         static const char* SEND_CONTROL_MODES[];
 
+               static MessageQueue messageQueue;
+
            static tint epoch, start;
            static uint64_t global_dgrams_up, global_dgrams_down, global_raw_bytes_up, global_raw_bytes_down, global_bytes_up, global_bytes_down;
         static void CloseChannelByAddress(const Address &addr);
@@ -589,6 +593,8 @@ namespace swift {
         void           Schedule4Close() { scheduled4close_ = true; }
         bool           IsScheduled4Close() { return scheduled4close_; }
 
+               void Sent(int bytes, evbuffer *evb, bool tofree);
+
 
     protected:
         /** Channel id: index in the channel array. */
@@ -785,6 +791,81 @@ namespace swift {
     // Arno: Save transfer's binmap for zero-hashcheck restart
     void Checkpoint(int fdes);
 
+#define MAX_QUEUE_LENGTH 10
+#define TIMER_USEC 100000
+
+       class MessageQueue
+       {
+       public:
+               class Entry
+               {
+               public:
+                       Entry(evbuffer *ievb, const Address &iaddr, Channel *ichannel, bool itofree)
+                               :
+                                       evb(ievb),
+                                       addr(iaddr),
+                                       channel(ichannel),
+                                       tofree(itofree)
+                       {
+                       }
+
+                       evbuffer *evb;
+                       Address addr;
+                       Channel *channel;
+                       bool tofree;
+               };
+
+               typedef std::deque<Entry> EntryList;
+               typedef std::map<int, EntryList> EntryLists;
+       
+               void AddBuffer(int sock, evbuffer *evb, const Address &addr, Channel *channel, bool tofree = true)
+               {
+                       EntryList &list = lists[sock];
+                       list.push_back(Entry(evb, addr, channel, tofree));
+                       if (list.size() == MAX_QUEUE_LENGTH)
+                               Flush(sock);
+               }
+
+               void Flush(int sock)
+               {
+                       EntryList &list = lists[sock];
+                       if (list.empty())
+                               return;
+
+                       Address addr;
+                       free(addr.addr);
+                       addr.addr = (struct sockaddr_mptp *) calloc(1, sizeof(struct sockaddr_mptp) + list.size() * sizeof(struct mptp_dest));
+                       addr.addr->count = list.size();
+                       evbuffer *evbs[list.size()];
+                       int i = 0;
+                       for (EntryList::iterator it = list.begin(); it != list.end(); ++it, ++i) {
+                               addr.addr->dests[i].addr = (*it).addr.addr->dests[0].addr;
+                               addr.addr->dests[i].port = (*it).addr.addr->dests[0].port;
+                               evbs[i] = (*it).evb;
+                       }
+
+                       int r = Channel::SendTo(sock, addr, evbs);
+                       printf("Sent %d buffers\n", list.size());
+                       if (r > 0) {
+                               i = 0;
+                               for (EntryList::iterator it = list.begin(); it != list.end(); ++it, ++i) {
+                                       (*it).channel->Sent(evbuffer_get_length((*it).evb), (*it).evb, (*it).tofree);
+                                       printf("Sent %d bytes\n", addr.addr->dests[i].bytes);
+                               }
+                       }
+                       list.clear();
+               }
+
+               void Flush() 
+               { 
+                       for (EntryLists::iterator it = lists.begin(); it != lists.end(); ++it)
+                               Flush(it->first);
+               }
+
+       private:
+               EntryLists lists;
+       };
+
 } // namespace end
 
 // #define SWIFT_MUTE