zeroSquitto – Rev 1
?pathlinks?
/*************************************************************************/
/* Mosquitto client - ZeroMQ and Mosquitto Publisher-Subscriber Project */
/*************************************************************************/
#include "constants.h"
#include "queue.h"
#include <arpa/inet.h>
#include <limits.h>
#include <mosquitto.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <termios.h>
#include <unistd.h>
// Used for program termination.
static volatile int run = 1;
// Stucture passed as argument to threads.
typedef struct {
char *address;
char *topic;
int port;
// For sending messages to LOG-S.
Queue *LOG_dataQueue;
char *logAddress;
int logPort;
} Targs;
// Handles SIGHUP and SIGINT (Ctrl + C)
void trap(int signal) {
printf("[✓] Received interrupt, terminating...\n");
switch (signal) {
case SIGHUP:
case SIGINT:
run = 0;
break;
}
}
void *pumpsLOG(void *argsThread) {
// Function arguments.
Targs *args = (Targs *)argsThread;
// Will point to log message to send to server.
char *data;
// The queue containing messages to send to LOG-S.
Queue *sendQueue = args->LOG_dataQueue;
int serverSocket;
struct sockaddr_in serverSockAddr;
struct hostent *server;
printf("[✓] ( LOG-S™ ) Pumping logs to %s:%d.\n", args->logAddress,
args->logPort);
// Spin around and post messages to LOG-S.
do {
// Nothing to post, so don't bother.
if (queueIsEmpty(sendQueue)) {
sleep(1);
continue;
}
// Garbage was pushed onto the logging queue.
if ((data = queueDequeue(sendQueue)) == NULL || strlen(data) == 0)
continue;
// Create socket.
if ((serverSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
printf("[E] Error creating socket.\n");
sleep(1);
continue;
}
bzero((char *)&serverSockAddr, sizeof(serverSockAddr));
serverSockAddr.sin_family = AF_INET;
serverSockAddr.sin_addr.s_addr = inet_addr(args->logAddress);
serverSockAddr.sin_port = htons(args->logPort);
if (connect(serverSocket, (struct sockaddr *)&serverSockAddr,
sizeof(serverSockAddr)) < 0) {
printf("[E] ( LOG-Sâ„¢ ) Error connecting to LOG-S.\n");
sleep(1);
continue;
}
if (write(serverSocket, data, LOG_MAX_SEND_BYTES) < 0) {
printf("[E] ( LOG-Sâ„¢ ) Error sending log message to LOG-S.\n");
continue;
}
#ifdef DEBUG
printf("[✓] ( LOG-S™ ) Log message \"%s\" sent to LOG-S.\n", data);
#endif
// Close the socket.
close(serverSocket);
} while (run);
CLEANUP:
// Close the socket.
close(serverSocket);
// Exit context.
pthread_exit(NULL);
}
// Callback when results have been published using MQTT.
void MMQ_message(struct mosquitto *mosq, void *userdata,
const struct mosquitto_message *message) {
// The logging queue to send messages to.
Targs *args = userdata;
// Formatted log data.
char *logData;
size_t logSize;
bool topicMatches;
// Check if the topic matches what we have subscribed to.
mosquitto_topic_matches_sub(args->topic, message->topic, &topicMatches);
if (!topicMatches)
return;
logSize = strlen("MMQ-C Received message for topic: ") + 1 +
strlen(message->payload) + 1 + strlen(message->topic) + 1 + 1;
logData = (char *)calloc(logSize, sizeof(char));
snprintf(logData, logSize, "MMQ-C Received message %s for topic: %s",
message->payload, message->topic);
printf("[✓] %s\n", logData);
queueEnqueue(args->LOG_dataQueue, logData);
}
void MMQ_subscribe(struct mosquitto *mosq, void *userdata, int mid,
int qos_count, const int *granted_qos) {
// The logging queue to send messages to.
Targs *args = userdata;
// Formatted log data.
char *logData;
size_t logSize;
logSize =
strlen("MMQ-C Subscribed to topic: ") + 1 + strlen(args->topic) + 1 + 1;
logData = (char *)calloc(logSize, sizeof(char));
snprintf(logData, logSize, "MMQ-C Subscribed to topic: %s", args->topic);
printf("[✓] %s\n", logData);
queueEnqueue(args->LOG_dataQueue, logData);
}
// Subscribes to MQTT topic.
void *client(void *argsThread) {
// Function arguments.
Targs *args = (Targs *)argsThread;
struct mosquitto *mq = NULL;
// Formatted log data.
char *logData;
size_t logSize;
mosquitto_lib_init();
if ((mq = mosquitto_new(NULL, true, args)) == NULL) {
printf("[E] Could not create new mosquitto client.\n");
goto CLEANUP;
}
if (mosquitto_connect(mq, args->address, args->port, 60) !=
MOSQ_ERR_SUCCESS) {
printf("[E] Could not connect to Mosquitto broker!\n");
goto CLEANUP;
}
// Callback to run when message received.
mosquitto_message_callback_set(mq, MMQ_message);
mosquitto_subscribe_callback_set(mq, MMQ_subscribe);
// Subscribe to topic.
mosquitto_subscribe(mq, NULL, args->topic, 0);
do {
// Reconnect if the connection has been lost.
if (mosquitto_loop(mq, -1, 1) != MOSQ_ERR_SUCCESS)
mosquitto_reconnect(mq);
sleep(1);
} while (run);
CLEANUP:
printf("[✓] Shutting down...\n");
// Disconnect from broker.
mosquitto_disconnect(mq);
// Free Mosquitto client.
mosquitto_destroy(mq);
// Cleanup library.
mosquitto_lib_cleanup();
// Exit context.
pthread_exit(NULL);
}
int main(int argc, char **argv) {
// For terminal input supression.
struct termios tattr;
struct termios tattr_store;
// Create the logging queue.
Queue *LOG_sendQueue = queueCreate(1);
// The listener thread.
pthread_t connectThread;
// The LOG-S thread.
pthread_t logThread;
// The thread arguments.
Targs *args = (Targs *)calloc(1, sizeof(Targs));
if (argc < 5) {
printf("\n");
printf("Syntax: %s <address> <port> <topic> <LOG-S Address> <LOG-S Port>\n",
argv[0]);
printf("Example: %s 127.0.0.1 1883 temperature 127.0.0.1 2500\n", argv[0]);
printf("\n");
return -1;
}
// Suppress console input for clarity if this is a terminal.
if (isatty(STDIN_FILENO)) {
// Save the attributes and restore them on program termination.
tcgetattr(STDIN_FILENO, &tattr_store);
tcgetattr(STDIN_FILENO, &tattr);
tattr.c_lflag &= ~(ICANON | ECHO);
tcsetattr(STDIN_FILENO, TCSAFLUSH, &tattr);
}
// Let there be bling!
printf("%s\n", MOSQUITTO_BANNER);
printf("\tTip: Use Ctrl-c to end program.\n\n");
// Bind to SIGHUP and SIGINT.
signal(SIGHUP, trap);
signal(SIGINT, trap);
// Create thread arguments.
args->LOG_dataQueue = LOG_sendQueue;
args->address = (char *)calloc(strlen(argv[1]) + 1, sizeof(char));
strncat(args->address, argv[1], strlen(argv[1]));
args->port = atoi(argv[2]);
args->topic = (char *)calloc(strlen(argv[3]) + 1, sizeof(char));
strncat(args->topic, argv[3], strlen(argv[3]));
args->logAddress = (char *)calloc(strlen(argv[4]) + 1, sizeof(char));
strncat(args->logAddress, argv[4], strlen(argv[4]));
args->logPort = atoi(argv[5]);
// Create ZeroMQ processing thread.
if (pthread_create(&connectThread, NULL, client, (void *)args))
printf("[E] Failed to create Mosquitto thread!\n");
// Create the LOG-S thread.
if (pthread_create(&logThread, NULL, pumpsLOG, (void *)args))
printf("[E] Failed to create LOG-S thread!\n");
// Await thread termination.
pthread_join(connectThread, NULL);
// If this is a terminal, restore the terminal attributes.
if (isatty(STDIN_FILENO))
tcsetattr(STDIN_FILENO, TCSANOW, &tattr_store);
return 0;
}
Generated by GNU Enscript 1.6.5.90.