• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*******************************************************************************
2  * Copyright (c) 2014, 2017 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  *    http://www.eclipse.org/legal/epl-v10.html
10  * and the Eclipse Distribution License is available at
11  *   http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  *   Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
15  *   Ian Craggs - fix for #96 - check rem_len in readPacket
16  *   Ian Craggs - add ability to set message handler separately #6
17  *******************************************************************************/
18 #include "MQTTClient.h"
19 
20 #include <stdio.h>
21 #include <string.h>
22 #include <signal.h>
23 
NewMessageData(MessageData * md,MQTTString * aTopicName,MQTTMessage * aMessage)24 static void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage) {
25     md->topicName = aTopicName;
26     md->message = aMessage;
27 }
28 
29 
getNextPacketId(MQTTClient * c)30 static int getNextPacketId(MQTTClient *c) {
31     return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1;
32 }
33 
34 
sendPacket(MQTTClient * c,int length,Timer * timer)35 static int sendPacket(MQTTClient* c, int length, Timer* timer)
36 {
37     int rc = FAILURE,
38         sent = 0;
39     bool isexpired = TimerIsExpired(timer);
40     while (sent < length && !isexpired) {
41         rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer));
42         if (rc < 0)  // there was an error writing the data
43             break;
44         sent += rc;
45         isexpired = TimerIsExpired(timer);
46     }
47     if (sent == length) {
48         // LogDebug("before sendPacket TimerCountdown...lastsent.tv_sec=%{public}lld, lastsent.tv_usec=%{public}lld, last_received.tv_sec=%{public}lld, last_received.tv_usec=%{public}lld",
49         // c->last_sent.end_time.tv_sec,
50         // c->last_sent.end_time.tv_usec,
51         // c->last_received.end_time.tv_sec,
52         // c->last_received.end_time.tv_usec);
53         TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have successfully sent the packet
54         // LogDebug("after sendPacket TimerCountdown...lastsent.tv_sec=%{public}lld, lastsent.tv_usec=%{public}lld, last_received.tv_sec=%{public}lld, last_received.tv_usec=%{public}lld",
55         // c->last_sent.end_time.tv_sec,
56         // c->last_sent.end_time.tv_usec,
57         // c->last_received.end_time.tv_sec,
58         // c->last_received.end_time.tv_usec);
59         rc = SUCCESS;
60     } else {
61         // LogDebug("sent=%{public}d,length=%{public}d,isexpired=%{public}d, rc = %{public}d, errno = %{public}d", sent, length, isexpired, rc, errno);
62         rc = FAILURE;
63     }
64     return rc;
65 }
66 
handle_pipe(int sig)67 void handle_pipe(int sig) {}
MQTTClientInit(MQTTClient * c,Network * network,unsigned int command_timeout_ms,unsigned char * sendbuf,size_t sendbuf_size,unsigned char * readbuf,size_t readbuf_size)68 void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms,
69         unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size)
70 {
71     int i;
72     c->ipstack = network;
73     struct sigaction action;
74     action.sa_handler = handle_pipe;
75     sigemptyset(&action.sa_mask);
76     action.sa_flags = 0;
77     sigaction(SIGPIPE, &action, NULL);
78     for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
79         c->messageHandlers[i].topicFilter = 0;
80     c->command_timeout_ms = command_timeout_ms;
81     c->buf = sendbuf;
82     c->buf_size = sendbuf_size;
83     c->readbuf = readbuf;
84     c->readbuf_size = readbuf_size;
85     c->isconnected = 0;
86     c->cleansession = 0;
87     c->ping_outstanding = 0;
88     c->defaultMessageHandler = NULL;
89       c->next_packetid = 1;
90     TimerInit(&c->last_sent);
91     TimerInit(&c->last_received);
92 #if defined(MQTT_TASK)
93       MutexInit(&c->mutex);
94 #endif
95 }
96 
97 
decodePacket(MQTTClient * c,int * value,int timeout)98 static int decodePacket(MQTTClient* c, int* value, int timeout)
99 {
100     unsigned char i;
101     int multiplier = 1;
102     int len = 0;
103     const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
104 
105     *value = 0;
106     do
107     {
108         int rc = MQTTPACKET_READ_ERROR;
109 
110         if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
111         {
112             rc = MQTTPACKET_READ_ERROR; /* bad data */
113             goto exit;
114         }
115         rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
116         if (rc != 1) {
117             LogError("%{public}s: decodePacket  error.ErrorCode=[%{public}d]", __PRETTY_FUNCTION__, rc);
118             goto exit;
119         }
120         *value += (i & 127) * multiplier;
121         multiplier *= 128;
122     } while ((i & 128) != 0);
123 exit:
124     return len;
125 }
126 
127 
readPacket(MQTTClient * c,Timer * timer)128 static int readPacket(MQTTClient* c, Timer* timer)
129 {
130     MQTTHeader header = {0};
131     int len = 0;
132     int rem_len = 0;
133     /* 1. read the header byte.  This has the packet type in it */
134     int rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer));
135     if (rc != 1) {
136         if (errno != EAGAIN) {
137             // LogError("%{public}s:rc=[%{public}d],:errno=[%{public}d]", __PRETTY_FUNCTION__, rc, errno);
138         }
139         goto exit;
140     }
141     len = 1;
142     /* 2. read the remaining length.  This is variable in itself */
143     decodePacket(c, &rem_len, TimerLeftMS(timer));
144     len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
145 
146     if (rem_len > (c->readbuf_size - len))
147     {
148         rc = BUFFER_OVERFLOW;
149         LogError("%{public}s: BUFFER_OVERFLOW - len.ErrorCode=[%{public}d],", __PRETTY_FUNCTION__, rc);
150         goto exit;
151     }
152 
153     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
154     if (rem_len > 0 && (rc = c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len)) {
155         LogError("rem_len = %{public}d,  rc = %{public}d", rem_len,  rc);
156         rc = 0;
157         goto exit;
158     }
159 
160     header.byte = c->readbuf[0];
161     rc = header.bits.type;
162 
163     if (c->keepAliveInterval > 0) {
164         bool lastsent = TimerIsExpired(&c->last_sent);
165         bool lastreceived = TimerIsExpired(&c->last_received);
166         // LogDebug("before readPacket TimerCountdown...lastsent.tv_sec=%{public}lld, lastsent.tv_usec=%{public}lld, last_received.tv_sec=%{public}lld, last_received.tv_usec=%{public}lld",
167         // c->last_sent.end_time.tv_sec,
168         // c->last_sent.end_time.tv_usec,
169         // c->last_received.end_time.tv_sec,
170         // c->last_received.end_time.tv_usec);
171         TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet
172         // LogDebug("after readPacket TimerCountdown...lastsent.tv_sec=%{public}lld, lastsent.tv_usec=%{public}lld, last_received.tv_sec=%{public}lld, last_received.tv_usec=%{public}lld",
173         // c->last_sent.end_time.tv_sec,
174         // c->last_sent.end_time.tv_usec,
175         // c->last_received.end_time.tv_sec,
176         // c->last_received.end_time.tv_usec);
177         }
178 exit:
179     return rc;
180 }
181 
182 
183 // assume topic filter and name is in correct format
184 // # can only be at end
185 // + and # can only be next to separator
isTopicMatched(char * topicFilter,MQTTString * topicName)186 static char isTopicMatched(char* topicFilter, MQTTString* topicName)
187 {
188     char* curf = topicFilter;
189     char* curn = topicName->lenstring.data;
190     char* curn_end = curn + topicName->lenstring.len;
191 
192     while (*curf && curn < curn_end)
193     {
194         if (*curn == '/' && *curf != '/')
195             break;
196         if (*curf != '+' && *curf != '#' && *curf != *curn)
197             break;
198         if (*curf == '+')
199         {   // skip until we meet the next separator, or end of string
200             char* nextpos = curn + 1;
201             while (nextpos < curn_end && *nextpos != '/')
202                 nextpos = ++curn + 1;
203         }
204         else if (*curf == '#')
205             curn = curn_end - 1;    // skip until end of string
206         curf++;
207         curn++;
208     };
209 
210     return (curn == curn_end) && (*curf == '\0');
211 }
212 
213 
deliverMessage(MQTTClient * c,MQTTString * topicName,MQTTMessage * message)214 int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
215 {
216     int i;
217     int rc = FAILURE;
218     // we have to find the right message handler - indexed by topic
219     for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
220     {
221         if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||
222                 isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName)))
223         {
224             if (c->messageHandlers[i].fp != NULL)
225             {
226                 MessageData md;
227                 NewMessageData(&md, topicName, message);
228                 c->messageHandlers[i].fp(&md);
229                 rc = SUCCESS;
230             }
231         }
232     }
233 
234     if (rc == FAILURE && c->defaultMessageHandler != NULL)
235     {
236         MessageData md;
237         NewMessageData(&md, topicName, message);
238         c->defaultMessageHandler(&md);
239         rc = SUCCESS;
240     }
241 
242     return rc;
243 }
244 
245 
keepalive(MQTTClient * c)246 int keepalive(MQTTClient* c)
247 {
248     int rc = SUCCESS;
249 
250     if (c->keepAliveInterval == 0)
251         goto exit;
252     struct Timer res = c->last_received;
253     TimerAddSecond(&res, 2);
254     // struct timeval now;
255     // gettimeofday(&now, NULL);
256     // LogDebug("[keepalive] lastsent.tv_sec=%{public}lld, lastsent.tv_usec=%{public}lld, last_received.tv_sec=%{public}lld, last_received.tv_usec=%{public}lld,now.tv_sec=%{public}lld, now.tv_usec=%{public}lld",
257     //     c->last_sent.end_time.tv_sec,
258     //     c->last_sent.end_time.tv_usec,
259     //     c->last_received.end_time.tv_sec,
260     //     c->last_received.end_time.tv_usec,
261     //     now.tv_sec,
262     //     now.tv_usec
263     //     );
264     if ( TimerIsExpired(&c->last_sent) || TimerIsExpired(&res))
265     {
266         if (c->ping_outstanding) {
267             rc = FAILURE; /* PINGRESP not received in keepalive interval */
268             // struct timeval now, res;
269             // gettimeofday(&now, NULL);
270             // timersub(&c->last_sent.end_time, &now, &res);
271         }
272         else
273         {
274             Timer timer;
275             TimerInit(&timer);
276             TimerCountdownMS(&timer, 1000);
277             int len = MQTTSerialize_pingreq(c->buf, c->buf_size);
278             if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) // send the ping packet
279                 c->ping_outstanding = 1;
280                 LogDebug("ping request...");
281         }
282     }
283 
284 exit:
285     return rc;
286 }
287 
288 
MQTTCleanSession(MQTTClient * c)289 void MQTTCleanSession(MQTTClient* c)
290 {
291     int i = 0;
292 
293     for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
294         c->messageHandlers[i].topicFilter = NULL;
295 }
296 
297 
MQTTCloseSession(MQTTClient * c)298 void MQTTCloseSession(MQTTClient* c)
299 {
300     LogDebug("close session");
301     c->ping_outstanding = 0;
302     c->isconnected = 0;
303     if (c->cleansession)
304         MQTTCleanSession(c);
305 }
306 
307 
cycle(MQTTClient * c,Timer * timer)308 int cycle(MQTTClient* c, Timer* timer)
309 {
310     int len = 0,
311         rc = SUCCESS;
312 
313     int packet_type = readPacket(c, timer);     /* read the socket, see what work is due */
314 	if (packet_type != 0)
315         LogDebug("byte0=0x%{public}x,byte1=0x%{public}x, rc=0x%{public}x, packet_type=0x%{public}x,errno = [%{public}d]", c->readbuf[0], c->readbuf[1], rc, packet_type, errno);
316 
317     switch (packet_type)
318     {
319         default:
320             /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */
321             rc = packet_type;
322             goto exit;
323         case 0: /* timed out reading packet */
324             break;
325         case CONNACK:
326         case PUBACK:
327         case SUBACK:
328         case UNSUBACK:
329             break;
330         case PUBLISH:
331         {
332             MQTTString topicName;
333             MQTTMessage msg;
334             int intQoS;
335             msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
336             if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,
337                (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
338                 goto exit;
339             msg.qos = (enum QoS)intQoS;
340             LogDebug("9527>>   msg.qos = %{public}d,  intQoS = %{public}d, topicName = %{public}s,  msg.payloadlen = %{public}d" ,msg.qos, intQoS, topicName.cstring, msg.payloadlen);
341             deliverMessage(c, &topicName, &msg);
342             if (msg.qos != QOS0)
343             {
344                 if (msg.qos == QOS1)
345                     len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);
346                 else if (msg.qos == QOS2)
347                     len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
348                 if (len <= 0)
349                     rc = FAILURE;
350                 else
351                     rc = sendPacket(c, len, timer);
352                 if (rc == FAILURE)
353                     goto exit; // there was a problem
354             }
355             break;
356         }
357         case PUBREC:
358         case PUBREL:
359         {
360             unsigned short mypacketid;
361             unsigned char dup, type;
362             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
363                 rc = FAILURE;
364             else if ((len = MQTTSerialize_ack(c->buf, c->buf_size,
365                 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
366                 rc = FAILURE;
367             else if ((rc = sendPacket(c, len, timer)) != SUCCESS) // send the PUBREL packet
368                 rc = FAILURE; // there was a problem
369             if (rc == FAILURE)
370                 goto exit; // there was a problem
371             break;
372         }
373 
374         case PUBCOMP:
375             break;
376         case PINGRESP:
377             c->ping_outstanding = 0;
378             LogDebug("PINGRESP.................");
379             break;
380     }
381 
382     if (keepalive(c) != SUCCESS) {
383         //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
384         rc = FAILURE;
385     }
386 
387 exit:
388     if (rc == SUCCESS)
389         rc = packet_type;
390     else if (c->isconnected)
391         MQTTCloseSession(c);
392     return rc;
393 }
394 
395 
MQTTYield(MQTTClient * c,int timeout_ms)396 int MQTTYield(MQTTClient* c, int timeout_ms)
397 {
398     int rc = SUCCESS;
399     Timer timer;
400 
401     TimerInit(&timer);
402     TimerCountdownMS(&timer, timeout_ms);
403 
404     do
405     {
406         if (cycle(c, &timer) < 0)
407         {
408             rc = FAILURE;
409             break;
410         }
411     } while (!TimerIsExpired(&timer));
412 
413     return rc;
414 }
415 
MQTTIsConnected(MQTTClient * client)416 int MQTTIsConnected(MQTTClient* client)
417 {
418     return client->isconnected;
419 }
420 
MQTTRun(void * parm)421 void MQTTRun(void* parm)
422 {
423     Timer timer;
424     MQTTClient* c = (MQTTClient*)parm;
425 
426     TimerInit(&timer);
427 
428     while (1)
429     {
430 #if defined(MQTT_TASK)
431         MutexLock(&c->mutex);
432 #endif
433         TimerCountdownMS(&timer, 500); /* Don't wait too long if no traffic is incoming */
434         cycle(c, &timer);
435 #if defined(MQTT_TASK)
436         MutexUnlock(&c->mutex);
437 #endif
438     }
439 }
440 
441 
442 #if defined(MQTT_TASK)
MQTTStartTask(MQTTClient * client)443 int MQTTStartTask(MQTTClient* client)
444 {
445     return ThreadStart(&client->thread, &MQTTRun, client);
446 }
447 #endif
448 
449 
waitfor(MQTTClient * c,int packet_type,Timer * timer)450 int waitfor(MQTTClient* c, int packet_type, Timer* timer)
451 {
452     int rc = FAILURE;
453 
454     do
455     {
456         if (TimerIsExpired(timer))
457             break; // we timed out
458         rc = cycle(c, timer);
459     }
460     while (rc != packet_type && rc >= 0);
461 
462     return rc;
463 }
464 
465 
466 
467 
MQTTConnectWithResults(MQTTClient * c,MQTTPacket_connectData * options,MQTTConnackData * data)468 int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTConnackData* data)
469 {
470     Timer connect_timer;
471     int rc = FAILURE;
472     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
473     int len = 0;
474 
475 #if defined(MQTT_TASK)
476       MutexLock(&c->mutex);
477 #endif
478       if (c->isconnected) /* don't send connect packet again if we are already connected */
479           goto exit;
480 
481     TimerInit(&connect_timer);
482     TimerCountdownMS(&connect_timer, c->command_timeout_ms);
483 
484     if (options == 0)
485         options = &default_options; /* set default options if none were supplied */
486 
487     c->keepAliveInterval = options->keepAliveInterval;
488     c->cleansession = options->cleansession;
489     TimerCountdown(&c->last_received, c->keepAliveInterval);
490     if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0)
491         goto exit;
492     if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESS)  // send the connect packet
493         goto exit; // there was a problem
494 
495     // this will be a blocking call, wait for the connack
496     if (waitfor(c, CONNACK, &connect_timer) == CONNACK)
497     {
498         data->rc = 0;
499         data->sessionPresent = 0;
500         if (MQTTDeserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1)
501             rc = data->rc;
502         else
503             rc = FAILURE;
504     }
505     else
506         rc = FAILURE;
507 
508 exit:
509     if (rc == SUCCESS)
510     {
511         c->isconnected = 1;
512         c->ping_outstanding = 0;
513     }
514 
515 #if defined(MQTT_TASK)
516       MutexUnlock(&c->mutex);
517 #endif
518 
519     return rc;
520 }
521 
522 
MQTTConnect(MQTTClient * c,MQTTPacket_connectData * options)523 int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)
524 {
525     MQTTConnackData data;
526     return MQTTConnectWithResults(c, options, &data);
527 }
528 
529 
MQTTSetMessageHandler(MQTTClient * c,const char * topicFilter,messageHandler messageHandler)530 int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler)
531 {
532     int rc = FAILURE;
533     int i = -1;
534 
535     /* first check for an existing matching slot */
536     for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
537     {
538         if (c->messageHandlers[i].topicFilter != NULL && strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0)
539         {
540             if (messageHandler == NULL) /* remove existing */
541             {
542                 c->messageHandlers[i].topicFilter = NULL;
543                 c->messageHandlers[i].fp = NULL;
544             }
545             rc = SUCCESS; /* return i when adding new subscription */
546             break;
547         }
548     }
549     /* if no existing, look for empty slot (unless we are removing) */
550     if (messageHandler != NULL) {
551         if (rc == FAILURE)
552         {
553             for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
554             {
555                 if (c->messageHandlers[i].topicFilter == NULL)
556                 {
557                     rc = SUCCESS;
558                     break;
559                 }
560             }
561         }
562         if (i < MAX_MESSAGE_HANDLERS)
563         {
564             c->messageHandlers[i].topicFilter = topicFilter;
565             c->messageHandlers[i].fp = messageHandler;
566         }
567     }
568     return rc;
569 }
570 
MQTTAsyncSubscribe(MQTTClient * c,const char * topicFilter,enum QoS qos,messageHandler messageHandler,MQTTSubackData * data)571 int MQTTAsyncSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos,
572        messageHandler messageHandler, MQTTSubackData* data)
573 {
574     int rc = FAILURE;
575     Timer timer;
576     int len = 0;
577     MQTTString topic = MQTTString_initializer;
578     topic.cstring = (char *)topicFilter;
579 
580 #if defined(MQTT_TASK)
581       MutexLock(&c->mutex);
582 #endif
583       if (!c->isconnected)
584             goto exit;
585 
586     TimerInit(&timer);
587     TimerCountdownMS(&timer, c->command_timeout_ms);
588 
589     len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&qos);
590     if (len <= 0)
591         goto exit;
592     if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
593         goto exit;             // there was a problem
594     rc =MQTTSetMessageHandler(c, topicFilter, messageHandler);
595 exit:
596     if (rc == FAILURE)
597         MQTTCloseSession(c);
598 #if defined(MQTT_TASK)
599       MutexUnlock(&c->mutex);
600 #endif
601     return rc;
602 }
603 
604 
MQTTSubscribeWithResults(MQTTClient * c,const char * topicFilter,enum QoS qos,messageHandler messageHandler,MQTTSubackData * data)605 int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qos,
606        messageHandler messageHandler, MQTTSubackData* data)
607 {
608     int rc = FAILURE;
609     Timer timer;
610     int len = 0;
611     MQTTString topic = MQTTString_initializer;
612     topic.cstring = (char *)topicFilter;
613 
614 #if defined(MQTT_TASK)
615       MutexLock(&c->mutex);
616 #endif
617       if (!c->isconnected)
618             goto exit;
619 
620     TimerInit(&timer);
621     TimerCountdownMS(&timer, c->command_timeout_ms);
622 
623     len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&qos);
624     if (len <= 0)
625         goto exit;
626     if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
627         goto exit;             // there was a problem
628 
629     if (waitfor(c, SUBACK, &timer) == SUBACK)      // wait for suback
630     {
631         int count = 0;
632         unsigned short mypacketid;
633         data->grantedQoS = QOS0;
634         if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int*)&data->grantedQoS, c->readbuf, c->readbuf_size) == 1)
635         {
636             if (data->grantedQoS != 0x80)
637                 rc = MQTTSetMessageHandler(c, topicFilter, messageHandler);
638         }
639     }
640     else
641         rc = FAILURE;
642 
643 exit:
644     if (rc == FAILURE)
645         MQTTCloseSession(c);
646 #if defined(MQTT_TASK)
647       MutexUnlock(&c->mutex);
648 #endif
649     return rc;
650 }
651 
652 
MQTTSubscribe(MQTTClient * c,const char * topicFilter,enum QoS qos,messageHandler messageHandler)653 int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos,
654        messageHandler messageHandler)
655 {
656     MQTTSubackData data;
657     return MQTTAsyncSubscribe(c, topicFilter, qos, messageHandler, &data);
658 }
659 
660 
MQTTAsyncUnsubscribe(MQTTClient * c,const char * topicFilter)661 int MQTTAsyncUnsubscribe(MQTTClient* c, const char* topicFilter)
662 {
663     int rc = FAILURE;
664     Timer timer;
665     MQTTString topic = MQTTString_initializer;
666     topic.cstring = (char *)topicFilter;
667     int len = 0;
668 
669 #if defined(MQTT_TASK)
670       MutexLock(&c->mutex);
671 #endif
672       if (!c->isconnected)
673           goto exit;
674 
675     TimerInit(&timer);
676     TimerCountdownMS(&timer, c->command_timeout_ms);
677 
678     if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0)
679         goto exit;
680     if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
681         goto exit; // there was a problem
682     MQTTSetMessageHandler(c, topicFilter, NULL);
683 exit:
684     if (rc == FAILURE)
685         MQTTCloseSession(c);
686 #if defined(MQTT_TASK)
687       MutexUnlock(&c->mutex);
688 #endif
689     return rc;
690 }
691 
692 
MQTTUnsubscribe(MQTTClient * c,const char * topicFilter)693 int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter)
694 {
695     int rc = FAILURE;
696     Timer timer;
697     MQTTString topic = MQTTString_initializer;
698     topic.cstring = (char *)topicFilter;
699     int len = 0;
700 
701 #if defined(MQTT_TASK)
702       MutexLock(&c->mutex);
703 #endif
704       if (!c->isconnected)
705           goto exit;
706 
707     TimerInit(&timer);
708     TimerCountdownMS(&timer, c->command_timeout_ms);
709 
710     if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0)
711         goto exit;
712     if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
713         goto exit; // there was a problem
714 
715     if (waitfor(c, UNSUBACK, &timer) == UNSUBACK)
716     {
717         unsigned short mypacketid;  // should be the same as the packetid above
718         if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1)
719         {
720             /* remove the subscription message handler associated with this topic, if there is one */
721             MQTTSetMessageHandler(c, topicFilter, NULL);
722         }
723     }
724     else
725         rc = FAILURE;
726 
727 exit:
728     if (rc == FAILURE)
729         MQTTCloseSession(c);
730 #if defined(MQTT_TASK)
731       MutexUnlock(&c->mutex);
732 #endif
733     return rc;
734 }
MQTTAsyncPublish(MQTTClient * c,const char * topicName,MQTTMessage * message)735 int MQTTAsyncPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
736 {
737     int rc = FAILURE;
738     Timer timer;
739     MQTTString topic = MQTTString_initializer;
740     topic.cstring = (char *)topicName;
741     int len = 0;
742 
743 #if defined(MQTT_TASK)
744       MutexLock(&c->mutex);
745 #endif
746       if (!c->isconnected)
747             goto exit;
748 
749     TimerInit(&timer);
750     TimerCountdownMS(&timer, c->command_timeout_ms);
751 
752     if (message->qos == QOS1 || message->qos == QOS2)
753         message->id = getNextPacketId(c);
754 
755     len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
756               topic, (unsigned char*)message->payload, message->payloadlen);
757     if (len <= 0) {
758         LogDebug("MQTTSerialize_publish error.");
759         goto exit;
760     }
761     if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
762     {
763         LogDebug("sendPacket error.");
764         goto exit; // there was a problem
765     }
766 exit:
767     if (rc == FAILURE)
768         MQTTCloseSession(c);
769 #if defined(MQTT_TASK)
770       MutexUnlock(&c->mutex);
771 #endif
772     return rc;
773 }
774 
MQTTPublish(MQTTClient * c,const char * topicName,MQTTMessage * message)775 int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
776 {
777     int rc = FAILURE;
778     Timer timer;
779     MQTTString topic = MQTTString_initializer;
780     topic.cstring = (char *)topicName;
781     int len = 0;
782 
783 #if defined(MQTT_TASK)
784       MutexLock(&c->mutex);
785 #endif
786       if (!c->isconnected)
787             goto exit;
788 
789     TimerInit(&timer);
790     TimerCountdownMS(&timer, c->command_timeout_ms);
791 
792     if (message->qos == QOS1 || message->qos == QOS2)
793         message->id = getNextPacketId(c);
794 
795     len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
796               topic, (unsigned char*)message->payload, message->payloadlen);
797     if (len <= 0)
798         goto exit;
799     if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
800         goto exit; // there was a problem
801 
802     if (message->qos == QOS1)
803     {
804         if (waitfor(c, PUBACK, &timer) == PUBACK)
805         {
806             unsigned short mypacketid;
807             unsigned char dup, type;
808             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
809                 rc = FAILURE;
810         }
811         else
812             rc = FAILURE;
813     }
814     else if (message->qos == QOS2)
815     {
816         if (waitfor(c, PUBCOMP, &timer) == PUBCOMP)
817         {
818             unsigned short mypacketid;
819             unsigned char dup, type;
820             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
821                 rc = FAILURE;
822         }
823         else
824             rc = FAILURE;
825     }
826 
827 exit:
828     if (rc == FAILURE)
829         MQTTCloseSession(c);
830 #if defined(MQTT_TASK)
831       MutexUnlock(&c->mutex);
832 #endif
833     return rc;
834 }
835 
836 
MQTTDisconnect(MQTTClient * c)837 int MQTTDisconnect(MQTTClient* c)
838 {
839     int rc = FAILURE;
840     Timer timer;     // we might wait for incomplete incoming publishes to complete
841     int len = 0;
842 
843 #if defined(MQTT_TASK)
844     MutexLock(&c->mutex);
845 #endif
846     TimerInit(&timer);
847     TimerCountdownMS(&timer, c->command_timeout_ms);
848 
849       len = MQTTSerialize_disconnect(c->buf, c->buf_size);
850     if (len > 0)
851         rc = sendPacket(c, len, &timer);            // send the disconnect packet
852     MQTTCloseSession(c);
853 
854 #if defined(MQTT_TASK)
855       MutexUnlock(&c->mutex);
856 #endif
857     return rc;
858 }
859