1 /*******************************************************************************
2 * Copyright (c) 2009, 2017 IBM Corp.
3 *
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * and Eclipse Distribution License v1.0 which accompany this distribution.
7 *
8 * The Eclipse Public License is available at
9 * http://www.eclipse.org/legal/epl-v10.html
10 * and the Eclipse Distribution License is available at
11 * http://www.eclipse.org/org/documents/edl-v10.php.
12 *
13 * Contributors:
14 * Ian Craggs - initial implementation for embedded C client
15 *******************************************************************************/
16
17
18 /**
19 * @file
20 * Tests for the Paho embedded C "high" level client
21 */
22
23
24 #include "MQTTClient.h"
25 #include <string.h>
26 #include <stdlib.h>
27
28 #if !defined(_WINDOWS)
29 #include <sys/time.h>
30 #include <sys/socket.h>
31 #include <unistd.h>
32 #include <errno.h>
33 #else
34 #include <windows.h>
35 #define setenv(a, b, c) _putenv_s(a, b)
36 #endif
37
38 #define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
39
usage(void)40 void usage(void)
41 {
42 printf("help!!\n");
43 exit(EXIT_FAILURE);
44 }
45
46 struct Options
47 {
48 char* host; /**< connection to system under test. */
49 int port;
50 char* proxy_host;
51 int proxy_port;
52 int verbose;
53 int test_no;
54 int MQTTVersion;
55 int iterations;
56 } options =
57 {
58 "localhost",
59 1883,
60 "localhost",
61 1885,
62 0, //verbose
63 0, //test_no
64 4,
65 1,
66 };
67
getopts(int argc,char ** argv)68 void getopts(int argc, char** argv)
69 {
70 int count = 1;
71
72 while (count < argc)
73 {
74 if (strcmp(argv[count], "--test_no") == 0)
75 {
76 if (++count < argc)
77 options.test_no = atoi(argv[count]);
78 else
79 usage();
80 }
81 else if (strcmp(argv[count], "--host") == 0)
82 {
83 if (++count < argc)
84 {
85 options.host = argv[count];
86 printf("\nSetting host to %s\n", options.host);
87 }
88 else
89 usage();
90 }
91 else if (strcmp(argv[count], "--port") == 0)
92 {
93 if (++count < argc)
94 {
95 options.port = atoi(argv[count]);
96 printf("\nSetting port to %d\n", options.port);
97 }
98 else
99 usage();
100 }
101 else if (strcmp(argv[count], "--proxy_host") == 0)
102 {
103 if (++count < argc)
104 {
105 options.proxy_host = argv[count];
106 printf("\nSetting proxy_host to %s\n", options.proxy_host);
107 }
108 else
109 usage();
110 }
111 else if (strcmp(argv[count], "--proxy_port") == 0)
112 {
113 if (++count < argc)
114 {
115 options.proxy_port = atoi(argv[count]);
116 printf("\nSetting proxy_port to %d\n", options.proxy_port);
117 }
118 else
119 usage();
120 }
121 else if (strcmp(argv[count], "--MQTTversion") == 0)
122 {
123 if (++count < argc)
124 {
125 options.MQTTVersion = atoi(argv[count]);
126 printf("setting MQTT version to %d\n", options.MQTTVersion);
127 }
128 else
129 usage();
130 }
131 else if (strcmp(argv[count], "--iterations") == 0)
132 {
133 if (++count < argc)
134 options.iterations = atoi(argv[count]);
135 else
136 usage();
137 }
138 else if (strcmp(argv[count], "--verbose") == 0)
139 {
140 options.verbose = 1;
141 printf("\nSetting verbose on\n");
142 }
143 count++;
144 }
145 }
146
147
148 #define LOGA_DEBUG 0
149 #define LOGA_INFO 1
150 #include <stdarg.h>
151 #include <time.h>
152 #include <sys/timeb.h>
MyLog(int LOGA_level,char * format,...)153 void MyLog(int LOGA_level, char* format, ...)
154 {
155 static char msg_buf[256];
156 va_list args;
157 struct timeb ts;
158
159 struct tm *timeinfo;
160
161 if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
162 return;
163
164 ftime(&ts);
165 timeinfo = localtime(&ts.time);
166 strftime(msg_buf, 80, "%Y%m%d %H%M%S", timeinfo);
167
168 sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
169
170 va_start(args, format);
171 vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf), format, args);
172 va_end(args);
173
174 printf("%s\n", msg_buf);
175 fflush(stdout);
176 }
177
178
179 #if defined(WIN32) || defined(_WINDOWS)
180 #define mqsleep(A) Sleep(1000*A)
181 #define START_TIME_TYPE DWORD
182 static DWORD start_time = 0;
start_clock(void)183 START_TIME_TYPE start_clock(void)
184 {
185 return GetTickCount();
186 }
187 #elif defined(AIX)
188 #define mqsleep sleep
189 #define START_TIME_TYPE struct timespec
start_clock(void)190 START_TIME_TYPE start_clock(void)
191 {
192 static struct timespec start;
193 clock_gettime(CLOCK_REALTIME, &start);
194 return start;
195 }
196 #else
197 #define mqsleep sleep
198 #define START_TIME_TYPE struct timeval
199 /* TODO - unused - remove? static struct timeval start_time; */
start_clock(void)200 START_TIME_TYPE start_clock(void)
201 {
202 struct timeval start_time;
203 gettimeofday(&start_time, NULL);
204 return start_time;
205 }
206 #endif
207
208
209 #if defined(WIN32)
elapsed(START_TIME_TYPE start_time)210 long elapsed(START_TIME_TYPE start_time)
211 {
212 return GetTickCount() - start_time;
213 }
214 #elif defined(AIX)
215 #define assert(a)
elapsed(struct timespec start)216 long elapsed(struct timespec start)
217 {
218 struct timespec now, res;
219
220 clock_gettime(CLOCK_REALTIME, &now);
221 ntimersub(now, start, res);
222 return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
223 }
224 #else
elapsed(START_TIME_TYPE start_time)225 long elapsed(START_TIME_TYPE start_time)
226 {
227 struct timeval now, res;
228
229 gettimeofday(&now, NULL);
230 timersub(&now, &start_time, &res);
231 return (res.tv_sec)*1000 + (res.tv_usec)/1000;
232 }
233 #endif
234
235
236 #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
237 #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
238
239 int tests = 0;
240 int failures = 0;
241 FILE* xml;
242 START_TIME_TYPE global_start_time;
243 char output[3000];
244 char* cur_output = output;
245
246
write_test_result(void)247 void write_test_result(void)
248 {
249 long duration = elapsed(global_start_time);
250
251 fprintf(xml, " time=\"%ld.%.3ld\" >\n", duration / 1000, duration % 1000);
252 if (cur_output != output)
253 {
254 fprintf(xml, "%s", output);
255 cur_output = output;
256 }
257 fprintf(xml, "</testcase>\n");
258 }
259
260
myassert(char * filename,int lineno,char * description,int value,char * format,...)261 void myassert(char* filename, int lineno, char* description, int value, char* format, ...)
262 {
263 ++tests;
264 if (!value)
265 {
266 va_list args;
267
268 ++failures;
269 MyLog(LOGA_INFO, "Assertion failed, file %s, line %d, description: %s\n", filename, lineno, description);
270
271 va_start(args, format);
272 vprintf(format, args);
273 va_end(args);
274
275 cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
276 description, filename, lineno);
277 }
278 else
279 MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s", filename, lineno, description);
280 }
281
282
283 static volatile MessageData* test1_message_data = NULL;
284 static MQTTMessage pubmsg;
285
messageArrived(MessageData * md)286 void messageArrived(MessageData* md)
287 {
288 test1_message_data = md;
289 MQTTMessage* m = md->message;
290
291 assert("Good message lengths", pubmsg.payloadlen == m->payloadlen,
292 "payloadlen was %d", m->payloadlen);
293
294 if (pubmsg.payloadlen == m->payloadlen)
295 assert("Good message contents", memcmp(m->payload, pubmsg.payload, m->payloadlen) == 0,
296 "payload was %s", m->payload);
297 }
298
299
300 /*********************************************************************
301
302 Test1: single-threaded client
303
304 *********************************************************************/
test1_sendAndReceive(MQTTClient * c,int qos,char * test_topic)305 void test1_sendAndReceive(MQTTClient* c, int qos, char* test_topic)
306 {
307 char* topicName = NULL;
308 int topicLen;
309 int i = 0;
310 int iterations = 50;
311 int rc;
312 int wait_seconds;
313
314 MyLog(LOGA_DEBUG, "%d messages at QoS %d", iterations, qos);
315 memset(&pubmsg, '\0', sizeof(pubmsg));
316 pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
317 pubmsg.payloadlen = 11;
318 pubmsg.qos = qos;
319 pubmsg.retained = 0;
320 pubmsg.dup = 0;
321
322 for (i = 0; i < iterations; ++i)
323 {
324 test1_message_data = NULL;
325 rc = MQTTPublish(c, test_topic, &pubmsg);
326 assert("Good rc from publish", rc == SUCCESS, "rc was %d", rc);
327
328 /* wait for the message to be received */
329 wait_seconds = 10;
330 while ((test1_message_data == NULL) && (wait_seconds-- > 0))
331 {
332 MQTTYield(c, 100);
333 }
334 assert("Message Arrived", wait_seconds > 0, "Time out waiting for message %d\n", i);
335
336 if (!test1_message_data)
337 printf("No message received within timeout period\n");
338 }
339
340 /* wait to receive any outstanding messages */
341 wait_seconds = 2;
342 while (wait_seconds-- > 0)
343 {
344 MQTTYield(c, 1000);
345 }
346 }
347
348
test1(struct Options options)349 int test1(struct Options options)
350 {
351 int subsqos = 2;
352 Network n;
353 MQTTClient c;
354 int rc = 0;
355 char* test_topic = "C client test1";
356 MQTTPacket_willOptions wopts;
357 unsigned char buf[100];
358 unsigned char readbuf[100];
359
360 printf("test1\n");
361 fprintf(xml, "<testcase classname=\"test1\" name=\"single threaded client using receive\"");
362 global_start_time = start_clock();
363 failures = 0;
364 MyLog(LOGA_INFO, "Starting test 1 - single threaded client using receive");
365
366 NetworkInit(&n);
367 NetworkConnect(&n, options.host, options.port);
368 MQTTClientInit(&c, &n, 1000, buf, 100, readbuf, 100);
369
370 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
371 data.willFlag = 1;
372 data.MQTTVersion = options.MQTTVersion;
373 data.clientID.cstring = "single-threaded-test";
374 data.username.cstring = "testuser";
375 data.password.cstring = "testpassword";
376
377 data.keepAliveInterval = 20;
378 data.cleansession = 1;
379
380 data.will.message.cstring = "will message";
381 data.will.qos = 1;
382 data.will.retained = 0;
383 data.will.topicName.cstring = "will topic";
384
385 MyLog(LOGA_DEBUG, "Connecting");
386 rc = MQTTConnect(&c, &data);
387 assert("Good rc from connect", rc == SUCCESS, "rc was %d", rc);
388 if (rc != SUCCESS)
389 goto exit;
390
391 rc = MQTTSubscribe(&c, test_topic, subsqos, messageArrived);
392 assert("Good rc from subscribe", rc == SUCCESS, "rc was %d", rc);
393
394 test1_sendAndReceive(&c, 0, test_topic);
395 test1_sendAndReceive(&c, 1, test_topic);
396 test1_sendAndReceive(&c, 2, test_topic);
397
398 MyLog(LOGA_DEBUG, "Stopping\n");
399
400 rc = MQTTUnsubscribe(&c, test_topic);
401 assert("Unsubscribe successful", rc == SUCCESS, "rc was %d", rc);
402 rc = MQTTDisconnect(&c);
403 assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
404
405 /* Just to make sure we can connect again */
406 NetworkConnect(&n, options.host, options.port);
407 rc = MQTTConnect(&c, &data);
408 assert("Connect successful", rc == SUCCESS, "rc was %d", rc);
409 rc = MQTTDisconnect(&c);
410 assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
411
412 exit:
413 MyLog(LOGA_INFO, "TEST1: test %s. %d tests run, %d failures.",
414 (failures == 0) ? "passed" : "failed", tests, failures);
415 write_test_result();
416 return failures;
417 }
418
419
420 /*********************************************************************
421
422 Test 2: connack return data
423
424 sessionPresent
425
426 *********************************************************************/
test2(struct Options options)427 int test2(struct Options options)
428 {
429 int subsqos = 2;
430 Network n;
431 MQTTClient c;
432 int rc = 0;
433 char* test_topic = "C client test2";
434 MQTTPacket_willOptions wopts;
435 unsigned char buf[100];
436 unsigned char readbuf[100];
437 MQTTConnackData connack;
438 MQTTSubackData suback;
439
440 fprintf(xml, "<testcase classname=\"test2\" name=\"connack return data\"");
441 global_start_time = start_clock();
442 failures = 0;
443 MyLog(LOGA_INFO, "Starting test 2 - connack return data");
444
445 NetworkInit(&n);
446 NetworkConnect(&n, options.host, options.port);
447 MQTTClientInit(&c, &n, 1000, buf, 100, readbuf, 100);
448
449 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
450 data.willFlag = 1;
451 data.MQTTVersion = options.MQTTVersion;
452 data.clientID.cstring = "connack-return-data";
453 data.username.cstring = "testuser";
454 data.password.cstring = "testpassword";
455
456 data.keepAliveInterval = 20;
457 data.cleansession = 1;
458
459 data.will.message.cstring = "will message";
460 data.will.qos = 1;
461 data.will.retained = 0;
462 data.will.topicName.cstring = "will topic";
463
464 MyLog(LOGA_DEBUG, "Connecting");
465 rc = MQTTConnect(&c, &data);
466 assert("Good rc from connect", rc == SUCCESS, "rc was %d", rc);
467 if (rc != SUCCESS)
468 goto exit;
469
470 rc = MQTTDisconnect(&c);
471 assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
472 NetworkDisconnect(&n);
473
474 /* now connect cleansession false */
475 NetworkConnect(&n, options.host, options.port);
476 data.cleansession = 0;
477 rc = MQTTConnectWithResults(&c, &data, &connack);
478 assert("Good rc from connect", rc == SUCCESS, "rc was %d", rc);
479
480 assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc);
481 assert("Session present is 0", connack.sessionPresent == 0,
482 "sessionPresent was %d", connack.sessionPresent);
483
484 /* set up some state */
485 rc = MQTTSubscribeWithResults(&c, test_topic, subsqos, messageArrived, &suback);
486 assert("Good rc from subscribe", rc == SUCCESS, "rc was %d", rc);
487 assert("Good granted QoS", suback.grantedQoS == subsqos,
488 "granted QoS was %d", suback.grantedQoS);
489
490 rc = MQTTDisconnect(&c);
491 assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
492 NetworkDisconnect(&n);
493
494 /* Connect and check sessionPresent */
495 NetworkConnect(&n, options.host, options.port);
496 rc = MQTTConnectWithResults(&c, &data, &connack);
497 assert("Connect successful", rc == SUCCESS, "rc was %d", rc);
498
499 assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc);
500 assert("Session present is 1", connack.sessionPresent == 1,
501 "sessionPresent was %d", connack.sessionPresent);
502
503 rc = MQTTDisconnect(&c);
504 assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
505 NetworkDisconnect(&n);
506
507 /* Connect and check sessionPresent is cleared */
508 data.cleansession = 1;
509 NetworkConnect(&n, options.host, options.port);
510 rc = MQTTConnectWithResults(&c, &data, &connack);
511 assert("Connect successful", rc == SUCCESS, "rc was %d", rc);
512
513 assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc);
514 assert("Session present is 0", connack.sessionPresent == 0,
515 "sessionPresent was %d", connack.sessionPresent);
516
517 rc = MQTTDisconnect(&c);
518 assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
519 NetworkDisconnect(&n);
520
521 exit:
522 MyLog(LOGA_INFO, "TEST1: test %s. %d tests run, %d failures.",
523 (failures == 0) ? "passed" : "failed", tests, failures);
524 write_test_result();
525 return failures;
526 }
527
528 /*********************************************************************
529
530 Test 3: client session state
531
532 *********************************************************************/
533 static volatile MessageData* test2_message_data = NULL;
534
messageArrived2(MessageData * md)535 void messageArrived2(MessageData* md)
536 {
537 test2_message_data = md;
538 MQTTMessage *m = md->message;
539
540 assert("Good message lengths", pubmsg.payloadlen == m->payloadlen,
541 "payloadlen was %d", m->payloadlen);
542
543 if (pubmsg.payloadlen == m->payloadlen)
544 assert("Good message contents", memcmp(m->payload, pubmsg.payload, m->payloadlen) == 0,
545 "payload was %s", m->payload);
546 }
547
548
check_subs_exist(MQTTClient * c,const char * test_topic,int which)549 int check_subs_exist(MQTTClient* c, const char* test_topic, int which)
550 {
551 int rc = FAILURE;
552 int wait_seconds = 0;
553
554 memset(&pubmsg, '\0', sizeof(pubmsg));
555 pubmsg.payload = (void*)"a much longer message that we can shorten to the extent that we need to payload up to 11";
556 pubmsg.payloadlen = 11;
557 pubmsg.qos = QOS2;
558 pubmsg.retained = 0;
559 pubmsg.dup = 0;
560
561 test1_message_data = test2_message_data = NULL;
562 rc = MQTTPublish(c, test_topic, &pubmsg);
563 assert("Good rc from publish", rc == SUCCESS, "rc was %d", rc);
564
565 /* wait for the message to be received */
566 wait_seconds = 10;
567 while (wait_seconds-- > 0)
568 {
569 MQTTYield(c, 100);
570 }
571
572 rc = (((which == 1 || which == 3) && test1_message_data) ||
573 (which == 2 && test1_message_data == NULL)) ? SUCCESS : FAILURE;
574 assert("test1 subscription", rc == SUCCESS, "test1_message_data %p\n",
575 test1_message_data);
576 rc = (((which == 2 || which == 3) && test2_message_data) ||
577 (which == 1 && test2_message_data == NULL)) ? SUCCESS : FAILURE;
578 assert("test2 subscription", rc == SUCCESS, "test2_message_data %p\n",
579 test2_message_data);
580 return rc;
581 }
582
583
test3(struct Options options)584 int test3(struct Options options)
585 {
586 enum QoS subsqos = QOS2;
587 Network n;
588 MQTTClient c;
589 int rc;
590 const char* test_topic = "C client test3";
591 int wait_seconds = 0;
592 unsigned char buf[100];
593 unsigned char readbuf[100];
594 MQTTConnackData connack;
595 MQTTSubackData suback;
596
597 fprintf(xml, "<testcase classname=\"test3\" name=\"session state\"");
598 global_start_time = start_clock();
599 failures = 0;
600 MyLog(LOGA_INFO, "Starting test 3 - session state");
601
602 NetworkInit(&n);
603 MQTTClientInit(&c, &n, 1000, buf, 100, readbuf, 100);
604
605 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
606 data.willFlag = 1;
607 data.MQTTVersion = options.MQTTVersion;
608 data.clientID.cstring = (char*)"connack-session-state";
609 data.username.cstring = (char*)"testuser";
610 data.password.cstring = (char*)"testpassword";
611
612 data.keepAliveInterval = 10;
613 data.cleansession = 1;
614
615 data.will.message.cstring = (char*)"will message";
616 data.will.qos = 1;
617 data.will.retained = 0;
618 data.will.topicName.cstring = (char*)"will topic";
619
620 assert("Not connected", MQTTIsConnected(&c) == 0,
621 "isconnected was %d", MQTTIsConnected(&c));
622
623 MyLog(LOGA_DEBUG, "Connecting");
624 rc = NetworkConnect(&n, options.host, options.port);
625 assert("Good rc from TCP connect", rc == SUCCESS, "rc was %d", rc);
626 if (rc != SUCCESS)
627 goto exit;
628
629 rc = MQTTConnectWithResults(&c, &data, &connack);
630 assert("Good rc from connect", rc == SUCCESS, "rc was %d", rc);
631 if (rc != SUCCESS)
632 goto exit;
633
634 assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc);
635 assert("Session present is 0", connack.sessionPresent == 0,
636 "sessionPresent was %d", connack.sessionPresent);
637
638 assert("Good rc in connack", MQTTIsConnected(&c) == 1,
639 "isconnected was %d", MQTTIsConnected(&c));
640
641 rc = MQTTDisconnect(&c);
642 assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
643 NetworkDisconnect(&n);
644
645 /* reconnect with cleansession false */
646 data.cleansession = 0;
647 rc = NetworkConnect(&n, options.proxy_host, options.proxy_port);
648 assert("TCP connect successful", rc == SUCCESS, "rc was %d", rc);
649 rc = MQTTConnectWithResults(&c, &data, &connack);
650 assert("Connect successful", rc == SUCCESS, "rc was %d", rc);
651
652 assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc);
653 assert("Session present is 0", connack.sessionPresent == 0,
654 "sessionPresent was %d", connack.sessionPresent);
655
656 rc = MQTTSubscribeWithResults(&c, test_topic, subsqos, messageArrived, &suback);
657 assert("Good rc from subscribe", rc == SUCCESS, "rc was %d", rc);
658 assert("Granted QoS rc from subscribe", suback.grantedQoS == QOS2,
659 "rc was %d", suback.grantedQoS);
660
661 check_subs_exist(&c, test_topic, 1);
662
663 rc = MQTTSubscribeWithResults(&c, test_topic, subsqos, messageArrived2, &suback);
664 assert("Good rc from subscribe", rc == SUCCESS, "rc was %d", rc);
665 assert("Granted QoS rc from subscribe", suback.grantedQoS == QOS2,
666 "rc was %d", suback.grantedQoS);
667
668 check_subs_exist(&c, test_topic, 2);
669
670 rc = MQTTDisconnect(&c);
671 assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
672 NetworkDisconnect(&n);
673
674 /* reconnect with cleansession false */
675 data.cleansession = 0;
676 rc = NetworkConnect(&n, options.proxy_host, options.proxy_port);
677 assert("TCP connect successful", rc == SUCCESS, "rc was %d", rc);
678 rc = MQTTConnectWithResults(&c, &data, &connack);
679 assert("Connect successful", rc == SUCCESS, "rc was %d", rc);
680
681 assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc);
682 assert("Session present is 1", connack.sessionPresent == 1,
683 "sessionPresent was %d", connack.sessionPresent);
684
685 check_subs_exist(&c, test_topic, 2);
686
687 rc = MQTTSubscribeWithResults(&c, test_topic, subsqos, messageArrived, &suback);
688 assert("Good rc from subscribe", rc == SUCCESS, "rc was %d", rc);
689 assert("Granted QoS rc from subscribe", suback.grantedQoS == QOS2,
690 "rc was %d", suback.grantedQoS);
691
692 check_subs_exist(&c, test_topic, 1);
693
694 // cause a connection FAILURE
695 memset(&pubmsg, '\0', sizeof(pubmsg));
696 pubmsg.payload = (void*)"TERMINATE";
697 pubmsg.payloadlen = strlen((char*)pubmsg.payload);
698 pubmsg.qos = QOS0;
699 pubmsg.retained = 0;
700 pubmsg.dup = 0;
701 rc = MQTTPublish(&c, "MQTTSAS topic", &pubmsg);
702 assert("Good rc from publish", rc == SUCCESS, "rc was %d", rc);
703
704 // wait for failure to be noticed by keepalive
705 wait_seconds = 20;
706 while (MQTTIsConnected(&c) && (wait_seconds-- > 0))
707 {
708 MQTTYield(&c, 1000);
709 }
710 assert("Disconnected", !MQTTIsConnected(&c), "isConnected was %d",
711 MQTTIsConnected(&c));
712 NetworkDisconnect(&n);
713
714 /* reconnect with cleansession false */
715 data.cleansession = 0;
716 rc = NetworkConnect(&n, options.host, options.port);
717 assert("TCP connect successful", rc == SUCCESS, "rc was %d", rc);
718 rc = MQTTConnectWithResults(&c, &data, &connack);
719 assert("Connect successful", rc == SUCCESS, "rc was %d", rc);
720
721 assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc);
722 assert("Session present is 1", connack.sessionPresent == 1,
723 "sessionPresent was %d", connack.sessionPresent);
724
725 check_subs_exist(&c, test_topic, 1);
726
727 rc = MQTTSubscribeWithResults(&c, test_topic, subsqos, messageArrived2, &suback);
728 assert("Good rc from subscribe", rc == SUCCESS, "rc was %d", rc);
729 assert("Granted QoS rc from subscribe", suback.grantedQoS == QOS2,
730 "rc was %d", suback.grantedQoS);
731
732 check_subs_exist(&c, test_topic, 2);
733
734 rc = MQTTDisconnect(&c);
735 assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
736 NetworkDisconnect(&n);
737
738 /* reconnect with cleansession true to clean up both server and client state */
739 data.cleansession = 1;
740 rc = NetworkConnect(&n, options.host, options.port);
741 assert("TCP connect successful", rc == SUCCESS, "rc was %d", rc);
742 rc = MQTTConnectWithResults(&c, &data, &connack);
743 assert("Connect successful", rc == SUCCESS, "rc was %d", rc);
744
745 assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc);
746 assert("Session present is 0", connack.sessionPresent == 0,
747 "sessionPresent was %d", connack.sessionPresent);
748
749 rc = MQTTSubscribeWithResults(&c, test_topic, subsqos, messageArrived2, &suback);
750 assert("Good rc from subscribe", rc == SUCCESS, "rc was %d", rc);
751 assert("Granted QoS rc from subscribe", suback.grantedQoS == QOS2,
752 "rc was %d", suback.grantedQoS);
753
754 check_subs_exist(&c, test_topic, 2);
755
756 rc = MQTTDisconnect(&c);
757 assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
758 NetworkDisconnect(&n);
759
760 exit:
761 MyLog(LOGA_INFO, "TEST2: test %s. %d tests run, %d failures.",
762 (failures == 0) ? "passed" : "failed", tests, failures);
763 write_test_result();
764 return failures;
765 }
766
767 #if 0
768 /*********************************************************************
769
770 Test 4: connectionLost and will message
771
772 *********************************************************************/
773 MQTTClient test6_c1, test6_c2;
774 volatile int test6_will_message_arrived = 0;
775 volatile int test6_connection_lost_called = 0;
776
777 void test6_connectionLost(void* context, char* cause)
778 {
779 MQTTClient c = (MQTTClient)context;
780 printf("%s -> Callback: connection lost\n", (c == test6_c1) ? "Client-1" : "Client-2");
781 test6_connection_lost_called = 1;
782 }
783
784 void test6_deliveryComplete(void* context, MQTTClient_deliveryToken token)
785 {
786 printf("Client-2 -> Callback: publish complete for token %d\n", token);
787 }
788
789 char* test6_will_topic = "C Test 2: will topic";
790 char* test6_will_message = "will message from Client-1";
791
792 int test6_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* m)
793 {
794 MQTTClient c = (MQTTClient)context;
795 printf("%s -> Callback: message received on topic '%s' is '%.*s'.\n",
796 (c == test6_c1) ? "Client-1" : "Client-2", topicName, m->payloadlen, (char*)(m->payload));
797 if (c == test6_c2 && strcmp(topicName, test6_will_topic) == 0 && memcmp(m->payload, test6_will_message, m->payloadlen) == 0)
798 test6_will_message_arrived = 1;
799 MQTTClient_free(topicName);
800 MQTTClient_freeMessage(&m);
801 return 1;
802 }
803
804
805 int test6(struct Options options)
806 {
807 char* testname = "test6";
808 MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
809 MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer;
810 MQTTClient_connectOptions opts2 = MQTTClient_connectOptions_initializer;
811 int rc, count;
812 char* mqttsas_topic = "MQTTSAS topic";
813
814 failures = 0;
815 MyLog(LOGA_INFO, "Starting test 6 - connectionLost and will messages");
816 fprintf(xml, "<testcase classname=\"test1\" name=\"connectionLost and will messages\"");
817 global_start_time = start_clock();
818
819 opts.keepAliveInterval = 2;
820 opts.cleansession = 1;
821 opts.MQTTVersion = MQTTVERSION_3_1_1;
822 opts.will = &wopts;
823 opts.will->message = test6_will_message;
824 opts.will->qos = 1;
825 opts.will->retained = 0;
826 opts.will->topicName = test6_will_topic;
827 if (options.haconnections != NULL)
828 {
829 opts.serverURIs = options.haconnections;
830 opts.serverURIcount = options.hacount;
831 }
832
833 /* Client-1 with Will options */
834 rc = MQTTClient_create(&test6_c1, options.proxy_connection, "Client_1", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
835 assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
836 if (rc != MQTTCLIENT_SUCCESS)
837 goto exit;
838
839 rc = MQTTClient_setCallbacks(test6_c1, (void*)test6_c1, test6_connectionLost, test6_messageArrived, test6_deliveryComplete);
840 assert("good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
841 if (rc != MQTTCLIENT_SUCCESS)
842 goto exit;
843
844 /* Connect to the broker */
845 rc = MQTTClient_connect(test6_c1, &opts);
846 assert("good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
847 if (rc != MQTTCLIENT_SUCCESS)
848 goto exit;
849
850 /* Client - 2 (multi-threaded) */
851 rc = MQTTClient_create(&test6_c2, options.connection, "Client_2", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
852 assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
853
854 /* Set the callback functions for the client */
855 rc = MQTTClient_setCallbacks(test6_c2, (void*)test6_c2, test6_connectionLost, test6_messageArrived, test6_deliveryComplete);
856 assert("good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
857
858 /* Connect to the broker */
859 opts2.keepAliveInterval = 20;
860 opts2.cleansession = 1;
861 MyLog(LOGA_INFO, "Connecting Client_2 ...");
862 rc = MQTTClient_connect(test6_c2, &opts2);
863 assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
864
865 rc = MQTTClient_subscribe(test6_c2, test6_will_topic, 2);
866 assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
867
868 /* now send the command which will break the connection and cause the will message to be sent */
869 rc = MQTTClient_publish(test6_c1, mqttsas_topic, (int)strlen("TERMINATE"), "TERMINATE", 0, 0, NULL);
870 assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
871
872 MyLog(LOGA_INFO, "Waiting to receive the will message");
873 count = 0;
874 while (++count < 40)
875 {
876 #if defined(WIN32)
877 Sleep(1000L);
878 #else
879 sleep(1);
880 #endif
881 if (test6_will_message_arrived == 1 && test6_connection_lost_called == 1)
882 break;
883 }
884 assert("will message arrived", test6_will_message_arrived == 1,
885 "will_message_arrived was %d\n", test6_will_message_arrived);
886 assert("connection lost called", test6_connection_lost_called == 1,
887 "connection_lost_called %d\n", test6_connection_lost_called);
888
889 rc = MQTTClient_unsubscribe(test6_c2, test6_will_topic);
890 assert("Good rc from unsubscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
891
892 rc = MQTTClient_isConnected(test6_c2);
893 assert("Client-2 still connected", rc == 1, "isconnected is %d", rc);
894
895 rc = MQTTClient_isConnected(test6_c1);
896 assert("Client-1 not connected", rc == 0, "isconnected is %d", rc);
897
898 rc = MQTTClient_disconnect(test6_c2, 100L);
899 assert("Good rc from disconnect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
900
901 MQTTClient_destroy(&test6_c1);
902 MQTTClient_destroy(&test6_c2);
903
904 exit:
905 MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.\n",
906 (failures == 0) ? "passed" : "failed", testname, tests, failures);
907 write_test_result();
908 return failures;
909 }
910
911
912 int test6a(struct Options options)
913 {
914 char* testname = "test6a";
915 MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
916 MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer;
917 MQTTClient_connectOptions opts2 = MQTTClient_connectOptions_initializer;
918 int rc, count;
919 char* mqttsas_topic = "MQTTSAS topic";
920
921 failures = 0;
922 MyLog(LOGA_INFO, "Starting test 6 - connectionLost and binary will messages");
923 fprintf(xml, "<testcase classname=\"test1\" name=\"connectionLost and binary will messages\"");
924 global_start_time = start_clock();
925
926 opts.keepAliveInterval = 2;
927 opts.cleansession = 1;
928 opts.MQTTVersion = MQTTVERSION_3_1_1;
929 opts.will = &wopts;
930 opts.will->payload.data = test6_will_message;
931 opts.will->payload.len = strlen(test6_will_message) + 1;
932 opts.will->qos = 1;
933 opts.will->retained = 0;
934 opts.will->topicName = test6_will_topic;
935 if (options.haconnections != NULL)
936 {
937 opts.serverURIs = options.haconnections;
938 opts.serverURIcount = options.hacount;
939 }
940
941 /* Client-1 with Will options */
942 rc = MQTTClient_create(&test6_c1, options.proxy_connection, "Client_1", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
943 assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
944 if (rc != MQTTCLIENT_SUCCESS)
945 goto exit;
946
947 rc = MQTTClient_setCallbacks(test6_c1, (void*)test6_c1, test6_connectionLost, test6_messageArrived, test6_deliveryComplete);
948 assert("good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
949 if (rc != MQTTCLIENT_SUCCESS)
950 goto exit;
951
952 /* Connect to the broker */
953 rc = MQTTClient_connect(test6_c1, &opts);
954 assert("good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
955 if (rc != MQTTCLIENT_SUCCESS)
956 goto exit;
957
958 /* Client - 2 (multi-threaded) */
959 rc = MQTTClient_create(&test6_c2, options.connection, "Client_2", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
960 assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
961
962 /* Set the callback functions for the client */
963 rc = MQTTClient_setCallbacks(test6_c2, (void*)test6_c2, test6_connectionLost, test6_messageArrived, test6_deliveryComplete);
964 assert("good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
965
966 /* Connect to the broker */
967 opts2.keepAliveInterval = 20;
968 opts2.cleansession = 1;
969 MyLog(LOGA_INFO, "Connecting Client_2 ...");
970 rc = MQTTClient_connect(test6_c2, &opts2);
971 assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
972
973 rc = MQTTClient_subscribe(test6_c2, test6_will_topic, 2);
974 assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
975
976 /* now send the command which will break the connection and cause the will message to be sent */
977 rc = MQTTClient_publish(test6_c1, mqttsas_topic, (int)strlen("TERMINATE"), "TERMINATE", 0, 0, NULL);
978 assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
979
980 MyLog(LOGA_INFO, "Waiting to receive the will message");
981 count = 0;
982 while (++count < 40)
983 {
984 #if defined(WIN32)
985 Sleep(1000L);
986 #else
987 sleep(1);
988 #endif
989 if (test6_will_message_arrived == 1 && test6_connection_lost_called == 1)
990 break;
991 }
992 assert("will message arrived", test6_will_message_arrived == 1,
993 "will_message_arrived was %d\n", test6_will_message_arrived);
994 assert("connection lost called", test6_connection_lost_called == 1,
995 "connection_lost_called %d\n", test6_connection_lost_called);
996
997 rc = MQTTClient_unsubscribe(test6_c2, test6_will_topic);
998 assert("Good rc from unsubscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
999
1000 rc = MQTTClient_isConnected(test6_c2);
1001 assert("Client-2 still connected", rc == 1, "isconnected is %d", rc);
1002
1003 rc = MQTTClient_isConnected(test6_c1);
1004 assert("Client-1 not connected", rc == 0, "isconnected is %d", rc);
1005
1006 rc = MQTTClient_disconnect(test6_c2, 100L);
1007 assert("Good rc from disconnect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1008
1009 MQTTClient_destroy(&test6_c1);
1010 MQTTClient_destroy(&test6_c2);
1011
1012 exit:
1013 MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.\n",
1014 (failures == 0) ? "passed" : "failed", testname, tests, failures);
1015 write_test_result();
1016 return failures;
1017 }
1018 #endif
1019
main(int argc,char ** argv)1020 int main(int argc, char** argv)
1021 {
1022 int rc = 0;
1023 int (*tests[])() = {NULL, test1, test2, test3};
1024 int i;
1025
1026 xml = fopen("TEST-test1.xml", "w");
1027 fprintf(xml, "<testsuite name=\"test1\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1));
1028
1029 getopts(argc, argv);
1030
1031 for (i = 0; i < options.iterations; ++i)
1032 {
1033 if (options.test_no == 0)
1034 { /* run all the tests */
1035 for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no)
1036 rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
1037 }
1038 else
1039 rc = tests[options.test_no](options); /* run just the selected test */
1040 }
1041
1042 if (rc == 0)
1043 MyLog(LOGA_INFO, "verdict pass");
1044 else
1045 MyLog(LOGA_INFO, "verdict fail");
1046
1047 fprintf(xml, "</testsuite>\n");
1048 fclose(xml);
1049 return rc;
1050 }
1051