#include <set>
#include <algorithm>
#include <string>
+#include <map>
#include <math.h>
#include "compat.h"
being transferred between two peers. As we don't need buffers and
lots of other TCP stuff, sizeof(Channel+members) must be below 1K.
Normally, API users do not deal with this class. */
+ class MessageQueue;
class Channel {
#define DGRAM_MAX_SOCK_OPEN 128
static struct event evrecv;
static const char* SEND_CONTROL_MODES[];
+ static MessageQueue messageQueue;
+
static tint epoch, start;
static uint64_t global_dgrams_up, global_dgrams_down, global_raw_bytes_up, global_raw_bytes_down, global_bytes_up, global_bytes_down;
static void CloseChannelByAddress(const Address &addr);
void Schedule4Close() { scheduled4close_ = true; }
bool IsScheduled4Close() { return scheduled4close_; }
+ void Sent(int bytes, evbuffer *evb, bool tofree);
+
protected:
/** Channel id: index in the channel array. */
// Arno: Save transfer's binmap for zero-hashcheck restart
void Checkpoint(int fdes);
+#define MAX_QUEUE_LENGTH 10
+#define TIMER_USEC 100000
+
+ class MessageQueue
+ {
+ public:
+ class Entry
+ {
+ public:
+ Entry(evbuffer *ievb, const Address &iaddr, Channel *ichannel, bool itofree)
+ :
+ evb(ievb),
+ addr(iaddr),
+ channel(ichannel),
+ tofree(itofree)
+ {
+ }
+
+ evbuffer *evb;
+ Address addr;
+ Channel *channel;
+ bool tofree;
+ };
+
+ typedef std::deque<Entry> EntryList;
+ typedef std::map<int, EntryList> EntryLists;
+
+ void AddBuffer(int sock, evbuffer *evb, const Address &addr, Channel *channel, bool tofree = true)
+ {
+ EntryList &list = lists[sock];
+ list.push_back(Entry(evb, addr, channel, tofree));
+ if (list.size() == MAX_QUEUE_LENGTH)
+ Flush(sock);
+ }
+
+ void Flush(int sock)
+ {
+ EntryList &list = lists[sock];
+ if (list.empty())
+ return;
+
+ Address addr;
+ free(addr.addr);
+ addr.addr = (struct sockaddr_mptp *) calloc(1, sizeof(struct sockaddr_mptp) + list.size() * sizeof(struct mptp_dest));
+ addr.addr->count = list.size();
+ evbuffer *evbs[list.size()];
+ int i = 0;
+ for (EntryList::iterator it = list.begin(); it != list.end(); ++it, ++i) {
+ addr.addr->dests[i].addr = (*it).addr.addr->dests[0].addr;
+ addr.addr->dests[i].port = (*it).addr.addr->dests[0].port;
+ evbs[i] = (*it).evb;
+ }
+
+ int r = Channel::SendTo(sock, addr, evbs);
+ printf("Sent %d buffers\n", list.size());
+ if (r > 0) {
+ i = 0;
+ for (EntryList::iterator it = list.begin(); it != list.end(); ++it, ++i) {
+ (*it).channel->Sent(evbuffer_get_length((*it).evb), (*it).evb, (*it).tofree);
+ printf("Sent %d bytes\n", addr.addr->dests[i].bytes);
+ }
+ }
+ list.clear();
+ }
+
+ void Flush()
+ {
+ for (EntryLists::iterator it = lists.begin(); it != lists.end(); ++it)
+ Flush(it->first);
+ }
+
+ private:
+ EntryLists lists;
+ };
+
} // namespace end
// #define SWIFT_MUTE