• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * lws-minimal-mqtt-client
 *
 * Written in 2010-2020 by Andy Green <andy@warmcat.com>
 *                         Sakthi Kannan <saktr@amazon.com>
 *
 * This file is made available under the Creative Commons CC0 1.0
 * Universal Public Domain Dedication.
 */
10 
11 #include <libwebsockets.h>
12 #include <string.h>
13 #include <signal.h>
14 #if defined(WIN32)
15 #define HAVE_STRUCT_TIMESPEC
16 #if defined(pid_t)
17 #undef pid_t
18 #endif
19 #endif
20 #include <pthread.h>
21 #include <assert.h>
22 
/*
 * Steps of the test sequence, stored in pss->state.
 *
 * The order is significant: callback_mqtt() moves forward with state++ and
 * steps back one entry with state-- when asked to resend, so each WAIT_ACKn
 * must directly follow its PUBLISH_QOSn.
 */
enum {
	STATE_SUBSCRIBE,	/* subscribe to the topic */
	STATE_PUBLISH_QOS0,	/* Send the message in QoS0 */
	STATE_WAIT_ACK0,	/* Wait for the synthetic "ack" */
	STATE_PUBLISH_QOS1,	/* Send the message in QoS1 */
	STATE_WAIT_ACK1,	/* Wait for the real ack (or timeout + retry) */

	STATE_TEST_FINISH
};
32 
/*
 * interrupted: set nonzero to make the event loop in main() exit.  It is also
 *              written from the SIGINT handler, so it must be an object of
 *              type volatile sig_atomic_t.
 * bad:         process exit code; stays 1 until the test sequence completes.
 * do_ssl:      nonzero when "-s" was given on the command line.
 */
static volatile sig_atomic_t interrupted;
static int bad = 1, do_ssl;
34 
35 static const lws_retry_bo_t retry = {
36 	.secs_since_valid_ping		= 20, /* if idle, PINGREQ after secs */
37 	.secs_since_valid_hangup	= 25, /* hangup if still idle secs */
38 };
39 
40 static const lws_mqtt_client_connect_param_t client_connect_param = {
41 	.client_id			= "lwsMqttClient",
42 	.keep_alive			= 60,
43 	.clean_start			= 1,
44 	.client_id_nofree		= 1,
45 	.username_nofree		= 1,
46 	.password_nofree		= 1,
47 	.will_param = {
48 		.topic			= "good/bye",
49 		.message		= "sign-off",
50 		.qos			= 0,
51 		.retain			= 0,
52 	},
53 	.username			= "lwsUser",
54 	.password			= "mySecretPassword",
55 };
56 
57 static lws_mqtt_publish_param_t pub_param;
58 
59 static lws_mqtt_topic_elem_t topics[] = {
60 	[0] = { .name = "test/topic0", .qos = QOS0 },
61 	[1] = { .name = "test/topic1", .qos = QOS1 },
62 };
63 
64 static lws_mqtt_subscribe_param_t sub_param = {
65 	.topic				= &topics[0],
66 	.num_topics			= LWS_ARRAY_SIZE(topics),
67 };
68 
/*
 * Payload published by the test.  callback_mqtt() sends it in several chunks,
 * so it is deliberately longer than one chunk.
 */
static const char * const test_string =
	"No one would have believed in the last years of the nineteenth "
	"century that this world was being watched keenly and closely by "
	"intelligences greater than man's and yet as mortal as his own; that as "
	"men busied themselves about their various concerns they were "
	"scrutinised and studied, perhaps almost as narrowly as a man with a "
	"microscope might scrutinise the transient creatures that swarm and "
	"multiply in a drop of water.  With infinite complacency men went to "
	"and fro over this globe about their little affairs, serene in their "
	"assurance of their empire over matter. It is possible that the "
	"infusoria under the microscope do the same.  No one gave a thought to "
	"the older worlds of space as sources of human danger, or thought of "
	"them only to dismiss the idea of life upon them as impossible or "
	"improbable.  It is curious to recall some of the mental habits of "
	"those departed days.  At most terrestrial men fancied there might be "
	"other men upon Mars, perhaps inferior to themselves and ready to "
	"welcome a missionary enterprise. Yet across the gulf of space, minds "
	"that are to our minds as ours are to those of the beasts that perish, "
	"intellects vast and cool and unsympathetic, regarded this earth with "
	"envious eyes, and slowly and surely drew their plans against us.  And "
	"early in the twentieth century came the great disillusionment. ";

/*
 * This reflects the length of the string above, excluding the terminating
 * NUL.  It must be kept in step by hand if the text is ever edited, since it
 * is used both as the announced payload length and as the end-of-data mark.
 */
#define TEST_STRING_LEN 1337
93 
/*
 * Per-connection state for the "mqtt" protocol; its size is given to lws in
 * protocols[] and a pointer to it arrives as `user` in callback_mqtt().
 */
struct pss {
	int		state;		/* current STATE_* step of the test */
	size_t		pos;		/* bytes of test_string already sent */
	int		retries;	/* count of MQTT_RESEND requests seen */
};
99 
100 static void
sigint_handler(int sig)101 sigint_handler(int sig)
102 {
103 	interrupted = 1;
104 }
105 
106 static int
connect_client(struct lws_context * context)107 connect_client(struct lws_context *context)
108 {
109 	struct lws_client_connect_info i;
110 
111 	memset(&i, 0, sizeof i);
112 
113 	i.mqtt_cp = &client_connect_param;
114 	i.address = "localhost";
115 	i.host = "localhost";
116 	i.protocol = "mqtt";
117 	i.context = context;
118 	i.method = "MQTT";
119 	i.alpn = "mqtt";
120 	i.port = 1883;
121 
122 	if (do_ssl) {
123 		i.ssl_connection = LCCSCF_USE_SSL;
124 		i.ssl_connection |= LCCSCF_ALLOW_SELFSIGNED;
125 		i.port = 8883;
126 	}
127 
128 	if (!lws_client_connect_via_info(&i)) {
129 		lwsl_err("%s: Client Connect Failed\n", __func__);
130 
131 		return 1;
132 	}
133 
134 	return 0;
135 }
136 
137 static int
system_notify_cb(lws_state_manager_t * mgr,lws_state_notify_link_t * link,int current,int target)138 system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
139 		 int current, int target)
140 {
141 	struct lws_context *context = mgr->parent;
142 
143 	if (current != LWS_SYSTATE_OPERATIONAL ||
144 	    target != LWS_SYSTATE_OPERATIONAL)
145 		return 0;
146 
147 	/*
148 	* We delay trying to do the client connection until
149 	* the protocols have been initialized for each
150 	* vhost... this happens after we have network and
151 	* time so we can judge tls cert validity.
152 	*/
153 
154 	if (connect_client(context))
155 		interrupted = 1;
156 
157 	return 0;
158  }
159 
160 
161 static int
callback_mqtt(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)162 callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason,
163 	      void *user, void *in, size_t len)
164 {
165 	struct pss *pss = (struct pss *)user;
166 	lws_mqtt_publish_param_t *pub;
167 	size_t chunk;
168 
169 	switch (reason) {
170 	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
171 		lwsl_err("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
172 			 in ? (char *)in : "(null)");
173 		interrupted = 1;
174 		break;
175 
176 	case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
177 		lwsl_user("%s: CLIENT_CLOSED\n", __func__);
178 		interrupted = 1;
179 		break;
180 
181 	case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
182 		lwsl_user("%s: MQTT_CLIENT_ESTABLISHED\n", __func__);
183 		lws_callback_on_writable(wsi);
184 
185 		return 0;
186 
187 	case LWS_CALLBACK_MQTT_SUBSCRIBED:
188 		lwsl_user("%s: MQTT_SUBSCRIBED\n", __func__);
189 		lws_callback_on_writable(wsi);
190 		break;
191 
192 	case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
193 		/*
194 		 * Extra WRITEABLE may appear here other than ones we asked
195 		 * for, so we must consult our own state to decide if we want
196 		 * to make use of the opportunity
197 		 */
198 
199 		switch (pss->state) {
200 		case STATE_SUBSCRIBE:
201 			lwsl_user("%s: WRITEABLE: Subscribing\n", __func__);
202 
203 			if (lws_mqtt_client_send_subcribe(wsi, &sub_param)) {
204 				lwsl_notice("%s: subscribe failed\n", __func__);
205 
206 				return -1;
207 			}
208 			pss->state++;
209 			break;
210 
211 		case STATE_PUBLISH_QOS0:
212 		case STATE_PUBLISH_QOS1:
213 
214 			lwsl_user("%s: WRITEABLE: Publish\n", __func__);
215 
216 			pub_param.topic	= "test/topic";
217 			pub_param.topic_len = (uint16_t)strlen(pub_param.topic);
218 			pub_param.qos = pss->state == STATE_PUBLISH_QOS0 ? QOS0 : QOS1;
219 			pub_param.payload_len = TEST_STRING_LEN;
220 
221 			/* We send the message out 300 bytes or less at at time */
222 
223 			chunk = 300;
224 
225 			if (chunk > TEST_STRING_LEN - pss->pos)
226 				chunk = TEST_STRING_LEN - pss->pos;
227 
228 			if (lws_mqtt_client_send_publish(wsi, &pub_param,
229 					test_string + pss->pos, (uint32_t)chunk,
230 					(pss->pos + chunk == TEST_STRING_LEN)))
231 				return -1;
232 
233 			pss->pos += chunk;
234 
235 			if (pss->pos == TEST_STRING_LEN) {
236 				pss->pos = 0;
237 				pss->state++;
238 			}
239 			break;
240 
241 		default:
242 			break;
243 		}
244 
245 		return 0;
246 
247 	case LWS_CALLBACK_MQTT_ACK:
248 		lwsl_user("%s: MQTT_ACK\n", __func__);
249 		/*
250 		 * We can forget about the message we just sent, it's done.
251 		 *
252 		 * For our test, that's the indication we can close the wsi.
253 		 */
254 
255 		pss->state++;
256 		if (pss->state != STATE_TEST_FINISH) {
257 			lws_callback_on_writable(wsi);
258 			break;
259 		}
260 
261 		/* Oh we are done then */
262 
263 		bad = 0;
264 		interrupted = 1;
265 		lws_cancel_service(lws_get_context(wsi));
266 		break;
267 
268 	case LWS_CALLBACK_MQTT_RESEND:
269 		lwsl_user("%s: MQTT_RESEND\n", __func__);
270 		/*
271 		 * We must resend the packet ID mentioned in len
272 		 */
273 		if (++pss->retries == 3) {
274 			interrupted = 1;
275 			break;
276 		}
277 		pss->state--;
278 		pss->pos = 0;
279 		break;
280 
281 	case LWS_CALLBACK_MQTT_CLIENT_RX:
282 		lwsl_user("%s: MQTT_CLIENT_RX\n", __func__);
283 
284 		pub = (lws_mqtt_publish_param_t *)in;
285 		assert(pub);
286 
287 		lwsl_hexdump_notice(pub->topic, pub->topic_len);
288 		lwsl_hexdump_notice(pub->payload, pub->payload_len);
289 
290 		return 0;
291 
292 	default:
293 		break;
294 	}
295 
296 	return 0;
297 }
298 
299 static const struct lws_protocols protocols[] = {
300 	{
301 		.name			= "mqtt",
302 		.callback		= callback_mqtt,
303 		.per_session_data_size	= sizeof(struct pss)
304 	},
305 	LWS_PROTOCOL_LIST_TERM
306 };
307 
main(int argc,const char ** argv)308 int main(int argc, const char **argv)
309 {
310 	lws_state_notify_link_t notifier = { { NULL, NULL, NULL },
311 					     system_notify_cb, "app" };
312 	lws_state_notify_link_t *na[] = { &notifier, NULL };
313 	struct lws_context_creation_info info;
314 	struct lws_context *context;
315 	int n = 0;
316 
317 	signal(SIGINT, sigint_handler);
318 	memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
319 	lws_cmdline_option_handle_builtin(argc, argv, &info);
320 
321 	do_ssl = !!lws_cmdline_option(argc, argv, "-s");
322 	if (do_ssl)
323 		info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
324 
325 	lwsl_user("LWS minimal MQTT client %s [-d<verbosity>][-s]\n",
326 			do_ssl ? "tls enabled": "unencrypted");
327 
328 	info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
329 	info.protocols = protocols;
330 	info.register_notifier_list = na;
331 	info.fd_limit_per_thread = 1 + 1 + 1;
332 	info.retry_and_idle_policy = &retry;
333 
334 #if defined(LWS_WITH_MBEDTLS) || defined(USE_WOLFSSL)
335 	/*
336 	 * OpenSSL uses the system trust store.  mbedTLS has to be told which
337 	 * CA to trust explicitly.
338 	 */
339 	info.client_ssl_ca_filepath = "./mosq-ca.crt";
340 #endif
341 
342 	context = lws_create_context(&info);
343 	if (!context) {
344 		lwsl_err("lws init failed\n");
345 		return 1;
346 	}
347 
348 	/* Event loop */
349 	while (n >= 0 && !interrupted)
350 		n = lws_service(context, 0);
351 
352 	lwsl_user("Completed: %s\n", bad ? "failed" : "OK");
353 	lws_context_destroy(context);
354 
355 	return bad;
356 }
357