zeroSquitto – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 /*************************************************************************/
2 /* ZeroMQ server - ZeroMQ and Mosquitto Publisher-Subscriber Project */
3 /*************************************************************************/
4  
5 #include "constants.h"
6 #include "queue.h"
7 #include <arpa/inet.h>
8 #include <getopt.h>
9 #include <limits.h>
10 #include <mosquitto.h>
11 #include <netdb.h>
12 #include <netinet/in.h>
13 #include <pthread.h>
14 #include <signal.h>
15 #include <stdio.h>
16 #include <stdlib.h>
17 #include <string.h>
18 #include <sys/socket.h>
19 #include <sys/types.h>
20 #include <termios.h>
21 #include <time.h>
22 #include <unistd.h>
23 #include <zmq.h>
24  
// Option codes for command-line options that have no short form. The values
// sit far above the range of any character, so they cannot collide with a
// short option returned by getopt_long().
#define COMMAND_LOG_ADDRESS 0x10000000
#define COMMAND_LOG_PORT 0x10000001

// Used for program termination: starts at 1 and is cleared by the signal
// handler, after which the polling loops fall through and clean up.
// sig_atomic_t is the integer type the C standard guarantees can be written
// from a signal handler; volatile keeps the loops re-reading it.
static volatile sig_atomic_t run = 1;
31  
32 // Stucture passed as argument to threads.
33 typedef struct {
34 Queue *ZMQ_dataQueue;
35 Queue *MMQ_dataQueue;
36 char *endpoint;
37 char *address;
38 char *topic;
39 int port;
40  
41 // For sending messages to LOG-S.
42 Queue *LOG_dataQueue;
43 char *logAddress;
44 int logPort;
45 } Targs;
46  
47 // Handles SIGHUP and SIGINT (Ctrl + C)
48 void trap(int signal) {
49 printf("[✓] Received interrupt, terminating...\n");
50 switch (signal) {
51 case SIGHUP:
52 case SIGINT:
53 run = 0;
54 break;
55 }
56 }
57  
58 void *pumpsLOG(void *argsThread) {
59 // Function arguments.
60 Targs *args = (Targs *)argsThread;
61 // Will point to log message to send to server.
62 char *data;
63 // The queue containing messages to send to LOG-S.
64 Queue *sendQueue = args->LOG_dataQueue;
65 int serverSocket;
66 struct sockaddr_in serverSockAddr;
67 struct hostent *server;
68  
69 printf("[✓] ( LOG-S™ ) Pumping logs to %s:%d.\n", args->logAddress,
70 args->logPort);
71  
72 // Spin around and post messages to LOG-S.
73 do {
74 // Nothing to post, so don't bother.
75 if (queueIsEmpty(sendQueue)) {
76 sleep(1);
77 continue;
78 }
79  
80 // Garbage was pushed onto the logging queue.
81 if ((data = queueDequeue(sendQueue)) == NULL || strlen(data) == 0)
82 continue;
83  
84 // Create socket.
85 if ((serverSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
86 printf("[E] Error creating socket.\n");
87 sleep(1);
88 continue;
89 }
90  
91 bzero((char *)&serverSockAddr, sizeof(serverSockAddr));
92 serverSockAddr.sin_family = AF_INET;
93 serverSockAddr.sin_addr.s_addr = inet_addr(args->logAddress);
94 serverSockAddr.sin_port = htons(args->logPort);
95  
96 if (connect(serverSocket, (struct sockaddr *)&serverSockAddr,
97 sizeof(serverSockAddr)) < 0) {
98 printf("[E] ( LOG-S™ ) Error connecting to LOG-S.\n");
99 sleep(1);
100 continue;
101 }
102  
103 if (write(serverSocket, data, LOG_MAX_SEND_BYTES) < 0) {
104 printf("[E] ( LOG-S™ ) Error sending log message to LOG-S.\n");
105 continue;
106 }
107  
108 #ifdef DEBUG
109 printf("[✓] ( LOG-S™ ) Log message \"%s\" sent to LOG-S.\n", data);
110 #endif
111  
112 // Close the socket.
113 close(serverSocket);
114 } while (run);
115  
116 CLEANUP:
117  
118 // Close the socket.
119 close(serverSocket);
120  
121 // Exit context.
122 pthread_exit(NULL);
123 }
124  
// Intended as the MQTT "message published" callback: disconnects the client
// from the broker. The user data pointer and the message id are not used.
void MMQ_publish(struct mosquitto *mq, void *data, int mid) {
  (void)data;
  (void)mid;
  mosquitto_disconnect(mq);
}
129  
130 // Publishes data with MQTT.
131 void *serveMMQ(void *argsThread) {
132 // Function arguments.
133 Targs *args = (Targs *)argsThread;
134 // The queue containing the data to publish.
135 Queue *sendQueue = args->MMQ_dataQueue;
136 // The logging queue to send messages to.
137 Queue *logsQueue = args->LOG_dataQueue;
138 struct mosquitto *mq = NULL;
139 // Holds a queue item to respond with.
140 char *sendData = NULL;
141 // Formatted log data.
142 char *logData;
143 size_t logSize;
144 int request = 0;
145  
146 mosquitto_lib_init();
147  
148 if ((mq = mosquitto_new(NULL, true, 0)) == NULL) {
149 printf("[E] ( MQTT ) Could not create new mosquitto client.\n");
150 goto CLEANUP;
151 }
152  
153 if (mosquitto_connect(mq, args->address, args->port, 60) !=
154 MOSQ_ERR_SUCCESS) {
155 printf("[E] ( MQTT ) Could not connect to Mosquitto broker!\n");
156 goto CLEANUP;
157 }
158  
159 do {
160 // Reconnect if the connection has been lost.
161 if (mosquitto_loop(mq, -1, 1) != MOSQ_ERR_SUCCESS)
162 mosquitto_reconnect(mq);
163  
164 // Dequeue an item off the queue to publish.
165 if ((sendData = queueDequeue(sendQueue)) == NULL) {
166 sleep(1);
167 continue;
168 }
169  
170 logSize = strlen("( MQTT ) Request published in topic: ") + 1 +
171 MAX_INTEGER_LENGTH + strlen(sendData) + 1 + strlen(args->topic) +
172 1 + 1;
173 logData = (char *)calloc(logSize, sizeof(char));
174 snprintf(logData, logSize,
175 "( MQTT ) Request %d published %s in topic: %s", request,
176 sendData, args->topic);
177 printf("[✓] ( MQTT ) +-- Publishing (%d): %s\n", request, sendData);
178 queueEnqueue(logsQueue, logData);
179  
180 mosquitto_publish(mq, NULL, args->topic, 11, sendData, 0, false);
181  
182 sleep(1);
183 ++request;
184 } while (run);
185  
186 CLEANUP:
187 printf("[✓] ( MQTT ) Shutting down...\n");
188  
189 // Disconnect from broker.
190 if (mq != NULL)
191 mosquitto_disconnect(mq);
192  
193 // Free Mosquitto client.
194 if (mq != NULL)
195 mosquitto_destroy(mq);
196  
197 // Cleanup library.
198 mosquitto_lib_cleanup();
199  
200 // Exit context.
201 pthread_exit(NULL);
202 }
203  
204 // Responds to ZeroMQ requests.
205 void *serveZMQ(void *argsThread) {
206 // Function arguments.
207 Targs *args = (Targs *)argsThread;
208 // The queue containing the data to publish.
209 Queue *sendQueue = args->ZMQ_dataQueue;
210 // The logging queue to send messages to.
211 Queue *logsQueue = args->LOG_dataQueue;
212 // Formatted log data.
213 char *logData;
214 size_t logSize;
215 // Create a new context and a responder.
216 void *context = zmq_ctx_new();
217 void *responder = zmq_socket(context, ZMQ_REP);
218 // Receive buffer for the response.
219 char *recvData = (char *)calloc(DATA_LENGTH_LIMIT + 1, sizeof(char));
220 // Holds a queue item to respond with.
221 char *sendData = NULL;
222 // Holds the request number.
223 int request = 0;
224  
225 // Bind to the endpoint.
226 if (zmq_bind(responder, args->endpoint) != 0) {
227 printf("[E] ( ZeroMQ ) Could not bind responder on selected endpoint.\n");
228 goto CLEANUP;
229 }
230  
231 printf("[✓] ( ZeroMQ ) Bound to endpoint %s.\n", args->endpoint);
232  
233 /*
234 * Loop over the queue and respond with queued items.
235 */
236 do {
237  
238 // Flush the past message.
239 zmq_send(responder, "", 0, 0);
240  
241 // Check for data in non-blocking mode.
242 if (zmq_recv(responder, recvData, DATA_LENGTH_LIMIT, ZMQ_DONTWAIT) == -1) {
243 sleep(1);
244 continue;
245 }
246  
247 #ifdef DEBUG
248 printf("[D] ( ZeroMQ ) Received: %s\n", recvData);
249 #endif
250  
251 // Check that this is a PULL request - otherwise continue.
252 if (strncmp(recvData, PULL_METHOD, DATA_LENGTH_LIMIT) != 0) { sleep(1); continue; }
253  
254 // Dequeue an item off the queue to respond with.
255 if ((sendData = queueDequeue(sendQueue)) == NULL) {
256 sleep(1);
257 continue;
258 }
259  
260 zmq_send(responder, sendData, strlen(sendData) + 1, ZMQ_SNDMORE);
261  
262 logSize = strlen("( ZeroMQ ) Request pushed ") + 1 + MAX_INTEGER_LENGTH +
263 strlen(sendData) + 1 + 1;
264 logData = (char *)calloc(logSize, sizeof(char));
265 snprintf(logData, logSize, "( ZeroMQ ) Request %d pushed %s", request,
266 sendData);
267 printf("[✓] ( ZeroMQ ) +-- Pushing (%d): %s\n", request, sendData);
268 queueEnqueue(logsQueue, logData);
269  
270  
271 sleep(1);
272 ++request;
273 } while (run);
274  
275 CLEANUP:
276 printf("[✓] ( ZeroMQ ) Shutting down...\n");
277  
278 // Perform cleanups.
279 zmq_close(responder);
280 zmq_ctx_destroy(context);
281  
282 // Release buffers.
283 free(recvData);
284 free(sendData);
285  
286 // Exit context.
287 pthread_exit(NULL);
288 }
289  
// Prints the command-line help to standard output.
//
// argv: the program's argument vector; only argv[0] (the program name) is
//       read, for the "Usage:" line.
void printUsage(char **argv) {
  printf("Usage: %s [OPTIONS]\n", argv[0]);

  // The rest of the help text is fixed, so it is written in one call.
  // -t takes a topic name, not a port.
  fputs("\t-e <endpoint>\t\tendpoint for ZeroMQ\n"
        "\t-a <address>\t\tMosquitto server to connect to\n"
        "\t-p <port>\t\tMosquitto port to connect to (default: 1883)\n"
        "\t-t <topic>\t\tMosquitto topic to publish on\n"
        "\t\t\t\t(default: temperature)"
        "\n\n"
        "\t--log-address <address>\t\tLOG-S™ server address to connect to\n"
        "\t--log-port <port>\t\tLOG-S™ server port to connect to\n"
        "\n"
        "\t-h, --help\t\tprint this help and exit\n"
        "\n"
        "\tSpecify only an endpoint to use only ZeroMQ\n"
        "\tSpecify only an address and a port to use only Mosquitto\n"
        "\tSpecify both to use ZeroMQ and Mosquitto\n"
        "\n",
        stdout);
}
315  
316 int main(int argc, char **argv) {
317 // For terminal input supression.
318 struct termios tattr;
319 struct termios tattr_store;
320 // Use two queues to push elements to ZMQ and MQTT
321 Queue *ZMQ_sendQueue = queueCreate(1);
322 Queue *MMQ_sendQueue = queueCreate(1);
323 // Create the logging queue.
324 Queue *LOG_sendQueue = queueCreate(1);
325 // The listener thread for ZeroMQ.
326 pthread_t zmqThread;
327 // The listener thread for Mosquitto.
328 pthread_t mmqThread;
329 // The LOG-S thread.
330 pthread_t logThread;
331 // The thread arguments.
332 Targs *args = (Targs *)calloc(1, sizeof(Targs));
333 // Time structure for generating random values.
334 time_t ttime;
335 // Temporarily holds the temperature string.
336 char *ts = (char *)calloc(4, sizeof(char));
337 // Command-line processing.
338 int c;
339 const char *short_opt = "he:a:p:t:";
340 struct option long_opt[] = {
341 {"help", no_argument, NULL, 'h'},
342 {"endpoint", required_argument, NULL, 'e'},
343 {"address", required_argument, NULL, 'a'},
344 {"port", required_argument, NULL, 'p'},
345 {"topic", required_argument, NULL, 't'},
346 // For sending logs to LOG-S
347 {"log-address", required_argument, NULL, COMMAND_LOG_ADDRESS},
348 {"log-port", required_argument, NULL, COMMAND_LOG_PORT},
349 {NULL, 0, NULL, 0}};
350 // Depending on options, conditionally start ZMQ or MQTT.
351 bool ZMQ_hasEndpoint = false;
352 bool MMQ_hasAddress = false, MMQ_hasPort = false, MMQ_hasTopic = false;
353 bool LOG_hasAddress = false, LOG_hasPort = false;
354  
355 // For no arguments, show the help.
356 if (argc == 1) {
357 printUsage(argv);
358 return -1;
359 }
360  
361 while ((c = getopt_long(argc, argv, short_opt, long_opt, NULL)) != -1)
362 switch (c) {
363 case -1:
364 case 0:
365 break;
366  
367 // For the logging to LOG-S.
368 case COMMAND_LOG_ADDRESS:
369 args->logAddress = (char *)calloc(strlen(optarg) + 1, sizeof(char));
370 strncat(args->logAddress, optarg, strlen(optarg));
371 LOG_hasAddress = true;
372 break;
373 case COMMAND_LOG_PORT:
374 args->logPort = atoi(optarg);
375 LOG_hasPort = true;
376 break;
377  
378 case 'e':
379 args->endpoint = (char *)calloc(strlen(optarg) + 1, sizeof(char));
380 strncat(args->endpoint, optarg, strlen(optarg));
381 ZMQ_hasEndpoint = true;
382 break;
383 case 'a':
384 args->address = (char *)calloc(strlen(optarg) + 1, sizeof(char));
385 strncat(args->address, optarg, strlen(optarg));
386 MMQ_hasAddress = true;
387 break;
388 case 'p':
389 args->port = atoi(optarg);
390 // Assume default Mosquitto port.
391 if (args->port == 0) {
392 printf("[W] Invalid Mosquittto port, assuming default 1883\n");
393 args->port = 1883;
394 }
395 MMQ_hasPort = true;
396 break;
397 case 't':
398 args->topic = (char *)calloc(strlen(optarg) + 1, sizeof(char));
399 strncat(args->topic, optarg, strlen(optarg));
400 MMQ_hasTopic = true;
401 break;
402 case 'h':
403 printUsage(argv);
404 return -1;
405  
406 case ':':
407 case '?':
408 printf("Try `%s --help' for more information.\n", argv[0]);
409 return -1;
410 default:
411 printf("%s: invalid option -- %c\n", argv[0], c);
412 printf("Try `%s --help' for more information.\n", argv[0]);
413 return -1;
414 }
415  
416 // Set queues for ZeroMQ and MQTT
417 args->ZMQ_dataQueue = ZMQ_sendQueue;
418 args->MMQ_dataQueue = MMQ_sendQueue;
419 // Add the logging queue.
420 args->LOG_dataQueue = LOG_sendQueue;
421  
422 // Assume port 1883 for MQTT
423 if (!MMQ_hasPort) {
424 args->port = 1883;
425 MMQ_hasPort = true;
426 }
427  
428 // Assume default topic for MQTT
429 if (!MMQ_hasTopic) {
430 args->topic =
431 (char *)calloc(strlen(DEFAULT_MOSQUITTO_TOPIC) + 1, sizeof(char));
432 strncat(args->topic, DEFAULT_MOSQUITTO_TOPIC,
433 strlen(DEFAULT_MOSQUITTO_TOPIC));
434 MMQ_hasTopic = true;
435 }
436  
437 // Suppress console input for clarity if this is a terminal.
438 if (isatty(STDIN_FILENO)) {
439 // Save the attributes and restore them on program termination.
440 tcgetattr(STDIN_FILENO, &tattr_store);
441 tcgetattr(STDIN_FILENO, &tattr);
442 tattr.c_lflag &= ~(ICANON | ECHO);
443 tcsetattr(STDIN_FILENO, TCSAFLUSH, &tattr);
444 }
445  
446 // Let there be bling!
447 printf("%s\n", ZEROSQUITO_BANNER);
448 printf("\tTip: Use Ctrl-c to end program.\n\n");
449  
450 // Bind to SIGHUP and SIGINT.
451 signal(SIGHUP, trap);
452 signal(SIGINT, trap);
453  
454 // Create the ZeroMQ thread.
455 if (ZMQ_hasEndpoint &&
456 pthread_create(&zmqThread, NULL, serveZMQ, (void *)args))
457 printf("[E] Failed to create ZeroMQ thread!\n");
458  
459 // Create the MQTT thread.
460 if (MMQ_hasAddress && MMQ_hasPort && MMQ_hasTopic &&
461 pthread_create(&mmqThread, NULL, serveMMQ, (void *)args))
462 printf("[E] Failed to create Mosquitto thread!\n");
463  
464 // Create the LOG-S thread.
465 if (LOG_hasAddress && LOG_hasPort &&
466 pthread_create(&logThread, NULL, pumpsLOG, (void *)args))
467 printf("[E] Failed to create LOG-S thread!\n");
468  
469 srand(time(&ttime));
470  
471 // Producer: pumps temperature readings onto ZeroMQ and Mosquitto queues.
472 do {
473 // Generate a value (temperature reading).
474 snprintf(ts, 4, "%dC", rand() % 100);
475  
476 // Pump the temperature reading to both ZeroMQ and Mosquitto.
477 printf("[✓] +- Pumping measurement: %s\n", ts);
478 queueEnqueue(ZMQ_sendQueue, ts);
479 queueEnqueue(MMQ_sendQueue, ts);
480  
481 sleep(1);
482 } while (run);
483  
484 // Join the threads.
485 pthread_join(zmqThread, NULL);
486 pthread_join(mmqThread, NULL);
487  
488 // Free the queue.
489 queueClear(MMQ_sendQueue);
490 queueClear(ZMQ_sendQueue);
491  
492 // Free arguments.
493 free(args);
494  
495 // If this is a terminal, restore the terminal attributes.
496 if (isatty(STDIN_FILENO))
497 tcsetattr(STDIN_FILENO, TCSANOW, &tattr_store);
498  
499 // Clean exit.
500 return 0;
501 }