// zeroSquitto – Rev 1
// ?pathlinks?
/*************************************************************************/
/* ZeroMQ client - ZeroMQ and Mosquitto Publisher-Subscriber Project */
/*************************************************************************/
#include "constants.h"
#include "queue.h"
#include <arpa/inet.h>
#include <limits.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <termios.h>
#include <unistd.h>
#include <zmq.h>
// Program-wide run flag: the worker loops keep going while it is non-zero and
// the signal handler clears it to request termination. It is written from a
// signal handler, so it uses sig_atomic_t, the integer type C guarantees can
// be accessed as a single unit even when a signal interrupts the access.
static volatile sig_atomic_t run = 1;
// Structure passed as the argument to both worker threads (client and
// pumpsLOG). A single instance is allocated in main and shared by both.
typedef struct {
// ZeroMQ endpoint string the client thread hands to zmq_connect().
char *endpoint;
// For sending messages to LOG-S: the client thread enqueues log strings here
// and the LOG-S thread dequeues them.
Queue *LOG_dataQueue;
// Address of the LOG-S server as text; the LOG-S thread converts it with
// inet_addr(), so a dotted IPv4 address is expected.
char *logAddress;
// TCP port of the LOG-S server in host byte order (converted with htons()
// before connecting).
int logPort;
} Targs;
// Signal handler bound to SIGHUP and SIGINT (Ctrl + C).
//
// Prints a termination notice and clears the global run flag so the worker
// loops finish their current iteration and exit. Only async-signal-safe
// operations are used: the notice is emitted with write() rather than
// printf(), because stdio may be interrupted mid-call by this very handler.
//
// signum: number of the delivered signal.
void trap(int signum) {
  static const char notice[] = "[✓] Received interrupt, terminating...\n";

  // Best effort: nothing useful can be done inside a handler if this fails.
  if (write(STDOUT_FILENO, notice, sizeof(notice) - 1) < 0) {
    // Intentionally ignored.
  }

  switch (signum) {
  case SIGHUP:
  case SIGINT:
    run = 0;
    break;
  default:
    // Any other signal routed here leaves the run flag untouched.
    break;
  }
}
// Writes exactly `length` bytes from `buffer` to `fd`, resuming after short
// writes. Returns 0 on success, or -1 as soon as write() fails or makes no
// progress.
static int pumpsLOG_writeAll(int fd, const char *buffer, size_t length) {
  size_t sent = 0;

  while (sent < length) {
    ssize_t written = write(fd, buffer + sent, length - sent);
    if (written <= 0)
      return -1;
    sent += (size_t)written;
  }
  return 0;
}

// LOG-S pump thread.
//
// Drains the logging queue and posts every message to the LOG-S server, using
// one short-lived TCP connection per message. Each message goes out as a
// fixed-size frame of LOG_MAX_SEND_BYTES bytes: the text (truncated if it is
// longer) followed by zero padding. The frame length is unchanged from before;
// only the bytes after the text are now defined instead of being read from
// beyond the end of the message buffer.
//
// argsThread: pointer to the shared Targs (queue, LOG-S address and port).
// Returns:    never returns normally; the thread ends through pthread_exit().
//
// The loop runs until the global run flag is cleared, or until a socket or
// the frame buffer cannot be created. A message whose delivery fails is
// dropped, as before.
void *pumpsLOG(void *argsThread) {
  // Function arguments.
  Targs *args = (Targs *)argsThread;
  // The queue containing messages to send to LOG-S.
  Queue *sendQueue = args->LOG_dataQueue;
  // Will point to the log message to send to the server.
  char *data;
  // Zero-padded copy of the message that is actually put on the wire.
  char *frame;
  size_t dataLength;
  int serverSocket;
  struct sockaddr_in serverSockAddr;

  printf("[✓] ( LOG-S™ ) Pumping logs to %s:%d.\n", args->logAddress,
         args->logPort);

  // Resolve the destination once, up front. The address never changes, and
  // reading the shared arguments only here means the loop no longer touches
  // `args` after another thread may have released it.
  memset(&serverSockAddr, 0, sizeof(serverSockAddr));
  serverSockAddr.sin_family = AF_INET;
  serverSockAddr.sin_addr.s_addr = inet_addr(args->logAddress);
  serverSockAddr.sin_port = htons(args->logPort);

  frame = calloc(LOG_MAX_SEND_BYTES, sizeof(char));
  if (frame == NULL) {
    printf("[E] ( LOG-S™ ) Error allocating the send buffer.\n");
    pthread_exit(NULL);
  }

  // Spin around and post messages to LOG-S.
  do {
    // Nothing to post, so don't bother.
    if (queueIsEmpty(sendQueue)) {
      sleep(1);
      continue;
    }

    // Garbage was pushed onto the logging queue.
    data = queueDequeue(sendQueue);
    if (data == NULL || data[0] == '\0')
      continue;
    // NOTE(review): the dequeued message is never freed here. If the queue
    // hands over ownership of the heap buffer the producer enqueued, it
    // should be released after sending - confirm against the queue
    // implementation before adding a free().

    // Build the fixed-size frame: message text, then zero padding.
    dataLength = strlen(data);
    if (dataLength > (size_t)LOG_MAX_SEND_BYTES)
      dataLength = (size_t)LOG_MAX_SEND_BYTES;
    memset(frame, 0, LOG_MAX_SEND_BYTES);
    memcpy(frame, data, dataLength);

    // Create socket.
    serverSocket = socket(AF_INET, SOCK_STREAM, 0);
    if (serverSocket < 0) {
      printf("[E] Error creating socket.\n");
      break;
    }

    if (connect(serverSocket, (struct sockaddr *)&serverSockAddr,
                sizeof(serverSockAddr)) < 0) {
      printf("[E] ( LOG-S™ ) Error connecting to LOG-S.\n");
      // Release the descriptor before backing off; it used to leak.
      close(serverSocket);
      sleep(1);
      continue;
    }

    if (pumpsLOG_writeAll(serverSocket, frame, LOG_MAX_SEND_BYTES) < 0) {
      printf("[E] ( LOG-S™ ) Error sending log message to LOG-S.\n");
      close(serverSocket);
      continue;
    }

#ifdef DEBUG
    printf("[✓] ( LOG-S™ ) Log message \"%s\" sent to LOG-S.\n", data);
#endif

    // Close the socket.
    close(serverSocket);
  } while (run);

  // Every path above closes its own socket, so only the frame is left.
  free(frame);

  // Exit context.
  pthread_exit(NULL);
}
void *client(void *argsThread) {
// Function arguments.
Targs *args = (Targs *)argsThread;
// Create a ZMQ context and a requester.
void *context = zmq_ctx_new();
void *requester = zmq_socket(context, ZMQ_REQ);
// Receive buffer for the request.
char *recvData = (char *)calloc(DATA_LENGTH_LIMIT, sizeof(char));
// Holds the request number.
int request = 0;
int zmqTimeout = ZMQ_LINGER_TIME;
// The logging queue to send messages to.
Queue *logsQueue = args->LOG_dataQueue;
// Formatted log data.
char *logData;
size_t logSize;
// Set the linger time to one second for clean shutdown.
zmq_setsockopt(requester, ZMQ_LINGER, &zmqTimeout, sizeof(zmqTimeout));
printf("[✓] Connecting to endpoint %s...\n", args->endpoint);
if (zmq_connect(requester, args->endpoint) != 0) {
printf("[E] Error connecting: %s\n", strerror(zmq_errno()));
goto CLEANUP;
}
do {
#ifdef DEBUG
printf("[✓] (%d) Pulling...\n", request);
#endif
logSize =
strlen("ZMQ-C Sending pull request ") + 1 + MAX_INTEGER_LENGTH + 1;
logData = (char *)calloc(logSize, sizeof(char));
snprintf(logData, logSize, "ZMQ-C Sending pull request %d", request);
queueEnqueue(logsQueue, logData);
zmq_send(requester, PULL_METHOD, strlen(PULL_METHOD), ZMQ_DONTWAIT);
// Check for data in non-blocking mode.
if (zmq_recv(requester, recvData, DATA_LENGTH_LIMIT, ZMQ_DONTWAIT) == -1) {
sleep(1);
continue;
}
logSize = strlen("ZMQ-C Request received: ") + 1 + MAX_INTEGER_LENGTH +
strlen(recvData) + 1 + 1;
logData = (char *)calloc(logSize, sizeof(char));
snprintf(logData, logSize, "ZMQ-C Request %d received: %s", request,
recvData);
printf("[✓] %s\n", logData);
queueEnqueue(logsQueue, logData);
sleep(1);
++request;
} while (run);
CLEANUP:
printf("[✓] Terminating...\n");
if (zmq_disconnect(requester, args->endpoint) != 0)
printf("[E] Error disconnecting: %s\n", strerror(zmq_errno()));
// Perform cleanups.
zmq_close(requester);
zmq_ctx_destroy(context);
// Release buffers.
free(recvData);
// Free arguments.
free(args);
// Exit context.
pthread_exit(NULL);
}
// Returns a heap-allocated copy of `text`, or NULL if the allocation fails.
static char *mainCopyArgument(const char *text) {
  size_t length = strlen(text);
  char *copy = calloc(length + 1, sizeof(char));

  if (copy != NULL)
    memcpy(copy, text, length);
  return copy;
}

// Parses `text` as a decimal TCP port. Returns the port (1..65535), or -1 if
// the text is empty, has trailing characters, or is out of range.
static int mainParsePort(const char *text) {
  char *end = NULL;
  long value = strtol(text, &end, 10);

  if (end == text || *end != '\0' || value < 1 || value > 65535)
    return -1;
  return (int)value;
}

// Program entry point.
//
// Usage: <program> <endpoint> <LOG-S Address> <LOG-S Port>
//
// Validates the command line, builds the shared thread arguments, silences
// terminal echo while running, installs the SIGHUP/SIGINT handler and starts
// the ZeroMQ client thread and the LOG-S pump thread. It then waits for the
// client thread only, and restores the terminal before returning.
//
// Returns 0 on normal termination and -1 on a usage error, an invalid port,
// an allocation failure or when the client thread cannot be started.
int main(int argc, char **argv) {
  // For terminal input suppression.
  struct termios tattr;
  struct termios tattr_store;
  // Set once the original attributes were read, so they can be restored.
  int terminalSaved = 0;
  // The listener thread.
  pthread_t connectThread;
  // The LOG-S thread.
  pthread_t logThread;
  // The logging queue.
  Queue *LOG_sendQueue;
  // The thread arguments.
  Targs *args;
  int logPort;
  int status = 0;

  if (argc < 4) {
    printf("\n");
    printf("Syntax: %s <endpoint> <LOG-S Address> <LOG-S Port>\n", argv[0]);
    printf("Example: %s tcp://127.0.0.1:1025 127.0.0.1 2500\n", argv[0]);
    printf("\n");
    return -1;
  }

  // Reject bad ports before touching the terminal or allocating anything.
  logPort = mainParsePort(argv[3]);
  if (logPort < 0) {
    printf("[E] Invalid LOG-S port \"%s\".\n", argv[3]);
    return -1;
  }

  // Create the logging queue and the thread arguments.
  LOG_sendQueue = queueCreate(1);
  args = calloc(1, sizeof(*args));
  if (LOG_sendQueue == NULL || args == NULL) {
    printf("[E] Failed to allocate the logging queue or thread arguments.\n");
    free(args);
    return -1;
  }

  args->LOG_dataQueue = LOG_sendQueue;
  args->endpoint = mainCopyArgument(argv[1]);
  args->logAddress = mainCopyArgument(argv[2]);
  args->logPort = logPort;
  if (args->endpoint == NULL || args->logAddress == NULL) {
    printf("[E] Failed to copy the command line arguments.\n");
    free(args->endpoint);
    free(args->logAddress);
    free(args);
    return -1;
  }

  // Suppress console input for clarity if this is a terminal. The saved
  // attributes are only restored later if they were actually read.
  if (isatty(STDIN_FILENO) && tcgetattr(STDIN_FILENO, &tattr_store) == 0) {
    terminalSaved = 1;
    tattr = tattr_store;
    tattr.c_lflag &= ~(ICANON | ECHO);
    tcsetattr(STDIN_FILENO, TCSAFLUSH, &tattr);
  }

  // Let there be bling!
  printf("%s\n", CLIENT_BANNER);
  printf("\tTip: Use Ctrl-c to end program.\n\n");

  // Bind to SIGHUP and SIGINT.
  signal(SIGHUP, trap);
  signal(SIGINT, trap);

  // Create ZeroMQ processing thread.
  if (pthread_create(&connectThread, NULL, client, (void *)args) != 0) {
    printf("[E] Failed to create ZeroMQ thread!\n");
    // No thread ever saw the arguments, so they can be released here.
    free(args->endpoint);
    free(args->logAddress);
    free(args);
    status = -1;
  } else {
    // Create the LOG-S thread. Failure is not fatal: the client still runs,
    // only its log messages are not forwarded.
    if (pthread_create(&logThread, NULL, pumpsLOG, (void *)args) != 0)
      printf("[E] Failed to create LOG-S thread!\n");

    // Await termination of the client thread. The thread arguments are
    // intentionally not freed afterwards: the LOG-S thread may still be
    // using them and the process is about to exit anyway.
    pthread_join(connectThread, NULL);
  }

  // If this is a terminal, restore the terminal attributes.
  if (terminalSaved)
    tcsetattr(STDIN_FILENO, TCSANOW, &tattr_store);

  return status;
}
// Generated by GNU Enscript 1.6.5.90.