/*
 * lws-minimal-mqtt-client
 *
 * Written in 2010-2020 by Andy Green <andy@warmcat.com>
 *                         Sakthi Kannan <saktr@amazon.com>
 *
 * This file is made available under the Creative Commons CC0 1.0
 * Universal Public Domain Dedication.
 */

#include <libwebsockets.h>
#include <string.h>
#include <signal.h>
#include <stdlib.h>
#include <errno.h>
#include <limits.h>
#if defined(WIN32)
#define HAVE_STRUCT_TIMESPEC
#if defined(pid_t)
#undef pid_t
#endif
#endif
#include <pthread.h>
#include <assert.h>

23 #define COUNT 8
24 
25 struct test_item {
26 	struct lws_context		*context;
27 	struct lws			*wsi;
28 	lws_sorted_usec_list_t		sul;
29 } items[COUNT];
30 
/*
 * Per-connection test sequence, kept in pss->state.  callback_mqtt() moves
 * through the states with state++ (and steps back with state-- on
 * MQTT_RESEND), so the order of the enumerators is significant.
 */
enum {
	STATE_SUBSCRIBE,	/* subscribe to the topic */
	STATE_WAIT_SUBACK,	/* wait for LWS_CALLBACK_MQTT_SUBSCRIBED */
	STATE_PUBLISH_QOS0,	/* Send the message in QoS0 */
	STATE_WAIT_ACK0,	/* Wait for the synthetic "ack" */
	STATE_PUBLISH_QOS1,	/* Send the message in QoS1 */
	STATE_WAIT_ACK1,	/* Wait for the real ack (or timeout + retry) */
	STATE_UNSUBSCRIBE,	/* send the unsubscribe */
	STATE_WAIT_UNSUBACK,	/* wait for LWS_CALLBACK_MQTT_UNSUBSCRIBED */

	STATE_TEST_FINISH	/* sequence complete for this connection */
};

44 static int interrupted, do_ssl, pipeline, stagger_us = 5000, okay,
45 	   done, count = COUNT;
46 
47 static const lws_retry_bo_t retry = {
48 	.secs_since_valid_ping		= 20, /* if idle, PINGREQ after secs */
49 	.secs_since_valid_hangup	= 25, /* hangup if still idle secs */
50 };
51 
52 static const lws_mqtt_client_connect_param_t client_connect_param = {
53 	.client_id			= NULL,
54 	.keep_alive			= 60,
55 	.clean_start			= 1,
56 	.client_id_nofree		= 1,
57 	.username_nofree		= 1,
58 	.password_nofree		= 1,
59 	.will_param = {
60 		.topic			= "good/bye",
61 		.message		= "sign-off",
62 		.qos			= 0,
63 		.retain			= 0,
64 	},
65 	.username			= "lwsUser",
66 	.password			= "mySecretPassword",
67 };
68 
69 static lws_mqtt_topic_elem_t topics[] = {
70 	[0] = { .name = "test/topic0", .qos = QOS0 },
71 	[1] = { .name = "test/topic1", .qos = QOS1 },
72 };
73 
74 static lws_mqtt_subscribe_param_t sub_param = {
75 	.topic				= &topics[0],
76 	.num_topics			= LWS_ARRAY_SIZE(topics),
77 };
78 
/*
 * Payload published by each connection.  callback_mqtt() sends it in
 * several chunks, so it is deliberately longer than one chunk.
 */
static const char * const test_string =
	"No one would have believed in the last years of the nineteenth "
	"century that this world was being watched keenly and closely by "
	"intelligences greater than man's and yet as mortal as his own; that as "
	"men busied themselves about their various concerns they were "
	"scrutinised and studied, perhaps almost as narrowly as a man with a "
	"microscope might scrutinise the transient creatures that swarm and "
	"multiply in a drop of water.  With infinite complacency men went to "
	"and fro over this globe about their little affairs, serene in their "
	"assurance of their empire over matter. It is possible that the "
	"infusoria under the microscope do the same.  No one gave a thought to "
	"the older worlds of space as sources of human danger, or thought of "
	"them only to dismiss the idea of life upon them as impossible or "
	"improbable.  It is curious to recall some of the mental habits of "
	"those departed days.  At most terrestrial men fancied there might be "
	"other men upon Mars, perhaps inferior to themselves and ready to "
	"welcome a missionary enterprise. Yet across the gulf of space, minds "
	"that are to our minds as ours are to those of the beasts that perish, "
	"intellects vast and cool and unsympathetic, regarded this earth with "
	"envious eyes, and slowly and surely drew their plans against us.  And "
	"early in the twentieth century came the great disillusionment. ";

/*
 * Length in bytes of the test payload, not counting the terminating NUL.
 * This is maintained by hand: update it whenever the payload text changes.
 */
#define TEST_STRING_LEN 1337

104 struct pss {
105 	lws_mqtt_publish_param_t	pub_param;
106 	int				state;
107 	size_t				pos;
108 	int				retries;
109 };
110 
111 static void
sigint_handler(int sig)112 sigint_handler(int sig)
113 {
114 	interrupted = 1;
115 }
116 
117 static int
connect_client(struct lws_context * context,struct test_item * item)118 connect_client(struct lws_context *context, struct test_item *item)
119 {
120 	struct lws_client_connect_info i;
121 
122 	memset(&i, 0, sizeof i);
123 
124 	i.mqtt_cp = &client_connect_param;
125 	i.opaque_user_data = item;
126 	i.protocol = "test-mqtt";
127 	i.address = "localhost";
128 	i.host = "localhost";
129 	i.pwsi = &item->wsi;
130 	i.context = context;
131 	i.method = "MQTT";
132 	i.alpn = "mqtt";
133 	i.port = 1883;
134 
135 	if (do_ssl) {
136 		i.ssl_connection = LCCSCF_USE_SSL;
137 		i.ssl_connection |= LCCSCF_ALLOW_SELFSIGNED;
138 		i.port = 8883;
139 	}
140 
141 	if (pipeline)
142 		i.ssl_connection |= LCCSCF_PIPELINE;
143 
144 	if (!lws_client_connect_via_info(&i)) {
145 		lwsl_err("%s: Client Connect Failed\n", __func__);
146 
147 		return 1;
148 	}
149 
150 	return 0;
151 }
152 
153 static void
start_conn(struct lws_sorted_usec_list * sul)154 start_conn(struct lws_sorted_usec_list *sul)
155 {
156 	struct test_item *item = lws_container_of(sul, struct test_item, sul);
157 
158 	lwsl_notice("%s: item %d\n", __func__, (int)(item - &items[0]));
159 
160 	if (connect_client(item->context, item))
161 		interrupted = 1;
162 }
163 
164 
165 static int
system_notify_cb(lws_state_manager_t * mgr,lws_state_notify_link_t * link,int current,int target)166 system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
167 		 int current, int target)
168 {
169 	struct lws_context *context = mgr->parent;
170 	int n;
171 
172 	if (current != LWS_SYSTATE_OPERATIONAL ||
173 	    target != LWS_SYSTATE_OPERATIONAL)
174 		return 0;
175 
176 	/*
177 	* We delay trying to do the client connection until the protocols have
178 	* been initialized for each vhost... this happens after we have network
179 	* and time so we can judge tls cert validity.
180 	*
181 	* Stagger the connection attempts so we get some joining before the
182 	* first has connected and some afterwards
183 	*/
184 
185 	for (n = 0; n < count; n++) {
186 		items[n].context = context;
187 		lws_sul_schedule(context, 0, &items[n].sul, start_conn,
188 				 n * stagger_us);
189 	}
190 
191 	return 0;
192 }
193 
194 
195 static int
callback_mqtt(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)196 callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason,
197 	      void *user, void *in, size_t len)
198 {
199 	struct test_item *item = (struct test_item *)lws_get_opaque_user_data(wsi);
200 	struct pss *pss = (struct pss *)user;
201 	lws_mqtt_publish_param_t *pub;
202 	size_t chunk;
203 
204 	switch (reason) {
205 	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
206 		lwsl_err("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
207 			 in ? (char *)in : "(null)");
208 
209 		if (++done == count)
210 			goto finish_test;
211 		break;
212 
213 	case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
214 		lwsl_user("%s: item %d: CLIENT_CLOSED %p\n", __func__, (int)(item - &items[0]), wsi);
215 
216 		if (++done == count)
217 			goto finish_test;
218 		break;
219 
220 	case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
221 		lwsl_user("%s: MQTT_CLIENT_ESTABLISHED: %p\n", __func__, wsi);
222 		lws_callback_on_writable(wsi);
223 
224 		return 0;
225 
226 	case LWS_CALLBACK_MQTT_SUBSCRIBED:
227 		lwsl_user("%s: MQTT_SUBSCRIBED\n", __func__);
228 
229 		/* then we can get on with the actual test part */
230 
231 		pss->state++;
232 		lws_callback_on_writable(wsi);
233 		break;
234 
235 	case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
236 		lwsl_user("%s: item %d: UNSUBSCRIBED: %p: Received unsuback\n",
237 			  __func__, (int)(item - &item[0]), wsi);
238 		okay++;
239 
240 		if (++pss->state == STATE_TEST_FINISH) {
241 			lwsl_notice("%s: MQTT_UNSUBACK ending stream %d successfully(%d/%d)\n",
242 				    __func__, (int)(item - &items[0]), okay, count);
243 			/* We are done, request to close */
244 			return -1;
245 		}
246 		break;
247 
248 	case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
249 
250 		/*
251 		 * Extra WRITEABLE may appear here other than ones we asked
252 		 * for, so we must consult our own state to decide if we want
253 		 * to make use of the opportunity
254 		 */
255 
256 		switch (pss->state) {
257 		case STATE_SUBSCRIBE:
258 			lwsl_user("%s: item %d: WRITEABLE: %p: Subscribing\n", __func__, (int)(item - &items[0]), wsi);
259 
260 			if (lws_mqtt_client_send_subcribe(wsi, &sub_param)) {
261 				lwsl_notice("%s: subscribe failed\n", __func__);
262 
263 				return -1;
264 			}
265 			pss->state++;
266 			break;
267 
268 		case STATE_PUBLISH_QOS0:
269 		case STATE_PUBLISH_QOS1:
270 
271 			lwsl_user("%s: item %d: WRITEABLE: %p: Publish\n", __func__, (int)(item - &items[0]), wsi);
272 
273 			pss->pub_param.topic	= pss->state == STATE_PUBLISH_QOS0 ?
274 						"test/topic0" : "test/topic1";
275 			pss->pub_param.topic_len = (uint16_t)strlen(pss->pub_param.topic);
276 			pss->pub_param.qos =
277 				pss->state == STATE_PUBLISH_QOS0 ? QOS0 : QOS1;
278 			pss->pub_param.payload_len = TEST_STRING_LEN;
279 
280 			/* We send the message out 300 bytes or less at at time */
281 
282 			chunk = 300;
283 
284 			if (chunk > TEST_STRING_LEN - pss->pos)
285 				chunk = TEST_STRING_LEN - pss->pos;
286 
287 			lwsl_notice("%s: sending %d at +%d\n", __func__,
288 					(int)chunk, (int)pss->pos);
289 
290 			if (lws_mqtt_client_send_publish(wsi, &pss->pub_param,
291 					test_string + pss->pos, (uint32_t)chunk,
292 					(pss->pos + chunk == TEST_STRING_LEN))) {
293 				lwsl_notice("%s: publish failed\n", __func__);
294 				return -1;
295 			}
296 
297 			pss->pos += chunk;
298 
299 			if (pss->pos == TEST_STRING_LEN) {
300 				lwsl_debug("%s: sent message\n", __func__);
301 				pss->pos = 0;
302 				pss->state++;
303 			}
304 			break;
305 
306 		case STATE_UNSUBSCRIBE:
307 			lwsl_user("%s: item %d: UNSUBSCRIBE: %p: Send unsub\n",
308 				  __func__, (int)(item - &item[0]), wsi);
309 			pss->state++;
310 			if (lws_mqtt_client_send_unsubcribe(wsi, &sub_param)) {
311 				lwsl_notice("%s: subscribe failed\n", __func__);
312 				return -1;
313 			}
314 			break;
315 		default:
316 			break;
317 		}
318 
319 		return 0;
320 
321 	case LWS_CALLBACK_MQTT_ACK:
322 		lwsl_user("%s: item %d: MQTT_ACK (state %d)\n", __func__, (int)(item - &items[0]), pss->state);
323 		/*
324 		 * We can forget about the message we just sent, it's done.
325 		 *
326 		 * For our test, that's the indication we can close the wsi.
327 		 */
328 
329 		pss->state++;
330 		if (pss->state != STATE_TEST_FINISH) {
331 			lws_callback_on_writable(wsi);
332 			break;
333 		}
334 
335 		break;
336 
337 	case LWS_CALLBACK_MQTT_RESEND:
338 		lwsl_user("%s: MQTT_RESEND\n", __func__);
339 		/*
340 		 * We must resend the packet ID mentioned in len
341 		 */
342 		if (++pss->retries == 3) {
343 			lwsl_notice("%s: too many retries\n", __func__);
344 			return 1; /* kill the connection */
345 		}
346 		pss->state--;
347 		pss->pos = 0;
348 		break;
349 
350 	case LWS_CALLBACK_MQTT_CLIENT_RX:
351 		pub = (lws_mqtt_publish_param_t *)in;
352 		assert(pub);
353 		lwsl_user("%s: item %d: MQTT_CLIENT_RX (%s) pos %d/%d len %d\n", __func__,
354 			  (int)(item - &items[0]), pub->topic, (int)pub->payload_pos,
355 			  (int)pub->payload_len, (int)len);
356 
357 		//lwsl_hexdump_info(pub->payload, len);
358 
359 		return 0;
360 
361 	default:
362 		break;
363 	}
364 
365 	return 0;
366 
367 finish_test:
368 	interrupted = 1;
369 	lws_cancel_service(lws_get_context(wsi));
370 
371 	return 0;
372 }
373 
374 static const struct lws_protocols protocols[] = {
375 	{
376 		.name			= "test-mqtt",
377 		.callback		= callback_mqtt,
378 		.per_session_data_size	= sizeof(struct pss)
379 	},
380 	LWS_PROTOCOL_LIST_TERM
381 };
382 
main(int argc,const char ** argv)383 int main(int argc, const char **argv)
384 {
385 	lws_state_notify_link_t notifier = { { NULL, NULL, NULL },
386 					     system_notify_cb, "app" };
387 	lws_state_notify_link_t *na[] = { &notifier, NULL };
388 	struct lws_context_creation_info info;
389 	struct lws_context *context;
390 	const char *p;
391 	int n = 0;
392 
393 	signal(SIGINT, sigint_handler);
394 	memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
395 	lws_cmdline_option_handle_builtin(argc, argv, &info);
396 
397 	do_ssl = !!lws_cmdline_option(argc, argv, "-s");
398 	if (do_ssl)
399 		info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
400 
401 	if (lws_cmdline_option(argc, argv, "-p"))
402 		pipeline = 1;
403 
404 	if ((p = lws_cmdline_option(argc, argv, "-i")))
405 		stagger_us = atoi(p);
406 
407 	if ((p = lws_cmdline_option(argc, argv, "-c")))
408 		count = atoi(p);
409 
410 	if (count > COUNT) {
411 		count = COUNT;
412 		lwsl_err("%s: clipped count at max %d\n", __func__, count);
413 	}
414 
415 	lwsl_user("LWS minimal MQTT client %s [-d<verbosity>][-s]\n",
416 			do_ssl ? "tls enabled": "unencrypted");
417 
418 	info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
419 	info.protocols = protocols;
420 	info.register_notifier_list = na;
421 	info.fd_limit_per_thread = 1 + COUNT + 1;
422 	info.retry_and_idle_policy = &retry;
423 
424 #if defined(LWS_WITH_MBEDTLS) || defined(USE_WOLFSSL)
425 	/*
426 	 * OpenSSL uses the system trust store.  mbedTLS has to be told which
427 	 * CA to trust explicitly.
428 	 */
429 	info.client_ssl_ca_filepath = "./mosq-ca.crt";
430 #endif
431 
432 	context = lws_create_context(&info);
433 	if (!context) {
434 		lwsl_err("lws init failed\n");
435 		return 1;
436 	}
437 
438 	/* Event loop */
439 	while (n >= 0 && !interrupted)
440 		n = lws_service(context, 0);
441 
442 	lwsl_user("%s: Completed: %d/%d ok, %s\n", __func__, okay, count,
443 			okay != count ? "failed" : "OK");
444 	lws_context_destroy(context);
445 
446 	return okay != count;
447 }
448