1 /*******************************************************************************
2 * Copyright (c) 2014, 2017 IBM Corp.
3 *
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * and Eclipse Distribution License v1.0 which accompany this distribution.
7 *
8 * The Eclipse Public License is available at
9 * http://www.eclipse.org/legal/epl-v10.html
10 * and the Eclipse Distribution License is available at
11 * http://www.eclipse.org/org/documents/edl-v10.php.
12 *
13 * Contributors:
14 * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
15 * Ian Craggs - fix for #96 - check rem_len in readPacket
16 * Ian Craggs - add ability to set message handler separately #6
17 *******************************************************************************/
18 #include "MQTTClient.h"
19
20 #include <stdio.h>
21 #include <string.h>
22 #include <signal.h>
23
NewMessageData(MessageData * md,MQTTString * aTopicName,MQTTMessage * aMessage)24 static void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage) {
25 md->topicName = aTopicName;
26 md->message = aMessage;
27 }
28
29
getNextPacketId(MQTTClient * c)30 static int getNextPacketId(MQTTClient *c) {
31 return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1;
32 }
33
34
sendPacket(MQTTClient * c,int length,Timer * timer)35 static int sendPacket(MQTTClient* c, int length, Timer* timer)
36 {
37 int rc = FAILURE,
38 sent = 0;
39 bool isexpired = TimerIsExpired(timer);
40 while (sent < length && !isexpired) {
41 rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer));
42 if (rc < 0) // there was an error writing the data
43 break;
44 sent += rc;
45 isexpired = TimerIsExpired(timer);
46 }
47 if (sent == length) {
48 // LogDebug("before sendPacket TimerCountdown...lastsent.tv_sec=%{public}lld, lastsent.tv_usec=%{public}lld, last_received.tv_sec=%{public}lld, last_received.tv_usec=%{public}lld",
49 // c->last_sent.end_time.tv_sec,
50 // c->last_sent.end_time.tv_usec,
51 // c->last_received.end_time.tv_sec,
52 // c->last_received.end_time.tv_usec);
53 TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have successfully sent the packet
54 // LogDebug("after sendPacket TimerCountdown...lastsent.tv_sec=%{public}lld, lastsent.tv_usec=%{public}lld, last_received.tv_sec=%{public}lld, last_received.tv_usec=%{public}lld",
55 // c->last_sent.end_time.tv_sec,
56 // c->last_sent.end_time.tv_usec,
57 // c->last_received.end_time.tv_sec,
58 // c->last_received.end_time.tv_usec);
59 rc = SUCCESS;
60 } else {
61 // LogDebug("sent=%{public}d,length=%{public}d,isexpired=%{public}d, rc = %{public}d, errno = %{public}d", sent, length, isexpired, rc, errno);
62 rc = FAILURE;
63 }
64 return rc;
65 }
66
handle_pipe(int sig)67 void handle_pipe(int sig) {}
MQTTClientInit(MQTTClient * c,Network * network,unsigned int command_timeout_ms,unsigned char * sendbuf,size_t sendbuf_size,unsigned char * readbuf,size_t readbuf_size)68 void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms,
69 unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size)
70 {
71 int i;
72 c->ipstack = network;
73 struct sigaction action;
74 action.sa_handler = handle_pipe;
75 sigemptyset(&action.sa_mask);
76 action.sa_flags = 0;
77 sigaction(SIGPIPE, &action, NULL);
78 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
79 c->messageHandlers[i].topicFilter = 0;
80 c->command_timeout_ms = command_timeout_ms;
81 c->buf = sendbuf;
82 c->buf_size = sendbuf_size;
83 c->readbuf = readbuf;
84 c->readbuf_size = readbuf_size;
85 c->isconnected = 0;
86 c->cleansession = 0;
87 c->ping_outstanding = 0;
88 c->defaultMessageHandler = NULL;
89 c->next_packetid = 1;
90 TimerInit(&c->last_sent);
91 TimerInit(&c->last_received);
92 #if defined(MQTT_TASK)
93 MutexInit(&c->mutex);
94 #endif
95 }
96
97
decodePacket(MQTTClient * c,int * value,int timeout)98 static int decodePacket(MQTTClient* c, int* value, int timeout)
99 {
100 unsigned char i;
101 int multiplier = 1;
102 int len = 0;
103 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
104
105 *value = 0;
106 do
107 {
108 int rc = MQTTPACKET_READ_ERROR;
109
110 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
111 {
112 rc = MQTTPACKET_READ_ERROR; /* bad data */
113 goto exit;
114 }
115 rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
116 if (rc != 1) {
117 LogError("%{public}s: decodePacket error.ErrorCode=[%{public}d]", __PRETTY_FUNCTION__, rc);
118 goto exit;
119 }
120 *value += (i & 127) * multiplier;
121 multiplier *= 128;
122 } while ((i & 128) != 0);
123 exit:
124 return len;
125 }
126
127
readPacket(MQTTClient * c,Timer * timer)128 static int readPacket(MQTTClient* c, Timer* timer)
129 {
130 MQTTHeader header = {0};
131 int len = 0;
132 int rem_len = 0;
133 /* 1. read the header byte. This has the packet type in it */
134 int rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer));
135 if (rc != 1) {
136 if (errno != EAGAIN) {
137 // LogError("%{public}s:rc=[%{public}d],:errno=[%{public}d]", __PRETTY_FUNCTION__, rc, errno);
138 }
139 goto exit;
140 }
141 len = 1;
142 /* 2. read the remaining length. This is variable in itself */
143 decodePacket(c, &rem_len, TimerLeftMS(timer));
144 len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
145
146 if (rem_len > (c->readbuf_size - len))
147 {
148 rc = BUFFER_OVERFLOW;
149 LogError("%{public}s: BUFFER_OVERFLOW - len.ErrorCode=[%{public}d],", __PRETTY_FUNCTION__, rc);
150 goto exit;
151 }
152
153 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
154 if (rem_len > 0 && (rc = c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len)) {
155 LogError("rem_len = %{public}d, rc = %{public}d", rem_len, rc);
156 rc = 0;
157 goto exit;
158 }
159
160 header.byte = c->readbuf[0];
161 rc = header.bits.type;
162
163 if (c->keepAliveInterval > 0) {
164 bool lastsent = TimerIsExpired(&c->last_sent);
165 bool lastreceived = TimerIsExpired(&c->last_received);
166 // LogDebug("before readPacket TimerCountdown...lastsent.tv_sec=%{public}lld, lastsent.tv_usec=%{public}lld, last_received.tv_sec=%{public}lld, last_received.tv_usec=%{public}lld",
167 // c->last_sent.end_time.tv_sec,
168 // c->last_sent.end_time.tv_usec,
169 // c->last_received.end_time.tv_sec,
170 // c->last_received.end_time.tv_usec);
171 TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet
172 // LogDebug("after readPacket TimerCountdown...lastsent.tv_sec=%{public}lld, lastsent.tv_usec=%{public}lld, last_received.tv_sec=%{public}lld, last_received.tv_usec=%{public}lld",
173 // c->last_sent.end_time.tv_sec,
174 // c->last_sent.end_time.tv_usec,
175 // c->last_received.end_time.tv_sec,
176 // c->last_received.end_time.tv_usec);
177 }
178 exit:
179 return rc;
180 }
181
182
183 // assume topic filter and name is in correct format
184 // # can only be at end
185 // + and # can only be next to separator
isTopicMatched(char * topicFilter,MQTTString * topicName)186 static char isTopicMatched(char* topicFilter, MQTTString* topicName)
187 {
188 char* curf = topicFilter;
189 char* curn = topicName->lenstring.data;
190 char* curn_end = curn + topicName->lenstring.len;
191
192 while (*curf && curn < curn_end)
193 {
194 if (*curn == '/' && *curf != '/')
195 break;
196 if (*curf != '+' && *curf != '#' && *curf != *curn)
197 break;
198 if (*curf == '+')
199 { // skip until we meet the next separator, or end of string
200 char* nextpos = curn + 1;
201 while (nextpos < curn_end && *nextpos != '/')
202 nextpos = ++curn + 1;
203 }
204 else if (*curf == '#')
205 curn = curn_end - 1; // skip until end of string
206 curf++;
207 curn++;
208 };
209
210 return (curn == curn_end) && (*curf == '\0');
211 }
212
213
deliverMessage(MQTTClient * c,MQTTString * topicName,MQTTMessage * message)214 int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
215 {
216 int i;
217 int rc = FAILURE;
218 // we have to find the right message handler - indexed by topic
219 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
220 {
221 if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||
222 isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName)))
223 {
224 if (c->messageHandlers[i].fp != NULL)
225 {
226 MessageData md;
227 NewMessageData(&md, topicName, message);
228 c->messageHandlers[i].fp(&md);
229 rc = SUCCESS;
230 }
231 }
232 }
233
234 if (rc == FAILURE && c->defaultMessageHandler != NULL)
235 {
236 MessageData md;
237 NewMessageData(&md, topicName, message);
238 c->defaultMessageHandler(&md);
239 rc = SUCCESS;
240 }
241
242 return rc;
243 }
244
245
keepalive(MQTTClient * c)246 int keepalive(MQTTClient* c)
247 {
248 int rc = SUCCESS;
249
250 if (c->keepAliveInterval == 0)
251 goto exit;
252 struct Timer res = c->last_received;
253 TimerAddSecond(&res, 2);
254 // struct timeval now;
255 // gettimeofday(&now, NULL);
256 // LogDebug("[keepalive] lastsent.tv_sec=%{public}lld, lastsent.tv_usec=%{public}lld, last_received.tv_sec=%{public}lld, last_received.tv_usec=%{public}lld,now.tv_sec=%{public}lld, now.tv_usec=%{public}lld",
257 // c->last_sent.end_time.tv_sec,
258 // c->last_sent.end_time.tv_usec,
259 // c->last_received.end_time.tv_sec,
260 // c->last_received.end_time.tv_usec,
261 // now.tv_sec,
262 // now.tv_usec
263 // );
264 if ( TimerIsExpired(&c->last_sent) || TimerIsExpired(&res))
265 {
266 if (c->ping_outstanding) {
267 rc = FAILURE; /* PINGRESP not received in keepalive interval */
268 // struct timeval now, res;
269 // gettimeofday(&now, NULL);
270 // timersub(&c->last_sent.end_time, &now, &res);
271 }
272 else
273 {
274 Timer timer;
275 TimerInit(&timer);
276 TimerCountdownMS(&timer, 1000);
277 int len = MQTTSerialize_pingreq(c->buf, c->buf_size);
278 if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) // send the ping packet
279 c->ping_outstanding = 1;
280 LogDebug("ping request...");
281 }
282 }
283
284 exit:
285 return rc;
286 }
287
288
MQTTCleanSession(MQTTClient * c)289 void MQTTCleanSession(MQTTClient* c)
290 {
291 int i = 0;
292
293 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
294 c->messageHandlers[i].topicFilter = NULL;
295 }
296
297
MQTTCloseSession(MQTTClient * c)298 void MQTTCloseSession(MQTTClient* c)
299 {
300 LogDebug("close session");
301 c->ping_outstanding = 0;
302 c->isconnected = 0;
303 if (c->cleansession)
304 MQTTCleanSession(c);
305 }
306
307
cycle(MQTTClient * c,Timer * timer)308 int cycle(MQTTClient* c, Timer* timer)
309 {
310 int len = 0,
311 rc = SUCCESS;
312
313 int packet_type = readPacket(c, timer); /* read the socket, see what work is due */
314 if (packet_type != 0)
315 LogDebug("byte0=0x%{public}x,byte1=0x%{public}x, rc=0x%{public}x, packet_type=0x%{public}x,errno = [%{public}d]", c->readbuf[0], c->readbuf[1], rc, packet_type, errno);
316
317 switch (packet_type)
318 {
319 default:
320 /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */
321 rc = packet_type;
322 goto exit;
323 case 0: /* timed out reading packet */
324 break;
325 case CONNACK:
326 case PUBACK:
327 case SUBACK:
328 case UNSUBACK:
329 break;
330 case PUBLISH:
331 {
332 MQTTString topicName;
333 MQTTMessage msg;
334 int intQoS;
335 msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
336 if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,
337 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
338 goto exit;
339 msg.qos = (enum QoS)intQoS;
340 LogDebug("9527>> msg.qos = %{public}d, intQoS = %{public}d, topicName = %{public}s, msg.payloadlen = %{public}d" ,msg.qos, intQoS, topicName.cstring, msg.payloadlen);
341 deliverMessage(c, &topicName, &msg);
342 if (msg.qos != QOS0)
343 {
344 if (msg.qos == QOS1)
345 len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);
346 else if (msg.qos == QOS2)
347 len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
348 if (len <= 0)
349 rc = FAILURE;
350 else
351 rc = sendPacket(c, len, timer);
352 if (rc == FAILURE)
353 goto exit; // there was a problem
354 }
355 break;
356 }
357 case PUBREC:
358 case PUBREL:
359 {
360 unsigned short mypacketid;
361 unsigned char dup, type;
362 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
363 rc = FAILURE;
364 else if ((len = MQTTSerialize_ack(c->buf, c->buf_size,
365 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
366 rc = FAILURE;
367 else if ((rc = sendPacket(c, len, timer)) != SUCCESS) // send the PUBREL packet
368 rc = FAILURE; // there was a problem
369 if (rc == FAILURE)
370 goto exit; // there was a problem
371 break;
372 }
373
374 case PUBCOMP:
375 break;
376 case PINGRESP:
377 c->ping_outstanding = 0;
378 LogDebug("PINGRESP.................");
379 break;
380 }
381
382 if (keepalive(c) != SUCCESS) {
383 //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
384 rc = FAILURE;
385 }
386
387 exit:
388 if (rc == SUCCESS)
389 rc = packet_type;
390 else if (c->isconnected)
391 MQTTCloseSession(c);
392 return rc;
393 }
394
395
MQTTYield(MQTTClient * c,int timeout_ms)396 int MQTTYield(MQTTClient* c, int timeout_ms)
397 {
398 int rc = SUCCESS;
399 Timer timer;
400
401 TimerInit(&timer);
402 TimerCountdownMS(&timer, timeout_ms);
403
404 do
405 {
406 if (cycle(c, &timer) < 0)
407 {
408 rc = FAILURE;
409 break;
410 }
411 } while (!TimerIsExpired(&timer));
412
413 return rc;
414 }
415
MQTTIsConnected(MQTTClient * client)416 int MQTTIsConnected(MQTTClient* client)
417 {
418 return client->isconnected;
419 }
420
MQTTRun(void * parm)421 void MQTTRun(void* parm)
422 {
423 Timer timer;
424 MQTTClient* c = (MQTTClient*)parm;
425
426 TimerInit(&timer);
427
428 while (1)
429 {
430 #if defined(MQTT_TASK)
431 MutexLock(&c->mutex);
432 #endif
433 TimerCountdownMS(&timer, 500); /* Don't wait too long if no traffic is incoming */
434 cycle(c, &timer);
435 #if defined(MQTT_TASK)
436 MutexUnlock(&c->mutex);
437 #endif
438 }
439 }
440
441
442 #if defined(MQTT_TASK)
MQTTStartTask(MQTTClient * client)443 int MQTTStartTask(MQTTClient* client)
444 {
445 return ThreadStart(&client->thread, &MQTTRun, client);
446 }
447 #endif
448
449
waitfor(MQTTClient * c,int packet_type,Timer * timer)450 int waitfor(MQTTClient* c, int packet_type, Timer* timer)
451 {
452 int rc = FAILURE;
453
454 do
455 {
456 if (TimerIsExpired(timer))
457 break; // we timed out
458 rc = cycle(c, timer);
459 }
460 while (rc != packet_type && rc >= 0);
461
462 return rc;
463 }
464
465
466
467
MQTTConnectWithResults(MQTTClient * c,MQTTPacket_connectData * options,MQTTConnackData * data)468 int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTConnackData* data)
469 {
470 Timer connect_timer;
471 int rc = FAILURE;
472 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
473 int len = 0;
474
475 #if defined(MQTT_TASK)
476 MutexLock(&c->mutex);
477 #endif
478 if (c->isconnected) /* don't send connect packet again if we are already connected */
479 goto exit;
480
481 TimerInit(&connect_timer);
482 TimerCountdownMS(&connect_timer, c->command_timeout_ms);
483
484 if (options == 0)
485 options = &default_options; /* set default options if none were supplied */
486
487 c->keepAliveInterval = options->keepAliveInterval;
488 c->cleansession = options->cleansession;
489 TimerCountdown(&c->last_received, c->keepAliveInterval);
490 if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0)
491 goto exit;
492 if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESS) // send the connect packet
493 goto exit; // there was a problem
494
495 // this will be a blocking call, wait for the connack
496 if (waitfor(c, CONNACK, &connect_timer) == CONNACK)
497 {
498 data->rc = 0;
499 data->sessionPresent = 0;
500 if (MQTTDeserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1)
501 rc = data->rc;
502 else
503 rc = FAILURE;
504 }
505 else
506 rc = FAILURE;
507
508 exit:
509 if (rc == SUCCESS)
510 {
511 c->isconnected = 1;
512 c->ping_outstanding = 0;
513 }
514
515 #if defined(MQTT_TASK)
516 MutexUnlock(&c->mutex);
517 #endif
518
519 return rc;
520 }
521
522
MQTTConnect(MQTTClient * c,MQTTPacket_connectData * options)523 int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)
524 {
525 MQTTConnackData data;
526 return MQTTConnectWithResults(c, options, &data);
527 }
528
529
MQTTSetMessageHandler(MQTTClient * c,const char * topicFilter,messageHandler messageHandler)530 int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler)
531 {
532 int rc = FAILURE;
533 int i = -1;
534
535 /* first check for an existing matching slot */
536 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
537 {
538 if (c->messageHandlers[i].topicFilter != NULL && strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0)
539 {
540 if (messageHandler == NULL) /* remove existing */
541 {
542 c->messageHandlers[i].topicFilter = NULL;
543 c->messageHandlers[i].fp = NULL;
544 }
545 rc = SUCCESS; /* return i when adding new subscription */
546 break;
547 }
548 }
549 /* if no existing, look for empty slot (unless we are removing) */
550 if (messageHandler != NULL) {
551 if (rc == FAILURE)
552 {
553 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
554 {
555 if (c->messageHandlers[i].topicFilter == NULL)
556 {
557 rc = SUCCESS;
558 break;
559 }
560 }
561 }
562 if (i < MAX_MESSAGE_HANDLERS)
563 {
564 c->messageHandlers[i].topicFilter = topicFilter;
565 c->messageHandlers[i].fp = messageHandler;
566 }
567 }
568 return rc;
569 }
570
MQTTAsyncSubscribe(MQTTClient * c,const char * topicFilter,enum QoS qos,messageHandler messageHandler,MQTTSubackData * data)571 int MQTTAsyncSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos,
572 messageHandler messageHandler, MQTTSubackData* data)
573 {
574 int rc = FAILURE;
575 Timer timer;
576 int len = 0;
577 MQTTString topic = MQTTString_initializer;
578 topic.cstring = (char *)topicFilter;
579
580 #if defined(MQTT_TASK)
581 MutexLock(&c->mutex);
582 #endif
583 if (!c->isconnected)
584 goto exit;
585
586 TimerInit(&timer);
587 TimerCountdownMS(&timer, c->command_timeout_ms);
588
589 len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&qos);
590 if (len <= 0)
591 goto exit;
592 if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
593 goto exit; // there was a problem
594 rc =MQTTSetMessageHandler(c, topicFilter, messageHandler);
595 exit:
596 if (rc == FAILURE)
597 MQTTCloseSession(c);
598 #if defined(MQTT_TASK)
599 MutexUnlock(&c->mutex);
600 #endif
601 return rc;
602 }
603
604
MQTTSubscribeWithResults(MQTTClient * c,const char * topicFilter,enum QoS qos,messageHandler messageHandler,MQTTSubackData * data)605 int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qos,
606 messageHandler messageHandler, MQTTSubackData* data)
607 {
608 int rc = FAILURE;
609 Timer timer;
610 int len = 0;
611 MQTTString topic = MQTTString_initializer;
612 topic.cstring = (char *)topicFilter;
613
614 #if defined(MQTT_TASK)
615 MutexLock(&c->mutex);
616 #endif
617 if (!c->isconnected)
618 goto exit;
619
620 TimerInit(&timer);
621 TimerCountdownMS(&timer, c->command_timeout_ms);
622
623 len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&qos);
624 if (len <= 0)
625 goto exit;
626 if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
627 goto exit; // there was a problem
628
629 if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback
630 {
631 int count = 0;
632 unsigned short mypacketid;
633 data->grantedQoS = QOS0;
634 if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int*)&data->grantedQoS, c->readbuf, c->readbuf_size) == 1)
635 {
636 if (data->grantedQoS != 0x80)
637 rc = MQTTSetMessageHandler(c, topicFilter, messageHandler);
638 }
639 }
640 else
641 rc = FAILURE;
642
643 exit:
644 if (rc == FAILURE)
645 MQTTCloseSession(c);
646 #if defined(MQTT_TASK)
647 MutexUnlock(&c->mutex);
648 #endif
649 return rc;
650 }
651
652
MQTTSubscribe(MQTTClient * c,const char * topicFilter,enum QoS qos,messageHandler messageHandler)653 int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos,
654 messageHandler messageHandler)
655 {
656 MQTTSubackData data;
657 return MQTTAsyncSubscribe(c, topicFilter, qos, messageHandler, &data);
658 }
659
660
MQTTAsyncUnsubscribe(MQTTClient * c,const char * topicFilter)661 int MQTTAsyncUnsubscribe(MQTTClient* c, const char* topicFilter)
662 {
663 int rc = FAILURE;
664 Timer timer;
665 MQTTString topic = MQTTString_initializer;
666 topic.cstring = (char *)topicFilter;
667 int len = 0;
668
669 #if defined(MQTT_TASK)
670 MutexLock(&c->mutex);
671 #endif
672 if (!c->isconnected)
673 goto exit;
674
675 TimerInit(&timer);
676 TimerCountdownMS(&timer, c->command_timeout_ms);
677
678 if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0)
679 goto exit;
680 if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
681 goto exit; // there was a problem
682 MQTTSetMessageHandler(c, topicFilter, NULL);
683 exit:
684 if (rc == FAILURE)
685 MQTTCloseSession(c);
686 #if defined(MQTT_TASK)
687 MutexUnlock(&c->mutex);
688 #endif
689 return rc;
690 }
691
692
MQTTUnsubscribe(MQTTClient * c,const char * topicFilter)693 int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter)
694 {
695 int rc = FAILURE;
696 Timer timer;
697 MQTTString topic = MQTTString_initializer;
698 topic.cstring = (char *)topicFilter;
699 int len = 0;
700
701 #if defined(MQTT_TASK)
702 MutexLock(&c->mutex);
703 #endif
704 if (!c->isconnected)
705 goto exit;
706
707 TimerInit(&timer);
708 TimerCountdownMS(&timer, c->command_timeout_ms);
709
710 if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0)
711 goto exit;
712 if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
713 goto exit; // there was a problem
714
715 if (waitfor(c, UNSUBACK, &timer) == UNSUBACK)
716 {
717 unsigned short mypacketid; // should be the same as the packetid above
718 if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1)
719 {
720 /* remove the subscription message handler associated with this topic, if there is one */
721 MQTTSetMessageHandler(c, topicFilter, NULL);
722 }
723 }
724 else
725 rc = FAILURE;
726
727 exit:
728 if (rc == FAILURE)
729 MQTTCloseSession(c);
730 #if defined(MQTT_TASK)
731 MutexUnlock(&c->mutex);
732 #endif
733 return rc;
734 }
MQTTAsyncPublish(MQTTClient * c,const char * topicName,MQTTMessage * message)735 int MQTTAsyncPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
736 {
737 int rc = FAILURE;
738 Timer timer;
739 MQTTString topic = MQTTString_initializer;
740 topic.cstring = (char *)topicName;
741 int len = 0;
742
743 #if defined(MQTT_TASK)
744 MutexLock(&c->mutex);
745 #endif
746 if (!c->isconnected)
747 goto exit;
748
749 TimerInit(&timer);
750 TimerCountdownMS(&timer, c->command_timeout_ms);
751
752 if (message->qos == QOS1 || message->qos == QOS2)
753 message->id = getNextPacketId(c);
754
755 len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
756 topic, (unsigned char*)message->payload, message->payloadlen);
757 if (len <= 0) {
758 LogDebug("MQTTSerialize_publish error.");
759 goto exit;
760 }
761 if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
762 {
763 LogDebug("sendPacket error.");
764 goto exit; // there was a problem
765 }
766 exit:
767 if (rc == FAILURE)
768 MQTTCloseSession(c);
769 #if defined(MQTT_TASK)
770 MutexUnlock(&c->mutex);
771 #endif
772 return rc;
773 }
774
MQTTPublish(MQTTClient * c,const char * topicName,MQTTMessage * message)775 int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
776 {
777 int rc = FAILURE;
778 Timer timer;
779 MQTTString topic = MQTTString_initializer;
780 topic.cstring = (char *)topicName;
781 int len = 0;
782
783 #if defined(MQTT_TASK)
784 MutexLock(&c->mutex);
785 #endif
786 if (!c->isconnected)
787 goto exit;
788
789 TimerInit(&timer);
790 TimerCountdownMS(&timer, c->command_timeout_ms);
791
792 if (message->qos == QOS1 || message->qos == QOS2)
793 message->id = getNextPacketId(c);
794
795 len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
796 topic, (unsigned char*)message->payload, message->payloadlen);
797 if (len <= 0)
798 goto exit;
799 if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
800 goto exit; // there was a problem
801
802 if (message->qos == QOS1)
803 {
804 if (waitfor(c, PUBACK, &timer) == PUBACK)
805 {
806 unsigned short mypacketid;
807 unsigned char dup, type;
808 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
809 rc = FAILURE;
810 }
811 else
812 rc = FAILURE;
813 }
814 else if (message->qos == QOS2)
815 {
816 if (waitfor(c, PUBCOMP, &timer) == PUBCOMP)
817 {
818 unsigned short mypacketid;
819 unsigned char dup, type;
820 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
821 rc = FAILURE;
822 }
823 else
824 rc = FAILURE;
825 }
826
827 exit:
828 if (rc == FAILURE)
829 MQTTCloseSession(c);
830 #if defined(MQTT_TASK)
831 MutexUnlock(&c->mutex);
832 #endif
833 return rc;
834 }
835
836
MQTTDisconnect(MQTTClient * c)837 int MQTTDisconnect(MQTTClient* c)
838 {
839 int rc = FAILURE;
840 Timer timer; // we might wait for incomplete incoming publishes to complete
841 int len = 0;
842
843 #if defined(MQTT_TASK)
844 MutexLock(&c->mutex);
845 #endif
846 TimerInit(&timer);
847 TimerCountdownMS(&timer, c->command_timeout_ms);
848
849 len = MQTTSerialize_disconnect(c->buf, c->buf_size);
850 if (len > 0)
851 rc = sendPacket(c, len, &timer); // send the disconnect packet
852 MQTTCloseSession(c);
853
854 #if defined(MQTT_TASK)
855 MutexUnlock(&c->mutex);
856 #endif
857 return rc;
858 }
859