• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libwebsockets - small server side websockets and web server implementation
3  *
4  * Copyright (C) 2010 - 2021 Andy Green <andy@warmcat.com>
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to
8  * deal in the Software without restriction, including without limitation the
9  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10  * sell copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22  * IN THE SOFTWARE.
23  *
24  * MQTT v5
25  *
26  * http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
27  *
28  * Control Packet structure
29  *
30  *  - Always:           2+ byte:  Fixed Hdr
31  *  - Required in some: variable: Variable Hdr + [(CONNECT)Will Props] + Props
32  *  - Required in some: variable: Payload
33  *
34  * For CONNECT, the props if present MUST be in the order [MQTT-3.1.3-1]
35  *
36  *  - Client Identifier
37  *  - Will Properties
38  *  - Will Topic
39  *  - Will Payload
40  *  - User Name
41  *  - Password
42  */
43 
44 #include "private-lib-core.h"
45 #include <string.h>
46 #include <sys/types.h>
47 #include <assert.h>
48 
/*
 * Server-side MQTT connection state.  Only one member is defined.
 * NOTE(review): no use of this type is visible in this part of the file -
 * confirm whether it is still needed.
 */
49 typedef enum {
50 	LMQPRS_AWAITING_CONNECT,
51 
52 } lws_mqtt_protocol_server_connstate_t;
53 
/*
 * Human-readable names for MQTT reason codes, first group (8 entries).
 * The table has external linkage (not static).
 * NOTE(review): the mapping from table index to on-the-wire reason code is
 * not visible here; presumably the order matches the project's reason-code
 * enum - confirm before indexing with a raw reason byte.
 */
54 const char * const reason_names_g1[] = {
55 	"Success / Normal disconnection / QoS0",
56 	"QoS1",
57 	"QoS2",
58 	"Disconnect Will",
59 	"No matching subscriber",
60 	"No subscription existed",
61 	"Continue authentication",
62 	"Re-authenticate"
63 };
64 
/*
 * Human-readable names for MQTT reason codes, second group (34 entries,
 * starting at "Unspecified error").  Also has external linkage.
 * NOTE(review): as with reason_names_g1, the index base used by callers is
 * not visible here - confirm.
 */
65 const char * const reason_names_g2[] = {
66 	"Unspecified error",
67 	"Malformed packet",
68 	"Protocol error",
69 	"Implementation specific error",
70 	"Unsupported protocol",
71 	"Client ID invalid",
72 	"Bad credentials",
73 	"Not Authorized",
74 	"Server Unavailable",
75 	"Server Busy",
76 	"Banned",
77 	"Server Shutting Down",
78 	"Bad Authentication Method",
79 	"Keepalive Timeout",
80 	"Session taken over",
81 	"Topic Filter Invalid",
82 	"Packet ID in use",
83 	"Packet ID not found",
84 	"Max RX Exceeded",
85 	"Topic Alias Invalid",
86 	"Packet too large",
87 	"Ratelimit",
88 	"Quota Exceeded",
89 	"Administrative Action",
90 	"Payload format invalid",
91 	"Retain not supported",
92 	"QoS not supported",
93 	"Use another server",
94 	"Server Moved",
95 	"Shared subscriptions not supported",
96 	"Connection rate exceeded",
97 	"Maximum Connect Time",
98 	"Subscription IDs not supported",
99 	"Wildcard subscriptions not supported"
100 };
101 
/*
 * Pseudo command index used for the CONNECT Will Properties section: it
 * occupies bit 0 of each property_valid[] bitmap below.
 */
102 #define LMQCP_WILL_PROPERTIES 0
103 
104 /* For each property, a bitmap describing which commands it is valid for */
105 
/*
 * Indexed by LMQPROP_* property id.  Bit (1 << LMQCP_xxx) set means the
 * property may appear in that control packet type; bit 0 stands for Will
 * Properties (see LMQCP_WILL_PROPERTIES).  Because designated initializers
 * are used, any index not listed here is implicitly 0, ie, valid nowhere.
 */
106 static const uint16_t property_valid[] = {
107 	[LMQPROP_PAYLOAD_FORMAT_INDICATOR]	= (1 << LMQCP_PUBLISH) |
108 						  (1 << LMQCP_WILL_PROPERTIES),
109 	[LMQPROP_MESSAGE_EXPIRY_INTERVAL]	= (1 << LMQCP_PUBLISH) |
110 						  (1 << LMQCP_WILL_PROPERTIES),
111 	[LMQPROP_CONTENT_TYPE]			= (1 << LMQCP_PUBLISH) |
112 						  (1 << LMQCP_WILL_PROPERTIES),
113 	[LMQPROP_RESPONSE_TOPIC]		= (1 << LMQCP_PUBLISH) |
114 						  (1 << LMQCP_WILL_PROPERTIES),
115 	[LMQPROP_CORRELATION_DATA]		= (1 << LMQCP_PUBLISH) |
116 						  (1 << LMQCP_WILL_PROPERTIES),
117 	[LMQPROP_SUBSCRIPTION_IDENTIFIER]	= (1 << LMQCP_PUBLISH) |
118 						  (1 << LMQCP_CTOS_SUBSCRIBE),
119 	[LMQPROP_SESSION_EXPIRY_INTERVAL]	= (1 << LMQCP_CTOS_CONNECT) |
120 						  (1 << LMQCP_STOC_CONNACK) |
121 						  (1 << LMQCP_DISCONNECT),
122 	[LMQPROP_ASSIGNED_CLIENT_IDENTIFIER]	= (1 << LMQCP_STOC_CONNACK),
123 	[LMQPROP_SERVER_KEEP_ALIVE]		= (1 << LMQCP_STOC_CONNACK),
124 	[LMQPROP_AUTHENTICATION_METHOD]		= (1 << LMQCP_CTOS_CONNECT) |
125 						  (1 << LMQCP_STOC_CONNACK) |
126 						  (1 << LMQCP_AUTH),
127 	[LMQPROP_AUTHENTICATION_DATA]		= (1 << LMQCP_CTOS_CONNECT) |
128 						  (1 << LMQCP_STOC_CONNACK) |
129 						  (1 << LMQCP_AUTH),
130 	[LMQPROP_REQUEST_PROBLEM_INFORMATION]	= (1 << LMQCP_CTOS_CONNECT),
131 	[LMQPROP_WILL_DELAY_INTERVAL]		= (1 << LMQCP_WILL_PROPERTIES),
132 	[LMQPROP_REQUEST_RESPONSE_INFORMATION]	= (1 << LMQCP_CTOS_CONNECT),
133 	[LMQPROP_RESPONSE_INFORMATION]		= (1 << LMQCP_STOC_CONNACK),
134 	[LMQPROP_SERVER_REFERENCE]		= (1 << LMQCP_STOC_CONNACK) |
135 						  (1 << LMQCP_DISCONNECT),
136 	[LMQPROP_REASON_STRING]			= (1 << LMQCP_STOC_CONNACK) |
137 						  (1 << LMQCP_PUBACK) |
138 						  (1 << LMQCP_PUBREC) |
139 						  (1 << LMQCP_PUBREL) |
140 						  (1 << LMQCP_PUBCOMP) |
141 						  (1 << LMQCP_STOC_SUBACK) |
142 						  (1 << LMQCP_STOC_UNSUBACK) |
143 						  (1 << LMQCP_DISCONNECT) |
144 						  (1 << LMQCP_AUTH),
145 	[LMQPROP_RECEIVE_MAXIMUM]		= (1 << LMQCP_CTOS_CONNECT) |
146 						  (1 << LMQCP_STOC_CONNACK),
147 	[LMQPROP_TOPIC_ALIAS_MAXIMUM]		= (1 << LMQCP_CTOS_CONNECT) |
148 						  (1 << LMQCP_STOC_CONNACK),
149 	[LMQPROP_TOPIC_ALIAS]			= (1 << LMQCP_PUBLISH),
150 	[LMQPROP_MAXIMUM_QOS]			= (1 << LMQCP_STOC_CONNACK),
151 	[LMQPROP_RETAIN_AVAILABLE]		= (1 << LMQCP_STOC_CONNACK),
152 	[LMQPROP_USER_PROPERTY]			= (1 << LMQCP_CTOS_CONNECT) |
153 						  (1 << LMQCP_STOC_CONNACK) |
154 						  (1 << LMQCP_PUBLISH) |
155 						  (1 << LMQCP_WILL_PROPERTIES) |
156 						  (1 << LMQCP_PUBACK) |
157 						  (1 << LMQCP_PUBREC) |
158 						  (1 << LMQCP_PUBREL) |
159 						  (1 << LMQCP_PUBCOMP) |
160 						  (1 << LMQCP_CTOS_SUBSCRIBE) |
161 						  (1 << LMQCP_STOC_SUBACK) |
162 						  (1 << LMQCP_CTOS_UNSUBSCRIBE) |
163 						  (1 << LMQCP_STOC_UNSUBACK) |
164 						  (1 << LMQCP_DISCONNECT) |
165 						  (1 << LMQCP_AUTH),
166 	[LMQPROP_MAXIMUM_PACKET_SIZE]		= (1 << LMQCP_CTOS_CONNECT) |
167 						  (1 << LMQCP_STOC_CONNACK),
168 	[LMQPROP_WILDCARD_SUBSCRIPTION_AVAIL]	= (1 << LMQCP_STOC_CONNACK),
169 	[LMQPROP_SUBSCRIPTION_IDENTIFIER_AVAIL]	= (1 << LMQCP_STOC_CONNACK),
170 	[LMQPROP_SHARED_SUBSCRIPTION_AVAIL]	= (1 << LMQCP_STOC_CONNACK)
171 };
172 
173 
174 /*
175  * For each command index, maps flags, id, qos and payload legality
176  * notice in most cases PUBLISH requires further processing
177  */
/*
 * Layout of each entry: the low nibble is the fixed-header flags value the
 * rx parser insists on when LMQCP_LUT_FLAG_RESERVED_FLAGS is set (it compares
 * packet_type_flags & 0x0f against entry & 0x0f); the LMQCP_LUT_FLAG_* bits
 * are OR-ed in alongside it.  PUBLISH does not set RESERVED_FLAGS, so its low
 * nibble is not checked against this table.
 *
 * NOTE(review): LMQCP_STOC_UNSUBACK is marked LMQCP_LUT_FLAG_PACKET_ID_NONE,
 * but an MQTT UNSUBACK carries a Packet Identifier in its variable header -
 * confirm whether this entry should be LMQCP_LUT_FLAG_PACKET_ID_HAS.
 */
178 static const uint8_t map_flags[] = {
179 	[LMQCP_RESERVED]		= 0x00,
180 	[LMQCP_CTOS_CONNECT]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
181 					  LMQCP_LUT_FLAG_PAYLOAD |
182 					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
183 	[LMQCP_STOC_CONNACK]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
184 					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
185 	[LMQCP_PUBLISH]			= LMQCP_LUT_FLAG_PAYLOAD | /* option */
186 					  LMQCP_LUT_FLAG_PACKET_ID_QOS12 | 0x00,
187 	[LMQCP_PUBACK]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
188 					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
189 	[LMQCP_PUBREC]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
190 					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
191 	[LMQCP_PUBREL]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
192 					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
193 	[LMQCP_PUBCOMP]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
194 					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
195 	[LMQCP_CTOS_SUBSCRIBE]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
196 					  LMQCP_LUT_FLAG_PAYLOAD |
197 					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
198 	[LMQCP_STOC_SUBACK]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
199 					  LMQCP_LUT_FLAG_PAYLOAD |
200 					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
201 	[LMQCP_CTOS_UNSUBSCRIBE]	= LMQCP_LUT_FLAG_RESERVED_FLAGS |
202 					  LMQCP_LUT_FLAG_PAYLOAD |
203 					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
204 	[LMQCP_STOC_UNSUBACK]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
205 					  LMQCP_LUT_FLAG_PAYLOAD |
206 					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
207 	[LMQCP_CTOS_PINGREQ]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
208 					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
209 	[LMQCP_STOC_PINGRESP]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
210 					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
211 	[LMQCP_DISCONNECT]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
212 					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
213 	[LMQCP_AUTH]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
214 					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
215 };
216 
217 void
lws_mqttc_state_transition(lws_mqttc_t * c,lwsgs_mqtt_states_t s)218 lws_mqttc_state_transition(lws_mqttc_t *c, lwsgs_mqtt_states_t s)
219 {
220 	lwsl_debug("%s: ep %p: state %d -> %d\n", __func__, c, c->estate, s);
221 	c->estate = s;
222 }
223 
224 static int
lws_mqtt_pconsume(lws_mqtt_parser_t * par,int consumed)225 lws_mqtt_pconsume(lws_mqtt_parser_t *par, int consumed)
226 {
227 	par->consumed += (unsigned int)consumed;
228 
229 	if (par->consumed > par->props_len)
230 		return -1;
231 
232 	/* more properties coming */
233 
234 	if (par->consumed < par->props_len) {
235 		par->state = LMQCPP_PROP_ID_VBI;
236 		return 0;
237 	}
238 
239 	/* properties finished: are we headed for payload or idle? */
240 
241 	if ((map_flags[ctl_pkt_type(par)] & LMQCP_LUT_FLAG_PAYLOAD) &&
242 		/* A PUBLISH packet MUST NOT contain a Packet Identifier if
243 		 * its QoS value is set to 0 [MQTT-2.2.1-2]. */
244 	    (ctl_pkt_type(par) != LMQCP_PUBLISH ||
245 	     (par->packet_type_flags & 6))) {
246 		par->state = LMQCPP_PAYLOAD;
247 		return 0;
248 	}
249 
250 	par->state = LMQCPP_IDLE;
251 
252 	return 0;
253 }
254 
255 static int
lws_mqtt_set_client_established(struct lws * wsi)256 lws_mqtt_set_client_established(struct lws *wsi)
257 {
258 	lws_role_transition(wsi, LWSIFR_CLIENT, LRS_ESTABLISHED,
259 			    &role_ops_mqtt);
260 
261 	if (user_callback_handle_rxflow(wsi->a.protocol->callback,
262 					wsi, LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED,
263 					wsi->user_space, NULL, 0) < 0) {
264 		lwsl_err("%s: MQTT_ESTABLISHED failed\n", __func__);
265 
266 		return -1;
267 	}
268 	/*
269 	 * If we made a new connection and got the ACK, our connection is
270 	 * definitely working in both directions at the moment
271 	 */
272 	lws_validity_confirmed(wsi);
273 
274 	/* clear connection timeout */
275 	lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
276 
277 	return 0;
278 }
279 
280 
281 static lws_mqtt_match_topic_return_t
lws_mqtt_is_topic_matched(const char * sub,const char * pub)282 lws_mqtt_is_topic_matched(const char* sub, const char* pub)
283 {
284 	const char *ppos = pub, *spos = sub;
285 
286 	if (!ppos || !spos) {
287 		return LMMTR_TOPIC_MATCH_ERROR;
288 	}
289 
290 	while (*spos) {
291 		if (*ppos == '#' || *ppos == '+') {
292 			lwsl_err("%s: PUBLISH to wildcard "
293 				 "topic \"%s\" not supported\n",
294 				 __func__, pub);
295 			return LMMTR_TOPIC_MATCH_ERROR;
296 		}
297 		/* foo/+/bar == foo/xyz/bar ? */
298 		if (*spos == '+') {
299 			/* Skip ahead */
300 			while (*ppos != '\0' && *ppos != '/') {
301 				ppos++;
302 			}
303 		} else if (*spos == '#') {
304 			return LMMTR_TOPIC_MATCH;
305 		} else {
306 			if (*ppos == '\0') {
307 				/* foo/bar == foo/bar/# ? */
308 				if (!strncmp(spos, "/#", 2))
309 					return LMMTR_TOPIC_MATCH;
310 				return LMMTR_TOPIC_NOMATCH;
311 			/* Non-matching character */
312 			} else if (*ppos != *spos) {
313 				return LMMTR_TOPIC_NOMATCH;
314 			}
315 			ppos++;
316 		}
317 		spos++;
318 	}
319 
320 	if (*spos == '\0' && *ppos == '\0')
321 		return LMMTR_TOPIC_MATCH;
322 
323 	return LMMTR_TOPIC_NOMATCH;
324 }
325 
lws_mqtt_find_sub(struct _lws_mqtt_related * mqtt,const char * ptopic)326 lws_mqtt_subs_t* lws_mqtt_find_sub(struct _lws_mqtt_related* mqtt,
327 				   const char* ptopic) {
328 	lws_mqtt_subs_t *s = mqtt->subs_head;
329 
330 	while (s) {
331 		/*  SUB topic  ==   PUB topic  ? */
332 		/* foo/bar/xyz ==  foo/bar/xyz ? */
333 		if (!s->wildcard) {
334 			if (!strcmp((const char*)s->topic, ptopic))
335 				return s;
336 		} else {
337 			if (lws_mqtt_is_topic_matched(
338 			    s->topic, ptopic) == LMMTR_TOPIC_MATCH)
339 				return s;
340 		}
341 
342 		s = s->next;
343 	}
344 
345 	return NULL;
346 }
347 
348 static lws_mqtt_validate_topic_return_t
lws_mqtt_validate_topic(const char * topic,size_t topiclen,uint8_t awsiot)349 lws_mqtt_validate_topic(const char *topic, size_t topiclen, uint8_t awsiot)
350 {
351 	size_t spos = 0;
352 	const char *sub = topic;
353 	int8_t slashes = 0;
354 	lws_mqtt_validate_topic_return_t ret = LMVTR_VALID;
355 
356 	if (awsiot) {
357 		if (topiclen > LWS_MQTT_MAX_AWSIOT_TOPICLEN)
358 			return LMVTR_FAILED_OVERSIZE;
359 		if (topic[0] == '$') {
360 			ret = LMVTR_VALID_SHADOW;
361 			slashes = -3;
362 		}
363 	} else {
364 		if (topiclen > LWS_MQTT_MAX_TOPICLEN)
365 			return LMVTR_FAILED_OVERSIZE;
366 		if (topic[0] == '$')
367 			return LMVTR_FAILED_WILDCARD_FORMAT;
368 	}
369 
370 	while (*sub != 0) {
371 		if (sub[0] == '+') {
372 			/* topic == "+foo" || "a/+foo" ? */
373 			if (spos > 0 && sub[-1] != '/')
374 				return LMVTR_FAILED_WILDCARD_FORMAT;
375 
376 			/* topic == "foo+" or "foo+/a" ? */
377 			if (sub[1] != 0 && sub[1] != '/')
378 				return LMVTR_FAILED_WILDCARD_FORMAT;
379 
380 			ret = LMVTR_VALID_WILDCARD;
381 		} else if (sub[0] == '#') {
382 			/* topic == "foo#" ? */
383 			if (spos > 0 && sub[-1] != '/')
384 				return LMVTR_FAILED_WILDCARD_FORMAT;
385 
386 			/* topic == "#foo" ? */
387 			if (sub[1] != 0)
388 				return LMVTR_FAILED_WILDCARD_FORMAT;
389 
390 			ret = LMVTR_VALID_WILDCARD;
391 		} else if (sub[0] == '/') {
392 			slashes++;
393 		}
394 		spos++;
395 		sub++;
396 	}
397 
398 	if (awsiot && (slashes < 0 || slashes > 7))
399 		return LMVTR_FAILED_SHADOW_FORMAT;
400 
401 	return ret;
402 }
403 
404 static lws_mqtt_subs_t *
lws_mqtt_create_sub(struct _lws_mqtt_related * mqtt,const char * topic)405 lws_mqtt_create_sub(struct _lws_mqtt_related *mqtt, const char *topic)
406 {
407 	lws_mqtt_subs_t *mysub;
408 	size_t topiclen = strlen(topic);
409 	lws_mqtt_validate_topic_return_t flag;
410 
411 	flag = lws_mqtt_validate_topic(topic, topiclen, mqtt->client.aws_iot);
412 	switch (flag) {
413 	case LMVTR_FAILED_OVERSIZE:
414 		lwsl_err("%s: Topic is too long\n",
415 			 __func__);
416 		return NULL;
417 	case LMVTR_FAILED_SHADOW_FORMAT:
418 	case LMVTR_FAILED_WILDCARD_FORMAT:
419 		lwsl_err("%s: Invalid topic format \"%s\"\n",
420 			 __func__, topic);
421 		return NULL;
422 
423 	case LMVTR_VALID:
424 	case LMVTR_VALID_WILDCARD:
425 	case LMVTR_VALID_SHADOW:
426 		mysub = lws_malloc(sizeof(*mysub) + topiclen + 1, "sub");
427 		if (!mysub) {
428 			lwsl_err("%s: Error allocating mysub\n",
429 				 __func__);
430 			return NULL;
431 		}
432 		mysub->wildcard = (flag == LMVTR_VALID_WILDCARD);
433 		mysub->shadow = (flag == LMVTR_VALID_SHADOW);
434 		break;
435 
436 	default:
437 		lwsl_err("%s: Unknown flag - %d\n",
438 			 __func__, flag);
439 		return NULL;
440 	}
441 
442 	mysub->next = mqtt->subs_head;
443 	mqtt->subs_head = mysub;
444 	memcpy(mysub->topic, topic, strlen(topic) + 1);
445 	mysub->ref_count = 1;
446 
447 	lwsl_info("%s: Created mysub %p for wsi->mqtt %p\n",
448 		  __func__, mysub, mqtt);
449 
450 	return mysub;
451 }
452 
453 static int
lws_mqtt_client_remove_subs(struct _lws_mqtt_related * mqtt)454 lws_mqtt_client_remove_subs(struct _lws_mqtt_related *mqtt)
455 {
456 	lws_mqtt_subs_t *s = mqtt->subs_head;
457 	lws_mqtt_subs_t *temp = NULL;
458 
459 
460 	lwsl_info("%s: Called to remove subs from wsi->mqtt %p\n",
461 		  __func__, mqtt);
462 
463 	while (s && s->next) {
464 		if (s->next->ref_count == 0)
465 			break;
466 		s = s->next;
467 	}
468 
469 	if (s && s->next) {
470 		temp = s->next;
471 		lwsl_info("%s: Removing sub %p from wsi->mqtt %p\n",
472 			  __func__, temp, mqtt);
473 		s->next = temp->next;
474 		lws_free(temp);
475 		return 0;
476 	}
477 	return 1;
478 }
479 
480 int
_lws_mqtt_rx_parser(struct lws * wsi,lws_mqtt_parser_t * par,const uint8_t * buf,size_t len)481 _lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par,
482 		    const uint8_t *buf, size_t len)
483 {
484 	struct lws *w;
485 	int n;
486 
487 	if (par->flag_pending_send_reason_close)
488 		return 0;
489 
490 	/*
491 	 * Stateful, fragmentation-immune parser
492 	 *
493 	 * Notice that len can always be 1 if under attack, even over tls if
494 	 * the server is compromised or malicious.
495 	 */
496 
497 	while (len) {
498 		lwsl_debug("%s: %d, len = %d\n", __func__, par->state, (int)len);
499 		switch (par->state) {
500 		case LMQCPP_IDLE:
501 			par->packet_type_flags = *buf++;
502 			len--;
503 
504 #if defined(LWS_WITH_CLIENT)
505 			/*
506 			 * The case where we sent the connect, but we received
507 			 * something else before any CONNACK
508 			 */
509 			if (lwsi_state(wsi) == LRS_MQTTC_AWAIT_CONNACK &&
510 			    par->packet_type_flags >> 4 != LMQCP_STOC_CONNACK) {
511 				lwsl_notice("%s: server sent non-CONNACK\n",
512 						__func__);
513 				goto send_protocol_error_and_close;
514 			}
515 #endif /* LWS_WITH_CLIENT */
516 
517 			n = map_flags[par->packet_type_flags >> 4];
518 			/*
519 			 *  Where a flag bit is marked as “Reserved”, it is
520 			 *  reserved for future use and MUST be set to the value
521 			 *  listed [MQTT-2.1.3-1].
522 			 */
523 			if ((n & LMQCP_LUT_FLAG_RESERVED_FLAGS) &&
524 			    ((par->packet_type_flags & 0x0f) != (n & 0x0f))) {
525 				lwsl_notice("%s: %s: bad flags, 0x%02x mask 0x%02x (len %d)\n",
526 					    __func__, lws_wsi_tag(wsi),
527 					    par->packet_type_flags, n, (int)len + 1);
528 				lwsl_hexdump_err(buf - 1, len + 1);
529 				goto send_protocol_error_and_close;
530 			}
531 
532 			lwsl_debug("%s: received pkt type 0x%x / flags 0x%x\n",
533 				   __func__, par->packet_type_flags >> 4,
534 				   par->packet_type_flags & 0xf);
535 
536 			/* allows us to know if a property that can only be
537 			 * given once, appears twice */
538 			memset(par->props_seen, 0, sizeof(par->props_seen));
539 			par->state = par->packet_type_flags & 0xf0;
540 			break;
541 
542 		case LMQCPP_CONNECT_PACKET:
543 			lwsl_debug("%s: received CONNECT pkt\n", __func__);
544 			par->state = LMQCPP_CONNECT_REMAINING_LEN_VBI;
545 			lws_mqtt_vbi_init(&par->vbit);
546 			break;
547 
548 		case LMQCPP_CONNECT_REMAINING_LEN_VBI:
549 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
550 			case LMSPR_NEED_MORE:
551 				break;
552 			case LMSPR_COMPLETED:
553 				par->cpkt_remlen = par->vbit.value;
554 				n = map_flags[ctl_pkt_type(par)];
555 				lws_mqtt_str_init(&par->s_temp, par->temp,
556 						  sizeof(par->temp), 0);
557 				par->state = LMQCPP_CONNECT_VH_PNAME;
558 				break;
559 			default:
560 				lwsl_notice("%s: bad vbi\n", __func__);
561 				goto send_protocol_error_and_close;
562 			}
563 			break;
564 
565 		case LMQCPP_CONNECT_VH_PNAME:
566 			switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
567 			case LMSPR_NEED_MORE:
568 				break;
569 			case LMSPR_COMPLETED:
570 				if (par->s_temp.len != 4 ||
571 				    memcmp(par->s_temp.buf, "MQTT",
572 					   par->s_temp.len)) {
573 					lwsl_notice("%s: protocol name: %.*s\n",
574 						  __func__, par->s_temp.len,
575 						  par->s_temp.buf);
576 					goto send_unsupp_connack_and_close;
577 				}
578 				par->state = LMQCPP_CONNECT_VH_PVERSION;
579 				break;
580 			default:
581 				lwsl_notice("%s: bad protocol name\n", __func__);
582 				goto send_protocol_error_and_close;
583 			}
584 			break;
585 
586 		case LMQCPP_CONNECT_VH_PVERSION:
587 			par->conn_protocol_version = *buf++;
588 			len--;
589 			if (par->conn_protocol_version != 5) {
590 				lwsl_info("%s: unsupported MQTT version %d\n",
591 					  __func__, par->conn_protocol_version);
592 				goto send_unsupp_connack_and_close;
593 			}
594 			par->state = LMQCPP_CONNECT_VH_FLAGS;
595 			break;
596 
597 		case LMQCPP_CONNECT_VH_FLAGS:
598 			par->cpkt_flags = *buf++;
599 			len--;
600 			if (par->cpkt_flags & 1) {
601 				/*
602 				 * The Server MUST validate that the reserved
603 				 * flag in the CONNECT packet is set to 0
604 				 * [MQTT-3.1.2-3].
605 				 */
606 				par->reason = LMQCP_REASON_MALFORMED_PACKET;
607 				goto send_reason_and_close;
608 			}
609 			/*
610 			 * conn_flags specifies the Will Properties that should
611 			 * appear in the payload section
612 			 */
613 			lws_mqtt_2byte_init(&par->vbit);
614 			par->state = LMQCPP_CONNECT_VH_KEEPALIVE;
615 			break;
616 
617 		case LMQCPP_CONNECT_VH_KEEPALIVE:
618 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
619 			case LMSPR_NEED_MORE:
620 				break;
621 			case LMSPR_COMPLETED:
622 				par->keepalive = (uint16_t)par->vbit.value;
623 				lws_mqtt_vbi_init(&par->vbit);
624 				par->state = LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN;
625 				break;
626 			default:
627 				lwsl_notice("%s: ka bad vbi\n", __func__);
628 				goto send_protocol_error_and_close;
629 			}
630 			break;
631 
632 		case LMQCPP_PINGRESP_ZERO:
633 			len--;
634 			/* second byte of PINGRESP must be zero */
635 			if (*buf++)
636 				goto send_protocol_error_and_close;
637 			goto cmd_completion;
638 
639 		case LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN:
640 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
641 			case LMSPR_NEED_MORE:
642 				break;
643 			case LMSPR_COMPLETED:
644 				/* reset consumption counter */
645 				par->consumed = 0;
646 				par->props_len = par->vbit.value;
647 				lws_mqtt_vbi_init(&par->vbit);
648 				par->state = LMQCPP_PROP_ID_VBI;
649 				break;
650 			default:
651 				lwsl_notice("%s: connpr bad vbi\n", __func__);
652 				goto send_protocol_error_and_close;
653 			}
654 			break;
655 
656 		/* PUBREC */
657 		case LMQCPP_PUBREC_PACKET:
658 			lwsl_debug("%s: received PUBREC pkt\n", __func__);
659 			lws_mqtt_vbi_init(&par->vbit);
660 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
661 			case LMSPR_NEED_MORE:
662 				break;
663 			case LMSPR_COMPLETED:
664 				par->cpkt_remlen = par->vbit.value;
665 				lwsl_debug("%s: PUBREC pkt len = %d\n",
666 					   __func__, (int)par->cpkt_remlen);
667 				if (par->cpkt_remlen < 2)
668 					goto send_protocol_error_and_close;
669 				par->state = LMQCPP_PUBREC_VH_PKT_ID;
670 				break;
671 			default:
672 				lwsl_notice("%s: pubrec bad vbi\n", __func__);
673 				goto send_protocol_error_and_close;
674 			}
675 			break;
676 
677 		case LMQCPP_PUBREC_VH_PKT_ID:
678 			if (len < 2) {
679 				lwsl_notice("%s: len breakage 3\n", __func__);
680 				return -1;
681 			}
682 
683 			par->cpkt_id = lws_ser_ru16be(buf);
684 			wsi->mqtt->ack_pkt_id = par->cpkt_id;
685 			buf += 2;
686 			len -= 2;
687 			par->cpkt_remlen -= 2;
688 			par->n = 0;
689 
690 			goto cmd_completion;
691 
692 		/* PUBREL */
693 		case LMQCPP_PUBREL_PACKET:
694 			lwsl_debug("%s: received PUBREL pkt\n", __func__);
695 			lws_mqtt_vbi_init(&par->vbit);
696 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
697 			case LMSPR_NEED_MORE:
698 				break;
699 			case LMSPR_COMPLETED:
700 				par->cpkt_remlen = par->vbit.value;
701 				lwsl_debug("%s: PUBREL pkt len = %d\n",
702 					   __func__, (int)par->cpkt_remlen);
703 				if (par->cpkt_remlen < 2)
704 					goto send_protocol_error_and_close;
705 				par->state = LMQCPP_PUBREL_VH_PKT_ID;
706 				break;
707 			default:
708 				lwsl_err("%s: pubrel bad vbi\n", __func__);
709 				goto send_protocol_error_and_close;
710 			}
711 			break;
712 
713 		case LMQCPP_PUBREL_VH_PKT_ID:
714 			if (len < 2) {
715 				lwsl_notice("%s: len breakage 3\n", __func__);
716 				return -1;
717 			}
718 
719 			par->cpkt_id = lws_ser_ru16be(buf);
720 			wsi->mqtt->ack_pkt_id = par->cpkt_id;
721 			buf += 2;
722 			len -= 2;
723 			par->cpkt_remlen -= 2;
724 			par->n = 0;
725 
726 			goto cmd_completion;
727 
728 		/* PUBCOMP */
729 		case LMQCPP_PUBCOMP_PACKET:
730 			lwsl_debug("%s: received PUBCOMP pkt\n", __func__);
731 			lws_mqtt_vbi_init(&par->vbit);
732 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
733 			case LMSPR_NEED_MORE:
734 				break;
735 			case LMSPR_COMPLETED:
736 				par->cpkt_remlen = par->vbit.value;
737 				lwsl_debug("%s: PUBCOMP pkt len = %d\n",
738 					   __func__, (int)par->cpkt_remlen);
739 				if (par->cpkt_remlen < 2)
740 					goto send_protocol_error_and_close;
741 				par->state = LMQCPP_PUBCOMP_VH_PKT_ID;
742 				break;
743 			default:
744 				lwsl_err("%s: pubcmp bad vbi\n", __func__);
745 				goto send_protocol_error_and_close;
746 			}
747 			break;
748 
749 		case LMQCPP_PUBCOMP_VH_PKT_ID:
750 			if (len < 2) {
751 				lwsl_notice("%s: len breakage 3\n", __func__);
752 				return -1;
753 			}
754 
755 			par->cpkt_id = lws_ser_ru16be(buf);
756 			wsi->mqtt->ack_pkt_id = par->cpkt_id;
757 			buf += 2;
758 			len -= 2;
759 			par->cpkt_remlen -= 2;
760 			par->n = 0;
761 
762 			goto cmd_completion;
763 
764 		case LMQCPP_PUBLISH_PACKET:
765 			if (lwsi_role_client(wsi) && wsi->mqtt->inside_subscribe) {
766 				lwsl_notice("%s: Topic rx before subscribing\n",
767 					    __func__);
768 				goto send_protocol_error_and_close;
769 			}
770 			lwsl_info("%s: received PUBLISH pkt\n", __func__);
771 			par->state = LMQCPP_PUBLISH_REMAINING_LEN_VBI;
772 			lws_mqtt_vbi_init(&par->vbit);
773 			break;
774 		case LMQCPP_PUBLISH_REMAINING_LEN_VBI:
775 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
776 			case LMSPR_NEED_MORE:
777 				break;
778 			case LMSPR_COMPLETED:
779 				par->cpkt_remlen = par->vbit.value;
780 				lwsl_debug("%s: PUBLISH pkt len = %d\n",
781 					   __func__, (int)par->cpkt_remlen);
782 				/* Move on to PUBLISH's variable header */
783 				par->state = LMQCPP_PUBLISH_VH_TOPIC;
784 				break;
785 			default:
786 				lwsl_notice("%s: pubrem bad vbi\n", __func__);
787 				goto send_protocol_error_and_close;
788 			}
789 			break;
790 
791 		case LMQCPP_PUBLISH_VH_TOPIC:
792 		{
793 			lws_mqtt_publish_param_t *pub = NULL;
794 
795 			if (len < 2) {
796 				lwsl_notice("%s: topic too short\n", __func__);
797 				return -1;
798 			}
799 
800 			/* Topic len */
801 			par->n = lws_ser_ru16be(buf);
802 			buf += 2;
803 			len -= 2;
804 
805 			if (len < par->n) {/* the way this is written... */
806 				lwsl_notice("%s: len breakage\n", __func__);
807 				return -1;
808 			}
809 
810 			/* Invalid topic len */
811 			if (par->n == 0) {
812 				lwsl_notice("%s: zero topic len\n", __func__);
813 				par->reason = LMQCP_REASON_MALFORMED_PACKET;
814 				goto send_reason_and_close;
815 			}
816 			lwsl_debug("%s: PUBLISH topic len %d\n",
817 				   __func__, (int)par->n);
818 			assert(!wsi->mqtt->rx_cpkt_param);
819 			wsi->mqtt->rx_cpkt_param = lws_zalloc(
820 				sizeof(lws_mqtt_publish_param_t), "rx pub param");
821 			if (!wsi->mqtt->rx_cpkt_param)
822 				goto oom;
823 			pub = (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
824 
825 			pub->topic_len = (uint16_t)par->n;
826 
827 			/* Topic Name */
828 			pub->topic = (char *)lws_zalloc((size_t)pub->topic_len + 1,
829 							"rx publish topic");
830 			if (!pub->topic)
831 				goto oom;
832 			lws_strncpy(pub->topic, (const char *)buf,
833 				    (size_t)pub->topic_len + 1);
834 			buf += pub->topic_len;
835 			len -= pub->topic_len;
836 
837 			/* Extract QoS Level from Fixed Header Flags */
838 			pub->qos = (lws_mqtt_qos_levels_t)
839 					((par->packet_type_flags >> 1) & 0x3);
840 
841 			pub->payload_pos = 0;
842 
843 			pub->payload_len = par->cpkt_remlen -
844 				(unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
845 
846 			switch (pub->qos) {
847 			case QOS0:
848 				par->state = LMQCPP_PAYLOAD;
849 				if (pub->payload_len == 0)
850 					goto cmd_completion;
851 
852 				break;
853 			case QOS1:
854 			case QOS2:
855 				par->state = LMQCPP_PUBLISH_VH_PKT_ID;
856 				break;
857 			default:
858 				par->reason = LMQCP_REASON_MALFORMED_PACKET;
859 				lws_free_set_NULL(pub->topic);
860 				lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
861 				goto send_reason_and_close;
862 			}
863 			break;
864 		}
865 		case LMQCPP_PUBLISH_VH_PKT_ID:
866 		{
867 			lws_mqtt_publish_param_t *pub =
868 				(lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
869 
870 			if (len < 2) {
871 				lwsl_notice("%s: len breakage 2\n", __func__);
872 				return -1;
873 			}
874 
875 			par->cpkt_id = lws_ser_ru16be(buf);
876 			buf += 2;
877 			len -= 2;
878 			wsi->mqtt->peer_ack_pkt_id = par->cpkt_id;
879 			lwsl_debug("%s: Packet ID %d\n",
880 					__func__, (int)par->cpkt_id);
881 			par->state = LMQCPP_PAYLOAD;
882 			pub->payload_pos = 0;
883 			pub->payload_len = par->cpkt_remlen -
884 				(unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
885 			if (pub->payload_len == 0)
886 				goto cmd_completion;
887 
888 			break;
889 		}
890 		case LMQCPP_PAYLOAD:
891 		{
892 			lws_mqtt_publish_param_t *pub =
893 				(lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
894 			if (pub == NULL) {
895 				lwsl_err("%s: Uninitialized pub_param\n",
896 						__func__);
897 				goto send_protocol_error_and_close;
898 			}
899 
900 			pub->payload = buf;
901 			goto cmd_completion;
902 		}
903 
904 		case LMQCPP_CONNACK_PACKET:
905 			if (!lwsi_role_client(wsi)) {
906 				lwsl_err("%s: CONNACK is only Server to Client",
907 						__func__);
908 				goto send_unsupp_connack_and_close;
909 			}
910 
911 			lwsl_debug("%s: received CONNACK pkt\n", __func__);
912 			lws_mqtt_vbi_init(&par->vbit);
913 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
914 			case LMSPR_NEED_MORE:
915 				break;
916 			case LMSPR_COMPLETED:
917 				par->cpkt_remlen = par->vbit.value;
918 				lwsl_debug("%s: CONNACK pkt len = %d\n",
919 					   __func__, (int)par->cpkt_remlen);
920 				if (par->cpkt_remlen != 2)
921 					goto send_protocol_error_and_close;
922 
923 				par->state = LMQCPP_CONNACK_VH_FLAGS;
924 				break;
925 			default:
926 				lwsl_notice("%s: connack bad vbi\n", __func__);
927 				goto send_protocol_error_and_close;
928 			}
929 			break;
930 
931 		case LMQCPP_CONNACK_VH_FLAGS:
932 		{
933 			lws_mqttc_t *c = &wsi->mqtt->client;
934 			par->cpkt_flags = *buf++;
935 			len--;
936 
937 			if (par->cpkt_flags & ~LMQCFT_SESSION_PRESENT) {
938 				/*
939 				 * Byte 1 is the "Connect Acknowledge
940 				 * Flags". Bits 7-1 are reserved and
941 				 * MUST be set to 0.
942 				 */
943 				par->reason = LMQCP_REASON_MALFORMED_PACKET;
944 				goto send_reason_and_close;
945 			}
946 			/*
947 			 * If the Server accepts a connection with
948 			 * CleanSession set to 1, the Server MUST set
949 			 * Session Present to 0 in the CONNACK packet
950 			 * in addition to setting a zero return code
951 			 * in the CONNACK packet [MQTT-3.2.2-1]. If
952 			 * the Server accepts a connection with
953 			 * CleanSession set to 0, the value set in
954 			 * Session Present depends on whether the
955 			 * Server already has stored Session state for
956 			 * the supplied client ID. If the Server has
957 			 * stored Session state, it MUST set
958 			 * SessionPresent to 1 in the CONNACK packet
959 			 * [MQTT-3.2.2-2]. If the Server does not have
960 			 * stored Session state, it MUST set Session
961 			 * Present to 0 in the CONNACK packet. This is
962 			 * in addition to setting a zero return code
963 			 * in the CONNACK packet [MQTT-3.2.2-3].
964 			 */
965 			if ((c->conn_flags & LMQCFT_CLEAN_START) &&
966 			    (par->cpkt_flags & LMQCFT_SESSION_PRESENT))
967 				goto send_protocol_error_and_close;
968 
969 			wsi->mqtt->session_resumed = ((unsigned int)par->cpkt_flags &
970 						      LMQCFT_SESSION_PRESENT);
971 
972 			/* Move on to Connect Return Code */
973 			par->state = LMQCPP_CONNACK_VH_RETURN_CODE;
974 			break;
975 		}
976 		case LMQCPP_CONNACK_VH_RETURN_CODE:
977 			par->conn_rc = *buf++;
978 			len--;
979 			/*
980 			 * If a server sends a CONNACK packet containing a
981 			 * non-zero return code it MUST then close the Network
982 			 * Connection [MQTT-3.2.2-5]
983 			 */
984 			switch (par->conn_rc) {
985 			case 0:
986 				goto cmd_completion;
987 			case 1:
988 			case 2:
989 			case 3:
990 			case 4:
991 			case 5:
992 				par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL +
993 						par->conn_rc - 1;
994 				goto send_reason_and_close;
995 			default:
996 				lwsl_notice("%s: bad connack retcode\n", __func__);
997 				goto send_protocol_error_and_close;
998 			}
999 			break;
1000 
1001 		/* SUBACK */
1002 		case LMQCPP_SUBACK_PACKET:
1003 			if (!lwsi_role_client(wsi)) {
1004 				lwsl_err("%s: SUBACK is only Server to Client",
1005 						__func__);
1006 				goto send_unsupp_connack_and_close;
1007 			}
1008 
1009 			lwsl_debug("%s: received SUBACK pkt\n", __func__);
1010 			lws_mqtt_vbi_init(&par->vbit);
1011 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1012 			case LMSPR_NEED_MORE:
1013 				break;
1014 			case LMSPR_COMPLETED:
1015 				par->cpkt_remlen = par->vbit.value;
1016 				lwsl_debug("%s: SUBACK pkt len = %d\n",
1017 					   __func__, (int)par->cpkt_remlen);
1018 				if (par->cpkt_remlen <= 2)
1019 					goto send_protocol_error_and_close;
1020 				par->state = LMQCPP_SUBACK_VH_PKT_ID;
1021 				break;
1022 			default:
1023 				lwsl_notice("%s: suback bad vbi\n", __func__);
1024 				goto send_protocol_error_and_close;
1025 			}
1026 			break;
1027 
1028 		case LMQCPP_SUBACK_VH_PKT_ID:
1029 
1030 			if (len < 2) {
1031 				lwsl_notice("%s: len breakage 4\n", __func__);
1032 				return -1;
1033 			}
1034 
1035 			par->cpkt_id = lws_ser_ru16be(buf);
1036 			wsi->mqtt->ack_pkt_id = par->cpkt_id;
1037 			buf += 2;
1038 			len -= 2;
1039 			par->cpkt_remlen -= 2;
1040 			par->n = 0;
1041 			par->state = LMQCPP_SUBACK_PAYLOAD;
1042 			*par->temp = 0;
1043 			break;
1044 
1045 		case LMQCPP_SUBACK_PAYLOAD:
1046 		{
1047 			lws_mqtt_qos_levels_t qos = (lws_mqtt_qos_levels_t)*buf++;
1048 
1049 			len--;
1050 			switch (qos) {
1051 				case QOS0:
1052 				case QOS1:
1053 				case QOS2:
1054 					break;
1055 				case FAILURE_QOS_LEVEL:
1056 					goto send_protocol_error_and_close;
1057 
1058 				default:
1059 					par->reason = LMQCP_REASON_MALFORMED_PACKET;
1060 					goto send_reason_and_close;
1061 			}
1062 
1063 			if (++(par->n) == par->cpkt_remlen) {
1064 				par->n = 0;
1065 				goto cmd_completion;
1066 			}
1067 
1068 			break;
1069 		}
1070 
1071 		/* UNSUBACK */
1072 		case LMQCPP_UNSUBACK_PACKET:
1073 			if (!lwsi_role_client(wsi)) {
1074 				lwsl_err("%s: UNSUBACK is only Server to Client",
1075 						__func__);
1076 				goto send_unsupp_connack_and_close;
1077 			}
1078 
1079 			lwsl_debug("%s: received UNSUBACK pkt\n", __func__);
1080 			lws_mqtt_vbi_init(&par->vbit);
1081 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1082 			case LMSPR_NEED_MORE:
1083 				break;
1084 			case LMSPR_COMPLETED:
1085 				par->cpkt_remlen = par->vbit.value;
1086 				lwsl_debug("%s: UNSUBACK pkt len = %d\n",
1087 					   __func__, (int)par->cpkt_remlen);
1088 				if (par->cpkt_remlen < 2)
1089 					goto send_protocol_error_and_close;
1090 				par->state = LMQCPP_UNSUBACK_VH_PKT_ID;
1091 				break;
1092 			default:
1093 				lwsl_notice("%s: unsuback bad vbi\n", __func__);
1094 				goto send_protocol_error_and_close;
1095 			}
1096 			break;
1097 
1098 		case LMQCPP_UNSUBACK_VH_PKT_ID:
1099 
1100 			if (len < 2) {
1101 				lwsl_notice("%s: len breakage 3\n", __func__);
1102 				return -1;
1103 			}
1104 
1105 			par->cpkt_id = lws_ser_ru16be(buf);
1106 			wsi->mqtt->ack_pkt_id = par->cpkt_id;
1107 			buf += 2;
1108 			len -= 2;
1109 			par->cpkt_remlen -= 2;
1110 			par->n = 0;
1111 
1112 			goto cmd_completion;
1113 
1114 		case LMQCPP_PUBACK_PACKET:
1115 			lws_mqtt_vbi_init(&par->vbit);
1116 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1117 			case LMSPR_NEED_MORE:
1118 				break;
1119 			case LMSPR_COMPLETED:
1120 				par->cpkt_remlen = par->vbit.value;
1121 				lwsl_info("%s: PUBACK pkt len = %d\n", __func__,
1122 					  (int)par->cpkt_remlen);
1123 				/*
1124 				 * must be 4 or more, with special case that 2
1125 				 * means success with no reason code or props
1126 				 */
1127 				if (par->cpkt_remlen <= 1 ||
1128 				    par->cpkt_remlen == 3)
1129 					goto send_protocol_error_and_close;
1130 
1131 				par->state = LMQCPP_PUBACK_VH_PKT_ID;
1132 				par->fixed_seen[2] = par->fixed_seen[3] = 0;
1133 				par->fixed = 0;
1134 				par->n = 0;
1135 				break;
1136 			default:
1137 				lwsl_notice("%s: puback bad vbi\n", __func__);
1138 				goto send_protocol_error_and_close;
1139 			}
1140 			break;
1141 
1142 		case LMQCPP_PUBACK_VH_PKT_ID:
1143 			/*
1144 			 * There are 3 fixed bytes and then a VBI for the
1145 			 * property section length
1146 			 */
1147 			par->fixed_seen[par->fixed++] = *buf++;
1148 			if (len < par->cpkt_remlen - par->n) {
1149 				lwsl_notice("%s: len breakage 4\n", __func__);
1150 				return -1;
1151 			}
1152 			len--;
1153 			par->n++;
1154 			if (par->fixed == 2)
1155 				par->cpkt_id = lws_ser_ru16be(par->fixed_seen);
1156 
1157 			if (par->fixed == 3) {
1158 				lws_mqtt_vbi_init(&par->vbit);
1159 				par->props_consumed = 0;
1160 				par->state = LMQCPP_PUBACK_PROPERTIES_LEN_VBI;
1161 			}
1162 			/* length of 2 is truncated packet and we completed it */
1163 			if (par->cpkt_remlen == par->fixed)
1164 				goto cmd_completion;
1165 			break;
1166 
1167 		case LMQCPP_PUBACK_PROPERTIES_LEN_VBI:
1168 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1169 			case LMSPR_NEED_MORE:
1170 				break;
1171 			case LMSPR_COMPLETED:
1172 				par->props_len = par->vbit.value;
1173 				lwsl_info("%s: PUBACK props len = %d\n",
1174 					  __func__, (int)par->cpkt_remlen);
1175 				/*
1176 				 * If there are no properties, this is a
1177 				 * command completion event in itself
1178 				 */
1179 				if (!par->props_len)
1180 					goto cmd_completion;
1181 
1182 				/*
1183 				 * Otherwise consume the properties before
1184 				 * completing the command
1185 				 */
1186 				lws_mqtt_vbi_init(&par->vbit);
1187 				par->state = LMQCPP_PUBACK_VH_PKT_ID;
1188 				break;
1189 			default:
1190 				lwsl_notice("%s: puback pr bad vbi\n", __func__);
1191 				goto send_protocol_error_and_close;
1192 			}
1193 			break;
1194 
1195 		case LMQCPP_EAT_PROPERTIES_AND_COMPLETE:
1196 			/*
1197 			 * TODO: stash the props
1198 			 */
1199 			par->props_consumed++;
1200 			len--;
1201 			buf++;
1202 			if (par->props_len != par->props_consumed)
1203 				break;
1204 
1205 cmd_completion:
1206 			/*
1207 			 * We come here when we understood we just processed
1208 			 * the last byte of a command packet, regardless of the
1209 			 * packet type
1210 			 */
1211 			par->state = LMQCPP_IDLE;
1212 
1213 			switch (par->packet_type_flags >> 4) {
1214 			case LMQCP_STOC_CONNACK:
1215 				lwsl_info("%s: cmd_completion: CONNACK\n",
1216 					  __func__);
1217 
1218 				/*
1219 				 * Getting the CONNACK means we are the first,
1220 				 * the nwsi, and we succeeded to create a new
1221 				 * network connection ourselves.
1222 				 *
1223 				 * Since others may join us sharing the nwsi,
1224 				 * and we may close while they still want to use
1225 				 * it, our wsi lifecycle alone can no longer
1226 				 * define the lifecycle of the nwsi... it means
1227 				 * we need to do a "magic trick" and instead of
1228 				 * being both the nwsi and act like a child
1229 				 * stream, create a new wsi to take over the
1230 				 * nwsi duties and turn our wsi into a child of
1231 				 * the nwsi with its own lifecycle.
1232 				 *
1233 				 * The nwsi gets a mostly empty wsi->nwsi used
1234 				 * to track already-subscribed topics globally
1235 				 * for the connection.
1236 				 */
1237 
1238 				/* we were under SENT_CLIENT_HANDSHAKE timeout */
1239 				lws_set_timeout(wsi, 0, 0);
1240 
1241 				w = lws_create_new_server_wsi(wsi->a.vhost,
1242 							      wsi->tsi, "mqtt_sid1");
1243 				if (!w) {
1244 					lwsl_notice("%s: sid 1 migrate failed\n",
1245 							__func__);
1246 					return -1;
1247 				}
1248 
1249 				wsi->mux.highest_sid = 1;
1250 				lws_wsi_mux_insert(w, wsi, wsi->mux.highest_sid++);
1251 
1252 				wsi->mux_substream = 1;
1253 				w->mux_substream = 1;
1254 				w->client_mux_substream = 1;
1255 				wsi->client_mux_migrated = 1;
1256 				wsi->told_user_closed = 1; /* don't tell nwsi closed */
1257 
1258 				lwsi_set_state(w, LRS_ESTABLISHED);
1259 				lwsi_set_state(wsi, LRS_ESTABLISHED);
1260 				lwsi_set_role(w, lwsi_role(wsi));
1261 
1262 #if defined(LWS_WITH_CLIENT)
1263 				w->flags = wsi->flags;
1264 #endif
1265 
1266 				w->mqtt = wsi->mqtt;
1267 				wsi->mqtt = lws_zalloc(sizeof(*wsi->mqtt), "nwsi mqtt");
1268 				if (!wsi->mqtt)
1269 					return -1;
1270 				w->mqtt->wsi = w;
1271 				w->a.protocol = wsi->a.protocol;
1272 				if (w->user_space &&
1273 				    !w->user_space_externally_allocated)
1274 					lws_free_set_NULL(w->user_space);
1275 				w->user_space = wsi->user_space;
1276 				wsi->user_space = NULL;
1277 				w->user_space_externally_allocated =
1278 					wsi->user_space_externally_allocated;
1279 				if (lws_ensure_user_space(w))
1280 					goto bail1;
1281 				w->a.opaque_user_data = wsi->a.opaque_user_data;
1282 				wsi->a.opaque_user_data = NULL;
1283 				w->stash = wsi->stash;
1284 				wsi->stash = NULL;
1285 
1286 				lws_mux_mark_immortal(w);
1287 
1288 				lwsl_notice("%s: migrated nwsi %s to sid 1 %s\n",
1289 						__func__, lws_wsi_tag(wsi),
1290 						lws_wsi_tag(w));
1291 
1292 				/*
1293 				 * It was the last thing we were waiting for
1294 				 * before we can be fully ESTABLISHED
1295 				 */
1296 				if (lws_mqtt_set_client_established(w)) {
1297 					lwsl_notice("%s: set EST fail\n", __func__);
1298 					return -1;
1299 				}
1300 
1301 				/* get the ball rolling */
1302 				lws_validity_confirmed(wsi);
1303 
1304 				/* well, add the queued guys as children */
1305 				lws_wsi_mux_apply_queue(wsi);
1306 				break;
1307 
1308 bail1:
1309 				/* undo the insert */
1310 				wsi->mux.child_list = w->mux.sibling_list;
1311 				wsi->mux.child_count--;
1312 
1313 				if (w->user_space)
1314 					lws_free_set_NULL(w->user_space);
1315 				w->a.vhost->protocols[0].callback(w,
1316 							LWS_CALLBACK_WSI_DESTROY,
1317 							NULL, NULL, 0);
1318 				__lws_vhost_unbind_wsi(w); /* cx + vh lock */
1319 				lws_free(w);
1320 
1321 				return 0;
1322 
1323 			case LMQCP_PUBREC:
1324 				lwsl_err("%s: cmd_completion: PUBREC\n",
1325 						__func__);
1326 				/*
1327 				 * Figure out which child asked for this
1328 				 */
1329 				n = 0;
1330 				lws_start_foreach_ll(struct lws *, w,
1331 						     wsi->mux.child_list) {
1332 					if (w->mqtt->unacked_publish &&
1333 					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1334 						char requested_close = 0;
1335 
1336 						w->mqtt->unacked_publish = 0;
1337 						w->mqtt->unacked_pubrel = 1;
1338 
1339 						if (user_callback_handle_rxflow(
1340 							    w->a.protocol->callback,
1341 							    w, LWS_CALLBACK_MQTT_ACK,
1342 							    w->user_space, NULL, 0) < 0) {
1343 							lwsl_info("%s: MQTT_ACK requests close\n",
1344 								 __func__);
1345 							requested_close = 1;
1346 						}
1347 						n = 1;
1348 
1349 						/*
1350 						 * We got an assertive PUBREC,
1351 						 * no need for timeout wait
1352 						 * any more
1353 						 */
1354 						lws_sul_cancel(&w->mqtt->
1355 							  sul_qos_puback_pubrec_wait);
1356 
1357 						if (requested_close) {
1358 							__lws_close_free_wsi(w,
1359 								0, "ack cb");
1360 							break;
1361 						}
1362 
1363 						break;
1364 					}
1365 				} lws_end_foreach_ll(w, mux.sibling_list);
1366 
1367 				if (!n) {
1368 					lwsl_err("%s: unsolicited PUBREC\n",
1369 							__func__);
1370 					return -1;
1371 				}
1372 				wsi->mqtt->send_pubrel = 1;
1373 				lws_callback_on_writable(wsi);
1374 				break;
1375 
1376 			case LMQCP_PUBCOMP:
1377 				lwsl_err("%s: cmd_completion: PUBCOMP\n",
1378 						__func__);
1379 				n = 0;
1380 				lws_start_foreach_ll(struct lws *, w,
1381 						     wsi->mux.child_list) {
1382 					if (w->mqtt->unacked_pubrel > 0 &&
1383 					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1384 						w->mqtt->unacked_pubrel = 0;
1385 						n = 1;
1386 					}
1387 				} lws_end_foreach_ll(w, mux.sibling_list);
1388 
1389 				if (!n) {
1390 					lwsl_err("%s: unsolicited PUBCOMP\n",
1391 							__func__);
1392 					return -1;
1393 				}
1394 
1395 				/*
1396 				 * If we published something and PUBCOMP arrived,
1397 				 * our connection is definitely working in both
1398 				 * directions at the moment.
1399 				 */
1400 				lws_validity_confirmed(wsi);
1401 				break;
1402 
1403 			case LMQCP_PUBREL:
1404 				lwsl_err("%s: cmd_completion: PUBREL\n",
1405 						__func__);
1406 				wsi->mqtt->send_pubcomp = 1;
1407 				lws_callback_on_writable(wsi);
1408 				break;
1409 
1410 			case LMQCP_PUBACK:
1411 				lwsl_info("%s: cmd_completion: PUBACK\n",
1412 						__func__);
1413 
1414 				/*
1415 				 * Figure out which child asked for this
1416 				 */
1417 
1418 				n = 0;
1419 				lws_start_foreach_ll(struct lws *, w,
1420 						      wsi->mux.child_list) {
1421 					if (w->mqtt->unacked_publish &&
1422 					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1423 						char requested_close = 0;
1424 
1425 						w->mqtt->unacked_publish = 0;
1426 						if (user_callback_handle_rxflow(
1427 							    w->a.protocol->callback,
1428 							    w, LWS_CALLBACK_MQTT_ACK,
1429 							    w->user_space, NULL, 0) < 0) {
1430 							lwsl_info("%s: MQTT_ACK requests close\n",
1431 								 __func__);
1432 							requested_close = 1;
1433 						}
1434 						n = 1;
1435 
1436 						/*
1437 						 * We got an assertive PUBACK,
1438 						 * no need for ACK timeout wait
1439 						 * any more
1440 						 */
1441 						lws_sul_cancel(&w->mqtt->sul_qos_puback_pubrec_wait);
1442 
1443 						if (requested_close) {
1444 							__lws_close_free_wsi(w,
1445 								0, "ack cb");
1446 							break;
1447 						}
1448 
1449 						break;
1450 					}
1451 				} lws_end_foreach_ll(w, mux.sibling_list);
1452 
1453 				if (!n) {
1454 					lwsl_err("%s: unsolicited PUBACK\n",
1455 							__func__);
1456 					return -1;
1457 				}
1458 
1459 				/*
1460 				 * If we published something and it was acked,
1461 				 * our connection is definitely working in both
1462 				 * directions at the moment.
1463 				 */
1464 				lws_validity_confirmed(wsi);
1465 				break;
1466 
1467 			case LMQCP_STOC_PINGRESP:
1468 				lwsl_info("%s: cmd_completion: PINGRESP\n",
1469 						__func__);
1470 				/*
1471 				 * If we asked for a PINGRESP and it came,
1472 				 * our connection is definitely working in both
1473 				 * directions at the moment.
1474 				 */
1475 				lws_validity_confirmed(wsi);
1476 				break;
1477 
1478 			case LMQCP_STOC_SUBACK:
1479 				lwsl_info("%s: cmd_completion: SUBACK\n",
1480 						__func__);
1481 
1482 				/*
1483 				 * Figure out which child asked for this
1484 				 */
1485 
1486 				n = 0;
1487 				lws_start_foreach_ll(struct lws *, w,
1488 						      wsi->mux.child_list) {
1489 					if (w->mqtt->inside_subscribe &&
1490 					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1491 						w->mqtt->inside_subscribe = 0;
1492 						if (user_callback_handle_rxflow(
1493 							    w->a.protocol->callback,
1494 							    w, LWS_CALLBACK_MQTT_SUBSCRIBED,
1495 							    w->user_space, NULL, 0) < 0) {
1496 							lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
1497 								 __func__);
1498 							return -1;
1499 						}
1500 						n = 1;
1501 						break;
1502 					}
1503 				} lws_end_foreach_ll(w, mux.sibling_list);
1504 
1505 				if (!n) {
1506 					lwsl_err("%s: unsolicited SUBACK\n",
1507 							__func__);
1508 					return -1;
1509 				}
1510 
1511 				/*
1512 				 * If we subscribed to something and SUBACK came,
1513 				 * our connection is definitely working in both
1514 				 * directions at the moment.
1515 				 */
1516 				lws_validity_confirmed(wsi);
1517 
1518 				break;
1519 
1520 			case LMQCP_STOC_UNSUBACK:
1521 			{
1522 				char requested_close = 0;
1523 				lwsl_info("%s: cmd_completion: UNSUBACK\n",
1524 						__func__);
1525 				/*
1526 				 * Figure out which child asked for this
1527 				 */
1528 				n = 0;
1529 				lws_start_foreach_ll(struct lws *, w,
1530 						      wsi->mux.child_list) {
1531 					if (w->mqtt->inside_unsubscribe &&
1532 					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1533 						struct lws *nwsi = lws_get_network_wsi(w);
1534 
1535 						/*
1536 						 * No more subscribers left,
1537 						 * remove the topic from nwsi
1538 						 */
1539 						lws_mqtt_client_remove_subs(nwsi->mqtt);
1540 
1541 						w->mqtt->inside_unsubscribe = 0;
1542 						if (user_callback_handle_rxflow(
1543 							    w->a.protocol->callback,
1544 							    w, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
1545 							    w->user_space, NULL, 0) < 0) {
1546 							lwsl_info("%s: MQTT_UNSUBACK requests close\n",
1547 								 __func__);
1548 							requested_close = 1;
1549 						}
1550 						n = 1;
1551 
1552 						lws_sul_cancel(&w->mqtt->sul_unsuback_wait);
1553 						if (requested_close) {
1554 							__lws_close_free_wsi(w,
1555 									     0, "unsub ack cb");
1556 							break;
1557 						}
1558 						break;
1559 					}
1560 				} lws_end_foreach_ll(w, mux.sibling_list);
1561 
1562 				if (!n) {
1563 					lwsl_err("%s: unsolicited UNSUBACK\n",
1564 							__func__);
1565 					return -1;
1566 				}
1567 
1568 
1569 				/*
1570 				 * If we unsubscribed to something and
1571 				 * UNSUBACK came, our connection is
1572 				 * definitely working in both
1573 				 * directions at the moment.
1574 				 */
1575 				lws_validity_confirmed(wsi);
1576 
1577 				break;
1578 			}
1579 			case LMQCP_PUBLISH:
1580 			{
1581 				lws_mqtt_publish_param_t *pub =
1582 						(lws_mqtt_publish_param_t *)
1583 							wsi->mqtt->rx_cpkt_param;
1584 				size_t chunk;
1585 
1586 				if (pub == NULL) {
1587 					lwsl_notice("%s: no pub\n", __func__);
1588 					return -1;
1589 				}
1590 
1591 				/*
1592 				 * RX PUBLISH is delivered to any children that
1593 				 * registered for the related topic
1594 				 */
1595 
1596 				n = wsi->role_ops->rx_cb[lwsi_role_server(wsi)];
1597 
1598 				chunk = pub->payload_len - pub->payload_pos;
1599 				if (chunk > len)
1600 					chunk = len;
1601 
1602 				lws_start_foreach_ll(struct lws *, w,
1603 						      wsi->mux.child_list) {
1604 					if (lws_mqtt_find_sub(w->mqtt,
1605 							      pub->topic))
1606 						if (w->a.protocol->callback(
1607 							    w, (enum lws_callback_reasons)n,
1608 							    w->user_space,
1609 							    (void *)pub,
1610 							    chunk)) {
1611 								par->payload_consumed = 0;
1612 								lws_free_set_NULL(pub->topic);
1613 								lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
1614 								return 1;
1615 							}
1616 				} lws_end_foreach_ll(w, mux.sibling_list);
1617 
1618 
1619 				pub->payload_pos += (uint32_t)chunk;
1620 				len -= chunk;
1621 				buf += chunk;
1622 
1623 				lwsl_debug("%s: post pos %d, plen %d, len %d\n",
1624 					    __func__, (int)pub->payload_pos,
1625 					    (int)pub->payload_len, (int)len);
1626 
1627 				if (pub->payload_pos != pub->payload_len) {
1628 					/*
1629 					 * More chunks of the payload pending,
1630 					 * blocking this connection from doing
1631 					 * anything else
1632 					 */
1633 					par->state = LMQCPP_PAYLOAD;
1634 					break;
1635 				}
1636 
1637 				if (pub->qos == 1) {
1638 				/* For QOS = 1, send out PUBACK */
1639 					wsi->mqtt->send_puback = 1;
1640 					lws_callback_on_writable(wsi);
1641 				} else if (pub->qos == 2) {
1642 				/* For QOS = 2, send out PUBREC */
1643 					wsi->mqtt->send_pubrec = 1;
1644 					lws_callback_on_writable(wsi);
1645 				}
1646 
1647 				par->payload_consumed = 0;
1648 				lws_free_set_NULL(pub->topic);
1649 				lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
1650 
1651 				break;
1652 			}
1653 			default:
1654 				break;
1655 			}
1656 
1657 			break;
1658 
1659 
1660 		case LMQCPP_PROP_ID_VBI:
1661 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1662 			case LMSPR_NEED_MORE:
1663 				break;
1664 			case LMSPR_COMPLETED:
1665 				par->consumed = (uint32_t)((unsigned int)par->consumed + (unsigned int)(unsigned char)par->vbit.consumed);
1666 				if (par->vbit.value >
1667 				    LWS_ARRAY_SIZE(property_valid)) {
1668 					lwsl_notice("%s: undef prop id 0x%x\n",
1669 						  __func__, (int)par->vbit.value);
1670 					goto send_protocol_error_and_close;
1671 				}
1672 				if (!(property_valid[par->vbit.value] &
1673 					(1 << ctl_pkt_type(par)))) {
1674 					lwsl_notice("%s: prop id 0x%x invalid for"
1675 						  " control pkt %d\n", __func__,
1676 						  (int)par->vbit.value,
1677 						  ctl_pkt_type(par));
1678 					goto send_protocol_error_and_close;
1679 				}
1680 				par->prop_id = par->vbit.value;
1681 				par->flag_prop_multi = !!(
1682 					par->props_seen[par->prop_id >> 3] &
1683 					(1 << (par->prop_id & 7)));
1684 				par->props_seen[par->prop_id >> 3] =
1685 						(uint8_t)((par->props_seen[par->prop_id >> 3]) | (1 << (par->prop_id & 7)));
1686 				/*
1687 				 *  even if it's not a vbi property arg,
1688 				 * .consumed of this will be zero the first time
1689 				 */
1690 				lws_mqtt_vbi_init(&par->vbit);
1691 				/*
1692 				 * if it's a string, next state must set the
1693 				 * destination and size limit itself.  But
1694 				 * resetting it generically here lets it use
1695 				 * lws_mqtt_str_first() to understand it's the
1696 				 * first time around.
1697 				 */
1698 				 lws_mqtt_str_init(&par->s_temp, NULL, 0, 0);
1699 
1700 				/* property arg state enums are so encoded */
1701 				par->state = 0x100 | par->vbit.value;
1702 				break;
1703 			default:
1704 				lwsl_notice("%s: prop id bad vbi\n", __func__);
1705 				goto send_protocol_error_and_close;
1706 			}
1707 			break;
1708 
1709 		/*
1710 		 * All possible property payloads... restricting which ones
1711 		 * can appear in which control packets is already done above
1712 		 * in LMQCPP_PROP_ID_VBI
1713 		 */
1714 
1715 		case LMQCPP_PROP_REQUEST_PROBLEM_INFO_1BYTE:
1716 		case LMQCPP_PROP_REQUEST_REPSONSE_INFO_1BYTE:
1717 		case LMQCPP_PROP_MAXIMUM_QOS_1BYTE:
1718 		case LMQCPP_PROP_RETAIN_AVAILABLE_1BYTE:
1719 		case LMQCPP_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE_1BYTE:
1720 		case LMQCPP_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE_1BYTE:
1721 		case LMQCPP_PROP_SHARED_SUBSCRIPTION_AVAILABLE_1BYTE:
1722 		case LMQCPP_PROP_PAYLOAD_FORMAT_INDICATOR_1BYTE: /* 3.3.2.3.2 */
1723 			if (par->flag_prop_multi)
1724 				goto singular_prop_seen_twice;
1725 			par->payload_format = *buf++;
1726 			len--;
1727 			if (lws_mqtt_pconsume(par, 1))
1728 				goto send_protocol_error_and_close;
1729 			break;
1730 
1731 		case LMQCPP_PROP_MAXIMUM_PACKET_SIZE_4BYTE:
1732 		case LMQCPP_PROP_WILL_DELAY_INTERVAL_4BYTE:
1733 		case LMQCPP_PROP_SESSION_EXPIRY_INTERVAL_4BYTE:
1734 		case LMQCPP_PROP_MSG_EXPIRY_INTERVAL_4BYTE:
1735 			if (par->flag_prop_multi)
1736 				goto singular_prop_seen_twice;
1737 
1738 			if (lws_mqtt_mb_first(&par->vbit))
1739 				lws_mqtt_4byte_init(&par->vbit);
1740 
1741 			switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
1742 			case LMSPR_NEED_MORE:
1743 				break;
1744 			case LMSPR_COMPLETED:
1745 				if (lws_mqtt_pconsume(par, par->vbit.consumed))
1746 					goto send_protocol_error_and_close;
1747 				break;
1748 			default:
1749 				goto send_protocol_error_and_close;
1750 			}
1751 			break;
1752 
1753 		case LMQCPP_PROP_SERVER_KEEPALIVE_2BYTE:
1754 		case LMQCPP_PROP_RECEIVE_MAXIMUM_2BYTE:
1755 		case LMQCPP_PROP_TOPIC_MAXIMUM_2BYTE:
1756 		case LMQCPP_PROP_TOPIC_ALIAS_2BYTE:
1757 			if (par->flag_prop_multi)
1758 				goto singular_prop_seen_twice;
1759 
1760 			if (lws_mqtt_mb_first(&par->vbit))
1761 				lws_mqtt_2byte_init(&par->vbit);
1762 
1763 			switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
1764 			case LMSPR_NEED_MORE:
1765 				break;
1766 			case LMSPR_COMPLETED:
1767 				if (lws_mqtt_pconsume(par, par->vbit.consumed))
1768 					goto send_protocol_error_and_close;
1769 				break;
1770 			default:
1771 				goto send_protocol_error_and_close;
1772 			}
1773 			break;
1774 
1775 		case LMQCPP_PROP_ASSIGNED_CLIENTID_UTF8S:
1776 		case LMQCPP_PROP_AUTH_METHOD_UTF8S:
1777 		case LMQCPP_PROP_USER_PROPERTY_NAME_UTF8S:
1778 		case LMQCPP_PROP_USER_PROPERTY_VALUE_UTF8S:
1779 		case LMQCPP_PROP_RESPONSE_INFO_UTF8S:
1780 		case LMQCPP_PROP_SERVER_REFERENCE_UTF8S:
1781 		case LMQCPP_PROP_REASON_STRING_UTF8S:
1782 		case LMQCPP_PROP_RESPONSE_TOPIC_UTF8S:
1783 		case LMQCPP_PROP_CONTENT_TYPE_UTF8S:
1784 			if (par->flag_prop_multi)
1785 				goto singular_prop_seen_twice;
1786 
1787 			if (lws_mqtt_str_first(&par->s_temp))
1788 				lws_mqtt_str_init(&par->s_temp, par->temp,
1789 						  sizeof(par->temp), 0);
1790 
1791 			switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
1792 			case LMSPR_NEED_MORE:
1793 				break;
1794 			case LMSPR_COMPLETED:
1795 				if (lws_mqtt_pconsume(par, par->s_temp.len))
1796 					goto send_protocol_error_and_close;
1797 				break;
1798 
1799 			default:
1800 				lwsl_info("%s: bad protocol name\n", __func__);
1801 				goto send_protocol_error_and_close;
1802 			}
1803 			break;
1804 
1805 		case LMQCPP_PROP_SUBSCRIPTION_ID_VBI:
1806 
1807 		case LMQCPP_PROP_CORRELATION_BINDATA:
1808 		case LMQCPP_PROP_AUTH_DATA_BINDATA:
1809 
1810 		/* TODO */
1811 			lwsl_err("%s: Unimplemented packet state 0x%x\n",
1812 					__func__, par->state);
1813 			return -1;
1814 		}
1815 	}
1816 
1817 	return 0;
1818 
1819 oom:
1820 	lwsl_err("%s: OOM!\n", __func__);
1821 	goto send_protocol_error_and_close;
1822 
1823 singular_prop_seen_twice:
1824 	lwsl_info("%s: property appears twice\n", __func__);
1825 
1826 send_protocol_error_and_close:
1827 	lwsl_notice("%s: peac\n", __func__);
1828 	par->reason = LMQCP_REASON_PROTOCOL_ERROR;
1829 
1830 send_reason_and_close:
1831 	lwsl_notice("%s: srac\n", __func__);
1832 	par->flag_pending_send_reason_close = 1;
1833 	goto ask;
1834 
1835 send_unsupp_connack_and_close:
1836 	lwsl_notice("%s: unsupac\n", __func__);
1837 	par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL;
1838 	par->flag_pending_send_connack_close = 1;
1839 
1840 ask:
1841 	/* Should we ask for clients? */
1842 	lws_callback_on_writable(wsi);
1843 
1844 	return -1;
1845 }
1846 
1847 int
lws_mqtt_fill_fixed_header(uint8_t * p,lws_mqtt_control_packet_t ctrl_pkt_type,uint8_t dup,lws_mqtt_qos_levels_t qos,uint8_t retain)1848 lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type,
1849 			   uint8_t dup, lws_mqtt_qos_levels_t qos,
1850 			   uint8_t retain)
1851 {
1852 	lws_mqtt_fixed_hdr_t hdr;
1853 
1854 	hdr.bits = 0;
1855 	hdr.flags.ctrl_pkt_type = ctrl_pkt_type & 0xf;
1856 
1857 	switch(ctrl_pkt_type) {
1858 	case LMQCP_PUBLISH:
1859 		hdr.flags.dup = !!dup;
1860 		/*
1861 		 * A PUBLISH Packet MUST NOT have both QoS bits set to
1862 		 * 1. If a Server or Client receives a PUBLISH Packet
1863 		 * which has both QoS bits set to 1 it MUST close the
1864 		 * Network Connection [MQTT-3.3.1-4].
1865 		 */
1866 		if (qos >= RESERVED_QOS_LEVEL) {
1867 			lwsl_err("%s: Unsupport QoS level 0x%x\n",
1868 				 __func__, qos);
1869 			return -1;
1870 		}
1871 		hdr.flags.qos = qos & 3;
1872 		hdr.flags.retain = !!retain;
1873 		break;
1874 
1875 	case LMQCP_CTOS_CONNECT:
1876 	case LMQCP_STOC_CONNACK:
1877 	case LMQCP_PUBACK:
1878 	case LMQCP_PUBREC:
1879 	case LMQCP_PUBCOMP:
1880 	case LMQCP_STOC_SUBACK:
1881 	case LMQCP_STOC_UNSUBACK:
1882 	case LMQCP_CTOS_PINGREQ:
1883 	case LMQCP_STOC_PINGRESP:
1884 	case LMQCP_DISCONNECT:
1885 	case LMQCP_AUTH:
1886 		hdr.bits &= 0xf0;
1887 		break;
1888 
1889 	/*
1890 	 * Bits 3,2,1 and 0 of the fixed header of the PUBREL,
1891 	 * SUBSCRIBE, UNSUBSCRIBE Control Packets are reserved and
1892 	 * MUST be set to 0,0,1 and 0 respectively. The Server MUST
1893 	 * treat any other value as malformed and close the Network
1894 	 * Connection [MQTT-3.6.1-1], [MQTT-3.8.1-1], [MQTT-3.10.1-1].
1895 	 */
1896 	case LMQCP_PUBREL:
1897 	case LMQCP_CTOS_SUBSCRIBE:
1898 	case LMQCP_CTOS_UNSUBSCRIBE:
1899 		hdr.bits |= 0x02;
1900 		break;
1901 
1902 	default:
1903 		return -1;
1904 	}
1905 
1906 	*p = hdr.bits;
1907 
1908 	return 0;
1909 }
1910 
1911 /*
1912  * This fires if the wsi did a PUBLISH under QoS1 or QoS2, but no PUBACK or
1913  * PUBREC came before the timeout period
1914  */
1915 
1916 static void
lws_mqtt_publish_resend(struct lws_sorted_usec_list * sul)1917 lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul)
1918 {
1919 	struct _lws_mqtt_related *mqtt = lws_container_of(sul,
1920 			struct _lws_mqtt_related, sul_qos_puback_pubrec_wait);
1921 
1922 	lwsl_notice("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
1923 
1924 	if (mqtt->wsi->a.protocol->callback(mqtt->wsi, LWS_CALLBACK_MQTT_RESEND,
1925 				mqtt->wsi->user_space, NULL, 0))
1926 		lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
1927 }
1928 
1929 static void
lws_mqtt_unsuback_timeout(struct lws_sorted_usec_list * sul)1930 lws_mqtt_unsuback_timeout(struct lws_sorted_usec_list *sul)
1931 {
1932 	struct _lws_mqtt_related *mqtt = lws_container_of(sul,
1933 			struct _lws_mqtt_related, sul_unsuback_wait);
1934 
1935 	lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
1936 
1937 	if (mqtt->wsi->a.protocol->callback(mqtt->wsi,
1938 					   LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT,
1939 					   mqtt->wsi->user_space, NULL, 0))
1940 		lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
1941 }
1942 
1943 int
lws_mqtt_client_send_publish(struct lws * wsi,lws_mqtt_publish_param_t * pub,const void * buf,uint32_t len,int is_complete)1944 lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
1945 			     const void *buf, uint32_t len, int is_complete)
1946 {
1947 	struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
1948 	uint8_t *b = (uint8_t *)pt->serv_buf, *start, *p;
1949 	struct lws *nwsi = lws_get_network_wsi(wsi);
1950 	lws_mqtt_str_t mqtt_vh_payload;
1951 	uint32_t vh_len, rem_len;
1952 
1953 	assert(pub->topic);
1954 
1955 	lwsl_debug("%s: len = %d, is_complete = %d\n",
1956 		   __func__, (int)len, (int)is_complete);
1957 
1958 	if (lwsi_state(wsi) != LRS_ESTABLISHED) {
1959 		lwsl_err("%s: %s: unknown state 0x%x\n", __func__,
1960 				lws_wsi_tag(wsi), lwsi_state(wsi));
1961 		assert(0);
1962 		return 1;
1963 	}
1964 
1965 	if (wsi->mqtt->inside_payload) {
1966 		/*
1967 		 * Headers are filled, we are sending
1968 		 * the payload - a buffer with LWS_PRE
1969 		 * in front it.
1970 		 */
1971 		start = (uint8_t *)buf;
1972 		p = start + len;
1973 		if (is_complete)
1974 			wsi->mqtt->inside_payload = 0;
1975 		goto do_write;
1976 	}
1977 
1978 	start = b + LWS_PRE;
1979 	p = start;
1980 	/*
1981 	 * Fill headers and the first chunk of the
1982 	 * payload (if any)
1983 	 */
1984 	if (lws_mqtt_fill_fixed_header(p++, LMQCP_PUBLISH,
1985 				       0, pub->qos, 0)) {
1986 		lwsl_err("%s: Failed to fill fixed header\n", __func__);
1987 		return 1;
1988 	}
1989 
1990 	/*
1991 	 * Topic len field + Topic len + Packet ID
1992 	 * (for QOS>0) + Payload len
1993 	 */
1994 	vh_len = (unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
1995 	rem_len = vh_len + pub->payload_len;
1996 	lwsl_debug("%s: Remaining len = %d\n", __func__, (int) rem_len);
1997 
1998 	/* Will the chunk of payload fit? */
1999 	if ((vh_len + len) >=
2000 	    (wsi->a.context->pt_serv_buf_size - LWS_PRE)) {
2001 		lwsl_err("%s: Payload is too big\n", __func__);
2002 		return 1;
2003 	}
2004 
2005 	p += lws_mqtt_vbi_encode(rem_len, p);
2006 
2007 	/* Topic's Len */
2008 	lws_ser_wu16be(p, pub->topic_len);
2009 	p += 2;
2010 
2011 	/*
2012 	 * Init lws_mqtt_str for "MQTT Variable
2013 	 * Headers + payload" (only the supplied
2014 	 * chuncked payload)
2015 	 */
2016 	lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p,
2017 			  (uint16_t)(unsigned int)(pub->topic_len + ((pub->qos) ? 2u : 0u) + len),
2018 			  0);
2019 
2020 	p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2021 	lws_strncpy((char *)p, pub->topic, (size_t)pub->topic_len+1);
2022 	if (lws_mqtt_str_advance(&mqtt_vh_payload, pub->topic_len)) {
2023 		lwsl_err("%s: a\n", __func__);
2024 		return 1;
2025 	}
2026 
2027 	/* Packet ID */
2028 	if (pub->qos != QOS0) {
2029 		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2030 		wsi->mqtt->ack_pkt_id = pub->packet_id = ++nwsi->mqtt->pkt_id;
2031 		lwsl_debug("%s: pkt_id = %d\n", __func__,
2032 			   (int)wsi->mqtt->ack_pkt_id);
2033 		lws_ser_wu16be(p, pub->packet_id);
2034 		if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) {
2035 			lwsl_err("%s: b\n", __func__);
2036 			return 1;
2037 		}
2038 	}
2039 
2040 	p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2041 	memcpy(p, buf, len);
2042 	if (lws_mqtt_str_advance(&mqtt_vh_payload, (int)len))
2043 		return 1;
2044 	p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2045 
2046 	if (!is_complete)
2047 		nwsi->mqtt->inside_payload = wsi->mqtt->inside_payload = 1;
2048 
2049 do_write:
2050 
2051 	// lwsl_hexdump_err(start, lws_ptr_diff(p, start));
2052 
2053 	if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
2054 			lws_ptr_diff(p, start)) {
2055 		lwsl_err("%s: write failed\n", __func__);
2056 		return 1;
2057 	}
2058 
2059 	if (!is_complete) {
2060 		/* still some more chunks to come... */
2061 		lws_callback_on_writable(wsi);
2062 
2063 		return 0;
2064 	}
2065 
2066 	wsi->mqtt->inside_payload = nwsi->mqtt->inside_payload = 0;
2067 
2068 	if (pub->qos != QOS0)
2069 		wsi->mqtt->unacked_publish = 1;
2070 
2071 	/* this was the last part of the publish message */
2072 
2073 	if (pub->qos == QOS0) {
2074 		/*
2075 		 * There won't be any real PUBACK, act like we got one
2076 		 * so the user callback logic is the same for QoS0 or
2077 		 * QoS1
2078 		 */
2079 		if (wsi->a.protocol->callback(wsi, LWS_CALLBACK_MQTT_ACK,
2080 					    wsi->user_space, NULL, 0)) {
2081 			lwsl_err("%s: ACK callback exited\n", __func__);
2082 			return 1;
2083 		}
2084 	} else if (pub->qos == QOS1 || pub->qos == QOS2) {
2085 		/* For QoS1 or QoS2, if no PUBACK or PUBREC coming after 3s,
2086 		 * we must RETRY the publish
2087 		 */
2088 		wsi->mqtt->sul_qos_puback_pubrec_wait.cb = lws_mqtt_publish_resend;
2089 		__lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
2090 				    &wsi->mqtt->sul_qos_puback_pubrec_wait,
2091 				    3 * LWS_USEC_PER_SEC);
2092 	}
2093 
2094 	return 0;
2095 }
2096 
2097 int
lws_mqtt_client_send_subcribe(struct lws * wsi,lws_mqtt_subscribe_param_t * sub)2098 lws_mqtt_client_send_subcribe(struct lws *wsi, lws_mqtt_subscribe_param_t *sub)
2099 {
2100 	struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
2101 	uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
2102 	struct lws *nwsi = lws_get_network_wsi(wsi);
2103 	lws_mqtt_str_t mqtt_vh_payload;
2104 	uint8_t exists[8], extant;
2105 	lws_mqtt_subs_t *mysub;
2106 	uint32_t rem_len;
2107 #if defined(_DEBUG)
2108 	uint32_t tops;
2109 #endif
2110 	uint32_t n;
2111 
2112 	assert(sub->num_topics);
2113 	assert(sub->num_topics < sizeof(exists));
2114 
2115 	switch (lwsi_state(wsi)) {
2116 	case LRS_ESTABLISHED: /* Protocol connection established */
2117 		if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_SUBSCRIBE,
2118 					       0, 0, 0)) {
2119 			lwsl_err("%s: Failed to fill fixed header\n", __func__);
2120 			return 1;
2121 		}
2122 
2123 		/*
2124 		 * The stream wants to subscribe to one or more topic, but
2125 		 * the shared nwsi may already be subscribed to some or all of
2126 		 * them from interactions with other streams.  For those cases,
2127 		 * we filter them from the list the child wants until we just
2128 		 * have ones that are new to the nwsi.  If nothing left, we just
2129 		 * synthesize the callback to the child as if SUBACK had come
2130 		 * and we're done, otherwise just ask the server for topics that
2131 		 * are new to the wsi.
2132 		 */
2133 
2134 		extant = 0;
2135 		memset(&exists, 0, sizeof(exists));
2136 		for (n = 0; n < sub->num_topics; n++) {
2137 			lwsl_info("%s: Subscribing to topic[%d] = \"%s\"\n",
2138 				  __func__, (int)n, sub->topic[n].name);
2139 
2140 			mysub = lws_mqtt_find_sub(nwsi->mqtt, sub->topic[n].name);
2141 			if (mysub && mysub->ref_count) {
2142 				mysub->ref_count++; /* another stream using it */
2143 				exists[n] = 1;
2144 				extant++;
2145 			}
2146 
2147 			/*
2148 			 * Attach the topic we're subscribing to, to wsi->mqtt
2149 			 */
2150 			if (!lws_mqtt_create_sub(wsi->mqtt, sub->topic[n].name)) {
2151 				lwsl_err("%s: create sub fail\n", __func__);
2152 				return 1;
2153 			}
2154 		}
2155 
2156 		if (extant == sub->num_topics) {
2157 			/*
2158 			 * It turns out there's nothing to do here, the nwsi has
2159 			 * already subscribed to all the topics this stream
2160 			 * wanted.  Just tell it it can have them.
2161 			 */
2162 			lwsl_notice("%s: all topics already subscribed\n", __func__);
2163 			if (user_callback_handle_rxflow(
2164 				    wsi->a.protocol->callback,
2165 				    wsi, LWS_CALLBACK_MQTT_SUBSCRIBED,
2166 				    wsi->user_space, NULL, 0) < 0) {
2167 				lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
2168 					 __func__);
2169 				return -1;
2170 			}
2171 
2172 			return 0;
2173 		}
2174 
2175 #if defined(_DEBUG)
2176 		/*
2177 		 * zero or more of the topics already existed, but not all,
2178 		 * so we must go to the server with a filtered list of the
2179 		 * new ones only
2180 		 */
2181 
2182 		tops = sub->num_topics - extant;
2183 #endif
2184 
2185 		/*
2186 		 * Pid + (Topic len field + Topic len + Req. QoS) x Num of Topics
2187 		 */
2188 		rem_len = 2;
2189 		for (n = 0; n < sub->num_topics; n++)
2190 			if (!exists[n])
2191 				rem_len += (2 + (uint32_t)strlen(sub->topic[n].name) + (uint32_t)1);
2192 
2193 		wsi->mqtt->sub_size = (uint16_t)rem_len;
2194 
2195 #if defined(_DEBUG)
2196 		lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
2197 			   __func__, (int)tops, (int)rem_len);
2198 #endif
2199 
2200 		p += lws_mqtt_vbi_encode(rem_len, p);
2201 
2202 		if ((rem_len + lws_ptr_diff_size_t(p, start)) >=
2203 					       wsi->a.context->pt_serv_buf_size) {
2204 			lwsl_err("%s: Payload is too big\n", __func__);
2205 			return 1;
2206 		}
2207 
2208 		/* Init lws_mqtt_str */
2209 		lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, (uint16_t)rem_len, 0);
2210 		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2211 
2212 		/* Packet ID */
2213 		wsi->mqtt->ack_pkt_id = sub->packet_id = ++nwsi->mqtt->pkt_id;
2214 		lwsl_debug("%s: pkt_id = %d\n", __func__,
2215 			   (int)sub->packet_id);
2216 		lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
2217 
2218 		if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2219 			return 1;
2220 
2221 		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2222 
2223 		for (n = 0; n < sub->num_topics; n++) {
2224 			lwsl_info("%s: topics[%d] = %s\n", __func__,
2225 				   (int)n, sub->topic[n].name);
2226 
2227 			/* if the nwsi already has it, don't ask server for it */
2228 			if (exists[n]) {
2229 				lwsl_info("%s: topics[%d] \"%s\" exists in nwsi\n",
2230 					    __func__, (int)n, sub->topic[n].name);
2231 				continue;
2232 			}
2233 
2234 			/*
2235 			 * Attach the topic we're subscribing to, to nwsi->mqtt
2236 			 * so we know the nwsi itself has a subscription to it
2237 			 */
2238 
2239 			if (!lws_mqtt_create_sub(nwsi->mqtt, sub->topic[n].name))
2240 				return 1;
2241 
2242 			/* Topic's Len */
2243 			lws_ser_wu16be(p, (uint16_t)strlen(sub->topic[n].name));
2244 			if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2245 				return 1;
2246 			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2247 
2248 			/* Topic Name */
2249 			lws_strncpy((char *)p, sub->topic[n].name,
2250 				    strlen(sub->topic[n].name) + 1);
2251 			if (lws_mqtt_str_advance(&mqtt_vh_payload,
2252 						 (int)strlen(sub->topic[n].name)))
2253 				return 1;
2254 			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2255 
2256 			/* QoS */
2257 			*p = (uint8_t)sub->topic[n].qos;
2258 			if (lws_mqtt_str_advance(&mqtt_vh_payload, 1))
2259 				return 1;
2260 			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2261 		}
2262 		break;
2263 
2264 	default:
2265 		return 1;
2266 	}
2267 
2268 	if (wsi->mqtt->inside_resume_session)
2269 		return 0;
2270 
2271 	if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
2272 					lws_ptr_diff(p, start))
2273 		return 1;
2274 
2275 	wsi->mqtt->inside_subscribe = 1;
2276 
2277 	return 0;
2278 }
2279 
2280 int
lws_mqtt_client_send_unsubcribe(struct lws * wsi,const lws_mqtt_subscribe_param_t * unsub)2281 lws_mqtt_client_send_unsubcribe(struct lws *wsi,
2282 				const lws_mqtt_subscribe_param_t *unsub)
2283 {
2284 	struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
2285 	uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
2286 	struct lws *nwsi = lws_get_network_wsi(wsi);
2287 	lws_mqtt_str_t mqtt_vh_payload;
2288 	uint8_t send_unsub[8], orphaned;
2289 	uint32_t rem_len, n;
2290 	lws_mqtt_subs_t *mysub;
2291 #if defined(_DEBUG)
2292 	uint32_t tops;
2293 #endif
2294 
2295 	lwsl_info("%s: Enter\n", __func__);
2296 
2297 	switch (lwsi_state(wsi)) {
2298 	case LRS_ESTABLISHED: /* Protocol connection established */
2299 		orphaned = 0;
2300 		memset(&send_unsub, 0, sizeof(send_unsub));
2301 		for (n = 0; n < unsub->num_topics; n++) {
2302 			mysub = lws_mqtt_find_sub(nwsi->mqtt,
2303 						  unsub->topic[n].name);
2304 			assert(mysub);
2305 
2306 			if (mysub && --mysub->ref_count == 0) {
2307 				lwsl_notice("%s: Need to send UNSUB\n", __func__);
2308 				send_unsub[n] = 1;
2309 				orphaned++;
2310 			}
2311 		}
2312 
2313 		if (!orphaned) {
2314 			/*
2315 			 * The nwsi still has other subscribers bound to the
2316 			 * topics.
2317 			 *
2318 			 * So, don't send UNSUB to server, and just fake the
2319 			 * UNSUB ACK event for the guy going away.
2320 			 */
2321 			lwsl_notice("%s: unsubscribed!\n", __func__);
2322 			if (user_callback_handle_rxflow(
2323 				    wsi->a.protocol->callback,
2324 				    wsi, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
2325 				    wsi->user_space, NULL, 0) < 0) {
2326 				/*
2327 				 * We can't directly close here, because the
2328 				 * caller still has the wsi.  Inform the
2329 				 * caller that we want to close
2330 				 */
2331 
2332 				return 1;
2333 			}
2334 
2335 			return 0;
2336 		}
2337 #if defined(_DEBUG)
2338 		/*
2339 		 * one or more of the topics needs to be unsubscribed
2340 		 * from, so we must go to the server with a filtered
2341 		 * list of the new ones only
2342 		 */
2343 
2344 		tops = orphaned;
2345 #endif
2346 
2347 		if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_UNSUBSCRIBE,
2348 					       0, 0, 0)) {
2349 			lwsl_err("%s: Failed to fill fixed header\n", __func__);
2350 			return 1;
2351 		}
2352 
2353 		/*
2354 		 * Pid + (Topic len field + Topic len) x Num of Topics
2355 		 */
2356 		rem_len = 2;
2357 		for (n = 0; n < unsub->num_topics; n++)
2358 			if (send_unsub[n])
2359 				rem_len += (2 + (uint32_t)strlen(unsub->topic[n].name));
2360 
2361 		wsi->mqtt->sub_size = (uint16_t)rem_len;
2362 
2363 #if defined(_DEBUG)
2364 		lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
2365 			   __func__, (int)tops, (int)rem_len);
2366 #endif
2367 
2368 		p += lws_mqtt_vbi_encode(rem_len, p);
2369 
2370 		if ((rem_len + lws_ptr_diff_size_t(p, start)) >=
2371 					       wsi->a.context->pt_serv_buf_size) {
2372 			lwsl_err("%s: Payload is too big\n", __func__);
2373 			return 1;
2374 		}
2375 
2376 		/* Init lws_mqtt_str */
2377 		lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, (uint16_t)rem_len, 0);
2378 		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2379 
2380 		/* Packet ID */
2381 		wsi->mqtt->ack_pkt_id = ++nwsi->mqtt->pkt_id;
2382 		lwsl_debug("%s: pkt_id = %d\n", __func__,
2383 			   (int)wsi->mqtt->ack_pkt_id);
2384 		lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
2385 
2386 		if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2387 			return 1;
2388 
2389 		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2390 
2391 		for (n = 0; n < unsub->num_topics; n++) {
2392 			lwsl_info("%s: topics[%d] = %s\n", __func__,
2393 				   (int)n, unsub->topic[n].name);
2394 
2395 			/*
2396 			 * Subscriber still bound to it, don't UBSUB
2397 			 * from the server
2398 			 */
2399 			if (!send_unsub[n])
2400 				continue;
2401 
2402 			/* Topic's Len */
2403 			lws_ser_wu16be(p, (uint16_t)strlen(unsub->topic[n].name));
2404 			if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2405 				return 1;
2406 			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2407 
2408 			/* Topic Name */
2409 			lws_strncpy((char *)p, unsub->topic[n].name,
2410 				    strlen(unsub->topic[n].name) + 1);
2411 			if (lws_mqtt_str_advance(&mqtt_vh_payload,
2412 						 (int)strlen(unsub->topic[n].name)))
2413 				return 1;
2414 			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2415 		}
2416 		break;
2417 
2418 	default:
2419 		return 1;
2420 	}
2421 
2422 	if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
2423 					lws_ptr_diff(p, start))
2424 		return 1;
2425 
2426 	wsi->mqtt->inside_unsubscribe = 1;
2427 
2428 	wsi->mqtt->sul_unsuback_wait.cb = lws_mqtt_unsuback_timeout;
2429 	__lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
2430 			    &wsi->mqtt->sul_unsuback_wait,
2431 			    3 * LWS_USEC_PER_SEC);
2432 
2433 	return 0;
2434 }
2435 
2436 /*
2437  * This is called when child streams bind to an already-existing and compatible
2438  * MQTT stream
2439  */
2440 
2441 struct lws *
lws_wsi_mqtt_adopt(struct lws * parent_wsi,struct lws * wsi)2442 lws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi)
2443 {
2444 	/* no more children allowed by parent? */
2445 
2446 	if (parent_wsi->mux.child_count + 1 > LWS_MQTT_MAX_CHILDREN) {
2447 		lwsl_err("%s: reached concurrent stream limit\n", __func__);
2448 		return NULL;
2449 	}
2450 
2451 #if defined(LWS_WITH_CLIENT)
2452 	wsi->client_mux_substream = 1;
2453 #endif
2454 
2455 	lws_wsi_mux_insert(wsi, parent_wsi, wsi->mux.my_sid);
2456 
2457 	if (lws_ensure_user_space(wsi))
2458 		goto bail1;
2459 
2460 	lws_mqtt_set_client_established(wsi);
2461 	lws_callback_on_writable(wsi);
2462 
2463 	return wsi;
2464 
2465 bail1:
2466 	/* undo the insert */
2467 	parent_wsi->mux.child_list = wsi->mux.sibling_list;
2468 	parent_wsi->mux.child_count--;
2469 
2470 	if (wsi->user_space)
2471 		lws_free_set_NULL(wsi->user_space);
2472 
2473 	wsi->a.protocol->callback(wsi, LWS_CALLBACK_WSI_DESTROY, NULL, NULL, 0);
2474 	lws_free(wsi);
2475 
2476 	return NULL;
2477 }
2478 
2479