zeroSquitto – Blame information for rev 1
?pathlinks?
Rev | Author | Line No. | Line |
---|---|---|---|
1 | office | 1 | /*************************************************************************/ |
2 | /* Mosquitto client - ZeroMQ and Mosquitto Publisher-Subscriber Project */ |
||
3 | /*************************************************************************/ |
||
4 | |||
5 | #include "constants.h" |
||
6 | #include "queue.h" |
||
7 | #include <arpa/inet.h> |
||
8 | #include <limits.h> |
||
9 | #include <mosquitto.h> |
||
10 | #include <netinet/in.h> |
||
11 | #include <pthread.h> |
||
12 | #include <signal.h> |
||
13 | #include <stdio.h> |
||
14 | #include <stdlib.h> |
||
15 | #include <string.h> |
||
16 | #include <sys/socket.h> |
||
17 | #include <sys/types.h> |
||
18 | #include <termios.h> |
||
19 | #include <unistd.h> |
||
20 | |||
// Program termination flag: 1 while the program should keep running, set to 0
// by the signal handler. sig_atomic_t is the integer type the C standard
// guarantees can be stored to safely from within a signal handler.
// NOTE(review): the worker threads also poll this flag in their loops;
// volatile alone is not a thread synchronisation primitive.
static volatile sig_atomic_t run = 1;
||
23 | |||
24 | // Stucture passed as argument to threads. |
||
25 | typedef struct { |
||
26 | char *address; |
||
27 | char *topic; |
||
28 | int port; |
||
29 | |||
30 | // For sending messages to LOG-S. |
||
31 | Queue *LOG_dataQueue; |
||
32 | char *logAddress; |
||
33 | int logPort; |
||
34 | } Targs; |
||
35 | |||
36 | // Handles SIGHUP and SIGINT (Ctrl + C) |
||
37 | void trap(int signal) { |
||
38 | printf("[✓] Received interrupt, terminating...\n"); |
||
39 | switch (signal) { |
||
40 | case SIGHUP: |
||
41 | case SIGINT: |
||
42 | run = 0; |
||
43 | break; |
||
44 | } |
||
45 | } |
||
46 | |||
47 | void *pumpsLOG(void *argsThread) { |
||
48 | // Function arguments. |
||
49 | Targs *args = (Targs *)argsThread; |
||
50 | // Will point to log message to send to server. |
||
51 | char *data; |
||
52 | // The queue containing messages to send to LOG-S. |
||
53 | Queue *sendQueue = args->LOG_dataQueue; |
||
54 | int serverSocket; |
||
55 | struct sockaddr_in serverSockAddr; |
||
56 | struct hostent *server; |
||
57 | |||
58 | printf("[✓] ( LOG-S™ ) Pumping logs to %s:%d.\n", args->logAddress, |
||
59 | args->logPort); |
||
60 | |||
61 | // Spin around and post messages to LOG-S. |
||
62 | do { |
||
63 | // Nothing to post, so don't bother. |
||
64 | if (queueIsEmpty(sendQueue)) { |
||
65 | sleep(1); |
||
66 | continue; |
||
67 | } |
||
68 | |||
69 | // Garbage was pushed onto the logging queue. |
||
70 | if ((data = queueDequeue(sendQueue)) == NULL || strlen(data) == 0) |
||
71 | continue; |
||
72 | |||
73 | // Create socket. |
||
74 | if ((serverSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { |
||
75 | printf("[E] Error creating socket.\n"); |
||
76 | sleep(1); |
||
77 | continue; |
||
78 | } |
||
79 | |||
80 | bzero((char *)&serverSockAddr, sizeof(serverSockAddr)); |
||
81 | serverSockAddr.sin_family = AF_INET; |
||
82 | serverSockAddr.sin_addr.s_addr = inet_addr(args->logAddress); |
||
83 | serverSockAddr.sin_port = htons(args->logPort); |
||
84 | |||
85 | if (connect(serverSocket, (struct sockaddr *)&serverSockAddr, |
||
86 | sizeof(serverSockAddr)) < 0) { |
||
87 | printf("[E] ( LOG-S™ ) Error connecting to LOG-S.\n"); |
||
88 | sleep(1); |
||
89 | continue; |
||
90 | } |
||
91 | |||
92 | if (write(serverSocket, data, LOG_MAX_SEND_BYTES) < 0) { |
||
93 | printf("[E] ( LOG-S™ ) Error sending log message to LOG-S.\n"); |
||
94 | continue; |
||
95 | } |
||
96 | |||
97 | #ifdef DEBUG |
||
98 | printf("[✓] ( LOG-S™ ) Log message \"%s\" sent to LOG-S.\n", data); |
||
99 | #endif |
||
100 | |||
101 | // Close the socket. |
||
102 | close(serverSocket); |
||
103 | } while (run); |
||
104 | |||
105 | CLEANUP: |
||
106 | |||
107 | // Close the socket. |
||
108 | close(serverSocket); |
||
109 | |||
110 | // Exit context. |
||
111 | pthread_exit(NULL); |
||
112 | } |
||
113 | |||
114 | // Callback when results have been published using MQTT. |
||
115 | void MMQ_message(struct mosquitto *mosq, void *userdata, |
||
116 | const struct mosquitto_message *message) { |
||
117 | // The logging queue to send messages to. |
||
118 | Targs *args = userdata; |
||
119 | // Formatted log data. |
||
120 | char *logData; |
||
121 | size_t logSize; |
||
122 | bool topicMatches; |
||
123 | |||
124 | // Check if the topic matches what we have subscribed to. |
||
125 | mosquitto_topic_matches_sub(args->topic, message->topic, &topicMatches); |
||
126 | if (!topicMatches) |
||
127 | return; |
||
128 | |||
129 | logSize = strlen("MMQ-C Received message for topic: ") + 1 + |
||
130 | strlen(message->payload) + 1 + strlen(message->topic) + 1 + 1; |
||
131 | logData = (char *)calloc(logSize, sizeof(char)); |
||
132 | snprintf(logData, logSize, "MMQ-C Received message %s for topic: %s", |
||
133 | message->payload, message->topic); |
||
134 | printf("[✓] %s\n", logData); |
||
135 | queueEnqueue(args->LOG_dataQueue, logData); |
||
136 | } |
||
137 | |||
138 | void MMQ_subscribe(struct mosquitto *mosq, void *userdata, int mid, |
||
139 | int qos_count, const int *granted_qos) { |
||
140 | // The logging queue to send messages to. |
||
141 | Targs *args = userdata; |
||
142 | // Formatted log data. |
||
143 | char *logData; |
||
144 | size_t logSize; |
||
145 | |||
146 | logSize = |
||
147 | strlen("MMQ-C Subscribed to topic: ") + 1 + strlen(args->topic) + 1 + 1; |
||
148 | logData = (char *)calloc(logSize, sizeof(char)); |
||
149 | snprintf(logData, logSize, "MMQ-C Subscribed to topic: %s", args->topic); |
||
150 | printf("[✓] %s\n", logData); |
||
151 | queueEnqueue(args->LOG_dataQueue, logData); |
||
152 | } |
||
153 | |||
154 | // Subscribes to MQTT topic. |
||
155 | void *client(void *argsThread) { |
||
156 | // Function arguments. |
||
157 | Targs *args = (Targs *)argsThread; |
||
158 | struct mosquitto *mq = NULL; |
||
159 | // Formatted log data. |
||
160 | char *logData; |
||
161 | size_t logSize; |
||
162 | |||
163 | mosquitto_lib_init(); |
||
164 | |||
165 | if ((mq = mosquitto_new(NULL, true, args)) == NULL) { |
||
166 | printf("[E] Could not create new mosquitto client.\n"); |
||
167 | goto CLEANUP; |
||
168 | } |
||
169 | |||
170 | if (mosquitto_connect(mq, args->address, args->port, 60) != |
||
171 | MOSQ_ERR_SUCCESS) { |
||
172 | printf("[E] Could not connect to Mosquitto broker!\n"); |
||
173 | goto CLEANUP; |
||
174 | } |
||
175 | |||
176 | // Callback to run when message received. |
||
177 | mosquitto_message_callback_set(mq, MMQ_message); |
||
178 | mosquitto_subscribe_callback_set(mq, MMQ_subscribe); |
||
179 | |||
180 | // Subscribe to topic. |
||
181 | mosquitto_subscribe(mq, NULL, args->topic, 0); |
||
182 | |||
183 | do { |
||
184 | // Reconnect if the connection has been lost. |
||
185 | if (mosquitto_loop(mq, -1, 1) != MOSQ_ERR_SUCCESS) |
||
186 | mosquitto_reconnect(mq); |
||
187 | |||
188 | sleep(1); |
||
189 | } while (run); |
||
190 | |||
191 | CLEANUP: |
||
192 | printf("[✓] Shutting down...\n"); |
||
193 | |||
194 | // Disconnect from broker. |
||
195 | mosquitto_disconnect(mq); |
||
196 | |||
197 | // Free Mosquitto client. |
||
198 | mosquitto_destroy(mq); |
||
199 | |||
200 | // Cleanup library. |
||
201 | mosquitto_lib_cleanup(); |
||
202 | |||
203 | // Exit context. |
||
204 | pthread_exit(NULL); |
||
205 | } |
||
206 | |||
207 | int main(int argc, char **argv) { |
||
208 | // For terminal input supression. |
||
209 | struct termios tattr; |
||
210 | struct termios tattr_store; |
||
211 | // Create the logging queue. |
||
212 | Queue *LOG_sendQueue = queueCreate(1); |
||
213 | // The listener thread. |
||
214 | pthread_t connectThread; |
||
215 | // The LOG-S thread. |
||
216 | pthread_t logThread; |
||
217 | // The thread arguments. |
||
218 | Targs *args = (Targs *)calloc(1, sizeof(Targs)); |
||
219 | |||
220 | if (argc < 5) { |
||
221 | printf("\n"); |
||
222 | printf("Syntax: %s <address> <port> <topic> <LOG-S Address> <LOG-S Port>\n", |
||
223 | argv[0]); |
||
224 | printf("Example: %s 127.0.0.1 1883 temperature 127.0.0.1 2500\n", argv[0]); |
||
225 | printf("\n"); |
||
226 | return -1; |
||
227 | } |
||
228 | |||
229 | // Suppress console input for clarity if this is a terminal. |
||
230 | if (isatty(STDIN_FILENO)) { |
||
231 | // Save the attributes and restore them on program termination. |
||
232 | tcgetattr(STDIN_FILENO, &tattr_store); |
||
233 | tcgetattr(STDIN_FILENO, &tattr); |
||
234 | tattr.c_lflag &= ~(ICANON | ECHO); |
||
235 | tcsetattr(STDIN_FILENO, TCSAFLUSH, &tattr); |
||
236 | } |
||
237 | |||
238 | // Let there be bling! |
||
239 | printf("%s\n", MOSQUITTO_BANNER); |
||
240 | printf("\tTip: Use Ctrl-c to end program.\n\n"); |
||
241 | |||
242 | // Bind to SIGHUP and SIGINT. |
||
243 | signal(SIGHUP, trap); |
||
244 | signal(SIGINT, trap); |
||
245 | |||
246 | // Create thread arguments. |
||
247 | args->LOG_dataQueue = LOG_sendQueue; |
||
248 | args->address = (char *)calloc(strlen(argv[1]) + 1, sizeof(char)); |
||
249 | strncat(args->address, argv[1], strlen(argv[1])); |
||
250 | args->port = atoi(argv[2]); |
||
251 | args->topic = (char *)calloc(strlen(argv[3]) + 1, sizeof(char)); |
||
252 | strncat(args->topic, argv[3], strlen(argv[3])); |
||
253 | args->logAddress = (char *)calloc(strlen(argv[4]) + 1, sizeof(char)); |
||
254 | strncat(args->logAddress, argv[4], strlen(argv[4])); |
||
255 | args->logPort = atoi(argv[5]); |
||
256 | |||
257 | // Create ZeroMQ processing thread. |
||
258 | if (pthread_create(&connectThread, NULL, client, (void *)args)) |
||
259 | printf("[E] Failed to create Mosquitto thread!\n"); |
||
260 | |||
261 | // Create the LOG-S thread. |
||
262 | if (pthread_create(&logThread, NULL, pumpsLOG, (void *)args)) |
||
263 | printf("[E] Failed to create LOG-S thread!\n"); |
||
264 | |||
265 | // Await thread termination. |
||
266 | pthread_join(connectThread, NULL); |
||
267 | |||
268 | // If this is a terminal, restore the terminal attributes. |
||
269 | if (isatty(STDIN_FILENO)) |
||
270 | tcsetattr(STDIN_FILENO, TCSANOW, &tattr_store); |
||
271 | |||
272 | return 0; |
||
273 | } |