Use Linux-like indentation in mptp.c
[swifty.git] / src / libswift / swift.h
index d44b882..67092c2 100644 (file)
@@ -54,6 +54,7 @@
 #include <set>
 #include <algorithm>
 #include <string>
+#include <map>
 #include <math.h>
 
 #include "compat.h"
@@ -108,6 +109,11 @@ namespace swift {
        Address() {
            clear();
        }
+       Address(const Address &b) {
+           clear();
+               addr->dests[0].addr = b.addr->dests[0].addr;
+               addr->dests[0].port = b.addr->dests[0].port;
+       }
        Address(const char* ip, uint16_t port)  {
            clear();
            set_ipv4(ip);
@@ -454,6 +460,7 @@ namespace swift {
           being transferred between two peers. As we don't need buffers and
           lots of other TCP stuff, sizeof(Channel+members) must be below 1K.
           Normally, API users do not deal with this class. */
+       class MessageQueue;
     class Channel {
 
 #define DGRAM_MAX_SOCK_OPEN 128
@@ -479,8 +486,11 @@ namespace swift {
         static struct event evrecv;
         static const char* SEND_CONTROL_MODES[];
 
+               static MessageQueue messageQueue;
+
            static tint epoch, start;
-           static uint64_t global_dgrams_up, global_dgrams_down, global_raw_bytes_up, global_raw_bytes_down, global_bytes_up, global_bytes_down;
+           static uint64_t global_dgrams_up, global_dgrams_down, global_raw_bytes_up, global_raw_bytes_down, global_bytes_up, global_bytes_down,
+                                               global_buffers_up, global_syscalls_up, global_buffers_down, global_syscalls_down;
         static void CloseChannelByAddress(const Address &addr);
 
         // SOCKMGMT
@@ -489,8 +499,8 @@ namespace swift {
         static void LibeventSendCallback(int fd, short event, void *arg);
         static void LibeventReceiveCallback(int fd, short event, void *arg);
         static void RecvDatagram (evutil_socket_t socket); // Called by LibeventReceiveCallback
-           static int RecvFrom(evutil_socket_t sock, Address& addr, struct evbuffer *evb); // Called by RecvDatagram
-           static int SendTo(evutil_socket_t sock, const Address& addr, struct evbuffer *evb); // Called by Channel::Send()
+           static int RecvFrom(evutil_socket_t sock, Address& addr, struct evbuffer **evb); // Called by RecvDatagram
+           static int SendTo(evutil_socket_t sock, const Address& addr, struct evbuffer **evb); // Called by Channel::Send()
            static evutil_socket_t Bind(Address address, sckrwecb_t callbacks=sckrwecb_t());
            static Address BoundAddress(evutil_socket_t sock);
            static evutil_socket_t default_socket()
@@ -516,7 +526,7 @@ namespace swift {
         void        OnHandshake (struct evbuffer *evb);
         void        OnRandomize (struct evbuffer *evb); //FRAGRAND
         void        AddHandshake (struct evbuffer *evb);
-        bin_t       AddData (struct evbuffer *evb);
+        bin_t       AddData (struct evbuffer **evb);
         void        AddAck (struct evbuffer *evb);
         void        AddHave (struct evbuffer *evb);
         void        AddHint (struct evbuffer *evb);
@@ -584,6 +594,8 @@ namespace swift {
         void           Schedule4Close() { scheduled4close_ = true; }
         bool           IsScheduled4Close() { return scheduled4close_; }
 
+               void Sent(int bytes, evbuffer *evb, bool tofree);
+
 
     protected:
         /** Channel id: index in the channel array. */
@@ -780,6 +792,78 @@ namespace swift {
     // Arno: Save transfer's binmap for zero-hashcheck restart
     void Checkpoint(int fdes);
 
+#define MAX_QUEUE_LENGTH 1
+#define TIMER_USEC 10000
+
+       class MessageQueue
+       {
+       public:
+               class Entry
+               {
+               public:
+                       Entry(evbuffer *ievb, const Address &iaddr, Channel *ichannel, bool itofree)
+                               :
+                                       evb(ievb),
+                                       addr(iaddr),
+                                       channel(ichannel),
+                                       tofree(itofree)
+                       {
+                       }
+
+                       evbuffer *evb;
+                       Address addr;
+                       Channel *channel;
+                       bool tofree;
+               };
+
+               typedef std::deque<Entry> EntryList;
+               typedef std::map<int, EntryList> EntryLists;
+       
+               void AddBuffer(int sock, evbuffer *evb, const Address &addr, Channel *channel, bool tofree = true)
+               {
+                       EntryList &list = lists[sock];
+                       list.push_back(Entry(evb, addr, channel, tofree));
+                       if (list.size() == MAX_QUEUE_LENGTH)
+                               Flush(sock);
+               }
+
+               void Flush(int sock)
+               {
+                       EntryList &list = lists[sock];
+                       if (list.empty())
+                               return;
+
+                       Address addr;
+                       free(addr.addr);
+                       addr.addr = (struct sockaddr_mptp *) calloc(1, sizeof(struct sockaddr_mptp) + list.size() * sizeof(struct mptp_dest));
+                       addr.addr->count = list.size();
+                       evbuffer *evbs[list.size()];
+                       int i = 0;
+                       for (EntryList::iterator it = list.begin(); it != list.end(); ++it, ++i) {
+                               addr.addr->dests[i].addr = (*it).addr.addr->dests[0].addr;
+                               addr.addr->dests[i].port = (*it).addr.addr->dests[0].port;
+                               evbs[i] = (*it).evb;
+                       }
+
+                       int r = Channel::SendTo(sock, addr, evbs);
+                       if (r > 0) {
+                               i = 0;
+                               for (EntryList::iterator it = list.begin(); it != list.end(); ++it, ++i)
+                                       (*it).channel->Sent(evbuffer_get_length((*it).evb), (*it).evb, (*it).tofree);
+                       }
+                       list.clear();
+               }
+
+               void Flush() 
+               { 
+                       for (EntryLists::iterator it = lists.begin(); it != lists.end(); ++it)
+                               Flush(it->first);
+               }
+
+       private:
+               EntryLists lists;
+       };
+
 } // namespace end
 
 // #define SWIFT_MUTE