zeroSquitto – Rev 1

Subversion Repositories:
Rev:
/*************************************************************************/
/* Mosquitto client - ZeroMQ and Mosquitto Publisher-Subscriber Project  */
/*************************************************************************/

#include "constants.h"
#include "queue.h"
#include <arpa/inet.h>
#include <limits.h>
#include <mosquitto.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <termios.h>
#include <unistd.h>

// Used for program termination.
static volatile int run = 1;

// Stucture passed as argument to threads.
typedef struct {
  char *address;
  char *topic;
  int port;

  // For sending messages to LOG-S.
  Queue *LOG_dataQueue;
  char *logAddress;
  int logPort;
} Targs;

// Handles SIGHUP and SIGINT (Ctrl + C)
void trap(int signal) {
  printf("[✓] Received interrupt, terminating...\n");
  switch (signal) {
  case SIGHUP:
  case SIGINT:
    run = 0;
    break;
  }
}

void *pumpsLOG(void *argsThread) {
  // Function arguments.
  Targs *args = (Targs *)argsThread;
  // Will point to log message to send to server.
  char *data;
  // The queue containing messages to send to LOG-S.
  Queue *sendQueue = args->LOG_dataQueue;
  int serverSocket;
  struct sockaddr_in serverSockAddr;
  struct hostent *server;

  printf("[✓] ( LOG-S™ ) Pumping logs to %s:%d.\n", args->logAddress,
         args->logPort);

  // Spin around and post messages to LOG-S.
  do {
    // Nothing to post, so don't bother.
    if (queueIsEmpty(sendQueue)) {
      sleep(1);
      continue;
    }

    // Garbage was pushed onto the logging queue.
    if ((data = queueDequeue(sendQueue)) == NULL || strlen(data) == 0)
      continue;

    // Create socket.
    if ((serverSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
      printf("[E] Error creating socket.\n");
      sleep(1);
      continue;
    }

    bzero((char *)&serverSockAddr, sizeof(serverSockAddr));
    serverSockAddr.sin_family = AF_INET;
    serverSockAddr.sin_addr.s_addr = inet_addr(args->logAddress);
    serverSockAddr.sin_port = htons(args->logPort);

    if (connect(serverSocket, (struct sockaddr *)&serverSockAddr,
                sizeof(serverSockAddr)) < 0) {
      printf("[E] ( LOG-Sâ„¢ ) Error connecting to LOG-S.\n");
      sleep(1);
      continue;
    }

    if (write(serverSocket, data, LOG_MAX_SEND_BYTES) < 0) {
      printf("[E] ( LOG-Sâ„¢ ) Error sending log message to LOG-S.\n");
      continue;
    }

#ifdef DEBUG
    printf("[✓] ( LOG-S™ ) Log message \"%s\" sent to LOG-S.\n", data);
#endif

    // Close the socket.
    close(serverSocket);
  } while (run);

CLEANUP:

  // Close the socket.
  close(serverSocket);

  // Exit context.
  pthread_exit(NULL);
}

// Callback when results have been published using MQTT.
void MMQ_message(struct mosquitto *mosq, void *userdata,
                 const struct mosquitto_message *message) {
  // The logging queue to send messages to.
  Targs *args = userdata;
  // Formatted log data.
  char *logData;
  size_t logSize;
  bool topicMatches;

  // Check if the topic matches what we have subscribed to.
  mosquitto_topic_matches_sub(args->topic, message->topic, &topicMatches);
  if (!topicMatches)
    return;

  logSize = strlen("MMQ-C Received message  for topic: ") + 1 +
            strlen(message->payload) + 1 + strlen(message->topic) + 1 + 1;
  logData = (char *)calloc(logSize, sizeof(char));
  snprintf(logData, logSize, "MMQ-C Received message %s for topic: %s",
           message->payload, message->topic);
  printf("[✓] %s\n", logData);
  queueEnqueue(args->LOG_dataQueue, logData);
}

void MMQ_subscribe(struct mosquitto *mosq, void *userdata, int mid,
                   int qos_count, const int *granted_qos) {
  // The logging queue to send messages to.
  Targs *args = userdata;
  // Formatted log data.
  char *logData;
  size_t logSize;

  logSize =
      strlen("MMQ-C Subscribed to topic: ") + 1 + strlen(args->topic) + 1 + 1;
  logData = (char *)calloc(logSize, sizeof(char));
  snprintf(logData, logSize, "MMQ-C Subscribed to topic: %s", args->topic);
  printf("[✓] %s\n", logData);
  queueEnqueue(args->LOG_dataQueue, logData);
}

// Subscribes to MQTT topic.
void *client(void *argsThread) {
  // Function arguments.
  Targs *args = (Targs *)argsThread;
  struct mosquitto *mq = NULL;
  // Formatted log data.
  char *logData;
  size_t logSize;

  mosquitto_lib_init();

  if ((mq = mosquitto_new(NULL, true, args)) == NULL) {
    printf("[E] Could not create new mosquitto client.\n");
    goto CLEANUP;
  }

  if (mosquitto_connect(mq, args->address, args->port, 60) !=
      MOSQ_ERR_SUCCESS) {
    printf("[E] Could not connect to Mosquitto broker!\n");
    goto CLEANUP;
  }

  // Callback to run when message received.
  mosquitto_message_callback_set(mq, MMQ_message);
  mosquitto_subscribe_callback_set(mq, MMQ_subscribe);

  // Subscribe to topic.
  mosquitto_subscribe(mq, NULL, args->topic, 0);

  do {
    // Reconnect if the connection has been lost.
    if (mosquitto_loop(mq, -1, 1) != MOSQ_ERR_SUCCESS)
      mosquitto_reconnect(mq);

    sleep(1);
  } while (run);

CLEANUP:
  printf("[✓] Shutting down...\n");

  // Disconnect from broker.
  mosquitto_disconnect(mq);

  // Free Mosquitto client.
  mosquitto_destroy(mq);

  // Cleanup library.
  mosquitto_lib_cleanup();

  // Exit context.
  pthread_exit(NULL);
}

int main(int argc, char **argv) {
  // For terminal input supression.
  struct termios tattr;
  struct termios tattr_store;
  // Create the logging queue.
  Queue *LOG_sendQueue = queueCreate(1);
  // The listener thread.
  pthread_t connectThread;
  // The LOG-S thread.
  pthread_t logThread;
  // The thread arguments.
  Targs *args = (Targs *)calloc(1, sizeof(Targs));

  if (argc < 5) {
    printf("\n");
    printf("Syntax: %s <address> <port> <topic> <LOG-S Address> <LOG-S Port>\n",
           argv[0]);
    printf("Example: %s 127.0.0.1 1883 temperature 127.0.0.1 2500\n", argv[0]);
    printf("\n");
    return -1;
  }

  // Suppress console input for clarity if this is a terminal.
  if (isatty(STDIN_FILENO)) {
    // Save the attributes and restore them on program termination.
    tcgetattr(STDIN_FILENO, &tattr_store);
    tcgetattr(STDIN_FILENO, &tattr);
    tattr.c_lflag &= ~(ICANON | ECHO);
    tcsetattr(STDIN_FILENO, TCSAFLUSH, &tattr);
  }

  // Let there be bling!
  printf("%s\n", MOSQUITTO_BANNER);
  printf("\tTip: Use Ctrl-c to end program.\n\n");

  // Bind to SIGHUP and SIGINT.
  signal(SIGHUP, trap);
  signal(SIGINT, trap);

  // Create thread arguments.
  args->LOG_dataQueue = LOG_sendQueue;
  args->address = (char *)calloc(strlen(argv[1]) + 1, sizeof(char));
  strncat(args->address, argv[1], strlen(argv[1]));
  args->port = atoi(argv[2]);
  args->topic = (char *)calloc(strlen(argv[3]) + 1, sizeof(char));
  strncat(args->topic, argv[3], strlen(argv[3]));
  args->logAddress = (char *)calloc(strlen(argv[4]) + 1, sizeof(char));
  strncat(args->logAddress, argv[4], strlen(argv[4]));
  args->logPort = atoi(argv[5]);

  // Create ZeroMQ processing thread.
  if (pthread_create(&connectThread, NULL, client, (void *)args))
    printf("[E] Failed to create Mosquitto thread!\n");

  // Create the LOG-S thread.
  if (pthread_create(&logThread, NULL, pumpsLOG, (void *)args))
    printf("[E] Failed to create LOG-S thread!\n");

  // Await thread termination.
  pthread_join(connectThread, NULL);

  // If this is a terminal, restore the terminal attributes.
  if (isatty(STDIN_FILENO))
    tcsetattr(STDIN_FILENO, TCSANOW, &tattr_store);

  return 0;
}

Generated by GNU Enscript 1.6.5.90.