zeroSquitto – Rev 1

Subversion Repositories:
Rev:
/*************************************************************************/
/* ZeroMQ server - ZeroMQ and Mosquitto Publisher-Subscriber Project     */
/*************************************************************************/

#include "circularQueue.h"
#include "constants.h"
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <termios.h>
#include <unistd.h>
#include <zmq.h>

// Used for program termination: cleared by the signal handler and polled by
// the server loop. sig_atomic_t is the integer type the C standard guarantees
// can be safely written from within a signal handler.
static volatile sig_atomic_t run = 1;

// Structure passed as argument to threads.
typedef struct {
  // Queue holding the items the thread responds with.
  CircularQueue *dataQueue;
  // Endpoint string handed to zmq_bind(), e.g. "tcp://*:1025/".
  char *endpoint;
} Targs;

// Handles SIGHUP and SIGINT (Ctrl + C) by clearing the global run flag.
//
// Only async-signal-safe calls are made here: the notice is emitted with
// write(2) because printf() must not be called from a signal handler.
void trap(int signal) {
  static const char notice[] = "[✓] Received interrupt, terminating...\n";
  // Best effort only: nothing useful can be done if the write fails.
  ssize_t written = write(STDOUT_FILENO, notice, sizeof notice - 1);
  (void)written;

  switch (signal) {
  case SIGHUP:
  case SIGINT:
    run = 0;
    break;
  default:
    // Any other signal is acknowledged but does not stop the program.
    break;
  }
}

/*
 * ZeroMQ responder thread.
 *
 * Binds a REP socket to args->endpoint and answers every PULL_METHOD request
 * with the next item dequeued from args->dataQueue. The loop ends when the
 * global run flag is cleared or the queue becomes empty.
 *
 * argsThread must point to a heap-allocated Targs. This thread frees it and
 * clears the queue before exiting, on both the success and the failure paths.
 *
 * Always returns NULL.
 */
void *server(void *argsThread) {
  // Function arguments.
  Targs *args = (Targs *)argsThread;
  // The queue holding the items to respond with.
  CircularQueue *sendQueue = args->dataQueue;
  // ZeroMQ context and responder socket; NULL until successfully created.
  void *context = NULL;
  void *responder = NULL;
  // Receive buffer for incoming requests.
  char *recvData = NULL;
  // Holds a queue item to respond with.
  char *sendData = NULL;
  // One byte of the receive buffer is reserved for the NUL terminator.
  const size_t capacity = (size_t)DATA_LENGTH_LIMIT - 1;
  // Holds the request number.
  int request = 0;

  context = zmq_ctx_new();
  if (context == NULL) {
    printf("[E] Could not create ZeroMQ context.\n");
    goto cleanup;
  }

  responder = zmq_socket(context, ZMQ_REP);
  if (responder == NULL) {
    printf("[E] Could not create responder socket.\n");
    goto cleanup;
  }

  recvData = calloc(DATA_LENGTH_LIMIT, sizeof(char));
  if (recvData == NULL) {
    printf("[E] Could not allocate receive buffer.\n");
    goto cleanup;
  }

  if (zmq_bind(responder, args->endpoint) != 0) {
    printf("[E] Could not bind responder on selected endpoint.\n");
    goto cleanup;
  }

  printf("[✓] Bound to endpoint %s, listening...\n", args->endpoint);

  /*
   * Loop over circular queue and respond with queued items.
   */
  do {
    // Check for data in non-blocking mode.
    int received = zmq_recv(responder, recvData, capacity, ZMQ_DONTWAIT);
    if (received == -1) {
      sleep(1);
      continue;
    }

    // zmq_recv() reports the full message size even when it truncates, and
    // never writes a terminator: terminate right after the bytes stored so
    // that leftovers from an earlier, longer request cannot leak through.
    recvData[(size_t)received < capacity ? (size_t)received : capacity] = '\0';

#ifdef DEBUG
    printf("[D] Received: %s\n", recvData);
#endif

    // Check that this is a PULL request. A REP socket must answer every
    // request before it may receive again, so anything else gets an empty
    // reply instead of being dropped (which would stall the socket).
    if (strncmp(recvData, PULL_METHOD, DATA_LENGTH_LIMIT) != 0) {
      if (zmq_send(responder, "", 0, 0) == -1)
        printf("[E] Could not reply to unsupported request.\n");
      sleep(1);
      continue;
    }

    // Dequeue an item off the circular queue to respond with.
    // NOTE(review): as in the original code only the last dequeued item is
    // freed during cleanup; if queueDequeue() returns caller-owned memory the
    // earlier items leak - confirm against circularQueue.h.
    sendData = queueDequeue(sendQueue);
    if (sendData == NULL) {
      // Presumably signals an empty queue; still answer to keep the REP
      // socket usable.
      if (zmq_send(responder, "", 0, 0) == -1)
        printf("[E] Could not send empty response.\n");
      sleep(1);
      continue;
    }

    printf("[✓] (%d) Pushing: %s\n", request, sendData);
    if (zmq_send(responder, sendData, strlen(sendData) + 1, 0) == -1)
      printf("[E] Could not send response.\n");

    sleep(1);
    ++request;
  } while (run && !queueIsEmpty(sendQueue));

cleanup:
  // Perform cleanups; the socket must be closed before the context.
  if (responder != NULL)
    zmq_close(responder);
  if (context != NULL)
    zmq_ctx_destroy(context);

  // Release buffers.
  free(recvData);
  free(sendData);
  // Free the queue.
  queueClear(sendQueue);

  // Free arguments.
  free(args);
  // Returning from the start routine ends the thread with a NULL result.
  return NULL;
}

/*
 * Program entry point.
 *
 * Expects the listener endpoint as the first argument, seeds the circular
 * queue with sample readings, runs the ZeroMQ server thread until it
 * terminates, then restores the terminal attributes.
 *
 * Returns 0 on success and -1 on a usage, allocation or thread-creation error.
 */
int main(int argc, char **argv) {
  // For terminal input suppression.
  struct termios tattr;
  struct termios tattr_store;
  // Set once tattr_store holds valid attributes that must be restored.
  int restoreTerminal = 0;
  // The circular queue used for pushing data.
  CircularQueue *sendQueue = NULL;
  // The listener thread.
  pthread_t listenThread;
  // The thread arguments.
  Targs *args = NULL;
  // Process exit status.
  int status = 0;

  // Validate the command line before acquiring any resources.
  if (argc < 2) {
    printf("[E] First argument must be a listener endpoint, ie: %s\n",
           "tcp://*:1025/");
    return -1;
  }

  sendQueue = queueCreate(1);
  if (sendQueue == NULL) {
    printf("[E] Failed to create the data queue.\n");
    return -1;
  }

  args = calloc(1, sizeof *args);
  if (args == NULL) {
    printf("[E] Failed to allocate thread arguments.\n");
    queueClear(sendQueue);
    return -1;
  }

  // Suppress console input for clarity if this is a terminal. The saved
  // attributes are only restored later if they were actually read.
  if (isatty(STDIN_FILENO) && tcgetattr(STDIN_FILENO, &tattr_store) == 0) {
    restoreTerminal = 1;
    tattr = tattr_store;
    tattr.c_lflag &= ~(ICANON | ECHO);
    tcsetattr(STDIN_FILENO, TCSAFLUSH, &tattr);
  }

  // Let there be bling!
  printf("%s\n", SERVER_BANNER);
  printf("\tTip: Use Ctrl-c to end program.\n\n");

  // Bind to SIGHUP and SIGINT.
  signal(SIGHUP, trap);
  signal(SIGINT, trap);

  // Enqueue a few elements to the queue for the exercise.
  queueEnqueue(sendQueue, "36C");
  queueEnqueue(sendQueue, "38C");
  queueEnqueue(sendQueue, "40C");
  queueEnqueue(sendQueue, "21C");

  args->dataQueue = sendQueue;
  args->endpoint = argv[1];

  // Create ZeroMQ processing thread. On success the thread takes over the
  // queue and the arguments; on failure they are released here and the join
  // is skipped because the thread handle was never initialised.
  if (pthread_create(&listenThread, NULL, server, (void *)args) != 0) {
    printf("[E] Failed to create ZeroMQ thread.\n");
    queueClear(sendQueue);
    free(args);
    status = -1;
  } else {
    // Await thread termination.
    pthread_join(listenThread, NULL);
  }

  // If the terminal attributes were saved, restore them.
  if (restoreTerminal)
    tcsetattr(STDIN_FILENO, TCSANOW, &tattr_store);

  return status;
}

Generated by GNU Enscript 1.6.5.90.