test-socket-signal: add peer implementation
authorRazvan Deaconescu <razvan.deaconescu@cs.pub.ro>
Tue, 19 Oct 2010 18:57:31 +0000 (21:57 +0300)
committerRazvan Deaconescu <razvan.deaconescu@cs.pub.ro>
Tue, 19 Oct 2010 18:57:31 +0000 (21:57 +0300)
Utils/test-socket-signal/peer.c [new file with mode: 0644]

diff --git a/Utils/test-socket-signal/peer.c b/Utils/test-socket-signal/peer.c
new file mode 100644 (file)
index 0000000..78e98f4
--- /dev/null
@@ -0,0 +1,280 @@
+/*
+ * inet (BSD) socket client application
+ * 2010, Razvan Deaconescu
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <time.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <signal.h>
+
+#include "sock_util.h"
+#include "utils.h"
+
+#define DEFAULT_LEADER_LISTEN_PORT     43210
+#define DEFAULT_LEADER_HOSTNAME                "localhost"
+#define DEFAULT_FOLLOWER_LISTEN_PORT   54321
+#define DEFAULT_FOLLOWER_HOSTNAME      "localhost"
+
+#define DEFAULT_SERVER_BACKLOG         5
+
+#define PACKET_PAYLOAD_SIZE            120
+#define PACKET_INDEX_SIZE              sizeof(unsigned long long)
+#define PACKET_TIMESPEC_SIZE           sizeof(time_t)
+#define PACKET_SIZE                    (PACKET_PAYLOAD_SIZE + PACKET_INDEX_SIZE + PACKET_TIMESPEC_SIZE)
+
+#define TIMER_FREQUENCY_SECS           1
+#define CLOCKID                                CLOCK_REALTIME
+#define SIG                            SIGRTMIN
+
+static enum {
+       TYPE_LEADER = 1,
+       TYPE_FOLLOWER,
+       TYPE_OLD_LEADER
+} client_type;
+
+
+static char rcv_buf[PACKET_SIZE];
+static char snd_buf[PACKET_SIZE];
+
+/* connection socket */
+static int connectfd;
+
+static timer_t timerid;
+
+static void init_buffer_random(char *buf, size_t len)
+{
+       size_t i;
+
+       srand(time(NULL));
+
+       for (i = 0; i < len-2; i++)
+               buf[i] = (char) (rand() % 26) + 'a';
+       buf[i] = '\0';
+}
+
+static void init_buffers(void)
+{
+       init_buffer_random(rcv_buf, PACKET_PAYLOAD_SIZE);
+       init_buffer_random(snd_buf, PACKET_PAYLOAD_SIZE);
+}
+
+static void fill_send_buffer(void)
+{
+       static unsigned long long index = 0;
+       char *ptr;
+       time_t curr_time_secs;
+
+       curr_time_secs = time(NULL);
+
+       ptr = snd_buf + PACKET_PAYLOAD_SIZE;
+       * (unsigned long long *) ptr = index;
+       ptr += PACKET_INDEX_SIZE;
+       * (time_t *) ptr = curr_time_secs;
+
+       printf("[send] index: %llu curr_time_secs: %lu\n", index, curr_time_secs);
+       index++;
+}
+
+static ssize_t send_buffer(int sockfd)
+{
+       return send(sockfd, snd_buf, PACKET_SIZE, 0);
+}
+
+static ssize_t receive_buffer(int sockfd)
+{
+       return recv(sockfd, rcv_buf, PACKET_SIZE, 0);
+}
+
+static void timer_handler(int sig, siginfo_t *si, void *uc)
+{
+       ssize_t nbytes;
+
+       fill_send_buffer();
+       nbytes = send_buffer(connectfd);
+       DIE(nbytes < 0, "send_buffer");
+}
+
+static void remove_timer(void)
+{
+       timer_delete(timerid);
+}
+
+static void schedule_timer(void)
+{
+       struct sigaction sa;
+       struct sigevent sev;
+       sigset_t mask;
+       struct itimerspec its;
+       int rc;
+
+       memset(&sa, 0, sizeof(sa));
+       sa.sa_flags = SA_SIGINFO | SA_RESTART;
+       sa.sa_sigaction = timer_handler;
+       sigemptyset(&sa.sa_mask);
+       rc = sigaction(SIG, &sa, NULL);
+       DIE(rc < 0, "sigaction");
+
+       /* Block timer signal temporarily */
+       sigemptyset(&mask);
+       sigaddset(&mask, SIG);
+       rc = sigprocmask(SIG_SETMASK, &mask, NULL);
+       DIE(rc < 0, "sigprocmask");
+
+       /* Create the timer */
+       sev.sigev_notify = SIGEV_SIGNAL;
+       sev.sigev_signo = SIG;
+       sev.sigev_value.sival_ptr = &timerid;
+       rc = timer_create(CLOCKID, &sev, &timerid);
+       DIE(rc < 0, "timer_create");
+
+       /* Start the timer */
+       its.it_value.tv_sec = TIMER_FREQUENCY_SECS;
+       its.it_value.tv_nsec = 0;
+       its.it_interval.tv_sec = TIMER_FREQUENCY_SECS;
+       its.it_interval.tv_nsec = 0;
+       rc = timer_settime(timerid, 0, &its, NULL);
+       DIE(rc < 0, "timer_settime");
+
+       /* Unlock the timer signal, so that timer notification
+          can be delivered */
+       rc = sigprocmask(SIG_UNBLOCK, &mask, NULL);
+       DIE(rc < 0, "sigprocmask");
+}
+
+static void print_buffer_meta(void)
+{
+       unsigned long long index;
+       time_t curr_time_secs;
+       time_t sender_time_secs;
+       char *ptr;
+
+       curr_time_secs = time(NULL);
+
+       ptr = rcv_buf + PACKET_PAYLOAD_SIZE;
+       index = * (unsigned long long *) ptr;
+       ptr += PACKET_INDEX_SIZE;
+       sender_time_secs = * (time_t *) ptr;
+
+       printf("[recv] index %llu, ", index);
+       if (sender_time_secs > curr_time_secs)
+               printf("negative latency (weird)\n");
+       else
+               printf("latency %lu seconds (curr_time = %lu, sender_time = %lu)\n",
+                               curr_time_secs - sender_time_secs,
+                               curr_time_secs,
+                               sender_time_secs);
+}
+
+static void usage(const char *argv0)
+{
+       fprintf(stderr, "Usage: %s leader | follower\n", argv0);
+}
+
+static void parse_args(int argc, char **argv)
+{
+       if (argc < 2) {
+               usage(argv[0]);
+               exit(EXIT_FAILURE);
+       }
+
+       if (strcmp(argv[1], "leader") == 0)
+               client_type = TYPE_LEADER;
+       else if (strcmp(argv[1], "follower") == 0)
+               client_type = TYPE_FOLLOWER;
+       else if (strcmp(argv[1], "oldleader") == 0)
+               client_type = TYPE_OLD_LEADER;
+       else {
+               usage(argv[0]);
+               exit(EXIT_FAILURE);
+       }
+}
+
+int main(int argc, char **argv)
+{
+       int listenfd;
+       int sockfd;
+       struct sockaddr_in addr;
+       socklen_t addrlen = 0;
+
+       parse_args(argc, argv);
+
+       if (client_type == TYPE_LEADER) {
+               listenfd = tcp_listen_connections(DEFAULT_LEADER_LISTEN_PORT,
+                               DEFAULT_SERVER_BACKLOG);
+               DIE(listenfd < 0, "tcp_listen_connections");
+
+               sockfd = accept(listenfd, (SSA *) &addr, &addrlen);
+               DIE(sockfd < 0, "accept");
+
+               connectfd = tcp_connect_to_server(DEFAULT_FOLLOWER_HOSTNAME,
+                               DEFAULT_FOLLOWER_LISTEN_PORT);
+               DIE(connectfd < 0, "tcp_connect_to_server");
+       }
+       else if (client_type == TYPE_FOLLOWER) {
+               listenfd = tcp_listen_connections(DEFAULT_FOLLOWER_LISTEN_PORT,
+                               DEFAULT_SERVER_BACKLOG);
+               DIE(listenfd < 0, "tcp_listen_connections");
+
+               connectfd = tcp_connect_to_server(DEFAULT_LEADER_HOSTNAME,
+                               DEFAULT_LEADER_LISTEN_PORT);
+               DIE(connectfd < 0, "tcp_connect_to_server");
+
+               sockfd = accept(listenfd, (SSA *) &addr, &addrlen);
+               DIE(sockfd < 0, "accept");
+       }
+       else if (client_type == TYPE_OLD_LEADER) {
+               listenfd = tcp_listen_connections(DEFAULT_LEADER_LISTEN_PORT,
+                               DEFAULT_SERVER_BACKLOG);
+               DIE(listenfd < 0, "tcp_listen_connections");
+
+               connectfd = tcp_connect_to_server(DEFAULT_FOLLOWER_HOSTNAME,
+                               DEFAULT_FOLLOWER_LISTEN_PORT);
+               DIE(connectfd < 0, "tcp_connect_to_server");
+
+               sockfd = accept(listenfd, (SSA *) &addr, &addrlen);
+               DIE(sockfd < 0, "accept");
+       }
+       else {
+               usage(argv[0]);
+               exit(EXIT_FAILURE);
+       }
+
+       init_buffers();
+       schedule_timer();
+
+       while (1) {
+               ssize_t nbytes;
+
+               nbytes = receive_buffer(sockfd);
+               DIE(nbytes < 0, "receive_buffer");
+               if (nbytes == 0) {
+                       close(sockfd);
+                       remove_timer();
+
+                       sockfd = accept(listenfd, (SSA *) &addr, &addrlen);
+                       DIE(sockfd < 0, "accept");
+
+                       if (client_type == TYPE_FOLLOWER) {
+                               connectfd = tcp_connect_to_server(DEFAULT_LEADER_HOSTNAME, DEFAULT_LEADER_LISTEN_PORT);
+                               DIE(connectfd < 0, "tcp_connect_to_server");
+                       }
+                       if (client_type == TYPE_LEADER || client_type == TYPE_OLD_LEADER) {
+                               connectfd = tcp_connect_to_server(DEFAULT_FOLLOWER_HOSTNAME, DEFAULT_FOLLOWER_LISTEN_PORT);
+                               DIE(connectfd < 0, "tcp_connect_to_server");
+                       }
+                       schedule_timer();
+               }
+
+               print_buffer_meta();
+       }
+
+       close(sockfd);
+       close(connectfd);
+
+       return 0;
+}