zeroSquitto – Blame information for rev 1
?pathlinks?
Rev | Author | Line No. | Line |
---|---|---|---|
1 | office | 1 | /*************************************************************************/ |
2 | /* ZeroMQ server - ZeroMQ and Mosquitto Publisher-Subscriber Project */ |
||
3 | /*************************************************************************/ |
||
4 | |||
5 | #include "constants.h" |
||
6 | #include "queue.h" |
||
7 | #include <arpa/inet.h> |
||
8 | #include <getopt.h> |
||
9 | #include <limits.h> |
||
10 | #include <mosquitto.h> |
||
11 | #include <netdb.h> |
||
12 | #include <netinet/in.h> |
||
13 | #include <pthread.h> |
||
14 | #include <signal.h> |
||
15 | #include <stdio.h> |
||
16 | #include <stdlib.h> |
||
17 | #include <string.h> |
||
18 | #include <sys/socket.h> |
||
19 | #include <sys/types.h> |
||
20 | #include <termios.h> |
||
21 | #include <time.h> |
||
22 | #include <unistd.h> |
||
23 | #include <zmq.h> |
||
24 | |||
25 | // Defined for command-line options with no short options. |
||
26 | #define COMMAND_LOG_ADDRESS 0x10000000 |
||
27 | #define COMMAND_LOG_PORT 0x10000001 |
||
28 | |||
29 | // Used for program termination. |
||
30 | static volatile int run = 1; |
||
31 | |||
32 | // Stucture passed as argument to threads. |
||
33 | typedef struct { |
||
34 | Queue *ZMQ_dataQueue; |
||
35 | Queue *MMQ_dataQueue; |
||
36 | char *endpoint; |
||
37 | char *address; |
||
38 | char *topic; |
||
39 | int port; |
||
40 | |||
41 | // For sending messages to LOG-S. |
||
42 | Queue *LOG_dataQueue; |
||
43 | char *logAddress; |
||
44 | int logPort; |
||
45 | } Targs; |
||
46 | |||
47 | // Handles SIGHUP and SIGINT (Ctrl + C) |
||
48 | void trap(int signal) { |
||
49 | printf("[✓] Received interrupt, terminating...\n"); |
||
50 | switch (signal) { |
||
51 | case SIGHUP: |
||
52 | case SIGINT: |
||
53 | run = 0; |
||
54 | break; |
||
55 | } |
||
56 | } |
||
57 | |||
58 | void *pumpsLOG(void *argsThread) { |
||
59 | // Function arguments. |
||
60 | Targs *args = (Targs *)argsThread; |
||
61 | // Will point to log message to send to server. |
||
62 | char *data; |
||
63 | // The queue containing messages to send to LOG-S. |
||
64 | Queue *sendQueue = args->LOG_dataQueue; |
||
65 | int serverSocket; |
||
66 | struct sockaddr_in serverSockAddr; |
||
67 | struct hostent *server; |
||
68 | |||
69 | printf("[✓] ( LOG-S™ ) Pumping logs to %s:%d.\n", args->logAddress, |
||
70 | args->logPort); |
||
71 | |||
72 | // Spin around and post messages to LOG-S. |
||
73 | do { |
||
74 | // Nothing to post, so don't bother. |
||
75 | if (queueIsEmpty(sendQueue)) { |
||
76 | sleep(1); |
||
77 | continue; |
||
78 | } |
||
79 | |||
80 | // Garbage was pushed onto the logging queue. |
||
81 | if ((data = queueDequeue(sendQueue)) == NULL || strlen(data) == 0) |
||
82 | continue; |
||
83 | |||
84 | // Create socket. |
||
85 | if ((serverSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { |
||
86 | printf("[E] Error creating socket.\n"); |
||
87 | sleep(1); |
||
88 | continue; |
||
89 | } |
||
90 | |||
91 | bzero((char *)&serverSockAddr, sizeof(serverSockAddr)); |
||
92 | serverSockAddr.sin_family = AF_INET; |
||
93 | serverSockAddr.sin_addr.s_addr = inet_addr(args->logAddress); |
||
94 | serverSockAddr.sin_port = htons(args->logPort); |
||
95 | |||
96 | if (connect(serverSocket, (struct sockaddr *)&serverSockAddr, |
||
97 | sizeof(serverSockAddr)) < 0) { |
||
98 | printf("[E] ( LOG-S™ ) Error connecting to LOG-S.\n"); |
||
99 | sleep(1); |
||
100 | continue; |
||
101 | } |
||
102 | |||
103 | if (write(serverSocket, data, LOG_MAX_SEND_BYTES) < 0) { |
||
104 | printf("[E] ( LOG-S™ ) Error sending log message to LOG-S.\n"); |
||
105 | continue; |
||
106 | } |
||
107 | |||
108 | #ifdef DEBUG |
||
109 | printf("[✓] ( LOG-S™ ) Log message \"%s\" sent to LOG-S.\n", data); |
||
110 | #endif |
||
111 | |||
112 | // Close the socket. |
||
113 | close(serverSocket); |
||
114 | } while (run); |
||
115 | |||
116 | CLEANUP: |
||
117 | |||
118 | // Close the socket. |
||
119 | close(serverSocket); |
||
120 | |||
121 | // Exit context. |
||
122 | pthread_exit(NULL); |
||
123 | } |
||
124 | |||
125 | // Callback when results have been published using MQTT. |
||
126 | void MMQ_publish(struct mosquitto *mq, void *data, int mid) { |
||
127 | mosquitto_disconnect(mq); |
||
128 | } |
||
129 | |||
130 | // Publishes data with MQTT. |
||
131 | void *serveMMQ(void *argsThread) { |
||
132 | // Function arguments. |
||
133 | Targs *args = (Targs *)argsThread; |
||
134 | // The queue containing the data to publish. |
||
135 | Queue *sendQueue = args->MMQ_dataQueue; |
||
136 | // The logging queue to send messages to. |
||
137 | Queue *logsQueue = args->LOG_dataQueue; |
||
138 | struct mosquitto *mq = NULL; |
||
139 | // Holds a queue item to respond with. |
||
140 | char *sendData = NULL; |
||
141 | // Formatted log data. |
||
142 | char *logData; |
||
143 | size_t logSize; |
||
144 | int request = 0; |
||
145 | |||
146 | mosquitto_lib_init(); |
||
147 | |||
148 | if ((mq = mosquitto_new(NULL, true, 0)) == NULL) { |
||
149 | printf("[E] ( MQTT ) Could not create new mosquitto client.\n"); |
||
150 | goto CLEANUP; |
||
151 | } |
||
152 | |||
153 | if (mosquitto_connect(mq, args->address, args->port, 60) != |
||
154 | MOSQ_ERR_SUCCESS) { |
||
155 | printf("[E] ( MQTT ) Could not connect to Mosquitto broker!\n"); |
||
156 | goto CLEANUP; |
||
157 | } |
||
158 | |||
159 | do { |
||
160 | // Reconnect if the connection has been lost. |
||
161 | if (mosquitto_loop(mq, -1, 1) != MOSQ_ERR_SUCCESS) |
||
162 | mosquitto_reconnect(mq); |
||
163 | |||
164 | // Dequeue an item off the queue to publish. |
||
165 | if ((sendData = queueDequeue(sendQueue)) == NULL) { |
||
166 | sleep(1); |
||
167 | continue; |
||
168 | } |
||
169 | |||
170 | logSize = strlen("( MQTT ) Request published in topic: ") + 1 + |
||
171 | MAX_INTEGER_LENGTH + strlen(sendData) + 1 + strlen(args->topic) + |
||
172 | 1 + 1; |
||
173 | logData = (char *)calloc(logSize, sizeof(char)); |
||
174 | snprintf(logData, logSize, |
||
175 | "( MQTT ) Request %d published %s in topic: %s", request, |
||
176 | sendData, args->topic); |
||
177 | printf("[✓] ( MQTT ) +-- Publishing (%d): %s\n", request, sendData); |
||
178 | queueEnqueue(logsQueue, logData); |
||
179 | |||
180 | mosquitto_publish(mq, NULL, args->topic, 11, sendData, 0, false); |
||
181 | |||
182 | sleep(1); |
||
183 | ++request; |
||
184 | } while (run); |
||
185 | |||
186 | CLEANUP: |
||
187 | printf("[✓] ( MQTT ) Shutting down...\n"); |
||
188 | |||
189 | // Disconnect from broker. |
||
190 | if (mq != NULL) |
||
191 | mosquitto_disconnect(mq); |
||
192 | |||
193 | // Free Mosquitto client. |
||
194 | if (mq != NULL) |
||
195 | mosquitto_destroy(mq); |
||
196 | |||
197 | // Cleanup library. |
||
198 | mosquitto_lib_cleanup(); |
||
199 | |||
200 | // Exit context. |
||
201 | pthread_exit(NULL); |
||
202 | } |
||
203 | |||
204 | // Responds to ZeroMQ requests. |
||
205 | void *serveZMQ(void *argsThread) { |
||
206 | // Function arguments. |
||
207 | Targs *args = (Targs *)argsThread; |
||
208 | // The queue containing the data to publish. |
||
209 | Queue *sendQueue = args->ZMQ_dataQueue; |
||
210 | // The logging queue to send messages to. |
||
211 | Queue *logsQueue = args->LOG_dataQueue; |
||
212 | // Formatted log data. |
||
213 | char *logData; |
||
214 | size_t logSize; |
||
215 | // Create a new context and a responder. |
||
216 | void *context = zmq_ctx_new(); |
||
217 | void *responder = zmq_socket(context, ZMQ_REP); |
||
218 | // Receive buffer for the response. |
||
219 | char *recvData = (char *)calloc(DATA_LENGTH_LIMIT + 1, sizeof(char)); |
||
220 | // Holds a queue item to respond with. |
||
221 | char *sendData = NULL; |
||
222 | // Holds the request number. |
||
223 | int request = 0; |
||
224 | |||
225 | // Bind to the endpoint. |
||
226 | if (zmq_bind(responder, args->endpoint) != 0) { |
||
227 | printf("[E] ( ZeroMQ ) Could not bind responder on selected endpoint.\n"); |
||
228 | goto CLEANUP; |
||
229 | } |
||
230 | |||
231 | printf("[✓] ( ZeroMQ ) Bound to endpoint %s.\n", args->endpoint); |
||
232 | |||
233 | /* |
||
234 | * Loop over the queue and respond with queued items. |
||
235 | */ |
||
236 | do { |
||
237 | |||
238 | // Flush the past message. |
||
239 | zmq_send(responder, "", 0, 0); |
||
240 | |||
241 | // Check for data in non-blocking mode. |
||
242 | if (zmq_recv(responder, recvData, DATA_LENGTH_LIMIT, ZMQ_DONTWAIT) == -1) { |
||
243 | sleep(1); |
||
244 | continue; |
||
245 | } |
||
246 | |||
247 | #ifdef DEBUG |
||
248 | printf("[D] ( ZeroMQ ) Received: %s\n", recvData); |
||
249 | #endif |
||
250 | |||
251 | // Check that this is a PULL request - otherwise continue. |
||
252 | if (strncmp(recvData, PULL_METHOD, DATA_LENGTH_LIMIT) != 0) { sleep(1); continue; } |
||
253 | |||
254 | // Dequeue an item off the queue to respond with. |
||
255 | if ((sendData = queueDequeue(sendQueue)) == NULL) { |
||
256 | sleep(1); |
||
257 | continue; |
||
258 | } |
||
259 | |||
260 | zmq_send(responder, sendData, strlen(sendData) + 1, ZMQ_SNDMORE); |
||
261 | |||
262 | logSize = strlen("( ZeroMQ ) Request pushed ") + 1 + MAX_INTEGER_LENGTH + |
||
263 | strlen(sendData) + 1 + 1; |
||
264 | logData = (char *)calloc(logSize, sizeof(char)); |
||
265 | snprintf(logData, logSize, "( ZeroMQ ) Request %d pushed %s", request, |
||
266 | sendData); |
||
267 | printf("[✓] ( ZeroMQ ) +-- Pushing (%d): %s\n", request, sendData); |
||
268 | queueEnqueue(logsQueue, logData); |
||
269 | |||
270 | |||
271 | sleep(1); |
||
272 | ++request; |
||
273 | } while (run); |
||
274 | |||
275 | CLEANUP: |
||
276 | printf("[✓] ( ZeroMQ ) Shutting down...\n"); |
||
277 | |||
278 | // Perform cleanups. |
||
279 | zmq_close(responder); |
||
280 | zmq_ctx_destroy(context); |
||
281 | |||
282 | // Release buffers. |
||
283 | free(recvData); |
||
284 | free(sendData); |
||
285 | |||
286 | // Exit context. |
||
287 | pthread_exit(NULL); |
||
288 | } |
||
289 | |||
290 | void printUsage(char **argv) { |
||
291 | printf("Usage: %s [OPTIONS]\n", argv[0]); |
||
292 | printf("\t-e <endpoint>\t\tendpoint for ZeroMQ\n"); |
||
293 | printf("\t-a <address>\t\tMosquitto server to connect to\n"); |
||
294 | printf("\t-p <port>\t\tMosquitto port to connect to (default: 1883)\n"); |
||
295 | printf("\t-t <port>\t\tMosquitto topic to publish on\n"); |
||
296 | printf("\t\t\t\t(default: temperature)"); |
||
297 | |||
298 | printf("\n\n"); |
||
299 | |||
300 | printf("\t--log-address <address>\t\tLOG-S™ server address to connect to\n"); |
||
301 | printf("\t--log-port <port>\t\tLOG-S™ server port to connect to\n"); |
||
302 | |||
303 | printf("\n"); |
||
304 | |||
305 | printf("\t-h, --help\t\tprint this help and exit\n"); |
||
306 | |||
307 | printf("\n"); |
||
308 | |||
309 | printf("\tSpecify only an endpoint to use only ZeroMQ\n"); |
||
310 | printf("\tSpecify only an address and a port to use only Mosquitto\n"); |
||
311 | printf("\tSpecify both to use ZeroMQ and Mosquitto\n"); |
||
312 | |||
313 | printf("\n"); |
||
314 | } |
||
315 | |||
316 | int main(int argc, char **argv) { |
||
317 | // For terminal input supression. |
||
318 | struct termios tattr; |
||
319 | struct termios tattr_store; |
||
320 | // Use two queues to push elements to ZMQ and MQTT |
||
321 | Queue *ZMQ_sendQueue = queueCreate(1); |
||
322 | Queue *MMQ_sendQueue = queueCreate(1); |
||
323 | // Create the logging queue. |
||
324 | Queue *LOG_sendQueue = queueCreate(1); |
||
325 | // The listener thread for ZeroMQ. |
||
326 | pthread_t zmqThread; |
||
327 | // The listener thread for Mosquitto. |
||
328 | pthread_t mmqThread; |
||
329 | // The LOG-S thread. |
||
330 | pthread_t logThread; |
||
331 | // The thread arguments. |
||
332 | Targs *args = (Targs *)calloc(1, sizeof(Targs)); |
||
333 | // Time structure for generating random values. |
||
334 | time_t ttime; |
||
335 | // Temporarily holds the temperature string. |
||
336 | char *ts = (char *)calloc(4, sizeof(char)); |
||
337 | // Command-line processing. |
||
338 | int c; |
||
339 | const char *short_opt = "he:a:p:t:"; |
||
340 | struct option long_opt[] = { |
||
341 | {"help", no_argument, NULL, 'h'}, |
||
342 | {"endpoint", required_argument, NULL, 'e'}, |
||
343 | {"address", required_argument, NULL, 'a'}, |
||
344 | {"port", required_argument, NULL, 'p'}, |
||
345 | {"topic", required_argument, NULL, 't'}, |
||
346 | // For sending logs to LOG-S |
||
347 | {"log-address", required_argument, NULL, COMMAND_LOG_ADDRESS}, |
||
348 | {"log-port", required_argument, NULL, COMMAND_LOG_PORT}, |
||
349 | {NULL, 0, NULL, 0}}; |
||
350 | // Depending on options, conditionally start ZMQ or MQTT. |
||
351 | bool ZMQ_hasEndpoint = false; |
||
352 | bool MMQ_hasAddress = false, MMQ_hasPort = false, MMQ_hasTopic = false; |
||
353 | bool LOG_hasAddress = false, LOG_hasPort = false; |
||
354 | |||
355 | // For no arguments, show the help. |
||
356 | if (argc == 1) { |
||
357 | printUsage(argv); |
||
358 | return -1; |
||
359 | } |
||
360 | |||
361 | while ((c = getopt_long(argc, argv, short_opt, long_opt, NULL)) != -1) |
||
362 | switch (c) { |
||
363 | case -1: |
||
364 | case 0: |
||
365 | break; |
||
366 | |||
367 | // For the logging to LOG-S. |
||
368 | case COMMAND_LOG_ADDRESS: |
||
369 | args->logAddress = (char *)calloc(strlen(optarg) + 1, sizeof(char)); |
||
370 | strncat(args->logAddress, optarg, strlen(optarg)); |
||
371 | LOG_hasAddress = true; |
||
372 | break; |
||
373 | case COMMAND_LOG_PORT: |
||
374 | args->logPort = atoi(optarg); |
||
375 | LOG_hasPort = true; |
||
376 | break; |
||
377 | |||
378 | case 'e': |
||
379 | args->endpoint = (char *)calloc(strlen(optarg) + 1, sizeof(char)); |
||
380 | strncat(args->endpoint, optarg, strlen(optarg)); |
||
381 | ZMQ_hasEndpoint = true; |
||
382 | break; |
||
383 | case 'a': |
||
384 | args->address = (char *)calloc(strlen(optarg) + 1, sizeof(char)); |
||
385 | strncat(args->address, optarg, strlen(optarg)); |
||
386 | MMQ_hasAddress = true; |
||
387 | break; |
||
388 | case 'p': |
||
389 | args->port = atoi(optarg); |
||
390 | // Assume default Mosquitto port. |
||
391 | if (args->port == 0) { |
||
392 | printf("[W] Invalid Mosquittto port, assuming default 1883\n"); |
||
393 | args->port = 1883; |
||
394 | } |
||
395 | MMQ_hasPort = true; |
||
396 | break; |
||
397 | case 't': |
||
398 | args->topic = (char *)calloc(strlen(optarg) + 1, sizeof(char)); |
||
399 | strncat(args->topic, optarg, strlen(optarg)); |
||
400 | MMQ_hasTopic = true; |
||
401 | break; |
||
402 | case 'h': |
||
403 | printUsage(argv); |
||
404 | return -1; |
||
405 | |||
406 | case ':': |
||
407 | case '?': |
||
408 | printf("Try `%s --help' for more information.\n", argv[0]); |
||
409 | return -1; |
||
410 | default: |
||
411 | printf("%s: invalid option -- %c\n", argv[0], c); |
||
412 | printf("Try `%s --help' for more information.\n", argv[0]); |
||
413 | return -1; |
||
414 | } |
||
415 | |||
416 | // Set queues for ZeroMQ and MQTT |
||
417 | args->ZMQ_dataQueue = ZMQ_sendQueue; |
||
418 | args->MMQ_dataQueue = MMQ_sendQueue; |
||
419 | // Add the logging queue. |
||
420 | args->LOG_dataQueue = LOG_sendQueue; |
||
421 | |||
422 | // Assume port 1883 for MQTT |
||
423 | if (!MMQ_hasPort) { |
||
424 | args->port = 1883; |
||
425 | MMQ_hasPort = true; |
||
426 | } |
||
427 | |||
428 | // Assume default topic for MQTT |
||
429 | if (!MMQ_hasTopic) { |
||
430 | args->topic = |
||
431 | (char *)calloc(strlen(DEFAULT_MOSQUITTO_TOPIC) + 1, sizeof(char)); |
||
432 | strncat(args->topic, DEFAULT_MOSQUITTO_TOPIC, |
||
433 | strlen(DEFAULT_MOSQUITTO_TOPIC)); |
||
434 | MMQ_hasTopic = true; |
||
435 | } |
||
436 | |||
437 | // Suppress console input for clarity if this is a terminal. |
||
438 | if (isatty(STDIN_FILENO)) { |
||
439 | // Save the attributes and restore them on program termination. |
||
440 | tcgetattr(STDIN_FILENO, &tattr_store); |
||
441 | tcgetattr(STDIN_FILENO, &tattr); |
||
442 | tattr.c_lflag &= ~(ICANON | ECHO); |
||
443 | tcsetattr(STDIN_FILENO, TCSAFLUSH, &tattr); |
||
444 | } |
||
445 | |||
446 | // Let there be bling! |
||
447 | printf("%s\n", ZEROSQUITO_BANNER); |
||
448 | printf("\tTip: Use Ctrl-c to end program.\n\n"); |
||
449 | |||
450 | // Bind to SIGHUP and SIGINT. |
||
451 | signal(SIGHUP, trap); |
||
452 | signal(SIGINT, trap); |
||
453 | |||
454 | // Create the ZeroMQ thread. |
||
455 | if (ZMQ_hasEndpoint && |
||
456 | pthread_create(&zmqThread, NULL, serveZMQ, (void *)args)) |
||
457 | printf("[E] Failed to create ZeroMQ thread!\n"); |
||
458 | |||
459 | // Create the MQTT thread. |
||
460 | if (MMQ_hasAddress && MMQ_hasPort && MMQ_hasTopic && |
||
461 | pthread_create(&mmqThread, NULL, serveMMQ, (void *)args)) |
||
462 | printf("[E] Failed to create Mosquitto thread!\n"); |
||
463 | |||
464 | // Create the LOG-S thread. |
||
465 | if (LOG_hasAddress && LOG_hasPort && |
||
466 | pthread_create(&logThread, NULL, pumpsLOG, (void *)args)) |
||
467 | printf("[E] Failed to create LOG-S thread!\n"); |
||
468 | |||
469 | srand(time(&ttime)); |
||
470 | |||
471 | // Producer: pumps temperature readings onto ZeroMQ and Mosquitto queues. |
||
472 | do { |
||
473 | // Generate a value (temperature reading). |
||
474 | snprintf(ts, 4, "%dC", rand() % 100); |
||
475 | |||
476 | // Pump the temperature reading to both ZeroMQ and Mosquitto. |
||
477 | printf("[✓] +- Pumping measurement: %s\n", ts); |
||
478 | queueEnqueue(ZMQ_sendQueue, ts); |
||
479 | queueEnqueue(MMQ_sendQueue, ts); |
||
480 | |||
481 | sleep(1); |
||
482 | } while (run); |
||
483 | |||
484 | // Join the threads. |
||
485 | pthread_join(zmqThread, NULL); |
||
486 | pthread_join(mmqThread, NULL); |
||
487 | |||
488 | // Free the queue. |
||
489 | queueClear(MMQ_sendQueue); |
||
490 | queueClear(ZMQ_sendQueue); |
||
491 | |||
492 | // Free arguments. |
||
493 | free(args); |
||
494 | |||
495 | // If this is a terminal, restore the terminal attributes. |
||
496 | if (isatty(STDIN_FILENO)) |
||
497 | tcsetattr(STDIN_FILENO, TCSANOW, &tattr_store); |
||
498 | |||
499 | // Clean exit. |
||
500 | return 0; |
||
501 | } |