zeroSquitto – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 /*************************************************************************/
2 /* ZeroMQ client - ZeroMQ and Mosquitto Publisher-Subscriber Project */
3 /*************************************************************************/
4  
#include "constants.h"
#include "queue.h"
#include <arpa/inet.h>
#include <errno.h>
#include <limits.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <termios.h>
#include <unistd.h>
#include <zmq.h>
20  
// Used for program termination: cleared by the signal handler and polled by
// the worker loops. It is declared volatile sig_atomic_t because that is the
// object type the C standard guarantees can be safely assigned from within
// an asynchronous signal handler.
static volatile sig_atomic_t run = 1;
23  
24 // Stucture passed as argument to threads.
25 typedef struct {
26 char *endpoint;
27  
28 // For sending messages to LOG-S.
29 Queue *LOG_dataQueue;
30 char *logAddress;
31 int logPort;
32 } Targs;
33  
34 // Handles SIGHUP and SIGINT (Ctrl + C)
35 void trap(int signal) {
36 printf("[✓] Received interrupt, terminating...\n");
37 switch (signal) {
38 case SIGHUP:
39 case SIGINT:
40 run = 0;
41 break;
42 }
43 }
44  
45 void *pumpsLOG(void *argsThread) {
46 // Function arguments.
47 Targs *args = (Targs *)argsThread;
48 // Will point to log message to send to server.
49 char *data;
50 // The queue containing messages to send to LOG-S.
51 Queue *sendQueue = args->LOG_dataQueue;
52 int serverSocket;
53 struct sockaddr_in serverSockAddr;
54 struct hostent *server;
55  
56 printf("[✓] ( LOG-S™ ) Pumping logs to %s:%d.\n", args->logAddress,
57 args->logPort);
58  
59 // Spin around and post messages to LOG-S.
60 do {
61 // Nothing to post, so don't bother.
62 if (queueIsEmpty(sendQueue)) {
63 sleep(1);
64 continue;
65 }
66  
67 // Garbage was pushed onto the logging queue.
68 if ((data = queueDequeue(sendQueue)) == NULL || strlen(data) == 0)
69 continue;
70  
71 // Create socket.
72 if ((serverSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
73 printf("[E] Error creating socket.\n");
74 goto CLEANUP;
75 }
76  
77 bzero((char *)&serverSockAddr, sizeof(serverSockAddr));
78 serverSockAddr.sin_family = AF_INET;
79 serverSockAddr.sin_addr.s_addr = inet_addr(args->logAddress);
80 serverSockAddr.sin_port = htons(args->logPort);
81  
82 if (connect(serverSocket, (struct sockaddr *)&serverSockAddr,
83 sizeof(serverSockAddr)) < 0) {
84 printf("[E] ( LOG-S™ ) Error connecting to LOG-S.\n");
85 sleep(1);
86 continue;
87 }
88  
89 if (write(serverSocket, data, LOG_MAX_SEND_BYTES) < 0) {
90 printf("[E] ( LOG-S™ ) Error sending log message to LOG-S.\n");
91 continue;
92 }
93  
94 #ifdef DEBUG
95 printf("[✓] ( LOG-S™ ) Log message \"%s\" sent to LOG-S.\n", data);
96 #endif
97  
98 // Close the socket.
99 close(serverSocket);
100 } while (run);
101  
102 CLEANUP:
103  
104 // Close the socket.
105 close(serverSocket);
106  
107 // Exit context.
108 pthread_exit(NULL);
109 }
110  
111 void *client(void *argsThread) {
112 // Function arguments.
113 Targs *args = (Targs *)argsThread;
114 // Create a ZMQ context and a requester.
115 void *context = zmq_ctx_new();
116 void *requester = zmq_socket(context, ZMQ_REQ);
117 // Receive buffer for the request.
118 char *recvData = (char *)calloc(DATA_LENGTH_LIMIT, sizeof(char));
119 // Holds the request number.
120 int request = 0;
121 int zmqTimeout = ZMQ_LINGER_TIME;
122 // The logging queue to send messages to.
123 Queue *logsQueue = args->LOG_dataQueue;
124 // Formatted log data.
125 char *logData;
126 size_t logSize;
127  
128 // Set the linger time to one second for clean shutdown.
129 zmq_setsockopt(requester, ZMQ_LINGER, &zmqTimeout, sizeof(zmqTimeout));
130  
131 printf("[✓] Connecting to endpoint %s...\n", args->endpoint);
132 if (zmq_connect(requester, args->endpoint) != 0) {
133 printf("[E] Error connecting: %s\n", strerror(zmq_errno()));
134 goto CLEANUP;
135 }
136  
137 do {
138 #ifdef DEBUG
139 printf("[✓] (%d) Pulling...\n", request);
140 #endif
141  
142 logSize =
143 strlen("ZMQ-C Sending pull request ") + 1 + MAX_INTEGER_LENGTH + 1;
144 logData = (char *)calloc(logSize, sizeof(char));
145 snprintf(logData, logSize, "ZMQ-C Sending pull request %d", request);
146 queueEnqueue(logsQueue, logData);
147  
148 zmq_send(requester, PULL_METHOD, strlen(PULL_METHOD), ZMQ_DONTWAIT);
149  
150 // Check for data in non-blocking mode.
151 if (zmq_recv(requester, recvData, DATA_LENGTH_LIMIT, ZMQ_DONTWAIT) == -1) {
152 sleep(1);
153 continue;
154 }
155  
156 logSize = strlen("ZMQ-C Request received: ") + 1 + MAX_INTEGER_LENGTH +
157 strlen(recvData) + 1 + 1;
158 logData = (char *)calloc(logSize, sizeof(char));
159 snprintf(logData, logSize, "ZMQ-C Request %d received: %s", request,
160 recvData);
161 printf("[✓] %s\n", logData);
162 queueEnqueue(logsQueue, logData);
163  
164 sleep(1);
165 ++request;
166 } while (run);
167  
168 CLEANUP:
169 printf("[✓] Terminating...\n");
170  
171 if (zmq_disconnect(requester, args->endpoint) != 0)
172 printf("[E] Error disconnecting: %s\n", strerror(zmq_errno()));
173  
174 // Perform cleanups.
175 zmq_close(requester);
176 zmq_ctx_destroy(context);
177  
178 // Release buffers.
179 free(recvData);
180  
181 // Free arguments.
182 free(args);
183  
184 // Exit context.
185 pthread_exit(NULL);
186 }
187  
188 int main(int argc, char **argv) {
189 // For terminal input supression.
190 struct termios tattr;
191 struct termios tattr_store;
192 // The listener thread.
193 pthread_t connectThread;
194 // Create the logging queue.
195 Queue *LOG_sendQueue = queueCreate(1);
196 // The LOG-S thread.
197 pthread_t logThread;
198 // The thread arguments.
199 Targs *args = (Targs *)calloc(1, sizeof(Targs));
200  
201 if (argc < 4) {
202 printf("\n");
203 printf("Syntax: %s <endpoint> <LOG-S Address> <LOG-S Port>\n", argv[0]);
204 printf("Example: %s tcp://127.0.0.1:1025 127.0.0.1 2500\n", argv[0]);
205 printf("\n");
206 return -1;
207 }
208  
209 // Suppress console input for clarity if this is a terminal.
210 if (isatty(STDIN_FILENO)) {
211 // Save the attributes and restore them on program termination.
212 tcgetattr(STDIN_FILENO, &tattr_store);
213 tcgetattr(STDIN_FILENO, &tattr);
214 tattr.c_lflag &= ~(ICANON | ECHO);
215 tcsetattr(STDIN_FILENO, TCSAFLUSH, &tattr);
216 }
217  
218 // Let there be bling!
219 printf("%s\n", CLIENT_BANNER);
220 printf("\tTip: Use Ctrl-c to end program.\n\n");
221  
222 // Bind to SIGHUP and SIGINT.
223 signal(SIGHUP, trap);
224 signal(SIGINT, trap);
225  
226 // Create thread arguments.
227 args->LOG_dataQueue = LOG_sendQueue;
228 args->endpoint = (char *)calloc(strlen(argv[1]) + 1, sizeof(char));
229 strncat(args->endpoint, argv[1], strlen(argv[1]));
230 args->logAddress = (char *)calloc(strlen(argv[2]) + 1, sizeof(char));
231 strncat(args->logAddress, argv[2], strlen(argv[2]));
232 args->logPort = atoi(argv[3]);
233  
234 // Create ZeroMQ processing thread.
235 if (pthread_create(&connectThread, NULL, client, (void *)args))
236 printf("[E] Failed to create ZeroMQ thread!\n");
237  
238 // Create the LOG-S thread.
239 if (pthread_create(&logThread, NULL, pumpsLOG, (void *)args))
240 printf("[E] Failed to create LOG-S thread!\n");
241  
242 // Await thread termination.
243 pthread_join(connectThread, NULL);
244  
245 // If this is a terminal, restore the terminal attributes.
246 if (isatty(STDIN_FILENO))
247 tcsetattr(STDIN_FILENO, TCSANOW, &tattr_store);
248  
249 return 0;
250 }