1 /*
2 * lws-minimal-mqtt-client
3 *
4 * Written in 2010-2020 by Andy Green <andy@warmcat.com>
5 * Sakthi Kannan <saktr@amazon.com>
6 *
7 * This file is made available under the Creative Commons CC0 1.0
8 * Universal Public Domain Dedication.
9 */
10
11 #include <libwebsockets.h>
12 #include <string.h>
13 #include <signal.h>
14 #if defined(WIN32)
15 #define HAVE_STRUCT_TIMESPEC
16 #if defined(pid_t)
17 #undef pid_t
18 #endif
19 #endif
20 #include <pthread.h>
21 #include <assert.h>
22
23 enum {
24 STATE_SUBSCRIBE, /* subscribe to the topic */
25 STATE_PUBLISH_QOS0, /* Send the message in QoS0 */
26 STATE_WAIT_ACK0, /* Wait for the synthetic "ack" */
27 STATE_PUBLISH_QOS1, /* Send the message in QoS1 */
28 STATE_WAIT_ACK1, /* Wait for the real ack (or timeout + retry) */
29
30 STATE_TEST_FINISH
31 };
32
33 static int interrupted, bad = 1, do_ssl;
34
35 static const lws_retry_bo_t retry = {
36 .secs_since_valid_ping = 20, /* if idle, PINGREQ after secs */
37 .secs_since_valid_hangup = 25, /* hangup if still idle secs */
38 };
39
40 static const lws_mqtt_client_connect_param_t client_connect_param = {
41 .client_id = "lwsMqttClient",
42 .keep_alive = 60,
43 .clean_start = 1,
44 .client_id_nofree = 1,
45 .username_nofree = 1,
46 .password_nofree = 1,
47 .will_param = {
48 .topic = "good/bye",
49 .message = "sign-off",
50 .qos = 0,
51 .retain = 0,
52 },
53 .username = "lwsUser",
54 .password = "mySecretPassword",
55 };
56
57 static lws_mqtt_publish_param_t pub_param;
58
59 static lws_mqtt_topic_elem_t topics[] = {
60 [0] = { .name = "test/topic0", .qos = QOS0 },
61 [1] = { .name = "test/topic1", .qos = QOS1 },
62 };
63
64 static lws_mqtt_subscribe_param_t sub_param = {
65 .topic = &topics[0],
66 .num_topics = LWS_ARRAY_SIZE(topics),
67 };
68
69 static const char * const test_string =
70 "No one would have believed in the last years of the nineteenth "
71 "century that this world was being watched keenly and closely by "
72 "intelligences greater than man's and yet as mortal as his own; that as "
73 "men busied themselves about their various concerns they were "
74 "scrutinised and studied, perhaps almost as narrowly as a man with a "
75 "microscope might scrutinise the transient creatures that swarm and "
76 "multiply in a drop of water. With infinite complacency men went to "
77 "and fro over this globe about their little affairs, serene in their "
78 "assurance of their empire over matter. It is possible that the "
79 "infusoria under the microscope do the same. No one gave a thought to "
80 "the older worlds of space as sources of human danger, or thought of "
81 "them only to dismiss the idea of life upon them as impossible or "
82 "improbable. It is curious to recall some of the mental habits of "
83 "those departed days. At most terrestrial men fancied there might be "
84 "other men upon Mars, perhaps inferior to themselves and ready to "
85 "welcome a missionary enterprise. Yet across the gulf of space, minds "
86 "that are to our minds as ours are to those of the beasts that perish, "
87 "intellects vast and cool and unsympathetic, regarded this earth with "
88 "envious eyes, and slowly and surely drew their plans against us. And "
89 "early in the twentieth century came the great disillusionment. ";
90
91 /* this reflects the length of the string above */
92 #define TEST_STRING_LEN 1337
93
94 struct pss {
95 int state;
96 size_t pos;
97 int retries;
98 };
99
100 static void
sigint_handler(int sig)101 sigint_handler(int sig)
102 {
103 interrupted = 1;
104 }
105
106 static int
connect_client(struct lws_context * context)107 connect_client(struct lws_context *context)
108 {
109 struct lws_client_connect_info i;
110
111 memset(&i, 0, sizeof i);
112
113 i.mqtt_cp = &client_connect_param;
114 i.address = "localhost";
115 i.host = "localhost";
116 i.protocol = "mqtt";
117 i.context = context;
118 i.method = "MQTT";
119 i.alpn = "mqtt";
120 i.port = 1883;
121
122 if (do_ssl) {
123 i.ssl_connection = LCCSCF_USE_SSL;
124 i.ssl_connection |= LCCSCF_ALLOW_SELFSIGNED;
125 i.port = 8883;
126 }
127
128 if (!lws_client_connect_via_info(&i)) {
129 lwsl_err("%s: Client Connect Failed\n", __func__);
130
131 return 1;
132 }
133
134 return 0;
135 }
136
137 static int
system_notify_cb(lws_state_manager_t * mgr,lws_state_notify_link_t * link,int current,int target)138 system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
139 int current, int target)
140 {
141 struct lws_context *context = mgr->parent;
142
143 if (current != LWS_SYSTATE_OPERATIONAL ||
144 target != LWS_SYSTATE_OPERATIONAL)
145 return 0;
146
147 /*
148 * We delay trying to do the client connection until
149 * the protocols have been initialized for each
150 * vhost... this happens after we have network and
151 * time so we can judge tls cert validity.
152 */
153
154 if (connect_client(context))
155 interrupted = 1;
156
157 return 0;
158 }
159
160
161 static int
callback_mqtt(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)162 callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason,
163 void *user, void *in, size_t len)
164 {
165 struct pss *pss = (struct pss *)user;
166 lws_mqtt_publish_param_t *pub;
167 size_t chunk;
168
169 switch (reason) {
170 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
171 lwsl_err("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
172 in ? (char *)in : "(null)");
173 interrupted = 1;
174 break;
175
176 case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
177 lwsl_user("%s: CLIENT_CLOSED\n", __func__);
178 interrupted = 1;
179 break;
180
181 case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
182 lwsl_user("%s: MQTT_CLIENT_ESTABLISHED\n", __func__);
183 lws_callback_on_writable(wsi);
184
185 return 0;
186
187 case LWS_CALLBACK_MQTT_SUBSCRIBED:
188 lwsl_user("%s: MQTT_SUBSCRIBED\n", __func__);
189 lws_callback_on_writable(wsi);
190 break;
191
192 case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
193 /*
194 * Extra WRITEABLE may appear here other than ones we asked
195 * for, so we must consult our own state to decide if we want
196 * to make use of the opportunity
197 */
198
199 switch (pss->state) {
200 case STATE_SUBSCRIBE:
201 lwsl_user("%s: WRITEABLE: Subscribing\n", __func__);
202
203 if (lws_mqtt_client_send_subcribe(wsi, &sub_param)) {
204 lwsl_notice("%s: subscribe failed\n", __func__);
205
206 return -1;
207 }
208 pss->state++;
209 break;
210
211 case STATE_PUBLISH_QOS0:
212 case STATE_PUBLISH_QOS1:
213
214 lwsl_user("%s: WRITEABLE: Publish\n", __func__);
215
216 pub_param.topic = "test/topic";
217 pub_param.topic_len = (uint16_t)strlen(pub_param.topic);
218 pub_param.qos = pss->state == STATE_PUBLISH_QOS0 ? QOS0 : QOS1;
219 pub_param.payload_len = TEST_STRING_LEN;
220
221 /* We send the message out 300 bytes or less at at time */
222
223 chunk = 300;
224
225 if (chunk > TEST_STRING_LEN - pss->pos)
226 chunk = TEST_STRING_LEN - pss->pos;
227
228 if (lws_mqtt_client_send_publish(wsi, &pub_param,
229 test_string + pss->pos, (uint32_t)chunk,
230 (pss->pos + chunk == TEST_STRING_LEN)))
231 return -1;
232
233 pss->pos += chunk;
234
235 if (pss->pos == TEST_STRING_LEN) {
236 pss->pos = 0;
237 pss->state++;
238 }
239 break;
240
241 default:
242 break;
243 }
244
245 return 0;
246
247 case LWS_CALLBACK_MQTT_ACK:
248 lwsl_user("%s: MQTT_ACK\n", __func__);
249 /*
250 * We can forget about the message we just sent, it's done.
251 *
252 * For our test, that's the indication we can close the wsi.
253 */
254
255 pss->state++;
256 if (pss->state != STATE_TEST_FINISH) {
257 lws_callback_on_writable(wsi);
258 break;
259 }
260
261 /* Oh we are done then */
262
263 bad = 0;
264 interrupted = 1;
265 lws_cancel_service(lws_get_context(wsi));
266 break;
267
268 case LWS_CALLBACK_MQTT_RESEND:
269 lwsl_user("%s: MQTT_RESEND\n", __func__);
270 /*
271 * We must resend the packet ID mentioned in len
272 */
273 if (++pss->retries == 3) {
274 interrupted = 1;
275 break;
276 }
277 pss->state--;
278 pss->pos = 0;
279 break;
280
281 case LWS_CALLBACK_MQTT_CLIENT_RX:
282 lwsl_user("%s: MQTT_CLIENT_RX\n", __func__);
283
284 pub = (lws_mqtt_publish_param_t *)in;
285 assert(pub);
286
287 lwsl_hexdump_notice(pub->topic, pub->topic_len);
288 lwsl_hexdump_notice(pub->payload, pub->payload_len);
289
290 return 0;
291
292 default:
293 break;
294 }
295
296 return 0;
297 }
298
299 static const struct lws_protocols protocols[] = {
300 {
301 .name = "mqtt",
302 .callback = callback_mqtt,
303 .per_session_data_size = sizeof(struct pss)
304 },
305 LWS_PROTOCOL_LIST_TERM
306 };
307
main(int argc,const char ** argv)308 int main(int argc, const char **argv)
309 {
310 lws_state_notify_link_t notifier = { { NULL, NULL, NULL },
311 system_notify_cb, "app" };
312 lws_state_notify_link_t *na[] = { ¬ifier, NULL };
313 struct lws_context_creation_info info;
314 struct lws_context *context;
315 int n = 0;
316
317 signal(SIGINT, sigint_handler);
318 memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
319 lws_cmdline_option_handle_builtin(argc, argv, &info);
320
321 do_ssl = !!lws_cmdline_option(argc, argv, "-s");
322 if (do_ssl)
323 info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
324
325 lwsl_user("LWS minimal MQTT client %s [-d<verbosity>][-s]\n",
326 do_ssl ? "tls enabled": "unencrypted");
327
328 info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
329 info.protocols = protocols;
330 info.register_notifier_list = na;
331 info.fd_limit_per_thread = 1 + 1 + 1;
332 info.retry_and_idle_policy = &retry;
333
334 #if defined(LWS_WITH_MBEDTLS) || defined(USE_WOLFSSL)
335 /*
336 * OpenSSL uses the system trust store. mbedTLS has to be told which
337 * CA to trust explicitly.
338 */
339 info.client_ssl_ca_filepath = "./mosq-ca.crt";
340 #endif
341
342 context = lws_create_context(&info);
343 if (!context) {
344 lwsl_err("lws init failed\n");
345 return 1;
346 }
347
348 /* Event loop */
349 while (n >= 0 && !interrupted)
350 n = lws_service(context, 0);
351
352 lwsl_user("Completed: %s\n", bad ? "failed" : "OK");
353 lws_context_destroy(context);
354
355 return bad;
356 }
357