using namespace std;
struct event_base *Channel::evbase;
+MessageQueue Channel::messageQueue;
struct event Channel::evrecv;
#define DEBUGTRAFFIC 0
dprintf("%s #%u sent %ib %s:%x\n",
tintstr(),id_,(int)evbuffer_get_length(evb),peer().str(),
peer_channel_id_);
- int r = SendTo(socket_,peer(),&evb);
- if (r==-1)
- print_error("can't send datagram");
- else
- raw_bytes_up_ += r;
- last_send_time_ = NOW;
- sent_since_recv_++;
- dgrams_sent_++;
- evbuffer_free(evb);
- Reschedule();
+
+ messageQueue.AddBuffer(socket_, evb, peer(), this);
+}
+
+void Channel::Sent(int bytes, evbuffer *evb, bool tofree)
+{
+ raw_bytes_up_ += bytes;
+ if (tofree) {
+ last_send_time_ = NOW;
+ sent_since_recv_++;
+ dgrams_sent_++;
+ evbuffer_free(evb);
+ Reschedule();
+ }
}
void Channel::AddHint (struct evbuffer *evb) {
dprintf("%s #%u fsent %ib %s:%x\n",
tintstr(),id_,(int)evbuffer_get_length(evb),peer().str(),
peer_channel_id_);
- int ret = Channel::SendTo(socket_,peer(),&evb); // kind of fragmentation
- if (ret > 0)
- raw_bytes_up_ += ret;
+ messageQueue.AddBuffer(socket_, evb, peer(), this, false);
evbuffer_add_32be(evb, peer_channel_id_);
}
void ReportCallback(int fd, short event, void *arg);
void EndCallback(int fd, short event, void *arg);
void RescanDirCallback(int fd, short event, void *arg);
+void TimerCallback(int fd, short event, void *arg);
// Gateway stuff
// Global variables
-struct event evreport, evrescan, evend;
+struct event evreport, evrescan, evend, evtimer;
int single_fd = -1;
bool file_enable_checkpoint = false;
bool file_checkpointed = false;
evtimer_add(&evend, tint2tv(wait_time));
}
+ evtimer_assign(&evtimer, Channel::evbase, TimerCallback, NULL);
+ evtimer_add(&evtimer, tint2tv(TIMER_USEC));
+
// Enter mainloop, if daemonizing
if (wait_time == TINT_NEVER || (long)wait_time > 0) {
// Arno: always, for statsgw, rate control, etc.
evtimer_add(&evreport, tint2tv(TINT_SEC));
}
+void TimerCallback(int fd, short event, void *arg) {
+ Channel::messageQueue.Flush();
+ evtimer_add(&evtimer, tint2tv(TIMER_USEC));
+}
+
void EndCallback(int fd, short event, void *arg) {
// Called when wait timer expires == fixed time daemon
event_base_loopexit(Channel::evbase, NULL);
#include <set>
#include <algorithm>
#include <string>
+#include <map>
#include <math.h>
#include "compat.h"
being transferred between two peers. As we don't need buffers and
lots of other TCP stuff, sizeof(Channel+members) must be below 1K.
Normally, API users do not deal with this class. */
+ class MessageQueue;
class Channel {
#define DGRAM_MAX_SOCK_OPEN 128
static struct event evrecv;
static const char* SEND_CONTROL_MODES[];
+ static MessageQueue messageQueue;
+
static tint epoch, start;
static uint64_t global_dgrams_up, global_dgrams_down, global_raw_bytes_up, global_raw_bytes_down, global_bytes_up, global_bytes_down;
static void CloseChannelByAddress(const Address &addr);
void Schedule4Close() { scheduled4close_ = true; }
bool IsScheduled4Close() { return scheduled4close_; }
+ void Sent(int bytes, evbuffer *evb, bool tofree);
+
protected:
/** Channel id: index in the channel array. */
// Arno: Save transfer's binmap for zero-hashcheck restart
void Checkpoint(int fdes);
+#define MAX_QUEUE_LENGTH 10
+#define TIMER_USEC 100000
+
+ class MessageQueue
+ {
+ public:
+ class Entry
+ {
+ public:
+ Entry(evbuffer *ievb, const Address &iaddr, Channel *ichannel, bool itofree)
+ :
+ evb(ievb),
+ addr(iaddr),
+ channel(ichannel),
+ tofree(itofree)
+ {
+ }
+
+ evbuffer *evb;
+ Address addr;
+ Channel *channel;
+ bool tofree;
+ };
+
+ typedef std::deque<Entry> EntryList;
+ typedef std::map<int, EntryList> EntryLists;
+
+ void AddBuffer(int sock, evbuffer *evb, const Address &addr, Channel *channel, bool tofree = true)
+ {
+ EntryList &list = lists[sock];
+ list.push_back(Entry(evb, addr, channel, tofree));
+ if (list.size() == MAX_QUEUE_LENGTH)
+ Flush(sock);
+ }
+
+ void Flush(int sock)
+ {
+ EntryList &list = lists[sock];
+ if (list.empty())
+ return;
+
+ Address addr;
+ free(addr.addr);
+ addr.addr = (struct sockaddr_mptp *) calloc(1, sizeof(struct sockaddr_mptp) + list.size() * sizeof(struct mptp_dest));
+ addr.addr->count = list.size();
+ evbuffer *evbs[list.size()];
+ int i = 0;
+ for (EntryList::iterator it = list.begin(); it != list.end(); ++it, ++i) {
+ addr.addr->dests[i].addr = (*it).addr.addr->dests[0].addr;
+ addr.addr->dests[i].port = (*it).addr.addr->dests[0].port;
+ evbs[i] = (*it).evb;
+ }
+
+ int r = Channel::SendTo(sock, addr, evbs);
+ printf("Sent %d buffers\n", list.size());
+ if (r > 0) {
+ i = 0;
+ for (EntryList::iterator it = list.begin(); it != list.end(); ++it, ++i) {
+ (*it).channel->Sent(evbuffer_get_length((*it).evb), (*it).evb, (*it).tofree);
+ printf("Sent %d bytes\n", addr.addr->dests[i].bytes);
+ }
+ }
+ list.clear();
+ }
+
+ void Flush()
+ {
+ for (EntryLists::iterator it = lists.begin(); it != lists.end(); ++it)
+ Flush(it->first);
+ }
+
+ private:
+ EntryLists lists;
+ };
+
} // namespace end
// #define SWIFT_MUTE