1 /*
2 * lws-minimal-mqtt-client
3 *
4 * Written in 2010-2020 by Andy Green <andy@warmcat.com>
5 * Sakthi Kannan <saktr@amazon.com>
6 *
7 * This file is made available under the Creative Commons CC0 1.0
8 * Universal Public Domain Dedication.
9 */
10
11 #include <libwebsockets.h>
12 #include <string.h>
13 #include <signal.h>
14 #if defined(WIN32)
15 #define HAVE_STRUCT_TIMESPEC
16 #if defined(pid_t)
17 #undef pid_t
18 #endif
19 #endif
20 #include <pthread.h>
21 #include <assert.h>
22
23 #define COUNT 8
24
25 struct test_item {
26 struct lws_context *context;
27 struct lws *wsi;
28 lws_sorted_usec_list_t sul;
29 } items[COUNT];
30
31 enum {
32 STATE_SUBSCRIBE, /* subscribe to the topic */
33 STATE_WAIT_SUBACK,
34 STATE_PUBLISH_QOS0, /* Send the message in QoS0 */
35 STATE_WAIT_ACK0, /* Wait for the synthetic "ack" */
36 STATE_PUBLISH_QOS1, /* Send the message in QoS1 */
37 STATE_WAIT_ACK1, /* Wait for the real ack (or timeout + retry) */
38 STATE_UNSUBSCRIBE,
39 STATE_WAIT_UNSUBACK,
40
41 STATE_TEST_FINISH
42 };
43
44 static int interrupted, do_ssl, pipeline, stagger_us = 5000, okay,
45 done, count = COUNT;
46
47 static const lws_retry_bo_t retry = {
48 .secs_since_valid_ping = 20, /* if idle, PINGREQ after secs */
49 .secs_since_valid_hangup = 25, /* hangup if still idle secs */
50 };
51
52 static const lws_mqtt_client_connect_param_t client_connect_param = {
53 .client_id = NULL,
54 .keep_alive = 60,
55 .clean_start = 1,
56 .client_id_nofree = 1,
57 .username_nofree = 1,
58 .password_nofree = 1,
59 .will_param = {
60 .topic = "good/bye",
61 .message = "sign-off",
62 .qos = 0,
63 .retain = 0,
64 },
65 .username = "lwsUser",
66 .password = "mySecretPassword",
67 };
68
69 static lws_mqtt_topic_elem_t topics[] = {
70 [0] = { .name = "test/topic0", .qos = QOS0 },
71 [1] = { .name = "test/topic1", .qos = QOS1 },
72 };
73
74 static lws_mqtt_subscribe_param_t sub_param = {
75 .topic = &topics[0],
76 .num_topics = LWS_ARRAY_SIZE(topics),
77 };
78
79 static const char * const test_string =
80 "No one would have believed in the last years of the nineteenth "
81 "century that this world was being watched keenly and closely by "
82 "intelligences greater than man's and yet as mortal as his own; that as "
83 "men busied themselves about their various concerns they were "
84 "scrutinised and studied, perhaps almost as narrowly as a man with a "
85 "microscope might scrutinise the transient creatures that swarm and "
86 "multiply in a drop of water. With infinite complacency men went to "
87 "and fro over this globe about their little affairs, serene in their "
88 "assurance of their empire over matter. It is possible that the "
89 "infusoria under the microscope do the same. No one gave a thought to "
90 "the older worlds of space as sources of human danger, or thought of "
91 "them only to dismiss the idea of life upon them as impossible or "
92 "improbable. It is curious to recall some of the mental habits of "
93 "those departed days. At most terrestrial men fancied there might be "
94 "other men upon Mars, perhaps inferior to themselves and ready to "
95 "welcome a missionary enterprise. Yet across the gulf of space, minds "
96 "that are to our minds as ours are to those of the beasts that perish, "
97 "intellects vast and cool and unsympathetic, regarded this earth with "
98 "envious eyes, and slowly and surely drew their plans against us. And "
99 "early in the twentieth century came the great disillusionment. ";
100
101 /* this reflects the length of the string above */
102 #define TEST_STRING_LEN 1337
103
104 struct pss {
105 lws_mqtt_publish_param_t pub_param;
106 int state;
107 size_t pos;
108 int retries;
109 };
110
111 static void
sigint_handler(int sig)112 sigint_handler(int sig)
113 {
114 interrupted = 1;
115 }
116
117 static int
connect_client(struct lws_context * context,struct test_item * item)118 connect_client(struct lws_context *context, struct test_item *item)
119 {
120 struct lws_client_connect_info i;
121
122 memset(&i, 0, sizeof i);
123
124 i.mqtt_cp = &client_connect_param;
125 i.opaque_user_data = item;
126 i.protocol = "test-mqtt";
127 i.address = "localhost";
128 i.host = "localhost";
129 i.pwsi = &item->wsi;
130 i.context = context;
131 i.method = "MQTT";
132 i.alpn = "mqtt";
133 i.port = 1883;
134
135 if (do_ssl) {
136 i.ssl_connection = LCCSCF_USE_SSL;
137 i.ssl_connection |= LCCSCF_ALLOW_SELFSIGNED;
138 i.port = 8883;
139 }
140
141 if (pipeline)
142 i.ssl_connection |= LCCSCF_PIPELINE;
143
144 if (!lws_client_connect_via_info(&i)) {
145 lwsl_err("%s: Client Connect Failed\n", __func__);
146
147 return 1;
148 }
149
150 return 0;
151 }
152
153 static void
start_conn(struct lws_sorted_usec_list * sul)154 start_conn(struct lws_sorted_usec_list *sul)
155 {
156 struct test_item *item = lws_container_of(sul, struct test_item, sul);
157
158 lwsl_notice("%s: item %d\n", __func__, (int)(item - &items[0]));
159
160 if (connect_client(item->context, item))
161 interrupted = 1;
162 }
163
164
165 static int
system_notify_cb(lws_state_manager_t * mgr,lws_state_notify_link_t * link,int current,int target)166 system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
167 int current, int target)
168 {
169 struct lws_context *context = mgr->parent;
170 int n;
171
172 if (current != LWS_SYSTATE_OPERATIONAL ||
173 target != LWS_SYSTATE_OPERATIONAL)
174 return 0;
175
176 /*
177 * We delay trying to do the client connection until the protocols have
178 * been initialized for each vhost... this happens after we have network
179 * and time so we can judge tls cert validity.
180 *
181 * Stagger the connection attempts so we get some joining before the
182 * first has connected and some afterwards
183 */
184
185 for (n = 0; n < count; n++) {
186 items[n].context = context;
187 lws_sul_schedule(context, 0, &items[n].sul, start_conn,
188 n * stagger_us);
189 }
190
191 return 0;
192 }
193
194
195 static int
callback_mqtt(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)196 callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason,
197 void *user, void *in, size_t len)
198 {
199 struct test_item *item = (struct test_item *)lws_get_opaque_user_data(wsi);
200 struct pss *pss = (struct pss *)user;
201 lws_mqtt_publish_param_t *pub;
202 size_t chunk;
203
204 switch (reason) {
205 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
206 lwsl_err("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
207 in ? (char *)in : "(null)");
208
209 if (++done == count)
210 goto finish_test;
211 break;
212
213 case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
214 lwsl_user("%s: item %d: CLIENT_CLOSED %p\n", __func__, (int)(item - &items[0]), wsi);
215
216 if (++done == count)
217 goto finish_test;
218 break;
219
220 case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
221 lwsl_user("%s: MQTT_CLIENT_ESTABLISHED: %p\n", __func__, wsi);
222 lws_callback_on_writable(wsi);
223
224 return 0;
225
226 case LWS_CALLBACK_MQTT_SUBSCRIBED:
227 lwsl_user("%s: MQTT_SUBSCRIBED\n", __func__);
228
229 /* then we can get on with the actual test part */
230
231 pss->state++;
232 lws_callback_on_writable(wsi);
233 break;
234
235 case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
236 lwsl_user("%s: item %d: UNSUBSCRIBED: %p: Received unsuback\n",
237 __func__, (int)(item - &item[0]), wsi);
238 okay++;
239
240 if (++pss->state == STATE_TEST_FINISH) {
241 lwsl_notice("%s: MQTT_UNSUBACK ending stream %d successfully(%d/%d)\n",
242 __func__, (int)(item - &items[0]), okay, count);
243 /* We are done, request to close */
244 return -1;
245 }
246 break;
247
248 case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
249
250 /*
251 * Extra WRITEABLE may appear here other than ones we asked
252 * for, so we must consult our own state to decide if we want
253 * to make use of the opportunity
254 */
255
256 switch (pss->state) {
257 case STATE_SUBSCRIBE:
258 lwsl_user("%s: item %d: WRITEABLE: %p: Subscribing\n", __func__, (int)(item - &items[0]), wsi);
259
260 if (lws_mqtt_client_send_subcribe(wsi, &sub_param)) {
261 lwsl_notice("%s: subscribe failed\n", __func__);
262
263 return -1;
264 }
265 pss->state++;
266 break;
267
268 case STATE_PUBLISH_QOS0:
269 case STATE_PUBLISH_QOS1:
270
271 lwsl_user("%s: item %d: WRITEABLE: %p: Publish\n", __func__, (int)(item - &items[0]), wsi);
272
273 pss->pub_param.topic = pss->state == STATE_PUBLISH_QOS0 ?
274 "test/topic0" : "test/topic1";
275 pss->pub_param.topic_len = (uint16_t)strlen(pss->pub_param.topic);
276 pss->pub_param.qos =
277 pss->state == STATE_PUBLISH_QOS0 ? QOS0 : QOS1;
278 pss->pub_param.payload_len = TEST_STRING_LEN;
279
280 /* We send the message out 300 bytes or less at at time */
281
282 chunk = 300;
283
284 if (chunk > TEST_STRING_LEN - pss->pos)
285 chunk = TEST_STRING_LEN - pss->pos;
286
287 lwsl_notice("%s: sending %d at +%d\n", __func__,
288 (int)chunk, (int)pss->pos);
289
290 if (lws_mqtt_client_send_publish(wsi, &pss->pub_param,
291 test_string + pss->pos, (uint32_t)chunk,
292 (pss->pos + chunk == TEST_STRING_LEN))) {
293 lwsl_notice("%s: publish failed\n", __func__);
294 return -1;
295 }
296
297 pss->pos += chunk;
298
299 if (pss->pos == TEST_STRING_LEN) {
300 lwsl_debug("%s: sent message\n", __func__);
301 pss->pos = 0;
302 pss->state++;
303 }
304 break;
305
306 case STATE_UNSUBSCRIBE:
307 lwsl_user("%s: item %d: UNSUBSCRIBE: %p: Send unsub\n",
308 __func__, (int)(item - &item[0]), wsi);
309 pss->state++;
310 if (lws_mqtt_client_send_unsubcribe(wsi, &sub_param)) {
311 lwsl_notice("%s: subscribe failed\n", __func__);
312 return -1;
313 }
314 break;
315 default:
316 break;
317 }
318
319 return 0;
320
321 case LWS_CALLBACK_MQTT_ACK:
322 lwsl_user("%s: item %d: MQTT_ACK (state %d)\n", __func__, (int)(item - &items[0]), pss->state);
323 /*
324 * We can forget about the message we just sent, it's done.
325 *
326 * For our test, that's the indication we can close the wsi.
327 */
328
329 pss->state++;
330 if (pss->state != STATE_TEST_FINISH) {
331 lws_callback_on_writable(wsi);
332 break;
333 }
334
335 break;
336
337 case LWS_CALLBACK_MQTT_RESEND:
338 lwsl_user("%s: MQTT_RESEND\n", __func__);
339 /*
340 * We must resend the packet ID mentioned in len
341 */
342 if (++pss->retries == 3) {
343 lwsl_notice("%s: too many retries\n", __func__);
344 return 1; /* kill the connection */
345 }
346 pss->state--;
347 pss->pos = 0;
348 break;
349
350 case LWS_CALLBACK_MQTT_CLIENT_RX:
351 pub = (lws_mqtt_publish_param_t *)in;
352 assert(pub);
353 lwsl_user("%s: item %d: MQTT_CLIENT_RX (%s) pos %d/%d len %d\n", __func__,
354 (int)(item - &items[0]), pub->topic, (int)pub->payload_pos,
355 (int)pub->payload_len, (int)len);
356
357 //lwsl_hexdump_info(pub->payload, len);
358
359 return 0;
360
361 default:
362 break;
363 }
364
365 return 0;
366
367 finish_test:
368 interrupted = 1;
369 lws_cancel_service(lws_get_context(wsi));
370
371 return 0;
372 }
373
374 static const struct lws_protocols protocols[] = {
375 {
376 .name = "test-mqtt",
377 .callback = callback_mqtt,
378 .per_session_data_size = sizeof(struct pss)
379 },
380 LWS_PROTOCOL_LIST_TERM
381 };
382
main(int argc,const char ** argv)383 int main(int argc, const char **argv)
384 {
385 lws_state_notify_link_t notifier = { { NULL, NULL, NULL },
386 system_notify_cb, "app" };
387 lws_state_notify_link_t *na[] = { ¬ifier, NULL };
388 struct lws_context_creation_info info;
389 struct lws_context *context;
390 const char *p;
391 int n = 0;
392
393 signal(SIGINT, sigint_handler);
394 memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
395 lws_cmdline_option_handle_builtin(argc, argv, &info);
396
397 do_ssl = !!lws_cmdline_option(argc, argv, "-s");
398 if (do_ssl)
399 info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
400
401 if (lws_cmdline_option(argc, argv, "-p"))
402 pipeline = 1;
403
404 if ((p = lws_cmdline_option(argc, argv, "-i")))
405 stagger_us = atoi(p);
406
407 if ((p = lws_cmdline_option(argc, argv, "-c")))
408 count = atoi(p);
409
410 if (count > COUNT) {
411 count = COUNT;
412 lwsl_err("%s: clipped count at max %d\n", __func__, count);
413 }
414
415 lwsl_user("LWS minimal MQTT client %s [-d<verbosity>][-s]\n",
416 do_ssl ? "tls enabled": "unencrypted");
417
418 info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
419 info.protocols = protocols;
420 info.register_notifier_list = na;
421 info.fd_limit_per_thread = 1 + COUNT + 1;
422 info.retry_and_idle_policy = &retry;
423
424 #if defined(LWS_WITH_MBEDTLS) || defined(USE_WOLFSSL)
425 /*
426 * OpenSSL uses the system trust store. mbedTLS has to be told which
427 * CA to trust explicitly.
428 */
429 info.client_ssl_ca_filepath = "./mosq-ca.crt";
430 #endif
431
432 context = lws_create_context(&info);
433 if (!context) {
434 lwsl_err("lws init failed\n");
435 return 1;
436 }
437
438 /* Event loop */
439 while (n >= 0 && !interrupted)
440 n = lws_service(context, 0);
441
442 lwsl_user("%s: Completed: %d/%d ok, %s\n", __func__, okay, count,
443 okay != count ? "failed" : "OK");
444 lws_context_destroy(context);
445
446 return okay != count;
447 }
448