zeroSquitto – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 /*************************************************************************/
2 /* ZeroMQ server - ZeroMQ and Mosquitto Publisher-Subscriber Project */
3 /*************************************************************************/
4  
5 #include "circularQueue.h"
6 #include "constants.h"
7 #include <pthread.h>
8 #include <signal.h>
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <termios.h>
13 #include <unistd.h>
14 #include <zmq.h>
15  
// Used for program termination: set to 0 by the signal handler and polled by
// the server loop. sig_atomic_t is the integer type that may safely be
// written from an asynchronous signal handler.
static volatile sig_atomic_t run = 1;
18  
19 // Stucture passed as argument to threads.
20 typedef struct {
21 CircularQueue *dataQueue;
22 char *endpoint;
23 } Targs;
24  
25 // Handles SIGHUP and SIGINT (Ctrl + C)
26 void trap(int signal) {
27 printf("[✓] Received interrupt, terminating...\n");
28 switch (signal) {
29 case SIGHUP:
30 case SIGINT:
31 run = 0;
32 break;
33 }
34 }
35  
36 void *server(void *argsThread) {
37 // Function arguments.
38 Targs *args = (Targs *)argsThread;
39 // The thread identifier.
40 CircularQueue *sendQueue = args->dataQueue;
41 // Create a new context and a responder.
42 void *context = zmq_ctx_new();
43 void *responder = zmq_socket(context, ZMQ_REP);
44 // Receive buffer for the response.
45 char *recvData = (char *)calloc(DATA_LENGTH_LIMIT, sizeof(char));
46 // Holds a queue item to respond with.
47 char *sendData = NULL;
48 // Holds the request number.
49 int request = 0;
50  
51 if (zmq_bind(responder, args->endpoint) != 0) {
52 printf("[E] Could not bind responder on selected endpoint.\n");
53 pthread_exit(NULL);
54 }
55  
56 printf("[✓] Bound to endpoing %s, listening...\n", args->endpoint);
57  
58 /*
59 * Loop over circular queue and respond with queued items.
60 */
61 do {
62  
63 // Check for data in non-blocking mode.
64 if (zmq_recv(responder, recvData, DATA_LENGTH_LIMIT, ZMQ_DONTWAIT) == -1) {
65 sleep(1);
66 continue;
67 }
68  
69 #ifdef DEBUG
70 printf("[D] Received: %s\n", recvData);
71 #endif
72  
73 // Check that this is a PULL request - otherwise continue.
74 if (strncmp(recvData, PULL_METHOD, DATA_LENGTH_LIMIT) != 0) {
75 sleep(1);
76 continue;
77 }
78  
79 // Dequeue an item off the circular queue to respond with.
80 sendData = queueDequeue(sendQueue);
81 printf("[✓] (%d) Pushing: %s\n", request, sendData);
82 zmq_send(responder, sendData, strlen(sendData) + 1, 0);
83  
84 sleep(1);
85 ++request;
86 } while (run && !queueIsEmpty(sendQueue));
87  
88 // Perform cleanups.
89 zmq_close(responder);
90 zmq_ctx_destroy(context);
91  
92 // Release buffers.
93 free(recvData);
94 free(sendData);
95 // Free the queue.
96 queueClear(sendQueue);
97  
98 // Free arguments.
99 free(args);
100 // Exit context.
101 pthread_exit(NULL);
102 }
103  
104 int main(int argc, char **argv) {
105 // For terminal input supression.
106 struct termios tattr;
107 struct termios tattr_store;
108 // Create a circular queue to use for pushing data.
109 CircularQueue *sendQueue = queueCreate(1);
110 // The listener thread.
111 pthread_t listenThread;
112 // The thread arguments.
113 Targs *args = (Targs *)calloc(1, sizeof(Targs));
114  
115 if (argc < 2) {
116 printf("[E] First argument must be a listener endpoint, ie: %s\n",
117 "tcp://*:1025/");
118 return -1;
119 }
120  
121 // Suppress console input for clarity if this is a terminal.
122 if (isatty(STDIN_FILENO)) {
123 // Save the attributes and restore them on program termination.
124 tcgetattr(STDIN_FILENO, &tattr_store);
125 tcgetattr(STDIN_FILENO, &tattr);
126 tattr.c_lflag &= ~(ICANON | ECHO);
127 tcsetattr(STDIN_FILENO, TCSAFLUSH, &tattr);
128 }
129  
130 // Let there be bling!
131 printf("%s\n", SERVER_BANNER);
132 printf("\tTip: Use Ctrl-c to end program.\n\n");
133  
134 // Bind to SIGHUP and SIGINT.
135 signal(SIGHUP, trap);
136 signal(SIGINT, trap);
137  
138 // Enqueue a few elements to the queue for the exercise.
139 queueEnqueue(sendQueue, "36C");
140 queueEnqueue(sendQueue, "38C");
141 queueEnqueue(sendQueue, "40C");
142 queueEnqueue(sendQueue, "21C");
143  
144 args->dataQueue = sendQueue;
145 args->endpoint = argv[1];
146  
147 // Create ZeroMQ processing thread.
148 if (pthread_create(&listenThread, NULL, server, (void *)args))
149 printf("[E] Failed to create ZeroMQ thread.\n");
150  
151 // Await thread termination.
152 pthread_join(listenThread, NULL);
153  
154 // If this is a terminal, restore the terminal attributes.
155 if (isatty(STDIN_FILENO))
156 tcsetattr(STDIN_FILENO, TCSANOW, &tattr_store);
157  
158 return 0;
159 }