// zeroSquitto – Rev 1
//
// Subversion Repositories:
// Rev:
/*************************************************************************/
/* ZeroMQ client - ZeroMQ and Mosquitto Publisher-Subscriber Project     */
/*************************************************************************/

#include "constants.h"
#include "queue.h"
#include <arpa/inet.h>
#include <limits.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <termios.h>
#include <unistd.h>
#include <zmq.h>

// Program-wide termination flag: starts at 1 and is cleared by the signal
// handler; the worker loops poll it to know when to stop. sig_atomic_t is the
// only object type the C standard guarantees may be written from a signal
// handler.
static volatile sig_atomic_t run = 1;

// Structure passed as the argument to the worker threads. A single instance
// is allocated in main() and handed to both the ZeroMQ client thread and the
// LOG-S pump thread.
typedef struct {
  // ZeroMQ endpoint the client thread connects to (given to zmq_connect()).
  char *endpoint;

  // For sending messages to LOG-S.
  // Queue of log messages: filled by the client thread, drained by the LOG-S
  // pump thread.
  Queue *LOG_dataQueue;
  // Address of the LOG-S server, parsed with inet_addr() by the pump thread.
  char *logAddress;
  // TCP port of the LOG-S server (converted with htons() before use).
  int logPort;
} Targs;

// Signal handler for SIGHUP and SIGINT (Ctrl + C).
//
// Announces the interrupt and clears the global `run` flag so the worker
// loops stop. Only async-signal-safe operations are used here: write()
// instead of printf(), and a plain store to the flag.
//
// signal: number of the signal being delivered; other signals are ignored.
void trap(int signal) {
  static const char notice[] = "[✓] Received interrupt, terminating...\n";
  ssize_t ignored;

  switch (signal) {
  case SIGHUP:
  case SIGINT:
    // Best-effort notice; nothing useful can be done here if it fails.
    ignored = write(STDOUT_FILENO, notice, sizeof(notice) - 1);
    (void)ignored;
    run = 0;
    break;
  default:
    break;
  }
}

// LOG-S pump thread entry point.
//
// Drains the logging queue found in the Targs argument and delivers every
// non-empty message to the LOG-S server at logAddress:logPort, opening one
// TCP connection per message. Each message is sent as exactly
// LOG_MAX_SEND_BYTES bytes (the text, zero padded or truncated to fit).
// The loop ends when `run` is cleared or when a socket cannot be created.
//
// argsThread: pointer to the shared Targs; it is only read here.
// Returns:    does not return normally; the thread ends via pthread_exit().
void *pumpsLOG(void *argsThread) {
  // Function arguments.
  Targs *args = (Targs *)argsThread;
  // The queue containing messages to send to LOG-S.
  Queue *sendQueue = args->LOG_dataQueue;
  // Number of bytes put on the wire for every message.
  const size_t sendSize = LOG_MAX_SEND_BYTES;
  // Fixed-size, zero-padded copy of the message being sent.
  char *sendBuffer = NULL;
  // Will point to log message to send to server.
  char *data;
  size_t dataLength;
  size_t sent;
  ssize_t written;
  // -1 whenever no socket is open, so cleanup never closes a stale descriptor.
  int serverSocket = -1;
  struct sockaddr_in serverSockAddr;

  printf("[✓] ( LOG-S™ ) Pumping logs to %s:%d.\n", args->logAddress,
         args->logPort);

  // The destination never changes, so build it once.
  memset(&serverSockAddr, 0, sizeof(serverSockAddr));
  serverSockAddr.sin_family = AF_INET;
  serverSockAddr.sin_addr.s_addr = inet_addr(args->logAddress);
  serverSockAddr.sin_port = htons(args->logPort);

  // Queued strings are allocated with a length-based size (see client()), so
  // they cannot be handed to write() with a LOG_MAX_SEND_BYTES count without
  // reading past their end; they are staged in this buffer instead.
  sendBuffer = calloc(sendSize, sizeof(char));
  if (sendBuffer == NULL) {
    printf("[E] ( LOG-S™ ) Error allocating send buffer.\n");
    goto CLEANUP;
  }

  // Spin around and post messages to LOG-S.
  do {
    // Nothing to post, so don't bother.
    if (queueIsEmpty(sendQueue)) {
      sleep(1);
      continue;
    }

    // Garbage was pushed onto the logging queue.
    // NOTE(review): `data` is presumably the heap pointer enqueued by
    // client(); it is not freed here because queue ownership rules are not
    // visible in this file — confirm against queue.h.
    if ((data = queueDequeue(sendQueue)) == NULL)
      continue;
    dataLength = strlen(data);
    if (dataLength == 0)
      continue;

    // Create socket.
    serverSocket = socket(AF_INET, SOCK_STREAM, 0);
    if (serverSocket < 0) {
      printf("[E] Error creating socket.\n");
      goto CLEANUP;
    }

    if (connect(serverSocket, (struct sockaddr *)&serverSockAddr,
                sizeof(serverSockAddr)) < 0) {
      printf("[E] ( LOG-S™ ) Error connecting to LOG-S.\n");
      // Release the descriptor before retrying; the message is dropped.
      close(serverSocket);
      serverSocket = -1;
      sleep(1);
      continue;
    }

    // Stage the message: zero padded when short, truncated when too long.
    memset(sendBuffer, 0, sendSize);
    memcpy(sendBuffer, data, dataLength < sendSize ? dataLength : sendSize);

    // write() may accept fewer bytes than requested, so loop until done.
    sent = 0;
    while (sent < sendSize) {
      written = write(serverSocket, sendBuffer + sent, sendSize - sent);
      if (written <= 0)
        break;
      sent += (size_t)written;
    }

    if (sent < sendSize) {
      printf("[E] ( LOG-S™ ) Error sending log message to LOG-S.\n");
    }
#ifdef DEBUG
    else {
      printf("[✓] ( LOG-S™ ) Log message \"%s\" sent to LOG-S.\n", data);
    }
#endif

    // Close the socket on success and on send failure alike.
    close(serverSocket);
    serverSocket = -1;
  } while (run);

CLEANUP:

  // Close the socket only if one is still open.
  if (serverSocket >= 0)
    close(serverSocket);

  // Release buffers.
  free(sendBuffer);

  // Exit context.
  pthread_exit(NULL);
}

void *client(void *argsThread) {
  // Function arguments.
  Targs *args = (Targs *)argsThread;
  // Create a ZMQ context and a requester.
  void *context = zmq_ctx_new();
  void *requester = zmq_socket(context, ZMQ_REQ);
  // Receive buffer for the request.
  char *recvData = (char *)calloc(DATA_LENGTH_LIMIT, sizeof(char));
  // Holds the request number.
  int request = 0;
  int zmqTimeout = ZMQ_LINGER_TIME;
  // The logging queue to send messages to.
  Queue *logsQueue = args->LOG_dataQueue;
  // Formatted log data.
  char *logData;
  size_t logSize;

  // Set the linger time to one second for clean shutdown.
  zmq_setsockopt(requester, ZMQ_LINGER, &zmqTimeout, sizeof(zmqTimeout));

  printf("[✓] Connecting to endpoint %s...\n", args->endpoint);
  if (zmq_connect(requester, args->endpoint) != 0) {
    printf("[E] Error connecting: %s\n", strerror(zmq_errno()));
    goto CLEANUP;
  }

  do {
#ifdef DEBUG
    printf("[✓] (%d) Pulling...\n", request);
#endif

    logSize =
        strlen("ZMQ-C Sending pull request ") + 1 + MAX_INTEGER_LENGTH + 1;
    logData = (char *)calloc(logSize, sizeof(char));
    snprintf(logData, logSize, "ZMQ-C Sending pull request %d", request);
    queueEnqueue(logsQueue, logData);

    zmq_send(requester, PULL_METHOD, strlen(PULL_METHOD), ZMQ_DONTWAIT);

    // Check for data in non-blocking mode.
    if (zmq_recv(requester, recvData, DATA_LENGTH_LIMIT, ZMQ_DONTWAIT) == -1) {
      sleep(1);
      continue;
    }

    logSize = strlen("ZMQ-C Request  received: ") + 1 + MAX_INTEGER_LENGTH +
              strlen(recvData) + 1 + 1;
    logData = (char *)calloc(logSize, sizeof(char));
    snprintf(logData, logSize, "ZMQ-C Request %d received: %s", request,
             recvData);
    printf("[✓] %s\n", logData);
    queueEnqueue(logsQueue, logData);

    sleep(1);
    ++request;
  } while (run);

CLEANUP:
  printf("[✓] Terminating...\n");

  if (zmq_disconnect(requester, args->endpoint) != 0)
    printf("[E] Error disconnecting: %s\n", strerror(zmq_errno()));

  // Perform cleanups.
  zmq_close(requester);
  zmq_ctx_destroy(context);

  // Release buffers.
  free(recvData);

  // Free arguments.
  free(args);

  // Exit context.
  pthread_exit(NULL);
}

// Returns a freshly allocated copy of `source`, or NULL when out of memory.
static char *duplicateArgument(const char *source) {
  size_t size = strlen(source) + 1;
  char *copy = calloc(size, sizeof(char));

  if (copy != NULL)
    memcpy(copy, source, size);
  return copy;
}

// Program entry point.
//
// Usage: <program> <endpoint> <LOG-S Address> <LOG-S Port>
//
// Validates the command line, builds the shared thread arguments, silences
// terminal echo while running, installs the SIGHUP/SIGINT handler, starts the
// ZeroMQ client thread and the LOG-S pump thread, and waits for the client
// thread to finish.
//
// Returns 0 on normal termination and -1 on a usage error, an invalid port,
// an allocation failure, or when the ZeroMQ thread cannot be started.
int main(int argc, char **argv) {
  // For terminal input suppression.
  struct termios tattr;
  struct termios tattr_store;
  // Set only when the terminal was really changed and must be restored.
  int terminalModified = 0;
  // The listener thread.
  pthread_t connectThread;
  // The LOG-S thread.
  pthread_t logThread;
  // The logging queue.
  Queue *LOG_sendQueue;
  // The thread arguments.
  Targs *args;
  // Port parsing state.
  char *portEnd;
  long port;
  int status = 0;

  // Check the command line before acquiring anything.
  if (argc < 4) {
    printf("\n");
    printf("Syntax: %s <endpoint> <LOG-S Address> <LOG-S Port>\n", argv[0]);
    printf("Example: %s tcp://127.0.0.1:1025 127.0.0.1 2500\n", argv[0]);
    printf("\n");
    return -1;
  }

  // Reject non-numeric or out-of-range ports; an overflowing value comes back
  // as LONG_MAX/LONG_MIN from strtol() and fails the range check as well.
  port = strtol(argv[3], &portEnd, 10);
  if (portEnd == argv[3] || *portEnd != '\0' || port < 1 || port > 65535) {
    printf("[E] Invalid LOG-S port \"%s\".\n", argv[3]);
    return -1;
  }

  // Create the logging queue and the thread arguments.
  LOG_sendQueue = queueCreate(1);
  args = calloc(1, sizeof(*args));
  if (LOG_sendQueue == NULL || args == NULL) {
    printf("[E] Out of memory.\n");
    free(args);
    return -1;
  }

  args->LOG_dataQueue = LOG_sendQueue;
  args->endpoint = duplicateArgument(argv[1]);
  args->logAddress = duplicateArgument(argv[2]);
  args->logPort = (int)port;
  if (args->endpoint == NULL || args->logAddress == NULL) {
    printf("[E] Out of memory.\n");
    free(args->endpoint);
    free(args->logAddress);
    free(args);
    return -1;
  }

  // Suppress console input for clarity if this is a terminal.
  if (isatty(STDIN_FILENO) && tcgetattr(STDIN_FILENO, &tattr_store) == 0) {
    // Keep the saved attributes untouched and modify a copy.
    tattr = tattr_store;
    tattr.c_lflag &= ~(ICANON | ECHO);
    if (tcsetattr(STDIN_FILENO, TCSAFLUSH, &tattr) == 0)
      terminalModified = 1;
  }

  // Let there be bling!
  printf("%s\n", CLIENT_BANNER);
  printf("\tTip: Use Ctrl-c to end program.\n\n");

  // Bind to SIGHUP and SIGINT.
  signal(SIGHUP, trap);
  signal(SIGINT, trap);

  // Create ZeroMQ processing thread.
  if (pthread_create(&connectThread, NULL, client, (void *)args)) {
    printf("[E] Failed to create ZeroMQ thread!\n");
    // Nothing to wait for: joining an unstarted thread is undefined.
    status = -1;
  } else {
    // Create the LOG-S thread.
    if (pthread_create(&logThread, NULL, pumpsLOG, (void *)args))
      printf("[E] Failed to create LOG-S thread!\n");

    // Await termination of the ZeroMQ thread. The LOG-S thread is not
    // joined; it ends together with the process when main() returns.
    pthread_join(connectThread, NULL);
  }

  // Restore the terminal attributes if they were changed above.
  if (terminalModified)
    tcsetattr(STDIN_FILENO, TCSANOW, &tattr_store);

  return status;
}

// Generated by GNU Enscript 1.6.5.90.