--- /dev/null
+/*
+ * inet (BSD) socket client application
+ * 2010, Razvan Deaconescu
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <time.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <signal.h>
+
+#include "sock_util.h"
+#include "utils.h"
+
+#define DEFAULT_LEADER_LISTEN_PORT 43210
+#define DEFAULT_LEADER_HOSTNAME "localhost"
+#define DEFAULT_FOLLOWER_LISTEN_PORT 54321
+#define DEFAULT_FOLLOWER_HOSTNAME "localhost"
+
+#define DEFAULT_SERVER_BACKLOG 5
+
+#define PACKET_PAYLOAD_SIZE 120
+#define PACKET_INDEX_SIZE sizeof(unsigned long long)
+#define PACKET_TIMESPEC_SIZE sizeof(time_t)
+#define PACKET_SIZE (PACKET_PAYLOAD_SIZE + PACKET_INDEX_SIZE + PACKET_TIMESPEC_SIZE)
+
+#define TIMER_FREQUENCY_SECS 1
+#define CLOCKID CLOCK_REALTIME
+#define SIG SIGRTMIN
+
+static enum {
+ TYPE_LEADER = 1,
+ TYPE_FOLLOWER,
+ TYPE_OLD_LEADER
+} client_type;
+
+
+static char rcv_buf[PACKET_SIZE];
+static char snd_buf[PACKET_SIZE];
+
+/* connection socket */
+static int connectfd;
+
+static timer_t timerid;
+
+static void init_buffer_random(char *buf, size_t len)
+{
+ size_t i;
+
+ srand(time(NULL));
+
+ for (i = 0; i < len-2; i++)
+ buf[i] = (char) (rand() % 26) + 'a';
+ buf[i] = '\0';
+}
+
+static void init_buffers(void)
+{
+ init_buffer_random(rcv_buf, PACKET_PAYLOAD_SIZE);
+ init_buffer_random(snd_buf, PACKET_PAYLOAD_SIZE);
+}
+
+static void fill_send_buffer(void)
+{
+ static unsigned long long index = 0;
+ char *ptr;
+ time_t curr_time_secs;
+
+ curr_time_secs = time(NULL);
+
+ ptr = snd_buf + PACKET_PAYLOAD_SIZE;
+ * (unsigned long long *) ptr = index;
+ ptr += PACKET_INDEX_SIZE;
+ * (time_t *) ptr = curr_time_secs;
+
+ printf("[send] index: %llu curr_time_secs: %lu\n", index, curr_time_secs);
+ index++;
+}
+
+static ssize_t send_buffer(int sockfd)
+{
+ return send(sockfd, snd_buf, PACKET_SIZE, 0);
+}
+
+static ssize_t receive_buffer(int sockfd)
+{
+ return recv(sockfd, rcv_buf, PACKET_SIZE, 0);
+}
+
+static void timer_handler(int sig, siginfo_t *si, void *uc)
+{
+ ssize_t nbytes;
+
+ fill_send_buffer();
+ nbytes = send_buffer(connectfd);
+ DIE(nbytes < 0, "send_buffer");
+}
+
+static void remove_timer(void)
+{
+ timer_delete(timerid);
+}
+
+static void schedule_timer(void)
+{
+ struct sigaction sa;
+ struct sigevent sev;
+ sigset_t mask;
+ struct itimerspec its;
+ int rc;
+
+ memset(&sa, 0, sizeof(sa));
+ sa.sa_flags = SA_SIGINFO | SA_RESTART;
+ sa.sa_sigaction = timer_handler;
+ sigemptyset(&sa.sa_mask);
+ rc = sigaction(SIG, &sa, NULL);
+ DIE(rc < 0, "sigaction");
+
+ /* Block timer signal temporarily */
+ sigemptyset(&mask);
+ sigaddset(&mask, SIG);
+ rc = sigprocmask(SIG_SETMASK, &mask, NULL);
+ DIE(rc < 0, "sigprocmask");
+
+ /* Create the timer */
+ sev.sigev_notify = SIGEV_SIGNAL;
+ sev.sigev_signo = SIG;
+ sev.sigev_value.sival_ptr = &timerid;
+ rc = timer_create(CLOCKID, &sev, &timerid);
+ DIE(rc < 0, "timer_create");
+
+ /* Start the timer */
+ its.it_value.tv_sec = TIMER_FREQUENCY_SECS;
+ its.it_value.tv_nsec = 0;
+ its.it_interval.tv_sec = TIMER_FREQUENCY_SECS;
+ its.it_interval.tv_nsec = 0;
+ rc = timer_settime(timerid, 0, &its, NULL);
+ DIE(rc < 0, "timer_settime");
+
+ /* Unlock the timer signal, so that timer notification
+ can be delivered */
+ rc = sigprocmask(SIG_UNBLOCK, &mask, NULL);
+ DIE(rc < 0, "sigprocmask");
+}
+
+static void print_buffer_meta(void)
+{
+ unsigned long long index;
+ time_t curr_time_secs;
+ time_t sender_time_secs;
+ char *ptr;
+
+ curr_time_secs = time(NULL);
+
+ ptr = rcv_buf + PACKET_PAYLOAD_SIZE;
+ index = * (unsigned long long *) ptr;
+ ptr += PACKET_INDEX_SIZE;
+ sender_time_secs = * (time_t *) ptr;
+
+ printf("[recv] index %llu, ", index);
+ if (sender_time_secs > curr_time_secs)
+ printf("negative latency (weird)\n");
+ else
+ printf("latency %lu seconds (curr_time = %lu, sender_time = %lu)\n",
+ curr_time_secs - sender_time_secs,
+ curr_time_secs,
+ sender_time_secs);
+}
+
+static void usage(const char *argv0)
+{
+ fprintf(stderr, "Usage: %s leader | follower\n", argv0);
+}
+
+static void parse_args(int argc, char **argv)
+{
+ if (argc < 2) {
+ usage(argv[0]);
+ exit(EXIT_FAILURE);
+ }
+
+ if (strcmp(argv[1], "leader") == 0)
+ client_type = TYPE_LEADER;
+ else if (strcmp(argv[1], "follower") == 0)
+ client_type = TYPE_FOLLOWER;
+ else if (strcmp(argv[1], "oldleader") == 0)
+ client_type = TYPE_OLD_LEADER;
+ else {
+ usage(argv[0]);
+ exit(EXIT_FAILURE);
+ }
+}
+
+int main(int argc, char **argv)
+{
+ int listenfd;
+ int sockfd;
+ struct sockaddr_in addr;
+ socklen_t addrlen = 0;
+
+ parse_args(argc, argv);
+
+ if (client_type == TYPE_LEADER) {
+ listenfd = tcp_listen_connections(DEFAULT_LEADER_LISTEN_PORT,
+ DEFAULT_SERVER_BACKLOG);
+ DIE(listenfd < 0, "tcp_listen_connections");
+
+ sockfd = accept(listenfd, (SSA *) &addr, &addrlen);
+ DIE(sockfd < 0, "accept");
+
+ connectfd = tcp_connect_to_server(DEFAULT_FOLLOWER_HOSTNAME,
+ DEFAULT_FOLLOWER_LISTEN_PORT);
+ DIE(connectfd < 0, "tcp_connect_to_server");
+ }
+ else if (client_type == TYPE_FOLLOWER) {
+ listenfd = tcp_listen_connections(DEFAULT_FOLLOWER_LISTEN_PORT,
+ DEFAULT_SERVER_BACKLOG);
+ DIE(listenfd < 0, "tcp_listen_connections");
+
+ connectfd = tcp_connect_to_server(DEFAULT_LEADER_HOSTNAME,
+ DEFAULT_LEADER_LISTEN_PORT);
+ DIE(connectfd < 0, "tcp_connect_to_server");
+
+ sockfd = accept(listenfd, (SSA *) &addr, &addrlen);
+ DIE(sockfd < 0, "accept");
+ }
+ else if (client_type == TYPE_OLD_LEADER) {
+ listenfd = tcp_listen_connections(DEFAULT_LEADER_LISTEN_PORT,
+ DEFAULT_SERVER_BACKLOG);
+ DIE(listenfd < 0, "tcp_listen_connections");
+
+ connectfd = tcp_connect_to_server(DEFAULT_FOLLOWER_HOSTNAME,
+ DEFAULT_FOLLOWER_LISTEN_PORT);
+ DIE(connectfd < 0, "tcp_connect_to_server");
+
+ sockfd = accept(listenfd, (SSA *) &addr, &addrlen);
+ DIE(sockfd < 0, "accept");
+ }
+ else {
+ usage(argv[0]);
+ exit(EXIT_FAILURE);
+ }
+
+ init_buffers();
+ schedule_timer();
+
+ while (1) {
+ ssize_t nbytes;
+
+ nbytes = receive_buffer(sockfd);
+ DIE(nbytes < 0, "receive_buffer");
+ if (nbytes == 0) {
+ close(sockfd);
+ remove_timer();
+
+ sockfd = accept(listenfd, (SSA *) &addr, &addrlen);
+ DIE(sockfd < 0, "accept");
+
+ if (client_type == TYPE_FOLLOWER) {
+ connectfd = tcp_connect_to_server(DEFAULT_LEADER_HOSTNAME, DEFAULT_LEADER_LISTEN_PORT);
+ DIE(connectfd < 0, "tcp_connect_to_server");
+ }
+ if (client_type == TYPE_LEADER || client_type == TYPE_OLD_LEADER) {
+ connectfd = tcp_connect_to_server(DEFAULT_FOLLOWER_HOSTNAME, DEFAULT_FOLLOWER_LISTEN_PORT);
+ DIE(connectfd < 0, "tcp_connect_to_server");
+ }
+ schedule_timer();
+ }
+
+ print_buffer_meta();
+ }
+
+ close(sockfd);
+ close(connectfd);
+
+ return 0;
+}