BadVPN – Blame information for rev 1
?pathlinks?
Rev | Author | Line No. | Line |
---|---|---|---|
1 | office | 1 | /** |
2 | * @file |
||
3 | * MQTT client |
||
4 | * |
||
5 | * @defgroup mqtt MQTT client |
||
6 | * @ingroup apps |
||
7 | * @verbinclude mqtt_client.txt |
||
8 | */ |
||
9 | |||
10 | /* |
||
11 | * Copyright (c) 2016 Erik Andersson <erian747@gmail.com> |
||
12 | * All rights reserved. |
||
13 | * |
||
14 | * Redistribution and use in source and binary forms, with or without modification, |
||
15 | * are permitted provided that the following conditions are met: |
||
16 | * |
||
17 | * 1. Redistributions of source code must retain the above copyright notice, |
||
18 | * this list of conditions and the following disclaimer. |
||
19 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
||
20 | * this list of conditions and the following disclaimer in the documentation |
||
21 | * and/or other materials provided with the distribution. |
||
22 | * 3. The name of the author may not be used to endorse or promote products |
||
23 | * derived from this software without specific prior written permission. |
||
24 | * |
||
25 | * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED |
||
26 | * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF |
||
27 | * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT |
||
28 | * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
||
29 | * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT |
||
30 | * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
||
31 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
||
32 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING |
||
33 | * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY |
||
34 | * OF SUCH DAMAGE. |
||
35 | * |
||
36 | * This file is part of the lwIP TCP/IP stack |
||
37 | * |
||
38 | * Author: Erik Andersson <erian747@gmail.com> |
||
39 | * |
||
40 | * |
||
41 | * @todo: |
||
42 | * - Handle large outgoing payloads for PUBLISH messages |
||
43 | * - Fix restriction of a single topic in each (UN)SUBSCRIBE message (protocol has support for multiple topics) |
||
44 | * - Add support for legacy MQTT protocol version |
||
45 | * |
||
46 | * Please coordinate changes and requests with Erik Andersson |
||
47 | * Erik Andersson <erian747@gmail.com> |
||
48 | * |
||
49 | */ |
||
50 | #include "lwip/apps/mqtt.h" |
||
51 | #include "lwip/apps/mqtt_priv.h" |
||
52 | #include "lwip/timeouts.h" |
||
53 | #include "lwip/ip_addr.h" |
||
54 | #include "lwip/mem.h" |
||
55 | #include "lwip/err.h" |
||
56 | #include "lwip/pbuf.h" |
||
57 | #include "lwip/altcp.h" |
||
58 | #include "lwip/altcp_tcp.h" |
||
59 | #include "lwip/altcp_tls.h" |
||
60 | #include <string.h> |
||
61 | |||
62 | #if LWIP_TCP && LWIP_CALLBACK_API |
||
63 | |||
64 | /** |
||
65 | * MQTT_DEBUG: Default is off. |
||
66 | */ |
||
67 | #if !defined MQTT_DEBUG || defined __DOXYGEN__ |
||
68 | #define MQTT_DEBUG LWIP_DBG_OFF |
||
69 | #endif |
||
70 | |||
71 | #define MQTT_DEBUG_TRACE (MQTT_DEBUG | LWIP_DBG_TRACE) |
||
72 | #define MQTT_DEBUG_STATE (MQTT_DEBUG | LWIP_DBG_STATE) |
||
73 | #define MQTT_DEBUG_WARN (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING) |
||
74 | #define MQTT_DEBUG_WARN_STATE (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING | LWIP_DBG_STATE) |
||
75 | #define MQTT_DEBUG_SERIOUS (MQTT_DEBUG | LWIP_DBG_LEVEL_SERIOUS) |
||
76 | |||
77 | |||
78 | |||
79 | /** |
||
80 | * MQTT client connection states |
||
81 | */ |
||
82 | enum { |
||
83 | TCP_DISCONNECTED, |
||
84 | TCP_CONNECTING, |
||
85 | MQTT_CONNECTING, |
||
86 | MQTT_CONNECTED |
||
87 | }; |
||
88 | |||
89 | /** |
||
90 | * MQTT control message types |
||
91 | */ |
||
92 | enum mqtt_message_type { |
||
93 | MQTT_MSG_TYPE_CONNECT = 1, |
||
94 | MQTT_MSG_TYPE_CONNACK = 2, |
||
95 | MQTT_MSG_TYPE_PUBLISH = 3, |
||
96 | MQTT_MSG_TYPE_PUBACK = 4, |
||
97 | MQTT_MSG_TYPE_PUBREC = 5, |
||
98 | MQTT_MSG_TYPE_PUBREL = 6, |
||
99 | MQTT_MSG_TYPE_PUBCOMP = 7, |
||
100 | MQTT_MSG_TYPE_SUBSCRIBE = 8, |
||
101 | MQTT_MSG_TYPE_SUBACK = 9, |
||
102 | MQTT_MSG_TYPE_UNSUBSCRIBE = 10, |
||
103 | MQTT_MSG_TYPE_UNSUBACK = 11, |
||
104 | MQTT_MSG_TYPE_PINGREQ = 12, |
||
105 | MQTT_MSG_TYPE_PINGRESP = 13, |
||
106 | MQTT_MSG_TYPE_DISCONNECT = 14 |
||
107 | }; |
||
108 | |||
109 | /** Helpers to extract control packet type and qos from first byte in fixed header */ |
||
110 | #define MQTT_CTL_PACKET_TYPE(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0xf0) >> 4) |
||
111 | #define MQTT_CTL_PACKET_QOS(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0x6) >> 1) |
||
112 | |||
113 | /** |
||
114 | * MQTT connect flags, only used in CONNECT message |
||
115 | */ |
||
116 | enum mqtt_connect_flag { |
||
117 | MQTT_CONNECT_FLAG_USERNAME = 1 << 7, |
||
118 | MQTT_CONNECT_FLAG_PASSWORD = 1 << 6, |
||
119 | MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5, |
||
120 | MQTT_CONNECT_FLAG_WILL = 1 << 2, |
||
121 | MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1 |
||
122 | }; |
||
123 | |||
124 | |||
125 | static void mqtt_cyclic_timer(void *arg); |
||
126 | |||
127 | #if defined(LWIP_DEBUG) |
||
128 | static const char *const mqtt_message_type_str[15] = { |
||
129 | "UNDEFINED", |
||
130 | "CONNECT", |
||
131 | "CONNACK", |
||
132 | "PUBLISH", |
||
133 | "PUBACK", |
||
134 | "PUBREC", |
||
135 | "PUBREL", |
||
136 | "PUBCOMP", |
||
137 | "SUBSCRIBE", |
||
138 | "SUBACK", |
||
139 | "UNSUBSCRIBE", |
||
140 | "UNSUBACK", |
||
141 | "PINGREQ", |
||
142 | "PINGRESP", |
||
143 | "DISCONNECT" |
||
144 | }; |
||
145 | |||
146 | /** |
||
147 | * Message type value to string |
||
148 | * @param msg_type see enum mqtt_message_type |
||
149 | * |
||
150 | * @return Control message type text string |
||
151 | */ |
||
152 | static const char * |
||
153 | mqtt_msg_type_to_str(u8_t msg_type) |
||
154 | { |
||
155 | if (msg_type >= LWIP_ARRAYSIZE(mqtt_message_type_str)) { |
||
156 | msg_type = 0; |
||
157 | } |
||
158 | return mqtt_message_type_str[msg_type]; |
||
159 | } |
||
160 | |||
161 | #endif |
||
162 | |||
163 | |||
164 | /** |
||
165 | * Generate MQTT packet identifier |
||
166 | * @param client MQTT client |
||
167 | * @return New packet identifier, range 1 to 65535 |
||
168 | */ |
||
169 | static u16_t |
||
170 | msg_generate_packet_id(mqtt_client_t *client) |
||
171 | { |
||
172 | client->pkt_id_seq++; |
||
173 | if (client->pkt_id_seq == 0) { |
||
174 | client->pkt_id_seq++; |
||
175 | } |
||
176 | return client->pkt_id_seq; |
||
177 | } |
||
178 | |||
179 | /*--------------------------------------------------------------------------------------------------------------------- */ |
||
180 | /* Output ring buffer */ |
||
181 | |||
182 | /** Add single item to ring buffer */ |
||
183 | static void |
||
184 | mqtt_ringbuf_put(struct mqtt_ringbuf_t *rb, u8_t item) |
||
185 | { |
||
186 | rb->buf[rb->put] = item; |
||
187 | rb->put++; |
||
188 | if (rb->put >= MQTT_OUTPUT_RINGBUF_SIZE) { |
||
189 | rb->put = 0; |
||
190 | } |
||
191 | } |
||
192 | |||
193 | /** Return pointer to ring buffer get position */ |
||
194 | static u8_t * |
||
195 | mqtt_ringbuf_get_ptr(struct mqtt_ringbuf_t *rb) |
||
196 | { |
||
197 | return &rb->buf[rb->get]; |
||
198 | } |
||
199 | |||
200 | static void |
||
201 | mqtt_ringbuf_advance_get_idx(struct mqtt_ringbuf_t *rb, u16_t len) |
||
202 | { |
||
203 | LWIP_ASSERT("mqtt_ringbuf_advance_get_idx: len < MQTT_OUTPUT_RINGBUF_SIZE", len < MQTT_OUTPUT_RINGBUF_SIZE); |
||
204 | |||
205 | rb->get += len; |
||
206 | if (rb->get >= MQTT_OUTPUT_RINGBUF_SIZE) { |
||
207 | rb->get = rb->get - MQTT_OUTPUT_RINGBUF_SIZE; |
||
208 | } |
||
209 | } |
||
210 | |||
211 | /** Return number of bytes in ring buffer */ |
||
212 | static u16_t |
||
213 | mqtt_ringbuf_len(struct mqtt_ringbuf_t *rb) |
||
214 | { |
||
215 | u32_t len = rb->put - rb->get; |
||
216 | if (len > 0xFFFF) { |
||
217 | len += MQTT_OUTPUT_RINGBUF_SIZE; |
||
218 | } |
||
219 | return (u16_t)len; |
||
220 | } |
||
221 | |||
222 | /** Return number of bytes free in ring buffer */ |
||
223 | #define mqtt_ringbuf_free(rb) (MQTT_OUTPUT_RINGBUF_SIZE - mqtt_ringbuf_len(rb)) |
||
224 | |||
225 | /** Return number of bytes possible to read without wrapping around */ |
||
226 | #define mqtt_ringbuf_linear_read_length(rb) LWIP_MIN(mqtt_ringbuf_len(rb), (MQTT_OUTPUT_RINGBUF_SIZE - (rb)->get)) |
||
227 | |||
228 | /** |
||
229 | * Try send as many bytes as possible from output ring buffer |
||
230 | * @param rb Output ring buffer |
||
231 | * @param tpcb TCP connection handle |
||
232 | */ |
||
233 | static void |
||
234 | mqtt_output_send(struct mqtt_ringbuf_t *rb, struct altcp_pcb *tpcb) |
||
235 | { |
||
236 | err_t err; |
||
237 | u8_t wrap = 0; |
||
238 | u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb); |
||
239 | u16_t send_len = altcp_sndbuf(tpcb); |
||
240 | LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL); |
||
241 | |||
242 | if (send_len == 0 || ringbuf_lin_len == 0) { |
||
243 | return; |
||
244 | } |
||
245 | |||
246 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_output_send: tcp_sndbuf: %d bytes, ringbuf_linear_available: %d, get %d, put %d\n", |
||
247 | send_len, ringbuf_lin_len, rb->get, rb->put)); |
||
248 | |||
249 | if (send_len > ringbuf_lin_len) { |
||
250 | /* Space in TCP output buffer is larger than available in ring buffer linear portion */ |
||
251 | send_len = ringbuf_lin_len; |
||
252 | /* Wrap around if more data in ring buffer after linear portion */ |
||
253 | wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len); |
||
254 | } |
||
255 | err = altcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0)); |
||
256 | if ((err == ERR_OK) && wrap) { |
||
257 | mqtt_ringbuf_advance_get_idx(rb, send_len); |
||
258 | /* Use the lesser one of ring buffer linear length and TCP send buffer size */ |
||
259 | send_len = LWIP_MIN(altcp_sndbuf(tpcb), mqtt_ringbuf_linear_read_length(rb)); |
||
260 | err = altcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY); |
||
261 | } |
||
262 | |||
263 | if (err == ERR_OK) { |
||
264 | mqtt_ringbuf_advance_get_idx(rb, send_len); |
||
265 | /* Flush */ |
||
266 | altcp_output(tpcb); |
||
267 | } else { |
||
268 | LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_output_send: Send failed with err %d (\"%s\")\n", err, lwip_strerr(err))); |
||
269 | } |
||
270 | } |
||
271 | |||
272 | |||
273 | |||
274 | /*--------------------------------------------------------------------------------------------------------------------- */ |
||
275 | /* Request queue */ |
||
276 | |||
277 | /** |
||
278 | * Create request item |
||
279 | * @param r_objs Pointer to request objects |
||
280 | * @param pkt_id Packet identifier of request |
||
281 | * @param cb Packet callback to call when requests lifetime ends |
||
282 | * @param arg Parameter following callback |
||
283 | * @return Request or NULL if failed to create |
||
284 | */ |
||
285 | static struct mqtt_request_t * |
||
286 | mqtt_create_request(struct mqtt_request_t *r_objs, u16_t pkt_id, mqtt_request_cb_t cb, void *arg) |
||
287 | { |
||
288 | struct mqtt_request_t *r = NULL; |
||
289 | u8_t n; |
||
290 | LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL); |
||
291 | for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) { |
||
292 | /* Item point to itself if not in use */ |
||
293 | if (r_objs[n].next == &r_objs[n]) { |
||
294 | r = &r_objs[n]; |
||
295 | r->next = NULL; |
||
296 | r->cb = cb; |
||
297 | r->arg = arg; |
||
298 | r->pkt_id = pkt_id; |
||
299 | break; |
||
300 | } |
||
301 | } |
||
302 | return r; |
||
303 | } |
||
304 | |||
305 | |||
306 | /** |
||
307 | * Append request to pending request queue |
||
308 | * @param tail Pointer to request queue tail pointer |
||
309 | * @param r Request to append |
||
310 | */ |
||
311 | static void |
||
312 | mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r) |
||
313 | { |
||
314 | struct mqtt_request_t *head = NULL; |
||
315 | s16_t time_before = 0; |
||
316 | struct mqtt_request_t *iter; |
||
317 | |||
318 | LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL); |
||
319 | |||
320 | /* Iterate trough queue to find head, and count total timeout time */ |
||
321 | for (iter = *tail; iter != NULL; iter = iter->next) { |
||
322 | time_before += iter->timeout_diff; |
||
323 | head = iter; |
||
324 | } |
||
325 | |||
326 | LWIP_ASSERT("mqtt_append_request: time_before <= MQTT_REQ_TIMEOUT", time_before <= MQTT_REQ_TIMEOUT); |
||
327 | r->timeout_diff = MQTT_REQ_TIMEOUT - time_before; |
||
328 | if (head == NULL) { |
||
329 | *tail = r; |
||
330 | } else { |
||
331 | head->next = r; |
||
332 | } |
||
333 | } |
||
334 | |||
335 | |||
336 | /** |
||
337 | * Delete request item |
||
338 | * @param r Request item to delete |
||
339 | */ |
||
340 | static void |
||
341 | mqtt_delete_request(struct mqtt_request_t *r) |
||
342 | { |
||
343 | if (r != NULL) { |
||
344 | r->next = r; |
||
345 | } |
||
346 | } |
||
347 | |||
348 | /** |
||
349 | * Remove a request item with a specific packet identifier from request queue |
||
350 | * @param tail Pointer to request queue tail pointer |
||
351 | * @param pkt_id Packet identifier of request to take |
||
352 | * @return Request item if found, NULL if not |
||
353 | */ |
||
354 | static struct mqtt_request_t * |
||
355 | mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id) |
||
356 | { |
||
357 | struct mqtt_request_t *iter = NULL, *prev = NULL; |
||
358 | LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL); |
||
359 | /* Search all request for pkt_id */ |
||
360 | for (iter = *tail; iter != NULL; iter = iter->next) { |
||
361 | if (iter->pkt_id == pkt_id) { |
||
362 | break; |
||
363 | } |
||
364 | prev = iter; |
||
365 | } |
||
366 | |||
367 | /* If request was found */ |
||
368 | if (iter != NULL) { |
||
369 | /* unchain */ |
||
370 | if (prev == NULL) { |
||
371 | *tail = iter->next; |
||
372 | } else { |
||
373 | prev->next = iter->next; |
||
374 | } |
||
375 | /* If exists, add remaining timeout time for the request to next */ |
||
376 | if (iter->next != NULL) { |
||
377 | iter->next->timeout_diff += iter->timeout_diff; |
||
378 | } |
||
379 | iter->next = NULL; |
||
380 | } |
||
381 | return iter; |
||
382 | } |
||
383 | |||
384 | /** |
||
385 | * Handle requests timeout |
||
386 | * @param tail Pointer to request queue tail pointer |
||
387 | * @param t Time since last call in seconds |
||
388 | */ |
||
389 | static void |
||
390 | mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t) |
||
391 | { |
||
392 | struct mqtt_request_t *r; |
||
393 | LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL); |
||
394 | r = *tail; |
||
395 | while (t > 0 && r != NULL) { |
||
396 | if (t >= r->timeout_diff) { |
||
397 | t -= (u8_t)r->timeout_diff; |
||
398 | /* Unchain */ |
||
399 | *tail = r->next; |
||
400 | /* Notify upper layer about timeout */ |
||
401 | if (r->cb != NULL) { |
||
402 | r->cb(r->arg, ERR_TIMEOUT); |
||
403 | } |
||
404 | mqtt_delete_request(r); |
||
405 | /* Tail might be be modified in callback, so re-read it in every iteration */ |
||
406 | r = *(struct mqtt_request_t *const volatile *)tail; |
||
407 | } else { |
||
408 | r->timeout_diff -= t; |
||
409 | t = 0; |
||
410 | } |
||
411 | } |
||
412 | } |
||
413 | |||
414 | /** |
||
415 | * Free all request items |
||
416 | * @param tail Pointer to request queue tail pointer |
||
417 | */ |
||
418 | static void |
||
419 | mqtt_clear_requests(struct mqtt_request_t **tail) |
||
420 | { |
||
421 | struct mqtt_request_t *iter, *next; |
||
422 | LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL); |
||
423 | for (iter = *tail; iter != NULL; iter = next) { |
||
424 | next = iter->next; |
||
425 | mqtt_delete_request(iter); |
||
426 | } |
||
427 | *tail = NULL; |
||
428 | } |
||
429 | /** |
||
430 | * Initialize all request items |
||
431 | * @param r_objs Pointer to request objects |
||
432 | */ |
||
433 | static void |
||
434 | mqtt_init_requests(struct mqtt_request_t *r_objs) |
||
435 | { |
||
436 | u8_t n; |
||
437 | LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL); |
||
438 | for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) { |
||
439 | /* Item pointing to itself indicates unused */ |
||
440 | r_objs[n].next = &r_objs[n]; |
||
441 | } |
||
442 | } |
||
443 | |||
444 | /*--------------------------------------------------------------------------------------------------------------------- */ |
||
445 | /* Output message build helpers */ |
||
446 | |||
447 | |||
448 | static void |
||
449 | mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value) |
||
450 | { |
||
451 | mqtt_ringbuf_put(rb, value); |
||
452 | } |
||
453 | |||
454 | static |
||
455 | void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value) |
||
456 | { |
||
457 | mqtt_ringbuf_put(rb, value >> 8); |
||
458 | mqtt_ringbuf_put(rb, value & 0xff); |
||
459 | } |
||
460 | |||
461 | static void |
||
462 | mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length) |
||
463 | { |
||
464 | u16_t n; |
||
465 | for (n = 0; n < length; n++) { |
||
466 | mqtt_ringbuf_put(rb, ((const u8_t *)data)[n]); |
||
467 | } |
||
468 | } |
||
469 | |||
470 | static void |
||
471 | mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length) |
||
472 | { |
||
473 | u16_t n; |
||
474 | mqtt_ringbuf_put(rb, length >> 8); |
||
475 | mqtt_ringbuf_put(rb, length & 0xff); |
||
476 | for (n = 0; n < length; n++) { |
||
477 | mqtt_ringbuf_put(rb, str[n]); |
||
478 | } |
||
479 | } |
||
480 | |||
481 | /** |
||
482 | * Append fixed header |
||
483 | * @param rb Output ring buffer |
||
484 | * @param msg_type see enum mqtt_message_type |
||
485 | * @param fdup MQTT DUP flag |
||
486 | * @param fqos MQTT QoS field |
||
487 | * @param fretain MQTT retain flag |
||
488 | * @param r_length Remaining length after fixed header |
||
489 | */ |
||
490 | |||
491 | static void |
||
492 | mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t fdup, |
||
493 | u8_t fqos, u8_t fretain, u16_t r_length) |
||
494 | { |
||
495 | /* Start with control byte */ |
||
496 | mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((fdup & 1) << 3) | ((fqos & 3) << 1) | (fretain & 1))); |
||
497 | /* Encode remaining length field */ |
||
498 | do { |
||
499 | mqtt_output_append_u8(rb, (r_length & 0x7f) | (r_length >= 128 ? 0x80 : 0)); |
||
500 | r_length >>= 7; |
||
501 | } while (r_length > 0); |
||
502 | } |
||
503 | |||
504 | |||
505 | /** |
||
506 | * Check output buffer space |
||
507 | * @param rb Output ring buffer |
||
508 | * @param r_length Remaining length after fixed header |
||
509 | * @return 1 if message will fit, 0 if not enough buffer space |
||
510 | */ |
||
511 | static u8_t |
||
512 | mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length) |
||
513 | { |
||
514 | /* Start with length of type byte + remaining length */ |
||
515 | u16_t total_len = 1 + r_length; |
||
516 | |||
517 | LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL); |
||
518 | |||
519 | /* Calculate number of required bytes to contain the remaining bytes field and add to total*/ |
||
520 | do { |
||
521 | total_len++; |
||
522 | r_length >>= 7; |
||
523 | } while (r_length > 0); |
||
524 | |||
525 | return (total_len <= mqtt_ringbuf_free(rb)); |
||
526 | } |
||
527 | |||
528 | |||
529 | /** |
||
530 | * Close connection to server |
||
531 | * @param client MQTT client |
||
532 | * @param reason Reason for disconnection |
||
533 | */ |
||
534 | static void |
||
535 | mqtt_close(mqtt_client_t *client, mqtt_connection_status_t reason) |
||
536 | { |
||
537 | LWIP_ASSERT("mqtt_close: client != NULL", client != NULL); |
||
538 | |||
539 | /* Bring down TCP connection if not already done */ |
||
540 | if (client->conn != NULL) { |
||
541 | err_t res; |
||
542 | altcp_recv(client->conn, NULL); |
||
543 | altcp_err(client->conn, NULL); |
||
544 | altcp_sent(client->conn, NULL); |
||
545 | res = altcp_close(client->conn); |
||
546 | if (res != ERR_OK) { |
||
547 | altcp_abort(client->conn); |
||
548 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_close: Close err=%s\n", lwip_strerr(res))); |
||
549 | } |
||
550 | client->conn = NULL; |
||
551 | } |
||
552 | |||
553 | /* Remove all pending requests */ |
||
554 | mqtt_clear_requests(&client->pend_req_queue); |
||
555 | /* Stop cyclic timer */ |
||
556 | sys_untimeout(mqtt_cyclic_timer, client); |
||
557 | |||
558 | /* Notify upper layer of disconnection if changed state */ |
||
559 | if (client->conn_state != TCP_DISCONNECTED) { |
||
560 | |||
561 | client->conn_state = TCP_DISCONNECTED; |
||
562 | if (client->connect_cb != NULL) { |
||
563 | client->connect_cb(client, client->connect_arg, reason); |
||
564 | } |
||
565 | } |
||
566 | } |
||
567 | |||
568 | |||
569 | /** |
||
570 | * Interval timer, called every MQTT_CYCLIC_TIMER_INTERVAL seconds in MQTT_CONNECTING and MQTT_CONNECTED states |
||
571 | * @param arg MQTT client |
||
572 | */ |
||
573 | static void |
||
574 | mqtt_cyclic_timer(void *arg) |
||
575 | { |
||
576 | u8_t restart_timer = 1; |
||
577 | mqtt_client_t *client = (mqtt_client_t *)arg; |
||
578 | LWIP_ASSERT("mqtt_cyclic_timer: client != NULL", client != NULL); |
||
579 | |||
580 | if (client->conn_state == MQTT_CONNECTING) { |
||
581 | client->cyclic_tick++; |
||
582 | if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= MQTT_CONNECT_TIMOUT) { |
||
583 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_cyclic_timer: CONNECT attempt to server timed out\n")); |
||
584 | /* Disconnect TCP */ |
||
585 | mqtt_close(client, MQTT_CONNECT_TIMEOUT); |
||
586 | restart_timer = 0; |
||
587 | } |
||
588 | } else if (client->conn_state == MQTT_CONNECTED) { |
||
589 | /* Handle timeout for pending requests */ |
||
590 | mqtt_request_time_elapsed(&client->pend_req_queue, MQTT_CYCLIC_TIMER_INTERVAL); |
||
591 | |||
592 | /* keep_alive > 0 means keep alive functionality shall be used */ |
||
593 | if (client->keep_alive > 0) { |
||
594 | |||
595 | client->server_watchdog++; |
||
596 | /* If reception from server has been idle for 1.5*keep_alive time, server is considered unresponsive */ |
||
597 | if ((client->server_watchdog * MQTT_CYCLIC_TIMER_INTERVAL) > (client->keep_alive + client->keep_alive / 2)) { |
||
598 | LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_cyclic_timer: Server incoming keep-alive timeout\n")); |
||
599 | mqtt_close(client, MQTT_CONNECT_TIMEOUT); |
||
600 | restart_timer = 0; |
||
601 | } |
||
602 | |||
603 | /* If time for a keep alive message to be sent, transmission has been idle for keep_alive time */ |
||
604 | if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= client->keep_alive) { |
||
605 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_cyclic_timer: Sending keep-alive message to server\n")); |
||
606 | if (mqtt_output_check_space(&client->output, 0) != 0) { |
||
607 | mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0, 0); |
||
608 | client->cyclic_tick = 0; |
||
609 | } |
||
610 | } else { |
||
611 | client->cyclic_tick++; |
||
612 | } |
||
613 | } |
||
614 | } else { |
||
615 | LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_cyclic_timer: Timer should not be running in state %d\n", client->conn_state)); |
||
616 | restart_timer = 0; |
||
617 | } |
||
618 | if (restart_timer) { |
||
619 | sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL * 1000, mqtt_cyclic_timer, arg); |
||
620 | } |
||
621 | } |
||
622 | |||
623 | |||
624 | /** |
||
625 | * Send PUBACK, PUBREC or PUBREL response message |
||
626 | * @param client MQTT client |
||
627 | * @param msg PUBACK, PUBREC or PUBREL |
||
628 | * @param pkt_id Packet identifier |
||
629 | * @param qos QoS value |
||
630 | * @return ERR_OK if successful, ERR_MEM if out of memory |
||
631 | */ |
||
632 | static err_t |
||
633 | pub_ack_rec_rel_response(mqtt_client_t *client, u8_t msg, u16_t pkt_id, u8_t qos) |
||
634 | { |
||
635 | err_t err = ERR_OK; |
||
636 | if (mqtt_output_check_space(&client->output, 2)) { |
||
637 | mqtt_output_append_fixed_header(&client->output, msg, 0, qos, 0, 2); |
||
638 | mqtt_output_append_u16(&client->output, pkt_id); |
||
639 | mqtt_output_send(&client->output, client->conn); |
||
640 | } else { |
||
641 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("pub_ack_rec_rel_response: OOM creating response: %s with pkt_id: %d\n", |
||
642 | mqtt_msg_type_to_str(msg), pkt_id)); |
||
643 | err = ERR_MEM; |
||
644 | } |
||
645 | return err; |
||
646 | } |
||
647 | |||
648 | /** |
||
649 | * Subscribe response from server |
||
650 | * @param r Matching request |
||
651 | * @param result Result code from server |
||
652 | */ |
||
653 | static void |
||
654 | mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result) |
||
655 | { |
||
656 | if (r->cb != NULL) { |
||
657 | r->cb(r->arg, result < 3 ? ERR_OK : ERR_ABRT); |
||
658 | } |
||
659 | } |
||
660 | |||
661 | |||
662 | /** |
||
663 | * Complete MQTT message received or buffer full |
||
664 | * @param client MQTT client |
||
665 | * @param fixed_hdr_idx header index |
||
666 | * @param length length received part |
||
667 | * @param remaining_length Remaining length of complete message |
||
668 | */ |
||
669 | static mqtt_connection_status_t |
||
670 | mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_idx, u16_t length, u32_t remaining_length) |
||
671 | { |
||
672 | mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED; |
||
673 | |||
674 | u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_idx; |
||
675 | |||
676 | /* Control packet type */ |
||
677 | u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]); |
||
678 | u16_t pkt_id = 0; |
||
679 | |||
680 | LWIP_ERROR("buffer length mismatch", fixed_hdr_idx + length <= MQTT_VAR_HEADER_BUFFER_LEN, |
||
681 | return MQTT_CONNECT_DISCONNECTED); |
||
682 | |||
683 | if (pkt_type == MQTT_MSG_TYPE_CONNACK) { |
||
684 | if (client->conn_state == MQTT_CONNECTING) { |
||
685 | /* Get result code from CONNACK */ |
||
686 | res = (mqtt_connection_status_t)var_hdr_payload[1]; |
||
687 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: Connect response code %d\n", res)); |
||
688 | if (res == MQTT_CONNECT_ACCEPTED) { |
||
689 | /* Reset cyclic_tick when changing to connected state */ |
||
690 | client->cyclic_tick = 0; |
||
691 | client->conn_state = MQTT_CONNECTED; |
||
692 | /* Notify upper layer */ |
||
693 | if (client->connect_cb != 0) { |
||
694 | client->connect_cb(client, client->connect_arg, res); |
||
695 | } |
||
696 | } |
||
697 | } else { |
||
698 | LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Received CONNACK in connected state\n")); |
||
699 | } |
||
700 | } else if (pkt_type == MQTT_MSG_TYPE_PINGRESP) { |
||
701 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ( "mqtt_message_received: Received PINGRESP from server\n")); |
||
702 | |||
703 | } else if (pkt_type == MQTT_MSG_TYPE_PUBLISH) { |
||
704 | u16_t payload_offset = 0; |
||
705 | u16_t payload_length = length; |
||
706 | u8_t qos = MQTT_CTL_PACKET_QOS(client->rx_buffer[0]); |
||
707 | |||
708 | if (client->msg_idx <= MQTT_VAR_HEADER_BUFFER_LEN) { |
||
709 | /* Should have topic and pkt id*/ |
||
710 | u8_t *topic; |
||
711 | u16_t after_topic; |
||
712 | u8_t bkp; |
||
713 | u16_t topic_len = var_hdr_payload[0]; |
||
714 | topic_len = (topic_len << 8) + (u16_t)(var_hdr_payload[1]); |
||
715 | |||
716 | topic = var_hdr_payload + 2; |
||
717 | after_topic = 2 + topic_len; |
||
718 | /* Check length, add one byte even for QoS 0 so that zero termination will fit */ |
||
719 | if ((after_topic + (qos ? 2 : 1)) > length) { |
||
720 | LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Receive buffer can not fit topic + pkt_id\n")); |
||
721 | goto out_disconnect; |
||
722 | } |
||
723 | |||
724 | /* id for QoS 1 and 2 */ |
||
725 | if (qos > 0) { |
||
726 | client->inpub_pkt_id = ((u16_t)var_hdr_payload[after_topic] << 8) + (u16_t)var_hdr_payload[after_topic + 1]; |
||
727 | after_topic += 2; |
||
728 | } else { |
||
729 | client->inpub_pkt_id = 0; |
||
730 | } |
||
731 | /* Take backup of byte after topic */ |
||
732 | bkp = topic[topic_len]; |
||
733 | /* Zero terminate string */ |
||
734 | topic[topic_len] = 0; |
||
735 | /* Payload data remaining in receive buffer */ |
||
736 | payload_length = length - after_topic; |
||
737 | payload_offset = after_topic; |
||
738 | |||
739 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_incomming_publish: Received message with QoS %d at topic: %s, payload length %d\n", |
||
740 | qos, topic, remaining_length + payload_length)); |
||
741 | if (client->pub_cb != NULL) { |
||
742 | client->pub_cb(client->inpub_arg, (const char *)topic, remaining_length + payload_length); |
||
743 | } |
||
744 | /* Restore byte after topic */ |
||
745 | topic[topic_len] = bkp; |
||
746 | } |
||
747 | if (payload_length > 0 || remaining_length == 0) { |
||
748 | client->data_cb(client->inpub_arg, var_hdr_payload + payload_offset, payload_length, remaining_length == 0 ? MQTT_DATA_FLAG_LAST : 0); |
||
749 | /* Reply if QoS > 0 */ |
||
750 | if (remaining_length == 0 && qos > 0) { |
||
751 | /* Send PUBACK for QoS 1 or PUBREC for QoS 2 */ |
||
752 | u8_t resp_msg = (qos == 1) ? MQTT_MSG_TYPE_PUBACK : MQTT_MSG_TYPE_PUBREC; |
||
753 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_incomming_publish: Sending publish response: %s with pkt_id: %d\n", |
||
754 | mqtt_msg_type_to_str(resp_msg), client->inpub_pkt_id)); |
||
755 | pub_ack_rec_rel_response(client, resp_msg, client->inpub_pkt_id, 0); |
||
756 | } |
||
757 | } |
||
758 | } else { |
||
759 | /* Get packet identifier */ |
||
760 | pkt_id = (u16_t)var_hdr_payload[0] << 8; |
||
761 | pkt_id |= (u16_t)var_hdr_payload[1]; |
||
762 | if (pkt_id == 0) { |
||
763 | LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Got message with illegal packet identifier: 0\n")); |
||
764 | goto out_disconnect; |
||
765 | } |
||
766 | if (pkt_type == MQTT_MSG_TYPE_PUBREC) { |
||
767 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: PUBREC, sending PUBREL with pkt_id: %d\n", pkt_id)); |
||
768 | pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBREL, pkt_id, 1); |
||
769 | |||
770 | } else if (pkt_type == MQTT_MSG_TYPE_PUBREL) { |
||
771 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: PUBREL, sending PUBCOMP response with pkt_id: %d\n", pkt_id)); |
||
772 | pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBCOMP, pkt_id, 0); |
||
773 | |||
774 | } else if (pkt_type == MQTT_MSG_TYPE_SUBACK || pkt_type == MQTT_MSG_TYPE_UNSUBACK || |
||
775 | pkt_type == MQTT_MSG_TYPE_PUBCOMP || pkt_type == MQTT_MSG_TYPE_PUBACK) { |
||
776 | struct mqtt_request_t *r = mqtt_take_request(&client->pend_req_queue, pkt_id); |
||
777 | if (r != NULL) { |
||
778 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: %s response with id %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id)); |
||
779 | if (pkt_type == MQTT_MSG_TYPE_SUBACK) { |
||
780 | if (length < 3) { |
||
781 | LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: To small SUBACK packet\n")); |
||
782 | goto out_disconnect; |
||
783 | } else { |
||
784 | mqtt_incomming_suback(r, var_hdr_payload[2]); |
||
785 | } |
||
786 | } else if (r->cb != NULL) { |
||
787 | r->cb(r->arg, ERR_OK); |
||
788 | } |
||
789 | mqtt_delete_request(r); |
||
790 | } else { |
||
791 | LWIP_DEBUGF(MQTT_DEBUG_WARN, ( "mqtt_message_received: Received %s reply, with wrong pkt_id: %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id)); |
||
792 | } |
||
793 | } else { |
||
794 | LWIP_DEBUGF(MQTT_DEBUG_WARN, ( "mqtt_message_received: Received unknown message type: %d\n", pkt_type)); |
||
795 | goto out_disconnect; |
||
796 | } |
||
797 | } |
||
798 | return res; |
||
799 | out_disconnect: |
||
800 | return MQTT_CONNECT_DISCONNECTED; |
||
801 | } |
||
802 | |||
803 | |||
804 | /** |
||
805 | * MQTT incoming message parser |
||
806 | * @param client MQTT client |
||
807 | * @param p PBUF chain of received data |
||
808 | * @return Connection status |
||
809 | */ |
||
810 | static mqtt_connection_status_t |
||
811 | mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p) |
||
812 | { |
||
813 | u16_t in_offset = 0; |
||
814 | u32_t msg_rem_len = 0; |
||
815 | u8_t fixed_hdr_idx = 0; |
||
816 | u8_t b = 0; |
||
817 | |||
818 | while (p->tot_len > in_offset) { |
||
819 | if ((fixed_hdr_idx < 2) || ((b & 0x80) != 0)) { |
||
820 | |||
821 | if (fixed_hdr_idx < client->msg_idx) { |
||
822 | b = client->rx_buffer[fixed_hdr_idx]; |
||
823 | } else { |
||
824 | b = pbuf_get_at(p, in_offset++); |
||
825 | client->rx_buffer[client->msg_idx++] = b; |
||
826 | } |
||
827 | fixed_hdr_idx++; |
||
828 | |||
829 | if (fixed_hdr_idx >= 2) { |
||
830 | msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_idx - 2) * 7); |
||
831 | if ((b & 0x80) == 0) { |
||
832 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: Remaining length after fixed header: %d\n", msg_rem_len)); |
||
833 | if (msg_rem_len == 0) { |
||
834 | /* Complete message with no extra headers of payload received */ |
||
835 | mqtt_message_received(client, fixed_hdr_idx, 0, 0); |
||
836 | client->msg_idx = 0; |
||
837 | fixed_hdr_idx = 0; |
||
838 | } else { |
||
839 | /* Bytes remaining in message */ |
||
840 | msg_rem_len = (msg_rem_len + fixed_hdr_idx) - client->msg_idx; |
||
841 | } |
||
842 | } |
||
843 | } |
||
844 | } else { |
||
845 | u16_t cpy_len, cpy_start, buffer_space; |
||
846 | |||
847 | cpy_start = (client->msg_idx - fixed_hdr_idx) % (MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_idx) + fixed_hdr_idx; |
||
848 | |||
849 | /* Allow to copy the lesser one of available length in input data or bytes remaining in message */ |
||
850 | cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len); |
||
851 | |||
852 | /* Limit to available space in buffer */ |
||
853 | buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - cpy_start; |
||
854 | if (cpy_len > buffer_space) { |
||
855 | cpy_len = buffer_space; |
||
856 | } |
||
857 | pbuf_copy_partial(p, client->rx_buffer + cpy_start, cpy_len, in_offset); |
||
858 | |||
859 | /* Advance get and put indexes */ |
||
860 | client->msg_idx += cpy_len; |
||
861 | in_offset += cpy_len; |
||
862 | msg_rem_len -= cpy_len; |
||
863 | |||
864 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: msg_idx: %d, cpy_len: %d, remaining %d\n", client->msg_idx, cpy_len, msg_rem_len)); |
||
865 | if (msg_rem_len == 0 || cpy_len == buffer_space) { |
||
866 | /* Whole message received or buffer is full */ |
||
867 | mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_idx, (cpy_start + cpy_len) - fixed_hdr_idx, msg_rem_len); |
||
868 | if (res != MQTT_CONNECT_ACCEPTED) { |
||
869 | return res; |
||
870 | } |
||
871 | if (msg_rem_len == 0) { |
||
872 | /* Reset parser state */ |
||
873 | client->msg_idx = 0; |
||
874 | /* msg_tot_len = 0; */ |
||
875 | fixed_hdr_idx = 0; |
||
876 | } |
||
877 | } |
||
878 | } |
||
879 | } |
||
880 | return MQTT_CONNECT_ACCEPTED; |
||
881 | } |
||
882 | |||
883 | |||
884 | /** |
||
885 | * TCP received callback function. @see tcp_recv_fn |
||
886 | * @param arg MQTT client |
||
887 | * @param p PBUF chain of received data |
||
888 | * @param err Passed as return value if not ERR_OK |
||
889 | * @return ERR_OK or err passed into callback |
||
890 | */ |
||
891 | static err_t |
||
892 | mqtt_tcp_recv_cb(void *arg, struct altcp_pcb *pcb, struct pbuf *p, err_t err) |
||
893 | { |
||
894 | mqtt_client_t *client = (mqtt_client_t *)arg; |
||
895 | LWIP_ASSERT("mqtt_tcp_recv_cb: client != NULL", client != NULL); |
||
896 | LWIP_ASSERT("mqtt_tcp_recv_cb: client->conn == pcb", client->conn == pcb); |
||
897 | |||
898 | if (p == NULL) { |
||
899 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_recv_cb: Recv pbuf=NULL, remote has closed connection\n")); |
||
900 | mqtt_close(client, MQTT_CONNECT_DISCONNECTED); |
||
901 | } else { |
||
902 | mqtt_connection_status_t res; |
||
903 | if (err != ERR_OK) { |
||
904 | LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_tcp_recv_cb: Recv err=%d\n", err)); |
||
905 | pbuf_free(p); |
||
906 | return err; |
||
907 | } |
||
908 | |||
909 | /* Tell remote that data has been received */ |
||
910 | altcp_recved(pcb, p->tot_len); |
||
911 | res = mqtt_parse_incoming(client, p); |
||
912 | pbuf_free(p); |
||
913 | |||
914 | if (res != MQTT_CONNECT_ACCEPTED) { |
||
915 | mqtt_close(client, res); |
||
916 | } |
||
917 | /* If keep alive functionality is used */ |
||
918 | if (client->keep_alive != 0) { |
||
919 | /* Reset server alive watchdog */ |
||
920 | client->server_watchdog = 0; |
||
921 | } |
||
922 | |||
923 | } |
||
924 | return ERR_OK; |
||
925 | } |
||
926 | |||
927 | |||
928 | /** |
||
929 | * TCP data sent callback function. @see tcp_sent_fn |
||
930 | * @param arg MQTT client |
||
931 | * @param tpcb TCP connection handle |
||
932 | * @param len Number of bytes sent |
||
933 | * @return ERR_OK |
||
934 | */ |
||
935 | static err_t |
||
936 | mqtt_tcp_sent_cb(void *arg, struct altcp_pcb *tpcb, u16_t len) |
||
937 | { |
||
938 | mqtt_client_t *client = (mqtt_client_t *)arg; |
||
939 | |||
940 | LWIP_UNUSED_ARG(tpcb); |
||
941 | LWIP_UNUSED_ARG(len); |
||
942 | |||
943 | if (client->conn_state == MQTT_CONNECTED) { |
||
944 | struct mqtt_request_t *r; |
||
945 | |||
946 | /* Reset keep-alive send timer and server watchdog */ |
||
947 | client->cyclic_tick = 0; |
||
948 | client->server_watchdog = 0; |
||
949 | /* QoS 0 publish has no response from server, so call its callbacks here */ |
||
950 | while ((r = mqtt_take_request(&client->pend_req_queue, 0)) != NULL) { |
||
951 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_sent_cb: Calling QoS 0 publish complete callback\n")); |
||
952 | if (r->cb != NULL) { |
||
953 | r->cb(r->arg, ERR_OK); |
||
954 | } |
||
955 | mqtt_delete_request(r); |
||
956 | } |
||
957 | /* Try send any remaining buffers from output queue */ |
||
958 | mqtt_output_send(&client->output, client->conn); |
||
959 | } |
||
960 | return ERR_OK; |
||
961 | } |
||
962 | |||
963 | /** |
||
964 | * TCP error callback function. @see tcp_err_fn |
||
965 | * @param arg MQTT client |
||
966 | * @param err Error encountered |
||
967 | */ |
||
968 | static void |
||
969 | mqtt_tcp_err_cb(void *arg, err_t err) |
||
970 | { |
||
971 | mqtt_client_t *client = (mqtt_client_t *)arg; |
||
972 | LWIP_UNUSED_ARG(err); /* only used for debug output */ |
||
973 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_err_cb: TCP error callback: error %d, arg: %p\n", err, arg)); |
||
974 | LWIP_ASSERT("mqtt_tcp_err_cb: client != NULL", client != NULL); |
||
975 | /* Set conn to null before calling close as pcb is already deallocated*/ |
||
976 | client->conn = 0; |
||
977 | mqtt_close(client, MQTT_CONNECT_DISCONNECTED); |
||
978 | } |
||
979 | |||
980 | /** |
||
981 | * TCP poll callback function. @see tcp_poll_fn |
||
982 | * @param arg MQTT client |
||
983 | * @param tpcb TCP connection handle |
||
984 | * @return err ERR_OK |
||
985 | */ |
||
986 | static err_t |
||
987 | mqtt_tcp_poll_cb(void *arg, struct altcp_pcb *tpcb) |
||
988 | { |
||
989 | mqtt_client_t *client = (mqtt_client_t *)arg; |
||
990 | if (client->conn_state == MQTT_CONNECTED) { |
||
991 | /* Try send any remaining buffers from output queue */ |
||
992 | mqtt_output_send(&client->output, tpcb); |
||
993 | } |
||
994 | return ERR_OK; |
||
995 | } |
||
996 | |||
997 | /** |
||
998 | * TCP connect callback function. @see tcp_connected_fn |
||
999 | * @param arg MQTT client |
||
1000 | * @param err Always ERR_OK, mqtt_tcp_err_cb is called in case of error |
||
1001 | * @return ERR_OK |
||
1002 | */ |
||
1003 | static err_t |
||
1004 | mqtt_tcp_connect_cb(void *arg, struct altcp_pcb *tpcb, err_t err) |
||
1005 | { |
||
1006 | mqtt_client_t *client = (mqtt_client_t *)arg; |
||
1007 | |||
1008 | if (err != ERR_OK) { |
||
1009 | LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_tcp_connect_cb: TCP connect error %d\n", err)); |
||
1010 | return err; |
||
1011 | } |
||
1012 | |||
1013 | /* Initiate receiver state */ |
||
1014 | client->msg_idx = 0; |
||
1015 | |||
1016 | /* Setup TCP callbacks */ |
||
1017 | altcp_recv(tpcb, mqtt_tcp_recv_cb); |
||
1018 | altcp_sent(tpcb, mqtt_tcp_sent_cb); |
||
1019 | altcp_poll(tpcb, mqtt_tcp_poll_cb, 2); |
||
1020 | |||
1021 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_connect_cb: TCP connection established to server\n")); |
||
1022 | /* Enter MQTT connect state */ |
||
1023 | client->conn_state = MQTT_CONNECTING; |
||
1024 | |||
1025 | /* Start cyclic timer */ |
||
1026 | sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL * 1000, mqtt_cyclic_timer, client); |
||
1027 | client->cyclic_tick = 0; |
||
1028 | |||
1029 | /* Start transmission from output queue, connect message is the first one out*/ |
||
1030 | mqtt_output_send(&client->output, client->conn); |
||
1031 | |||
1032 | return ERR_OK; |
||
1033 | } |
||
1034 | |||
1035 | |||
1036 | |||
1037 | /*---------------------------------------------------------------------------------------------------- */ |
||
1038 | /* Public API */ |
||
1039 | |||
1040 | |||
1041 | /** |
||
1042 | * @ingroup mqtt |
||
1043 | * MQTT publish function. |
||
1044 | * @param client MQTT client |
||
1045 | * @param topic Publish topic string |
||
1046 | * @param payload Data to publish (NULL is allowed) |
||
1047 | * @param payload_length: Length of payload (0 is allowed) |
||
1048 | * @param qos Quality of service, 0 1 or 2 |
||
1049 | * @param retain MQTT retain flag |
||
1050 | * @param cb Callback to call when publish is complete or has timed out |
||
1051 | * @param arg User supplied argument to publish callback |
||
1052 | * @return ERR_OK if successful |
||
1053 | * ERR_CONN if client is disconnected |
||
1054 | * ERR_MEM if short on memory |
||
1055 | */ |
||
1056 | err_t |
||
1057 | mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain, |
||
1058 | mqtt_request_cb_t cb, void *arg) |
||
1059 | { |
||
1060 | struct mqtt_request_t *r; |
||
1061 | u16_t pkt_id; |
||
1062 | size_t topic_strlen; |
||
1063 | size_t total_len; |
||
1064 | u16_t topic_len; |
||
1065 | u16_t remaining_length; |
||
1066 | |||
1067 | LWIP_ASSERT("mqtt_publish: client != NULL", client); |
||
1068 | LWIP_ASSERT("mqtt_publish: topic != NULL", topic); |
||
1069 | LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN); |
||
1070 | |||
1071 | topic_strlen = strlen(topic); |
||
1072 | LWIP_ERROR("mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG); |
||
1073 | topic_len = (u16_t)topic_strlen; |
||
1074 | total_len = 2 + topic_len + payload_length; |
||
1075 | LWIP_ERROR("mqtt_publish: total length overflow", (total_len <= 0xFFFF), return ERR_ARG); |
||
1076 | remaining_length = (u16_t)total_len; |
||
1077 | |||
1078 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic)); |
||
1079 | |||
1080 | if (qos > 0) { |
||
1081 | remaining_length += 2; |
||
1082 | /* Generate pkt_id id for QoS1 and 2 */ |
||
1083 | pkt_id = msg_generate_packet_id(client); |
||
1084 | } else { |
||
1085 | /* Use reserved value pkt_id 0 for QoS 0 in request handle */ |
||
1086 | pkt_id = 0; |
||
1087 | } |
||
1088 | |||
1089 | r = mqtt_create_request(client->req_list, pkt_id, cb, arg); |
||
1090 | if (r == NULL) { |
||
1091 | return ERR_MEM; |
||
1092 | } |
||
1093 | |||
1094 | if (mqtt_output_check_space(&client->output, remaining_length) == 0) { |
||
1095 | mqtt_delete_request(r); |
||
1096 | return ERR_MEM; |
||
1097 | } |
||
1098 | /* Append fixed header */ |
||
1099 | mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length); |
||
1100 | |||
1101 | /* Append Topic */ |
||
1102 | mqtt_output_append_string(&client->output, topic, topic_len); |
||
1103 | |||
1104 | /* Append packet if for QoS 1 and 2*/ |
||
1105 | if (qos > 0) { |
||
1106 | mqtt_output_append_u16(&client->output, pkt_id); |
||
1107 | } |
||
1108 | |||
1109 | /* Append optional publish payload */ |
||
1110 | if ((payload != NULL) && (payload_length > 0)) { |
||
1111 | mqtt_output_append_buf(&client->output, payload, payload_length); |
||
1112 | } |
||
1113 | |||
1114 | mqtt_append_request(&client->pend_req_queue, r); |
||
1115 | mqtt_output_send(&client->output, client->conn); |
||
1116 | return ERR_OK; |
||
1117 | } |
||
1118 | |||
1119 | |||
1120 | /** |
||
1121 | * @ingroup mqtt |
||
1122 | * MQTT subscribe/unsubscribe function. |
||
1123 | * @param client MQTT client |
||
1124 | * @param topic topic to subscribe to |
||
1125 | * @param qos Quality of service, 0 1 or 2 (only used for subscribe) |
||
1126 | * @param cb Callback to call when subscribe/unsubscribe reponse is received |
||
1127 | * @param arg User supplied argument to publish callback |
||
1128 | * @param sub 1 for subscribe, 0 for unsubscribe |
||
1129 | * @return ERR_OK if successful, @see err_t enum for other results |
||
1130 | */ |
||
1131 | err_t |
||
1132 | mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub) |
||
1133 | { |
||
1134 | size_t topic_strlen; |
||
1135 | size_t total_len; |
||
1136 | u16_t topic_len; |
||
1137 | u16_t remaining_length; |
||
1138 | u16_t pkt_id; |
||
1139 | struct mqtt_request_t *r; |
||
1140 | |||
1141 | LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client); |
||
1142 | LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic); |
||
1143 | |||
1144 | topic_strlen = strlen(topic); |
||
1145 | LWIP_ERROR("mqtt_sub_unsub: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG); |
||
1146 | topic_len = (u16_t)topic_strlen; |
||
1147 | /* Topic string, pkt_id, qos for subscribe */ |
||
1148 | total_len = topic_len + 2 + 2 + (sub != 0); |
||
1149 | LWIP_ERROR("mqtt_sub_unsub: total length overflow", (total_len <= 0xFFFF), return ERR_ARG); |
||
1150 | remaining_length = (u16_t)total_len; |
||
1151 | |||
1152 | LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3); |
||
1153 | if (client->conn_state == TCP_DISCONNECTED) { |
||
1154 | LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_sub_unsub: Can not (un)subscribe in disconnected state\n")); |
||
1155 | return ERR_CONN; |
||
1156 | } |
||
1157 | |||
1158 | pkt_id = msg_generate_packet_id(client); |
||
1159 | r = mqtt_create_request(client->req_list, pkt_id, cb, arg); |
||
1160 | if (r == NULL) { |
||
1161 | return ERR_MEM; |
||
1162 | } |
||
1163 | |||
1164 | if (mqtt_output_check_space(&client->output, remaining_length) == 0) { |
||
1165 | mqtt_delete_request(r); |
||
1166 | return ERR_MEM; |
||
1167 | } |
||
1168 | |||
1169 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_sub_unsub: Client (un)subscribe to topic \"%s\", id: %d\n", topic, pkt_id)); |
||
1170 | |||
1171 | mqtt_output_append_fixed_header(&client->output, sub ? MQTT_MSG_TYPE_SUBSCRIBE : MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0, remaining_length); |
||
1172 | /* Packet id */ |
||
1173 | mqtt_output_append_u16(&client->output, pkt_id); |
||
1174 | /* Topic */ |
||
1175 | mqtt_output_append_string(&client->output, topic, topic_len); |
||
1176 | /* QoS */ |
||
1177 | if (sub != 0) { |
||
1178 | mqtt_output_append_u8(&client->output, LWIP_MIN(qos, 2)); |
||
1179 | } |
||
1180 | |||
1181 | mqtt_append_request(&client->pend_req_queue, r); |
||
1182 | mqtt_output_send(&client->output, client->conn); |
||
1183 | return ERR_OK; |
||
1184 | } |
||
1185 | |||
1186 | |||
1187 | /** |
||
1188 | * @ingroup mqtt |
||
1189 | * Set callback to handle incoming publish requests from server |
||
1190 | * @param client MQTT client |
||
1191 | * @param pub_cb Callback invoked when publish starts, contain topic and total length of payload |
||
1192 | * @param data_cb Callback for each fragment of payload that arrives |
||
1193 | * @param arg User supplied argument to both callbacks |
||
1194 | */ |
||
1195 | void |
||
1196 | mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb, |
||
1197 | mqtt_incoming_data_cb_t data_cb, void *arg) |
||
1198 | { |
||
1199 | LWIP_ASSERT("mqtt_set_inpub_callback: client != NULL", client != NULL); |
||
1200 | client->data_cb = data_cb; |
||
1201 | client->pub_cb = pub_cb; |
||
1202 | client->inpub_arg = arg; |
||
1203 | } |
||
1204 | |||
1205 | /** |
||
1206 | * @ingroup mqtt |
||
1207 | * Create a new MQTT client instance |
||
1208 | * @return Pointer to instance on success, NULL otherwise |
||
1209 | */ |
||
1210 | mqtt_client_t * |
||
1211 | mqtt_client_new(void) |
||
1212 | { |
||
1213 | return (mqtt_client_t *)mem_calloc(1, sizeof(mqtt_client_t)); |
||
1214 | } |
||
1215 | |||
1216 | /** |
||
1217 | * @ingroup mqtt |
||
1218 | * Free MQTT client instance |
||
1219 | * @param client Pointer to instance to be freed |
||
1220 | */ |
||
1221 | void |
||
1222 | mqtt_client_free(mqtt_client_t *client) |
||
1223 | { |
||
1224 | mem_free(client); |
||
1225 | } |
||
1226 | |||
1227 | /** |
||
1228 | * @ingroup mqtt |
||
1229 | * Connect to MQTT server |
||
1230 | * @param client MQTT client |
||
1231 | * @param ip_addr Server IP |
||
1232 | * @param port Server port |
||
1233 | * @param cb Connection state change callback |
||
1234 | * @param arg User supplied argument to connection callback |
||
1235 | * @param client_info Client identification and connection options |
||
1236 | * @return ERR_OK if successful, @see err_t enum for other results |
||
1237 | */ |
||
1238 | err_t |
||
1239 | mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port, mqtt_connection_cb_t cb, void *arg, |
||
1240 | const struct mqtt_connect_client_info_t *client_info) |
||
1241 | { |
||
1242 | err_t err; |
||
1243 | size_t len; |
||
1244 | u16_t client_id_length; |
||
1245 | /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */ |
||
1246 | u16_t remaining_length = 2 + 4 + 1 + 1 + 2; |
||
1247 | u8_t flags = 0, will_topic_len = 0, will_msg_len = 0; |
||
1248 | u8_t client_user_len = 0, client_pass_len = 0; |
||
1249 | |||
1250 | LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL); |
||
1251 | LWIP_ASSERT("mqtt_client_connect: ip_addr != NULL", ip_addr != NULL); |
||
1252 | LWIP_ASSERT("mqtt_client_connect: client_info != NULL", client_info != NULL); |
||
1253 | LWIP_ASSERT("mqtt_client_connect: client_info->client_id != NULL", client_info->client_id != NULL); |
||
1254 | |||
1255 | if (client->conn_state != TCP_DISCONNECTED) { |
||
1256 | LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_client_connect: Already connected\n")); |
||
1257 | return ERR_ISCONN; |
||
1258 | } |
||
1259 | |||
1260 | /* Wipe clean */ |
||
1261 | memset(client, 0, sizeof(mqtt_client_t)); |
||
1262 | client->connect_arg = arg; |
||
1263 | client->connect_cb = cb; |
||
1264 | client->keep_alive = client_info->keep_alive; |
||
1265 | mqtt_init_requests(client->req_list); |
||
1266 | |||
1267 | /* Build connect message */ |
||
1268 | if (client_info->will_topic != NULL && client_info->will_msg != NULL) { |
||
1269 | flags |= MQTT_CONNECT_FLAG_WILL; |
||
1270 | flags |= (client_info->will_qos & 3) << 3; |
||
1271 | if (client_info->will_retain) { |
||
1272 | flags |= MQTT_CONNECT_FLAG_WILL_RETAIN; |
||
1273 | } |
||
1274 | len = strlen(client_info->will_topic); |
||
1275 | LWIP_ERROR("mqtt_client_connect: client_info->will_topic length overflow", len <= 0xFF, return ERR_VAL); |
||
1276 | LWIP_ERROR("mqtt_client_connect: client_info->will_topic length must be > 0", len > 0, return ERR_VAL); |
||
1277 | will_topic_len = (u8_t)len; |
||
1278 | len = strlen(client_info->will_msg); |
||
1279 | LWIP_ERROR("mqtt_client_connect: client_info->will_msg length overflow", len <= 0xFF, return ERR_VAL); |
||
1280 | will_msg_len = (u8_t)len; |
||
1281 | len = remaining_length + 2 + will_topic_len + 2 + will_msg_len; |
||
1282 | LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); |
||
1283 | remaining_length = (u16_t)len; |
||
1284 | } |
||
1285 | if (client_info->client_user != NULL) { |
||
1286 | flags |= MQTT_CONNECT_FLAG_USERNAME; |
||
1287 | len = strlen(client_info->client_user); |
||
1288 | LWIP_ERROR("mqtt_client_connect: client_info->client_user length overflow", len <= 0xFF, return ERR_VAL); |
||
1289 | LWIP_ERROR("mqtt_client_connect: client_info->client_user length must be > 0", len > 0, return ERR_VAL); |
||
1290 | client_user_len = (u8_t)len; |
||
1291 | len = remaining_length + 2 + client_user_len; |
||
1292 | LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); |
||
1293 | remaining_length = (u16_t)len; |
||
1294 | } |
||
1295 | if (client_info->client_pass != NULL) { |
||
1296 | flags |= MQTT_CONNECT_FLAG_PASSWORD; |
||
1297 | len = strlen(client_info->client_pass); |
||
1298 | LWIP_ERROR("mqtt_client_connect: client_info->client_pass length overflow", len <= 0xFF, return ERR_VAL); |
||
1299 | LWIP_ERROR("mqtt_client_connect: client_info->client_pass length must be > 0", len > 0, return ERR_VAL); |
||
1300 | client_pass_len = (u8_t)len; |
||
1301 | len = remaining_length + 2 + client_pass_len; |
||
1302 | LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); |
||
1303 | remaining_length = (u16_t)len; |
||
1304 | } |
||
1305 | |||
1306 | /* Don't complicate things, always connect using clean session */ |
||
1307 | flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION; |
||
1308 | |||
1309 | len = strlen(client_info->client_id); |
||
1310 | LWIP_ERROR("mqtt_client_connect: client_info->client_id length overflow", len <= 0xFFFF, return ERR_VAL); |
||
1311 | client_id_length = (u16_t)len; |
||
1312 | len = remaining_length + 2 + client_id_length; |
||
1313 | LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); |
||
1314 | remaining_length = (u16_t)len; |
||
1315 | |||
1316 | if (mqtt_output_check_space(&client->output, remaining_length) == 0) { |
||
1317 | return ERR_MEM; |
||
1318 | } |
||
1319 | |||
1320 | client->conn = altcp_tcp_new(); |
||
1321 | if (client->conn == NULL) { |
||
1322 | return ERR_MEM; |
||
1323 | } |
||
1324 | #if LWIP_ALTCP && LWIP_ALTCP_TLS |
||
1325 | if (client_info->tls_config) { |
||
1326 | struct altcp_pcb *pcb_tls = altcp_tls_new(client_info->tls_config, client->conn); |
||
1327 | if (pcb_tls == NULL) { |
||
1328 | altcp_close(client->conn); |
||
1329 | return ERR_MEM; |
||
1330 | } |
||
1331 | client->conn = pcb_tls; |
||
1332 | } |
||
1333 | #endif |
||
1334 | |||
1335 | /* Set arg pointer for callbacks */ |
||
1336 | altcp_arg(client->conn, client); |
||
1337 | /* Any local address, pick random local port number */ |
||
1338 | err = altcp_bind(client->conn, IP_ADDR_ANY, 0); |
||
1339 | if (err != ERR_OK) { |
||
1340 | LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_client_connect: Error binding to local ip/port, %d\n", err)); |
||
1341 | goto tcp_fail; |
||
1342 | } |
||
1343 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port)); |
||
1344 | |||
1345 | /* Connect to server */ |
||
1346 | err = altcp_connect(client->conn, ip_addr, port, mqtt_tcp_connect_cb); |
||
1347 | if (err != ERR_OK) { |
||
1348 | LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err)); |
||
1349 | goto tcp_fail; |
||
1350 | } |
||
1351 | /* Set error callback */ |
||
1352 | altcp_err(client->conn, mqtt_tcp_err_cb); |
||
1353 | client->conn_state = TCP_CONNECTING; |
||
1354 | |||
1355 | /* Append fixed header */ |
||
1356 | mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length); |
||
1357 | /* Append Protocol string */ |
||
1358 | mqtt_output_append_string(&client->output, "MQTT", 4); |
||
1359 | /* Append Protocol level */ |
||
1360 | mqtt_output_append_u8(&client->output, 4); |
||
1361 | /* Append connect flags */ |
||
1362 | mqtt_output_append_u8(&client->output, flags); |
||
1363 | /* Append keep-alive */ |
||
1364 | mqtt_output_append_u16(&client->output, client_info->keep_alive); |
||
1365 | /* Append client id */ |
||
1366 | mqtt_output_append_string(&client->output, client_info->client_id, client_id_length); |
||
1367 | /* Append will message if used */ |
||
1368 | if ((flags & MQTT_CONNECT_FLAG_WILL) != 0) { |
||
1369 | mqtt_output_append_string(&client->output, client_info->will_topic, will_topic_len); |
||
1370 | mqtt_output_append_string(&client->output, client_info->will_msg, will_msg_len); |
||
1371 | } |
||
1372 | /* Append user name if given */ |
||
1373 | if ((flags & MQTT_CONNECT_FLAG_USERNAME) != 0) { |
||
1374 | mqtt_output_append_string(&client->output, client_info->client_user, client_user_len); |
||
1375 | } |
||
1376 | /* Append password if given */ |
||
1377 | if ((flags & MQTT_CONNECT_FLAG_PASSWORD) != 0) { |
||
1378 | mqtt_output_append_string(&client->output, client_info->client_pass, client_pass_len); |
||
1379 | } |
||
1380 | return ERR_OK; |
||
1381 | |||
1382 | tcp_fail: |
||
1383 | altcp_abort(client->conn); |
||
1384 | client->conn = NULL; |
||
1385 | return err; |
||
1386 | } |
||
1387 | |||
1388 | |||
1389 | /** |
||
1390 | * @ingroup mqtt |
||
1391 | * Disconnect from MQTT server |
||
1392 | * @param client MQTT client |
||
1393 | */ |
||
1394 | void |
||
1395 | mqtt_disconnect(mqtt_client_t *client) |
||
1396 | { |
||
1397 | LWIP_ASSERT("mqtt_disconnect: client != NULL", client); |
||
1398 | /* If connection in not already closed */ |
||
1399 | if (client->conn_state != TCP_DISCONNECTED) { |
||
1400 | /* Set conn_state before calling mqtt_close to prevent callback from being called */ |
||
1401 | client->conn_state = TCP_DISCONNECTED; |
||
1402 | mqtt_close(client, (mqtt_connection_status_t)0); |
||
1403 | } |
||
1404 | } |
||
1405 | |||
1406 | /** |
||
1407 | * @ingroup mqtt |
||
1408 | * Check connection with server |
||
1409 | * @param client MQTT client |
||
1410 | * @return 1 if connected to server, 0 otherwise |
||
1411 | */ |
||
1412 | u8_t |
||
1413 | mqtt_client_is_connected(mqtt_client_t *client) |
||
1414 | { |
||
1415 | LWIP_ASSERT("mqtt_client_is_connected: client != NULL", client); |
||
1416 | return client->conn_state == MQTT_CONNECTED; |
||
1417 | } |
||
1418 | |||
1419 | #endif /* LWIP_TCP && LWIP_CALLBACK_API */ |