1 /*
2 * lws-minimal-mqtt-client
3 *
4 * Written in 2010-2020 by Andy Green <andy@warmcat.com>
5 * Sakthi Kannan <saktr@amazon.com>
6 *
7 * This file is made available under the Creative Commons CC0 1.0
8 * Universal Public Domain Dedication.
9 */
10
11 #include <libwebsockets.h>
12 #include <string.h>
13 #include <signal.h>
14 #include <pthread.h>
15 #include <assert.h>
16
17 #define COUNT 8
18
19 struct test_item {
20 struct lws_context *context;
21 struct lws *wsi;
22 lws_sorted_usec_list_t sul;
23 } items[COUNT];
24
/*
 * Steps of the per-connection test sequence, kept in pss->state.  The
 * protocol callback moves from one step to the next by incrementing the
 * state, so the declaration order is the execution order.
 */
enum {
	STATE_SUBSCRIBE = 0,	/* subscribe to the topic */
	STATE_WAIT_SUBACK,	/* subscribe sent, waiting for MQTT_SUBSCRIBED */

	STATE_PUBLISH_QOS0,	/* Send the message in QoS0 */
	STATE_WAIT_ACK0,	/* Wait for the synthetic "ack" */

	STATE_PUBLISH_QOS1,	/* Send the message in QoS1 */
	STATE_WAIT_ACK1,	/* Wait for the real ack (or timeout + retry) */

	STATE_UNSUBSCRIBE,	/* send the unsubscribe */
	STATE_WAIT_UNSUBACK,	/* unsubscribe sent, waiting for MQTT_UNSUBSCRIBED */

	STATE_TEST_FINISH	/* sequence complete */
};
37
38 static int interrupted, do_ssl, pipeline, stagger_us = 5000, okay,
39 done, count = COUNT;
40
41 static const lws_retry_bo_t retry = {
42 .secs_since_valid_ping = 20, /* if idle, PINGREQ after secs */
43 .secs_since_valid_hangup = 25, /* hangup if still idle secs */
44 };
45
46 static const lws_mqtt_client_connect_param_t client_connect_param = {
47 .client_id = NULL,
48 .keep_alive = 60,
49 .clean_start = 1,
50 .will_param = {
51 .topic = "good/bye",
52 .message = "sign-off",
53 .qos = 0,
54 .retain = 0,
55 },
56 .username = "lwsUser",
57 .password = "mySecretPassword",
58 };
59
60 static lws_mqtt_topic_elem_t topics[] = {
61 [0] = { .name = "test/topic0", .qos = QOS0 },
62 [1] = { .name = "test/topic1", .qos = QOS1 },
63 };
64
65 static lws_mqtt_subscribe_param_t sub_param = {
66 .topic = &topics[0],
67 .num_topics = LWS_ARRAY_SIZE(topics),
68 };
69
/*
 * Payload published by every connection.  It is deliberately longer than
 * one send chunk so the publish has to go out in several pieces.
 *
 * This is an array rather than a pointer so that its length can be taken
 * with sizeof below.
 */
static const char test_string[] =
	"No one would have believed in the last years of the nineteenth "
	"century that this world was being watched keenly and closely by "
	"intelligences greater than man's and yet as mortal as his own; that as "
	"men busied themselves about their various concerns they were "
	"scrutinised and studied, perhaps almost as narrowly as a man with a "
	"microscope might scrutinise the transient creatures that swarm and "
	"multiply in a drop of water. With infinite complacency men went to "
	"and fro over this globe about their little affairs, serene in their "
	"assurance of their empire over matter. It is possible that the "
	"infusoria under the microscope do the same. No one gave a thought to "
	"the older worlds of space as sources of human danger, or thought of "
	"them only to dismiss the idea of life upon them as impossible or "
	"improbable. It is curious to recall some of the mental habits of "
	"those departed days. At most terrestrial men fancied there might be "
	"other men upon Mars, perhaps inferior to themselves and ready to "
	"welcome a missionary enterprise. Yet across the gulf of space, minds "
	"that are to our minds as ours are to those of the beasts that perish, "
	"intellects vast and cool and unsympathetic, regarded this earth with "
	"envious eyes, and slowly and surely drew their plans against us. And "
	"early in the twentieth century came the great disillusionment. ";

/*
 * Length of the string above, not counting the terminating NUL.  It is
 * derived from the array itself so it cannot get out of step with the text
 * (a hand-maintained number would make the publish read past the end of
 * the string, or stop short of it, as soon as the text is edited).
 */
#define TEST_STRING_LEN (sizeof(test_string) - 1)
94
95 struct pss {
96 lws_mqtt_publish_param_t pub_param;
97 int state;
98 size_t pos;
99 int retries;
100 };
101
102 static void
sigint_handler(int sig)103 sigint_handler(int sig)
104 {
105 interrupted = 1;
106 }
107
108 static int
connect_client(struct lws_context * context,struct test_item * item)109 connect_client(struct lws_context *context, struct test_item *item)
110 {
111 struct lws_client_connect_info i;
112
113 memset(&i, 0, sizeof i);
114
115 i.mqtt_cp = &client_connect_param;
116 i.opaque_user_data = item;
117 i.protocol = "test-mqtt";
118 i.address = "localhost";
119 i.host = "localhost";
120 i.pwsi = &item->wsi;
121 i.context = context;
122 i.method = "MQTT";
123 i.alpn = "mqtt";
124 i.port = 1883;
125
126 if (do_ssl) {
127 i.ssl_connection = LCCSCF_USE_SSL;
128 i.ssl_connection |= LCCSCF_ALLOW_SELFSIGNED;
129 i.port = 8883;
130 }
131
132 if (pipeline)
133 i.ssl_connection |= LCCSCF_PIPELINE;
134
135 if (!lws_client_connect_via_info(&i)) {
136 lwsl_err("%s: Client Connect Failed\n", __func__);
137
138 return 1;
139 }
140
141 return 0;
142 }
143
144 static void
start_conn(struct lws_sorted_usec_list * sul)145 start_conn(struct lws_sorted_usec_list *sul)
146 {
147 struct test_item *item = lws_container_of(sul, struct test_item, sul);
148
149 lwsl_notice("%s: item %d\n", __func__, (int)(item - &items[0]));
150
151 if (connect_client(item->context, item))
152 interrupted = 1;
153 }
154
155
156 static int
system_notify_cb(lws_state_manager_t * mgr,lws_state_notify_link_t * link,int current,int target)157 system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
158 int current, int target)
159 {
160 struct lws_context *context = mgr->parent;
161 int n;
162
163 if (current != LWS_SYSTATE_OPERATIONAL ||
164 target != LWS_SYSTATE_OPERATIONAL)
165 return 0;
166
167 /*
168 * We delay trying to do the client connection until the protocols have
169 * been initialized for each vhost... this happens after we have network
170 * and time so we can judge tls cert validity.
171 *
172 * Stagger the connection attempts so we get some joining before the
173 * first has connected and some afterwards
174 */
175
176 for (n = 0; n < count; n++) {
177 items[n].context = context;
178 lws_sul_schedule(context, 0, &items[n].sul, start_conn,
179 n * stagger_us);
180 }
181
182 return 0;
183 }
184
185
186 static int
callback_mqtt(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)187 callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason,
188 void *user, void *in, size_t len)
189 {
190 struct test_item *item = (struct test_item *)lws_get_opaque_user_data(wsi);
191 struct pss *pss = (struct pss *)user;
192 lws_mqtt_publish_param_t *pub;
193 size_t chunk;
194
195 switch (reason) {
196 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
197 lwsl_err("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
198 in ? (char *)in : "(null)");
199
200 if (++done == count)
201 goto finish_test;
202 break;
203
204 case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
205 lwsl_user("%s: item %d: CLIENT_CLOSED %p\n", __func__, (int)(item - &items[0]), wsi);
206
207 if (++done == count)
208 goto finish_test;
209 break;
210
211 case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
212 lwsl_user("%s: MQTT_CLIENT_ESTABLISHED: %p\n", __func__, wsi);
213 lws_callback_on_writable(wsi);
214
215 return 0;
216
217 case LWS_CALLBACK_MQTT_SUBSCRIBED:
218 lwsl_user("%s: MQTT_SUBSCRIBED\n", __func__);
219
220 /* then we can get on with the actual test part */
221
222 pss->state++;
223 lws_callback_on_writable(wsi);
224 break;
225
226 case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
227 lwsl_user("%s: item %d: UNSUBSCRIBED: %p: Received unsuback\n",
228 __func__, (int)(item - &item[0]), wsi);
229 okay++;
230
231 if (++pss->state == STATE_TEST_FINISH) {
232 lwsl_notice("%s: MQTT_UNSUBACK ending stream %d successfully(%d/%d)\n",
233 __func__, (int)(item - &items[0]), okay, count);
234 /* We are done, request to close */
235 return -1;
236 }
237 break;
238
239 case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
240
241 /*
242 * Extra WRITEABLE may appear here other than ones we asked
243 * for, so we must consult our own state to decide if we want
244 * to make use of the opportunity
245 */
246
247 switch (pss->state) {
248 case STATE_SUBSCRIBE:
249 lwsl_user("%s: item %d: WRITEABLE: %p: Subscribing\n", __func__, (int)(item - &items[0]), wsi);
250
251 if (lws_mqtt_client_send_subcribe(wsi, &sub_param)) {
252 lwsl_notice("%s: subscribe failed\n", __func__);
253
254 return -1;
255 }
256 pss->state++;
257 break;
258
259 case STATE_PUBLISH_QOS0:
260 case STATE_PUBLISH_QOS1:
261
262 lwsl_user("%s: item %d: WRITEABLE: %p: Publish\n", __func__, (int)(item - &items[0]), wsi);
263
264 pss->pub_param.topic = pss->state == STATE_PUBLISH_QOS0 ?
265 "test/topic0" : "test/topic1";
266 pss->pub_param.topic_len = (uint16_t)strlen(pss->pub_param.topic);
267 pss->pub_param.qos =
268 pss->state == STATE_PUBLISH_QOS0 ? QOS0 : QOS1;
269 pss->pub_param.payload_len = TEST_STRING_LEN;
270
271 /* We send the message out 300 bytes or less at at time */
272
273 chunk = 300;
274
275 if (chunk > TEST_STRING_LEN - pss->pos)
276 chunk = TEST_STRING_LEN - pss->pos;
277
278 lwsl_notice("%s: sending %d at +%d\n", __func__,
279 (int)chunk, (int)pss->pos);
280
281 if (lws_mqtt_client_send_publish(wsi, &pss->pub_param,
282 test_string + pss->pos, chunk,
283 (pss->pos + chunk == TEST_STRING_LEN))) {
284 lwsl_notice("%s: publish failed\n", __func__);
285 return -1;
286 }
287
288 pss->pos += chunk;
289
290 if (pss->pos == TEST_STRING_LEN) {
291 lwsl_debug("%s: sent message\n", __func__);
292 pss->pos = 0;
293 pss->state++;
294 }
295 break;
296
297 case STATE_UNSUBSCRIBE:
298 lwsl_user("%s: item %d: UNSUBSCRIBE: %p: Send unsub\n",
299 __func__, (int)(item - &item[0]), wsi);
300 pss->state++;
301 if (lws_mqtt_client_send_unsubcribe(wsi, &sub_param)) {
302 lwsl_notice("%s: subscribe failed\n", __func__);
303 return -1;
304 }
305 break;
306 default:
307 break;
308 }
309
310 return 0;
311
312 case LWS_CALLBACK_MQTT_ACK:
313 lwsl_user("%s: item %d: MQTT_ACK (state %d)\n", __func__, (int)(item - &items[0]), pss->state);
314 /*
315 * We can forget about the message we just sent, it's done.
316 *
317 * For our test, that's the indication we can close the wsi.
318 */
319
320 pss->state++;
321 if (pss->state != STATE_TEST_FINISH) {
322 lws_callback_on_writable(wsi);
323 break;
324 }
325
326 break;
327
328 case LWS_CALLBACK_MQTT_RESEND:
329 lwsl_user("%s: MQTT_RESEND\n", __func__);
330 /*
331 * We must resend the packet ID mentioned in len
332 */
333 if (++pss->retries == 3) {
334 lwsl_notice("%s: too many retries\n", __func__);
335 return 1; /* kill the connection */
336 }
337 pss->state--;
338 pss->pos = 0;
339 break;
340
341 case LWS_CALLBACK_MQTT_CLIENT_RX:
342 pub = (lws_mqtt_publish_param_t *)in;
343 assert(pub);
344 lwsl_user("%s: item %d: MQTT_CLIENT_RX (%s) pos %d/%d len %d\n", __func__,
345 (int)(item - &items[0]), pub->topic, (int)pub->payload_pos,
346 (int)pub->payload_len, (int)len);
347
348 //lwsl_hexdump_info(pub->payload, len);
349
350 return 0;
351
352 default:
353 break;
354 }
355
356 return 0;
357
358 finish_test:
359 interrupted = 1;
360 lws_cancel_service(lws_get_context(wsi));
361
362 return 0;
363 }
364
365 static const struct lws_protocols protocols[] = {
366 {
367 .name = "test-mqtt",
368 .callback = callback_mqtt,
369 .per_session_data_size = sizeof(struct pss)
370 },
371 { NULL, NULL, 0, 0 }
372 };
373
main(int argc,const char ** argv)374 int main(int argc, const char **argv)
375 {
376 lws_state_notify_link_t notifier = { {}, system_notify_cb, "app" };
377 lws_state_notify_link_t *na[] = { ¬ifier, NULL };
378 struct lws_context_creation_info info;
379 struct lws_context *context;
380 const char *p;
381 int n = 0;
382
383 signal(SIGINT, sigint_handler);
384 memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
385 lws_cmdline_option_handle_builtin(argc, argv, &info);
386
387 do_ssl = !!lws_cmdline_option(argc, argv, "-s");
388 if (do_ssl)
389 info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
390
391 if (lws_cmdline_option(argc, argv, "-p"))
392 pipeline = 1;
393
394 if ((p = lws_cmdline_option(argc, argv, "-i")))
395 stagger_us = atoi(p);
396
397 if ((p = lws_cmdline_option(argc, argv, "-c")))
398 count = atoi(p);
399
400 if (count > COUNT) {
401 count = COUNT;
402 lwsl_err("%s: clipped count at max %d\n", __func__, count);
403 }
404
405 lwsl_user("LWS minimal MQTT client %s [-d<verbosity>][-s]\n",
406 do_ssl ? "tls enabled": "unencrypted");
407
408 info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
409 info.protocols = protocols;
410 info.register_notifier_list = na;
411 info.fd_limit_per_thread = 1 + COUNT + 1;
412 info.retry_and_idle_policy = &retry;
413
414 #if defined(LWS_WITH_MBEDTLS)
415 /*
416 * OpenSSL uses the system trust store. mbedTLS has to be told which
417 * CA to trust explicitly.
418 */
419 info.client_ssl_ca_filepath = "./mosq-ca.crt";
420 #endif
421
422 context = lws_create_context(&info);
423 if (!context) {
424 lwsl_err("lws init failed\n");
425 return 1;
426 }
427
428 /* Event loop */
429 while (n >= 0 && !interrupted)
430 n = lws_service(context, 0);
431
432 lwsl_user("%s: Completed: %d/%d ok, %s\n", __func__, okay, count,
433 okay != count ? "failed" : "OK");
434 lws_context_destroy(context);
435
436 return okay != count;
437 }
438