BadVPN – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 /**
2 * @file
3 * MQTT client
4 *
5 * @defgroup mqtt MQTT client
6 * @ingroup apps
7 * @verbinclude mqtt_client.txt
8 */
9  
10 /*
11 * Copyright (c) 2016 Erik Andersson <erian747@gmail.com>
12 * All rights reserved.
13 *
14 * Redistribution and use in source and binary forms, with or without modification,
15 * are permitted provided that the following conditions are met:
16 *
17 * 1. Redistributions of source code must retain the above copyright notice,
18 * this list of conditions and the following disclaimer.
19 * 2. Redistributions in binary form must reproduce the above copyright notice,
20 * this list of conditions and the following disclaimer in the documentation
21 * and/or other materials provided with the distribution.
22 * 3. The name of the author may not be used to endorse or promote products
23 * derived from this software without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
26 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
27 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
28 * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
30 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
33 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
34 * OF SUCH DAMAGE.
35 *
36 * This file is part of the lwIP TCP/IP stack
37 *
38 * Author: Erik Andersson <erian747@gmail.com>
39 *
40 *
41 * @todo:
42 * - Handle large outgoing payloads for PUBLISH messages
43 * - Fix restriction of a single topic in each (UN)SUBSCRIBE message (protocol has support for multiple topics)
44 * - Add support for legacy MQTT protocol version
45 *
46 * Please coordinate changes and requests with Erik Andersson
47 * Erik Andersson <erian747@gmail.com>
48 *
49 */
50 #include "lwip/apps/mqtt.h"
51 #include "lwip/apps/mqtt_priv.h"
52 #include "lwip/timeouts.h"
53 #include "lwip/ip_addr.h"
54 #include "lwip/mem.h"
55 #include "lwip/err.h"
56 #include "lwip/pbuf.h"
57 #include "lwip/altcp.h"
58 #include "lwip/altcp_tcp.h"
59 #include "lwip/altcp_tls.h"
60 #include <string.h>
61  
62 #if LWIP_TCP && LWIP_CALLBACK_API
63  
64 /**
65 * MQTT_DEBUG: Default is off.
66 */
67 #if !defined MQTT_DEBUG || defined __DOXYGEN__
68 #define MQTT_DEBUG LWIP_DBG_OFF
69 #endif
70  
71 #define MQTT_DEBUG_TRACE (MQTT_DEBUG | LWIP_DBG_TRACE)
72 #define MQTT_DEBUG_STATE (MQTT_DEBUG | LWIP_DBG_STATE)
73 #define MQTT_DEBUG_WARN (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING)
74 #define MQTT_DEBUG_WARN_STATE (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING | LWIP_DBG_STATE)
75 #define MQTT_DEBUG_SERIOUS (MQTT_DEBUG | LWIP_DBG_LEVEL_SERIOUS)
76  
77  
78  
79 /**
80 * MQTT client connection states
81 */
82 enum {
83 TCP_DISCONNECTED,
84 TCP_CONNECTING,
85 MQTT_CONNECTING,
86 MQTT_CONNECTED
87 };
88  
89 /**
90 * MQTT control message types
91 */
92 enum mqtt_message_type {
93 MQTT_MSG_TYPE_CONNECT = 1,
94 MQTT_MSG_TYPE_CONNACK = 2,
95 MQTT_MSG_TYPE_PUBLISH = 3,
96 MQTT_MSG_TYPE_PUBACK = 4,
97 MQTT_MSG_TYPE_PUBREC = 5,
98 MQTT_MSG_TYPE_PUBREL = 6,
99 MQTT_MSG_TYPE_PUBCOMP = 7,
100 MQTT_MSG_TYPE_SUBSCRIBE = 8,
101 MQTT_MSG_TYPE_SUBACK = 9,
102 MQTT_MSG_TYPE_UNSUBSCRIBE = 10,
103 MQTT_MSG_TYPE_UNSUBACK = 11,
104 MQTT_MSG_TYPE_PINGREQ = 12,
105 MQTT_MSG_TYPE_PINGRESP = 13,
106 MQTT_MSG_TYPE_DISCONNECT = 14
107 };
108  
109 /** Helpers to extract control packet type and qos from first byte in fixed header */
110 #define MQTT_CTL_PACKET_TYPE(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0xf0) >> 4)
111 #define MQTT_CTL_PACKET_QOS(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0x6) >> 1)
112  
113 /**
114 * MQTT connect flags, only used in CONNECT message
115 */
116 enum mqtt_connect_flag {
117 MQTT_CONNECT_FLAG_USERNAME = 1 << 7,
118 MQTT_CONNECT_FLAG_PASSWORD = 1 << 6,
119 MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5,
120 MQTT_CONNECT_FLAG_WILL = 1 << 2,
121 MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1
122 };
123  
124  
125 static void mqtt_cyclic_timer(void *arg);
126  
127 #if defined(LWIP_DEBUG)
128 static const char *const mqtt_message_type_str[15] = {
129 "UNDEFINED",
130 "CONNECT",
131 "CONNACK",
132 "PUBLISH",
133 "PUBACK",
134 "PUBREC",
135 "PUBREL",
136 "PUBCOMP",
137 "SUBSCRIBE",
138 "SUBACK",
139 "UNSUBSCRIBE",
140 "UNSUBACK",
141 "PINGREQ",
142 "PINGRESP",
143 "DISCONNECT"
144 };
145  
146 /**
147 * Message type value to string
148 * @param msg_type see enum mqtt_message_type
149 *
150 * @return Control message type text string
151 */
152 static const char *
153 mqtt_msg_type_to_str(u8_t msg_type)
154 {
155 if (msg_type >= LWIP_ARRAYSIZE(mqtt_message_type_str)) {
156 msg_type = 0;
157 }
158 return mqtt_message_type_str[msg_type];
159 }
160  
161 #endif
162  
163  
164 /**
165 * Generate MQTT packet identifier
166 * @param client MQTT client
167 * @return New packet identifier, range 1 to 65535
168 */
169 static u16_t
170 msg_generate_packet_id(mqtt_client_t *client)
171 {
172 client->pkt_id_seq++;
173 if (client->pkt_id_seq == 0) {
174 client->pkt_id_seq++;
175 }
176 return client->pkt_id_seq;
177 }
178  
179 /*--------------------------------------------------------------------------------------------------------------------- */
180 /* Output ring buffer */
181  
182 /** Add single item to ring buffer */
183 static void
184 mqtt_ringbuf_put(struct mqtt_ringbuf_t *rb, u8_t item)
185 {
186 rb->buf[rb->put] = item;
187 rb->put++;
188 if (rb->put >= MQTT_OUTPUT_RINGBUF_SIZE) {
189 rb->put = 0;
190 }
191 }
192  
193 /** Return pointer to ring buffer get position */
194 static u8_t *
195 mqtt_ringbuf_get_ptr(struct mqtt_ringbuf_t *rb)
196 {
197 return &rb->buf[rb->get];
198 }
199  
200 static void
201 mqtt_ringbuf_advance_get_idx(struct mqtt_ringbuf_t *rb, u16_t len)
202 {
203 LWIP_ASSERT("mqtt_ringbuf_advance_get_idx: len < MQTT_OUTPUT_RINGBUF_SIZE", len < MQTT_OUTPUT_RINGBUF_SIZE);
204  
205 rb->get += len;
206 if (rb->get >= MQTT_OUTPUT_RINGBUF_SIZE) {
207 rb->get = rb->get - MQTT_OUTPUT_RINGBUF_SIZE;
208 }
209 }
210  
211 /** Return number of bytes in ring buffer */
212 static u16_t
213 mqtt_ringbuf_len(struct mqtt_ringbuf_t *rb)
214 {
215 u32_t len = rb->put - rb->get;
216 if (len > 0xFFFF) {
217 len += MQTT_OUTPUT_RINGBUF_SIZE;
218 }
219 return (u16_t)len;
220 }
221  
222 /** Return number of bytes free in ring buffer */
223 #define mqtt_ringbuf_free(rb) (MQTT_OUTPUT_RINGBUF_SIZE - mqtt_ringbuf_len(rb))
224  
225 /** Return number of bytes possible to read without wrapping around */
226 #define mqtt_ringbuf_linear_read_length(rb) LWIP_MIN(mqtt_ringbuf_len(rb), (MQTT_OUTPUT_RINGBUF_SIZE - (rb)->get))
227  
228 /**
229 * Try send as many bytes as possible from output ring buffer
230 * @param rb Output ring buffer
231 * @param tpcb TCP connection handle
232 */
233 static void
234 mqtt_output_send(struct mqtt_ringbuf_t *rb, struct altcp_pcb *tpcb)
235 {
236 err_t err;
237 u8_t wrap = 0;
238 u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb);
239 u16_t send_len = altcp_sndbuf(tpcb);
240 LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL);
241  
242 if (send_len == 0 || ringbuf_lin_len == 0) {
243 return;
244 }
245  
246 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_output_send: tcp_sndbuf: %d bytes, ringbuf_linear_available: %d, get %d, put %d\n",
247 send_len, ringbuf_lin_len, rb->get, rb->put));
248  
249 if (send_len > ringbuf_lin_len) {
250 /* Space in TCP output buffer is larger than available in ring buffer linear portion */
251 send_len = ringbuf_lin_len;
252 /* Wrap around if more data in ring buffer after linear portion */
253 wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len);
254 }
255 err = altcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0));
256 if ((err == ERR_OK) && wrap) {
257 mqtt_ringbuf_advance_get_idx(rb, send_len);
258 /* Use the lesser one of ring buffer linear length and TCP send buffer size */
259 send_len = LWIP_MIN(altcp_sndbuf(tpcb), mqtt_ringbuf_linear_read_length(rb));
260 err = altcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY);
261 }
262  
263 if (err == ERR_OK) {
264 mqtt_ringbuf_advance_get_idx(rb, send_len);
265 /* Flush */
266 altcp_output(tpcb);
267 } else {
268 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_output_send: Send failed with err %d (\"%s\")\n", err, lwip_strerr(err)));
269 }
270 }
271  
272  
273  
274 /*--------------------------------------------------------------------------------------------------------------------- */
275 /* Request queue */
276  
277 /**
278 * Create request item
279 * @param r_objs Pointer to request objects
280 * @param pkt_id Packet identifier of request
281 * @param cb Packet callback to call when requests lifetime ends
282 * @param arg Parameter following callback
283 * @return Request or NULL if failed to create
284 */
285 static struct mqtt_request_t *
286 mqtt_create_request(struct mqtt_request_t *r_objs, u16_t pkt_id, mqtt_request_cb_t cb, void *arg)
287 {
288 struct mqtt_request_t *r = NULL;
289 u8_t n;
290 LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL);
291 for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) {
292 /* Item point to itself if not in use */
293 if (r_objs[n].next == &r_objs[n]) {
294 r = &r_objs[n];
295 r->next = NULL;
296 r->cb = cb;
297 r->arg = arg;
298 r->pkt_id = pkt_id;
299 break;
300 }
301 }
302 return r;
303 }
304  
305  
306 /**
307 * Append request to pending request queue
308 * @param tail Pointer to request queue tail pointer
309 * @param r Request to append
310 */
311 static void
312 mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r)
313 {
314 struct mqtt_request_t *head = NULL;
315 s16_t time_before = 0;
316 struct mqtt_request_t *iter;
317  
318 LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL);
319  
320 /* Iterate trough queue to find head, and count total timeout time */
321 for (iter = *tail; iter != NULL; iter = iter->next) {
322 time_before += iter->timeout_diff;
323 head = iter;
324 }
325  
326 LWIP_ASSERT("mqtt_append_request: time_before <= MQTT_REQ_TIMEOUT", time_before <= MQTT_REQ_TIMEOUT);
327 r->timeout_diff = MQTT_REQ_TIMEOUT - time_before;
328 if (head == NULL) {
329 *tail = r;
330 } else {
331 head->next = r;
332 }
333 }
334  
335  
336 /**
337 * Delete request item
338 * @param r Request item to delete
339 */
340 static void
341 mqtt_delete_request(struct mqtt_request_t *r)
342 {
343 if (r != NULL) {
344 r->next = r;
345 }
346 }
347  
348 /**
349 * Remove a request item with a specific packet identifier from request queue
350 * @param tail Pointer to request queue tail pointer
351 * @param pkt_id Packet identifier of request to take
352 * @return Request item if found, NULL if not
353 */
354 static struct mqtt_request_t *
355 mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id)
356 {
357 struct mqtt_request_t *iter = NULL, *prev = NULL;
358 LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL);
359 /* Search all request for pkt_id */
360 for (iter = *tail; iter != NULL; iter = iter->next) {
361 if (iter->pkt_id == pkt_id) {
362 break;
363 }
364 prev = iter;
365 }
366  
367 /* If request was found */
368 if (iter != NULL) {
369 /* unchain */
370 if (prev == NULL) {
371 *tail = iter->next;
372 } else {
373 prev->next = iter->next;
374 }
375 /* If exists, add remaining timeout time for the request to next */
376 if (iter->next != NULL) {
377 iter->next->timeout_diff += iter->timeout_diff;
378 }
379 iter->next = NULL;
380 }
381 return iter;
382 }
383  
384 /**
385 * Handle requests timeout
386 * @param tail Pointer to request queue tail pointer
387 * @param t Time since last call in seconds
388 */
389 static void
390 mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t)
391 {
392 struct mqtt_request_t *r;
393 LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL);
394 r = *tail;
395 while (t > 0 && r != NULL) {
396 if (t >= r->timeout_diff) {
397 t -= (u8_t)r->timeout_diff;
398 /* Unchain */
399 *tail = r->next;
400 /* Notify upper layer about timeout */
401 if (r->cb != NULL) {
402 r->cb(r->arg, ERR_TIMEOUT);
403 }
404 mqtt_delete_request(r);
405 /* Tail might be be modified in callback, so re-read it in every iteration */
406 r = *(struct mqtt_request_t *const volatile *)tail;
407 } else {
408 r->timeout_diff -= t;
409 t = 0;
410 }
411 }
412 }
413  
414 /**
415 * Free all request items
416 * @param tail Pointer to request queue tail pointer
417 */
418 static void
419 mqtt_clear_requests(struct mqtt_request_t **tail)
420 {
421 struct mqtt_request_t *iter, *next;
422 LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL);
423 for (iter = *tail; iter != NULL; iter = next) {
424 next = iter->next;
425 mqtt_delete_request(iter);
426 }
427 *tail = NULL;
428 }
429 /**
430 * Initialize all request items
431 * @param r_objs Pointer to request objects
432 */
433 static void
434 mqtt_init_requests(struct mqtt_request_t *r_objs)
435 {
436 u8_t n;
437 LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL);
438 for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) {
439 /* Item pointing to itself indicates unused */
440 r_objs[n].next = &r_objs[n];
441 }
442 }
443  
444 /*--------------------------------------------------------------------------------------------------------------------- */
445 /* Output message build helpers */
446  
447  
448 static void
449 mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value)
450 {
451 mqtt_ringbuf_put(rb, value);
452 }
453  
454 static
455 void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value)
456 {
457 mqtt_ringbuf_put(rb, value >> 8);
458 mqtt_ringbuf_put(rb, value & 0xff);
459 }
460  
461 static void
462 mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length)
463 {
464 u16_t n;
465 for (n = 0; n < length; n++) {
466 mqtt_ringbuf_put(rb, ((const u8_t *)data)[n]);
467 }
468 }
469  
470 static void
471 mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length)
472 {
473 u16_t n;
474 mqtt_ringbuf_put(rb, length >> 8);
475 mqtt_ringbuf_put(rb, length & 0xff);
476 for (n = 0; n < length; n++) {
477 mqtt_ringbuf_put(rb, str[n]);
478 }
479 }
480  
481 /**
482 * Append fixed header
483 * @param rb Output ring buffer
484 * @param msg_type see enum mqtt_message_type
485 * @param fdup MQTT DUP flag
486 * @param fqos MQTT QoS field
487 * @param fretain MQTT retain flag
488 * @param r_length Remaining length after fixed header
489 */
490  
491 static void
492 mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t fdup,
493 u8_t fqos, u8_t fretain, u16_t r_length)
494 {
495 /* Start with control byte */
496 mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((fdup & 1) << 3) | ((fqos & 3) << 1) | (fretain & 1)));
497 /* Encode remaining length field */
498 do {
499 mqtt_output_append_u8(rb, (r_length & 0x7f) | (r_length >= 128 ? 0x80 : 0));
500 r_length >>= 7;
501 } while (r_length > 0);
502 }
503  
504  
505 /**
506 * Check output buffer space
507 * @param rb Output ring buffer
508 * @param r_length Remaining length after fixed header
509 * @return 1 if message will fit, 0 if not enough buffer space
510 */
511 static u8_t
512 mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length)
513 {
514 /* Start with length of type byte + remaining length */
515 u16_t total_len = 1 + r_length;
516  
517 LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL);
518  
519 /* Calculate number of required bytes to contain the remaining bytes field and add to total*/
520 do {
521 total_len++;
522 r_length >>= 7;
523 } while (r_length > 0);
524  
525 return (total_len <= mqtt_ringbuf_free(rb));
526 }
527  
528  
529 /**
530 * Close connection to server
531 * @param client MQTT client
532 * @param reason Reason for disconnection
533 */
534 static void
535 mqtt_close(mqtt_client_t *client, mqtt_connection_status_t reason)
536 {
537 LWIP_ASSERT("mqtt_close: client != NULL", client != NULL);
538  
539 /* Bring down TCP connection if not already done */
540 if (client->conn != NULL) {
541 err_t res;
542 altcp_recv(client->conn, NULL);
543 altcp_err(client->conn, NULL);
544 altcp_sent(client->conn, NULL);
545 res = altcp_close(client->conn);
546 if (res != ERR_OK) {
547 altcp_abort(client->conn);
548 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_close: Close err=%s\n", lwip_strerr(res)));
549 }
550 client->conn = NULL;
551 }
552  
553 /* Remove all pending requests */
554 mqtt_clear_requests(&client->pend_req_queue);
555 /* Stop cyclic timer */
556 sys_untimeout(mqtt_cyclic_timer, client);
557  
558 /* Notify upper layer of disconnection if changed state */
559 if (client->conn_state != TCP_DISCONNECTED) {
560  
561 client->conn_state = TCP_DISCONNECTED;
562 if (client->connect_cb != NULL) {
563 client->connect_cb(client, client->connect_arg, reason);
564 }
565 }
566 }
567  
568  
569 /**
570 * Interval timer, called every MQTT_CYCLIC_TIMER_INTERVAL seconds in MQTT_CONNECTING and MQTT_CONNECTED states
571 * @param arg MQTT client
572 */
573 static void
574 mqtt_cyclic_timer(void *arg)
575 {
576 u8_t restart_timer = 1;
577 mqtt_client_t *client = (mqtt_client_t *)arg;
578 LWIP_ASSERT("mqtt_cyclic_timer: client != NULL", client != NULL);
579  
580 if (client->conn_state == MQTT_CONNECTING) {
581 client->cyclic_tick++;
582 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= MQTT_CONNECT_TIMOUT) {
583 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_cyclic_timer: CONNECT attempt to server timed out\n"));
584 /* Disconnect TCP */
585 mqtt_close(client, MQTT_CONNECT_TIMEOUT);
586 restart_timer = 0;
587 }
588 } else if (client->conn_state == MQTT_CONNECTED) {
589 /* Handle timeout for pending requests */
590 mqtt_request_time_elapsed(&client->pend_req_queue, MQTT_CYCLIC_TIMER_INTERVAL);
591  
592 /* keep_alive > 0 means keep alive functionality shall be used */
593 if (client->keep_alive > 0) {
594  
595 client->server_watchdog++;
596 /* If reception from server has been idle for 1.5*keep_alive time, server is considered unresponsive */
597 if ((client->server_watchdog * MQTT_CYCLIC_TIMER_INTERVAL) > (client->keep_alive + client->keep_alive / 2)) {
598 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_cyclic_timer: Server incoming keep-alive timeout\n"));
599 mqtt_close(client, MQTT_CONNECT_TIMEOUT);
600 restart_timer = 0;
601 }
602  
603 /* If time for a keep alive message to be sent, transmission has been idle for keep_alive time */
604 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= client->keep_alive) {
605 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_cyclic_timer: Sending keep-alive message to server\n"));
606 if (mqtt_output_check_space(&client->output, 0) != 0) {
607 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0, 0);
608 client->cyclic_tick = 0;
609 }
610 } else {
611 client->cyclic_tick++;
612 }
613 }
614 } else {
615 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_cyclic_timer: Timer should not be running in state %d\n", client->conn_state));
616 restart_timer = 0;
617 }
618 if (restart_timer) {
619 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL * 1000, mqtt_cyclic_timer, arg);
620 }
621 }
622  
623  
624 /**
625 * Send PUBACK, PUBREC or PUBREL response message
626 * @param client MQTT client
627 * @param msg PUBACK, PUBREC or PUBREL
628 * @param pkt_id Packet identifier
629 * @param qos QoS value
630 * @return ERR_OK if successful, ERR_MEM if out of memory
631 */
632 static err_t
633 pub_ack_rec_rel_response(mqtt_client_t *client, u8_t msg, u16_t pkt_id, u8_t qos)
634 {
635 err_t err = ERR_OK;
636 if (mqtt_output_check_space(&client->output, 2)) {
637 mqtt_output_append_fixed_header(&client->output, msg, 0, qos, 0, 2);
638 mqtt_output_append_u16(&client->output, pkt_id);
639 mqtt_output_send(&client->output, client->conn);
640 } else {
641 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("pub_ack_rec_rel_response: OOM creating response: %s with pkt_id: %d\n",
642 mqtt_msg_type_to_str(msg), pkt_id));
643 err = ERR_MEM;
644 }
645 return err;
646 }
647  
648 /**
649 * Subscribe response from server
650 * @param r Matching request
651 * @param result Result code from server
652 */
653 static void
654 mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result)
655 {
656 if (r->cb != NULL) {
657 r->cb(r->arg, result < 3 ? ERR_OK : ERR_ABRT);
658 }
659 }
660  
661  
662 /**
663 * Complete MQTT message received or buffer full
664 * @param client MQTT client
665 * @param fixed_hdr_idx header index
666 * @param length length received part
667 * @param remaining_length Remaining length of complete message
668 */
669 static mqtt_connection_status_t
670 mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_idx, u16_t length, u32_t remaining_length)
671 {
672 mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED;
673  
674 u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_idx;
675  
676 /* Control packet type */
677 u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]);
678 u16_t pkt_id = 0;
679  
680 LWIP_ERROR("buffer length mismatch", fixed_hdr_idx + length <= MQTT_VAR_HEADER_BUFFER_LEN,
681 return MQTT_CONNECT_DISCONNECTED);
682  
683 if (pkt_type == MQTT_MSG_TYPE_CONNACK) {
684 if (client->conn_state == MQTT_CONNECTING) {
685 /* Get result code from CONNACK */
686 res = (mqtt_connection_status_t)var_hdr_payload[1];
687 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: Connect response code %d\n", res));
688 if (res == MQTT_CONNECT_ACCEPTED) {
689 /* Reset cyclic_tick when changing to connected state */
690 client->cyclic_tick = 0;
691 client->conn_state = MQTT_CONNECTED;
692 /* Notify upper layer */
693 if (client->connect_cb != 0) {
694 client->connect_cb(client, client->connect_arg, res);
695 }
696 }
697 } else {
698 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Received CONNACK in connected state\n"));
699 }
700 } else if (pkt_type == MQTT_MSG_TYPE_PINGRESP) {
701 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ( "mqtt_message_received: Received PINGRESP from server\n"));
702  
703 } else if (pkt_type == MQTT_MSG_TYPE_PUBLISH) {
704 u16_t payload_offset = 0;
705 u16_t payload_length = length;
706 u8_t qos = MQTT_CTL_PACKET_QOS(client->rx_buffer[0]);
707  
708 if (client->msg_idx <= MQTT_VAR_HEADER_BUFFER_LEN) {
709 /* Should have topic and pkt id*/
710 u8_t *topic;
711 u16_t after_topic;
712 u8_t bkp;
713 u16_t topic_len = var_hdr_payload[0];
714 topic_len = (topic_len << 8) + (u16_t)(var_hdr_payload[1]);
715  
716 topic = var_hdr_payload + 2;
717 after_topic = 2 + topic_len;
718 /* Check length, add one byte even for QoS 0 so that zero termination will fit */
719 if ((after_topic + (qos ? 2 : 1)) > length) {
720 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Receive buffer can not fit topic + pkt_id\n"));
721 goto out_disconnect;
722 }
723  
724 /* id for QoS 1 and 2 */
725 if (qos > 0) {
726 client->inpub_pkt_id = ((u16_t)var_hdr_payload[after_topic] << 8) + (u16_t)var_hdr_payload[after_topic + 1];
727 after_topic += 2;
728 } else {
729 client->inpub_pkt_id = 0;
730 }
731 /* Take backup of byte after topic */
732 bkp = topic[topic_len];
733 /* Zero terminate string */
734 topic[topic_len] = 0;
735 /* Payload data remaining in receive buffer */
736 payload_length = length - after_topic;
737 payload_offset = after_topic;
738  
739 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_incomming_publish: Received message with QoS %d at topic: %s, payload length %d\n",
740 qos, topic, remaining_length + payload_length));
741 if (client->pub_cb != NULL) {
742 client->pub_cb(client->inpub_arg, (const char *)topic, remaining_length + payload_length);
743 }
744 /* Restore byte after topic */
745 topic[topic_len] = bkp;
746 }
747 if (payload_length > 0 || remaining_length == 0) {
748 client->data_cb(client->inpub_arg, var_hdr_payload + payload_offset, payload_length, remaining_length == 0 ? MQTT_DATA_FLAG_LAST : 0);
749 /* Reply if QoS > 0 */
750 if (remaining_length == 0 && qos > 0) {
751 /* Send PUBACK for QoS 1 or PUBREC for QoS 2 */
752 u8_t resp_msg = (qos == 1) ? MQTT_MSG_TYPE_PUBACK : MQTT_MSG_TYPE_PUBREC;
753 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_incomming_publish: Sending publish response: %s with pkt_id: %d\n",
754 mqtt_msg_type_to_str(resp_msg), client->inpub_pkt_id));
755 pub_ack_rec_rel_response(client, resp_msg, client->inpub_pkt_id, 0);
756 }
757 }
758 } else {
759 /* Get packet identifier */
760 pkt_id = (u16_t)var_hdr_payload[0] << 8;
761 pkt_id |= (u16_t)var_hdr_payload[1];
762 if (pkt_id == 0) {
763 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Got message with illegal packet identifier: 0\n"));
764 goto out_disconnect;
765 }
766 if (pkt_type == MQTT_MSG_TYPE_PUBREC) {
767 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: PUBREC, sending PUBREL with pkt_id: %d\n", pkt_id));
768 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBREL, pkt_id, 1);
769  
770 } else if (pkt_type == MQTT_MSG_TYPE_PUBREL) {
771 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: PUBREL, sending PUBCOMP response with pkt_id: %d\n", pkt_id));
772 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBCOMP, pkt_id, 0);
773  
774 } else if (pkt_type == MQTT_MSG_TYPE_SUBACK || pkt_type == MQTT_MSG_TYPE_UNSUBACK ||
775 pkt_type == MQTT_MSG_TYPE_PUBCOMP || pkt_type == MQTT_MSG_TYPE_PUBACK) {
776 struct mqtt_request_t *r = mqtt_take_request(&client->pend_req_queue, pkt_id);
777 if (r != NULL) {
778 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: %s response with id %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
779 if (pkt_type == MQTT_MSG_TYPE_SUBACK) {
780 if (length < 3) {
781 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: To small SUBACK packet\n"));
782 goto out_disconnect;
783 } else {
784 mqtt_incomming_suback(r, var_hdr_payload[2]);
785 }
786 } else if (r->cb != NULL) {
787 r->cb(r->arg, ERR_OK);
788 }
789 mqtt_delete_request(r);
790 } else {
791 LWIP_DEBUGF(MQTT_DEBUG_WARN, ( "mqtt_message_received: Received %s reply, with wrong pkt_id: %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
792 }
793 } else {
794 LWIP_DEBUGF(MQTT_DEBUG_WARN, ( "mqtt_message_received: Received unknown message type: %d\n", pkt_type));
795 goto out_disconnect;
796 }
797 }
798 return res;
799 out_disconnect:
800 return MQTT_CONNECT_DISCONNECTED;
801 }
802  
803  
804 /**
805 * MQTT incoming message parser
806 * @param client MQTT client
807 * @param p PBUF chain of received data
808 * @return Connection status
809 */
810 static mqtt_connection_status_t
811 mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p)
812 {
813 u16_t in_offset = 0;
814 u32_t msg_rem_len = 0;
815 u8_t fixed_hdr_idx = 0;
816 u8_t b = 0;
817  
818 while (p->tot_len > in_offset) {
819 if ((fixed_hdr_idx < 2) || ((b & 0x80) != 0)) {
820  
821 if (fixed_hdr_idx < client->msg_idx) {
822 b = client->rx_buffer[fixed_hdr_idx];
823 } else {
824 b = pbuf_get_at(p, in_offset++);
825 client->rx_buffer[client->msg_idx++] = b;
826 }
827 fixed_hdr_idx++;
828  
829 if (fixed_hdr_idx >= 2) {
830 msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_idx - 2) * 7);
831 if ((b & 0x80) == 0) {
832 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: Remaining length after fixed header: %d\n", msg_rem_len));
833 if (msg_rem_len == 0) {
834 /* Complete message with no extra headers of payload received */
835 mqtt_message_received(client, fixed_hdr_idx, 0, 0);
836 client->msg_idx = 0;
837 fixed_hdr_idx = 0;
838 } else {
839 /* Bytes remaining in message */
840 msg_rem_len = (msg_rem_len + fixed_hdr_idx) - client->msg_idx;
841 }
842 }
843 }
844 } else {
845 u16_t cpy_len, cpy_start, buffer_space;
846  
847 cpy_start = (client->msg_idx - fixed_hdr_idx) % (MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_idx) + fixed_hdr_idx;
848  
849 /* Allow to copy the lesser one of available length in input data or bytes remaining in message */
850 cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len);
851  
852 /* Limit to available space in buffer */
853 buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - cpy_start;
854 if (cpy_len > buffer_space) {
855 cpy_len = buffer_space;
856 }
857 pbuf_copy_partial(p, client->rx_buffer + cpy_start, cpy_len, in_offset);
858  
859 /* Advance get and put indexes */
860 client->msg_idx += cpy_len;
861 in_offset += cpy_len;
862 msg_rem_len -= cpy_len;
863  
864 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: msg_idx: %d, cpy_len: %d, remaining %d\n", client->msg_idx, cpy_len, msg_rem_len));
865 if (msg_rem_len == 0 || cpy_len == buffer_space) {
866 /* Whole message received or buffer is full */
867 mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_idx, (cpy_start + cpy_len) - fixed_hdr_idx, msg_rem_len);
868 if (res != MQTT_CONNECT_ACCEPTED) {
869 return res;
870 }
871 if (msg_rem_len == 0) {
872 /* Reset parser state */
873 client->msg_idx = 0;
874 /* msg_tot_len = 0; */
875 fixed_hdr_idx = 0;
876 }
877 }
878 }
879 }
880 return MQTT_CONNECT_ACCEPTED;
881 }
882  
883  
884 /**
885 * TCP received callback function. @see tcp_recv_fn
886 * @param arg MQTT client
887 * @param p PBUF chain of received data
888 * @param err Passed as return value if not ERR_OK
889 * @return ERR_OK or err passed into callback
890 */
891 static err_t
892 mqtt_tcp_recv_cb(void *arg, struct altcp_pcb *pcb, struct pbuf *p, err_t err)
893 {
894 mqtt_client_t *client = (mqtt_client_t *)arg;
895 LWIP_ASSERT("mqtt_tcp_recv_cb: client != NULL", client != NULL);
896 LWIP_ASSERT("mqtt_tcp_recv_cb: client->conn == pcb", client->conn == pcb);
897  
898 if (p == NULL) {
899 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_recv_cb: Recv pbuf=NULL, remote has closed connection\n"));
900 mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
901 } else {
902 mqtt_connection_status_t res;
903 if (err != ERR_OK) {
904 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_tcp_recv_cb: Recv err=%d\n", err));
905 pbuf_free(p);
906 return err;
907 }
908  
909 /* Tell remote that data has been received */
910 altcp_recved(pcb, p->tot_len);
911 res = mqtt_parse_incoming(client, p);
912 pbuf_free(p);
913  
914 if (res != MQTT_CONNECT_ACCEPTED) {
915 mqtt_close(client, res);
916 }
917 /* If keep alive functionality is used */
918 if (client->keep_alive != 0) {
919 /* Reset server alive watchdog */
920 client->server_watchdog = 0;
921 }
922  
923 }
924 return ERR_OK;
925 }
926  
927  
928 /**
929 * TCP data sent callback function. @see tcp_sent_fn
930 * @param arg MQTT client
931 * @param tpcb TCP connection handle
932 * @param len Number of bytes sent
933 * @return ERR_OK
934 */
935 static err_t
936 mqtt_tcp_sent_cb(void *arg, struct altcp_pcb *tpcb, u16_t len)
937 {
938 mqtt_client_t *client = (mqtt_client_t *)arg;
939  
940 LWIP_UNUSED_ARG(tpcb);
941 LWIP_UNUSED_ARG(len);
942  
943 if (client->conn_state == MQTT_CONNECTED) {
944 struct mqtt_request_t *r;
945  
946 /* Reset keep-alive send timer and server watchdog */
947 client->cyclic_tick = 0;
948 client->server_watchdog = 0;
949 /* QoS 0 publish has no response from server, so call its callbacks here */
950 while ((r = mqtt_take_request(&client->pend_req_queue, 0)) != NULL) {
951 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_sent_cb: Calling QoS 0 publish complete callback\n"));
952 if (r->cb != NULL) {
953 r->cb(r->arg, ERR_OK);
954 }
955 mqtt_delete_request(r);
956 }
957 /* Try send any remaining buffers from output queue */
958 mqtt_output_send(&client->output, client->conn);
959 }
960 return ERR_OK;
961 }
962  
963 /**
964 * TCP error callback function. @see tcp_err_fn
965 * @param arg MQTT client
966 * @param err Error encountered
967 */
968 static void
969 mqtt_tcp_err_cb(void *arg, err_t err)
970 {
971 mqtt_client_t *client = (mqtt_client_t *)arg;
972 LWIP_UNUSED_ARG(err); /* only used for debug output */
973 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_err_cb: TCP error callback: error %d, arg: %p\n", err, arg));
974 LWIP_ASSERT("mqtt_tcp_err_cb: client != NULL", client != NULL);
975 /* Set conn to null before calling close as pcb is already deallocated*/
976 client->conn = 0;
977 mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
978 }
979  
980 /**
981 * TCP poll callback function. @see tcp_poll_fn
982 * @param arg MQTT client
983 * @param tpcb TCP connection handle
984 * @return err ERR_OK
985 */
986 static err_t
987 mqtt_tcp_poll_cb(void *arg, struct altcp_pcb *tpcb)
988 {
989 mqtt_client_t *client = (mqtt_client_t *)arg;
990 if (client->conn_state == MQTT_CONNECTED) {
991 /* Try send any remaining buffers from output queue */
992 mqtt_output_send(&client->output, tpcb);
993 }
994 return ERR_OK;
995 }
996  
997 /**
998 * TCP connect callback function. @see tcp_connected_fn
999 * @param arg MQTT client
1000 * @param err Always ERR_OK, mqtt_tcp_err_cb is called in case of error
1001 * @return ERR_OK
1002 */
1003 static err_t
1004 mqtt_tcp_connect_cb(void *arg, struct altcp_pcb *tpcb, err_t err)
1005 {
1006 mqtt_client_t *client = (mqtt_client_t *)arg;
1007  
1008 if (err != ERR_OK) {
1009 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_tcp_connect_cb: TCP connect error %d\n", err));
1010 return err;
1011 }
1012  
1013 /* Initiate receiver state */
1014 client->msg_idx = 0;
1015  
1016 /* Setup TCP callbacks */
1017 altcp_recv(tpcb, mqtt_tcp_recv_cb);
1018 altcp_sent(tpcb, mqtt_tcp_sent_cb);
1019 altcp_poll(tpcb, mqtt_tcp_poll_cb, 2);
1020  
1021 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_connect_cb: TCP connection established to server\n"));
1022 /* Enter MQTT connect state */
1023 client->conn_state = MQTT_CONNECTING;
1024  
1025 /* Start cyclic timer */
1026 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL * 1000, mqtt_cyclic_timer, client);
1027 client->cyclic_tick = 0;
1028  
1029 /* Start transmission from output queue, connect message is the first one out*/
1030 mqtt_output_send(&client->output, client->conn);
1031  
1032 return ERR_OK;
1033 }
1034  
1035  
1036  
1037 /*---------------------------------------------------------------------------------------------------- */
1038 /* Public API */
1039  
1040  
1041 /**
1042 * @ingroup mqtt
1043 * MQTT publish function.
1044 * @param client MQTT client
1045 * @param topic Publish topic string
1046 * @param payload Data to publish (NULL is allowed)
1047 * @param payload_length: Length of payload (0 is allowed)
1048 * @param qos Quality of service, 0 1 or 2
1049 * @param retain MQTT retain flag
1050 * @param cb Callback to call when publish is complete or has timed out
1051 * @param arg User supplied argument to publish callback
1052 * @return ERR_OK if successful
1053 * ERR_CONN if client is disconnected
1054 * ERR_MEM if short on memory
1055 */
1056 err_t
1057 mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain,
1058 mqtt_request_cb_t cb, void *arg)
1059 {
1060 struct mqtt_request_t *r;
1061 u16_t pkt_id;
1062 size_t topic_strlen;
1063 size_t total_len;
1064 u16_t topic_len;
1065 u16_t remaining_length;
1066  
1067 LWIP_ASSERT("mqtt_publish: client != NULL", client);
1068 LWIP_ASSERT("mqtt_publish: topic != NULL", topic);
1069 LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN);
1070  
1071 topic_strlen = strlen(topic);
1072 LWIP_ERROR("mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
1073 topic_len = (u16_t)topic_strlen;
1074 total_len = 2 + topic_len + payload_length;
1075 LWIP_ERROR("mqtt_publish: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
1076 remaining_length = (u16_t)total_len;
1077  
1078 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic));
1079  
1080 if (qos > 0) {
1081 remaining_length += 2;
1082 /* Generate pkt_id id for QoS1 and 2 */
1083 pkt_id = msg_generate_packet_id(client);
1084 } else {
1085 /* Use reserved value pkt_id 0 for QoS 0 in request handle */
1086 pkt_id = 0;
1087 }
1088  
1089 r = mqtt_create_request(client->req_list, pkt_id, cb, arg);
1090 if (r == NULL) {
1091 return ERR_MEM;
1092 }
1093  
1094 if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1095 mqtt_delete_request(r);
1096 return ERR_MEM;
1097 }
1098 /* Append fixed header */
1099 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length);
1100  
1101 /* Append Topic */
1102 mqtt_output_append_string(&client->output, topic, topic_len);
1103  
1104 /* Append packet if for QoS 1 and 2*/
1105 if (qos > 0) {
1106 mqtt_output_append_u16(&client->output, pkt_id);
1107 }
1108  
1109 /* Append optional publish payload */
1110 if ((payload != NULL) && (payload_length > 0)) {
1111 mqtt_output_append_buf(&client->output, payload, payload_length);
1112 }
1113  
1114 mqtt_append_request(&client->pend_req_queue, r);
1115 mqtt_output_send(&client->output, client->conn);
1116 return ERR_OK;
1117 }
1118  
1119  
1120 /**
1121 * @ingroup mqtt
1122 * MQTT subscribe/unsubscribe function.
1123 * @param client MQTT client
1124 * @param topic topic to subscribe to
1125 * @param qos Quality of service, 0 1 or 2 (only used for subscribe)
1126 * @param cb Callback to call when subscribe/unsubscribe reponse is received
1127 * @param arg User supplied argument to publish callback
1128 * @param sub 1 for subscribe, 0 for unsubscribe
1129 * @return ERR_OK if successful, @see err_t enum for other results
1130 */
1131 err_t
1132 mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub)
1133 {
1134 size_t topic_strlen;
1135 size_t total_len;
1136 u16_t topic_len;
1137 u16_t remaining_length;
1138 u16_t pkt_id;
1139 struct mqtt_request_t *r;
1140  
1141 LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client);
1142 LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic);
1143  
1144 topic_strlen = strlen(topic);
1145 LWIP_ERROR("mqtt_sub_unsub: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
1146 topic_len = (u16_t)topic_strlen;
1147 /* Topic string, pkt_id, qos for subscribe */
1148 total_len = topic_len + 2 + 2 + (sub != 0);
1149 LWIP_ERROR("mqtt_sub_unsub: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
1150 remaining_length = (u16_t)total_len;
1151  
1152 LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3);
1153 if (client->conn_state == TCP_DISCONNECTED) {
1154 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_sub_unsub: Can not (un)subscribe in disconnected state\n"));
1155 return ERR_CONN;
1156 }
1157  
1158 pkt_id = msg_generate_packet_id(client);
1159 r = mqtt_create_request(client->req_list, pkt_id, cb, arg);
1160 if (r == NULL) {
1161 return ERR_MEM;
1162 }
1163  
1164 if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1165 mqtt_delete_request(r);
1166 return ERR_MEM;
1167 }
1168  
1169 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_sub_unsub: Client (un)subscribe to topic \"%s\", id: %d\n", topic, pkt_id));
1170  
1171 mqtt_output_append_fixed_header(&client->output, sub ? MQTT_MSG_TYPE_SUBSCRIBE : MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0, remaining_length);
1172 /* Packet id */
1173 mqtt_output_append_u16(&client->output, pkt_id);
1174 /* Topic */
1175 mqtt_output_append_string(&client->output, topic, topic_len);
1176 /* QoS */
1177 if (sub != 0) {
1178 mqtt_output_append_u8(&client->output, LWIP_MIN(qos, 2));
1179 }
1180  
1181 mqtt_append_request(&client->pend_req_queue, r);
1182 mqtt_output_send(&client->output, client->conn);
1183 return ERR_OK;
1184 }
1185  
1186  
1187 /**
1188 * @ingroup mqtt
1189 * Set callback to handle incoming publish requests from server
1190 * @param client MQTT client
1191 * @param pub_cb Callback invoked when publish starts, contain topic and total length of payload
1192 * @param data_cb Callback for each fragment of payload that arrives
1193 * @param arg User supplied argument to both callbacks
1194 */
1195 void
1196 mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb,
1197 mqtt_incoming_data_cb_t data_cb, void *arg)
1198 {
1199 LWIP_ASSERT("mqtt_set_inpub_callback: client != NULL", client != NULL);
1200 client->data_cb = data_cb;
1201 client->pub_cb = pub_cb;
1202 client->inpub_arg = arg;
1203 }
1204  
1205 /**
1206 * @ingroup mqtt
1207 * Create a new MQTT client instance
1208 * @return Pointer to instance on success, NULL otherwise
1209 */
1210 mqtt_client_t *
1211 mqtt_client_new(void)
1212 {
1213 return (mqtt_client_t *)mem_calloc(1, sizeof(mqtt_client_t));
1214 }
1215  
1216 /**
1217 * @ingroup mqtt
1218 * Free MQTT client instance
1219 * @param client Pointer to instance to be freed
1220 */
1221 void
1222 mqtt_client_free(mqtt_client_t *client)
1223 {
1224 mem_free(client);
1225 }
1226  
1227 /**
1228 * @ingroup mqtt
1229 * Connect to MQTT server
1230 * @param client MQTT client
1231 * @param ip_addr Server IP
1232 * @param port Server port
1233 * @param cb Connection state change callback
1234 * @param arg User supplied argument to connection callback
1235 * @param client_info Client identification and connection options
1236 * @return ERR_OK if successful, @see err_t enum for other results
1237 */
1238 err_t
1239 mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port, mqtt_connection_cb_t cb, void *arg,
1240 const struct mqtt_connect_client_info_t *client_info)
1241 {
1242 err_t err;
1243 size_t len;
1244 u16_t client_id_length;
1245 /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */
1246 u16_t remaining_length = 2 + 4 + 1 + 1 + 2;
1247 u8_t flags = 0, will_topic_len = 0, will_msg_len = 0;
1248 u8_t client_user_len = 0, client_pass_len = 0;
1249  
1250 LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL);
1251 LWIP_ASSERT("mqtt_client_connect: ip_addr != NULL", ip_addr != NULL);
1252 LWIP_ASSERT("mqtt_client_connect: client_info != NULL", client_info != NULL);
1253 LWIP_ASSERT("mqtt_client_connect: client_info->client_id != NULL", client_info->client_id != NULL);
1254  
1255 if (client->conn_state != TCP_DISCONNECTED) {
1256 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_client_connect: Already connected\n"));
1257 return ERR_ISCONN;
1258 }
1259  
1260 /* Wipe clean */
1261 memset(client, 0, sizeof(mqtt_client_t));
1262 client->connect_arg = arg;
1263 client->connect_cb = cb;
1264 client->keep_alive = client_info->keep_alive;
1265 mqtt_init_requests(client->req_list);
1266  
1267 /* Build connect message */
1268 if (client_info->will_topic != NULL && client_info->will_msg != NULL) {
1269 flags |= MQTT_CONNECT_FLAG_WILL;
1270 flags |= (client_info->will_qos & 3) << 3;
1271 if (client_info->will_retain) {
1272 flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;
1273 }
1274 len = strlen(client_info->will_topic);
1275 LWIP_ERROR("mqtt_client_connect: client_info->will_topic length overflow", len <= 0xFF, return ERR_VAL);
1276 LWIP_ERROR("mqtt_client_connect: client_info->will_topic length must be > 0", len > 0, return ERR_VAL);
1277 will_topic_len = (u8_t)len;
1278 len = strlen(client_info->will_msg);
1279 LWIP_ERROR("mqtt_client_connect: client_info->will_msg length overflow", len <= 0xFF, return ERR_VAL);
1280 will_msg_len = (u8_t)len;
1281 len = remaining_length + 2 + will_topic_len + 2 + will_msg_len;
1282 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1283 remaining_length = (u16_t)len;
1284 }
1285 if (client_info->client_user != NULL) {
1286 flags |= MQTT_CONNECT_FLAG_USERNAME;
1287 len = strlen(client_info->client_user);
1288 LWIP_ERROR("mqtt_client_connect: client_info->client_user length overflow", len <= 0xFF, return ERR_VAL);
1289 LWIP_ERROR("mqtt_client_connect: client_info->client_user length must be > 0", len > 0, return ERR_VAL);
1290 client_user_len = (u8_t)len;
1291 len = remaining_length + 2 + client_user_len;
1292 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1293 remaining_length = (u16_t)len;
1294 }
1295 if (client_info->client_pass != NULL) {
1296 flags |= MQTT_CONNECT_FLAG_PASSWORD;
1297 len = strlen(client_info->client_pass);
1298 LWIP_ERROR("mqtt_client_connect: client_info->client_pass length overflow", len <= 0xFF, return ERR_VAL);
1299 LWIP_ERROR("mqtt_client_connect: client_info->client_pass length must be > 0", len > 0, return ERR_VAL);
1300 client_pass_len = (u8_t)len;
1301 len = remaining_length + 2 + client_pass_len;
1302 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1303 remaining_length = (u16_t)len;
1304 }
1305  
1306 /* Don't complicate things, always connect using clean session */
1307 flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;
1308  
1309 len = strlen(client_info->client_id);
1310 LWIP_ERROR("mqtt_client_connect: client_info->client_id length overflow", len <= 0xFFFF, return ERR_VAL);
1311 client_id_length = (u16_t)len;
1312 len = remaining_length + 2 + client_id_length;
1313 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1314 remaining_length = (u16_t)len;
1315  
1316 if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1317 return ERR_MEM;
1318 }
1319  
1320 client->conn = altcp_tcp_new();
1321 if (client->conn == NULL) {
1322 return ERR_MEM;
1323 }
1324 #if LWIP_ALTCP && LWIP_ALTCP_TLS
1325 if (client_info->tls_config) {
1326 struct altcp_pcb *pcb_tls = altcp_tls_new(client_info->tls_config, client->conn);
1327 if (pcb_tls == NULL) {
1328 altcp_close(client->conn);
1329 return ERR_MEM;
1330 }
1331 client->conn = pcb_tls;
1332 }
1333 #endif
1334  
1335 /* Set arg pointer for callbacks */
1336 altcp_arg(client->conn, client);
1337 /* Any local address, pick random local port number */
1338 err = altcp_bind(client->conn, IP_ADDR_ANY, 0);
1339 if (err != ERR_OK) {
1340 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_client_connect: Error binding to local ip/port, %d\n", err));
1341 goto tcp_fail;
1342 }
1343 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port));
1344  
1345 /* Connect to server */
1346 err = altcp_connect(client->conn, ip_addr, port, mqtt_tcp_connect_cb);
1347 if (err != ERR_OK) {
1348 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err));
1349 goto tcp_fail;
1350 }
1351 /* Set error callback */
1352 altcp_err(client->conn, mqtt_tcp_err_cb);
1353 client->conn_state = TCP_CONNECTING;
1354  
1355 /* Append fixed header */
1356 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length);
1357 /* Append Protocol string */
1358 mqtt_output_append_string(&client->output, "MQTT", 4);
1359 /* Append Protocol level */
1360 mqtt_output_append_u8(&client->output, 4);
1361 /* Append connect flags */
1362 mqtt_output_append_u8(&client->output, flags);
1363 /* Append keep-alive */
1364 mqtt_output_append_u16(&client->output, client_info->keep_alive);
1365 /* Append client id */
1366 mqtt_output_append_string(&client->output, client_info->client_id, client_id_length);
1367 /* Append will message if used */
1368 if ((flags & MQTT_CONNECT_FLAG_WILL) != 0) {
1369 mqtt_output_append_string(&client->output, client_info->will_topic, will_topic_len);
1370 mqtt_output_append_string(&client->output, client_info->will_msg, will_msg_len);
1371 }
1372 /* Append user name if given */
1373 if ((flags & MQTT_CONNECT_FLAG_USERNAME) != 0) {
1374 mqtt_output_append_string(&client->output, client_info->client_user, client_user_len);
1375 }
1376 /* Append password if given */
1377 if ((flags & MQTT_CONNECT_FLAG_PASSWORD) != 0) {
1378 mqtt_output_append_string(&client->output, client_info->client_pass, client_pass_len);
1379 }
1380 return ERR_OK;
1381  
1382 tcp_fail:
1383 altcp_abort(client->conn);
1384 client->conn = NULL;
1385 return err;
1386 }
1387  
1388  
1389 /**
1390 * @ingroup mqtt
1391 * Disconnect from MQTT server
1392 * @param client MQTT client
1393 */
1394 void
1395 mqtt_disconnect(mqtt_client_t *client)
1396 {
1397 LWIP_ASSERT("mqtt_disconnect: client != NULL", client);
1398 /* If connection in not already closed */
1399 if (client->conn_state != TCP_DISCONNECTED) {
1400 /* Set conn_state before calling mqtt_close to prevent callback from being called */
1401 client->conn_state = TCP_DISCONNECTED;
1402 mqtt_close(client, (mqtt_connection_status_t)0);
1403 }
1404 }
1405  
1406 /**
1407 * @ingroup mqtt
1408 * Check connection with server
1409 * @param client MQTT client
1410 * @return 1 if connected to server, 0 otherwise
1411 */
1412 u8_t
1413 mqtt_client_is_connected(mqtt_client_t *client)
1414 {
1415 LWIP_ASSERT("mqtt_client_is_connected: client != NULL", client);
1416 return client->conn_state == MQTT_CONNECTED;
1417 }
1418  
1419 #endif /* LWIP_TCP && LWIP_CALLBACK_API */