• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*******************************************************************************
2  * Copyright (c) 2009, 2017 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  *    http://www.eclipse.org/legal/epl-v10.html
10  * and the Eclipse Distribution License is available at
11  *   http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  *    Ian Craggs - initial implementation for embedded C client
15  *******************************************************************************/
16 
17 
18 /**
19  * @file
20  * Tests for the Paho embedded C "high" level client
21  */
22 
23 
24 #include "MQTTClient.h"
25 #include <string.h>
26 #include <stdlib.h>
27 
28 #if !defined(_WINDOWS)
29   #include <sys/time.h>
30   #include <sys/socket.h>
31   #include <unistd.h>
32   #include <errno.h>
33 #else
34   #include <windows.h>
35   #define setenv(a, b, c) _putenv_s(a, b)
36 #endif
37 
38 #define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
39 
/**
 * Print the (very short) help text to stdout and terminate the process
 * with a failure exit status.  Never returns.
 */
void usage(void)
{
	fputs("help!!\n", stdout);
	exit(EXIT_FAILURE);
}
45 
/**
 * Command-line configurable settings for the test program.
 * The single instance `options` holds the defaults, which getopts()
 * overwrites from argv.
 */
struct Options
{
	char* host;         /**< connection to system under test. */
	int port;           /**< TCP port used together with host */
	char* proxy_host;   /**< host used for the proxied connections in test3 */
	int proxy_port;     /**< TCP port used together with proxy_host */
	int verbose;        /**< non-zero enables LOGA_DEBUG output in MyLog() */
	int test_no;        /**< value of --test_no */
	int MQTTVersion;    /**< copied into the connect data's MQTTVersion field */
	int iterations;     /**< value of --iterations */
} options =
{
	.host = "localhost",
	.port = 1883,
	.proxy_host = "localhost",
	.proxy_port = 1885,
	.verbose = 0,
	.test_no = 0,
	.MQTTVersion = 4,
	.iterations = 1,
};
67 
getopts(int argc,char ** argv)68 void getopts(int argc, char** argv)
69 {
70 	int count = 1;
71 
72 	while (count < argc)
73 	{
74 		if (strcmp(argv[count], "--test_no") == 0)
75 		{
76 			if (++count < argc)
77 				options.test_no = atoi(argv[count]);
78 			else
79 				usage();
80 		}
81 		else if (strcmp(argv[count], "--host") == 0)
82 		{
83 			if (++count < argc)
84 			{
85 				options.host = argv[count];
86 				printf("\nSetting host to %s\n", options.host);
87 			}
88 			else
89 				usage();
90 		}
91     else if (strcmp(argv[count], "--port") == 0)
92     {
93       if (++count < argc)
94       {
95         options.port = atoi(argv[count]);
96         printf("\nSetting port to %d\n", options.port);
97       }
98       else
99         usage();
100     }
101 		else if (strcmp(argv[count], "--proxy_host") == 0)
102 		{
103 			if (++count < argc)
104       {
105 				options.proxy_host = argv[count];
106         printf("\nSetting proxy_host to %s\n", options.proxy_host);
107       }
108 			else
109 				usage();
110 		}
111     else if (strcmp(argv[count], "--proxy_port") == 0)
112     {
113       if (++count < argc)
114       {
115         options.proxy_port = atoi(argv[count]);
116         printf("\nSetting proxy_port to %d\n", options.proxy_port);
117       }
118       else
119         usage();
120     }
121 		else if (strcmp(argv[count], "--MQTTversion") == 0)
122 		{
123 			if (++count < argc)
124 			{
125 				options.MQTTVersion = atoi(argv[count]);
126 				printf("setting MQTT version to %d\n", options.MQTTVersion);
127 			}
128 			else
129 				usage();
130 		}
131 		else if (strcmp(argv[count], "--iterations") == 0)
132 		{
133 			if (++count < argc)
134 				options.iterations = atoi(argv[count]);
135 			else
136 				usage();
137 		}
138 		else if (strcmp(argv[count], "--verbose") == 0)
139 		{
140 			options.verbose = 1;
141 			printf("\nSetting verbose on\n");
142 		}
143 		count++;
144 	}
145 }
146 
147 
148 #define LOGA_DEBUG 0
149 #define LOGA_INFO 1
150 #include <stdarg.h>
151 #include <time.h>
152 #include <sys/timeb.h>
MyLog(int LOGA_level,char * format,...)153 void MyLog(int LOGA_level, char* format, ...)
154 {
155 	static char msg_buf[256];
156 	va_list args;
157 	struct timeb ts;
158 
159 	struct tm *timeinfo;
160 
161 	if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
162 	  return;
163 
164 	ftime(&ts);
165 	timeinfo = localtime(&ts.time);
166 	strftime(msg_buf, 80, "%Y%m%d %H%M%S", timeinfo);
167 
168 	sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
169 
170 	va_start(args, format);
171 	vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf), format, args);
172 	va_end(args);
173 
174 	printf("%s\n", msg_buf);
175 	fflush(stdout);
176 }
177 
178 
/*
 * Platform abstraction for sleeping and for taking a start timestamp.
 *
 *   mqsleep(seconds)   - sleep for a number of seconds
 *   START_TIME_TYPE    - type of the timestamp returned by start_clock()
 *   start_clock()      - sample the current time
 */
#if defined(WIN32) || defined(_WINDOWS)
/* Sleep() takes milliseconds; the argument is parenthesised so that an
   expression such as mqsleep(a + b) is scaled as a whole. */
#define mqsleep(A) Sleep(1000*(A))
#define START_TIME_TYPE DWORD
static DWORD start_time = 0;
/** @return the current GetTickCount() value */
START_TIME_TYPE start_clock(void)
{
	return GetTickCount();
}
#elif defined(AIX)
#define mqsleep sleep
#define START_TIME_TYPE struct timespec
/** @return the current CLOCK_REALTIME time */
START_TIME_TYPE start_clock(void)
{
	static struct timespec start;
	clock_gettime(CLOCK_REALTIME, &start);
	return start;
}
#else
#define mqsleep sleep
#define START_TIME_TYPE struct timeval
/* TODO - unused - remove? static struct timeval start_time; */
/** @return the current time as reported by gettimeofday() */
START_TIME_TYPE start_clock(void)
{
	struct timeval start_time;
	gettimeofday(&start_time, NULL);
	return start_time;
}
#endif
207 
208 
209 #if defined(WIN32)
elapsed(START_TIME_TYPE start_time)210 long elapsed(START_TIME_TYPE start_time)
211 {
212 	return GetTickCount() - start_time;
213 }
214 #elif defined(AIX)
215 #define assert(a)
elapsed(struct timespec start)216 long elapsed(struct timespec start)
217 {
218 	struct timespec now, res;
219 
220 	clock_gettime(CLOCK_REALTIME, &now);
221 	ntimersub(now, start, res);
222 	return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
223 }
224 #else
elapsed(START_TIME_TYPE start_time)225 long elapsed(START_TIME_TYPE start_time)
226 {
227 	struct timeval now, res;
228 
229 	gettimeofday(&now, NULL);
230 	timersub(&now, &start_time, &res);
231 	return (res.tv_sec)*1000 + (res.tv_usec)/1000;
232 }
233 #endif
234 
235 
236 #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
237 #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
238 
239 int tests = 0;
240 int failures = 0;
241 FILE* xml;
242 START_TIME_TYPE global_start_time;
243 char output[3000];
244 char* cur_output = output;
245 
246 
write_test_result(void)247 void write_test_result(void)
248 {
249 	long duration = elapsed(global_start_time);
250 
251 	fprintf(xml, " time=\"%ld.%.3ld\" >\n", duration / 1000, duration % 1000);
252 	if (cur_output != output)
253 	{
254 		fprintf(xml, "%s", output);
255 		cur_output = output;
256 	}
257 	fprintf(xml, "</testcase>\n");
258 }
259 
260 
myassert(char * filename,int lineno,char * description,int value,char * format,...)261 void myassert(char* filename, int lineno, char* description, int value, char* format, ...)
262 {
263 	++tests;
264 	if (!value)
265 	{
266 		va_list args;
267 
268 		++failures;
269 		MyLog(LOGA_INFO, "Assertion failed, file %s, line %d, description: %s\n", filename, lineno, description);
270 
271 		va_start(args, format);
272 		vprintf(format, args);
273 		va_end(args);
274 
275 		cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
276                         description, filename, lineno);
277 	}
278 	else
279 		MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s", filename, lineno, description);
280 }
281 
282 
283 static volatile MessageData* test1_message_data = NULL;
284 static MQTTMessage pubmsg;
285 
messageArrived(MessageData * md)286 void messageArrived(MessageData* md)
287 {
288   test1_message_data = md;
289   MQTTMessage* m = md->message;
290 
291 	assert("Good message lengths", pubmsg.payloadlen == m->payloadlen,
292          "payloadlen was %d", m->payloadlen);
293 
294   if (pubmsg.payloadlen == m->payloadlen)
295       assert("Good message contents", memcmp(m->payload, pubmsg.payload, m->payloadlen) == 0,
296           "payload was %s", m->payload);
297 }
298 
299 
300 /*********************************************************************
301 
302 Test1: single-threaded client
303 
304 *********************************************************************/
/**
 * Publish 50 messages at the given QoS and check that each one is
 * delivered back through messageArrived().
 *
 * For every message the client is yielded up to 10 times for 100 ms each.
 * Arrival is judged by whether the handler actually ran, so a message
 * that arrives during the last yield is not reported as a timeout.
 *
 * @param c connected client that is subscribed to test_topic
 * @param qos QoS to publish at
 * @param test_topic topic to publish to
 */
void test1_sendAndReceive(MQTTClient* c, int qos, char* test_topic)
{
	int i = 0;
	int iterations = 50;
	int rc;
	int yields_left;

	MyLog(LOGA_DEBUG, "%d messages at QoS %d", iterations, qos);
	memset(&pubmsg, '\0', sizeof(pubmsg));
	pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
	pubmsg.payloadlen = 11;
	pubmsg.qos = qos;
	pubmsg.retained = 0;
	pubmsg.dup = 0;

	for (i = 0; i < iterations; ++i)
	{
		test1_message_data = NULL;
		rc = MQTTPublish(c, test_topic, &pubmsg);
		assert("Good rc from publish", rc == SUCCESS, "rc was %d", rc);

		/* wait for the message to be received */
		yields_left = 10;
		while ((test1_message_data == NULL) && (yields_left-- > 0))
		{
			MQTTYield(c, 100);
		}
		assert("Message Arrived", test1_message_data != NULL, "Time out waiting for message %d\n", i);

		if (!test1_message_data)
			printf("No message received within timeout period\n");
	}

	/* wait to receive any outstanding messages */
	yields_left = 2;
	while (yields_left-- > 0)
	{
		MQTTYield(c, 1000);
	}
}
347 
348 
test1(struct Options options)349 int test1(struct Options options)
350 {
351 	int subsqos = 2;
352   Network n;
353 	MQTTClient c;
354 	int rc = 0;
355 	char* test_topic = "C client test1";
356   MQTTPacket_willOptions wopts;
357   unsigned char buf[100];
358   unsigned char readbuf[100];
359 
360   printf("test1\n");
361 	fprintf(xml, "<testcase classname=\"test1\" name=\"single threaded client using receive\"");
362 	global_start_time = start_clock();
363 	failures = 0;
364 	MyLog(LOGA_INFO, "Starting test 1 - single threaded client using receive");
365 
366   NetworkInit(&n);
367   NetworkConnect(&n, options.host, options.port);
368   MQTTClientInit(&c, &n, 1000, buf, 100, readbuf, 100);
369 
370   MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
371   data.willFlag = 1;
372   data.MQTTVersion = options.MQTTVersion;
373   data.clientID.cstring = "single-threaded-test";
374   data.username.cstring = "testuser";
375   data.password.cstring = "testpassword";
376 
377   data.keepAliveInterval = 20;
378   data.cleansession = 1;
379 
380 	data.will.message.cstring = "will message";
381 	data.will.qos = 1;
382 	data.will.retained = 0;
383 	data.will.topicName.cstring = "will topic";
384 
385 	MyLog(LOGA_DEBUG, "Connecting");
386   rc = MQTTConnect(&c, &data);
387 	assert("Good rc from connect", rc == SUCCESS, "rc was %d", rc);
388 	if (rc != SUCCESS)
389 		goto exit;
390 
391 	rc = MQTTSubscribe(&c, test_topic, subsqos, messageArrived);
392 	assert("Good rc from subscribe", rc == SUCCESS, "rc was %d", rc);
393 
394 	test1_sendAndReceive(&c, 0, test_topic);
395 	test1_sendAndReceive(&c, 1, test_topic);
396 	test1_sendAndReceive(&c, 2, test_topic);
397 
398 	MyLog(LOGA_DEBUG, "Stopping\n");
399 
400 	rc = MQTTUnsubscribe(&c, test_topic);
401 	assert("Unsubscribe successful", rc == SUCCESS, "rc was %d", rc);
402 	rc = MQTTDisconnect(&c);
403 	assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
404 
405 	/* Just to make sure we can connect again */
406   NetworkConnect(&n, options.host, options.port);
407   rc = MQTTConnect(&c, &data);
408 	assert("Connect successful",  rc == SUCCESS, "rc was %d", rc);
409 	rc = MQTTDisconnect(&c);
410 	assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
411 
412 exit:
413 	MyLog(LOGA_INFO, "TEST1: test %s. %d tests run, %d failures.",
414 			(failures == 0) ? "passed" : "failed", tests, failures);
415 	write_test_result();
416 	return failures;
417 }
418 
419 
420 /*********************************************************************
421 
422 Test 2: connack return data
423 
424 sessionPresent
425 
426 *********************************************************************/
test2(struct Options options)427 int test2(struct Options options)
428 {
429 	int subsqos = 2;
430   Network n;
431 	MQTTClient c;
432 	int rc = 0;
433 	char* test_topic = "C client test2";
434   MQTTPacket_willOptions wopts;
435   unsigned char buf[100];
436   unsigned char readbuf[100];
437   MQTTConnackData connack;
438   MQTTSubackData suback;
439 
440 	fprintf(xml, "<testcase classname=\"test2\" name=\"connack return data\"");
441 	global_start_time = start_clock();
442 	failures = 0;
443 	MyLog(LOGA_INFO, "Starting test 2 - connack return data");
444 
445   NetworkInit(&n);
446   NetworkConnect(&n, options.host, options.port);
447   MQTTClientInit(&c, &n, 1000, buf, 100, readbuf, 100);
448 
449   MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
450   data.willFlag = 1;
451   data.MQTTVersion = options.MQTTVersion;
452   data.clientID.cstring = "connack-return-data";
453   data.username.cstring = "testuser";
454   data.password.cstring = "testpassword";
455 
456   data.keepAliveInterval = 20;
457   data.cleansession = 1;
458 
459 	data.will.message.cstring = "will message";
460 	data.will.qos = 1;
461 	data.will.retained = 0;
462 	data.will.topicName.cstring = "will topic";
463 
464 	MyLog(LOGA_DEBUG, "Connecting");
465   rc = MQTTConnect(&c, &data);
466 	assert("Good rc from connect", rc == SUCCESS, "rc was %d", rc);
467 	if (rc != SUCCESS)
468 		goto exit;
469 
470 	rc = MQTTDisconnect(&c);
471 	assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
472   NetworkDisconnect(&n);
473 
474   /* now connect cleansession false */
475   NetworkConnect(&n, options.host, options.port);
476   data.cleansession = 0;
477   rc = MQTTConnectWithResults(&c, &data, &connack);
478   assert("Good rc from connect", rc == SUCCESS, "rc was %d", rc);
479 
480   assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc);
481   assert("Session present is 0", connack.sessionPresent == 0,
482          "sessionPresent was %d", connack.sessionPresent);
483 
484   /* set up some state */
485   rc = MQTTSubscribeWithResults(&c, test_topic, subsqos, messageArrived, &suback);
486   assert("Good rc from subscribe", rc == SUCCESS, "rc was %d", rc);
487   assert("Good granted QoS", suback.grantedQoS == subsqos,
488          "granted QoS was %d", suback.grantedQoS);
489 
490   rc = MQTTDisconnect(&c);
491   assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
492   NetworkDisconnect(&n);
493 
494 	/* Connect and check sessionPresent */
495   NetworkConnect(&n, options.host, options.port);
496   rc = MQTTConnectWithResults(&c, &data, &connack);
497 	assert("Connect successful",  rc == SUCCESS, "rc was %d", rc);
498 
499   assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc);
500   assert("Session present is 1", connack.sessionPresent == 1,
501            "sessionPresent was %d", connack.sessionPresent);
502 
503 	rc = MQTTDisconnect(&c);
504 	assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
505   NetworkDisconnect(&n);
506 
507   /* Connect and check sessionPresent is cleared */
508   data.cleansession = 1;
509   NetworkConnect(&n, options.host, options.port);
510   rc = MQTTConnectWithResults(&c, &data, &connack);
511   assert("Connect successful",  rc == SUCCESS, "rc was %d", rc);
512 
513   assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc);
514   assert("Session present is 0", connack.sessionPresent == 0,
515            "sessionPresent was %d", connack.sessionPresent);
516 
517   rc = MQTTDisconnect(&c);
518   assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
519   NetworkDisconnect(&n);
520 
521 exit:
522 	MyLog(LOGA_INFO, "TEST1: test %s. %d tests run, %d failures.",
523 			(failures == 0) ? "passed" : "failed", tests, failures);
524 	write_test_result();
525 	return failures;
526 }
527 
528 /*********************************************************************
529 
530 Test 3: client session state
531 
532 *********************************************************************/
533 static volatile MessageData* test2_message_data = NULL;
534 
messageArrived2(MessageData * md)535 void messageArrived2(MessageData* md)
536 {
537     test2_message_data = md;
538 	  MQTTMessage *m = md->message;
539 
540     assert("Good message lengths", pubmsg.payloadlen == m->payloadlen,
541          "payloadlen was %d", m->payloadlen);
542 
543     if (pubmsg.payloadlen == m->payloadlen)
544         assert("Good message contents", memcmp(m->payload, pubmsg.payload, m->payloadlen) == 0,
545           "payload was %s", m->payload);
546 }
547 
548 
/**
 * Publish one QoS 2 message and check which message handlers receive it.
 *
 * The client is yielded 10 times for 100 ms each regardless of arrival,
 * so that a handler which should NOT fire has had the chance to do so.
 *
 * @param c connected client
 * @param test_topic topic to publish to
 * @param which 1: only messageArrived must fire; 2: only messageArrived2
 *              must fire; 3: both must fire
 * @return SUCCESS only if both handlers behaved as expected, otherwise
 *         FAILURE
 */
int check_subs_exist(MQTTClient* c, const char* test_topic, int which)
{
	int rc = FAILURE;
	int sub1_ok = 0;
	int sub2_ok = 0;
	int wait_seconds = 0;

	memset(&pubmsg, '\0', sizeof(pubmsg));
	pubmsg.payload = (void*)"a much longer message that we can shorten to the extent that we need to payload up to 11";
	pubmsg.payloadlen = 11;
	pubmsg.qos = QOS2;
	pubmsg.retained = 0;
	pubmsg.dup = 0;

	test1_message_data = test2_message_data = NULL;
	rc = MQTTPublish(c, test_topic, &pubmsg);
	assert("Good rc from publish", rc == SUCCESS, "rc was %d", rc);

	/* wait for the message to be received */
	wait_seconds = 10;
	while (wait_seconds-- > 0)
	{
		MQTTYield(c, 100);
	}

	sub1_ok = (((which == 1 || which == 3) && test1_message_data) ||
			(which == 2 && test1_message_data == NULL));
	assert("test1 subscription", sub1_ok, "test1_message_data %p\n",
			(void*)test1_message_data);

	sub2_ok = (((which == 2 || which == 3) && test2_message_data) ||
			(which == 1 && test2_message_data == NULL));
	assert("test2 subscription", sub2_ok, "test2_message_data %p\n",
			(void*)test2_message_data);

	/* report failure if either check failed, not just the last one */
	return (sub1_ok && sub2_ok) ? SUCCESS : FAILURE;
}
582 
583 
test3(struct Options options)584 int test3(struct Options options)
585 {
586   enum QoS subsqos = QOS2;
587   Network n;
588   MQTTClient c;
589 	int rc;
590   const char* test_topic = "C client test3";
591   int wait_seconds = 0;
592   unsigned char buf[100];
593   unsigned char readbuf[100];
594   MQTTConnackData connack;
595   MQTTSubackData suback;
596 
597   fprintf(xml, "<testcase classname=\"test3\" name=\"session state\"");
598   global_start_time = start_clock();
599   failures = 0;
600   MyLog(LOGA_INFO, "Starting test 3 - session state");
601 
602   NetworkInit(&n);
603   MQTTClientInit(&c, &n, 1000, buf, 100, readbuf, 100);
604 
605   MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
606   data.willFlag = 1;
607   data.MQTTVersion = options.MQTTVersion;
608   data.clientID.cstring = (char*)"connack-session-state";
609   data.username.cstring = (char*)"testuser";
610   data.password.cstring = (char*)"testpassword";
611 
612   data.keepAliveInterval = 10;
613   data.cleansession = 1;
614 
615   data.will.message.cstring = (char*)"will message";
616   data.will.qos = 1;
617   data.will.retained = 0;
618   data.will.topicName.cstring = (char*)"will topic";
619 
620   assert("Not connected", MQTTIsConnected(&c) == 0,
621          "isconnected was %d", MQTTIsConnected(&c));
622 
623   MyLog(LOGA_DEBUG, "Connecting");
624   rc = NetworkConnect(&n, options.host, options.port);
625   assert("Good rc from TCP connect", rc == SUCCESS, "rc was %d", rc);
626   if (rc != SUCCESS)
627     goto exit;
628 
629   rc = MQTTConnectWithResults(&c, &data, &connack);
630   assert("Good rc from connect", rc == SUCCESS, "rc was %d", rc);
631   if (rc != SUCCESS)
632     goto exit;
633 
634   assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc);
635   assert("Session present is 0", connack.sessionPresent == 0,
636          "sessionPresent was %d", connack.sessionPresent);
637 
638   assert("Good rc in connack", MQTTIsConnected(&c) == 1,
639                 "isconnected was %d", MQTTIsConnected(&c));
640 
641   rc = MQTTDisconnect(&c);
642   assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
643   NetworkDisconnect(&n);
644 
645   /* reconnect with cleansession false */
646   data.cleansession = 0;
647   rc = NetworkConnect(&n, options.proxy_host, options.proxy_port);
648   assert("TCP connect successful",  rc == SUCCESS, "rc was %d", rc);
649   rc = MQTTConnectWithResults(&c, &data, &connack);
650   assert("Connect successful",  rc == SUCCESS, "rc was %d", rc);
651 
652   assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc);
653   assert("Session present is 0", connack.sessionPresent == 0,
654            "sessionPresent was %d", connack.sessionPresent);
655 
656   rc = MQTTSubscribeWithResults(&c, test_topic, subsqos, messageArrived, &suback);
657   assert("Good rc from subscribe", rc == SUCCESS, "rc was %d", rc);
658   assert("Granted QoS rc from subscribe", suback.grantedQoS == QOS2,
659          "rc was %d", suback.grantedQoS);
660 
661   check_subs_exist(&c, test_topic, 1);
662 
663   rc = MQTTSubscribeWithResults(&c, test_topic, subsqos, messageArrived2, &suback);
664   assert("Good rc from subscribe", rc == SUCCESS, "rc was %d", rc);
665   assert("Granted QoS rc from subscribe", suback.grantedQoS == QOS2,
666                   "rc was %d", suback.grantedQoS);
667 
668   check_subs_exist(&c, test_topic, 2);
669 
670   rc = MQTTDisconnect(&c);
671   assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
672   NetworkDisconnect(&n);
673 
674   /* reconnect with cleansession false */
675   data.cleansession = 0;
676   rc = NetworkConnect(&n, options.proxy_host, options.proxy_port);
677   assert("TCP connect successful",  rc == SUCCESS, "rc was %d", rc);
678   rc = MQTTConnectWithResults(&c, &data, &connack);
679   assert("Connect successful",  rc == SUCCESS, "rc was %d", rc);
680 
681   assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc);
682   assert("Session present is 1", connack.sessionPresent == 1,
683            "sessionPresent was %d", connack.sessionPresent);
684 
685   check_subs_exist(&c, test_topic, 2);
686 
687   rc = MQTTSubscribeWithResults(&c, test_topic, subsqos, messageArrived, &suback);
688   assert("Good rc from subscribe", rc == SUCCESS, "rc was %d", rc);
689   assert("Granted QoS rc from subscribe", suback.grantedQoS == QOS2,
690             "rc was %d", suback.grantedQoS);
691 
692   check_subs_exist(&c, test_topic, 1);
693 
694   // cause a connection FAILURE
695   memset(&pubmsg, '\0', sizeof(pubmsg));
696   pubmsg.payload = (void*)"TERMINATE";
697   pubmsg.payloadlen = strlen((char*)pubmsg.payload);
698   pubmsg.qos = QOS0;
699   pubmsg.retained = 0;
700   pubmsg.dup = 0;
701   rc = MQTTPublish(&c, "MQTTSAS topic", &pubmsg);
702   assert("Good rc from publish", rc == SUCCESS, "rc was %d", rc);
703 
704   // wait for failure to be noticed by keepalive
705   wait_seconds = 20;
706   while (MQTTIsConnected(&c) && (wait_seconds-- > 0))
707   {
708       MQTTYield(&c, 1000);
709   }
710   assert("Disconnected", !MQTTIsConnected(&c), "isConnected was %d",
711          MQTTIsConnected(&c));
712   NetworkDisconnect(&n);
713 
714   /* reconnect with cleansession false */
715   data.cleansession = 0;
716   rc = NetworkConnect(&n, options.host, options.port);
717   assert("TCP connect successful",  rc == SUCCESS, "rc was %d", rc);
718   rc = MQTTConnectWithResults(&c, &data, &connack);
719   assert("Connect successful",  rc == SUCCESS, "rc was %d", rc);
720 
721   assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc);
722   assert("Session present is 1", connack.sessionPresent == 1,
723            "sessionPresent was %d", connack.sessionPresent);
724 
725   check_subs_exist(&c, test_topic, 1);
726 
727   rc = MQTTSubscribeWithResults(&c, test_topic, subsqos, messageArrived2, &suback);
728   assert("Good rc from subscribe", rc == SUCCESS, "rc was %d", rc);
729   assert("Granted QoS rc from subscribe", suback.grantedQoS == QOS2,
730                   "rc was %d", suback.grantedQoS);
731 
732   check_subs_exist(&c, test_topic, 2);
733 
734   rc = MQTTDisconnect(&c);
735   assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
736   NetworkDisconnect(&n);
737 
738   /* reconnect with cleansession true to clean up both server and client state */
739   data.cleansession = 1;
740   rc = NetworkConnect(&n, options.host, options.port);
741   assert("TCP connect successful",  rc == SUCCESS, "rc was %d", rc);
742   rc = MQTTConnectWithResults(&c, &data, &connack);
743   assert("Connect successful",  rc == SUCCESS, "rc was %d", rc);
744 
745   assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc);
746   assert("Session present is 0", connack.sessionPresent == 0,
747            "sessionPresent was %d", connack.sessionPresent);
748 
749   rc = MQTTSubscribeWithResults(&c, test_topic, subsqos, messageArrived2, &suback);
750   assert("Good rc from subscribe", rc == SUCCESS, "rc was %d", rc);
751   assert("Granted QoS rc from subscribe", suback.grantedQoS == QOS2,
752                   "rc was %d", suback.grantedQoS);
753 
754   check_subs_exist(&c, test_topic, 2);
755 
756   rc = MQTTDisconnect(&c);
757   assert("Disconnect successful", rc == SUCCESS, "rc was %d", rc);
758   NetworkDisconnect(&n);
759 
760 exit:
761   MyLog(LOGA_INFO, "TEST2: test %s. %d tests run, %d failures.",
762       (failures == 0) ? "passed" : "failed", tests, failures);
763   write_test_result();
764   return failures;
765 }
766 
767 #if 0
768 /*********************************************************************
769 
770 Test 6: connectionLost and will message
771 
772 *********************************************************************/
773 MQTTClient test6_c1, test6_c2;
774 volatile int test6_will_message_arrived = 0;
775 volatile int test6_connection_lost_called = 0;
776 
777 void test6_connectionLost(void* context, char* cause)
778 {
779 	MQTTClient c = (MQTTClient)context;
780 	printf("%s -> Callback: connection lost\n", (c == test6_c1) ? "Client-1" : "Client-2");
781 	test6_connection_lost_called = 1;
782 }
783 
784 void test6_deliveryComplete(void* context, MQTTClient_deliveryToken token)
785 {
786 	printf("Client-2 -> Callback: publish complete for token %d\n", token);
787 }
788 
789 char* test6_will_topic = "C Test 2: will topic";
790 char* test6_will_message = "will message from Client-1";
791 
792 int test6_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* m)
793 {
794 	MQTTClient c = (MQTTClient)context;
795 	printf("%s -> Callback: message received on topic '%s' is '%.*s'.\n",
796 			 (c == test6_c1) ? "Client-1" : "Client-2", topicName, m->payloadlen, (char*)(m->payload));
797 	if (c == test6_c2 && strcmp(topicName, test6_will_topic) == 0 && memcmp(m->payload, test6_will_message, m->payloadlen) == 0)
798 		test6_will_message_arrived = 1;
799 	MQTTClient_free(topicName);
800 	MQTTClient_freeMessage(&m);
801 	return 1;
802 }
803 
804 
805 int test6(struct Options options)
806 {
807 	char* testname = "test6";
808 	MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
809 	MQTTClient_willOptions wopts =  MQTTClient_willOptions_initializer;
810 	MQTTClient_connectOptions opts2 = MQTTClient_connectOptions_initializer;
811 	int rc, count;
812 	char* mqttsas_topic = "MQTTSAS topic";
813 
814 	failures = 0;
815 	MyLog(LOGA_INFO, "Starting test 6 - connectionLost and will messages");
816 	fprintf(xml, "<testcase classname=\"test1\" name=\"connectionLost and will messages\"");
817 	global_start_time = start_clock();
818 
819 	opts.keepAliveInterval = 2;
820 	opts.cleansession = 1;
821 	opts.MQTTVersion = MQTTVERSION_3_1_1;
822 	opts.will = &wopts;
823 	opts.will->message = test6_will_message;
824 	opts.will->qos = 1;
825 	opts.will->retained = 0;
826 	opts.will->topicName = test6_will_topic;
827 	if (options.haconnections != NULL)
828 	{
829 		opts.serverURIs = options.haconnections;
830 		opts.serverURIcount = options.hacount;
831 	}
832 
833 	/* Client-1 with Will options */
834 	rc = MQTTClient_create(&test6_c1, options.proxy_connection, "Client_1", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
835 	assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
836 	if (rc != MQTTCLIENT_SUCCESS)
837 		goto exit;
838 
839 	rc = MQTTClient_setCallbacks(test6_c1, (void*)test6_c1, test6_connectionLost, test6_messageArrived, test6_deliveryComplete);
840 	assert("good rc from setCallbacks",  rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
841 	if (rc != MQTTCLIENT_SUCCESS)
842 		goto exit;
843 
844 	/* Connect to the broker */
845 	rc = MQTTClient_connect(test6_c1, &opts);
846 	assert("good rc from connect",  rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
847 	if (rc != MQTTCLIENT_SUCCESS)
848 		goto exit;
849 
850 	/* Client - 2 (multi-threaded) */
851 	rc = MQTTClient_create(&test6_c2, options.connection, "Client_2", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
852 	assert("good rc from create",  rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
853 
854 	/* Set the callback functions for the client */
855 	rc = MQTTClient_setCallbacks(test6_c2, (void*)test6_c2, test6_connectionLost, test6_messageArrived, test6_deliveryComplete);
856 	assert("good rc from setCallbacks",  rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
857 
858 	/* Connect to the broker */
859 	opts2.keepAliveInterval = 20;
860 	opts2.cleansession = 1;
861 	MyLog(LOGA_INFO, "Connecting Client_2 ...");
862 	rc = MQTTClient_connect(test6_c2, &opts2);
863 	assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
864 
865 	rc = MQTTClient_subscribe(test6_c2, test6_will_topic, 2);
866 	assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
867 
868 	/* now send the command which will break the connection and cause the will message to be sent */
869 	rc = MQTTClient_publish(test6_c1, mqttsas_topic, (int)strlen("TERMINATE"), "TERMINATE", 0, 0, NULL);
870 	assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
871 
872 	MyLog(LOGA_INFO, "Waiting to receive the will message");
873 	count = 0;
874 	while (++count < 40)
875 	{
876 		#if defined(WIN32)
877 			Sleep(1000L);
878 		#else
879 			sleep(1);
880 		#endif
881 		if (test6_will_message_arrived == 1 && test6_connection_lost_called == 1)
882 			break;
883 	}
884 	assert("will message arrived", test6_will_message_arrived == 1,
885 							"will_message_arrived was %d\n", test6_will_message_arrived);
886 	assert("connection lost called", test6_connection_lost_called == 1,
887 			         "connection_lost_called %d\n", test6_connection_lost_called);
888 
889 	rc = MQTTClient_unsubscribe(test6_c2, test6_will_topic);
890 	assert("Good rc from unsubscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
891 
892 	rc = MQTTClient_isConnected(test6_c2);
893 	assert("Client-2 still connected", rc == 1, "isconnected is %d", rc);
894 
895 	rc = MQTTClient_isConnected(test6_c1);
896 	assert("Client-1 not connected", rc == 0, "isconnected is %d", rc);
897 
898 	rc = MQTTClient_disconnect(test6_c2, 100L);
899 	assert("Good rc from disconnect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
900 
901 	MQTTClient_destroy(&test6_c1);
902 	MQTTClient_destroy(&test6_c2);
903 
904 exit:
905 	MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.\n",
906 			(failures == 0) ? "passed" : "failed", testname, tests, failures);
907 	write_test_result();
908 	return failures;
909 }
910 
911 
912 int test6a(struct Options options)
913 {
914 	char* testname = "test6a";
915 	MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
916 	MQTTClient_willOptions wopts =  MQTTClient_willOptions_initializer;
917 	MQTTClient_connectOptions opts2 = MQTTClient_connectOptions_initializer;
918 	int rc, count;
919 	char* mqttsas_topic = "MQTTSAS topic";
920 
921 	failures = 0;
922 	MyLog(LOGA_INFO, "Starting test 6 - connectionLost and binary will messages");
923 	fprintf(xml, "<testcase classname=\"test1\" name=\"connectionLost and binary will messages\"");
924 	global_start_time = start_clock();
925 
926 	opts.keepAliveInterval = 2;
927 	opts.cleansession = 1;
928 	opts.MQTTVersion = MQTTVERSION_3_1_1;
929 	opts.will = &wopts;
930 	opts.will->payload.data = test6_will_message;
931 	opts.will->payload.len = strlen(test6_will_message) + 1;
932 	opts.will->qos = 1;
933 	opts.will->retained = 0;
934 	opts.will->topicName = test6_will_topic;
935 	if (options.haconnections != NULL)
936 	{
937 		opts.serverURIs = options.haconnections;
938 		opts.serverURIcount = options.hacount;
939 	}
940 
941 	/* Client-1 with Will options */
942 	rc = MQTTClient_create(&test6_c1, options.proxy_connection, "Client_1", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
943 	assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
944 	if (rc != MQTTCLIENT_SUCCESS)
945 		goto exit;
946 
947 	rc = MQTTClient_setCallbacks(test6_c1, (void*)test6_c1, test6_connectionLost, test6_messageArrived, test6_deliveryComplete);
948 	assert("good rc from setCallbacks",  rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
949 	if (rc != MQTTCLIENT_SUCCESS)
950 		goto exit;
951 
952 	/* Connect to the broker */
953 	rc = MQTTClient_connect(test6_c1, &opts);
954 	assert("good rc from connect",  rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
955 	if (rc != MQTTCLIENT_SUCCESS)
956 		goto exit;
957 
958 	/* Client - 2 (multi-threaded) */
959 	rc = MQTTClient_create(&test6_c2, options.connection, "Client_2", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
960 	assert("good rc from create",  rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
961 
962 	/* Set the callback functions for the client */
963 	rc = MQTTClient_setCallbacks(test6_c2, (void*)test6_c2, test6_connectionLost, test6_messageArrived, test6_deliveryComplete);
964 	assert("good rc from setCallbacks",  rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
965 
966 	/* Connect to the broker */
967 	opts2.keepAliveInterval = 20;
968 	opts2.cleansession = 1;
969 	MyLog(LOGA_INFO, "Connecting Client_2 ...");
970 	rc = MQTTClient_connect(test6_c2, &opts2);
971 	assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
972 
973 	rc = MQTTClient_subscribe(test6_c2, test6_will_topic, 2);
974 	assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
975 
976 	/* now send the command which will break the connection and cause the will message to be sent */
977 	rc = MQTTClient_publish(test6_c1, mqttsas_topic, (int)strlen("TERMINATE"), "TERMINATE", 0, 0, NULL);
978 	assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
979 
980 	MyLog(LOGA_INFO, "Waiting to receive the will message");
981 	count = 0;
982 	while (++count < 40)
983 	{
984 		#if defined(WIN32)
985 			Sleep(1000L);
986 		#else
987 			sleep(1);
988 		#endif
989 		if (test6_will_message_arrived == 1 && test6_connection_lost_called == 1)
990 			break;
991 	}
992 	assert("will message arrived", test6_will_message_arrived == 1,
993 							"will_message_arrived was %d\n", test6_will_message_arrived);
994 	assert("connection lost called", test6_connection_lost_called == 1,
995 			         "connection_lost_called %d\n", test6_connection_lost_called);
996 
997 	rc = MQTTClient_unsubscribe(test6_c2, test6_will_topic);
998 	assert("Good rc from unsubscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
999 
1000 	rc = MQTTClient_isConnected(test6_c2);
1001 	assert("Client-2 still connected", rc == 1, "isconnected is %d", rc);
1002 
1003 	rc = MQTTClient_isConnected(test6_c1);
1004 	assert("Client-1 not connected", rc == 0, "isconnected is %d", rc);
1005 
1006 	rc = MQTTClient_disconnect(test6_c2, 100L);
1007 	assert("Good rc from disconnect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1008 
1009 	MQTTClient_destroy(&test6_c1);
1010 	MQTTClient_destroy(&test6_c2);
1011 
1012 exit:
1013 	MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.\n",
1014 			(failures == 0) ? "passed" : "failed", testname, tests, failures);
1015 	write_test_result();
1016 	return failures;
1017 }
1018 #endif
1019 
main(int argc,char ** argv)1020 int main(int argc, char** argv)
1021 {
1022 	int rc = 0;
1023  	int (*tests[])() = {NULL, test1, test2, test3};
1024 	int i;
1025 
1026 	xml = fopen("TEST-test1.xml", "w");
1027 	fprintf(xml, "<testsuite name=\"test1\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1));
1028 
1029 	getopts(argc, argv);
1030 
1031 	for (i = 0; i < options.iterations; ++i)
1032 	{
1033 	 	if (options.test_no == 0)
1034 		{ /* run all the tests */
1035  		   	for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no)
1036 				rc += tests[options.test_no](options); /* return number of failures.  0 = test succeeded */
1037 		}
1038 		else
1039  		   	rc = tests[options.test_no](options); /* run just the selected test */
1040 	}
1041 
1042  	if (rc == 0)
1043 		MyLog(LOGA_INFO, "verdict pass");
1044 	else
1045 		MyLog(LOGA_INFO, "verdict fail");
1046 
1047 	fprintf(xml, "</testsuite>\n");
1048 	fclose(xml);
1049 	return rc;
1050 }
1051