From: Razvan Deaconescu Date: Tue, 19 Oct 2010 18:57:31 +0000 (+0300) Subject: test-socket-signal: add peer implementation X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?a=commitdiff_plain;h=e7f6172d700ce2a1c87e619f03f4fbe16dc0e86b;p=p2p-testing-infrastructure.git test-socket-signal: add peer implementation --- diff --git a/Utils/test-socket-signal/peer.c b/Utils/test-socket-signal/peer.c new file mode 100644 index 0000000..78e98f4 --- /dev/null +++ b/Utils/test-socket-signal/peer.c @@ -0,0 +1,280 @@ +/* + * inet (BSD) socket client application + * 2010, Razvan Deaconescu + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "sock_util.h" +#include "utils.h" + +#define DEFAULT_LEADER_LISTEN_PORT 43210 +#define DEFAULT_LEADER_HOSTNAME "localhost" +#define DEFAULT_FOLLOWER_LISTEN_PORT 54321 +#define DEFAULT_FOLLOWER_HOSTNAME "localhost" + +#define DEFAULT_SERVER_BACKLOG 5 + +#define PACKET_PAYLOAD_SIZE 120 +#define PACKET_INDEX_SIZE sizeof(unsigned long long) +#define PACKET_TIMESPEC_SIZE sizeof(time_t) +#define PACKET_SIZE (PACKET_PAYLOAD_SIZE + PACKET_INDEX_SIZE + PACKET_TIMESPEC_SIZE) + +#define TIMER_FREQUENCY_SECS 1 +#define CLOCKID CLOCK_REALTIME +#define SIG SIGRTMIN + +static enum { + TYPE_LEADER = 1, + TYPE_FOLLOWER, + TYPE_OLD_LEADER +} client_type; + + +static char rcv_buf[PACKET_SIZE]; +static char snd_buf[PACKET_SIZE]; + +/* connection socket */ +static int connectfd; + +static timer_t timerid; + +static void init_buffer_random(char *buf, size_t len) +{ + size_t i; + + srand(time(NULL)); + + for (i = 0; i < len-2; i++) + buf[i] = (char) (rand() % 26) + 'a'; + buf[i] = '\0'; +} + +static void init_buffers(void) +{ + init_buffer_random(rcv_buf, PACKET_PAYLOAD_SIZE); + init_buffer_random(snd_buf, PACKET_PAYLOAD_SIZE); +} + +static void fill_send_buffer(void) +{ + static unsigned long long index = 0; + char *ptr; + time_t curr_time_secs; + + curr_time_secs = time(NULL); + + ptr = snd_buf + PACKET_PAYLOAD_SIZE; + * (unsigned long long *) ptr = index; + ptr += PACKET_INDEX_SIZE; + * (time_t *) ptr = curr_time_secs; + + printf("[send] index: %llu curr_time_secs: %lu\n", index, curr_time_secs); + index++; +} + +static ssize_t send_buffer(int sockfd) +{ + return send(sockfd, snd_buf, PACKET_SIZE, 0); +} + +static ssize_t receive_buffer(int sockfd) +{ + return recv(sockfd, rcv_buf, PACKET_SIZE, 0); +} + +static void timer_handler(int sig, siginfo_t *si, void *uc) +{ + ssize_t nbytes; + + fill_send_buffer(); + nbytes = send_buffer(connectfd); + DIE(nbytes < 0, "send_buffer"); +} + +static void remove_timer(void) +{ + timer_delete(timerid); +} + +static void schedule_timer(void) +{ + struct sigaction sa; + struct sigevent sev; + sigset_t mask; + struct itimerspec its; + int rc; + + memset(&sa, 0, sizeof(sa)); + sa.sa_flags = SA_SIGINFO | SA_RESTART; + sa.sa_sigaction = timer_handler; + sigemptyset(&sa.sa_mask); + rc = sigaction(SIG, &sa, NULL); + DIE(rc < 0, "sigaction"); + + /* Block timer signal temporarily */ + sigemptyset(&mask); + sigaddset(&mask, SIG); + rc = sigprocmask(SIG_SETMASK, &mask, NULL); + DIE(rc < 0, "sigprocmask"); + + /* Create the timer */ + sev.sigev_notify = SIGEV_SIGNAL; + sev.sigev_signo = SIG; + sev.sigev_value.sival_ptr = &timerid; + rc = timer_create(CLOCKID, &sev, &timerid); + DIE(rc < 0, "timer_create"); + + /* Start the timer */ + its.it_value.tv_sec = TIMER_FREQUENCY_SECS; + its.it_value.tv_nsec = 0; + its.it_interval.tv_sec = TIMER_FREQUENCY_SECS; + its.it_interval.tv_nsec = 0; + rc = timer_settime(timerid, 0, &its, NULL); + DIE(rc < 0, "timer_settime"); + + /* Unlock the timer signal, so that timer notification + can be delivered */ + rc = sigprocmask(SIG_UNBLOCK, &mask, NULL); + DIE(rc < 0, "sigprocmask"); +} + +static void print_buffer_meta(void) +{ + unsigned long long index; + time_t curr_time_secs; + time_t sender_time_secs; + char *ptr; + + curr_time_secs = time(NULL); + + ptr = rcv_buf + PACKET_PAYLOAD_SIZE; + index = * (unsigned long long *) ptr; + ptr += PACKET_INDEX_SIZE; + sender_time_secs = * (time_t *) ptr; + + printf("[recv] index %llu, ", index); + if (sender_time_secs > curr_time_secs) + printf("negative latency (weird)\n"); + else + printf("latency %lu seconds (curr_time = %lu, sender_time = %lu)\n", + curr_time_secs - sender_time_secs, + curr_time_secs, + sender_time_secs); +} + +static void usage(const char *argv0) +{ + fprintf(stderr, "Usage: %s leader | follower\n", argv0); +} + +static void parse_args(int argc, char **argv) +{ + if (argc < 2) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + if (strcmp(argv[1], "leader") == 0) + client_type = TYPE_LEADER; + else if (strcmp(argv[1], "follower") == 0) + client_type = TYPE_FOLLOWER; + else if (strcmp(argv[1], "oldleader") == 0) + client_type = TYPE_OLD_LEADER; + else { + usage(argv[0]); + exit(EXIT_FAILURE); + } +} + +int main(int argc, char **argv) +{ + int listenfd; + int sockfd; + struct sockaddr_in addr; + socklen_t addrlen = 0; + + parse_args(argc, argv); + + if (client_type == TYPE_LEADER) { + listenfd = tcp_listen_connections(DEFAULT_LEADER_LISTEN_PORT, + DEFAULT_SERVER_BACKLOG); + DIE(listenfd < 0, "tcp_listen_connections"); + + sockfd = accept(listenfd, (SSA *) &addr, &addrlen); + DIE(sockfd < 0, "accept"); + + connectfd = tcp_connect_to_server(DEFAULT_FOLLOWER_HOSTNAME, + DEFAULT_FOLLOWER_LISTEN_PORT); + DIE(connectfd < 0, "tcp_connect_to_server"); + } + else if (client_type == TYPE_FOLLOWER) { + listenfd = tcp_listen_connections(DEFAULT_FOLLOWER_LISTEN_PORT, + DEFAULT_SERVER_BACKLOG); + DIE(listenfd < 0, "tcp_listen_connections"); + + connectfd = tcp_connect_to_server(DEFAULT_LEADER_HOSTNAME, + DEFAULT_LEADER_LISTEN_PORT); + DIE(connectfd < 0, "tcp_connect_to_server"); + + sockfd = accept(listenfd, (SSA *) &addr, &addrlen); + DIE(sockfd < 0, "accept"); + } + else if (client_type == TYPE_OLD_LEADER) { + listenfd = tcp_listen_connections(DEFAULT_LEADER_LISTEN_PORT, + DEFAULT_SERVER_BACKLOG); + DIE(listenfd < 0, "tcp_listen_connections"); + + connectfd = tcp_connect_to_server(DEFAULT_FOLLOWER_HOSTNAME, + DEFAULT_FOLLOWER_LISTEN_PORT); + DIE(connectfd < 0, "tcp_connect_to_server"); + + sockfd = accept(listenfd, (SSA *) &addr, &addrlen); + DIE(sockfd < 0, "accept"); + } + else { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + init_buffers(); + schedule_timer(); + + while (1) { + ssize_t nbytes; + + nbytes = receive_buffer(sockfd); + DIE(nbytes < 0, "receive_buffer"); + if (nbytes == 0) { + close(sockfd); + remove_timer(); + + sockfd = accept(listenfd, (SSA *) &addr, &addrlen); + DIE(sockfd < 0, "accept"); + + if (client_type == TYPE_FOLLOWER) { + connectfd = tcp_connect_to_server(DEFAULT_LEADER_HOSTNAME, DEFAULT_LEADER_LISTEN_PORT); + DIE(connectfd < 0, "tcp_connect_to_server"); + } + if (client_type == TYPE_LEADER || client_type == TYPE_OLD_LEADER) { + connectfd = tcp_connect_to_server(DEFAULT_FOLLOWER_HOSTNAME, DEFAULT_FOLLOWER_LISTEN_PORT); + DIE(connectfd < 0, "tcp_connect_to_server"); + } + schedule_timer(); + } + + print_buffer_meta(); + } + + close(sockfd); + close(connectfd); + + return 0; +}