// zeroSquitto – Rev 1
// ?pathlinks?
/*************************************************************************/
/* ZeroMQ server - ZeroMQ and Mosquitto Publisher-Subscriber Project */
/*************************************************************************/
#include "circularQueue.h"
#include "constants.h"
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <termios.h>
#include <unistd.h>
#include <zmq.h>
// Used for program termination: stays 1 while the server should keep running
// and is set to 0 by the signal handler. Declared as volatile sig_atomic_t
// because that is the integer type the C standard guarantees may be written
// from within a signal handler.
static volatile sig_atomic_t run = 1;
// Structure passed as argument to threads.
typedef struct {
  // Queue whose items the thread dequeues and sends as responses.
  CircularQueue *dataQueue;
  // Endpoint string handed to zmq_bind(); it is only read, never modified,
  // hence the const qualifier.
  const char *endpoint;
} Targs;
// Handles SIGHUP and SIGINT (Ctrl + C): announces the interrupt and clears
// the global run flag so the processing loop can wind down.
//
// signal: the number of the signal being delivered.
void trap(int signal) {
  // printf() is not async-signal-safe, so the notice is emitted with write(),
  // which is. The text is identical to what was previously printed.
  static const char notice[] = "[✓] Received interrupt, terminating...\n";
  ssize_t written = write(STDOUT_FILENO, notice, sizeof notice - 1);
  // Nothing useful can be done about a failed write inside a handler.
  (void)written;

  switch (signal) {
  case SIGHUP:
  case SIGINT:
    run = 0;
    break;
  default:
    // Any other signal only produces the notice above.
    break;
  }
}
/*
 * Thread entry point: binds a ZeroMQ REP socket to the endpoint found in the
 * supplied Targs and answers each PULL_METHOD request with the next item
 * dequeued from the circular queue. The loop ends when the run flag is
 * cleared or the queue reports empty.
 *
 * argsThread: pointer to a heap-allocated Targs. This function frees it and
 *             clears the queue it refers to before returning, on every path.
 *
 * Returns NULL in all cases.
 */
void *server(void *argsThread) {
  // Function arguments.
  Targs *args = (Targs *)argsThread;
  // The queue holding the items to respond with.
  CircularQueue *sendQueue = args->dataQueue;
  // Size of the receive buffer, including room for a terminator.
  const size_t limit = DATA_LENGTH_LIMIT;
  // ZeroMQ context and responder socket.
  void *context = NULL;
  void *responder = NULL;
  // Receive buffer for the request.
  char *recvData = NULL;
  // Holds a queue item to respond with.
  char *sendData = NULL;
  // Holds the request number.
  int request = 0;

  recvData = calloc(limit, sizeof(char));
  if (recvData == NULL) {
    printf("[E] Could not allocate the receive buffer.\n");
    goto cleanup;
  }

  // Create a new context and a responder.
  context = zmq_ctx_new();
  if (context == NULL) {
    printf("[E] Could not create a ZeroMQ context.\n");
    goto cleanup;
  }
  responder = zmq_socket(context, ZMQ_REP);
  if (responder == NULL) {
    printf("[E] Could not create the responder socket.\n");
    goto cleanup;
  }

  if (zmq_bind(responder, args->endpoint) != 0) {
    printf("[E] Could not bind responder on selected endpoint.\n");
    goto cleanup;
  }
  printf("[✓] Bound to endpoint %s, listening...\n", args->endpoint);

  /*
   * Loop over circular queue and respond with queued items.
   */
  do {
    // Check for data in non-blocking mode. One byte is held back so the
    // request can always be NUL-terminated.
    int received = zmq_recv(responder, recvData, limit - 1, ZMQ_DONTWAIT);
    if (received == -1) {
      sleep(1);
      continue;
    }

    // zmq_recv() reports the full message size even when the message was
    // truncated to fit, so clamp before terminating. Terminating at the
    // received length also discards leftovers from an earlier, longer request.
    size_t length = (size_t)received;
    if (length > limit - 1) {
      length = limit - 1;
    }
    recvData[length] = '\0';
#ifdef DEBUG
    printf("[D] Received: %s\n", recvData);
#endif

    // Check that this is a PULL request - otherwise continue.
    if (strncmp(recvData, PULL_METHOD, limit) != 0) {
      // A REP socket must answer every request before it may receive again;
      // an empty reply keeps the socket usable for the next request.
      if (zmq_send(responder, "", 0, 0) == -1) {
        printf("[E] Could not reply to an unsupported request.\n");
      }
      sleep(1);
      continue;
    }

    // Dequeue an item off the circular queue to respond with.
    sendData = queueDequeue(sendQueue);
    if (sendData == NULL) {
      // NOTE(review): assumes queueDequeue() yields NULL when it has nothing
      // to hand out - confirm against circularQueue.h. Reply with an empty
      // message rather than passing NULL to strlen().
      if (zmq_send(responder, "", 0, 0) == -1) {
        printf("[E] Could not send an empty response.\n");
      }
      continue;
    }

    printf("[✓] (%d) Pushing: %s\n", request, sendData);
    // The terminator is sent along with the text.
    if (zmq_send(responder, sendData, strlen(sendData) + 1, 0) == -1) {
      printf("[E] Could not send the response.\n");
    }
    sleep(1);
    ++request;
  } while (run && !queueIsEmpty(sendQueue));

cleanup:
  // Perform cleanups.
  if (responder != NULL) {
    zmq_close(responder);
  }
  if (context != NULL) {
    zmq_ctx_destroy(context);
  }
  // Release buffers.
  free(recvData);
  // NOTE(review): this releases the last dequeued item, as the code always
  // did. Whether dequeued items are owned by the caller is decided by the
  // queue implementation - confirm against circularQueue.h.
  free(sendData);
  // Free the queue.
  queueClear(sendQueue);
  // Free arguments.
  free(args);
  // Exit context.
  return NULL;
}
/*
 * Program entry point. Expects the listener endpoint as the first argument,
 * silences terminal echo when standard input is a terminal, installs the
 * SIGHUP/SIGINT handler, fills the queue with sample items and runs the
 * ZeroMQ server thread until it terminates.
 *
 * Returns 0 on a normal run and -1 when the endpoint argument is missing,
 * an allocation fails or the server thread cannot be created.
 */
int main(int argc, char **argv) {
  // For terminal input suppression.
  struct termios tattr;
  struct termios tattr_store;
  // Set only once the terminal attributes were actually changed, so that
  // restoring never uses attributes that were not read successfully.
  int restoreTerminal = 0;
  // Exit status of the program.
  int status = 0;
  // The circular queue used for pushing data.
  CircularQueue *sendQueue = NULL;
  // The listener thread.
  pthread_t listenThread;
  // The thread arguments.
  Targs *args = NULL;

  // Validate the command line before acquiring anything.
  if (argc < 2) {
    printf("[E] First argument must be a listener endpoint, ie: %s\n",
           "tcp://*:1025/");
    return -1;
  }

  sendQueue = queueCreate(1);
  args = calloc(1, sizeof *args);
  // NOTE(review): assumes queueCreate() reports failure by returning NULL -
  // confirm against circularQueue.h.
  if (sendQueue == NULL || args == NULL) {
    printf("[E] Could not allocate the queue or the thread arguments.\n");
    if (sendQueue != NULL) {
      queueClear(sendQueue);
    }
    free(args);
    return -1;
  }

  // Suppress console input for clarity if this is a terminal.
  if (isatty(STDIN_FILENO) && tcgetattr(STDIN_FILENO, &tattr_store) == 0) {
    // Save the attributes and restore them on program termination.
    tattr = tattr_store;
    tattr.c_lflag &= ~(ICANON | ECHO);
    if (tcsetattr(STDIN_FILENO, TCSAFLUSH, &tattr) == 0) {
      restoreTerminal = 1;
    }
  }

  // Let there be bling!
  printf("%s\n", SERVER_BANNER);
  printf("\tTip: Use Ctrl-c to end program.\n\n");

  // Bind to SIGHUP and SIGINT.
  signal(SIGHUP, trap);
  signal(SIGINT, trap);

  // Enqueue a few elements to the queue for the exercise.
  queueEnqueue(sendQueue, "36C");
  queueEnqueue(sendQueue, "38C");
  queueEnqueue(sendQueue, "40C");
  queueEnqueue(sendQueue, "21C");

  args->dataQueue = sendQueue;
  args->endpoint = argv[1];

  // Create ZeroMQ processing thread.
  if (pthread_create(&listenThread, NULL, server, (void *)args) != 0) {
    printf("[E] Failed to create ZeroMQ thread.\n");
    // The thread never started, so the resources it would have released
    // are released here, and no join is attempted on an invalid thread.
    queueClear(sendQueue);
    free(args);
    status = -1;
  } else {
    // Await thread termination; the thread frees args and clears the queue.
    pthread_join(listenThread, NULL);
  }

  // If the terminal was modified, restore the terminal attributes.
  if (restoreTerminal) {
    tcsetattr(STDIN_FILENO, TCSANOW, &tattr_store);
  }
  return status;
}
// Generated by GNU Enscript 1.6.5.90.