zeroSquitto – Rev 1
?pathlinks?
/*************************************************************************/
/* ZeroMQ server - ZeroMQ and Mosquitto Publisher-Subscriber Project */
/*************************************************************************/
#include "constants.h"
#include "queue.h"
#include <arpa/inet.h>
#include <getopt.h>
#include <limits.h>
#include <mosquitto.h>
#include <netdb.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <termios.h>
#include <time.h>
#include <unistd.h>
#include <zmq.h>
// Defined for command-line options with no short options.
#define COMMAND_LOG_ADDRESS 0x10000000
#define COMMAND_LOG_PORT 0x10000001
// Used for program termination.
static volatile int run = 1;
// Stucture passed as argument to threads.
typedef struct {
Queue *ZMQ_dataQueue;
Queue *MMQ_dataQueue;
char *endpoint;
char *address;
char *topic;
int port;
// For sending messages to LOG-S.
Queue *LOG_dataQueue;
char *logAddress;
int logPort;
} Targs;
// Handles SIGHUP and SIGINT (Ctrl + C)
void trap(int signal) {
printf("[✓] Received interrupt, terminating...\n");
switch (signal) {
case SIGHUP:
case SIGINT:
run = 0;
break;
}
}
void *pumpsLOG(void *argsThread) {
// Function arguments.
Targs *args = (Targs *)argsThread;
// Will point to log message to send to server.
char *data;
// The queue containing messages to send to LOG-S.
Queue *sendQueue = args->LOG_dataQueue;
int serverSocket;
struct sockaddr_in serverSockAddr;
struct hostent *server;
printf("[✓] ( LOG-S™ ) Pumping logs to %s:%d.\n", args->logAddress,
args->logPort);
// Spin around and post messages to LOG-S.
do {
// Nothing to post, so don't bother.
if (queueIsEmpty(sendQueue)) {
sleep(1);
continue;
}
// Garbage was pushed onto the logging queue.
if ((data = queueDequeue(sendQueue)) == NULL || strlen(data) == 0)
continue;
// Create socket.
if ((serverSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
printf("[E] Error creating socket.\n");
sleep(1);
continue;
}
bzero((char *)&serverSockAddr, sizeof(serverSockAddr));
serverSockAddr.sin_family = AF_INET;
serverSockAddr.sin_addr.s_addr = inet_addr(args->logAddress);
serverSockAddr.sin_port = htons(args->logPort);
if (connect(serverSocket, (struct sockaddr *)&serverSockAddr,
sizeof(serverSockAddr)) < 0) {
printf("[E] ( LOG-Sâ„¢ ) Error connecting to LOG-S.\n");
sleep(1);
continue;
}
if (write(serverSocket, data, LOG_MAX_SEND_BYTES) < 0) {
printf("[E] ( LOG-Sâ„¢ ) Error sending log message to LOG-S.\n");
continue;
}
#ifdef DEBUG
printf("[✓] ( LOG-S™ ) Log message \"%s\" sent to LOG-S.\n", data);
#endif
// Close the socket.
close(serverSocket);
} while (run);
CLEANUP:
// Close the socket.
close(serverSocket);
// Exit context.
pthread_exit(NULL);
}
// Callback when results have been published using MQTT.
void MMQ_publish(struct mosquitto *mq, void *data, int mid) {
mosquitto_disconnect(mq);
}
// Publishes data with MQTT.
void *serveMMQ(void *argsThread) {
// Function arguments.
Targs *args = (Targs *)argsThread;
// The queue containing the data to publish.
Queue *sendQueue = args->MMQ_dataQueue;
// The logging queue to send messages to.
Queue *logsQueue = args->LOG_dataQueue;
struct mosquitto *mq = NULL;
// Holds a queue item to respond with.
char *sendData = NULL;
// Formatted log data.
char *logData;
size_t logSize;
int request = 0;
mosquitto_lib_init();
if ((mq = mosquitto_new(NULL, true, 0)) == NULL) {
printf("[E] ( MQTT ) Could not create new mosquitto client.\n");
goto CLEANUP;
}
if (mosquitto_connect(mq, args->address, args->port, 60) !=
MOSQ_ERR_SUCCESS) {
printf("[E] ( MQTT ) Could not connect to Mosquitto broker!\n");
goto CLEANUP;
}
do {
// Reconnect if the connection has been lost.
if (mosquitto_loop(mq, -1, 1) != MOSQ_ERR_SUCCESS)
mosquitto_reconnect(mq);
// Dequeue an item off the queue to publish.
if ((sendData = queueDequeue(sendQueue)) == NULL) {
sleep(1);
continue;
}
logSize = strlen("( MQTT ) Request published in topic: ") + 1 +
MAX_INTEGER_LENGTH + strlen(sendData) + 1 + strlen(args->topic) +
1 + 1;
logData = (char *)calloc(logSize, sizeof(char));
snprintf(logData, logSize,
"( MQTT ) Request %d published %s in topic: %s", request,
sendData, args->topic);
printf("[✓] ( MQTT ) +-- Publishing (%d): %s\n", request, sendData);
queueEnqueue(logsQueue, logData);
mosquitto_publish(mq, NULL, args->topic, 11, sendData, 0, false);
sleep(1);
++request;
} while (run);
CLEANUP:
printf("[✓] ( MQTT ) Shutting down...\n");
// Disconnect from broker.
if (mq != NULL)
mosquitto_disconnect(mq);
// Free Mosquitto client.
if (mq != NULL)
mosquitto_destroy(mq);
// Cleanup library.
mosquitto_lib_cleanup();
// Exit context.
pthread_exit(NULL);
}
// Responds to ZeroMQ requests.
void *serveZMQ(void *argsThread) {
// Function arguments.
Targs *args = (Targs *)argsThread;
// The queue containing the data to publish.
Queue *sendQueue = args->ZMQ_dataQueue;
// The logging queue to send messages to.
Queue *logsQueue = args->LOG_dataQueue;
// Formatted log data.
char *logData;
size_t logSize;
// Create a new context and a responder.
void *context = zmq_ctx_new();
void *responder = zmq_socket(context, ZMQ_REP);
// Receive buffer for the response.
char *recvData = (char *)calloc(DATA_LENGTH_LIMIT + 1, sizeof(char));
// Holds a queue item to respond with.
char *sendData = NULL;
// Holds the request number.
int request = 0;
// Bind to the endpoint.
if (zmq_bind(responder, args->endpoint) != 0) {
printf("[E] ( ZeroMQ ) Could not bind responder on selected endpoint.\n");
goto CLEANUP;
}
printf("[✓] ( ZeroMQ ) Bound to endpoint %s.\n", args->endpoint);
/*
* Loop over the queue and respond with queued items.
*/
do {
// Flush the past message.
zmq_send(responder, "", 0, 0);
// Check for data in non-blocking mode.
if (zmq_recv(responder, recvData, DATA_LENGTH_LIMIT, ZMQ_DONTWAIT) == -1) {
sleep(1);
continue;
}
#ifdef DEBUG
printf("[D] ( ZeroMQ ) Received: %s\n", recvData);
#endif
// Check that this is a PULL request - otherwise continue.
if (strncmp(recvData, PULL_METHOD, DATA_LENGTH_LIMIT) != 0) { sleep(1); continue; }
// Dequeue an item off the queue to respond with.
if ((sendData = queueDequeue(sendQueue)) == NULL) {
sleep(1);
continue;
}
zmq_send(responder, sendData, strlen(sendData) + 1, ZMQ_SNDMORE);
logSize = strlen("( ZeroMQ ) Request pushed ") + 1 + MAX_INTEGER_LENGTH +
strlen(sendData) + 1 + 1;
logData = (char *)calloc(logSize, sizeof(char));
snprintf(logData, logSize, "( ZeroMQ ) Request %d pushed %s", request,
sendData);
printf("[✓] ( ZeroMQ ) +-- Pushing (%d): %s\n", request, sendData);
queueEnqueue(logsQueue, logData);
sleep(1);
++request;
} while (run);
CLEANUP:
printf("[✓] ( ZeroMQ ) Shutting down...\n");
// Perform cleanups.
zmq_close(responder);
zmq_ctx_destroy(context);
// Release buffers.
free(recvData);
free(sendData);
// Exit context.
pthread_exit(NULL);
}
void printUsage(char **argv) {
printf("Usage: %s [OPTIONS]\n", argv[0]);
printf("\t-e <endpoint>\t\tendpoint for ZeroMQ\n");
printf("\t-a <address>\t\tMosquitto server to connect to\n");
printf("\t-p <port>\t\tMosquitto port to connect to (default: 1883)\n");
printf("\t-t <port>\t\tMosquitto topic to publish on\n");
printf("\t\t\t\t(default: temperature)");
printf("\n\n");
printf("\t--log-address <address>\t\tLOG-Sâ„¢ server address to connect to\n");
printf("\t--log-port <port>\t\tLOG-Sâ„¢ server port to connect to\n");
printf("\n");
printf("\t-h, --help\t\tprint this help and exit\n");
printf("\n");
printf("\tSpecify only an endpoint to use only ZeroMQ\n");
printf("\tSpecify only an address and a port to use only Mosquitto\n");
printf("\tSpecify both to use ZeroMQ and Mosquitto\n");
printf("\n");
}
int main(int argc, char **argv) {
// For terminal input supression.
struct termios tattr;
struct termios tattr_store;
// Use two queues to push elements to ZMQ and MQTT
Queue *ZMQ_sendQueue = queueCreate(1);
Queue *MMQ_sendQueue = queueCreate(1);
// Create the logging queue.
Queue *LOG_sendQueue = queueCreate(1);
// The listener thread for ZeroMQ.
pthread_t zmqThread;
// The listener thread for Mosquitto.
pthread_t mmqThread;
// The LOG-S thread.
pthread_t logThread;
// The thread arguments.
Targs *args = (Targs *)calloc(1, sizeof(Targs));
// Time structure for generating random values.
time_t ttime;
// Temporarily holds the temperature string.
char *ts = (char *)calloc(4, sizeof(char));
// Command-line processing.
int c;
const char *short_opt = "he:a:p:t:";
struct option long_opt[] = {
{"help", no_argument, NULL, 'h'},
{"endpoint", required_argument, NULL, 'e'},
{"address", required_argument, NULL, 'a'},
{"port", required_argument, NULL, 'p'},
{"topic", required_argument, NULL, 't'},
// For sending logs to LOG-S
{"log-address", required_argument, NULL, COMMAND_LOG_ADDRESS},
{"log-port", required_argument, NULL, COMMAND_LOG_PORT},
{NULL, 0, NULL, 0}};
// Depending on options, conditionally start ZMQ or MQTT.
bool ZMQ_hasEndpoint = false;
bool MMQ_hasAddress = false, MMQ_hasPort = false, MMQ_hasTopic = false;
bool LOG_hasAddress = false, LOG_hasPort = false;
// For no arguments, show the help.
if (argc == 1) {
printUsage(argv);
return -1;
}
while ((c = getopt_long(argc, argv, short_opt, long_opt, NULL)) != -1)
switch (c) {
case -1:
case 0:
break;
// For the logging to LOG-S.
case COMMAND_LOG_ADDRESS:
args->logAddress = (char *)calloc(strlen(optarg) + 1, sizeof(char));
strncat(args->logAddress, optarg, strlen(optarg));
LOG_hasAddress = true;
break;
case COMMAND_LOG_PORT:
args->logPort = atoi(optarg);
LOG_hasPort = true;
break;
case 'e':
args->endpoint = (char *)calloc(strlen(optarg) + 1, sizeof(char));
strncat(args->endpoint, optarg, strlen(optarg));
ZMQ_hasEndpoint = true;
break;
case 'a':
args->address = (char *)calloc(strlen(optarg) + 1, sizeof(char));
strncat(args->address, optarg, strlen(optarg));
MMQ_hasAddress = true;
break;
case 'p':
args->port = atoi(optarg);
// Assume default Mosquitto port.
if (args->port == 0) {
printf("[W] Invalid Mosquittto port, assuming default 1883\n");
args->port = 1883;
}
MMQ_hasPort = true;
break;
case 't':
args->topic = (char *)calloc(strlen(optarg) + 1, sizeof(char));
strncat(args->topic, optarg, strlen(optarg));
MMQ_hasTopic = true;
break;
case 'h':
printUsage(argv);
return -1;
case ':':
case '?':
printf("Try `%s --help' for more information.\n", argv[0]);
return -1;
default:
printf("%s: invalid option -- %c\n", argv[0], c);
printf("Try `%s --help' for more information.\n", argv[0]);
return -1;
}
// Set queues for ZeroMQ and MQTT
args->ZMQ_dataQueue = ZMQ_sendQueue;
args->MMQ_dataQueue = MMQ_sendQueue;
// Add the logging queue.
args->LOG_dataQueue = LOG_sendQueue;
// Assume port 1883 for MQTT
if (!MMQ_hasPort) {
args->port = 1883;
MMQ_hasPort = true;
}
// Assume default topic for MQTT
if (!MMQ_hasTopic) {
args->topic =
(char *)calloc(strlen(DEFAULT_MOSQUITTO_TOPIC) + 1, sizeof(char));
strncat(args->topic, DEFAULT_MOSQUITTO_TOPIC,
strlen(DEFAULT_MOSQUITTO_TOPIC));
MMQ_hasTopic = true;
}
// Suppress console input for clarity if this is a terminal.
if (isatty(STDIN_FILENO)) {
// Save the attributes and restore them on program termination.
tcgetattr(STDIN_FILENO, &tattr_store);
tcgetattr(STDIN_FILENO, &tattr);
tattr.c_lflag &= ~(ICANON | ECHO);
tcsetattr(STDIN_FILENO, TCSAFLUSH, &tattr);
}
// Let there be bling!
printf("%s\n", ZEROSQUITO_BANNER);
printf("\tTip: Use Ctrl-c to end program.\n\n");
// Bind to SIGHUP and SIGINT.
signal(SIGHUP, trap);
signal(SIGINT, trap);
// Create the ZeroMQ thread.
if (ZMQ_hasEndpoint &&
pthread_create(&zmqThread, NULL, serveZMQ, (void *)args))
printf("[E] Failed to create ZeroMQ thread!\n");
// Create the MQTT thread.
if (MMQ_hasAddress && MMQ_hasPort && MMQ_hasTopic &&
pthread_create(&mmqThread, NULL, serveMMQ, (void *)args))
printf("[E] Failed to create Mosquitto thread!\n");
// Create the LOG-S thread.
if (LOG_hasAddress && LOG_hasPort &&
pthread_create(&logThread, NULL, pumpsLOG, (void *)args))
printf("[E] Failed to create LOG-S thread!\n");
srand(time(&ttime));
// Producer: pumps temperature readings onto ZeroMQ and Mosquitto queues.
do {
// Generate a value (temperature reading).
snprintf(ts, 4, "%dC", rand() % 100);
// Pump the temperature reading to both ZeroMQ and Mosquitto.
printf("[✓] +- Pumping measurement: %s\n", ts);
queueEnqueue(ZMQ_sendQueue, ts);
queueEnqueue(MMQ_sendQueue, ts);
sleep(1);
} while (run);
// Join the threads.
pthread_join(zmqThread, NULL);
pthread_join(mmqThread, NULL);
// Free the queue.
queueClear(MMQ_sendQueue);
queueClear(ZMQ_sendQueue);
// Free arguments.
free(args);
// If this is a terminal, restore the terminal attributes.
if (isatty(STDIN_FILENO))
tcsetattr(STDIN_FILENO, TCSANOW, &tattr_store);
// Clean exit.
return 0;
}
Generated by GNU Enscript 1.6.5.90.