• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libwebsockets - small server side websockets and web server implementation
3  *
4  * Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to
8  * deal in the Software without restriction, including without limitation the
9  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10  * sell copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22  * IN THE SOFTWARE.
23  */
24 
25 #include "private-lib-core.h"
26 
27 static int
rops_handle_POLLIN_mqtt(struct lws_context_per_thread * pt,struct lws * wsi,struct lws_pollfd * pollfd)28 rops_handle_POLLIN_mqtt(struct lws_context_per_thread *pt, struct lws *wsi,
29 			   struct lws_pollfd *pollfd)
30 {
31 	unsigned int pending = 0;
32 	struct lws_tokens ebuf;
33 	int n = 0;
34 	char buffered = 0;
35 
36 	lwsl_debug("%s: wsistate 0x%x, %s pollout %d\n", __func__,
37 		   (unsigned int)wsi->wsistate,  wsi->a.protocol->name,
38 		   pollfd->revents);
39 
40 	/*
41 	 * After the CONNACK and nwsi establishment, the first logical
42 	 * stream is migrated out of the nwsi to be child sid 1, and the
43 	 * nwsi no longer has a wsi->mqtt of its own.
44 	 *
45 	 * RX events on the nwsi must be converted to events seen or not
46 	 * seen by one or more child streams.
47 	 *
48 	 * SUBACK - reflected to child stream that asked for it
49 	 * PUBACK - routed to child that did the related publish
50 	 */
51 
52 	ebuf.token = NULL;
53 	ebuf.len = 0;
54 
55 	if (lwsi_state(wsi) != LRS_ESTABLISHED) {
56 #if defined(LWS_WITH_CLIENT)
57 
58 		if (lwsi_state(wsi) == LRS_WAITING_SSL &&
59 		    ((pollfd->revents & LWS_POLLOUT)) &&
60 		    lws_change_pollfd(wsi, LWS_POLLOUT, 0)) {
61 			lwsl_info("failed at set pollfd\n");
62 			return LWS_HPI_RET_PLEASE_CLOSE_ME;
63 		}
64 
65 		if ((pollfd->revents & LWS_POLLOUT) &&
66 		    lws_handle_POLLOUT_event(wsi, pollfd)) {
67 			lwsl_debug("POLLOUT event closed it\n");
68 			return LWS_HPI_RET_PLEASE_CLOSE_ME;
69 		}
70 
71 		n = lws_mqtt_client_socket_service(wsi, pollfd, NULL);
72 		if (n)
73 			return LWS_HPI_RET_WSI_ALREADY_DIED;
74 #endif
75 		return LWS_HPI_RET_HANDLED;
76 	}
77 
78 	/* 1: something requested a callback when it was OK to write */
79 
80 	if ((pollfd->revents & LWS_POLLOUT) &&
81 	    lwsi_state_can_handle_POLLOUT(wsi) &&
82 	    lws_handle_POLLOUT_event(wsi, pollfd)) {
83 		if (lwsi_state(wsi) == LRS_RETURNED_CLOSE)
84 			lwsi_set_state(wsi, LRS_FLUSHING_BEFORE_CLOSE);
85 
86 		return LWS_HPI_RET_PLEASE_CLOSE_ME;
87 	}
88 
89 	/* 3: buflist needs to be drained
90 	 */
91 read:
92 	// lws_buflist_describe(&wsi->buflist, wsi, __func__);
93 	ebuf.len = (int)lws_buflist_next_segment_len(&wsi->buflist, &ebuf.token);
94 	if (ebuf.len) {
95 		lwsl_info("draining buflist (len %d)\n", ebuf.len);
96 		buffered = 1;
97 		goto drain;
98 	}
99 
100 	if (!(pollfd->revents & pollfd->events & LWS_POLLIN))
101 		return LWS_HPI_RET_HANDLED;
102 
103 	/* if (lws_is_flowcontrolled(wsi)) { */
104 	/*	lwsl_info("%s: %p should be rxflow (bm 0x%x)..\n", */
105 	/*		    __func__, wsi, wsi->rxflow_bitmap); */
106 	/*	return LWS_HPI_RET_HANDLED; */
107 	/* } */
108 
109 	if (!(lwsi_role_client(wsi) && lwsi_state(wsi) != LRS_ESTABLISHED)) {
110 		/*
111 		 * In case we are going to react to this rx by scheduling
112 		 * writes, we need to restrict the amount of rx to the size
113 		 * the protocol reported for rx buffer.
114 		 *
115 		 * Otherwise we get a situation we have to absorb possibly a
116 		 * lot of reads before we get a chance to drain them by writing
117 		 * them, eg, with echo type tests in autobahn.
118 		 */
119 
120 		buffered = 0;
121 		ebuf.token = pt->serv_buf;
122 		ebuf.len = (int)wsi->a.context->pt_serv_buf_size;
123 
124 		if ((unsigned int)ebuf.len > wsi->a.context->pt_serv_buf_size)
125 			ebuf.len = (int)wsi->a.context->pt_serv_buf_size;
126 
127 		if ((int)pending > ebuf.len)
128 			pending = (unsigned int)ebuf.len;
129 
130 		ebuf.len = lws_ssl_capable_read(wsi, ebuf.token,
131 						pending ? pending :
132 						(unsigned int)ebuf.len);
133 		switch (ebuf.len) {
134 		case 0:
135 			lwsl_info("%s: zero length read\n",
136 				  __func__);
137 			return LWS_HPI_RET_PLEASE_CLOSE_ME;
138 		case LWS_SSL_CAPABLE_MORE_SERVICE:
139 			lwsl_info("SSL Capable more service\n");
140 			return LWS_HPI_RET_HANDLED;
141 		case LWS_SSL_CAPABLE_ERROR:
142 			lwsl_info("%s: LWS_SSL_CAPABLE_ERROR\n",
143 					__func__);
144 			return LWS_HPI_RET_PLEASE_CLOSE_ME;
145 		}
146 
147 		/*
148 		 * coverity thinks ssl_capable_read() may read over
149 		 * 2GB.  Dissuade it...
150 		 */
151 		ebuf.len &= 0x7fffffff;
152 	}
153 
154 drain:
155 	/* service incoming data */
156 	//lws_buflist_describe(&wsi->buflist, wsi, __func__);
157 	if (ebuf.len) {
158 		n = lws_read_mqtt(wsi, ebuf.token, (unsigned int)ebuf.len);
159 		if (n < 0) {
160 			lwsl_notice("%s: lws_read_mqtt returned %d\n",
161 					__func__, n);
162 			/* we closed wsi */
163 			goto fail;
164                 }
165 		// lws_buflist_describe(&wsi->buflist, wsi, __func__);
166 		lwsl_debug("%s: consuming %d / %d\n", __func__, n, ebuf.len);
167 		if (lws_buflist_aware_finished_consuming(wsi, &ebuf, ebuf.len,
168 							 buffered, __func__))
169 			return LWS_HPI_RET_PLEASE_CLOSE_ME;
170 	}
171 
172 	ebuf.token = NULL;
173 	ebuf.len = 0;
174 
175 	pending = (unsigned int)lws_ssl_pending(wsi);
176 	if (pending) {
177 		pending = pending > wsi->a.context->pt_serv_buf_size ?
178 			wsi->a.context->pt_serv_buf_size : pending;
179 		goto read;
180 	}
181 
182 	if (buffered && /* were draining, now nothing left */
183 	    !lws_buflist_next_segment_len(&wsi->buflist, NULL)) {
184 		lwsl_info("%s: %s flow buf: drained\n", __func__, lws_wsi_tag(wsi));
185 		/* having drained the rxflow buffer, can rearm POLLIN */
186 #if !defined(LWS_WITH_SERVER)
187 		n =
188 #endif
189 		__lws_rx_flow_control(wsi);
190 		/* n ignored, needed for NO_SERVER case */
191 	}
192 
193 	/* n = 0 */
194 	return LWS_HPI_RET_HANDLED;
195 
196 fail:
197 	lwsl_err("%s: Failed, bailing\n", __func__);
198 	lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS, "mqtt svc fail");
199 
200 	return LWS_HPI_RET_WSI_ALREADY_DIED;
201 }
202 
203 #if 0 /* defined(LWS_WITH_SERVER) */
204 
205 static int
206 rops_adoption_bind_mqtt(struct lws *wsi, int type, const char *vh_prot_name)
207 {
208 	/* no http but socket... must be mqtt */
209 	if ((type & LWS_ADOPT_HTTP) || !(type & LWS_ADOPT_SOCKET) ||
210 	    (type & _LWS_ADOPT_FINISH))
211 		return 0; /* no match */
212 
213 	lws_role_transition(wsi, 0, (type & LWS_ADOPT_ALLOW_SSL) ? LRS_SSL_INIT :
214 				LRS_ESTABLISHED, &role_ops_mqtt);
215 
216 	if (vh_prot_name)
217 		lws_bind_protocol(wsi, wsi->a.protocol, __func__);
218 	else
219 		/* this is the only time he will transition */
220 		lws_bind_protocol(wsi,
221 			&wsi->a.vhost->protocols[wsi->a.vhost->mqtt_protocol_index],
222 			__func__);
223 
224 	return 1; /* bound */
225 }
226 #endif
227 
228 static int
rops_client_bind_mqtt(struct lws * wsi,const struct lws_client_connect_info * i)229 rops_client_bind_mqtt(struct lws *wsi, const struct lws_client_connect_info *i)
230 {
231 	lwsl_debug("%s: i = %p\n", __func__, i);
232 	if (!i) {
233 
234 		/* finalize */
235 
236 		if (!wsi->user_space && wsi->stash->cis[CIS_METHOD])
237 			if (lws_ensure_user_space(wsi))
238 				return 1;
239 
240 		if (!wsi->stash->cis[CIS_METHOD] && !wsi->stash->cis[CIS_ALPN])
241 			wsi->stash->cis[CIS_ALPN] = "x-amzn-mqtt-ca";
242 
243 		/* if we went on the ah waiting list, it's ok, we can
244 		 * wait.
245 		 *
246 		 * When we do get the ah, now or later, he will end up
247 		 * at lws_http_client_connect_via_info2().
248 		 */
249 #if defined(LWS_WITH_CLIENT)
250 		if (lws_header_table_attach(wsi, 0) < 0)
251 			/*
252 			 * if we failed here, the connection is already closed
253 			 * and freed.
254 			 */
255 			return -1;
256 #else
257 		if (lws_header_table_attach(wsi, 0))
258 			return 0;
259 #endif
260 		return 0;
261 	}
262 
263 	/* if a recognized mqtt method, bind to it */
264 	if (strcmp(i->method, "MQTT"))
265 		return 0; /* no match */
266 
267 	if (lws_create_client_mqtt_object(i, wsi))
268 		return 1;
269 
270 	lws_role_transition(wsi, LWSIFR_CLIENT, LRS_UNCONNECTED,
271 				&role_ops_mqtt);
272 	return 1; /* matched */
273 }
274 
275 static int
rops_handle_POLLOUT_mqtt(struct lws * wsi)276 rops_handle_POLLOUT_mqtt(struct lws *wsi)
277 {
278 	struct lws **wsi2;
279 
280 	lwsl_debug("%s\n", __func__);
281 
282 #if defined(LWS_WITH_CLIENT)
283 	if (wsi->mqtt && wsi->mqtt->send_pingreq && !wsi->mqtt->inside_payload) {
284 		uint8_t buf[LWS_PRE + 2];
285 
286 		/*
287 		 * We are swallowing this POLLOUT in order to send a PINGREQ
288 		 * autonomously
289 		 */
290 
291 		wsi->mqtt->send_pingreq = 0;
292 
293 		lwsl_notice("%s: issuing PINGREQ\n", __func__);
294 
295 		buf[LWS_PRE] = LMQCP_CTOS_PINGREQ << 4;
296 		buf[LWS_PRE + 1] = 0;
297 
298 		if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 2,
299 			      LWS_WRITE_BINARY) != 2)
300 			return LWS_HP_RET_BAIL_DIE;
301 
302 		return LWS_HP_RET_BAIL_OK;
303 	}
304 #endif
305 	if (wsi->mqtt && !wsi->mqtt->inside_payload &&
306 	    (wsi->mqtt->send_pubrec || wsi->mqtt->send_pubrel ||
307 	     wsi->mqtt->send_pubcomp)) {
308 		uint8_t buf[LWS_PRE + 4];
309 		/* Remaining len = 2 */
310 		buf[LWS_PRE + 1] = 2;
311 		if (wsi->mqtt->send_pubrec) {
312 			lwsl_notice("%s: issuing PUBREC for pkt id: %d\n",
313 				    __func__, wsi->mqtt->peer_ack_pkt_id);
314 			buf[LWS_PRE] = LMQCP_PUBREC << 4 | 0x2;
315 			/* Packet ID */
316 			lws_ser_wu16be(&buf[LWS_PRE + 2],
317 				       wsi->mqtt->peer_ack_pkt_id);
318 			wsi->mqtt->send_pubrec = 0;
319 		} else if (wsi->mqtt->send_pubrel) {
320 			lwsl_notice("%s: issuing PUBREL for pkt id: %d\n",
321 				    __func__, wsi->mqtt->ack_pkt_id);
322 			buf[LWS_PRE] = LMQCP_PUBREL << 4 | 0x2;
323 			lws_ser_wu16be(&buf[LWS_PRE + 2],
324 				       wsi->mqtt->ack_pkt_id);
325 			wsi->mqtt->send_pubrel = 0;
326 		} else {
327 			lwsl_notice("%s: issuing PUBCOMP for pkt id: %d\n",
328 				    __func__, wsi->mqtt->peer_ack_pkt_id);
329 			buf[LWS_PRE] = LMQCP_PUBCOMP << 4 | 0x2;
330 			lws_ser_wu16be(&buf[LWS_PRE + 2],
331 				       wsi->mqtt->peer_ack_pkt_id);
332 			wsi->mqtt->send_pubcomp = 0;
333 		}
334 		if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 4,
335 			      LWS_WRITE_BINARY) != 4)
336 			return LWS_HP_RET_BAIL_DIE;
337 		return LWS_HP_RET_BAIL_OK;
338 	}
339 
340 	wsi = lws_get_network_wsi(wsi);
341 
342 	wsi->mux.requested_POLLOUT = 0;
343 
344 	wsi2 = &wsi->mux.child_list;
345 	if (!*wsi2) {
346 		lwsl_debug("%s: no children\n", __func__);
347 		return LWS_HP_RET_DROP_POLLOUT;
348 	}
349 
350 	if (!wsi->mqtt)
351 		return LWS_HP_RET_BAIL_DIE;
352 
353 	lws_wsi_mux_dump_waiting_children(wsi);
354 
355 	do {
356 		struct lws *w, **wa;
357 
358 		wa = &(*wsi2)->mux.sibling_list;
359 		if (!(*wsi2)->mux.requested_POLLOUT)
360 			goto next_child;
361 
362 		if (!lwsi_state_can_handle_POLLOUT(wsi))
363 			goto next_child;
364 
365 		/*
366 		 * If the nwsi is in the middle of a frame, we can only
367 		 * continue to send that
368 		 */
369 
370 		if (wsi->mqtt->inside_payload && !(*wsi2)->mqtt->inside_payload)
371 			goto next_child;
372 
373 		/*
374 		 * we're going to do writable callback for this child.
375 		 * move him to be the last child
376 		 */
377 		w = lws_wsi_mux_move_child_to_tail(wsi2);
378 		if (!w) {
379 			wa = &wsi->mux.child_list;
380 			goto next_child;
381 		}
382 
383 		lwsl_debug("%s: child %s (wsistate 0x%x)\n", __func__,
384 			   lws_wsi_tag(w), (unsigned int)w->wsistate);
385 
386 		if (lwsi_state(wsi) == LRS_ESTABLISHED &&
387 		    !wsi->mqtt->inside_payload &&
388 		    wsi->mqtt->send_puback) {
389 			uint8_t buf[LWS_PRE + 4];
390 			lwsl_notice("%s: issuing PUBACK for pkt id: %d\n",
391 				    __func__, wsi->mqtt->ack_pkt_id);
392 
393 			/* Fixed header */
394 			buf[LWS_PRE] = LMQCP_PUBACK << 4;
395 			/* Remaining len = 2 */
396 			buf[LWS_PRE + 1] = 2;
397 			/* Packet ID */
398 			lws_ser_wu16be(&buf[LWS_PRE + 2], wsi->mqtt->peer_ack_pkt_id);
399 
400 			if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 4,
401 				      LWS_WRITE_BINARY) != 4)
402 				return LWS_HP_RET_BAIL_DIE;
403 
404 			wsi->mqtt->send_puback = 0;
405 			w->mux.requested_POLLOUT = 1;
406 
407 			wa = &wsi->mux.child_list;
408 			goto next_child;
409 		}
410 
411 		if (lws_callback_as_writeable(w)) {
412 			lwsl_notice("%s: Closing child %s\n", __func__, lws_wsi_tag(w));
413 			lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS,
414 					   "mqtt pollout handle");
415 			wa = &wsi->mux.child_list;
416 		}
417 
418 next_child:
419 		wsi2 = wa;
420 	} while (wsi2 && *wsi2 && !lws_send_pipe_choked(wsi));
421 
422 	// lws_wsi_mux_dump_waiting_children(wsi);
423 
424 	if (lws_wsi_mux_action_pending_writeable_reqs(wsi))
425 		return LWS_HP_RET_BAIL_DIE;
426 
427 	return LWS_HP_RET_BAIL_OK;
428 }
429 
430 #if defined(LWS_WITH_CLIENT)
431 static int
rops_issue_keepalive_mqtt(struct lws * wsi,int isvalid)432 rops_issue_keepalive_mqtt(struct lws *wsi, int isvalid)
433 {
434 	struct lws *nwsi = lws_get_network_wsi(wsi);
435 
436 	if (isvalid) {
437 		_lws_validity_confirmed_role(nwsi);
438 
439 		return 0;
440 	}
441 
442 	nwsi->mqtt->send_pingreq = 1;
443 	lws_callback_on_writable(nwsi);
444 
445 	return 0;
446 }
447 #endif
448 
449 static int
rops_close_role_mqtt(struct lws_context_per_thread * pt,struct lws * wsi)450 rops_close_role_mqtt(struct lws_context_per_thread *pt, struct lws *wsi)
451 {
452 	struct lws *nwsi = lws_get_network_wsi(wsi);
453 	lws_mqtt_subs_t	*s, *s1, *mysub;
454 	lws_mqttc_t *c;
455 
456 	if (!wsi->mqtt)
457 		return 0;
458 
459 	c = &wsi->mqtt->client;
460 
461 	lws_sul_cancel(&wsi->mqtt->sul_qos_puback_pubrec_wait);
462 
463 	lws_mqtt_str_free(&c->username);
464 	lws_mqtt_str_free(&c->password);
465 	lws_mqtt_str_free(&c->will.message);
466 	lws_mqtt_str_free(&c->will.topic);
467 	lws_mqtt_str_free(&c->id);
468 
469 	/* clean up any subscription allocations */
470 
471 	s = wsi->mqtt->subs_head;
472 	wsi->mqtt->subs_head = NULL;
473 	while (s) {
474 		s1 = s->next;
475 		/*
476 		 * Account for children no longer using nwsi subscription
477 		 */
478 		mysub = lws_mqtt_find_sub(nwsi->mqtt, (const char *)&s[1]);
479 //		assert(mysub); /* if child subscribed, nwsi must feel the same */
480 		if (mysub) {
481 			assert(mysub->ref_count);
482 			mysub->ref_count--;
483 		}
484 		lws_free(s);
485 		s = s1;
486 	}
487 
488 	lws_mqtt_publish_param_t *pub =
489 			(lws_mqtt_publish_param_t *)
490 				wsi->mqtt->rx_cpkt_param;
491 
492 	if (pub)
493 		lws_free_set_NULL(pub->topic);
494 
495 	lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
496 
497 	lws_free_set_NULL(wsi->mqtt);
498 
499 	return 0;
500 }
501 
502 static int
rops_callback_on_writable_mqtt(struct lws * wsi)503 rops_callback_on_writable_mqtt(struct lws *wsi)
504 {
505 #if defined(LWS_WITH_CLIENT)
506 	struct lws *network_wsi;
507 #endif
508 	int already;
509 
510 	lwsl_debug("%s: %s (wsistate 0x%x)\n", __func__, lws_wsi_tag(wsi),
511 			(unsigned int)wsi->wsistate);
512 
513 	if (wsi->mux.requested_POLLOUT
514 #if defined(LWS_WITH_CLIENT)
515 			&& !wsi->client_h2_alpn
516 #endif
517 	) {
518 		lwsl_debug("already pending writable\n");
519 		return 1;
520 	}
521 #if 0
522 	/* is this for DATA or for control messages? */
523 	if (wsi->upgraded_to_http2 && !wsi->h2.h2n->pps &&
524 	    !lws_h2_tx_cr_get(wsi)) {
525 		/*
526 		 * other side is not able to cope with us sending DATA
527 		 * anything so no matter if we have POLLOUT on our side if it's
528 		 * DATA we want to send.
529 		 *
530 		 * Delay waiting for our POLLOUT until peer indicates he has
531 		 * space for more using tx window command in http2 layer
532 		 */
533 		lwsl_notice("%s: %p: skint (%d)\n", __func__, wsi,
534 			    wsi->h2.tx_cr);
535 		wsi->h2.skint = 1;
536 		return 0;
537 	}
538 
539 	wsi->h2.skint = 0;
540 #endif
541 #if defined(LWS_WITH_CLIENT)
542 	network_wsi = lws_get_network_wsi(wsi);
543 #endif
544 	already = lws_wsi_mux_mark_parents_needing_writeable(wsi);
545 
546 	/* for network action, act only on the network wsi */
547 
548 	if (already
549 #if defined(LWS_WITH_CLIENT)
550 			&& !network_wsi->client_mux_substream
551 #endif
552 			)
553 		return 1;
554 
555 	return 0;
556 }
557 
558 static int
rops_close_kill_connection_mqtt(struct lws * wsi,enum lws_close_status reason)559 rops_close_kill_connection_mqtt(struct lws *wsi, enum lws_close_status reason)
560 {
561 	lwsl_info(" %s, his parent %s: child list %p, siblings:\n",
562 			lws_wsi_tag(wsi),
563 			lws_wsi_tag(wsi->mux.parent_wsi), wsi->mux.child_list);
564 	//lws_wsi_mux_dump_children(wsi);
565 
566 	if (wsi->mux_substream
567 #if defined(LWS_WITH_CLIENT)
568 			|| wsi->client_mux_substream
569 #endif
570 	) {
571 		lwsl_info("closing %s: parent %s: first child %p\n",
572 				lws_wsi_tag(wsi),
573 				lws_wsi_tag(wsi->mux.parent_wsi),
574 				wsi->mux.child_list);
575 
576 		if (wsi->mux.child_list && lwsl_visible(LLL_INFO)) {
577 			lwsl_info(" parent %s: closing children: list:\n", lws_wsi_tag(wsi));
578 			lws_wsi_mux_dump_children(wsi);
579 		}
580 
581 		lws_wsi_mux_close_children(wsi, (int)reason);
582 	}
583 
584 	if ((
585 #if defined(LWS_WITH_CLIENT)
586 			wsi->client_mux_substream ||
587 #endif
588 			wsi->mux_substream) &&
589 	     wsi->mux.parent_wsi) {
590 		lws_wsi_mux_sibling_disconnect(wsi);
591 	}
592 
593 	return 0;
594 }
595 
596 static const lws_rops_t rops_table_mqtt[] = {
597 	/*  1 */ { .handle_POLLIN	  = rops_handle_POLLIN_mqtt },
598 	/*  2 */ { .handle_POLLOUT	  = rops_handle_POLLOUT_mqtt },
599 	/*  3 */ { .callback_on_writable  = rops_callback_on_writable_mqtt },
600 	/*  4 */ { .close_role		  = rops_close_role_mqtt },
601 	/*  5 */ { .close_kill_connection = rops_close_kill_connection_mqtt },
602 #if defined(LWS_WITH_CLIENT)
603 	/*  6 */ { .client_bind		  = rops_client_bind_mqtt },
604 	/*  7 */ { .issue_keepalive	  = rops_issue_keepalive_mqtt },
605 #endif
606 };
607 
608 struct lws_role_ops role_ops_mqtt = {
609 	/* role name */			"mqtt",
610 	/* alpn id */			"x-amzn-mqtt-ca", /* "mqtt/3.1.1" */
611 
612 	/* rops_table */		rops_table_mqtt,
613 	/* rops_idx */			{
614 	  /* LWS_ROPS_check_upgrades */
615 	  /* LWS_ROPS_pt_init_destroy */		0x00,
616 	  /* LWS_ROPS_init_vhost */
617 	  /* LWS_ROPS_destroy_vhost */			0x00,
618 	  /* LWS_ROPS_service_flag_pending */
619 	  /* LWS_ROPS_handle_POLLIN */			0x01,
620 	  /* LWS_ROPS_handle_POLLOUT */
621 	  /* LWS_ROPS_perform_user_POLLOUT */		0x20,
622 	  /* LWS_ROPS_callback_on_writable */
623 	  /* LWS_ROPS_tx_credit */			0x30,
624 	  /* LWS_ROPS_write_role_protocol */
625 	  /* LWS_ROPS_encapsulation_parent */		0x00,
626 	  /* LWS_ROPS_alpn_negotiated */
627 	  /* LWS_ROPS_close_via_role_protocol */	0x00,
628 	  /* LWS_ROPS_close_role */
629 	  /* LWS_ROPS_close_kill_connection */		0x45,
630 	  /* LWS_ROPS_destroy_role */
631 	  /* LWS_ROPS_adoption_bind */			0x00,
632 
633 	  /* LWS_ROPS_client_bind */
634 #if defined(LWS_WITH_CLIENT)
635 	  /* LWS_ROPS_issue_keepalive */		0x67,
636 #else
637 	  /* LWS_ROPS_issue_keepalive */		0x00,
638 #endif
639 					},
640 
641 	.adoption_cb =			{ LWS_CALLBACK_MQTT_NEW_CLIENT_INSTANTIATED,
642 					  LWS_CALLBACK_MQTT_NEW_CLIENT_INSTANTIATED },
643 	.rx_cb =			{ LWS_CALLBACK_MQTT_CLIENT_RX,
644 					  LWS_CALLBACK_MQTT_CLIENT_RX },
645 	.writeable_cb =			{ LWS_CALLBACK_MQTT_CLIENT_WRITEABLE,
646 					  LWS_CALLBACK_MQTT_CLIENT_WRITEABLE },
647 	.close_cb =			{ LWS_CALLBACK_MQTT_CLIENT_CLOSED,
648 					  LWS_CALLBACK_MQTT_CLIENT_CLOSED },
649 	.protocol_bind_cb =		{ LWS_CALLBACK_MQTT_IDLE,
650 					  LWS_CALLBACK_MQTT_IDLE },
651 	.protocol_unbind_cb =		{ LWS_CALLBACK_MQTT_DROP_PROTOCOL,
652 					  LWS_CALLBACK_MQTT_DROP_PROTOCOL },
653 	.file_handle =			0,
654 };
655