• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libwebsockets - small server side websockets and web server implementation
3  *
4  * Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to
8  * deal in the Software without restriction, including without limitation the
9  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10  * sell copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22  * IN THE SOFTWARE.
23  *
24  * MQTT v5
25  *
26  * http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
27  *
28  * Control Packet structure
29  *
30  *  - Always:           2+ byte:  Fixed Hdr
31  *  - Required in some: variable: Variable Hdr + [(CONNECT)Will Props] + Props
32  *  - Required in some: variable: Payload
33  *
34  * For CONNECT, the props if present MUST be in the order [MQTT-3.1.3-1]
35  *
36  *  - Client Identifier
37  *  - Will Properties
38  *  - Will Topic
39  *  - Will Payload
40  *  - User Name
41  *  - Password
42  */
43 
44 #include "private-lib-core.h"
45 /* #include "lws-mqtt.h" */
46 
47 #include <string.h>
48 #include <sys/types.h>
49 #include <fcntl.h>
50 #include <assert.h>
51 
52 typedef enum {
53 	LMQPRS_AWAITING_CONNECT,
54 
55 } lws_mqtt_protocol_server_connstate_t;
56 
57 const char * const reason_names_g1[] = {
58 	"Success / Normal disconnection / QoS0",
59 	"QoS1",
60 	"QoS2",
61 	"Disconnect Will",
62 	"No matching subscriber",
63 	"No subscription existed",
64 	"Continue authentication",
65 	"Re-authenticate"
66 };
67 
68 const char * const reason_names_g2[] = {
69 	"Unspecified error",
70 	"Malformed packet",
71 	"Protocol error",
72 	"Implementation specific error",
73 	"Unsupported protocol",
74 	"Client ID invalid",
75 	"Bad credentials",
76 	"Not Authorized",
77 	"Server Unavailable",
78 	"Server Busy",
79 	"Banned",
80 	"Server Shutting Down",
81 	"Bad Authentication Method",
82 	"Keepalive Timeout",
83 	"Session taken over",
84 	"Topic Filter Invalid",
85 	"Packet ID in use",
86 	"Packet ID not found",
87 	"Max RX Exceeded",
88 	"Topic Alias Invalid",
89 	"Packet too large",
90 	"Ratelimit",
91 	"Quota Exceeded",
92 	"Administrative Action",
93 	"Payload format invalid",
94 	"Retain not supported",
95 	"QoS not supported",
96 	"Use another server",
97 	"Server Moved",
98 	"Shared subscriptions not supported",
99 	"Connection rate exceeded",
100 	"Maximum Connect Time",
101 	"Subscription IDs not supported",
102 	"Wildcard subscriptions not supported"
103 };
104 
105 #define LMQCP_WILL_PROPERTIES 0
106 
107 /* For each property, a bitmap describing which commands it is valid for */
108 
109 static const uint16_t property_valid[] = {
110 	[LMQPROP_PAYLOAD_FORMAT_INDICATOR]	= (1 << LMQCP_PUBLISH) |
111 						  (1 << LMQCP_WILL_PROPERTIES),
112 	[LMQPROP_MESSAGE_EXPIRY_INTERVAL]	= (1 << LMQCP_PUBLISH) |
113 						  (1 << LMQCP_WILL_PROPERTIES),
114 	[LMQPROP_CONTENT_TYPE]			= (1 << LMQCP_PUBLISH) |
115 						  (1 << LMQCP_WILL_PROPERTIES),
116 	[LMQPROP_RESPONSE_TOPIC]		= (1 << LMQCP_PUBLISH) |
117 						  (1 << LMQCP_WILL_PROPERTIES),
118 	[LMQPROP_CORRELATION_DATA]		= (1 << LMQCP_PUBLISH) |
119 						  (1 << LMQCP_WILL_PROPERTIES),
120 	[LMQPROP_SUBSCRIPTION_IDENTIFIER]	= (1 << LMQCP_PUBLISH) |
121 						  (1 << LMQCP_CTOS_SUBSCRIBE),
122 	[LMQPROP_SESSION_EXPIRY_INTERVAL]	= (1 << LMQCP_CTOS_CONNECT) |
123 						  (1 << LMQCP_STOC_CONNACK) |
124 						  (1 << LMQCP_DISCONNECT),
125 	[LMQPROP_ASSIGNED_CLIENT_IDENTIFIER]	= (1 << LMQCP_STOC_CONNACK),
126 	[LMQPROP_SERVER_KEEP_ALIVE]		= (1 << LMQCP_STOC_CONNACK),
127 	[LMQPROP_AUTHENTICATION_METHOD]		= (1 << LMQCP_CTOS_CONNECT) |
128 						  (1 << LMQCP_STOC_CONNACK) |
129 						  (1 << LMQCP_AUTH),
130 	[LMQPROP_AUTHENTICATION_DATA]		= (1 << LMQCP_CTOS_CONNECT) |
131 						  (1 << LMQCP_STOC_CONNACK) |
132 						  (1 << LMQCP_AUTH),
133 	[LMQPROP_REQUEST_PROBLEM_INFORMATION]	= (1 << LMQCP_CTOS_CONNECT),
134 	[LMQPROP_WILL_DELAY_INTERVAL]		= (1 << LMQCP_WILL_PROPERTIES),
135 	[LMQPROP_REQUEST_RESPONSE_INFORMATION]	= (1 << LMQCP_CTOS_CONNECT),
136 	[LMQPROP_RESPONSE_INFORMATION]		= (1 << LMQCP_STOC_CONNACK),
137 	[LMQPROP_SERVER_REFERENCE]		= (1 << LMQCP_STOC_CONNACK) |
138 						  (1 << LMQCP_DISCONNECT),
139 	[LMQPROP_REASON_STRING]			= (1 << LMQCP_STOC_CONNACK) |
140 						  (1 << LMQCP_PUBACK) |
141 						  (1 << LMQCP_PUBREC) |
142 						  (1 << LMQCP_PUBREL) |
143 						  (1 << LMQCP_PUBCOMP) |
144 						  (1 << LMQCP_STOC_SUBACK) |
145 						  (1 << LMQCP_STOC_UNSUBACK) |
146 						  (1 << LMQCP_DISCONNECT) |
147 						  (1 << LMQCP_AUTH),
148 	[LMQPROP_RECEIVE_MAXIMUM]		= (1 << LMQCP_CTOS_CONNECT) |
149 						  (1 << LMQCP_STOC_CONNACK),
150 	[LMQPROP_TOPIC_ALIAS_MAXIMUM]		= (1 << LMQCP_CTOS_CONNECT) |
151 						  (1 << LMQCP_STOC_CONNACK),
152 	[LMQPROP_TOPIC_ALIAS]			= (1 << LMQCP_PUBLISH),
153 	[LMQPROP_MAXIMUM_QOS]			= (1 << LMQCP_STOC_CONNACK),
154 	[LMQPROP_RETAIN_AVAILABLE]		= (1 << LMQCP_STOC_CONNACK),
155 	[LMQPROP_USER_PROPERTY]			= (1 << LMQCP_CTOS_CONNECT) |
156 						  (1 << LMQCP_STOC_CONNACK) |
157 						  (1 << LMQCP_PUBLISH) |
158 						  (1 << LMQCP_WILL_PROPERTIES) |
159 						  (1 << LMQCP_PUBACK) |
160 						  (1 << LMQCP_PUBREC) |
161 						  (1 << LMQCP_PUBREL) |
162 						  (1 << LMQCP_PUBCOMP) |
163 						  (1 << LMQCP_CTOS_SUBSCRIBE) |
164 						  (1 << LMQCP_STOC_SUBACK) |
165 						  (1 << LMQCP_CTOS_UNSUBSCRIBE) |
166 						  (1 << LMQCP_STOC_UNSUBACK) |
167 						  (1 << LMQCP_DISCONNECT) |
168 						  (1 << LMQCP_AUTH),
169 	[LMQPROP_MAXIMUM_PACKET_SIZE]		= (1 << LMQCP_CTOS_CONNECT) |
170 						  (1 << LMQCP_STOC_CONNACK),
171 	[LMQPROP_WILDCARD_SUBSCRIPTION_AVAIL]	= (1 << LMQCP_STOC_CONNACK),
172 	[LMQPROP_SUBSCRIPTION_IDENTIFIER_AVAIL]	= (1 << LMQCP_STOC_CONNACK),
173 	[LMQPROP_SHARED_SUBSCRIPTION_AVAIL]	= (1 << LMQCP_STOC_CONNACK)
174 };
175 
176 
177 /*
178  * For each command index, maps flags, id, qos and payload legality
179  * notice in most cases PUBLISH requires further processing
180  */
181 static const uint8_t map_flags[] = {
182 	[LMQCP_RESERVED]		= 0x00,
183 	[LMQCP_CTOS_CONNECT]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
184 					  LMQCP_LUT_FLAG_PAYLOAD |
185 					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
186 	[LMQCP_STOC_CONNACK]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
187 					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
188 	[LMQCP_PUBLISH]			= LMQCP_LUT_FLAG_PAYLOAD | /* option */
189 					  LMQCP_LUT_FLAG_PACKET_ID_QOS12 | 0x00,
190 	[LMQCP_PUBACK]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
191 					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
192 	[LMQCP_PUBREC]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
193 					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
194 	[LMQCP_PUBREL]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
195 					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
196 	[LMQCP_PUBCOMP]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
197 					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
198 	[LMQCP_CTOS_SUBSCRIBE]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
199 					  LMQCP_LUT_FLAG_PAYLOAD |
200 					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
201 	[LMQCP_STOC_SUBACK]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
202 					  LMQCP_LUT_FLAG_PAYLOAD |
203 					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
204 	[LMQCP_CTOS_UNSUBSCRIBE]	= LMQCP_LUT_FLAG_RESERVED_FLAGS |
205 					  LMQCP_LUT_FLAG_PAYLOAD |
206 					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
207 	[LMQCP_STOC_UNSUBACK]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
208 					  LMQCP_LUT_FLAG_PAYLOAD |
209 					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
210 	[LMQCP_CTOS_PINGREQ]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
211 					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
212 	[LMQCP_STOC_PINGRESP]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
213 					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
214 	[LMQCP_DISCONNECT]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
215 					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
216 	[LMQCP_AUTH]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
217 					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
218 };
219 
220 void
lws_mqttc_state_transition(lws_mqttc_t * c,lwsgs_mqtt_states_t s)221 lws_mqttc_state_transition(lws_mqttc_t *c, lwsgs_mqtt_states_t s)
222 {
223 	lwsl_debug("%s: ep %p: state %d -> %d\n", __func__, c, c->estate, s);
224 	c->estate = s;
225 }
226 
227 static int
lws_mqtt_pconsume(lws_mqtt_parser_t * par,int consumed)228 lws_mqtt_pconsume(lws_mqtt_parser_t *par, int consumed)
229 {
230 	par->consumed += consumed;
231 
232 	if (par->consumed > par->props_len)
233 		return -1;
234 
235 	/* more properties coming */
236 
237 	if (par->consumed < par->props_len) {
238 		par->state = LMQCPP_PROP_ID_VBI;
239 		return 0;
240 	}
241 
242 	/* properties finished: are we headed for payload or idle? */
243 
244 	if ((map_flags[ctl_pkt_type(par)] & LMQCP_LUT_FLAG_PAYLOAD) &&
245 		/* A PUBLISH packet MUST NOT contain a Packet Identifier if
246 		 * its QoS value is set to 0 [MQTT-2.2.1-2]. */
247 	    (ctl_pkt_type(par) != LMQCP_PUBLISH ||
248 	     (par->packet_type_flags & 6))) {
249 		par->state = LMQCPP_PAYLOAD;
250 		return 0;
251 	}
252 
253 	par->state = LMQCPP_IDLE;
254 
255 	return 0;
256 }
257 
258 static int
lws_mqtt_set_client_established(struct lws * wsi)259 lws_mqtt_set_client_established(struct lws *wsi)
260 {
261 	lws_role_transition(wsi, LWSIFR_CLIENT, LRS_ESTABLISHED,
262 			    &role_ops_mqtt);
263 
264 	if (user_callback_handle_rxflow(wsi->protocol->callback,
265 					wsi, LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED,
266 					wsi->user_space, NULL, 0) < 0) {
267 		lwsl_err("%s: MQTT_ESTABLISHED failed\n", __func__);
268 
269 		return -1;
270 	}
271 	/*
272 	 * If we made a new connection and got the ACK, our connection is
273 	 * definitely working in both directions at the moment
274 	 */
275 	lws_validity_confirmed(wsi);
276 
277 	/* clear connection timeout */
278 	lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
279 
280 	return 0;
281 }
282 
283 lws_mqtt_subs_t *
lws_mqtt_find_sub(struct _lws_mqtt_related * mqtt,const char * topic)284 lws_mqtt_find_sub(struct _lws_mqtt_related *mqtt, const char *topic)
285 {
286 	lws_mqtt_subs_t *s = mqtt->subs_head;
287 
288 	while (s) {
289 		if (!strcmp((const char *)s->topic, topic))
290 			return s;
291 		s = s->next;
292 	}
293 
294 	return NULL;
295 }
296 
297 static lws_mqtt_subs_t *
lws_mqtt_create_sub(struct _lws_mqtt_related * mqtt,const char * topic)298 lws_mqtt_create_sub(struct _lws_mqtt_related *mqtt, const char *topic)
299 {
300 	lws_mqtt_subs_t *mysub;
301 
302 	mysub = lws_malloc(sizeof(*mysub) + strlen(topic) + 1, "sub");
303 	if (!mysub)
304 		return NULL;
305 
306 	mysub->next = mqtt->subs_head;
307 	mqtt->subs_head = mysub;
308 	memcpy(mysub->topic, topic, strlen(topic) + 1);
309 	mysub->ref_count = 1;
310 
311 	lwsl_info("%s: Created mysub %p for wsi->mqtt %p\n",
312 		  __func__, mysub, mqtt);
313 
314 	return mysub;
315 }
316 
317 static int
lws_mqtt_client_remove_subs(struct _lws_mqtt_related * mqtt)318 lws_mqtt_client_remove_subs(struct _lws_mqtt_related *mqtt)
319 {
320 	lws_mqtt_subs_t *s = mqtt->subs_head;
321 	lws_mqtt_subs_t *temp = NULL;
322 
323 
324 	lwsl_info("%s: Called to remove subs from wsi->mqtt %p\n",
325 		  __func__, mqtt);
326 
327 	while (s && s->next) {
328 		if (s->next->ref_count == 0)
329 			break;
330 		s = s->next;
331 	}
332 
333 	if (s && s->next) {
334 		temp = s->next;
335 		lwsl_info("%s: Removing sub %p from wsi->mqtt %p\n",
336 			  __func__, temp, mqtt);
337 		s->next = temp->next;
338 		lws_free(temp);
339 		return 0;
340 	}
341 	return 1;
342 }
343 
344 int
_lws_mqtt_rx_parser(struct lws * wsi,lws_mqtt_parser_t * par,const uint8_t * buf,size_t len)345 _lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par,
346 		    const uint8_t *buf, size_t len)
347 {
348 	struct lws *w;
349 	int n;
350 
351 	if (par->flag_pending_send_reason_close)
352 		return 0;
353 
354 	/*
355 	 * Stateful, fragmentation-immune parser
356 	 *
357 	 * Notice that len can always be 1 if under attack, even over tls if
358 	 * the server is compromised or malicious.
359 	 */
360 
361 	while (len) {
362 		lwsl_debug("%s: %d, len = %d\n", __func__, par->state, (int)len);
363 		switch (par->state) {
364 		case LMQCPP_IDLE:
365 			par->packet_type_flags = *buf++;
366 			len--;
367 
368 #if defined(LWS_WITH_CLIENT)
369 			/*
370 			 * The case where we sent the connect, but we received
371 			 * something else before any CONNACK
372 			 */
373 			if (lwsi_state(wsi) == LRS_MQTTC_AWAIT_CONNACK &&
374 			    par->packet_type_flags >> 4 != LMQCP_STOC_CONNACK) {
375 				lwsl_notice("%s: server sent non-CONNACK\n",
376 						__func__);
377 				goto send_protocol_error_and_close;
378 			}
379 #endif /* LWS_WITH_CLIENT */
380 
381 			n = map_flags[par->packet_type_flags >> 4];
382 			/*
383 			 *  Where a flag bit is marked as “Reserved”, it is
384 			 *  reserved for future use and MUST be set to the value
385 			 *  listed [MQTT-2.1.3-1].
386 			 */
387 			if ((n & LMQCP_LUT_FLAG_RESERVED_FLAGS) &&
388 			    ((par->packet_type_flags & 0x0f) != (n & 0x0f))) {
389 				lwsl_notice("%s: wsi %p: bad flags, 0x%02x mask 0x%02x (len %d)\n",
390 						__func__, wsi, par->packet_type_flags, n, (int)len + 1);
391 				lwsl_hexdump_err(buf - 1, len + 1);
392 				goto send_protocol_error_and_close;
393 			}
394 
395 			lwsl_debug("%s: received pkt type 0x%x / flags 0x%x\n",
396 				   __func__, par->packet_type_flags >> 4,
397 				   par->packet_type_flags & 0xf);
398 
399 			/* allows us to know if a property that can only be
400 			 * given once, appears twice */
401 			memset(par->props_seen, 0, sizeof(par->props_seen));
402 			par->state = par->packet_type_flags & 0xf0;
403 			break;
404 
405 		case LMQCPP_CONNECT_PACKET:
406 			lwsl_debug("%s: received CONNECT pkt\n", __func__);
407 			par->state = LMQCPP_CONNECT_REMAINING_LEN_VBI;
408 			lws_mqtt_vbi_init(&par->vbit);
409 			break;
410 
411 		case LMQCPP_CONNECT_REMAINING_LEN_VBI:
412 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
413 			case LMSPR_NEED_MORE:
414 				break;
415 			case LMSPR_COMPLETED:
416 				par->cpkt_remlen = par->vbit.value;
417 				n = map_flags[ctl_pkt_type(par)];
418 				lws_mqtt_str_init(&par->s_temp, par->temp,
419 						  sizeof(par->temp), 0);
420 				par->state = LMQCPP_CONNECT_VH_PNAME;
421 				break;
422 			default:
423 				lwsl_notice("%s: bad vbi\n", __func__);
424 				goto send_protocol_error_and_close;
425 			}
426 			break;
427 
428 		case LMQCPP_CONNECT_VH_PNAME:
429 			switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
430 			case LMSPR_NEED_MORE:
431 				break;
432 			case LMSPR_COMPLETED:
433 				if (par->s_temp.len != 4 ||
434 				    memcmp(par->s_temp.buf, "MQTT",
435 					   par->s_temp.len)) {
436 					lwsl_notice("%s: protocol name: %.*s\n",
437 						  __func__, par->s_temp.len,
438 						  par->s_temp.buf);
439 					goto send_unsupp_connack_and_close;
440 				}
441 				par->state = LMQCPP_CONNECT_VH_PVERSION;
442 				break;
443 			default:
444 				lwsl_notice("%s: bad protocol name\n", __func__);
445 				goto send_protocol_error_and_close;
446 			}
447 			break;
448 
449 		case LMQCPP_CONNECT_VH_PVERSION:
450 			par->conn_protocol_version = *buf++;
451 			len--;
452 			if (par->conn_protocol_version != 5) {
453 				lwsl_info("%s: unsupported MQTT version %d\n",
454 					  __func__, par->conn_protocol_version);
455 				goto send_unsupp_connack_and_close;
456 			}
457 			par->state = LMQCPP_CONNECT_VH_FLAGS;
458 			break;
459 
460 		case LMQCPP_CONNECT_VH_FLAGS:
461 			par->cpkt_flags = *buf++;
462 			len--;
463 			if (par->cpkt_flags & 1) {
464 				/*
465 				 * The Server MUST validate that the reserved
466 				 * flag in the CONNECT packet is set to 0
467 				 * [MQTT-3.1.2-3].
468 				 */
469 				par->reason = LMQCP_REASON_MALFORMED_PACKET;
470 				goto send_reason_and_close;
471 			}
472 			/*
473 			 * conn_flags specifies the Will Properties that should
474 			 * appear in the payload section
475 			 */
476 			lws_mqtt_2byte_init(&par->vbit);
477 			par->state = LMQCPP_CONNECT_VH_KEEPALIVE;
478 			break;
479 
480 		case LMQCPP_CONNECT_VH_KEEPALIVE:
481 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
482 			case LMSPR_NEED_MORE:
483 				break;
484 			case LMSPR_COMPLETED:
485 				par->keepalive = (uint16_t)par->vbit.value;
486 				lws_mqtt_vbi_init(&par->vbit);
487 				par->state = LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN;
488 				break;
489 			default:
490 				lwsl_notice("%s: ka bad vbi\n", __func__);
491 				goto send_protocol_error_and_close;
492 			}
493 			break;
494 
495 		case LMQCPP_PINGRESP_ZERO:
496 			len--;
497 			/* second byte of PINGRESP must be zero */
498 			if (*buf++)
499 				goto send_protocol_error_and_close;
500 			goto cmd_completion;
501 
502 		case LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN:
503 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
504 			case LMSPR_NEED_MORE:
505 				break;
506 			case LMSPR_COMPLETED:
507 				/* reset consumption counter */
508 				par->consumed = 0;
509 				par->props_len = par->vbit.value;
510 				lws_mqtt_vbi_init(&par->vbit);
511 				par->state = LMQCPP_PROP_ID_VBI;
512 				break;
513 			default:
514 				lwsl_notice("%s: connpr bad vbi\n", __func__);
515 				goto send_protocol_error_and_close;
516 			}
517 			break;
518 
519 		case LMQCPP_PUBLISH_PACKET:
520 			if (lwsi_role_client(wsi) && wsi->mqtt->inside_subscribe) {
521 				lwsl_notice("%s: Topic rx before subscribing\n",
522 					    __func__);
523 				goto send_protocol_error_and_close;
524 			}
525 			lwsl_info("%s: received PUBLISH pkt\n", __func__);
526 			par->state = LMQCPP_PUBLISH_REMAINING_LEN_VBI;
527 			lws_mqtt_vbi_init(&par->vbit);
528 			break;
529 		case LMQCPP_PUBLISH_REMAINING_LEN_VBI:
530 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
531 			case LMSPR_NEED_MORE:
532 				break;
533 			case LMSPR_COMPLETED:
534 				par->cpkt_remlen = par->vbit.value;
535 				lwsl_debug("%s: PUBLISH pkt len = %d\n",
536 					   __func__, (int)par->cpkt_remlen);
537 				/* Move on to PUBLISH's variable header */
538 				par->state = LMQCPP_PUBLISH_VH_TOPIC;
539 				break;
540 			default:
541 				lwsl_notice("%s: pubrem bad vbi\n", __func__);
542 				goto send_protocol_error_and_close;
543 			}
544 			break;
545 
546 		case LMQCPP_PUBLISH_VH_TOPIC:
547 		{
548 			lws_mqtt_publish_param_t *pub = NULL;
549 
550 			if (len < 2) {
551 				lwsl_notice("%s: topic too short\n", __func__);
552 				return -1;
553 			}
554 
555 			/* Topic len */
556 			par->n = lws_ser_ru16be(buf);
557 			buf += 2;
558 			len -= 2;
559 
560 			if (len < par->n) {/* the way this is written... */
561 				lwsl_notice("%s: len breakage\n", __func__);
562 				return -1;
563 			}
564 
565 			/* Invalid topic len */
566 			if (par->n == 0) {
567 				lwsl_notice("%s: zero topic len\n", __func__);
568 				par->reason = LMQCP_REASON_MALFORMED_PACKET;
569 				goto send_reason_and_close;
570 			}
571 			lwsl_debug("%s: PUBLISH topic len %d\n",
572 				   __func__, (int)par->n);
573 			assert(!wsi->mqtt->rx_cpkt_param);
574 			wsi->mqtt->rx_cpkt_param = lws_zalloc(
575 				sizeof(lws_mqtt_publish_param_t), "rx pub param");
576 			if (!wsi->mqtt->rx_cpkt_param)
577 				goto oom;
578 			pub = (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
579 
580 			pub->topic_len = par->n;
581 
582 			/* Topic Name */
583 			pub->topic = (char *)lws_zalloc((size_t)pub->topic_len + 1,
584 							"rx publish topic");
585 			if (!pub->topic)
586 				goto oom;
587 			lws_strncpy(pub->topic, (const char *)buf,
588 				    (size_t)pub->topic_len + 1);
589 			buf += pub->topic_len;
590 			len -= pub->topic_len;
591 
592 			/* Extract QoS Level from Fixed Header Flags */
593 			pub->qos = (lws_mqtt_qos_levels_t)
594 					((par->packet_type_flags >> 1) & 0x3);
595 
596 			pub->payload_pos = 0;
597 
598 			pub->payload_len = par->cpkt_remlen -
599 				(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
600 
601 			switch (pub->qos) {
602 			case QOS0:
603 				par->state = LMQCPP_PAYLOAD;
604 				if (pub->payload_len == 0)
605 					goto cmd_completion;
606 
607 				break;
608 			case QOS1:
609 			case QOS2:
610 				par->state = LMQCPP_PUBLISH_VH_PKT_ID;
611 				break;
612 			default:
613 				par->reason = LMQCP_REASON_MALFORMED_PACKET;
614 				lws_free_set_NULL(pub->topic);
615 				lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
616 				goto send_reason_and_close;
617 			}
618 			break;
619 		}
620 		case LMQCPP_PUBLISH_VH_PKT_ID:
621 		{
622 			lws_mqtt_publish_param_t *pub =
623 				(lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
624 
625 			if (len < 2) {
626 				lwsl_notice("%s: len breakage 2\n", __func__);
627 				return -1;
628 			}
629 
630 			par->cpkt_id = lws_ser_ru16be(buf);
631 			buf += 2;
632 			len -= 2;
633 			wsi->mqtt->ack_pkt_id = par->cpkt_id;
634 			lwsl_debug("%s: Packet ID %d\n",
635 					__func__, (int)par->cpkt_id);
636 			par->state = LMQCPP_PAYLOAD;
637 			pub->payload_pos = 0;
638 			pub->payload_len = par->cpkt_remlen -
639 				(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
640 			if (pub->payload_len == 0)
641 				goto cmd_completion;
642 
643 			break;
644 		}
645 		case LMQCPP_PAYLOAD:
646 		{
647 			lws_mqtt_publish_param_t *pub =
648 				(lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
649 			if (pub == NULL) {
650 				lwsl_err("%s: Uninitialized pub_param\n",
651 						__func__);
652 				goto send_protocol_error_and_close;
653 			}
654 
655 			pub->payload = buf;
656 			goto cmd_completion;
657 		}
658 
659 		case LMQCPP_CONNACK_PACKET:
660 			if (!lwsi_role_client(wsi)) {
661 				lwsl_err("%s: CONNACK is only Server to Client",
662 						__func__);
663 				goto send_unsupp_connack_and_close;
664 			}
665 
666 			lwsl_debug("%s: received CONNACK pkt\n", __func__);
667 			lws_mqtt_vbi_init(&par->vbit);
668 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
669 			case LMSPR_NEED_MORE:
670 				break;
671 			case LMSPR_COMPLETED:
672 				par->cpkt_remlen = par->vbit.value;
673 				lwsl_debug("%s: CONNACK pkt len = %d\n",
674 					   __func__, (int)par->cpkt_remlen);
675 				if (par->cpkt_remlen != 2)
676 					goto send_protocol_error_and_close;
677 
678 				par->state = LMQCPP_CONNACK_VH_FLAGS;
679 				break;
680 			default:
681 				lwsl_notice("%s: connack bad vbi\n", __func__);
682 				goto send_protocol_error_and_close;
683 			}
684 			break;
685 
686 		case LMQCPP_CONNACK_VH_FLAGS:
687 		{
688 			lws_mqttc_t *c = &wsi->mqtt->client;
689 			par->cpkt_flags = *buf++;
690 			len--;
691 
692 			if (par->cpkt_flags & ~LMQCFT_SESSION_PRESENT) {
693 				/*
694 				 * Byte 1 is the "Connect Acknowledge
695 				 * Flags". Bits 7-1 are reserved and
696 				 * MUST be set to 0.
697 				 */
698 				par->reason = LMQCP_REASON_MALFORMED_PACKET;
699 				goto send_reason_and_close;
700 			}
701 			/*
702 			 * If the Server accepts a connection with
703 			 * CleanSession set to 1, the Server MUST set
704 			 * Session Present to 0 in the CONNACK packet
705 			 * in addition to setting a zero return code
706 			 * in the CONNACK packet [MQTT-3.2.2-1]. If
707 			 * the Server accepts a connection with
708 			 * CleanSession set to 0, the value set in
709 			 * Session Present depends on whether the
710 			 * Server already has stored Session state for
711 			 * the supplied client ID. If the Server has
712 			 * stored Session state, it MUST set
713 			 * SessionPresent to 1 in the CONNACK packet
714 			 * [MQTT-3.2.2-2]. If the Server does not have
715 			 * stored Session state, it MUST set Session
716 			 * Present to 0 in the CONNACK packet. This is
717 			 * in addition to setting a zero return code
718 			 * in the CONNACK packet [MQTT-3.2.2-3].
719 			 */
720 			if ((c->conn_flags & LMQCFT_CLEAN_START) &&
721 			    (par->cpkt_flags & LMQCFT_SESSION_PRESENT))
722 				goto send_protocol_error_and_close;
723 
724 			wsi->mqtt->session_resumed = (par->cpkt_flags &
725 						      LMQCFT_SESSION_PRESENT);
726 
727 			/* Move on to Connect Return Code */
728 			par->state = LMQCPP_CONNACK_VH_RETURN_CODE;
729 			break;
730 		}
731 		case LMQCPP_CONNACK_VH_RETURN_CODE:
732 			par->conn_rc = *buf++;
733 			len--;
734 			/*
735 			 * If a server sends a CONNACK packet containing a
736 			 * non-zero return code it MUST then close the Network
737 			 * Connection [MQTT-3.2.2-5]
738 			 */
739 			switch (par->conn_rc) {
740 			case 0:
741 				goto cmd_completion;
742 			case 1:
743 			case 2:
744 			case 3:
745 			case 4:
746 			case 5:
747 				par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL +
748 						par->conn_rc - 1;
749 				goto send_reason_and_close;
750 			default:
751 				lwsl_notice("%s: bad connack retcode\n", __func__);
752 				goto send_protocol_error_and_close;
753 			}
754 			break;
755 
756 		/* SUBACK */
757 		case LMQCPP_SUBACK_PACKET:
758 			if (!lwsi_role_client(wsi)) {
759 				lwsl_err("%s: SUBACK is only Server to Client",
760 						__func__);
761 				goto send_unsupp_connack_and_close;
762 			}
763 
764 			lwsl_debug("%s: received SUBACK pkt\n", __func__);
765 			lws_mqtt_vbi_init(&par->vbit);
766 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
767 			case LMSPR_NEED_MORE:
768 				break;
769 			case LMSPR_COMPLETED:
770 				par->cpkt_remlen = par->vbit.value;
771 				lwsl_debug("%s: SUBACK pkt len = %d\n",
772 					   __func__, (int)par->cpkt_remlen);
773 				if (par->cpkt_remlen <= 2)
774 					goto send_protocol_error_and_close;
775 				par->state = LMQCPP_SUBACK_VH_PKT_ID;
776 				break;
777 			default:
778 				lwsl_notice("%s: suback bad vbi\n", __func__);
779 				goto send_protocol_error_and_close;
780 			}
781 			break;
782 
783 		case LMQCPP_SUBACK_VH_PKT_ID:
784 
785 			if (len < 2) {
786 				lwsl_notice("%s: len breakage 4\n", __func__);
787 				return -1;
788 			}
789 
790 			par->cpkt_id = lws_ser_ru16be(buf);
791 			wsi->mqtt->ack_pkt_id = par->cpkt_id;
792 			buf += 2;
793 			len -= 2;
794 			par->cpkt_remlen -= 2;
795 			par->n = 0;
796 			par->state = LMQCPP_SUBACK_PAYLOAD;
797 			*par->temp = 0;
798 			break;
799 
800 		case LMQCPP_SUBACK_PAYLOAD:
801 		{
802 			lws_mqtt_qos_levels_t qos = (lws_mqtt_qos_levels_t)*buf++;
803 
804 			len--;
805 			switch (qos) {
806 				case QOS0:
807 				case QOS1:
808 				case QOS2:
809 					break;
810 				case FAILURE_QOS_LEVEL:
811 					goto send_protocol_error_and_close;
812 
813 				default:
814 					par->reason = LMQCP_REASON_MALFORMED_PACKET;
815 					goto send_reason_and_close;
816 			}
817 
818 			if (++(par->n) == par->cpkt_remlen) {
819 				par->n = 0;
820 				goto cmd_completion;
821 			}
822 
823 			break;
824 		}
825 
826 		/* UNSUBACK */
827 		case LMQCPP_UNSUBACK_PACKET:
828 			if (!lwsi_role_client(wsi)) {
829 				lwsl_err("%s: UNSUBACK is only Server to Client",
830 						__func__);
831 				goto send_unsupp_connack_and_close;
832 			}
833 
834 			lwsl_debug("%s: received UNSUBACK pkt\n", __func__);
835 			lws_mqtt_vbi_init(&par->vbit);
836 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
837 			case LMSPR_NEED_MORE:
838 				break;
839 			case LMSPR_COMPLETED:
840 				par->cpkt_remlen = par->vbit.value;
841 				lwsl_debug("%s: UNSUBACK pkt len = %d\n",
842 					   __func__, (int)par->cpkt_remlen);
843 				if (par->cpkt_remlen < 2)
844 					goto send_protocol_error_and_close;
845 				par->state = LMQCPP_UNSUBACK_VH_PKT_ID;
846 				break;
847 			default:
848 				lwsl_notice("%s: unsuback bad vbi\n", __func__);
849 				goto send_protocol_error_and_close;
850 			}
851 			break;
852 
853 		case LMQCPP_UNSUBACK_VH_PKT_ID:
854 
855 			if (len < 2) {
856 				lwsl_notice("%s: len breakage 3\n", __func__);
857 				return -1;
858 			}
859 
860 			par->cpkt_id = lws_ser_ru16be(buf);
861 			wsi->mqtt->ack_pkt_id = par->cpkt_id;
862 			buf += 2;
863 			len -= 2;
864 			par->cpkt_remlen -= 2;
865 			par->n = 0;
866 
867 			goto cmd_completion;
868 
869 		case LMQCPP_PUBACK_PACKET:
870 			lws_mqtt_vbi_init(&par->vbit);
871 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
872 			case LMSPR_NEED_MORE:
873 				break;
874 			case LMSPR_COMPLETED:
875 				par->cpkt_remlen = par->vbit.value;
876 				lwsl_info("%s: PUBACK pkt len = %d\n", __func__,
877 					  (int)par->cpkt_remlen);
878 				/*
879 				 * must be 4 or more, with special case that 2
880 				 * means success with no reason code or props
881 				 */
882 				if (par->cpkt_remlen <= 1 ||
883 				    par->cpkt_remlen == 3)
884 					goto send_protocol_error_and_close;
885 
886 				par->state = LMQCPP_PUBACK_VH_PKT_ID;
887 				par->fixed_seen[2] = par->fixed_seen[3] = 0;
888 				par->fixed = 0;
889 				par->n = 0;
890 				break;
891 			default:
892 				lwsl_notice("%s: puback bad vbi\n", __func__);
893 				goto send_protocol_error_and_close;
894 			}
895 			break;
896 
897 		case LMQCPP_PUBACK_VH_PKT_ID:
898 			/*
899 			 * There are 3 fixed bytes and then a VBI for the
900 			 * property section length
901 			 */
902 			par->fixed_seen[par->fixed++] = *buf++;
903 			if (len < par->cpkt_remlen - par->n) {
904 				lwsl_notice("%s: len breakage 4\n", __func__);
905 				return -1;
906 			}
907 			len--;
908 			par->n++;
909 			if (par->fixed == 2)
910 				par->cpkt_id = lws_ser_ru16be(par->fixed_seen);
911 
912 			if (par->fixed == 3) {
913 				lws_mqtt_vbi_init(&par->vbit);
914 				par->props_consumed = 0;
915 				par->state = LMQCPP_PUBACK_PROPERTIES_LEN_VBI;
916 			}
917 			/* length of 2 is truncated packet and we completed it */
918 			if (par->cpkt_remlen == par->fixed)
919 				goto cmd_completion;
920 			break;
921 
922 		case LMQCPP_PUBACK_PROPERTIES_LEN_VBI:
923 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
924 			case LMSPR_NEED_MORE:
925 				break;
926 			case LMSPR_COMPLETED:
927 				par->props_len = par->vbit.value;
928 				lwsl_info("%s: PUBACK props len = %d\n",
929 					  __func__, (int)par->cpkt_remlen);
930 				/*
931 				 * If there are no properties, this is a
932 				 * command completion event in itself
933 				 */
934 				if (!par->props_len)
935 					goto cmd_completion;
936 
937 				/*
938 				 * Otherwise consume the properties before
939 				 * completing the command
940 				 */
941 				lws_mqtt_vbi_init(&par->vbit);
942 				par->state = LMQCPP_PUBACK_VH_PKT_ID;
943 				break;
944 			default:
945 				lwsl_notice("%s: puback pr bad vbi\n", __func__);
946 				goto send_protocol_error_and_close;
947 			}
948 			break;
949 
950 		case LMQCPP_EAT_PROPERTIES_AND_COMPLETE:
951 			/*
952 			 * TODO: stash the props
953 			 */
954 			par->props_consumed++;
955 			len--;
956 			buf++;
957 			if (par->props_len != par->props_consumed)
958 				break;
959 
960 cmd_completion:
961 			/*
962 			 * We come here when we understood we just processed
963 			 * the last byte of a command packet, regardless of the
964 			 * packet type
965 			 */
966 			par->state = LMQCPP_IDLE;
967 
968 			switch (par->packet_type_flags >> 4) {
969 			case LMQCP_STOC_CONNACK:
970 				lwsl_info("%s: cmd_completion: CONNACK\n",
971 					  __func__);
972 
973 				/*
974 				 * Getting the CONNACK means we are the first,
975 				 * the nwsi, and we succeeded to create a new
976 				 * network connection ourselves.
977 				 *
978 				 * Since others may join us sharing the nwsi,
979 				 * and we may close while they still want to use
980 				 * it, our wsi lifecycle alone can no longer
981 				 * define the lifecycle of the nwsi... it means
982 				 * we need to do a "magic trick" and instead of
983 				 * being both the nwsi and act like a child
984 				 * stream, create a new wsi to take over the
985 				 * nwsi duties and turn our wsi into a child of
986 				 * the nwsi with its own lifecycle.
987 				 *
988 				 * The nwsi gets a mostly empty wsi->nwsi used
989 				 * to track already-subscribed topics globally
990 				 * for the connection.
991 				 */
992 
993 				/* we were under SENT_CLIENT_HANDSHAKE timeout */
994 				lws_set_timeout(wsi, 0, 0);
995 
996 				w = lws_create_new_server_wsi(wsi->vhost,
997 							      wsi->tsi);
998 				if (!w) {
999 					lwsl_notice("%s: sid 1 migrate failed\n",
1000 							__func__);
1001 					return -1;
1002 				}
1003 
1004 				wsi->mux.highest_sid = 1;
1005 				lws_wsi_mux_insert(w, wsi, wsi->mux.highest_sid++);
1006 
1007 				wsi->mux_substream = 1;
1008 				w->mux_substream = 1;
1009 				w->client_mux_substream = 1;
1010 				wsi->client_mux_migrated = 1;
1011 				wsi->told_user_closed = 1; /* don't tell nwsi closed */
1012 
1013 				lwsi_set_state(w, LRS_ESTABLISHED);
1014 				lwsi_set_state(wsi, LRS_ESTABLISHED);
1015 				lwsi_set_role(w, lwsi_role(wsi));
1016 
1017 #if defined(LWS_WITH_CLIENT)
1018 				w->flags = wsi->flags;
1019 #endif
1020 
1021 				w->mqtt = wsi->mqtt;
1022 				wsi->mqtt = lws_zalloc(sizeof(*wsi->mqtt), "nwsi mqtt");
1023 				if (!wsi->mqtt)
1024 					return -1;
1025 				w->mqtt->wsi = w;
1026 				w->protocol = wsi->protocol;
1027 				if (w->user_space &&
1028 				    !w->user_space_externally_allocated)
1029 					lws_free_set_NULL(w->user_space);
1030 				w->user_space = wsi->user_space;
1031 				wsi->user_space = NULL;
1032 				w->user_space_externally_allocated =
1033 					wsi->user_space_externally_allocated;
1034 				if (lws_ensure_user_space(w))
1035 					goto bail1;
1036 				w->opaque_user_data = wsi->opaque_user_data;
1037 				wsi->opaque_user_data = NULL;
1038 				w->stash = wsi->stash;
1039 				wsi->stash = NULL;
1040 
1041 				lws_mux_mark_immortal(w);
1042 
1043 				lwsl_notice("%s: migrated nwsi %p to sid 1 %p\n",
1044 						__func__, wsi, w);
1045 
1046 			#if defined(LWS_WITH_SERVER_STATUS)
1047 				wsi->vhost->conn_stats.h2_subs++;
1048 			#endif
1049 
1050 				/*
1051 				 * It was the last thing we were waiting for
1052 				 * before we can be fully ESTABLISHED
1053 				 */
1054 				if (lws_mqtt_set_client_established(w)) {
1055 					lwsl_notice("%s: set EST fail\n", __func__);
1056 					return -1;
1057 				}
1058 
1059 				/* get the ball rolling */
1060 				lws_validity_confirmed(wsi);
1061 
1062 				/* well, add the queued guys as children */
1063 				lws_wsi_mux_apply_queue(wsi);
1064 				break;
1065 
1066 bail1:
1067 				/* undo the insert */
1068 				wsi->mux.child_list = w->mux.sibling_list;
1069 				wsi->mux.child_count--;
1070 
1071 				w->context->count_wsi_allocated--;
1072 
1073 				if (w->user_space)
1074 					lws_free_set_NULL(w->user_space);
1075 				w->vhost->protocols[0].callback(w,
1076 							LWS_CALLBACK_WSI_DESTROY,
1077 							NULL, NULL, 0);
1078 				lws_vhost_unbind_wsi(w);
1079 				lws_free(w);
1080 
1081 				return 0;
1082 
1083 			case LMQCP_PUBACK:
1084 				lwsl_info("%s: cmd_completion: PUBACK\n",
1085 						__func__);
1086 
1087 				/*
1088 				 * Figure out which child asked for this
1089 				 */
1090 
1091 				n = 0;
1092 				lws_start_foreach_ll(struct lws *, w,
1093 						      wsi->mux.child_list) {
1094 					if (w->mqtt->unacked_publish &&
1095 					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1096 						char requested_close = 0;
1097 
1098 						w->mqtt->unacked_publish = 0;
1099 						if (user_callback_handle_rxflow(
1100 							    w->protocol->callback,
1101 							    w, LWS_CALLBACK_MQTT_ACK,
1102 							    w->user_space, NULL, 0) < 0) {
1103 							lwsl_info("%s: MQTT_ACK requests close\n",
1104 								 __func__);
1105 							requested_close = 1;
1106 						}
1107 						n = 1;
1108 
1109 						/*
1110 						 * We got an assertive PUBACK,
1111 						 * no need for ACK timeout wait
1112 						 * any more
1113 						 */
1114 						lws_sul_schedule(lws_get_context(w), 0,
1115 							&w->mqtt->sul_qos1_puback_wait, NULL,
1116 							LWS_SET_TIMER_USEC_CANCEL);
1117 
1118 						if (requested_close) {
1119 							__lws_close_free_wsi(w,
1120 								0, "ack cb");
1121 							break;
1122 						}
1123 
1124 						break;
1125 					}
1126 				} lws_end_foreach_ll(w, mux.sibling_list);
1127 
1128 				if (!n) {
1129 					lwsl_err("%s: unsolicited PUBACK\n",
1130 							__func__);
1131 					return -1;
1132 				}
1133 
1134 				/*
1135 				 * If we published something and it was acked,
1136 				 * our connection is definitely working in both
1137 				 * directions at the moment.
1138 				 */
1139 				lws_validity_confirmed(wsi);
1140 				break;
1141 
1142 			case LMQCP_STOC_PINGRESP:
1143 				lwsl_info("%s: cmd_completion: PINGRESP\n",
1144 						__func__);
1145 				/*
1146 				 * If we asked for a PINGRESP and it came,
1147 				 * our connection is definitely working in both
1148 				 * directions at the moment.
1149 				 */
1150 				lws_validity_confirmed(wsi);
1151 				break;
1152 
1153 			case LMQCP_STOC_SUBACK:
1154 				lwsl_info("%s: cmd_completion: SUBACK\n",
1155 						__func__);
1156 
1157 				/*
1158 				 * Figure out which child asked for this
1159 				 */
1160 
1161 				n = 0;
1162 				lws_start_foreach_ll(struct lws *, w,
1163 						      wsi->mux.child_list) {
1164 					if (w->mqtt->inside_subscribe &&
1165 					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1166 						w->mqtt->inside_subscribe = 0;
1167 						if (user_callback_handle_rxflow(
1168 							    w->protocol->callback,
1169 							    w, LWS_CALLBACK_MQTT_SUBSCRIBED,
1170 							    w->user_space, NULL, 0) < 0) {
1171 							lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
1172 								 __func__);
1173 							return -1;
1174 						}
1175 						n = 1;
1176 						break;
1177 					}
1178 				} lws_end_foreach_ll(w, mux.sibling_list);
1179 
1180 				if (!n) {
1181 					lwsl_err("%s: unsolicited SUBACK\n",
1182 							__func__);
1183 					return -1;
1184 				}
1185 
1186 				/*
1187 				 * If we subscribed to something and SUBACK came,
1188 				 * our connection is definitely working in both
1189 				 * directions at the moment.
1190 				 */
1191 				lws_validity_confirmed(wsi);
1192 
1193 				break;
1194 
1195 			case LMQCP_STOC_UNSUBACK:
1196 			{
1197 				char requested_close = 0;
1198 				lwsl_info("%s: cmd_completion: UNSUBACK\n",
1199 						__func__);
1200 				/*
1201 				 * Figure out which child asked for this
1202 				 */
1203 				n = 0;
1204 				lws_start_foreach_ll(struct lws *, w,
1205 						      wsi->mux.child_list) {
1206 					if (w->mqtt->inside_unsubscribe &&
1207 					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1208 						struct lws *nwsi = lws_get_network_wsi(w);
1209 
1210 						/*
1211 						 * No more subscribers left,
1212 						 * remove the topic from nwsi
1213 						 */
1214 						lws_mqtt_client_remove_subs(nwsi->mqtt);
1215 
1216 						w->mqtt->inside_unsubscribe = 0;
1217 						if (user_callback_handle_rxflow(
1218 							    w->protocol->callback,
1219 							    w, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
1220 							    w->user_space, NULL, 0) < 0) {
1221 							lwsl_info("%s: MQTT_UNSUBACK requests close\n",
1222 								 __func__);
1223 							requested_close = 1;
1224 						}
1225 						n = 1;
1226 
1227 						if (requested_close) {
1228 							__lws_close_free_wsi(w,
1229 									     0, "unsub ack cb");
1230 							break;
1231 						}
1232 						break;
1233 					}
1234 				} lws_end_foreach_ll(w, mux.sibling_list);
1235 
1236 				if (!n) {
1237 					lwsl_err("%s: unsolicited UNSUBACK\n",
1238 							__func__);
1239 					return -1;
1240 				}
1241 
1242 
1243 				/*
1244 				 * If we unsubscribed to something and
1245 				 * UNSUBACK came, our connection is
1246 				 * definitely working in both
1247 				 * directions at the moment.
1248 				 */
1249 				lws_validity_confirmed(wsi);
1250 
1251 				break;
1252 			}
1253 			case LMQCP_PUBLISH:
1254 			{
1255 				lws_mqtt_publish_param_t *pub =
1256 						(lws_mqtt_publish_param_t *)
1257 							wsi->mqtt->rx_cpkt_param;
1258 				size_t chunk;
1259 
1260 				if (pub == NULL) {
1261 					lwsl_notice("%s: no pub\n", __func__);
1262 					return -1;
1263 				}
1264 
1265 				/*
1266 				 * RX PUBLISH is delivered to any children that
1267 				 * registered for the related topic
1268 				 */
1269 
1270 				n = wsi->role_ops->rx_cb[lwsi_role_server(wsi)];
1271 
1272 				chunk = pub->payload_len - pub->payload_pos;
1273 				if (chunk > len)
1274 					chunk = len;
1275 
1276 				lws_start_foreach_ll(struct lws *, w,
1277 						      wsi->mux.child_list) {
1278 					if (lws_mqtt_find_sub(w->mqtt,
1279 							      pub->topic))
1280 						if (w->protocol->callback(
1281 							    w, n,
1282 							    w->user_space,
1283 							    (void *)pub,
1284 							    chunk))
1285 							return 1;
1286 				} lws_end_foreach_ll(w, mux.sibling_list);
1287 
1288 
1289 				pub->payload_pos += (uint32_t)chunk;
1290 				len -= chunk;
1291 				buf += chunk;
1292 
1293 				lwsl_debug("%s: post pos %d, plen %d, len %d\n",
1294 					    __func__, (int)pub->payload_pos,
1295 					    (int)pub->payload_len, (int)len);
1296 
1297 				if (pub->payload_pos != pub->payload_len) {
1298 					/*
1299 					 * More chunks of the payload pending,
1300 					 * blocking this connection from doing
1301 					 * anything else
1302 					 */
1303 					par->state = LMQCPP_PAYLOAD;
1304 					break;
1305 				}
1306 
1307 				/* For QOS>0, send out PUBACK */
1308 				if (pub->qos) {
1309 					wsi->mqtt->send_puback = 1;
1310 					lws_callback_on_writable(wsi);
1311 				}
1312 
1313 				par->payload_consumed = 0;
1314 				lws_free_set_NULL(pub->topic);
1315 				lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
1316 
1317 				break;
1318 			}
1319 			default:
1320 				break;
1321 			}
1322 
1323 			break;
1324 
1325 
1326 		case LMQCPP_PROP_ID_VBI:
1327 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1328 			case LMSPR_NEED_MORE:
1329 				break;
1330 			case LMSPR_COMPLETED:
1331 				par->consumed += par->vbit.consumed;
1332 				if (par->vbit.value >
1333 				    LWS_ARRAY_SIZE(property_valid)) {
1334 					lwsl_notice("%s: undef prop id 0x%x\n",
1335 						  __func__, (int)par->vbit.value);
1336 					goto send_protocol_error_and_close;
1337 				}
1338 				if (!(property_valid[par->vbit.value] &
1339 					(1 << ctl_pkt_type(par)))) {
1340 					lwsl_notice("%s: prop id 0x%x invalid for"
1341 						  " control pkt %d\n", __func__,
1342 						  (int)par->vbit.value,
1343 						  ctl_pkt_type(par));
1344 					goto send_protocol_error_and_close;
1345 				}
1346 				par->prop_id = par->vbit.value;
1347 				par->flag_prop_multi =
1348 					par->props_seen[par->prop_id >> 3] &
1349 					(1 << (par->prop_id & 7));
1350 				par->props_seen[par->prop_id >> 3] |=
1351 						(1 << (par->prop_id & 7));
1352 				/*
1353 				 *  even if it's not a vbi property arg,
1354 				 * .consumed of this will be zero the first time
1355 				 */
1356 				lws_mqtt_vbi_init(&par->vbit);
1357 				/*
1358 				 * if it's a string, next state must set the
1359 				 * destination and size limit itself.  But
1360 				 * resetting it generically here lets it use
1361 				 * lws_mqtt_str_first() to understand it's the
1362 				 * first time around.
1363 				 */
1364 				 lws_mqtt_str_init(&par->s_temp, NULL, 0, 0);
1365 
1366 				/* property arg state enums are so encoded */
1367 				par->state = 0x100 | par->vbit.value;
1368 				break;
1369 			default:
1370 				lwsl_notice("%s: prop id bad vbi\n", __func__);
1371 				goto send_protocol_error_and_close;
1372 			}
1373 			break;
1374 
1375 		/*
1376 		 * All possible property payloads... restricting which ones
1377 		 * can appear in which control packets is already done above
1378 		 * in LMQCPP_PROP_ID_VBI
1379 		 */
1380 
1381 		case LMQCPP_PROP_REQUEST_PROBLEM_INFO_1BYTE:
1382 		case LMQCPP_PROP_REQUEST_REPSONSE_INFO_1BYTE:
1383 		case LMQCPP_PROP_MAXIMUM_QOS_1BYTE:
1384 		case LMQCPP_PROP_RETAIN_AVAILABLE_1BYTE:
1385 		case LMQCPP_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE_1BYTE:
1386 		case LMQCPP_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE_1BYTE:
1387 		case LMQCPP_PROP_SHARED_SUBSCRIPTION_AVAILABLE_1BYTE:
1388 		case LMQCPP_PROP_PAYLOAD_FORMAT_INDICATOR_1BYTE: /* 3.3.2.3.2 */
1389 			if (par->flag_prop_multi)
1390 				goto singular_prop_seen_twice;
1391 			par->payload_format = *buf++;
1392 			len--;
1393 			if (lws_mqtt_pconsume(par, 1))
1394 				goto send_protocol_error_and_close;
1395 			break;
1396 
1397 		case LMQCPP_PROP_MAXIMUM_PACKET_SIZE_4BYTE:
1398 		case LMQCPP_PROP_WILL_DELAY_INTERVAL_4BYTE:
1399 		case LMQCPP_PROP_SESSION_EXPIRY_INTERVAL_4BYTE:
1400 		case LMQCPP_PROP_MSG_EXPIRY_INTERVAL_4BYTE:
1401 			if (par->flag_prop_multi)
1402 				goto singular_prop_seen_twice;
1403 
1404 			if (lws_mqtt_mb_first(&par->vbit))
1405 				lws_mqtt_4byte_init(&par->vbit);
1406 
1407 			switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
1408 			case LMSPR_NEED_MORE:
1409 				break;
1410 			case LMSPR_COMPLETED:
1411 				if (lws_mqtt_pconsume(par, par->vbit.consumed))
1412 					goto send_protocol_error_and_close;
1413 				break;
1414 			default:
1415 				goto send_protocol_error_and_close;
1416 			}
1417 			break;
1418 
1419 		case LMQCPP_PROP_SERVER_KEEPALIVE_2BYTE:
1420 		case LMQCPP_PROP_RECEIVE_MAXIMUM_2BYTE:
1421 		case LMQCPP_PROP_TOPIC_MAXIMUM_2BYTE:
1422 		case LMQCPP_PROP_TOPIC_ALIAS_2BYTE:
1423 			if (par->flag_prop_multi)
1424 				goto singular_prop_seen_twice;
1425 
1426 			if (lws_mqtt_mb_first(&par->vbit))
1427 				lws_mqtt_2byte_init(&par->vbit);
1428 
1429 			switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
1430 			case LMSPR_NEED_MORE:
1431 				break;
1432 			case LMSPR_COMPLETED:
1433 				if (lws_mqtt_pconsume(par, par->vbit.consumed))
1434 					goto send_protocol_error_and_close;
1435 				break;
1436 			default:
1437 				goto send_protocol_error_and_close;
1438 			}
1439 			break;
1440 
1441 		case LMQCPP_PROP_ASSIGNED_CLIENTID_UTF8S:
1442 		case LMQCPP_PROP_AUTH_METHOD_UTF8S:
1443 		case LMQCPP_PROP_USER_PROPERTY_NAME_UTF8S:
1444 		case LMQCPP_PROP_USER_PROPERTY_VALUE_UTF8S:
1445 		case LMQCPP_PROP_RESPONSE_INFO_UTF8S:
1446 		case LMQCPP_PROP_SERVER_REFERENCE_UTF8S:
1447 		case LMQCPP_PROP_REASON_STRING_UTF8S:
1448 		case LMQCPP_PROP_RESPONSE_TOPIC_UTF8S:
1449 		case LMQCPP_PROP_CONTENT_TYPE_UTF8S:
1450 			if (par->flag_prop_multi)
1451 				goto singular_prop_seen_twice;
1452 
1453 			if (lws_mqtt_str_first(&par->s_temp))
1454 				lws_mqtt_str_init(&par->s_temp, par->temp,
1455 						  sizeof(par->temp), 0);
1456 
1457 			switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
1458 			case LMSPR_NEED_MORE:
1459 				break;
1460 			case LMSPR_COMPLETED:
1461 				if (lws_mqtt_pconsume(par, par->s_temp.len))
1462 					goto send_protocol_error_and_close;
1463 				break;
1464 
1465 			default:
1466 				lwsl_info("%s: bad protocol name\n", __func__);
1467 				goto send_protocol_error_and_close;
1468 			}
1469 			break;
1470 
1471 		case LMQCPP_PROP_SUBSCRIPTION_ID_VBI:
1472 
1473 		case LMQCPP_PROP_CORRELATION_BINDATA:
1474 		case LMQCPP_PROP_AUTH_DATA_BINDATA:
1475 
1476 		/* TODO */
1477 			lwsl_err("%s: Unimplemented packet state 0x%x\n",
1478 					__func__, par->state);
1479 			return -1;
1480 		}
1481 	}
1482 
1483 	return 0;
1484 
1485 oom:
1486 	lwsl_err("%s: OOM!\n", __func__);
1487 	goto send_protocol_error_and_close;
1488 
1489 singular_prop_seen_twice:
1490 	lwsl_info("%s: property appears twice\n", __func__);
1491 
1492 send_protocol_error_and_close:
1493 	lwsl_notice("%s: peac\n", __func__);
1494 	par->reason = LMQCP_REASON_PROTOCOL_ERROR;
1495 
1496 send_reason_and_close:
1497 	lwsl_notice("%s: srac\n", __func__);
1498 	par->flag_pending_send_reason_close = 1;
1499 	goto ask;
1500 
1501 send_unsupp_connack_and_close:
1502 	lwsl_notice("%s: unsupac\n", __func__);
1503 	par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL;
1504 	par->flag_pending_send_connack_close = 1;
1505 
1506 ask:
1507 	/* Should we ask for clients? */
1508 	lws_callback_on_writable(wsi);
1509 
1510 	return -1;
1511 }
1512 
1513 int
lws_mqtt_fill_fixed_header(uint8_t * p,lws_mqtt_control_packet_t ctrl_pkt_type,uint8_t dup,lws_mqtt_qos_levels_t qos,uint8_t retain)1514 lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type,
1515 			   uint8_t dup, lws_mqtt_qos_levels_t qos,
1516 			   uint8_t retain)
1517 {
1518 	lws_mqtt_fixed_hdr_t hdr;
1519 
1520 	hdr.bits = 0;
1521 	hdr.flags.ctrl_pkt_type = (uint8_t) ctrl_pkt_type;
1522 
1523 	switch(ctrl_pkt_type) {
1524 	case LMQCP_PUBLISH:
1525 		hdr.flags.dup = !!dup;
1526 		/*
1527 		 * A PUBLISH Packet MUST NOT have both QoS bits set to
1528 		 * 1. If a Server or Client receives a PUBLISH Packet
1529 		 * which has both QoS bits set to 1 it MUST close the
1530 		 * Network Connection [MQTT-3.3.1-4].
1531 		 */
1532 		if (qos >= RESERVED_QOS_LEVEL) {
1533 			lwsl_err("%s: Unsupport QoS level 0x%x\n",
1534 				 __func__, qos);
1535 			return -1;
1536 		}
1537 		hdr.flags.qos = (uint8_t)qos;
1538 		hdr.flags.retain = !!retain;
1539 		break;
1540 
1541 	case LMQCP_CTOS_CONNECT:
1542 	case LMQCP_STOC_CONNACK:
1543 	case LMQCP_PUBACK:
1544 	case LMQCP_PUBREC:
1545 	case LMQCP_PUBCOMP:
1546 	case LMQCP_STOC_SUBACK:
1547 	case LMQCP_STOC_UNSUBACK:
1548 	case LMQCP_CTOS_PINGREQ:
1549 	case LMQCP_STOC_PINGRESP:
1550 	case LMQCP_DISCONNECT:
1551 	case LMQCP_AUTH:
1552 		hdr.bits &= 0xf0;
1553 		break;
1554 
1555 	/*
1556 	 * Bits 3,2,1 and 0 of the fixed header of the PUBREL,
1557 	 * SUBSCRIBE, UNSUBSCRIBE Control Packets are reserved and
1558 	 * MUST be set to 0,0,1 and 0 respectively. The Server MUST
1559 	 * treat any other value as malformed and close the Network
1560 	 * Connection [MQTT-3.6.1-1], [MQTT-3.8.1-1], [MQTT-3.10.1-1].
1561 	 */
1562 	case LMQCP_PUBREL:
1563 	case LMQCP_CTOS_SUBSCRIBE:
1564 	case LMQCP_CTOS_UNSUBSCRIBE:
1565 		hdr.bits |= 0x02;
1566 		break;
1567 
1568 	default:
1569 		return -1;
1570 	}
1571 
1572 	*p = hdr.bits;
1573 
1574 	return 0;
1575 }
1576 
1577 /*
1578  * This fires if the wsi did a PUBLISH under QoS1, but no PUBACK came before
1579  * the timeout period
1580  */
1581 
1582 static void
lws_mqtt_publish_resend(struct lws_sorted_usec_list * sul)1583 lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul)
1584 {
1585 	struct _lws_mqtt_related *mqtt = lws_container_of(sul,
1586 			struct _lws_mqtt_related, sul_qos1_puback_wait);
1587 
1588 	lwsl_notice("%s: wsi %p\n", __func__, mqtt->wsi);
1589 
1590 	if (mqtt->wsi->protocol->callback(mqtt->wsi, LWS_CALLBACK_MQTT_RESEND,
1591 				mqtt->wsi->user_space, NULL, 0))
1592 		lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
1593 }
1594 
1595 int
lws_mqtt_client_send_publish(struct lws * wsi,lws_mqtt_publish_param_t * pub,const void * buf,uint32_t len,int is_complete)1596 lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
1597 			     const void *buf, uint32_t len, int is_complete)
1598 {
1599 	struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
1600 	uint8_t *b = (uint8_t *)pt->serv_buf, *start, *p;
1601 	struct lws *nwsi = lws_get_network_wsi(wsi);
1602 	lws_mqtt_str_t mqtt_vh_payload;
1603 	uint32_t vh_len, rem_len;
1604 
1605 	assert(pub->topic);
1606 
1607 	lwsl_debug("%s: len = %d, is_complete = %d\n",
1608 		   __func__, (int)len, (int)is_complete);
1609 
1610 	if (lwsi_state(wsi) != LRS_ESTABLISHED) {
1611 		lwsl_err("%s: wsi %p: unknown state 0x%x\n", __func__, wsi,
1612 			 lwsi_state(wsi));
1613 		assert(0);
1614 		return 1;
1615 	}
1616 
1617 	if (wsi->mqtt->inside_payload) {
1618 		/*
1619 		 * Headers are filled, we are sending
1620 		 * the payload - a buffer with LWS_PRE
1621 		 * in front it.
1622 		 */
1623 		start = (uint8_t *)buf;
1624 		p = start + len;
1625 		if (is_complete)
1626 			wsi->mqtt->inside_payload = 0;
1627 		goto do_write;
1628 	}
1629 
1630 	start = b + LWS_PRE;
1631 	p = start;
1632 	/*
1633 	 * Fill headers and the first chunk of the
1634 	 * payload (if any)
1635 	 */
1636 	if (lws_mqtt_fill_fixed_header(p++, LMQCP_PUBLISH,
1637 				       0, pub->qos, 0)) {
1638 		lwsl_err("%s: Failed to fill fixed header\n", __func__);
1639 		return 1;
1640 	}
1641 
1642 	/*
1643 	 * Topic len field + Topic len + Packet ID
1644 	 * (for QOS>0) + Payload len
1645 	 */
1646 	vh_len = 2 + pub->topic_len + ((pub->qos) ? 2 : 0);
1647 	rem_len = vh_len + pub->payload_len;
1648 	lwsl_debug("%s: Remaining len = %d\n", __func__, (int) rem_len);
1649 
1650 	/* Will the chunk of payload fit? */
1651 	if ((vh_len + len) >=
1652 	    (wsi->context->pt_serv_buf_size - LWS_PRE)) {
1653 		lwsl_err("%s: Payload is too big\n", __func__);
1654 		return 1;
1655 	}
1656 
1657 	p += lws_mqtt_vbi_encode(rem_len, p);
1658 
1659 	/* Topic's Len */
1660 	lws_ser_wu16be(p, pub->topic_len);
1661 	p += 2;
1662 
1663 	/*
1664 	 * Init lws_mqtt_str for "MQTT Variable
1665 	 * Headers + payload" (only the supplied
1666 	 * chuncked payload)
1667 	 */
1668 	lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p,
1669 			  (pub->topic_len + ((pub->qos) ? 2 : 0) + len),
1670 			  0);
1671 
1672 	p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
1673 	lws_strncpy((char *)p, pub->topic, (size_t)pub->topic_len+1);
1674 	if (lws_mqtt_str_advance(&mqtt_vh_payload, pub->topic_len)) {
1675 		lwsl_err("%s: a\n", __func__);
1676 		return 1;
1677 	}
1678 
1679 	/* Packet ID */
1680 	if (pub->qos != QOS0) {
1681 		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
1682 		wsi->mqtt->ack_pkt_id = pub->packet_id = ++nwsi->mqtt->pkt_id;
1683 		lwsl_debug("%s: pkt_id = %d\n", __func__,
1684 			   (int)wsi->mqtt->ack_pkt_id);
1685 		lws_ser_wu16be(p, pub->packet_id);
1686 		if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) {
1687 			lwsl_err("%s: b\n", __func__);
1688 			return 1;
1689 		}
1690 	}
1691 	/*
1692 	 * A non-empty Payload is expected and a chunk
1693 	 * is present
1694 	 */
1695 	if (pub->payload_len && len) {
1696 		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
1697 		memcpy(p, buf, len);
1698 		if (lws_mqtt_str_advance(&mqtt_vh_payload, len))
1699 			return 1;
1700 		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
1701 	}
1702 
1703 	if (!is_complete)
1704 		nwsi->mqtt->inside_payload = wsi->mqtt->inside_payload = 1;
1705 
1706 do_write:
1707 
1708 	// lwsl_hexdump_err(start, lws_ptr_diff(p, start));
1709 
1710 	if (lws_write(nwsi, start, lws_ptr_diff(p, start), LWS_WRITE_BINARY) !=
1711 			lws_ptr_diff(p, start)) {
1712 		lwsl_err("%s: write failed\n", __func__);
1713 		return 1;
1714 	}
1715 
1716 	if (!is_complete) {
1717 		/* still some more chunks to come... */
1718 		lws_callback_on_writable(wsi);
1719 
1720 		return 0;
1721 	}
1722 
1723 	wsi->mqtt->inside_payload = nwsi->mqtt->inside_payload = 0;
1724 
1725 	if (pub->qos != QOS0)
1726 		wsi->mqtt->unacked_publish = 1;
1727 
1728 	/* this was the last part of the publish message */
1729 
1730 	if (pub->qos == QOS0) {
1731 		/*
1732 		 * There won't be any real PUBACK, act like we got one
1733 		 * so the user callback logic is the same for QoS0 or
1734 		 * QoS1
1735 		 */
1736 		if (wsi->protocol->callback(wsi, LWS_CALLBACK_MQTT_ACK,
1737 					    wsi->user_space, NULL, 0)) {
1738 			lwsl_err("%s: ACK callback exited\n", __func__);
1739 			return 1;
1740 		}
1741 
1742 		return 0;
1743 	}
1744 
1745 	/* For QoS1, if no PUBACK coming after 3s, we must RETRY the publish */
1746 
1747 	wsi->mqtt->sul_qos1_puback_wait.cb = lws_mqtt_publish_resend;
1748 	__lws_sul_insert(&pt->pt_sul_owner, &wsi->mqtt->sul_qos1_puback_wait,
1749 			 3 * LWS_USEC_PER_SEC);
1750 
1751 	return 0;
1752 }
1753 
1754 int
lws_mqtt_client_send_subcribe(struct lws * wsi,lws_mqtt_subscribe_param_t * sub)1755 lws_mqtt_client_send_subcribe(struct lws *wsi, lws_mqtt_subscribe_param_t *sub)
1756 {
1757 	struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
1758 	uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
1759 	struct lws *nwsi = lws_get_network_wsi(wsi);
1760 	lws_mqtt_str_t mqtt_vh_payload;
1761 	uint8_t exists[8], extant;
1762 	lws_mqtt_subs_t *mysub;
1763 	uint32_t rem_len;
1764 #if defined(_DEBUG)
1765 	uint32_t tops;
1766 #endif
1767 	uint32_t n;
1768 
1769 	assert(sub->num_topics);
1770 	assert(sub->num_topics < sizeof(exists));
1771 
1772 	switch (lwsi_state(wsi)) {
1773 	case LRS_ESTABLISHED: /* Protocol connection established */
1774 		if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_SUBSCRIBE,
1775 					       0, 0, 0)) {
1776 			lwsl_err("%s: Failed to fill fixed header\n", __func__);
1777 			return 1;
1778 		}
1779 
1780 		/*
1781 		 * The stream wants to subscribe to one or more topic, but
1782 		 * the shared nwsi may already be subscribed to some or all of
1783 		 * them from interactions with other streams.  For those cases,
1784 		 * we filter them from the list the child wants until we just
1785 		 * have ones that are new to the nwsi.  If nothing left, we just
1786 		 * synthesize the callback to the child as if SUBACK had come
1787 		 * and we're done, otherwise just ask the server for topics that
1788 		 * are new to the wsi.
1789 		 */
1790 
1791 		extant = 0;
1792 		memset(&exists, 0, sizeof(exists));
1793 		for (n = 0; n < sub->num_topics; n++) {
1794 			lwsl_info("%s: Subscribing to topic[%d] = \"%s\"\n",
1795 				  __func__, (int)n, sub->topic[n].name);
1796 
1797 			mysub = lws_mqtt_find_sub(nwsi->mqtt, sub->topic[n].name);
1798 			if (mysub && mysub->ref_count) {
1799 				mysub->ref_count++; /* another stream using it */
1800 				exists[n] = 1;
1801 				extant++;
1802 			}
1803 
1804 			/*
1805 			 * Attach the topic we're subscribing to, to wsi->mqtt
1806 			 */
1807 			if (!lws_mqtt_create_sub(wsi->mqtt, sub->topic[n].name)) {
1808 				lwsl_err("%s: create sub fail\n", __func__);
1809 				return 1;
1810 			}
1811 		}
1812 
1813 		if (extant == sub->num_topics) {
1814 			/*
1815 			 * It turns out there's nothing to do here, the nwsi has
1816 			 * already subscribed to all the topics this stream
1817 			 * wanted.  Just tell it it can have them.
1818 			 */
1819 			lwsl_notice("%s: all topics already subscribed\n", __func__);
1820 			if (user_callback_handle_rxflow(
1821 				    wsi->protocol->callback,
1822 				    wsi, LWS_CALLBACK_MQTT_SUBSCRIBED,
1823 				    wsi->user_space, NULL, 0) < 0) {
1824 				lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
1825 					 __func__);
1826 				return -1;
1827 			}
1828 
1829 			return 0;
1830 		}
1831 
1832 #if defined(_DEBUG)
1833 		/*
1834 		 * zero or more of the topics already existed, but not all,
1835 		 * so we must go to the server with a filtered list of the
1836 		 * new ones only
1837 		 */
1838 
1839 		tops = sub->num_topics - extant;
1840 #endif
1841 
1842 		/*
1843 		 * Pid + (Topic len field + Topic len + Req. QoS) x Num of Topics
1844 		 */
1845 		rem_len = 2;
1846 		for (n = 0; n < sub->num_topics; n++)
1847 			if (!exists[n])
1848 				rem_len += (2 + (uint32_t)strlen(sub->topic[n].name) + (uint32_t)1);
1849 
1850 		wsi->mqtt->sub_size = rem_len;
1851 
1852 #if defined(_DEBUG)
1853 		lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
1854 			   __func__, (int)tops, (int)rem_len);
1855 #endif
1856 
1857 		p += lws_mqtt_vbi_encode(rem_len, p);
1858 
1859 		if ((rem_len + lws_ptr_diff(p, start)) >=
1860 					       wsi->context->pt_serv_buf_size) {
1861 			lwsl_err("%s: Payload is too big\n", __func__);
1862 			return 1;
1863 		}
1864 
1865 		/* Init lws_mqtt_str */
1866 		lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, rem_len, 0);
1867 		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
1868 
1869 		/* Packet ID */
1870 		wsi->mqtt->ack_pkt_id = ++nwsi->mqtt->pkt_id;
1871 		lwsl_debug("%s: pkt_id = %d\n", __func__,
1872 			   (int)wsi->mqtt->ack_pkt_id);
1873 		lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
1874 
1875 		if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
1876 			return 1;
1877 
1878 		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
1879 
1880 		for (n = 0; n < sub->num_topics; n++) {
1881 			lwsl_info("%s: topics[%d] = %s\n", __func__,
1882 				   (int)n, sub->topic[n].name);
1883 
1884 			/* if the nwsi already has it, don't ask server for it */
1885 			if (exists[n]) {
1886 				lwsl_info("%s: topics[%d] \"%s\" exists in nwsi\n",
1887 					    __func__, (int)n, sub->topic[n].name);
1888 				continue;
1889 			}
1890 
1891 			/*
1892 			 * Attach the topic we're subscribing to, to nwsi->mqtt
1893 			 * so we know the nwsi itself has a subscription to it
1894 			 */
1895 
1896 			if (!lws_mqtt_create_sub(nwsi->mqtt, sub->topic[n].name))
1897 				return 1;
1898 
1899 			/* Topic's Len */
1900 			lws_ser_wu16be(p, (uint16_t)strlen(sub->topic[n].name));
1901 			if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
1902 				return 1;
1903 			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
1904 
1905 			/* Topic Name */
1906 			lws_strncpy((char *)p, sub->topic[n].name,
1907 				    strlen(sub->topic[n].name) + 1);
1908 			if (lws_mqtt_str_advance(&mqtt_vh_payload,
1909 						 (int)strlen(sub->topic[n].name)))
1910 				return 1;
1911 			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
1912 
1913 			/* QoS */
1914 			*p = sub->topic[n].qos;
1915 			if (lws_mqtt_str_advance(&mqtt_vh_payload, 1))
1916 				return 1;
1917 			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
1918 		}
1919 		break;
1920 
1921 	default:
1922 		return 1;
1923 	}
1924 
1925 	if (lws_write(nwsi, start, lws_ptr_diff(p, start), LWS_WRITE_BINARY) !=
1926 					lws_ptr_diff(p, start))
1927 		return 1;
1928 
1929 	wsi->mqtt->inside_subscribe = 1;
1930 
1931 	return 0;
1932 }
1933 
1934 int
lws_mqtt_client_send_unsubcribe(struct lws * wsi,const lws_mqtt_subscribe_param_t * unsub)1935 lws_mqtt_client_send_unsubcribe(struct lws *wsi,
1936 				const lws_mqtt_subscribe_param_t *unsub)
1937 {
1938 	struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
1939 	uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
1940 	struct lws *nwsi = lws_get_network_wsi(wsi);
1941 	lws_mqtt_str_t mqtt_vh_payload;
1942 	uint8_t send_unsub[8], orphaned;
1943 	uint32_t rem_len, n;
1944 	lws_mqtt_subs_t *mysub;
1945 #if defined(_DEBUG)
1946 	uint32_t tops;
1947 #endif
1948 
1949 	lwsl_info("%s: Enter\n", __func__);
1950 
1951 	switch (lwsi_state(wsi)) {
1952 	case LRS_ESTABLISHED: /* Protocol connection established */
1953 		orphaned = 0;
1954 		memset(&send_unsub, 0, sizeof(send_unsub));
1955 		for (n = 0; n < unsub->num_topics; n++) {
1956 			mysub = lws_mqtt_find_sub(nwsi->mqtt,
1957 						  unsub->topic[n].name);
1958 			assert(mysub);
1959 
1960 			if (--mysub->ref_count == 0) {
1961 				lwsl_notice("%s: Need to send UNSUB\n", __func__);
1962 				send_unsub[n] = 1;
1963 				orphaned++;
1964 			}
1965 		}
1966 
1967 		if (!orphaned) {
1968 			/*
1969 			 * The nwsi still has other subscribers bound to the
1970 			 * topics.
1971 			 *
1972 			 * So, don't send UNSUB to server, and just fake the
1973 			 * UNSUB ACK event for the guy going away.
1974 			 */
1975 			lwsl_notice("%s: unsubscribed!\n", __func__);
1976 			if (user_callback_handle_rxflow(
1977 				    wsi->protocol->callback,
1978 				    wsi, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
1979 				    wsi->user_space, NULL, 0) < 0) {
1980 				/*
1981 				 * We can't directly close here, because the
1982 				 * caller still has the wsi.  Inform the
1983 				 * caller that we want to close
1984 				 */
1985 
1986 				return 1;
1987 			}
1988 
1989 			return 0;
1990 		}
1991 #if defined(_DEBUG)
1992 		/*
1993 		 * one or more of the topics needs to be unsubscribed
1994 		 * from, so we must go to the server with a filtered
1995 		 * list of the new ones only
1996 		 */
1997 
1998 		tops = orphaned;
1999 #endif
2000 
2001 		if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_UNSUBSCRIBE,
2002 					       0, 0, 0)) {
2003 			lwsl_err("%s: Failed to fill fixed header\n", __func__);
2004 			return 1;
2005 		}
2006 
2007 		/*
2008 		 * Pid + (Topic len field + Topic len) x Num of Topics
2009 		 */
2010 		rem_len = 2;
2011 		for (n = 0; n < unsub->num_topics; n++)
2012 			if (send_unsub[n])
2013 				rem_len += (2 + (uint32_t)strlen(unsub->topic[n].name));
2014 
2015 		wsi->mqtt->sub_size = rem_len;
2016 
2017 		lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
2018 			   __func__, (int)tops, (int)rem_len);
2019 
2020 		p += lws_mqtt_vbi_encode(rem_len, p);
2021 
2022 		if ((rem_len + lws_ptr_diff(p, start)) >=
2023 					       wsi->context->pt_serv_buf_size) {
2024 			lwsl_err("%s: Payload is too big\n", __func__);
2025 			return 1;
2026 		}
2027 
2028 		/* Init lws_mqtt_str */
2029 		lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, rem_len, 0);
2030 		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2031 
2032 		/* Packet ID */
2033 		wsi->mqtt->ack_pkt_id = ++nwsi->mqtt->pkt_id;
2034 		lwsl_debug("%s: pkt_id = %d\n", __func__,
2035 			   (int)wsi->mqtt->ack_pkt_id);
2036 		lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
2037 
2038 		if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2039 			return 1;
2040 
2041 		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2042 
2043 		for (n = 0; n < unsub->num_topics; n++) {
2044 			lwsl_info("%s: topics[%d] = %s\n", __func__,
2045 				   (int)n, unsub->topic[n].name);
2046 
2047 			/*
2048 			 * Subscriber still bound to it, don't UBSUB
2049 			 * from the server
2050 			 */
2051 			if (!send_unsub[n])
2052 				continue;
2053 
2054 			/* Topic's Len */
2055 			lws_ser_wu16be(p, (uint16_t)strlen(unsub->topic[n].name));
2056 			if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2057 				return 1;
2058 			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2059 
2060 			/* Topic Name */
2061 			lws_strncpy((char *)p, unsub->topic[n].name,
2062 				    strlen(unsub->topic[n].name) + 1);
2063 			if (lws_mqtt_str_advance(&mqtt_vh_payload,
2064 						 (int)strlen(unsub->topic[n].name)))
2065 				return 1;
2066 			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2067 		}
2068 		break;
2069 
2070 	default:
2071 		return 1;
2072 	}
2073 
2074 	if (lws_write(nwsi, start, lws_ptr_diff(p, start), LWS_WRITE_BINARY) !=
2075 					lws_ptr_diff(p, start))
2076 		return 1;
2077 
2078 	wsi->mqtt->inside_unsubscribe = 1;
2079 
2080 	return 0;
2081 }
2082 
2083 /*
2084  * This is called when child streams bind to an already-existing and compatible
2085  * MQTT stream
2086  */
2087 
2088 struct lws *
lws_wsi_mqtt_adopt(struct lws * parent_wsi,struct lws * wsi)2089 lws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi)
2090 {
2091 	/* no more children allowed by parent? */
2092 
2093 	if (parent_wsi->mux.child_count + 1 > LWS_MQTT_MAX_CHILDREN) {
2094 		lwsl_err("%s: reached concurrent stream limit\n", __func__);
2095 		return NULL;
2096 	}
2097 
2098 #if defined(LWS_WITH_CLIENT)
2099 	wsi->client_mux_substream = 1;
2100 #endif
2101 
2102 	lws_wsi_mux_insert(wsi, parent_wsi, wsi->mux.my_sid);
2103 
2104 	if (lws_ensure_user_space(wsi))
2105 		goto bail1;
2106 
2107 	lws_mqtt_set_client_established(wsi);
2108 	lws_callback_on_writable(wsi);
2109 
2110 #if defined(LWS_WITH_SERVER_STATUS)
2111 	wsi->vhost->conn_stats.mqtt_subs++;
2112 #endif
2113 
2114 	return wsi;
2115 
2116 bail1:
2117 	/* undo the insert */
2118 	parent_wsi->mux.child_list = wsi->mux.sibling_list;
2119 	parent_wsi->mux.child_count--;
2120 
2121 	if (wsi->user_space)
2122 		lws_free_set_NULL(wsi->user_space);
2123 
2124 	wsi->protocol->callback(wsi, LWS_CALLBACK_WSI_DESTROY, NULL, NULL, 0);
2125 	lws_free(wsi);
2126 
2127 	return NULL;
2128 }
2129 
2130