zeroSquitto – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 /*************************************************************************/
2 /* Mosquitto client - ZeroMQ and Mosquitto Publisher-Subscriber Project */
3 /*************************************************************************/
4  
5 #include "constants.h"
6 #include "queue.h"
7 #include <arpa/inet.h>
8 #include <limits.h>
9 #include <mosquitto.h>
10 #include <netinet/in.h>
11 #include <pthread.h>
12 #include <signal.h>
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <string.h>
16 #include <sys/socket.h>
17 #include <sys/types.h>
18 #include <termios.h>
19 #include <unistd.h>
20  
// Termination flag: starts at 1 and is cleared by trap() on SIGHUP/SIGINT.
// The worker loops poll it to decide when to stop. Because it is written from
// a signal handler it must be a volatile sig_atomic_t, the only object type
// the C standard allows a handler to assign to.
static volatile sig_atomic_t run = 1;
23  
24 // Stucture passed as argument to threads.
25 typedef struct {
26 char *address;
27 char *topic;
28 int port;
29  
30 // For sending messages to LOG-S.
31 Queue *LOG_dataQueue;
32 char *logAddress;
33 int logPort;
34 } Targs;
35  
36 // Handles SIGHUP and SIGINT (Ctrl + C)
37 void trap(int signal) {
38 printf("[✓] Received interrupt, terminating...\n");
39 switch (signal) {
40 case SIGHUP:
41 case SIGINT:
42 run = 0;
43 break;
44 }
45 }
46  
47 void *pumpsLOG(void *argsThread) {
48 // Function arguments.
49 Targs *args = (Targs *)argsThread;
50 // Will point to log message to send to server.
51 char *data;
52 // The queue containing messages to send to LOG-S.
53 Queue *sendQueue = args->LOG_dataQueue;
54 int serverSocket;
55 struct sockaddr_in serverSockAddr;
56 struct hostent *server;
57  
58 printf("[✓] ( LOG-S™ ) Pumping logs to %s:%d.\n", args->logAddress,
59 args->logPort);
60  
61 // Spin around and post messages to LOG-S.
62 do {
63 // Nothing to post, so don't bother.
64 if (queueIsEmpty(sendQueue)) {
65 sleep(1);
66 continue;
67 }
68  
69 // Garbage was pushed onto the logging queue.
70 if ((data = queueDequeue(sendQueue)) == NULL || strlen(data) == 0)
71 continue;
72  
73 // Create socket.
74 if ((serverSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
75 printf("[E] Error creating socket.\n");
76 sleep(1);
77 continue;
78 }
79  
80 bzero((char *)&serverSockAddr, sizeof(serverSockAddr));
81 serverSockAddr.sin_family = AF_INET;
82 serverSockAddr.sin_addr.s_addr = inet_addr(args->logAddress);
83 serverSockAddr.sin_port = htons(args->logPort);
84  
85 if (connect(serverSocket, (struct sockaddr *)&serverSockAddr,
86 sizeof(serverSockAddr)) < 0) {
87 printf("[E] ( LOG-S™ ) Error connecting to LOG-S.\n");
88 sleep(1);
89 continue;
90 }
91  
92 if (write(serverSocket, data, LOG_MAX_SEND_BYTES) < 0) {
93 printf("[E] ( LOG-S™ ) Error sending log message to LOG-S.\n");
94 continue;
95 }
96  
97 #ifdef DEBUG
98 printf("[✓] ( LOG-S™ ) Log message \"%s\" sent to LOG-S.\n", data);
99 #endif
100  
101 // Close the socket.
102 close(serverSocket);
103 } while (run);
104  
105 CLEANUP:
106  
107 // Close the socket.
108 close(serverSocket);
109  
110 // Exit context.
111 pthread_exit(NULL);
112 }
113  
114 // Callback when results have been published using MQTT.
115 void MMQ_message(struct mosquitto *mosq, void *userdata,
116 const struct mosquitto_message *message) {
117 // The logging queue to send messages to.
118 Targs *args = userdata;
119 // Formatted log data.
120 char *logData;
121 size_t logSize;
122 bool topicMatches;
123  
124 // Check if the topic matches what we have subscribed to.
125 mosquitto_topic_matches_sub(args->topic, message->topic, &topicMatches);
126 if (!topicMatches)
127 return;
128  
129 logSize = strlen("MMQ-C Received message for topic: ") + 1 +
130 strlen(message->payload) + 1 + strlen(message->topic) + 1 + 1;
131 logData = (char *)calloc(logSize, sizeof(char));
132 snprintf(logData, logSize, "MMQ-C Received message %s for topic: %s",
133 message->payload, message->topic);
134 printf("[✓] %s\n", logData);
135 queueEnqueue(args->LOG_dataQueue, logData);
136 }
137  
138 void MMQ_subscribe(struct mosquitto *mosq, void *userdata, int mid,
139 int qos_count, const int *granted_qos) {
140 // The logging queue to send messages to.
141 Targs *args = userdata;
142 // Formatted log data.
143 char *logData;
144 size_t logSize;
145  
146 logSize =
147 strlen("MMQ-C Subscribed to topic: ") + 1 + strlen(args->topic) + 1 + 1;
148 logData = (char *)calloc(logSize, sizeof(char));
149 snprintf(logData, logSize, "MMQ-C Subscribed to topic: %s", args->topic);
150 printf("[✓] %s\n", logData);
151 queueEnqueue(args->LOG_dataQueue, logData);
152 }
153  
154 // Subscribes to MQTT topic.
155 void *client(void *argsThread) {
156 // Function arguments.
157 Targs *args = (Targs *)argsThread;
158 struct mosquitto *mq = NULL;
159 // Formatted log data.
160 char *logData;
161 size_t logSize;
162  
163 mosquitto_lib_init();
164  
165 if ((mq = mosquitto_new(NULL, true, args)) == NULL) {
166 printf("[E] Could not create new mosquitto client.\n");
167 goto CLEANUP;
168 }
169  
170 if (mosquitto_connect(mq, args->address, args->port, 60) !=
171 MOSQ_ERR_SUCCESS) {
172 printf("[E] Could not connect to Mosquitto broker!\n");
173 goto CLEANUP;
174 }
175  
176 // Callback to run when message received.
177 mosquitto_message_callback_set(mq, MMQ_message);
178 mosquitto_subscribe_callback_set(mq, MMQ_subscribe);
179  
180 // Subscribe to topic.
181 mosquitto_subscribe(mq, NULL, args->topic, 0);
182  
183 do {
184 // Reconnect if the connection has been lost.
185 if (mosquitto_loop(mq, -1, 1) != MOSQ_ERR_SUCCESS)
186 mosquitto_reconnect(mq);
187  
188 sleep(1);
189 } while (run);
190  
191 CLEANUP:
192 printf("[✓] Shutting down...\n");
193  
194 // Disconnect from broker.
195 mosquitto_disconnect(mq);
196  
197 // Free Mosquitto client.
198 mosquitto_destroy(mq);
199  
200 // Cleanup library.
201 mosquitto_lib_cleanup();
202  
203 // Exit context.
204 pthread_exit(NULL);
205 }
206  
207 int main(int argc, char **argv) {
208 // For terminal input supression.
209 struct termios tattr;
210 struct termios tattr_store;
211 // Create the logging queue.
212 Queue *LOG_sendQueue = queueCreate(1);
213 // The listener thread.
214 pthread_t connectThread;
215 // The LOG-S thread.
216 pthread_t logThread;
217 // The thread arguments.
218 Targs *args = (Targs *)calloc(1, sizeof(Targs));
219  
220 if (argc < 5) {
221 printf("\n");
222 printf("Syntax: %s <address> <port> <topic> <LOG-S Address> <LOG-S Port>\n",
223 argv[0]);
224 printf("Example: %s 127.0.0.1 1883 temperature 127.0.0.1 2500\n", argv[0]);
225 printf("\n");
226 return -1;
227 }
228  
229 // Suppress console input for clarity if this is a terminal.
230 if (isatty(STDIN_FILENO)) {
231 // Save the attributes and restore them on program termination.
232 tcgetattr(STDIN_FILENO, &tattr_store);
233 tcgetattr(STDIN_FILENO, &tattr);
234 tattr.c_lflag &= ~(ICANON | ECHO);
235 tcsetattr(STDIN_FILENO, TCSAFLUSH, &tattr);
236 }
237  
238 // Let there be bling!
239 printf("%s\n", MOSQUITTO_BANNER);
240 printf("\tTip: Use Ctrl-c to end program.\n\n");
241  
242 // Bind to SIGHUP and SIGINT.
243 signal(SIGHUP, trap);
244 signal(SIGINT, trap);
245  
246 // Create thread arguments.
247 args->LOG_dataQueue = LOG_sendQueue;
248 args->address = (char *)calloc(strlen(argv[1]) + 1, sizeof(char));
249 strncat(args->address, argv[1], strlen(argv[1]));
250 args->port = atoi(argv[2]);
251 args->topic = (char *)calloc(strlen(argv[3]) + 1, sizeof(char));
252 strncat(args->topic, argv[3], strlen(argv[3]));
253 args->logAddress = (char *)calloc(strlen(argv[4]) + 1, sizeof(char));
254 strncat(args->logAddress, argv[4], strlen(argv[4]));
255 args->logPort = atoi(argv[5]);
256  
257 // Create ZeroMQ processing thread.
258 if (pthread_create(&connectThread, NULL, client, (void *)args))
259 printf("[E] Failed to create Mosquitto thread!\n");
260  
261 // Create the LOG-S thread.
262 if (pthread_create(&logThread, NULL, pumpsLOG, (void *)args))
263 printf("[E] Failed to create LOG-S thread!\n");
264  
265 // Await thread termination.
266 pthread_join(connectThread, NULL);
267  
268 // If this is a terminal, restore the terminal attributes.
269 if (isatty(STDIN_FILENO))
270 tcsetattr(STDIN_FILENO, TCSANOW, &tattr_store);
271  
272 return 0;
273 }