zeroSquitto – Rev 1

Subversion Repositories:
Rev:
/*************************************************************************/
/* ZeroMQ server - ZeroMQ and Mosquitto Publisher-Subscriber Project     */
/*************************************************************************/

#include "constants.h"
#include "queue.h"
#include <arpa/inet.h>
#include <getopt.h>
#include <limits.h>
#include <mosquitto.h>
#include <netdb.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <termios.h>
#include <time.h>
#include <unistd.h>
#include <zmq.h>

// Defined for command-line options with no short options.
#define COMMAND_LOG_ADDRESS 0x10000000
#define COMMAND_LOG_PORT 0x10000001

// Used for program termination.
static volatile int run = 1;

// Stucture passed as argument to threads.
typedef struct {
  Queue *ZMQ_dataQueue;
  Queue *MMQ_dataQueue;
  char *endpoint;
  char *address;
  char *topic;
  int port;

  // For sending messages to LOG-S.
  Queue *LOG_dataQueue;
  char *logAddress;
  int logPort;
} Targs;

// Handles SIGHUP and SIGINT (Ctrl + C)
void trap(int signal) {
  printf("[✓] Received interrupt, terminating...\n");
  switch (signal) {
  case SIGHUP:
  case SIGINT:
    run = 0;
    break;
  }
}

void *pumpsLOG(void *argsThread) {
  // Function arguments.
  Targs *args = (Targs *)argsThread;
  // Will point to log message to send to server.
  char *data;
  // The queue containing messages to send to LOG-S.
  Queue *sendQueue = args->LOG_dataQueue;
  int serverSocket;
  struct sockaddr_in serverSockAddr;
  struct hostent *server;

  printf("[✓] ( LOG-S™ ) Pumping logs to %s:%d.\n", args->logAddress,
         args->logPort);

  // Spin around and post messages to LOG-S.
  do {
    // Nothing to post, so don't bother.
    if (queueIsEmpty(sendQueue)) {
      sleep(1);
      continue;
    }

    // Garbage was pushed onto the logging queue.
    if ((data = queueDequeue(sendQueue)) == NULL || strlen(data) == 0)
      continue;

    // Create socket.
    if ((serverSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
      printf("[E] Error creating socket.\n");
      sleep(1);
      continue;
    }

    bzero((char *)&serverSockAddr, sizeof(serverSockAddr));
    serverSockAddr.sin_family = AF_INET;
    serverSockAddr.sin_addr.s_addr = inet_addr(args->logAddress);
    serverSockAddr.sin_port = htons(args->logPort);

    if (connect(serverSocket, (struct sockaddr *)&serverSockAddr,
                sizeof(serverSockAddr)) < 0) {
      printf("[E] ( LOG-Sâ„¢ ) Error connecting to LOG-S.\n");
      sleep(1);
      continue;
    }

    if (write(serverSocket, data, LOG_MAX_SEND_BYTES) < 0) {
      printf("[E] ( LOG-Sâ„¢ ) Error sending log message to LOG-S.\n");
      continue;
    }

#ifdef DEBUG
    printf("[✓] ( LOG-S™ ) Log message \"%s\" sent to LOG-S.\n", data);
#endif

    // Close the socket.
    close(serverSocket);
  } while (run);

CLEANUP:

  // Close the socket.
  close(serverSocket);

  // Exit context.
  pthread_exit(NULL);
}

// Callback when results have been published using MQTT.
void MMQ_publish(struct mosquitto *mq, void *data, int mid) {
  mosquitto_disconnect(mq);
}

// Publishes data with MQTT.
void *serveMMQ(void *argsThread) {
  // Function arguments.
  Targs *args = (Targs *)argsThread;
  // The queue containing the data to publish.
  Queue *sendQueue = args->MMQ_dataQueue;
  // The logging queue to send messages to.
  Queue *logsQueue = args->LOG_dataQueue;
  struct mosquitto *mq = NULL;
  // Holds a queue item to respond with.
  char *sendData = NULL;
  // Formatted log data.
  char *logData;
  size_t logSize;
  int request = 0;

  mosquitto_lib_init();

  if ((mq = mosquitto_new(NULL, true, 0)) == NULL) {
    printf("[E] (  MQTT  ) Could not create new mosquitto client.\n");
    goto CLEANUP;
  }

  if (mosquitto_connect(mq, args->address, args->port, 60) !=
      MOSQ_ERR_SUCCESS) {
    printf("[E] (  MQTT  ) Could not connect to Mosquitto broker!\n");
    goto CLEANUP;
  }

  do {
    // Reconnect if the connection has been lost.
    if (mosquitto_loop(mq, -1, 1) != MOSQ_ERR_SUCCESS)
      mosquitto_reconnect(mq);

    // Dequeue an item off the queue to publish.
    if ((sendData = queueDequeue(sendQueue)) == NULL) {
      sleep(1);
      continue;
    }

    logSize = strlen("(  MQTT  ) Request  published  in topic: ") + 1 +
              MAX_INTEGER_LENGTH + strlen(sendData) + 1 + strlen(args->topic) +
              1 + 1;
    logData = (char *)calloc(logSize, sizeof(char));
    snprintf(logData, logSize,
             "(  MQTT  ) Request %d published %s in topic: %s", request,
             sendData, args->topic);
    printf("[✓] (  MQTT  ) +-- Publishing (%d): %s\n", request, sendData);
    queueEnqueue(logsQueue, logData);

    mosquitto_publish(mq, NULL, args->topic, 11, sendData, 0, false);

    sleep(1);
    ++request;
  } while (run);

CLEANUP:
  printf("[✓] (  MQTT  ) Shutting down...\n");

  // Disconnect from broker.
  if (mq != NULL)
    mosquitto_disconnect(mq);

  // Free Mosquitto client.
  if (mq != NULL)
    mosquitto_destroy(mq);

  // Cleanup library.
  mosquitto_lib_cleanup();

  // Exit context.
  pthread_exit(NULL);
}

// Responds to ZeroMQ requests.
void *serveZMQ(void *argsThread) {
  // Function arguments.
  Targs *args = (Targs *)argsThread;
  // The queue containing the data to publish.
  Queue *sendQueue = args->ZMQ_dataQueue;
  // The logging queue to send messages to.
  Queue *logsQueue = args->LOG_dataQueue;
  // Formatted log data.
  char *logData;
  size_t logSize;
  // Create a new context and a responder.
  void *context = zmq_ctx_new();
  void *responder = zmq_socket(context, ZMQ_REP);
  // Receive buffer for the response.
  char *recvData = (char *)calloc(DATA_LENGTH_LIMIT + 1, sizeof(char));
  // Holds a queue item to respond with.
  char *sendData = NULL;
  // Holds the request number.
  int request = 0;

  // Bind to the endpoint.
  if (zmq_bind(responder, args->endpoint) != 0) {
    printf("[E] ( ZeroMQ ) Could not bind responder on selected endpoint.\n");
    goto CLEANUP;
  }

  printf("[✓] ( ZeroMQ ) Bound to endpoint %s.\n", args->endpoint);

  /*
   * Loop over the queue and respond with queued items.
   */
  do {

    // Flush the past message.
    zmq_send(responder, "", 0, 0);

    // Check for data in non-blocking mode.
    if (zmq_recv(responder, recvData, DATA_LENGTH_LIMIT, ZMQ_DONTWAIT) == -1) {
      sleep(1);
      continue;
    }

#ifdef DEBUG
    printf("[D] ( ZeroMQ ) Received: %s\n", recvData);
#endif

    // Check that this is a PULL request - otherwise continue.
    if (strncmp(recvData, PULL_METHOD, DATA_LENGTH_LIMIT) != 0) { sleep(1); continue; }

    // Dequeue an item off the queue to respond with.
    if ((sendData = queueDequeue(sendQueue)) == NULL) {
      sleep(1);
      continue;
    }

    zmq_send(responder, sendData, strlen(sendData) + 1, ZMQ_SNDMORE);

    logSize = strlen("( ZeroMQ ) Request  pushed ") + 1 + MAX_INTEGER_LENGTH +
              strlen(sendData) + 1 + 1;
    logData = (char *)calloc(logSize, sizeof(char));
    snprintf(logData, logSize, "( ZeroMQ ) Request %d pushed %s", request,
             sendData);
    printf("[✓] ( ZeroMQ ) +-- Pushing (%d): %s\n", request, sendData);
    queueEnqueue(logsQueue, logData);
    

    sleep(1);
    ++request;
  } while (run);

CLEANUP:
  printf("[✓] ( ZeroMQ ) Shutting down...\n");

  // Perform cleanups.
  zmq_close(responder);
  zmq_ctx_destroy(context);

  // Release buffers.
  free(recvData);
  free(sendData);

  // Exit context.
  pthread_exit(NULL);
}

void printUsage(char **argv) {
  printf("Usage: %s [OPTIONS]\n", argv[0]);
  printf("\t-e <endpoint>\t\tendpoint for ZeroMQ\n");
  printf("\t-a <address>\t\tMosquitto server to connect to\n");
  printf("\t-p <port>\t\tMosquitto port to connect to (default: 1883)\n");
  printf("\t-t <port>\t\tMosquitto topic to publish on\n");
  printf("\t\t\t\t(default: temperature)");

  printf("\n\n");

  printf("\t--log-address <address>\t\tLOG-Sâ„¢ server address to connect to\n");
  printf("\t--log-port <port>\t\tLOG-Sâ„¢ server port to connect to\n");

  printf("\n");

  printf("\t-h, --help\t\tprint this help and exit\n");

  printf("\n");

  printf("\tSpecify only an endpoint to use only ZeroMQ\n");
  printf("\tSpecify only an address and a port to use only Mosquitto\n");
  printf("\tSpecify both to use ZeroMQ and Mosquitto\n");

  printf("\n");
}

int main(int argc, char **argv) {
  // For terminal input supression.
  struct termios tattr;
  struct termios tattr_store;
  // Use two queues to push elements to ZMQ and MQTT
  Queue *ZMQ_sendQueue = queueCreate(1);
  Queue *MMQ_sendQueue = queueCreate(1);
  // Create the logging queue.
  Queue *LOG_sendQueue = queueCreate(1);
  // The listener thread for ZeroMQ.
  pthread_t zmqThread;
  // The listener thread for Mosquitto.
  pthread_t mmqThread;
  // The LOG-S thread.
  pthread_t logThread;
  // The thread arguments.
  Targs *args = (Targs *)calloc(1, sizeof(Targs));
  // Time structure for generating random values.
  time_t ttime;
  // Temporarily holds the temperature string.
  char *ts = (char *)calloc(4, sizeof(char));
  // Command-line processing.
  int c;
  const char *short_opt = "he:a:p:t:";
  struct option long_opt[] = {
      {"help", no_argument, NULL, 'h'},
      {"endpoint", required_argument, NULL, 'e'},
      {"address", required_argument, NULL, 'a'},
      {"port", required_argument, NULL, 'p'},
      {"topic", required_argument, NULL, 't'},
      // For sending logs to LOG-S
      {"log-address", required_argument, NULL, COMMAND_LOG_ADDRESS},
      {"log-port", required_argument, NULL, COMMAND_LOG_PORT},
      {NULL, 0, NULL, 0}};
  // Depending on options, conditionally start ZMQ or MQTT.
  bool ZMQ_hasEndpoint = false;
  bool MMQ_hasAddress = false, MMQ_hasPort = false, MMQ_hasTopic = false;
  bool LOG_hasAddress = false, LOG_hasPort = false;

  // For no arguments, show the help.
  if (argc == 1) {
    printUsage(argv);
    return -1;
  }

  while ((c = getopt_long(argc, argv, short_opt, long_opt, NULL)) != -1)
    switch (c) {
    case -1:
    case 0:
      break;

    // For the logging to LOG-S.
    case COMMAND_LOG_ADDRESS:
      args->logAddress = (char *)calloc(strlen(optarg) + 1, sizeof(char));
      strncat(args->logAddress, optarg, strlen(optarg));
      LOG_hasAddress = true;
      break;
    case COMMAND_LOG_PORT:
      args->logPort = atoi(optarg);
      LOG_hasPort = true;
      break;

    case 'e':
      args->endpoint = (char *)calloc(strlen(optarg) + 1, sizeof(char));
      strncat(args->endpoint, optarg, strlen(optarg));
      ZMQ_hasEndpoint = true;
      break;
    case 'a':
      args->address = (char *)calloc(strlen(optarg) + 1, sizeof(char));
      strncat(args->address, optarg, strlen(optarg));
      MMQ_hasAddress = true;
      break;
    case 'p':
      args->port = atoi(optarg);
      // Assume default Mosquitto port.
      if (args->port == 0) {
        printf("[W] Invalid Mosquittto port, assuming default 1883\n");
        args->port = 1883;
      }
      MMQ_hasPort = true;
      break;
    case 't':
      args->topic = (char *)calloc(strlen(optarg) + 1, sizeof(char));
      strncat(args->topic, optarg, strlen(optarg));
      MMQ_hasTopic = true;
      break;
    case 'h':
      printUsage(argv);
      return -1;

    case ':':
    case '?':
      printf("Try `%s --help' for more information.\n", argv[0]);
      return -1;
    default:
      printf("%s: invalid option -- %c\n", argv[0], c);
      printf("Try `%s --help' for more information.\n", argv[0]);
      return -1;
    }

  // Set queues for ZeroMQ and MQTT
  args->ZMQ_dataQueue = ZMQ_sendQueue;
  args->MMQ_dataQueue = MMQ_sendQueue;
  // Add the logging queue.
  args->LOG_dataQueue = LOG_sendQueue;

  // Assume port 1883 for MQTT
  if (!MMQ_hasPort) {
    args->port = 1883;
    MMQ_hasPort = true;
  }

  // Assume default topic for MQTT
  if (!MMQ_hasTopic) {
    args->topic =
        (char *)calloc(strlen(DEFAULT_MOSQUITTO_TOPIC) + 1, sizeof(char));
    strncat(args->topic, DEFAULT_MOSQUITTO_TOPIC,
            strlen(DEFAULT_MOSQUITTO_TOPIC));
    MMQ_hasTopic = true;
  }

  // Suppress console input for clarity if this is a terminal.
  if (isatty(STDIN_FILENO)) {
    // Save the attributes and restore them on program termination.
    tcgetattr(STDIN_FILENO, &tattr_store);
    tcgetattr(STDIN_FILENO, &tattr);
    tattr.c_lflag &= ~(ICANON | ECHO);
    tcsetattr(STDIN_FILENO, TCSAFLUSH, &tattr);
  }

  // Let there be bling!
  printf("%s\n", ZEROSQUITO_BANNER);
  printf("\tTip: Use Ctrl-c to end program.\n\n");

  // Bind to SIGHUP and SIGINT.
  signal(SIGHUP, trap);
  signal(SIGINT, trap);

  // Create the ZeroMQ thread.
  if (ZMQ_hasEndpoint &&
      pthread_create(&zmqThread, NULL, serveZMQ, (void *)args))
    printf("[E] Failed to create ZeroMQ thread!\n");

  // Create the MQTT thread.
  if (MMQ_hasAddress && MMQ_hasPort && MMQ_hasTopic &&
      pthread_create(&mmqThread, NULL, serveMMQ, (void *)args))
    printf("[E] Failed to create Mosquitto thread!\n");

  // Create the LOG-S thread.
  if (LOG_hasAddress && LOG_hasPort &&
      pthread_create(&logThread, NULL, pumpsLOG, (void *)args))
    printf("[E] Failed to create LOG-S thread!\n");

  srand(time(&ttime));

  // Producer: pumps temperature readings onto ZeroMQ and Mosquitto queues.
  do {
    // Generate a value (temperature reading).
    snprintf(ts, 4, "%dC", rand() % 100);

    // Pump the temperature reading to both ZeroMQ and Mosquitto.
    printf("[✓] +- Pumping measurement: %s\n", ts);
    queueEnqueue(ZMQ_sendQueue, ts);
    queueEnqueue(MMQ_sendQueue, ts);

    sleep(1);
  } while (run);

  // Join the threads.
  pthread_join(zmqThread, NULL);
  pthread_join(mmqThread, NULL);

  // Free the queue.
  queueClear(MMQ_sendQueue);
  queueClear(ZMQ_sendQueue);

  // Free arguments.
  free(args);

  // If this is a terminal, restore the terminal attributes.
  if (isatty(STDIN_FILENO))
    tcsetattr(STDIN_FILENO, TCSANOW, &tattr_store);

  // Clean exit.
  return 0;
}

Generated by GNU Enscript 1.6.5.90.