zeroSquitto – Blame information for rev 1
?pathlinks?
Rev | Author | Line No. | Line |
---|---|---|---|
1 | office | 1 | /*************************************************************************/ |
2 | /* ZeroMQ server - ZeroMQ and Mosquitto Publisher-Subscriber Project */ |
||
3 | /*************************************************************************/ |
||
4 | |||
5 | #include "circularQueue.h" |
||
6 | #include "constants.h" |
||
7 | #include <pthread.h> |
||
8 | #include <signal.h> |
||
9 | #include <stdio.h> |
||
10 | #include <stdlib.h> |
||
11 | #include <string.h> |
||
12 | #include <termios.h> |
||
13 | #include <unistd.h> |
||
14 | #include <zmq.h> |
||
15 | |||
16 | // Used for program termination. |
||
17 | static volatile int run = 1; |
||
18 | |||
19 | // Stucture passed as argument to threads. |
||
20 | typedef struct { |
||
21 | CircularQueue *dataQueue; |
||
22 | char *endpoint; |
||
23 | } Targs; |
||
24 | |||
25 | // Handles SIGHUP and SIGINT (Ctrl + C) |
||
26 | void trap(int signal) { |
||
27 | printf("[✓] Received interrupt, terminating...\n"); |
||
28 | switch (signal) { |
||
29 | case SIGHUP: |
||
30 | case SIGINT: |
||
31 | run = 0; |
||
32 | break; |
||
33 | } |
||
34 | } |
||
35 | |||
36 | void *server(void *argsThread) { |
||
37 | // Function arguments. |
||
38 | Targs *args = (Targs *)argsThread; |
||
39 | // The thread identifier. |
||
40 | CircularQueue *sendQueue = args->dataQueue; |
||
41 | // Create a new context and a responder. |
||
42 | void *context = zmq_ctx_new(); |
||
43 | void *responder = zmq_socket(context, ZMQ_REP); |
||
44 | // Receive buffer for the response. |
||
45 | char *recvData = (char *)calloc(DATA_LENGTH_LIMIT, sizeof(char)); |
||
46 | // Holds a queue item to respond with. |
||
47 | char *sendData = NULL; |
||
48 | // Holds the request number. |
||
49 | int request = 0; |
||
50 | |||
51 | if (zmq_bind(responder, args->endpoint) != 0) { |
||
52 | printf("[E] Could not bind responder on selected endpoint.\n"); |
||
53 | pthread_exit(NULL); |
||
54 | } |
||
55 | |||
56 | printf("[✓] Bound to endpoing %s, listening...\n", args->endpoint); |
||
57 | |||
58 | /* |
||
59 | * Loop over circular queue and respond with queued items. |
||
60 | */ |
||
61 | do { |
||
62 | |||
63 | // Check for data in non-blocking mode. |
||
64 | if (zmq_recv(responder, recvData, DATA_LENGTH_LIMIT, ZMQ_DONTWAIT) == -1) { |
||
65 | sleep(1); |
||
66 | continue; |
||
67 | } |
||
68 | |||
69 | #ifdef DEBUG |
||
70 | printf("[D] Received: %s\n", recvData); |
||
71 | #endif |
||
72 | |||
73 | // Check that this is a PULL request - otherwise continue. |
||
74 | if (strncmp(recvData, PULL_METHOD, DATA_LENGTH_LIMIT) != 0) { |
||
75 | sleep(1); |
||
76 | continue; |
||
77 | } |
||
78 | |||
79 | // Dequeue an item off the circular queue to respond with. |
||
80 | sendData = queueDequeue(sendQueue); |
||
81 | printf("[✓] (%d) Pushing: %s\n", request, sendData); |
||
82 | zmq_send(responder, sendData, strlen(sendData) + 1, 0); |
||
83 | |||
84 | sleep(1); |
||
85 | ++request; |
||
86 | } while (run && !queueIsEmpty(sendQueue)); |
||
87 | |||
88 | // Perform cleanups. |
||
89 | zmq_close(responder); |
||
90 | zmq_ctx_destroy(context); |
||
91 | |||
92 | // Release buffers. |
||
93 | free(recvData); |
||
94 | free(sendData); |
||
95 | // Free the queue. |
||
96 | queueClear(sendQueue); |
||
97 | |||
98 | // Free arguments. |
||
99 | free(args); |
||
100 | // Exit context. |
||
101 | pthread_exit(NULL); |
||
102 | } |
||
103 | |||
104 | int main(int argc, char **argv) { |
||
105 | // For terminal input supression. |
||
106 | struct termios tattr; |
||
107 | struct termios tattr_store; |
||
108 | // Create a circular queue to use for pushing data. |
||
109 | CircularQueue *sendQueue = queueCreate(1); |
||
110 | // The listener thread. |
||
111 | pthread_t listenThread; |
||
112 | // The thread arguments. |
||
113 | Targs *args = (Targs *)calloc(1, sizeof(Targs)); |
||
114 | |||
115 | if (argc < 2) { |
||
116 | printf("[E] First argument must be a listener endpoint, ie: %s\n", |
||
117 | "tcp://*:1025/"); |
||
118 | return -1; |
||
119 | } |
||
120 | |||
121 | // Suppress console input for clarity if this is a terminal. |
||
122 | if (isatty(STDIN_FILENO)) { |
||
123 | // Save the attributes and restore them on program termination. |
||
124 | tcgetattr(STDIN_FILENO, &tattr_store); |
||
125 | tcgetattr(STDIN_FILENO, &tattr); |
||
126 | tattr.c_lflag &= ~(ICANON | ECHO); |
||
127 | tcsetattr(STDIN_FILENO, TCSAFLUSH, &tattr); |
||
128 | } |
||
129 | |||
130 | // Let there be bling! |
||
131 | printf("%s\n", SERVER_BANNER); |
||
132 | printf("\tTip: Use Ctrl-c to end program.\n\n"); |
||
133 | |||
134 | // Bind to SIGHUP and SIGINT. |
||
135 | signal(SIGHUP, trap); |
||
136 | signal(SIGINT, trap); |
||
137 | |||
138 | // Enqueue a few elements to the queue for the exercise. |
||
139 | queueEnqueue(sendQueue, "36C"); |
||
140 | queueEnqueue(sendQueue, "38C"); |
||
141 | queueEnqueue(sendQueue, "40C"); |
||
142 | queueEnqueue(sendQueue, "21C"); |
||
143 | |||
144 | args->dataQueue = sendQueue; |
||
145 | args->endpoint = argv[1]; |
||
146 | |||
147 | // Create ZeroMQ processing thread. |
||
148 | if (pthread_create(&listenThread, NULL, server, (void *)args)) |
||
149 | printf("[E] Failed to create ZeroMQ thread.\n"); |
||
150 | |||
151 | // Await thread termination. |
||
152 | pthread_join(listenThread, NULL); |
||
153 | |||
154 | // If this is a terminal, restore the terminal attributes. |
||
155 | if (isatty(STDIN_FILENO)) |
||
156 | tcsetattr(STDIN_FILENO, TCSANOW, &tattr_store); |
||
157 | |||
158 | return 0; |
||
159 | } |