• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*******************************************************************************
2  * Copyright (c) 2014, 2017 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  *    http://www.eclipse.org/legal/epl-v10.html
10  * and the Eclipse Distribution License is available at
11  *   http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  *    Ian Craggs - initial API and implementation and/or initial documentation
15  *    Ian Craggs - fix for bug 458512 - QoS 2 messages
16  *    Ian Craggs - fix for bug 460389 - send loop uses wrong length
17  *    Ian Craggs - fix for bug 464169 - clearing subscriptions
18  *    Ian Craggs - fix for bug 464551 - enums and ints can be different size
19  *    Mark Sonnentag - fix for bug 475204 - inefficient instantiation of Timer
20  *    Ian Craggs - fix for bug 475749 - packetid modified twice
21  *    Ian Craggs - add ability to set message handler separately #6
22  *******************************************************************************/
23 
24 #if !defined(MQTTCLIENT_H)
25 #define MQTTCLIENT_H
26 
27 #include "FP.h"
28 #include "MQTTPacket.h"
29 #include <stdio.h>
30 #include "MQTTLogging.h"
31 
32 #if !defined(MQTTCLIENT_QOS1)
33     #define MQTTCLIENT_QOS1 1
34 #endif
35 #if !defined(MQTTCLIENT_QOS2)
36     #define MQTTCLIENT_QOS2 0
37 #endif
38 
39 namespace MQTT
40 {
41 
42 
43 enum QoS { QOS0, QOS1, QOS2 };
44 
45 // all failure return codes must be negative
46 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
47 
48 
49 struct Message
50 {
51     enum QoS qos;
52     bool retained;
53     bool dup;
54     unsigned short id;
55     void *payload;
56     size_t payloadlen;
57 };
58 
59 
60 struct MessageData
61 {
MessageDataMessageData62     MessageData(MQTTString &aTopicName, struct Message &aMessage)  : message(aMessage), topicName(aTopicName)
63     { }
64 
65     struct Message &message;
66     MQTTString &topicName;
67 };
68 
69 
70 struct connackData
71 {
72     int rc;
73     bool sessionPresent;
74 };
75 
76 
77 struct subackData
78 {
79     int grantedQoS;
80 };
81 
82 
83 class PacketId
84 {
85 public:
PacketId()86     PacketId()
87     {
88         next = 0;
89     }
90 
getNext()91     int getNext()
92     {
93         return next = (next == MAX_PACKET_ID) ? 1 : next + 1;
94     }
95 
96 private:
97     static const int MAX_PACKET_ID = 65535;
98     int next;
99 };
100 
101 
102 /**
103  * @class Client
104  * @brief blocking, non-threaded MQTT client API
105  *
106  * This version of the API blocks on all method calls, until they are complete.  This means that only one
107  * MQTT request can be in process at any one time.
108  * @param Network a network class which supports send, receive
109  * @param Timer a timer class with the methods:
110  */
111 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
112 class Client
113 {
114 
115 public:
116 
117     typedef void (*messageHandler)(MessageData&);
118 
119     /** Construct the client
120      *  @param network - pointer to an instance of the Network class - must be connected to the endpoint
121      *      before calling MQTT connect
122      *  @param limits an instance of the Limit class - to alter limits as required
123      */
124     Client(Network& network, unsigned int command_timeout_ms = 30000);
125 
126     /** Set the default message handling callback - used for any message which does not match a subscription message handler
127      *  @param mh - pointer to the callback function.  Set to 0 to remove.
128      */
setDefaultMessageHandler(messageHandler mh)129     void setDefaultMessageHandler(messageHandler mh)
130     {
131         if (mh != 0)
132             defaultMessageHandler.attach(mh);
133         else
134             defaultMessageHandler.detach();
135     }
136 
137     /** Set a message handling callback.  This can be used outside of the the subscribe method.
138      *  @param topicFilter - a topic pattern which can include wildcards
139      *  @param mh - pointer to the callback function. If 0, removes the callback if any
140      */
141     int setMessageHandler(const char* topicFilter, messageHandler mh);
142 
143     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
144      *  The nework object must be connected to the network endpoint before calling this
145      *  Default connect options are used
146      *  @return success code -
147      */
148     int connect();
149 
150     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
151      *  The nework object must be connected to the network endpoint before calling this
152      *  @param options - connect options
153      *  @return success code -
154      */
155     int connect(MQTTPacket_connectData& options);
156 
157     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
158      *  The nework object must be connected to the network endpoint before calling this
159      *  @param options - connect options
160      *  @param connackData - connack data to be returned
161      *  @return success code -
162      */
163     int connect(MQTTPacket_connectData& options, connackData& data);
164 
165     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
166      *  @param topic - the topic to publish to
167      *  @param message - the message to send
168      *  @return success code -
169      */
170     int publish(const char* topicName, Message& message);
171 
172     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
173      *  @param topic - the topic to publish to
174      *  @param payload - the data to send
175      *  @param payloadlen - the length of the data
176      *  @param qos - the QoS to send the publish at
177      *  @param retained - whether the message should be retained
178      *  @return success code -
179      */
180     int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
181 
182     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
183      *  @param topic - the topic to publish to
184      *  @param payload - the data to send
185      *  @param payloadlen - the length of the data
186      *  @param id - the packet id used - returned
187      *  @param qos - the QoS to send the publish at
188      *  @param retained - whether the message should be retained
189      *  @return success code -
190      */
191     int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
192 
193     /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
194      *  @param topicFilter - a topic pattern which can include wildcards
195      *  @param qos - the MQTT QoS to subscribe at
196      *  @param mh - the callback function to be invoked when a message is received for this subscription
197      *  @return success code -
198      */
199     int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
200 
201     /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
202      *  @param topicFilter - a topic pattern which can include wildcards
203      *  @param qos - the MQTT QoS to subscribe at©
204      *  @param mh - the callback function to be invoked when a message is received for this subscription
205      *  @param
206      *  @return success code -
207      */
208     int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, subackData &data);
209 
210     /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
211      *  @param topicFilter - a topic pattern which can include wildcards
212      *  @return success code -
213      */
214     int unsubscribe(const char* topicFilter);
215 
216     /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
217      *  @return success code -
218      */
219     int disconnect();
220 
221     /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
222      *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be
223      *  received.
224      *  @param timeout_ms the time to wait, in milliseconds
225      *  @return success code - on failure, this means the client has disconnected
226      */
227     int yield(unsigned long timeout_ms = 1000L);
228 
229     /** Is the client connected?
230      *  @return flag - is the client connected or not?
231      */
isConnected()232     bool isConnected()
233     {
234         return isconnected;
235     }
236 
237 private:
238 
239     void closeSession();
240     void cleanSession();
241     int cycle(Timer& timer);
242     int waitfor(int packet_type, Timer& timer);
243     int keepalive();
244     int publish(int len, Timer& timer, enum QoS qos);
245 
246     int decodePacket(int* value, int timeout);
247     int readPacket(Timer& timer);
248     int sendPacket(int length, Timer& timer);
249     int deliverMessage(MQTTString& topicName, Message& message);
250     bool isTopicMatched(char* topicFilter, MQTTString& topicName);
251 
252     Network& ipstack;
253     unsigned long command_timeout_ms;
254 
255     unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
256     unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
257 
258     Timer last_sent, last_received;
259     unsigned int keepAliveInterval;
260     bool ping_outstanding;
261     bool cleansession;
262 
263     PacketId packetid;
264 
265     struct MessageHandlers
266     {
267         const char* topicFilter;
268         FP<void, MessageData&> fp;
269     } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic
270 
271     FP<void, MessageData&> defaultMessageHandler;
272 
273     bool isconnected;
274 
275 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
276     unsigned char pubbuf[MAX_MQTT_PACKET_SIZE];  // store the last publish for sending on reconnect
277     int inflightLen;
278     unsigned short inflightMsgid;
279     enum QoS inflightQoS;
280 #endif
281 
282 #if MQTTCLIENT_QOS2
283     bool pubrel;
284     #if !defined(MAX_INCOMING_QOS2_MESSAGES)
285         #define MAX_INCOMING_QOS2_MESSAGES 10
286     #endif
287     unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
288     bool isQoS2msgidFree(unsigned short id);
289     bool useQoS2msgid(unsigned short id);
290 	void freeQoS2msgid(unsigned short id);
291 #endif
292 
293 };
294 
295 }
296 
297 
298 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
cleanSession()299 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession()
300 {
301     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
302         messageHandlers[i].topicFilter = 0;
303 
304 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
305     inflightMsgid = 0;
306     inflightQoS = QOS0;
307 #endif
308 
309 #if MQTTCLIENT_QOS2
310     pubrel = false;
311     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
312         incomingQoS2messages[i] = 0;
313 #endif
314 }
315 
316 
317 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
closeSession()318 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::closeSession()
319 {
320     ping_outstanding = false;
321     isconnected = false;
322     if (cleansession)
323         cleanSession();
324 }
325 
326 
327 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
Client(Network & network,unsigned int command_timeout_ms)328 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms)  : ipstack(network), packetid()
329 {
330     this->command_timeout_ms = command_timeout_ms;
331     cleansession = true;
332 	  closeSession();
333 }
334 
335 
336 #if MQTTCLIENT_QOS2
337 template<class Network, class Timer, int a, int b>
isQoS2msgidFree(unsigned short id)338 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
339 {
340     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
341     {
342         if (incomingQoS2messages[i] == id)
343             return false;
344     }
345     return true;
346 }
347 
348 
349 template<class Network, class Timer, int a, int b>
useQoS2msgid(unsigned short id)350 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
351 {
352     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
353     {
354         if (incomingQoS2messages[i] == 0)
355         {
356             incomingQoS2messages[i] = id;
357             return true;
358         }
359     }
360     return false;
361 }
362 
363 
364 template<class Network, class Timer, int a, int b>
freeQoS2msgid(unsigned short id)365 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id)
366 {
367     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
368     {
369         if (incomingQoS2messages[i] == id)
370         {
371             incomingQoS2messages[i] = 0;
372             return;
373         }
374     }
375 }
376 #endif
377 
378 
379 template<class Network, class Timer, int a, int b>
sendPacket(int length,Timer & timer)380 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
381 {
382     int rc = FAILURE,
383         sent = 0;
384 
385     while (sent < length)
386     {
387         rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms());
388         if (rc < 0)  // there was an error writing the data
389             break;
390         sent += rc;
391         if (timer.expired()) // only check expiry after at least one attempt to write
392             break;
393     }
394     if (sent == length)
395     {
396         if (this->keepAliveInterval > 0)
397             last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
398         rc = SUCCESS;
399     }
400     else
401         rc = FAILURE;
402 
403 #if defined(MQTT_DEBUG)
404     char printbuf[150];
405     DEBUG("Rc %d from sending packet %s\r\n", rc,
406         MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
407 #endif
408     return rc;
409 }
410 
411 
412 template<class Network, class Timer, int a, int b>
decodePacket(int * value,int timeout)413 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
414 {
415     unsigned char c;
416     int multiplier = 1;
417     int len = 0;
418     const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
419 
420     *value = 0;
421     do
422     {
423         int rc = MQTTPACKET_READ_ERROR;
424 
425         if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
426         {
427             rc = MQTTPACKET_READ_ERROR; /* bad data */
428             goto exit;
429         }
430         rc = ipstack.read(&c, 1, timeout);
431         if (rc != 1)
432             goto exit;
433         *value += (c & 127) * multiplier;
434         multiplier *= 128;
435     } while ((c & 128) != 0);
436 exit:
437     return len;
438 }
439 
440 
441 /**
442  * If any read fails in this method, then we should disconnect from the network, as on reconnect
443  * the packets can be retried.
444  * @param timeout the max time to wait for the packet read to complete, in milliseconds
445  * @return the MQTT packet type, 0 if none, -1 if error
446  */
447 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
readPacket(Timer & timer)448 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer)
449 {
450     int rc = FAILURE;
451     MQTTHeader header = {0};
452     int len = 0;
453     int rem_len = 0;
454 
455     /* 1. read the header byte.  This has the packet type in it */
456     rc = ipstack.read(readbuf, 1, timer.left_ms());
457     if (rc != 1)
458         goto exit;
459 
460     len = 1;
461     /* 2. read the remaining length.  This is variable in itself */
462     decodePacket(&rem_len, timer.left_ms());
463     len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
464 
465     if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
466     {
467         rc = BUFFER_OVERFLOW;
468         goto exit;
469     }
470 
471     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
472     if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
473         goto exit;
474 
475     header.byte = readbuf[0];
476     rc = header.bits.type;
477     if (this->keepAliveInterval > 0)
478         last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
479 exit:
480 
481 #if defined(MQTT_DEBUG)
482     if (rc >= 0)
483     {
484         char printbuf[50];
485         DEBUG("Rc %d receiving packet %s\r\n", rc,
486             MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
487     }
488 #endif
489     return rc;
490 }
491 
492 
493 // assume topic filter and name is in correct format
494 // # can only be at end
495 // + and # can only be next to separator
496 template<class Network, class Timer, int a, int b>
isTopicMatched(char * topicFilter,MQTTString & topicName)497 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
498 {
499     char* curf = topicFilter;
500     char* curn = topicName.lenstring.data;
501     char* curn_end = curn + topicName.lenstring.len;
502 
503     while (*curf && curn < curn_end)
504     {
505         if (*curn == '/' && *curf != '/')
506             break;
507         if (*curf != '+' && *curf != '#' && *curf != *curn)
508             break;
509         if (*curf == '+')
510         {   // skip until we meet the next separator, or end of string
511             char* nextpos = curn + 1;
512             while (nextpos < curn_end && *nextpos != '/')
513                 nextpos = ++curn + 1;
514         }
515         else if (*curf == '#')
516             curn = curn_end - 1;    // skip until end of string
517         curf++;
518         curn++;
519     };
520 
521     return (curn == curn_end) && (*curf == '\0');
522 }
523 
524 
525 
526 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
deliverMessage(MQTTString & topicName,Message & message)527 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
528 {
529     int rc = FAILURE;
530 
531     // we have to find the right message handler - indexed by topic
532     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
533     {
534         if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
535                 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
536         {
537             if (messageHandlers[i].fp.attached())
538             {
539                 MessageData md(topicName, message);
540                 messageHandlers[i].fp(md);
541                 rc = SUCCESS;
542             }
543         }
544     }
545 
546     if (rc == FAILURE && defaultMessageHandler.attached())
547     {
548         MessageData md(topicName, message);
549         defaultMessageHandler(md);
550         rc = SUCCESS;
551     }
552 
553     return rc;
554 }
555 
556 
557 
558 template<class Network, class Timer, int a, int b>
yield(unsigned long timeout_ms)559 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
560 {
561     int rc = SUCCESS;
562     Timer timer;
563 
564     timer.countdown_ms(timeout_ms);
565     while (!timer.expired())
566     {
567         if (cycle(timer) < 0)
568         {
569             rc = FAILURE;
570             break;
571         }
572     }
573 
574     return rc;
575 }
576 
577 
578 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
cycle(Timer & timer)579 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
580 {
581     // get one piece of work off the wire and one pass through
582     int len = 0,
583         rc = SUCCESS;
584 
585     int packet_type = readPacket(timer);    // read the socket, see what work is due
586 
587     switch (packet_type)
588     {
589         default:
590             // no more data to read, unrecoverable. Or read packet fails due to unexpected network error
591             rc = packet_type;
592             goto exit;
593         case 0: // timed out reading packet
594             break;
595         case CONNACK:
596         case PUBACK:
597         case SUBACK:
598         case UNSUBACK:
599             break;
600         case PUBLISH:
601         {
602             MQTTString topicName = MQTTString_initializer;
603             Message msg;
604             int intQoS;
605             msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
606             if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
607                                  (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
608                 goto exit;
609             msg.qos = (enum QoS)intQoS;
610 #if MQTTCLIENT_QOS2
611             if (msg.qos != QOS2)
612 #endif
613                 deliverMessage(topicName, msg);
614 #if MQTTCLIENT_QOS2
615             else if (isQoS2msgidFree(msg.id))
616             {
617                 if (useQoS2msgid(msg.id))
618                     deliverMessage(topicName, msg);
619                 else
620                     WARN("Maximum number of incoming QoS2 messages exceeded");
621             }
622 #endif
623 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
624             if (msg.qos != QOS0)
625             {
626                 if (msg.qos == QOS1)
627                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
628                 else if (msg.qos == QOS2)
629                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
630                 if (len <= 0)
631                     rc = FAILURE;
632                 else
633                     rc = sendPacket(len, timer);
634                 if (rc == FAILURE)
635                     goto exit; // there was a problem
636             }
637             break;
638 #endif
639         }
640 #if MQTTCLIENT_QOS2
641         case PUBREC:
642         case PUBREL:
643             unsigned short mypacketid;
644             unsigned char dup, type;
645             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
646                 rc = FAILURE;
647             else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE,
648 						         (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
649                 rc = FAILURE;
650             else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
651                 rc = FAILURE; // there was a problem
652             if (rc == FAILURE)
653                 goto exit; // there was a problem
654             if (packet_type == PUBREL)
655                 freeQoS2msgid(mypacketid);
656             break;
657 
658         case PUBCOMP:
659             break;
660 #endif
661         case PINGRESP:
662             ping_outstanding = false;
663             break;
664     }
665 
666     if (keepalive() != SUCCESS)
667         //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
668         rc = FAILURE;
669 
670 exit:
671     if (rc == SUCCESS)
672         rc = packet_type;
673     else if (isconnected)
674         closeSession();
675     return rc;
676 }
677 
678 
679 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
keepalive()680 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
681 {
682     int rc = SUCCESS;
683     static Timer ping_sent;
684 
685     if (keepAliveInterval == 0)
686         goto exit;
687 
688     if (ping_outstanding)
689     {
690         if (ping_sent.expired())
691         {
692             rc = FAILURE; // session failure
693             #if defined(MQTT_DEBUG)
694                 DEBUG("PINGRESP not received in keepalive interval\r\n");
695             #endif
696         }
697     }
698     else if (last_sent.expired() || last_received.expired())
699     {
700         Timer timer(1000);
701         int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
702         if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
703         {
704             ping_outstanding = true;
705             ping_sent.countdown(this->keepAliveInterval);
706         }
707     }
708 exit:
709     return rc;
710 }
711 
712 
713 // only used in single-threaded mode where one command at a time is in process
714 template<class Network, class Timer, int a, int b>
waitfor(int packet_type,Timer & timer)715 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
716 {
717     int rc = FAILURE;
718 
719     do
720     {
721         if (timer.expired())
722             break; // we timed out
723         rc = cycle(timer);
724     }
725     while (rc != packet_type && rc >= 0);
726 
727     return rc;
728 }
729 
730 
731 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
connect(MQTTPacket_connectData & options,connackData & data)732 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options, connackData& data)
733 {
734     Timer connect_timer(command_timeout_ms);
735     int rc = FAILURE;
736     int len = 0;
737 
738     if (isconnected) // don't send connect packet again if we are already connected
739         goto exit;
740 
741     this->keepAliveInterval = options.keepAliveInterval;
742     this->cleansession = options.cleansession;
743     if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
744         goto exit;
745     if ((rc = sendPacket(len, connect_timer)) != SUCCESS)  // send the connect packet
746         goto exit; // there was a problem
747 
748     if (this->keepAliveInterval > 0)
749         last_received.countdown(this->keepAliveInterval);
750     // this will be a blocking call, wait for the connack
751     if (waitfor(CONNACK, connect_timer) == CONNACK)
752     {
753         data.rc = 0;
754         data.sessionPresent = false;
755         if (MQTTDeserialize_connack((unsigned char*)&data.sessionPresent,
756                             (unsigned char*)&data.rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
757             rc = data.rc;
758         else
759             rc = FAILURE;
760     }
761     else
762         rc = FAILURE;
763 
764 #if MQTTCLIENT_QOS2
765     // resend any inflight publish
766     if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel)
767     {
768         if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
769             rc = FAILURE;
770         else
771             rc = publish(len, connect_timer, inflightQoS);
772     }
773     else
774 #endif
775 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
776     if (inflightMsgid > 0)
777     {
778         memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
779         rc = publish(inflightLen, connect_timer, inflightQoS);
780     }
781 #endif
782 
783 exit:
784     if (rc == SUCCESS)
785     {
786         isconnected = true;
787         ping_outstanding = false;
788     }
789     return rc;
790 }
791 
792 
793 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
connect(MQTTPacket_connectData & options)794 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
795 {
796     connackData data;
797     return connect(options, data);
798 }
799 
800 
801 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
connect()802 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect()
803 {
804     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
805     return connect(default_options);
806 }
807 
808 
809 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
setMessageHandler(const char * topicFilter,messageHandler messageHandler)810 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::setMessageHandler(const char* topicFilter, messageHandler messageHandler)
811 {
812     int rc = FAILURE;
813     int i = -1;
814 
815     // first check for an existing matching slot
816     for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
817     {
818         if (messageHandlers[i].topicFilter != 0 && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
819         {
820             if (messageHandler == 0) // remove existing
821             {
822                 messageHandlers[i].topicFilter = 0;
823                 messageHandlers[i].fp.detach();
824             }
825             rc = SUCCESS; // return i when adding new subscription
826             break;
827         }
828     }
829     // if no existing, look for empty slot (unless we are removing)
830     if (messageHandler != 0) {
831         if (rc == FAILURE)
832         {
833             for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
834             {
835                 if (messageHandlers[i].topicFilter == 0)
836                 {
837                     rc = SUCCESS;
838                     break;
839                 }
840             }
841         }
842         if (i < MAX_MESSAGE_HANDLERS)
843         {
844             messageHandlers[i].topicFilter = topicFilter;
845             messageHandlers[i].fp.attach(messageHandler);
846         }
847     }
848     return rc;
849 }
850 
851 
852 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
subscribe(const char * topicFilter,enum QoS qos,messageHandler messageHandler,subackData & data)853 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter,
854      enum QoS qos, messageHandler messageHandler, subackData& data)
855 {
856     int rc = FAILURE;
857     Timer timer(command_timeout_ms);
858     int len = 0;
859     MQTTString topic = {(char*)topicFilter, {0, 0}};
860 
861     if (!isconnected)
862         goto exit;
863 
864     len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
865     if (len <= 0)
866         goto exit;
867     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
868         goto exit;             // there was a problem
869 
870     if (waitfor(SUBACK, timer) == SUBACK)      // wait for suback
871     {
872         int count = 0;
873         unsigned short mypacketid;
874         data.grantedQoS = 0;
875         if (MQTTDeserialize_suback(&mypacketid, 1, &count, &data.grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
876         {
877             if (data.grantedQoS != 0x80)
878                 rc = setMessageHandler(topicFilter, messageHandler);
879         }
880     }
881     else
882         rc = FAILURE;
883 
884 exit:
885     if (rc == FAILURE)
886         closeSession();
887     return rc;
888 }
889 
890 
891 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
subscribe(const char * topicFilter,enum QoS qos,messageHandler messageHandler)892 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
893 {
894     subackData data;
895     return subscribe(topicFilter, qos, messageHandler, data);
896 }
897 
898 
899 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
unsubscribe(const char * topicFilter)900 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
901 {
902     int rc = FAILURE;
903     Timer timer(command_timeout_ms);
904     MQTTString topic = {(char*)topicFilter, {0, 0}};
905     int len = 0;
906 
907     if (!isconnected)
908         goto exit;
909 
910     if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
911         goto exit;
912     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
913         goto exit; // there was a problem
914 
915     if (waitfor(UNSUBACK, timer) == UNSUBACK)
916     {
917         unsigned short mypacketid;  // should be the same as the packetid above
918         if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
919         {
920             // remove the subscription message handler associated with this topic, if there is one
921             setMessageHandler(topicFilter, 0);
922         }
923     }
924     else
925         rc = FAILURE;
926 
927 exit:
928     if (rc != SUCCESS)
929         closeSession();
930     return rc;
931 }
932 
933 
934 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
publish(int len,Timer & timer,enum QoS qos)935 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
936 {
937     int rc;
938 
939     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
940         goto exit; // there was a problem
941 
942 #if MQTTCLIENT_QOS1
943     if (qos == QOS1)
944     {
945         if (waitfor(PUBACK, timer) == PUBACK)
946         {
947             unsigned short mypacketid;
948             unsigned char dup, type;
949             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
950                 rc = FAILURE;
951             else if (inflightMsgid == mypacketid)
952                 inflightMsgid = 0;
953         }
954         else
955             rc = FAILURE;
956     }
957 #endif
958 #if MQTTCLIENT_QOS2
959     else if (qos == QOS2)
960     {
961         if (waitfor(PUBCOMP, timer) == PUBCOMP)
962         {
963             unsigned short mypacketid;
964             unsigned char dup, type;
965             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
966                 rc = FAILURE;
967             else if (inflightMsgid == mypacketid)
968                 inflightMsgid = 0;
969         }
970         else
971             rc = FAILURE;
972     }
973 #endif
974 
975 exit:
976     if (rc != SUCCESS)
977         closeSession();
978     return rc;
979 }
980 
981 
982 
983 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
publish(const char * topicName,void * payload,size_t payloadlen,unsigned short & id,enum QoS qos,bool retained)984 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
985 {
986     int rc = FAILURE;
987     Timer timer(command_timeout_ms);
988     MQTTString topicString = MQTTString_initializer;
989     int len = 0;
990 
991     if (!isconnected)
992         goto exit;
993 
994     topicString.cstring = (char*)topicName;
995 
996 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
997     if (qos == QOS1 || qos == QOS2)
998         id = packetid.getNext();
999 #endif
1000 
1001     len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
1002               topicString, (unsigned char*)payload, payloadlen);
1003     if (len <= 0)
1004         goto exit;
1005 
1006 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
1007     if (!cleansession)
1008     {
1009         memcpy(pubbuf, sendbuf, len);
1010         inflightMsgid = id;
1011         inflightLen = len;
1012         inflightQoS = qos;
1013 #if MQTTCLIENT_QOS2
1014         pubrel = false;
1015 #endif
1016     }
1017 #endif
1018 
1019     rc = publish(len, timer, qos);
1020 exit:
1021     return rc;
1022 }
1023 
1024 
1025 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
publish(const char * topicName,void * payload,size_t payloadlen,enum QoS qos,bool retained)1026 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
1027 {
1028     unsigned short id = 0;  // dummy - not used for anything
1029     return publish(topicName, payload, payloadlen, id, qos, retained);
1030 }
1031 
1032 
1033 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
publish(const char * topicName,Message & message)1034 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
1035 {
1036     return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
1037 }
1038 
1039 
1040 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
disconnect()1041 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
1042 {
1043     int rc = FAILURE;
1044     Timer timer(command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
1045     int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
1046     if (len > 0)
1047         rc = sendPacket(len, timer);            // send the disconnect packet
1048     closeSession();
1049     return rc;
1050 }
1051 
1052 #endif
1053