1 /*
2 * libwebsockets - small server side websockets and web server implementation
3 *
4 * Copyright (C) 2010 - 2021 Andy Green <andy@warmcat.com>
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 * IN THE SOFTWARE.
23 *
24 * MQTT v5
25 *
26 * http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
27 *
28 * Control Packet structure
29 *
30 * - Always: 2+ byte: Fixed Hdr
31 * - Required in some: variable: Variable Hdr + [(CONNECT)Will Props] + Props
32 * - Required in some: variable: Payload
33 *
 * For CONNECT, the payload fields, if present, MUST appear in this order
 * [MQTT-3.1.3-1]
35 *
36 * - Client Identifier
37 * - Will Properties
38 * - Will Topic
39 * - Will Payload
40 * - User Name
41 * - Password
42 */
43
44 #include "private-lib-core.h"
45 #include <string.h>
46 #include <sys/types.h>
47 #include <assert.h>
48
/*
 * Connection state enum; going by its name it belongs to the server side of
 * the MQTT protocol handling.  Only one state is defined here.
 */
typedef enum {
	LMQPRS_AWAITING_CONNECT = 0,
} lws_mqtt_protocol_server_connstate_t;
53
/*
 * Human-readable reason names, first group (success / informational
 * outcomes).
 *
 * The entries are densely packed: the index is simply the position in this
 * list.  NOTE(review): the code that looks names up is not visible here, so
 * confirm how a reason value is mapped onto these indices.
 */
const char * const reason_names_g1[] = {
	[0] = "Success / Normal disconnection / QoS0",
	[1] = "QoS1",
	[2] = "QoS2",
	[3] = "Disconnect Will",
	[4] = "No matching subscriber",
	[5] = "No subscription existed",
	[6] = "Continue authentication",
	[7] = "Re-authenticate",
};
64
/*
 * Human-readable reason names, second group (error outcomes).
 *
 * The entries are densely packed: the index is simply the position in this
 * list.
 *
 * NOTE(review): the MQTT v5 reason code list has "Topic Name invalid" between
 * "Topic Filter invalid" and "Packet Identifier in use"; there is no such
 * entry here.  If this table is indexed by (reason code - 0x80), every name
 * from index 16 onwards is off by one.  The lookup code is not visible here,
 * so confirm before relying on the indices.
 */
const char * const reason_names_g2[] = {
	[0]  = "Unspecified error",
	[1]  = "Malformed packet",
	[2]  = "Protocol error",
	[3]  = "Implementation specific error",
	[4]  = "Unsupported protocol",
	[5]  = "Client ID invalid",
	[6]  = "Bad credentials",
	[7]  = "Not Authorized",
	[8]  = "Server Unavailable",
	[9]  = "Server Busy",
	[10] = "Banned",
	[11] = "Server Shutting Down",
	[12] = "Bad Authentication Method",
	[13] = "Keepalive Timeout",
	[14] = "Session taken over",
	[15] = "Topic Filter Invalid",
	[16] = "Packet ID in use",
	[17] = "Packet ID not found",
	[18] = "Max RX Exceeded",
	[19] = "Topic Alias Invalid",
	[20] = "Packet too large",
	[21] = "Ratelimit",
	[22] = "Quota Exceeded",
	[23] = "Administrative Action",
	[24] = "Payload format invalid",
	[25] = "Retain not supported",
	[26] = "QoS not supported",
	[27] = "Use another server",
	[28] = "Server Moved",
	[29] = "Shared subscriptions not supported",
	[30] = "Connection rate exceeded",
	[31] = "Maximum Connect Time",
	[32] = "Subscription IDs not supported",
	[33] = "Wildcard subscriptions not supported",
};
101
102 #define LMQCP_WILL_PROPERTIES 0
103
104 /* For each property, a bitmap describing which commands it is valid for */
105
106 static const uint16_t property_valid[] = {
107 [LMQPROP_PAYLOAD_FORMAT_INDICATOR] = (1 << LMQCP_PUBLISH) |
108 (1 << LMQCP_WILL_PROPERTIES),
109 [LMQPROP_MESSAGE_EXPIRY_INTERVAL] = (1 << LMQCP_PUBLISH) |
110 (1 << LMQCP_WILL_PROPERTIES),
111 [LMQPROP_CONTENT_TYPE] = (1 << LMQCP_PUBLISH) |
112 (1 << LMQCP_WILL_PROPERTIES),
113 [LMQPROP_RESPONSE_TOPIC] = (1 << LMQCP_PUBLISH) |
114 (1 << LMQCP_WILL_PROPERTIES),
115 [LMQPROP_CORRELATION_DATA] = (1 << LMQCP_PUBLISH) |
116 (1 << LMQCP_WILL_PROPERTIES),
117 [LMQPROP_SUBSCRIPTION_IDENTIFIER] = (1 << LMQCP_PUBLISH) |
118 (1 << LMQCP_CTOS_SUBSCRIBE),
119 [LMQPROP_SESSION_EXPIRY_INTERVAL] = (1 << LMQCP_CTOS_CONNECT) |
120 (1 << LMQCP_STOC_CONNACK) |
121 (1 << LMQCP_DISCONNECT),
122 [LMQPROP_ASSIGNED_CLIENT_IDENTIFIER] = (1 << LMQCP_STOC_CONNACK),
123 [LMQPROP_SERVER_KEEP_ALIVE] = (1 << LMQCP_STOC_CONNACK),
124 [LMQPROP_AUTHENTICATION_METHOD] = (1 << LMQCP_CTOS_CONNECT) |
125 (1 << LMQCP_STOC_CONNACK) |
126 (1 << LMQCP_AUTH),
127 [LMQPROP_AUTHENTICATION_DATA] = (1 << LMQCP_CTOS_CONNECT) |
128 (1 << LMQCP_STOC_CONNACK) |
129 (1 << LMQCP_AUTH),
130 [LMQPROP_REQUEST_PROBLEM_INFORMATION] = (1 << LMQCP_CTOS_CONNECT),
131 [LMQPROP_WILL_DELAY_INTERVAL] = (1 << LMQCP_WILL_PROPERTIES),
132 [LMQPROP_REQUEST_RESPONSE_INFORMATION] = (1 << LMQCP_CTOS_CONNECT),
133 [LMQPROP_RESPONSE_INFORMATION] = (1 << LMQCP_STOC_CONNACK),
134 [LMQPROP_SERVER_REFERENCE] = (1 << LMQCP_STOC_CONNACK) |
135 (1 << LMQCP_DISCONNECT),
136 [LMQPROP_REASON_STRING] = (1 << LMQCP_STOC_CONNACK) |
137 (1 << LMQCP_PUBACK) |
138 (1 << LMQCP_PUBREC) |
139 (1 << LMQCP_PUBREL) |
140 (1 << LMQCP_PUBCOMP) |
141 (1 << LMQCP_STOC_SUBACK) |
142 (1 << LMQCP_STOC_UNSUBACK) |
143 (1 << LMQCP_DISCONNECT) |
144 (1 << LMQCP_AUTH),
145 [LMQPROP_RECEIVE_MAXIMUM] = (1 << LMQCP_CTOS_CONNECT) |
146 (1 << LMQCP_STOC_CONNACK),
147 [LMQPROP_TOPIC_ALIAS_MAXIMUM] = (1 << LMQCP_CTOS_CONNECT) |
148 (1 << LMQCP_STOC_CONNACK),
149 [LMQPROP_TOPIC_ALIAS] = (1 << LMQCP_PUBLISH),
150 [LMQPROP_MAXIMUM_QOS] = (1 << LMQCP_STOC_CONNACK),
151 [LMQPROP_RETAIN_AVAILABLE] = (1 << LMQCP_STOC_CONNACK),
152 [LMQPROP_USER_PROPERTY] = (1 << LMQCP_CTOS_CONNECT) |
153 (1 << LMQCP_STOC_CONNACK) |
154 (1 << LMQCP_PUBLISH) |
155 (1 << LMQCP_WILL_PROPERTIES) |
156 (1 << LMQCP_PUBACK) |
157 (1 << LMQCP_PUBREC) |
158 (1 << LMQCP_PUBREL) |
159 (1 << LMQCP_PUBCOMP) |
160 (1 << LMQCP_CTOS_SUBSCRIBE) |
161 (1 << LMQCP_STOC_SUBACK) |
162 (1 << LMQCP_CTOS_UNSUBSCRIBE) |
163 (1 << LMQCP_STOC_UNSUBACK) |
164 (1 << LMQCP_DISCONNECT) |
165 (1 << LMQCP_AUTH),
166 [LMQPROP_MAXIMUM_PACKET_SIZE] = (1 << LMQCP_CTOS_CONNECT) |
167 (1 << LMQCP_STOC_CONNACK),
168 [LMQPROP_WILDCARD_SUBSCRIPTION_AVAIL] = (1 << LMQCP_STOC_CONNACK),
169 [LMQPROP_SUBSCRIPTION_IDENTIFIER_AVAIL] = (1 << LMQCP_STOC_CONNACK),
170 [LMQPROP_SHARED_SUBSCRIPTION_AVAIL] = (1 << LMQCP_STOC_CONNACK)
171 };
172
173
174 /*
175 * For each command index, maps flags, id, qos and payload legality
176 * notice in most cases PUBLISH requires further processing
177 */
178 static const uint8_t map_flags[] = {
179 [LMQCP_RESERVED] = 0x00,
180 [LMQCP_CTOS_CONNECT] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
181 LMQCP_LUT_FLAG_PAYLOAD |
182 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
183 [LMQCP_STOC_CONNACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
184 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
185 [LMQCP_PUBLISH] = LMQCP_LUT_FLAG_PAYLOAD | /* option */
186 LMQCP_LUT_FLAG_PACKET_ID_QOS12 | 0x00,
187 [LMQCP_PUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
188 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
189 [LMQCP_PUBREC] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
190 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
191 [LMQCP_PUBREL] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
192 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
193 [LMQCP_PUBCOMP] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
194 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
195 [LMQCP_CTOS_SUBSCRIBE] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
196 LMQCP_LUT_FLAG_PAYLOAD |
197 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
198 [LMQCP_STOC_SUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
199 LMQCP_LUT_FLAG_PAYLOAD |
200 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
201 [LMQCP_CTOS_UNSUBSCRIBE] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
202 LMQCP_LUT_FLAG_PAYLOAD |
203 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
204 [LMQCP_STOC_UNSUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
205 LMQCP_LUT_FLAG_PAYLOAD |
206 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
207 [LMQCP_CTOS_PINGREQ] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
208 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
209 [LMQCP_STOC_PINGRESP] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
210 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
211 [LMQCP_DISCONNECT] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
212 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
213 [LMQCP_AUTH] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
214 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
215 };
216
217 void
lws_mqttc_state_transition(lws_mqttc_t * c,lwsgs_mqtt_states_t s)218 lws_mqttc_state_transition(lws_mqttc_t *c, lwsgs_mqtt_states_t s)
219 {
220 lwsl_debug("%s: ep %p: state %d -> %d\n", __func__, c, c->estate, s);
221 c->estate = s;
222 }
223
224 static int
lws_mqtt_pconsume(lws_mqtt_parser_t * par,int consumed)225 lws_mqtt_pconsume(lws_mqtt_parser_t *par, int consumed)
226 {
227 par->consumed += (unsigned int)consumed;
228
229 if (par->consumed > par->props_len)
230 return -1;
231
232 /* more properties coming */
233
234 if (par->consumed < par->props_len) {
235 par->state = LMQCPP_PROP_ID_VBI;
236 return 0;
237 }
238
239 /* properties finished: are we headed for payload or idle? */
240
241 if ((map_flags[ctl_pkt_type(par)] & LMQCP_LUT_FLAG_PAYLOAD) &&
242 /* A PUBLISH packet MUST NOT contain a Packet Identifier if
243 * its QoS value is set to 0 [MQTT-2.2.1-2]. */
244 (ctl_pkt_type(par) != LMQCP_PUBLISH ||
245 (par->packet_type_flags & 6))) {
246 par->state = LMQCPP_PAYLOAD;
247 return 0;
248 }
249
250 par->state = LMQCPP_IDLE;
251
252 return 0;
253 }
254
255 static int
lws_mqtt_set_client_established(struct lws * wsi)256 lws_mqtt_set_client_established(struct lws *wsi)
257 {
258 lws_role_transition(wsi, LWSIFR_CLIENT, LRS_ESTABLISHED,
259 &role_ops_mqtt);
260
261 if (user_callback_handle_rxflow(wsi->a.protocol->callback,
262 wsi, LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED,
263 wsi->user_space, NULL, 0) < 0) {
264 lwsl_err("%s: MQTT_ESTABLISHED failed\n", __func__);
265
266 return -1;
267 }
268 /*
269 * If we made a new connection and got the ACK, our connection is
270 * definitely working in both directions at the moment
271 */
272 lws_validity_confirmed(wsi);
273
274 /* clear connection timeout */
275 lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
276
277 return 0;
278 }
279
280
281 static lws_mqtt_match_topic_return_t
lws_mqtt_is_topic_matched(const char * sub,const char * pub)282 lws_mqtt_is_topic_matched(const char* sub, const char* pub)
283 {
284 const char *ppos = pub, *spos = sub;
285
286 if (!ppos || !spos) {
287 return LMMTR_TOPIC_MATCH_ERROR;
288 }
289
290 while (*spos) {
291 if (*ppos == '#' || *ppos == '+') {
292 lwsl_err("%s: PUBLISH to wildcard "
293 "topic \"%s\" not supported\n",
294 __func__, pub);
295 return LMMTR_TOPIC_MATCH_ERROR;
296 }
297 /* foo/+/bar == foo/xyz/bar ? */
298 if (*spos == '+') {
299 /* Skip ahead */
300 while (*ppos != '\0' && *ppos != '/') {
301 ppos++;
302 }
303 } else if (*spos == '#') {
304 return LMMTR_TOPIC_MATCH;
305 } else {
306 if (*ppos == '\0') {
307 /* foo/bar == foo/bar/# ? */
308 if (!strncmp(spos, "/#", 2))
309 return LMMTR_TOPIC_MATCH;
310 return LMMTR_TOPIC_NOMATCH;
311 /* Non-matching character */
312 } else if (*ppos != *spos) {
313 return LMMTR_TOPIC_NOMATCH;
314 }
315 ppos++;
316 }
317 spos++;
318 }
319
320 if (*spos == '\0' && *ppos == '\0')
321 return LMMTR_TOPIC_MATCH;
322
323 return LMMTR_TOPIC_NOMATCH;
324 }
325
lws_mqtt_find_sub(struct _lws_mqtt_related * mqtt,const char * ptopic)326 lws_mqtt_subs_t* lws_mqtt_find_sub(struct _lws_mqtt_related* mqtt,
327 const char* ptopic) {
328 lws_mqtt_subs_t *s = mqtt->subs_head;
329
330 while (s) {
331 /* SUB topic == PUB topic ? */
332 /* foo/bar/xyz == foo/bar/xyz ? */
333 if (!s->wildcard) {
334 if (!strcmp((const char*)s->topic, ptopic))
335 return s;
336 } else {
337 if (lws_mqtt_is_topic_matched(
338 s->topic, ptopic) == LMMTR_TOPIC_MATCH)
339 return s;
340 }
341
342 s = s->next;
343 }
344
345 return NULL;
346 }
347
348 static lws_mqtt_validate_topic_return_t
lws_mqtt_validate_topic(const char * topic,size_t topiclen,uint8_t awsiot)349 lws_mqtt_validate_topic(const char *topic, size_t topiclen, uint8_t awsiot)
350 {
351 size_t spos = 0;
352 const char *sub = topic;
353 int8_t slashes = 0;
354 lws_mqtt_validate_topic_return_t ret = LMVTR_VALID;
355
356 if (awsiot) {
357 if (topiclen > LWS_MQTT_MAX_AWSIOT_TOPICLEN)
358 return LMVTR_FAILED_OVERSIZE;
359 if (topic[0] == '$') {
360 ret = LMVTR_VALID_SHADOW;
361 slashes = -3;
362 }
363 } else {
364 if (topiclen > LWS_MQTT_MAX_TOPICLEN)
365 return LMVTR_FAILED_OVERSIZE;
366 if (topic[0] == '$')
367 return LMVTR_FAILED_WILDCARD_FORMAT;
368 }
369
370 while (*sub != 0) {
371 if (sub[0] == '+') {
372 /* topic == "+foo" || "a/+foo" ? */
373 if (spos > 0 && sub[-1] != '/')
374 return LMVTR_FAILED_WILDCARD_FORMAT;
375
376 /* topic == "foo+" or "foo+/a" ? */
377 if (sub[1] != 0 && sub[1] != '/')
378 return LMVTR_FAILED_WILDCARD_FORMAT;
379
380 ret = LMVTR_VALID_WILDCARD;
381 } else if (sub[0] == '#') {
382 /* topic == "foo#" ? */
383 if (spos > 0 && sub[-1] != '/')
384 return LMVTR_FAILED_WILDCARD_FORMAT;
385
386 /* topic == "#foo" ? */
387 if (sub[1] != 0)
388 return LMVTR_FAILED_WILDCARD_FORMAT;
389
390 ret = LMVTR_VALID_WILDCARD;
391 } else if (sub[0] == '/') {
392 slashes++;
393 }
394 spos++;
395 sub++;
396 }
397
398 if (awsiot && (slashes < 0 || slashes > 7))
399 return LMVTR_FAILED_SHADOW_FORMAT;
400
401 return ret;
402 }
403
404 static lws_mqtt_subs_t *
lws_mqtt_create_sub(struct _lws_mqtt_related * mqtt,const char * topic)405 lws_mqtt_create_sub(struct _lws_mqtt_related *mqtt, const char *topic)
406 {
407 lws_mqtt_subs_t *mysub;
408 size_t topiclen = strlen(topic);
409 lws_mqtt_validate_topic_return_t flag;
410
411 flag = lws_mqtt_validate_topic(topic, topiclen, mqtt->client.aws_iot);
412 switch (flag) {
413 case LMVTR_FAILED_OVERSIZE:
414 lwsl_err("%s: Topic is too long\n",
415 __func__);
416 return NULL;
417 case LMVTR_FAILED_SHADOW_FORMAT:
418 case LMVTR_FAILED_WILDCARD_FORMAT:
419 lwsl_err("%s: Invalid topic format \"%s\"\n",
420 __func__, topic);
421 return NULL;
422
423 case LMVTR_VALID:
424 case LMVTR_VALID_WILDCARD:
425 case LMVTR_VALID_SHADOW:
426 mysub = lws_malloc(sizeof(*mysub) + topiclen + 1, "sub");
427 if (!mysub) {
428 lwsl_err("%s: Error allocating mysub\n",
429 __func__);
430 return NULL;
431 }
432 mysub->wildcard = (flag == LMVTR_VALID_WILDCARD);
433 mysub->shadow = (flag == LMVTR_VALID_SHADOW);
434 break;
435
436 default:
437 lwsl_err("%s: Unknown flag - %d\n",
438 __func__, flag);
439 return NULL;
440 }
441
442 mysub->next = mqtt->subs_head;
443 mqtt->subs_head = mysub;
444 memcpy(mysub->topic, topic, strlen(topic) + 1);
445 mysub->ref_count = 1;
446
447 lwsl_info("%s: Created mysub %p for wsi->mqtt %p\n",
448 __func__, mysub, mqtt);
449
450 return mysub;
451 }
452
453 static int
lws_mqtt_client_remove_subs(struct _lws_mqtt_related * mqtt)454 lws_mqtt_client_remove_subs(struct _lws_mqtt_related *mqtt)
455 {
456 lws_mqtt_subs_t *s = mqtt->subs_head;
457 lws_mqtt_subs_t *temp = NULL;
458
459
460 lwsl_info("%s: Called to remove subs from wsi->mqtt %p\n",
461 __func__, mqtt);
462
463 while (s && s->next) {
464 if (s->next->ref_count == 0)
465 break;
466 s = s->next;
467 }
468
469 if (s && s->next) {
470 temp = s->next;
471 lwsl_info("%s: Removing sub %p from wsi->mqtt %p\n",
472 __func__, temp, mqtt);
473 s->next = temp->next;
474 lws_free(temp);
475 return 0;
476 }
477 return 1;
478 }
479
480 int
_lws_mqtt_rx_parser(struct lws * wsi,lws_mqtt_parser_t * par,const uint8_t * buf,size_t len)481 _lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par,
482 const uint8_t *buf, size_t len)
483 {
484 struct lws *w;
485 int n;
486
487 if (par->flag_pending_send_reason_close)
488 return 0;
489
490 /*
491 * Stateful, fragmentation-immune parser
492 *
493 * Notice that len can always be 1 if under attack, even over tls if
494 * the server is compromised or malicious.
495 */
496
497 while (len) {
498 lwsl_debug("%s: %d, len = %d\n", __func__, par->state, (int)len);
499 switch (par->state) {
500 case LMQCPP_IDLE:
501 par->packet_type_flags = *buf++;
502 len--;
503
504 #if defined(LWS_WITH_CLIENT)
505 /*
506 * The case where we sent the connect, but we received
507 * something else before any CONNACK
508 */
509 if (lwsi_state(wsi) == LRS_MQTTC_AWAIT_CONNACK &&
510 par->packet_type_flags >> 4 != LMQCP_STOC_CONNACK) {
511 lwsl_notice("%s: server sent non-CONNACK\n",
512 __func__);
513 goto send_protocol_error_and_close;
514 }
515 #endif /* LWS_WITH_CLIENT */
516
517 n = map_flags[par->packet_type_flags >> 4];
518 /*
519 * Where a flag bit is marked as “Reserved”, it is
520 * reserved for future use and MUST be set to the value
521 * listed [MQTT-2.1.3-1].
522 */
523 if ((n & LMQCP_LUT_FLAG_RESERVED_FLAGS) &&
524 ((par->packet_type_flags & 0x0f) != (n & 0x0f))) {
525 lwsl_notice("%s: %s: bad flags, 0x%02x mask 0x%02x (len %d)\n",
526 __func__, lws_wsi_tag(wsi),
527 par->packet_type_flags, n, (int)len + 1);
528 lwsl_hexdump_err(buf - 1, len + 1);
529 goto send_protocol_error_and_close;
530 }
531
532 lwsl_debug("%s: received pkt type 0x%x / flags 0x%x\n",
533 __func__, par->packet_type_flags >> 4,
534 par->packet_type_flags & 0xf);
535
536 /* allows us to know if a property that can only be
537 * given once, appears twice */
538 memset(par->props_seen, 0, sizeof(par->props_seen));
539 par->state = par->packet_type_flags & 0xf0;
540 break;
541
542 case LMQCPP_CONNECT_PACKET:
543 lwsl_debug("%s: received CONNECT pkt\n", __func__);
544 par->state = LMQCPP_CONNECT_REMAINING_LEN_VBI;
545 lws_mqtt_vbi_init(&par->vbit);
546 break;
547
548 case LMQCPP_CONNECT_REMAINING_LEN_VBI:
549 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
550 case LMSPR_NEED_MORE:
551 break;
552 case LMSPR_COMPLETED:
553 par->cpkt_remlen = par->vbit.value;
554 n = map_flags[ctl_pkt_type(par)];
555 lws_mqtt_str_init(&par->s_temp, par->temp,
556 sizeof(par->temp), 0);
557 par->state = LMQCPP_CONNECT_VH_PNAME;
558 break;
559 default:
560 lwsl_notice("%s: bad vbi\n", __func__);
561 goto send_protocol_error_and_close;
562 }
563 break;
564
565 case LMQCPP_CONNECT_VH_PNAME:
566 switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
567 case LMSPR_NEED_MORE:
568 break;
569 case LMSPR_COMPLETED:
570 if (par->s_temp.len != 4 ||
571 memcmp(par->s_temp.buf, "MQTT",
572 par->s_temp.len)) {
573 lwsl_notice("%s: protocol name: %.*s\n",
574 __func__, par->s_temp.len,
575 par->s_temp.buf);
576 goto send_unsupp_connack_and_close;
577 }
578 par->state = LMQCPP_CONNECT_VH_PVERSION;
579 break;
580 default:
581 lwsl_notice("%s: bad protocol name\n", __func__);
582 goto send_protocol_error_and_close;
583 }
584 break;
585
586 case LMQCPP_CONNECT_VH_PVERSION:
587 par->conn_protocol_version = *buf++;
588 len--;
589 if (par->conn_protocol_version != 5) {
590 lwsl_info("%s: unsupported MQTT version %d\n",
591 __func__, par->conn_protocol_version);
592 goto send_unsupp_connack_and_close;
593 }
594 par->state = LMQCPP_CONNECT_VH_FLAGS;
595 break;
596
597 case LMQCPP_CONNECT_VH_FLAGS:
598 par->cpkt_flags = *buf++;
599 len--;
600 if (par->cpkt_flags & 1) {
601 /*
602 * The Server MUST validate that the reserved
603 * flag in the CONNECT packet is set to 0
604 * [MQTT-3.1.2-3].
605 */
606 par->reason = LMQCP_REASON_MALFORMED_PACKET;
607 goto send_reason_and_close;
608 }
609 /*
610 * conn_flags specifies the Will Properties that should
611 * appear in the payload section
612 */
613 lws_mqtt_2byte_init(&par->vbit);
614 par->state = LMQCPP_CONNECT_VH_KEEPALIVE;
615 break;
616
617 case LMQCPP_CONNECT_VH_KEEPALIVE:
618 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
619 case LMSPR_NEED_MORE:
620 break;
621 case LMSPR_COMPLETED:
622 par->keepalive = (uint16_t)par->vbit.value;
623 lws_mqtt_vbi_init(&par->vbit);
624 par->state = LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN;
625 break;
626 default:
627 lwsl_notice("%s: ka bad vbi\n", __func__);
628 goto send_protocol_error_and_close;
629 }
630 break;
631
632 case LMQCPP_PINGRESP_ZERO:
633 len--;
634 /* second byte of PINGRESP must be zero */
635 if (*buf++)
636 goto send_protocol_error_and_close;
637 goto cmd_completion;
638
639 case LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN:
640 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
641 case LMSPR_NEED_MORE:
642 break;
643 case LMSPR_COMPLETED:
644 /* reset consumption counter */
645 par->consumed = 0;
646 par->props_len = par->vbit.value;
647 lws_mqtt_vbi_init(&par->vbit);
648 par->state = LMQCPP_PROP_ID_VBI;
649 break;
650 default:
651 lwsl_notice("%s: connpr bad vbi\n", __func__);
652 goto send_protocol_error_and_close;
653 }
654 break;
655
656 /* PUBREC */
657 case LMQCPP_PUBREC_PACKET:
658 lwsl_debug("%s: received PUBREC pkt\n", __func__);
659 lws_mqtt_vbi_init(&par->vbit);
660 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
661 case LMSPR_NEED_MORE:
662 break;
663 case LMSPR_COMPLETED:
664 par->cpkt_remlen = par->vbit.value;
665 lwsl_debug("%s: PUBREC pkt len = %d\n",
666 __func__, (int)par->cpkt_remlen);
667 if (par->cpkt_remlen < 2)
668 goto send_protocol_error_and_close;
669 par->state = LMQCPP_PUBREC_VH_PKT_ID;
670 break;
671 default:
672 lwsl_notice("%s: pubrec bad vbi\n", __func__);
673 goto send_protocol_error_and_close;
674 }
675 break;
676
677 case LMQCPP_PUBREC_VH_PKT_ID:
678 if (len < 2) {
679 lwsl_notice("%s: len breakage 3\n", __func__);
680 return -1;
681 }
682
683 par->cpkt_id = lws_ser_ru16be(buf);
684 wsi->mqtt->ack_pkt_id = par->cpkt_id;
685 buf += 2;
686 len -= 2;
687 par->cpkt_remlen -= 2;
688 par->n = 0;
689
690 goto cmd_completion;
691
692 /* PUBREL */
693 case LMQCPP_PUBREL_PACKET:
694 lwsl_debug("%s: received PUBREL pkt\n", __func__);
695 lws_mqtt_vbi_init(&par->vbit);
696 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
697 case LMSPR_NEED_MORE:
698 break;
699 case LMSPR_COMPLETED:
700 par->cpkt_remlen = par->vbit.value;
701 lwsl_debug("%s: PUBREL pkt len = %d\n",
702 __func__, (int)par->cpkt_remlen);
703 if (par->cpkt_remlen < 2)
704 goto send_protocol_error_and_close;
705 par->state = LMQCPP_PUBREL_VH_PKT_ID;
706 break;
707 default:
708 lwsl_err("%s: pubrel bad vbi\n", __func__);
709 goto send_protocol_error_and_close;
710 }
711 break;
712
713 case LMQCPP_PUBREL_VH_PKT_ID:
714 if (len < 2) {
715 lwsl_notice("%s: len breakage 3\n", __func__);
716 return -1;
717 }
718
719 par->cpkt_id = lws_ser_ru16be(buf);
720 wsi->mqtt->ack_pkt_id = par->cpkt_id;
721 buf += 2;
722 len -= 2;
723 par->cpkt_remlen -= 2;
724 par->n = 0;
725
726 goto cmd_completion;
727
728 /* PUBCOMP */
729 case LMQCPP_PUBCOMP_PACKET:
730 lwsl_debug("%s: received PUBCOMP pkt\n", __func__);
731 lws_mqtt_vbi_init(&par->vbit);
732 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
733 case LMSPR_NEED_MORE:
734 break;
735 case LMSPR_COMPLETED:
736 par->cpkt_remlen = par->vbit.value;
737 lwsl_debug("%s: PUBCOMP pkt len = %d\n",
738 __func__, (int)par->cpkt_remlen);
739 if (par->cpkt_remlen < 2)
740 goto send_protocol_error_and_close;
741 par->state = LMQCPP_PUBCOMP_VH_PKT_ID;
742 break;
743 default:
744 lwsl_err("%s: pubcmp bad vbi\n", __func__);
745 goto send_protocol_error_and_close;
746 }
747 break;
748
749 case LMQCPP_PUBCOMP_VH_PKT_ID:
750 if (len < 2) {
751 lwsl_notice("%s: len breakage 3\n", __func__);
752 return -1;
753 }
754
755 par->cpkt_id = lws_ser_ru16be(buf);
756 wsi->mqtt->ack_pkt_id = par->cpkt_id;
757 buf += 2;
758 len -= 2;
759 par->cpkt_remlen -= 2;
760 par->n = 0;
761
762 goto cmd_completion;
763
764 case LMQCPP_PUBLISH_PACKET:
765 if (lwsi_role_client(wsi) && wsi->mqtt->inside_subscribe) {
766 lwsl_notice("%s: Topic rx before subscribing\n",
767 __func__);
768 goto send_protocol_error_and_close;
769 }
770 lwsl_info("%s: received PUBLISH pkt\n", __func__);
771 par->state = LMQCPP_PUBLISH_REMAINING_LEN_VBI;
772 lws_mqtt_vbi_init(&par->vbit);
773 break;
774 case LMQCPP_PUBLISH_REMAINING_LEN_VBI:
775 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
776 case LMSPR_NEED_MORE:
777 break;
778 case LMSPR_COMPLETED:
779 par->cpkt_remlen = par->vbit.value;
780 lwsl_debug("%s: PUBLISH pkt len = %d\n",
781 __func__, (int)par->cpkt_remlen);
782 /* Move on to PUBLISH's variable header */
783 par->state = LMQCPP_PUBLISH_VH_TOPIC;
784 break;
785 default:
786 lwsl_notice("%s: pubrem bad vbi\n", __func__);
787 goto send_protocol_error_and_close;
788 }
789 break;
790
791 case LMQCPP_PUBLISH_VH_TOPIC:
792 {
793 lws_mqtt_publish_param_t *pub = NULL;
794
795 if (len < 2) {
796 lwsl_notice("%s: topic too short\n", __func__);
797 return -1;
798 }
799
800 /* Topic len */
801 par->n = lws_ser_ru16be(buf);
802 buf += 2;
803 len -= 2;
804
805 if (len < par->n) {/* the way this is written... */
806 lwsl_notice("%s: len breakage\n", __func__);
807 return -1;
808 }
809
810 /* Invalid topic len */
811 if (par->n == 0) {
812 lwsl_notice("%s: zero topic len\n", __func__);
813 par->reason = LMQCP_REASON_MALFORMED_PACKET;
814 goto send_reason_and_close;
815 }
816 lwsl_debug("%s: PUBLISH topic len %d\n",
817 __func__, (int)par->n);
818 assert(!wsi->mqtt->rx_cpkt_param);
819 wsi->mqtt->rx_cpkt_param = lws_zalloc(
820 sizeof(lws_mqtt_publish_param_t), "rx pub param");
821 if (!wsi->mqtt->rx_cpkt_param)
822 goto oom;
823 pub = (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
824
825 pub->topic_len = (uint16_t)par->n;
826
827 /* Topic Name */
828 pub->topic = (char *)lws_zalloc((size_t)pub->topic_len + 1,
829 "rx publish topic");
830 if (!pub->topic)
831 goto oom;
832 lws_strncpy(pub->topic, (const char *)buf,
833 (size_t)pub->topic_len + 1);
834 buf += pub->topic_len;
835 len -= pub->topic_len;
836
837 /* Extract QoS Level from Fixed Header Flags */
838 pub->qos = (lws_mqtt_qos_levels_t)
839 ((par->packet_type_flags >> 1) & 0x3);
840
841 pub->payload_pos = 0;
842
843 pub->payload_len = par->cpkt_remlen -
844 (unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
845
846 switch (pub->qos) {
847 case QOS0:
848 par->state = LMQCPP_PAYLOAD;
849 if (pub->payload_len == 0)
850 goto cmd_completion;
851
852 break;
853 case QOS1:
854 case QOS2:
855 par->state = LMQCPP_PUBLISH_VH_PKT_ID;
856 break;
857 default:
858 par->reason = LMQCP_REASON_MALFORMED_PACKET;
859 lws_free_set_NULL(pub->topic);
860 lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
861 goto send_reason_and_close;
862 }
863 break;
864 }
865 case LMQCPP_PUBLISH_VH_PKT_ID:
866 {
867 lws_mqtt_publish_param_t *pub =
868 (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
869
870 if (len < 2) {
871 lwsl_notice("%s: len breakage 2\n", __func__);
872 return -1;
873 }
874
875 par->cpkt_id = lws_ser_ru16be(buf);
876 buf += 2;
877 len -= 2;
878 wsi->mqtt->peer_ack_pkt_id = par->cpkt_id;
879 lwsl_debug("%s: Packet ID %d\n",
880 __func__, (int)par->cpkt_id);
881 par->state = LMQCPP_PAYLOAD;
882 pub->payload_pos = 0;
883 pub->payload_len = par->cpkt_remlen -
884 (unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
885 if (pub->payload_len == 0)
886 goto cmd_completion;
887
888 break;
889 }
890 case LMQCPP_PAYLOAD:
891 {
892 lws_mqtt_publish_param_t *pub =
893 (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
894 if (pub == NULL) {
895 lwsl_err("%s: Uninitialized pub_param\n",
896 __func__);
897 goto send_protocol_error_and_close;
898 }
899
900 pub->payload = buf;
901 goto cmd_completion;
902 }
903
904 case LMQCPP_CONNACK_PACKET:
905 if (!lwsi_role_client(wsi)) {
906 lwsl_err("%s: CONNACK is only Server to Client",
907 __func__);
908 goto send_unsupp_connack_and_close;
909 }
910
911 lwsl_debug("%s: received CONNACK pkt\n", __func__);
912 lws_mqtt_vbi_init(&par->vbit);
913 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
914 case LMSPR_NEED_MORE:
915 break;
916 case LMSPR_COMPLETED:
917 par->cpkt_remlen = par->vbit.value;
918 lwsl_debug("%s: CONNACK pkt len = %d\n",
919 __func__, (int)par->cpkt_remlen);
920 if (par->cpkt_remlen != 2)
921 goto send_protocol_error_and_close;
922
923 par->state = LMQCPP_CONNACK_VH_FLAGS;
924 break;
925 default:
926 lwsl_notice("%s: connack bad vbi\n", __func__);
927 goto send_protocol_error_and_close;
928 }
929 break;
930
931 case LMQCPP_CONNACK_VH_FLAGS:
932 {
933 lws_mqttc_t *c = &wsi->mqtt->client;
934 par->cpkt_flags = *buf++;
935 len--;
936
937 if (par->cpkt_flags & ~LMQCFT_SESSION_PRESENT) {
938 /*
939 * Byte 1 is the "Connect Acknowledge
940 * Flags". Bits 7-1 are reserved and
941 * MUST be set to 0.
942 */
943 par->reason = LMQCP_REASON_MALFORMED_PACKET;
944 goto send_reason_and_close;
945 }
946 /*
947 * If the Server accepts a connection with
948 * CleanSession set to 1, the Server MUST set
949 * Session Present to 0 in the CONNACK packet
950 * in addition to setting a zero return code
951 * in the CONNACK packet [MQTT-3.2.2-1]. If
952 * the Server accepts a connection with
953 * CleanSession set to 0, the value set in
954 * Session Present depends on whether the
955 * Server already has stored Session state for
956 * the supplied client ID. If the Server has
957 * stored Session state, it MUST set
958 * SessionPresent to 1 in the CONNACK packet
959 * [MQTT-3.2.2-2]. If the Server does not have
960 * stored Session state, it MUST set Session
961 * Present to 0 in the CONNACK packet. This is
962 * in addition to setting a zero return code
963 * in the CONNACK packet [MQTT-3.2.2-3].
964 */
965 if ((c->conn_flags & LMQCFT_CLEAN_START) &&
966 (par->cpkt_flags & LMQCFT_SESSION_PRESENT))
967 goto send_protocol_error_and_close;
968
969 wsi->mqtt->session_resumed = ((unsigned int)par->cpkt_flags &
970 LMQCFT_SESSION_PRESENT);
971
972 /* Move on to Connect Return Code */
973 par->state = LMQCPP_CONNACK_VH_RETURN_CODE;
974 break;
975 }
976 case LMQCPP_CONNACK_VH_RETURN_CODE:
977 par->conn_rc = *buf++;
978 len--;
979 /*
980 * If a server sends a CONNACK packet containing a
981 * non-zero return code it MUST then close the Network
982 * Connection [MQTT-3.2.2-5]
983 */
984 switch (par->conn_rc) {
985 case 0:
986 goto cmd_completion;
987 case 1:
988 case 2:
989 case 3:
990 case 4:
991 case 5:
992 par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL +
993 par->conn_rc - 1;
994 goto send_reason_and_close;
995 default:
996 lwsl_notice("%s: bad connack retcode\n", __func__);
997 goto send_protocol_error_and_close;
998 }
999 break;
1000
1001 /* SUBACK */
1002 case LMQCPP_SUBACK_PACKET:
1003 if (!lwsi_role_client(wsi)) {
1004 lwsl_err("%s: SUBACK is only Server to Client",
1005 __func__);
1006 goto send_unsupp_connack_and_close;
1007 }
1008
1009 lwsl_debug("%s: received SUBACK pkt\n", __func__);
1010 lws_mqtt_vbi_init(&par->vbit);
1011 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1012 case LMSPR_NEED_MORE:
1013 break;
1014 case LMSPR_COMPLETED:
1015 par->cpkt_remlen = par->vbit.value;
1016 lwsl_debug("%s: SUBACK pkt len = %d\n",
1017 __func__, (int)par->cpkt_remlen);
1018 if (par->cpkt_remlen <= 2)
1019 goto send_protocol_error_and_close;
1020 par->state = LMQCPP_SUBACK_VH_PKT_ID;
1021 break;
1022 default:
1023 lwsl_notice("%s: suback bad vbi\n", __func__);
1024 goto send_protocol_error_and_close;
1025 }
1026 break;
1027
1028 case LMQCPP_SUBACK_VH_PKT_ID:
1029
1030 if (len < 2) {
1031 lwsl_notice("%s: len breakage 4\n", __func__);
1032 return -1;
1033 }
1034
1035 par->cpkt_id = lws_ser_ru16be(buf);
1036 wsi->mqtt->ack_pkt_id = par->cpkt_id;
1037 buf += 2;
1038 len -= 2;
1039 par->cpkt_remlen -= 2;
1040 par->n = 0;
1041 par->state = LMQCPP_SUBACK_PAYLOAD;
1042 *par->temp = 0;
1043 break;
1044
1045 case LMQCPP_SUBACK_PAYLOAD:
1046 {
1047 lws_mqtt_qos_levels_t qos = (lws_mqtt_qos_levels_t)*buf++;
1048
1049 len--;
1050 switch (qos) {
1051 case QOS0:
1052 case QOS1:
1053 case QOS2:
1054 break;
1055 case FAILURE_QOS_LEVEL:
1056 goto send_protocol_error_and_close;
1057
1058 default:
1059 par->reason = LMQCP_REASON_MALFORMED_PACKET;
1060 goto send_reason_and_close;
1061 }
1062
1063 if (++(par->n) == par->cpkt_remlen) {
1064 par->n = 0;
1065 goto cmd_completion;
1066 }
1067
1068 break;
1069 }
1070
1071 /* UNSUBACK */
1072 case LMQCPP_UNSUBACK_PACKET:
1073 if (!lwsi_role_client(wsi)) {
1074 lwsl_err("%s: UNSUBACK is only Server to Client",
1075 __func__);
1076 goto send_unsupp_connack_and_close;
1077 }
1078
1079 lwsl_debug("%s: received UNSUBACK pkt\n", __func__);
1080 lws_mqtt_vbi_init(&par->vbit);
1081 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1082 case LMSPR_NEED_MORE:
1083 break;
1084 case LMSPR_COMPLETED:
1085 par->cpkt_remlen = par->vbit.value;
1086 lwsl_debug("%s: UNSUBACK pkt len = %d\n",
1087 __func__, (int)par->cpkt_remlen);
1088 if (par->cpkt_remlen < 2)
1089 goto send_protocol_error_and_close;
1090 par->state = LMQCPP_UNSUBACK_VH_PKT_ID;
1091 break;
1092 default:
1093 lwsl_notice("%s: unsuback bad vbi\n", __func__);
1094 goto send_protocol_error_and_close;
1095 }
1096 break;
1097
1098 case LMQCPP_UNSUBACK_VH_PKT_ID:
1099
1100 if (len < 2) {
1101 lwsl_notice("%s: len breakage 3\n", __func__);
1102 return -1;
1103 }
1104
1105 par->cpkt_id = lws_ser_ru16be(buf);
1106 wsi->mqtt->ack_pkt_id = par->cpkt_id;
1107 buf += 2;
1108 len -= 2;
1109 par->cpkt_remlen -= 2;
1110 par->n = 0;
1111
1112 goto cmd_completion;
1113
1114 case LMQCPP_PUBACK_PACKET:
1115 lws_mqtt_vbi_init(&par->vbit);
1116 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1117 case LMSPR_NEED_MORE:
1118 break;
1119 case LMSPR_COMPLETED:
1120 par->cpkt_remlen = par->vbit.value;
1121 lwsl_info("%s: PUBACK pkt len = %d\n", __func__,
1122 (int)par->cpkt_remlen);
1123 /*
1124 * must be 4 or more, with special case that 2
1125 * means success with no reason code or props
1126 */
1127 if (par->cpkt_remlen <= 1 ||
1128 par->cpkt_remlen == 3)
1129 goto send_protocol_error_and_close;
1130
1131 par->state = LMQCPP_PUBACK_VH_PKT_ID;
1132 par->fixed_seen[2] = par->fixed_seen[3] = 0;
1133 par->fixed = 0;
1134 par->n = 0;
1135 break;
1136 default:
1137 lwsl_notice("%s: puback bad vbi\n", __func__);
1138 goto send_protocol_error_and_close;
1139 }
1140 break;
1141
1142 case LMQCPP_PUBACK_VH_PKT_ID:
1143 /*
1144 * There are 3 fixed bytes and then a VBI for the
1145 * property section length
1146 */
1147 par->fixed_seen[par->fixed++] = *buf++;
1148 if (len < par->cpkt_remlen - par->n) {
1149 lwsl_notice("%s: len breakage 4\n", __func__);
1150 return -1;
1151 }
1152 len--;
1153 par->n++;
1154 if (par->fixed == 2)
1155 par->cpkt_id = lws_ser_ru16be(par->fixed_seen);
1156
1157 if (par->fixed == 3) {
1158 lws_mqtt_vbi_init(&par->vbit);
1159 par->props_consumed = 0;
1160 par->state = LMQCPP_PUBACK_PROPERTIES_LEN_VBI;
1161 }
1162 /* length of 2 is truncated packet and we completed it */
1163 if (par->cpkt_remlen == par->fixed)
1164 goto cmd_completion;
1165 break;
1166
1167 case LMQCPP_PUBACK_PROPERTIES_LEN_VBI:
1168 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1169 case LMSPR_NEED_MORE:
1170 break;
1171 case LMSPR_COMPLETED:
1172 par->props_len = par->vbit.value;
1173 lwsl_info("%s: PUBACK props len = %d\n",
1174 __func__, (int)par->cpkt_remlen);
1175 /*
1176 * If there are no properties, this is a
1177 * command completion event in itself
1178 */
1179 if (!par->props_len)
1180 goto cmd_completion;
1181
1182 /*
1183 * Otherwise consume the properties before
1184 * completing the command
1185 */
1186 lws_mqtt_vbi_init(&par->vbit);
1187 par->state = LMQCPP_PUBACK_VH_PKT_ID;
1188 break;
1189 default:
1190 lwsl_notice("%s: puback pr bad vbi\n", __func__);
1191 goto send_protocol_error_and_close;
1192 }
1193 break;
1194
1195 case LMQCPP_EAT_PROPERTIES_AND_COMPLETE:
1196 /*
1197 * TODO: stash the props
1198 */
1199 par->props_consumed++;
1200 len--;
1201 buf++;
1202 if (par->props_len != par->props_consumed)
1203 break;
1204
1205 cmd_completion:
1206 /*
1207 * We come here when we understood we just processed
1208 * the last byte of a command packet, regardless of the
1209 * packet type
1210 */
1211 par->state = LMQCPP_IDLE;
1212
1213 switch (par->packet_type_flags >> 4) {
1214 case LMQCP_STOC_CONNACK:
1215 lwsl_info("%s: cmd_completion: CONNACK\n",
1216 __func__);
1217
1218 /*
1219 * Getting the CONNACK means we are the first,
1220 * the nwsi, and we succeeded to create a new
1221 * network connection ourselves.
1222 *
1223 * Since others may join us sharing the nwsi,
1224 * and we may close while they still want to use
1225 * it, our wsi lifecycle alone can no longer
1226 * define the lifecycle of the nwsi... it means
1227 * we need to do a "magic trick" and instead of
1228 * being both the nwsi and act like a child
1229 * stream, create a new wsi to take over the
1230 * nwsi duties and turn our wsi into a child of
1231 * the nwsi with its own lifecycle.
1232 *
1233 * The nwsi gets a mostly empty wsi->nwsi used
1234 * to track already-subscribed topics globally
1235 * for the connection.
1236 */
1237
1238 /* we were under SENT_CLIENT_HANDSHAKE timeout */
1239 lws_set_timeout(wsi, 0, 0);
1240
1241 w = lws_create_new_server_wsi(wsi->a.vhost,
1242 wsi->tsi, "mqtt_sid1");
1243 if (!w) {
1244 lwsl_notice("%s: sid 1 migrate failed\n",
1245 __func__);
1246 return -1;
1247 }
1248
1249 wsi->mux.highest_sid = 1;
1250 lws_wsi_mux_insert(w, wsi, wsi->mux.highest_sid++);
1251
1252 wsi->mux_substream = 1;
1253 w->mux_substream = 1;
1254 w->client_mux_substream = 1;
1255 wsi->client_mux_migrated = 1;
1256 wsi->told_user_closed = 1; /* don't tell nwsi closed */
1257
1258 lwsi_set_state(w, LRS_ESTABLISHED);
1259 lwsi_set_state(wsi, LRS_ESTABLISHED);
1260 lwsi_set_role(w, lwsi_role(wsi));
1261
1262 #if defined(LWS_WITH_CLIENT)
1263 w->flags = wsi->flags;
1264 #endif
1265
1266 w->mqtt = wsi->mqtt;
1267 wsi->mqtt = lws_zalloc(sizeof(*wsi->mqtt), "nwsi mqtt");
1268 if (!wsi->mqtt)
1269 return -1;
1270 w->mqtt->wsi = w;
1271 w->a.protocol = wsi->a.protocol;
1272 if (w->user_space &&
1273 !w->user_space_externally_allocated)
1274 lws_free_set_NULL(w->user_space);
1275 w->user_space = wsi->user_space;
1276 wsi->user_space = NULL;
1277 w->user_space_externally_allocated =
1278 wsi->user_space_externally_allocated;
1279 if (lws_ensure_user_space(w))
1280 goto bail1;
1281 w->a.opaque_user_data = wsi->a.opaque_user_data;
1282 wsi->a.opaque_user_data = NULL;
1283 w->stash = wsi->stash;
1284 wsi->stash = NULL;
1285
1286 lws_mux_mark_immortal(w);
1287
1288 lwsl_notice("%s: migrated nwsi %s to sid 1 %s\n",
1289 __func__, lws_wsi_tag(wsi),
1290 lws_wsi_tag(w));
1291
1292 /*
1293 * It was the last thing we were waiting for
1294 * before we can be fully ESTABLISHED
1295 */
1296 if (lws_mqtt_set_client_established(w)) {
1297 lwsl_notice("%s: set EST fail\n", __func__);
1298 return -1;
1299 }
1300
1301 /* get the ball rolling */
1302 lws_validity_confirmed(wsi);
1303
1304 /* well, add the queued guys as children */
1305 lws_wsi_mux_apply_queue(wsi);
1306 break;
1307
1308 bail1:
1309 /* undo the insert */
1310 wsi->mux.child_list = w->mux.sibling_list;
1311 wsi->mux.child_count--;
1312
1313 if (w->user_space)
1314 lws_free_set_NULL(w->user_space);
1315 w->a.vhost->protocols[0].callback(w,
1316 LWS_CALLBACK_WSI_DESTROY,
1317 NULL, NULL, 0);
1318 __lws_vhost_unbind_wsi(w); /* cx + vh lock */
1319 lws_free(w);
1320
1321 return 0;
1322
1323 case LMQCP_PUBREC:
1324 lwsl_err("%s: cmd_completion: PUBREC\n",
1325 __func__);
1326 /*
1327 * Figure out which child asked for this
1328 */
1329 n = 0;
1330 lws_start_foreach_ll(struct lws *, w,
1331 wsi->mux.child_list) {
1332 if (w->mqtt->unacked_publish &&
1333 w->mqtt->ack_pkt_id == par->cpkt_id) {
1334 char requested_close = 0;
1335
1336 w->mqtt->unacked_publish = 0;
1337 w->mqtt->unacked_pubrel = 1;
1338
1339 if (user_callback_handle_rxflow(
1340 w->a.protocol->callback,
1341 w, LWS_CALLBACK_MQTT_ACK,
1342 w->user_space, NULL, 0) < 0) {
1343 lwsl_info("%s: MQTT_ACK requests close\n",
1344 __func__);
1345 requested_close = 1;
1346 }
1347 n = 1;
1348
1349 /*
1350 * We got an assertive PUBREC,
1351 * no need for timeout wait
1352 * any more
1353 */
1354 lws_sul_cancel(&w->mqtt->
1355 sul_qos_puback_pubrec_wait);
1356
1357 if (requested_close) {
1358 __lws_close_free_wsi(w,
1359 0, "ack cb");
1360 break;
1361 }
1362
1363 break;
1364 }
1365 } lws_end_foreach_ll(w, mux.sibling_list);
1366
1367 if (!n) {
1368 lwsl_err("%s: unsolicited PUBREC\n",
1369 __func__);
1370 return -1;
1371 }
1372 wsi->mqtt->send_pubrel = 1;
1373 lws_callback_on_writable(wsi);
1374 break;
1375
1376 case LMQCP_PUBCOMP:
1377 lwsl_err("%s: cmd_completion: PUBCOMP\n",
1378 __func__);
1379 n = 0;
1380 lws_start_foreach_ll(struct lws *, w,
1381 wsi->mux.child_list) {
1382 if (w->mqtt->unacked_pubrel > 0 &&
1383 w->mqtt->ack_pkt_id == par->cpkt_id) {
1384 w->mqtt->unacked_pubrel = 0;
1385 n = 1;
1386 }
1387 } lws_end_foreach_ll(w, mux.sibling_list);
1388
1389 if (!n) {
1390 lwsl_err("%s: unsolicited PUBCOMP\n",
1391 __func__);
1392 return -1;
1393 }
1394
1395 /*
1396 * If we published something and PUBCOMP arrived,
1397 * our connection is definitely working in both
1398 * directions at the moment.
1399 */
1400 lws_validity_confirmed(wsi);
1401 break;
1402
1403 case LMQCP_PUBREL:
1404 lwsl_err("%s: cmd_completion: PUBREL\n",
1405 __func__);
1406 wsi->mqtt->send_pubcomp = 1;
1407 lws_callback_on_writable(wsi);
1408 break;
1409
1410 case LMQCP_PUBACK:
1411 lwsl_info("%s: cmd_completion: PUBACK\n",
1412 __func__);
1413
1414 /*
1415 * Figure out which child asked for this
1416 */
1417
1418 n = 0;
1419 lws_start_foreach_ll(struct lws *, w,
1420 wsi->mux.child_list) {
1421 if (w->mqtt->unacked_publish &&
1422 w->mqtt->ack_pkt_id == par->cpkt_id) {
1423 char requested_close = 0;
1424
1425 w->mqtt->unacked_publish = 0;
1426 if (user_callback_handle_rxflow(
1427 w->a.protocol->callback,
1428 w, LWS_CALLBACK_MQTT_ACK,
1429 w->user_space, NULL, 0) < 0) {
1430 lwsl_info("%s: MQTT_ACK requests close\n",
1431 __func__);
1432 requested_close = 1;
1433 }
1434 n = 1;
1435
1436 /*
1437 * We got an assertive PUBACK,
1438 * no need for ACK timeout wait
1439 * any more
1440 */
1441 lws_sul_cancel(&w->mqtt->sul_qos_puback_pubrec_wait);
1442
1443 if (requested_close) {
1444 __lws_close_free_wsi(w,
1445 0, "ack cb");
1446 break;
1447 }
1448
1449 break;
1450 }
1451 } lws_end_foreach_ll(w, mux.sibling_list);
1452
1453 if (!n) {
1454 lwsl_err("%s: unsolicited PUBACK\n",
1455 __func__);
1456 return -1;
1457 }
1458
1459 /*
1460 * If we published something and it was acked,
1461 * our connection is definitely working in both
1462 * directions at the moment.
1463 */
1464 lws_validity_confirmed(wsi);
1465 break;
1466
1467 case LMQCP_STOC_PINGRESP:
1468 lwsl_info("%s: cmd_completion: PINGRESP\n",
1469 __func__);
1470 /*
1471 * If we asked for a PINGRESP and it came,
1472 * our connection is definitely working in both
1473 * directions at the moment.
1474 */
1475 lws_validity_confirmed(wsi);
1476 break;
1477
1478 case LMQCP_STOC_SUBACK:
1479 lwsl_info("%s: cmd_completion: SUBACK\n",
1480 __func__);
1481
1482 /*
1483 * Figure out which child asked for this
1484 */
1485
1486 n = 0;
1487 lws_start_foreach_ll(struct lws *, w,
1488 wsi->mux.child_list) {
1489 if (w->mqtt->inside_subscribe &&
1490 w->mqtt->ack_pkt_id == par->cpkt_id) {
1491 w->mqtt->inside_subscribe = 0;
1492 if (user_callback_handle_rxflow(
1493 w->a.protocol->callback,
1494 w, LWS_CALLBACK_MQTT_SUBSCRIBED,
1495 w->user_space, NULL, 0) < 0) {
1496 lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
1497 __func__);
1498 return -1;
1499 }
1500 n = 1;
1501 break;
1502 }
1503 } lws_end_foreach_ll(w, mux.sibling_list);
1504
1505 if (!n) {
1506 lwsl_err("%s: unsolicited SUBACK\n",
1507 __func__);
1508 return -1;
1509 }
1510
1511 /*
1512 * If we subscribed to something and SUBACK came,
1513 * our connection is definitely working in both
1514 * directions at the moment.
1515 */
1516 lws_validity_confirmed(wsi);
1517
1518 break;
1519
1520 case LMQCP_STOC_UNSUBACK:
1521 {
1522 char requested_close = 0;
1523 lwsl_info("%s: cmd_completion: UNSUBACK\n",
1524 __func__);
1525 /*
1526 * Figure out which child asked for this
1527 */
1528 n = 0;
1529 lws_start_foreach_ll(struct lws *, w,
1530 wsi->mux.child_list) {
1531 if (w->mqtt->inside_unsubscribe &&
1532 w->mqtt->ack_pkt_id == par->cpkt_id) {
1533 struct lws *nwsi = lws_get_network_wsi(w);
1534
1535 /*
1536 * No more subscribers left,
1537 * remove the topic from nwsi
1538 */
1539 lws_mqtt_client_remove_subs(nwsi->mqtt);
1540
1541 w->mqtt->inside_unsubscribe = 0;
1542 if (user_callback_handle_rxflow(
1543 w->a.protocol->callback,
1544 w, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
1545 w->user_space, NULL, 0) < 0) {
1546 lwsl_info("%s: MQTT_UNSUBACK requests close\n",
1547 __func__);
1548 requested_close = 1;
1549 }
1550 n = 1;
1551
1552 lws_sul_cancel(&w->mqtt->sul_unsuback_wait);
1553 if (requested_close) {
1554 __lws_close_free_wsi(w,
1555 0, "unsub ack cb");
1556 break;
1557 }
1558 break;
1559 }
1560 } lws_end_foreach_ll(w, mux.sibling_list);
1561
1562 if (!n) {
1563 lwsl_err("%s: unsolicited UNSUBACK\n",
1564 __func__);
1565 return -1;
1566 }
1567
1568
1569 /*
1570 * If we unsubscribed to something and
1571 * UNSUBACK came, our connection is
1572 * definitely working in both
1573 * directions at the moment.
1574 */
1575 lws_validity_confirmed(wsi);
1576
1577 break;
1578 }
1579 case LMQCP_PUBLISH:
1580 {
1581 lws_mqtt_publish_param_t *pub =
1582 (lws_mqtt_publish_param_t *)
1583 wsi->mqtt->rx_cpkt_param;
1584 size_t chunk;
1585
1586 if (pub == NULL) {
1587 lwsl_notice("%s: no pub\n", __func__);
1588 return -1;
1589 }
1590
1591 /*
1592 * RX PUBLISH is delivered to any children that
1593 * registered for the related topic
1594 */
1595
1596 n = wsi->role_ops->rx_cb[lwsi_role_server(wsi)];
1597
1598 chunk = pub->payload_len - pub->payload_pos;
1599 if (chunk > len)
1600 chunk = len;
1601
1602 lws_start_foreach_ll(struct lws *, w,
1603 wsi->mux.child_list) {
1604 if (lws_mqtt_find_sub(w->mqtt,
1605 pub->topic))
1606 if (w->a.protocol->callback(
1607 w, (enum lws_callback_reasons)n,
1608 w->user_space,
1609 (void *)pub,
1610 chunk)) {
1611 par->payload_consumed = 0;
1612 lws_free_set_NULL(pub->topic);
1613 lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
1614 return 1;
1615 }
1616 } lws_end_foreach_ll(w, mux.sibling_list);
1617
1618
1619 pub->payload_pos += (uint32_t)chunk;
1620 len -= chunk;
1621 buf += chunk;
1622
1623 lwsl_debug("%s: post pos %d, plen %d, len %d\n",
1624 __func__, (int)pub->payload_pos,
1625 (int)pub->payload_len, (int)len);
1626
1627 if (pub->payload_pos != pub->payload_len) {
1628 /*
1629 * More chunks of the payload pending,
1630 * blocking this connection from doing
1631 * anything else
1632 */
1633 par->state = LMQCPP_PAYLOAD;
1634 break;
1635 }
1636
1637 if (pub->qos == 1) {
1638 /* For QOS = 1, send out PUBACK */
1639 wsi->mqtt->send_puback = 1;
1640 lws_callback_on_writable(wsi);
1641 } else if (pub->qos == 2) {
1642 /* For QOS = 2, send out PUBREC */
1643 wsi->mqtt->send_pubrec = 1;
1644 lws_callback_on_writable(wsi);
1645 }
1646
1647 par->payload_consumed = 0;
1648 lws_free_set_NULL(pub->topic);
1649 lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
1650
1651 break;
1652 }
1653 default:
1654 break;
1655 }
1656
1657 break;
1658
1659
1660 case LMQCPP_PROP_ID_VBI:
1661 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1662 case LMSPR_NEED_MORE:
1663 break;
1664 case LMSPR_COMPLETED:
1665 par->consumed = (uint32_t)((unsigned int)par->consumed + (unsigned int)(unsigned char)par->vbit.consumed);
1666 if (par->vbit.value >
1667 LWS_ARRAY_SIZE(property_valid)) {
1668 lwsl_notice("%s: undef prop id 0x%x\n",
1669 __func__, (int)par->vbit.value);
1670 goto send_protocol_error_and_close;
1671 }
1672 if (!(property_valid[par->vbit.value] &
1673 (1 << ctl_pkt_type(par)))) {
1674 lwsl_notice("%s: prop id 0x%x invalid for"
1675 " control pkt %d\n", __func__,
1676 (int)par->vbit.value,
1677 ctl_pkt_type(par));
1678 goto send_protocol_error_and_close;
1679 }
1680 par->prop_id = par->vbit.value;
1681 par->flag_prop_multi = !!(
1682 par->props_seen[par->prop_id >> 3] &
1683 (1 << (par->prop_id & 7)));
1684 par->props_seen[par->prop_id >> 3] =
1685 (uint8_t)((par->props_seen[par->prop_id >> 3]) | (1 << (par->prop_id & 7)));
1686 /*
1687 * even if it's not a vbi property arg,
1688 * .consumed of this will be zero the first time
1689 */
1690 lws_mqtt_vbi_init(&par->vbit);
1691 /*
1692 * if it's a string, next state must set the
1693 * destination and size limit itself. But
1694 * resetting it generically here lets it use
1695 * lws_mqtt_str_first() to understand it's the
1696 * first time around.
1697 */
1698 lws_mqtt_str_init(&par->s_temp, NULL, 0, 0);
1699
1700 /* property arg state enums are so encoded */
1701 par->state = 0x100 | par->vbit.value;
1702 break;
1703 default:
1704 lwsl_notice("%s: prop id bad vbi\n", __func__);
1705 goto send_protocol_error_and_close;
1706 }
1707 break;
1708
1709 /*
1710 * All possible property payloads... restricting which ones
1711 * can appear in which control packets is already done above
1712 * in LMQCPP_PROP_ID_VBI
1713 */
1714
1715 case LMQCPP_PROP_REQUEST_PROBLEM_INFO_1BYTE:
1716 case LMQCPP_PROP_REQUEST_REPSONSE_INFO_1BYTE:
1717 case LMQCPP_PROP_MAXIMUM_QOS_1BYTE:
1718 case LMQCPP_PROP_RETAIN_AVAILABLE_1BYTE:
1719 case LMQCPP_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE_1BYTE:
1720 case LMQCPP_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE_1BYTE:
1721 case LMQCPP_PROP_SHARED_SUBSCRIPTION_AVAILABLE_1BYTE:
1722 case LMQCPP_PROP_PAYLOAD_FORMAT_INDICATOR_1BYTE: /* 3.3.2.3.2 */
1723 if (par->flag_prop_multi)
1724 goto singular_prop_seen_twice;
1725 par->payload_format = *buf++;
1726 len--;
1727 if (lws_mqtt_pconsume(par, 1))
1728 goto send_protocol_error_and_close;
1729 break;
1730
1731 case LMQCPP_PROP_MAXIMUM_PACKET_SIZE_4BYTE:
1732 case LMQCPP_PROP_WILL_DELAY_INTERVAL_4BYTE:
1733 case LMQCPP_PROP_SESSION_EXPIRY_INTERVAL_4BYTE:
1734 case LMQCPP_PROP_MSG_EXPIRY_INTERVAL_4BYTE:
1735 if (par->flag_prop_multi)
1736 goto singular_prop_seen_twice;
1737
1738 if (lws_mqtt_mb_first(&par->vbit))
1739 lws_mqtt_4byte_init(&par->vbit);
1740
1741 switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
1742 case LMSPR_NEED_MORE:
1743 break;
1744 case LMSPR_COMPLETED:
1745 if (lws_mqtt_pconsume(par, par->vbit.consumed))
1746 goto send_protocol_error_and_close;
1747 break;
1748 default:
1749 goto send_protocol_error_and_close;
1750 }
1751 break;
1752
1753 case LMQCPP_PROP_SERVER_KEEPALIVE_2BYTE:
1754 case LMQCPP_PROP_RECEIVE_MAXIMUM_2BYTE:
1755 case LMQCPP_PROP_TOPIC_MAXIMUM_2BYTE:
1756 case LMQCPP_PROP_TOPIC_ALIAS_2BYTE:
1757 if (par->flag_prop_multi)
1758 goto singular_prop_seen_twice;
1759
1760 if (lws_mqtt_mb_first(&par->vbit))
1761 lws_mqtt_2byte_init(&par->vbit);
1762
1763 switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
1764 case LMSPR_NEED_MORE:
1765 break;
1766 case LMSPR_COMPLETED:
1767 if (lws_mqtt_pconsume(par, par->vbit.consumed))
1768 goto send_protocol_error_and_close;
1769 break;
1770 default:
1771 goto send_protocol_error_and_close;
1772 }
1773 break;
1774
1775 case LMQCPP_PROP_ASSIGNED_CLIENTID_UTF8S:
1776 case LMQCPP_PROP_AUTH_METHOD_UTF8S:
1777 case LMQCPP_PROP_USER_PROPERTY_NAME_UTF8S:
1778 case LMQCPP_PROP_USER_PROPERTY_VALUE_UTF8S:
1779 case LMQCPP_PROP_RESPONSE_INFO_UTF8S:
1780 case LMQCPP_PROP_SERVER_REFERENCE_UTF8S:
1781 case LMQCPP_PROP_REASON_STRING_UTF8S:
1782 case LMQCPP_PROP_RESPONSE_TOPIC_UTF8S:
1783 case LMQCPP_PROP_CONTENT_TYPE_UTF8S:
1784 if (par->flag_prop_multi)
1785 goto singular_prop_seen_twice;
1786
1787 if (lws_mqtt_str_first(&par->s_temp))
1788 lws_mqtt_str_init(&par->s_temp, par->temp,
1789 sizeof(par->temp), 0);
1790
1791 switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
1792 case LMSPR_NEED_MORE:
1793 break;
1794 case LMSPR_COMPLETED:
1795 if (lws_mqtt_pconsume(par, par->s_temp.len))
1796 goto send_protocol_error_and_close;
1797 break;
1798
1799 default:
1800 lwsl_info("%s: bad protocol name\n", __func__);
1801 goto send_protocol_error_and_close;
1802 }
1803 break;
1804
1805 case LMQCPP_PROP_SUBSCRIPTION_ID_VBI:
1806
1807 case LMQCPP_PROP_CORRELATION_BINDATA:
1808 case LMQCPP_PROP_AUTH_DATA_BINDATA:
1809
1810 /* TODO */
1811 lwsl_err("%s: Unimplemented packet state 0x%x\n",
1812 __func__, par->state);
1813 return -1;
1814 }
1815 }
1816
1817 return 0;
1818
1819 oom:
1820 lwsl_err("%s: OOM!\n", __func__);
1821 goto send_protocol_error_and_close;
1822
1823 singular_prop_seen_twice:
1824 lwsl_info("%s: property appears twice\n", __func__);
1825
1826 send_protocol_error_and_close:
1827 lwsl_notice("%s: peac\n", __func__);
1828 par->reason = LMQCP_REASON_PROTOCOL_ERROR;
1829
1830 send_reason_and_close:
1831 lwsl_notice("%s: srac\n", __func__);
1832 par->flag_pending_send_reason_close = 1;
1833 goto ask;
1834
1835 send_unsupp_connack_and_close:
1836 lwsl_notice("%s: unsupac\n", __func__);
1837 par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL;
1838 par->flag_pending_send_connack_close = 1;
1839
1840 ask:
1841 /* Should we ask for clients? */
1842 lws_callback_on_writable(wsi);
1843
1844 return -1;
1845 }
1846
1847 int
lws_mqtt_fill_fixed_header(uint8_t * p,lws_mqtt_control_packet_t ctrl_pkt_type,uint8_t dup,lws_mqtt_qos_levels_t qos,uint8_t retain)1848 lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type,
1849 uint8_t dup, lws_mqtt_qos_levels_t qos,
1850 uint8_t retain)
1851 {
1852 lws_mqtt_fixed_hdr_t hdr;
1853
1854 hdr.bits = 0;
1855 hdr.flags.ctrl_pkt_type = ctrl_pkt_type & 0xf;
1856
1857 switch(ctrl_pkt_type) {
1858 case LMQCP_PUBLISH:
1859 hdr.flags.dup = !!dup;
1860 /*
1861 * A PUBLISH Packet MUST NOT have both QoS bits set to
1862 * 1. If a Server or Client receives a PUBLISH Packet
1863 * which has both QoS bits set to 1 it MUST close the
1864 * Network Connection [MQTT-3.3.1-4].
1865 */
1866 if (qos >= RESERVED_QOS_LEVEL) {
1867 lwsl_err("%s: Unsupport QoS level 0x%x\n",
1868 __func__, qos);
1869 return -1;
1870 }
1871 hdr.flags.qos = qos & 3;
1872 hdr.flags.retain = !!retain;
1873 break;
1874
1875 case LMQCP_CTOS_CONNECT:
1876 case LMQCP_STOC_CONNACK:
1877 case LMQCP_PUBACK:
1878 case LMQCP_PUBREC:
1879 case LMQCP_PUBCOMP:
1880 case LMQCP_STOC_SUBACK:
1881 case LMQCP_STOC_UNSUBACK:
1882 case LMQCP_CTOS_PINGREQ:
1883 case LMQCP_STOC_PINGRESP:
1884 case LMQCP_DISCONNECT:
1885 case LMQCP_AUTH:
1886 hdr.bits &= 0xf0;
1887 break;
1888
1889 /*
1890 * Bits 3,2,1 and 0 of the fixed header of the PUBREL,
1891 * SUBSCRIBE, UNSUBSCRIBE Control Packets are reserved and
1892 * MUST be set to 0,0,1 and 0 respectively. The Server MUST
1893 * treat any other value as malformed and close the Network
1894 * Connection [MQTT-3.6.1-1], [MQTT-3.8.1-1], [MQTT-3.10.1-1].
1895 */
1896 case LMQCP_PUBREL:
1897 case LMQCP_CTOS_SUBSCRIBE:
1898 case LMQCP_CTOS_UNSUBSCRIBE:
1899 hdr.bits |= 0x02;
1900 break;
1901
1902 default:
1903 return -1;
1904 }
1905
1906 *p = hdr.bits;
1907
1908 return 0;
1909 }
1910
1911 /*
1912 * This fires if the wsi did a PUBLISH under QoS1 or QoS2, but no PUBACK or
1913 * PUBREC came before the timeout period
1914 */
1915
1916 static void
lws_mqtt_publish_resend(struct lws_sorted_usec_list * sul)1917 lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul)
1918 {
1919 struct _lws_mqtt_related *mqtt = lws_container_of(sul,
1920 struct _lws_mqtt_related, sul_qos_puback_pubrec_wait);
1921
1922 lwsl_notice("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
1923
1924 if (mqtt->wsi->a.protocol->callback(mqtt->wsi, LWS_CALLBACK_MQTT_RESEND,
1925 mqtt->wsi->user_space, NULL, 0))
1926 lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
1927 }
1928
1929 static void
lws_mqtt_unsuback_timeout(struct lws_sorted_usec_list * sul)1930 lws_mqtt_unsuback_timeout(struct lws_sorted_usec_list *sul)
1931 {
1932 struct _lws_mqtt_related *mqtt = lws_container_of(sul,
1933 struct _lws_mqtt_related, sul_unsuback_wait);
1934
1935 lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
1936
1937 if (mqtt->wsi->a.protocol->callback(mqtt->wsi,
1938 LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT,
1939 mqtt->wsi->user_space, NULL, 0))
1940 lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
1941 }
1942
1943 int
lws_mqtt_client_send_publish(struct lws * wsi,lws_mqtt_publish_param_t * pub,const void * buf,uint32_t len,int is_complete)1944 lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
1945 const void *buf, uint32_t len, int is_complete)
1946 {
1947 struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
1948 uint8_t *b = (uint8_t *)pt->serv_buf, *start, *p;
1949 struct lws *nwsi = lws_get_network_wsi(wsi);
1950 lws_mqtt_str_t mqtt_vh_payload;
1951 uint32_t vh_len, rem_len;
1952
1953 assert(pub->topic);
1954
1955 lwsl_debug("%s: len = %d, is_complete = %d\n",
1956 __func__, (int)len, (int)is_complete);
1957
1958 if (lwsi_state(wsi) != LRS_ESTABLISHED) {
1959 lwsl_err("%s: %s: unknown state 0x%x\n", __func__,
1960 lws_wsi_tag(wsi), lwsi_state(wsi));
1961 assert(0);
1962 return 1;
1963 }
1964
1965 if (wsi->mqtt->inside_payload) {
1966 /*
1967 * Headers are filled, we are sending
1968 * the payload - a buffer with LWS_PRE
1969 * in front it.
1970 */
1971 start = (uint8_t *)buf;
1972 p = start + len;
1973 if (is_complete)
1974 wsi->mqtt->inside_payload = 0;
1975 goto do_write;
1976 }
1977
1978 start = b + LWS_PRE;
1979 p = start;
1980 /*
1981 * Fill headers and the first chunk of the
1982 * payload (if any)
1983 */
1984 if (lws_mqtt_fill_fixed_header(p++, LMQCP_PUBLISH,
1985 0, pub->qos, 0)) {
1986 lwsl_err("%s: Failed to fill fixed header\n", __func__);
1987 return 1;
1988 }
1989
1990 /*
1991 * Topic len field + Topic len + Packet ID
1992 * (for QOS>0) + Payload len
1993 */
1994 vh_len = (unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
1995 rem_len = vh_len + pub->payload_len;
1996 lwsl_debug("%s: Remaining len = %d\n", __func__, (int) rem_len);
1997
1998 /* Will the chunk of payload fit? */
1999 if ((vh_len + len) >=
2000 (wsi->a.context->pt_serv_buf_size - LWS_PRE)) {
2001 lwsl_err("%s: Payload is too big\n", __func__);
2002 return 1;
2003 }
2004
2005 p += lws_mqtt_vbi_encode(rem_len, p);
2006
2007 /* Topic's Len */
2008 lws_ser_wu16be(p, pub->topic_len);
2009 p += 2;
2010
2011 /*
2012 * Init lws_mqtt_str for "MQTT Variable
2013 * Headers + payload" (only the supplied
2014 * chuncked payload)
2015 */
2016 lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p,
2017 (uint16_t)(unsigned int)(pub->topic_len + ((pub->qos) ? 2u : 0u) + len),
2018 0);
2019
2020 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2021 lws_strncpy((char *)p, pub->topic, (size_t)pub->topic_len+1);
2022 if (lws_mqtt_str_advance(&mqtt_vh_payload, pub->topic_len)) {
2023 lwsl_err("%s: a\n", __func__);
2024 return 1;
2025 }
2026
2027 /* Packet ID */
2028 if (pub->qos != QOS0) {
2029 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2030 wsi->mqtt->ack_pkt_id = pub->packet_id = ++nwsi->mqtt->pkt_id;
2031 lwsl_debug("%s: pkt_id = %d\n", __func__,
2032 (int)wsi->mqtt->ack_pkt_id);
2033 lws_ser_wu16be(p, pub->packet_id);
2034 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) {
2035 lwsl_err("%s: b\n", __func__);
2036 return 1;
2037 }
2038 }
2039
2040 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2041 memcpy(p, buf, len);
2042 if (lws_mqtt_str_advance(&mqtt_vh_payload, (int)len))
2043 return 1;
2044 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2045
2046 if (!is_complete)
2047 nwsi->mqtt->inside_payload = wsi->mqtt->inside_payload = 1;
2048
2049 do_write:
2050
2051 // lwsl_hexdump_err(start, lws_ptr_diff(p, start));
2052
2053 if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
2054 lws_ptr_diff(p, start)) {
2055 lwsl_err("%s: write failed\n", __func__);
2056 return 1;
2057 }
2058
2059 if (!is_complete) {
2060 /* still some more chunks to come... */
2061 lws_callback_on_writable(wsi);
2062
2063 return 0;
2064 }
2065
2066 wsi->mqtt->inside_payload = nwsi->mqtt->inside_payload = 0;
2067
2068 if (pub->qos != QOS0)
2069 wsi->mqtt->unacked_publish = 1;
2070
2071 /* this was the last part of the publish message */
2072
2073 if (pub->qos == QOS0) {
2074 /*
2075 * There won't be any real PUBACK, act like we got one
2076 * so the user callback logic is the same for QoS0 or
2077 * QoS1
2078 */
2079 if (wsi->a.protocol->callback(wsi, LWS_CALLBACK_MQTT_ACK,
2080 wsi->user_space, NULL, 0)) {
2081 lwsl_err("%s: ACK callback exited\n", __func__);
2082 return 1;
2083 }
2084 } else if (pub->qos == QOS1 || pub->qos == QOS2) {
2085 /* For QoS1 or QoS2, if no PUBACK or PUBREC coming after 3s,
2086 * we must RETRY the publish
2087 */
2088 wsi->mqtt->sul_qos_puback_pubrec_wait.cb = lws_mqtt_publish_resend;
2089 __lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
2090 &wsi->mqtt->sul_qos_puback_pubrec_wait,
2091 3 * LWS_USEC_PER_SEC);
2092 }
2093
2094 return 0;
2095 }
2096
2097 int
lws_mqtt_client_send_subcribe(struct lws * wsi,lws_mqtt_subscribe_param_t * sub)2098 lws_mqtt_client_send_subcribe(struct lws *wsi, lws_mqtt_subscribe_param_t *sub)
2099 {
2100 struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
2101 uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
2102 struct lws *nwsi = lws_get_network_wsi(wsi);
2103 lws_mqtt_str_t mqtt_vh_payload;
2104 uint8_t exists[8], extant;
2105 lws_mqtt_subs_t *mysub;
2106 uint32_t rem_len;
2107 #if defined(_DEBUG)
2108 uint32_t tops;
2109 #endif
2110 uint32_t n;
2111
2112 assert(sub->num_topics);
2113 assert(sub->num_topics < sizeof(exists));
2114
2115 switch (lwsi_state(wsi)) {
2116 case LRS_ESTABLISHED: /* Protocol connection established */
2117 if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_SUBSCRIBE,
2118 0, 0, 0)) {
2119 lwsl_err("%s: Failed to fill fixed header\n", __func__);
2120 return 1;
2121 }
2122
2123 /*
2124 * The stream wants to subscribe to one or more topic, but
2125 * the shared nwsi may already be subscribed to some or all of
2126 * them from interactions with other streams. For those cases,
2127 * we filter them from the list the child wants until we just
2128 * have ones that are new to the nwsi. If nothing left, we just
2129 * synthesize the callback to the child as if SUBACK had come
2130 * and we're done, otherwise just ask the server for topics that
2131 * are new to the wsi.
2132 */
2133
2134 extant = 0;
2135 memset(&exists, 0, sizeof(exists));
2136 for (n = 0; n < sub->num_topics; n++) {
2137 lwsl_info("%s: Subscribing to topic[%d] = \"%s\"\n",
2138 __func__, (int)n, sub->topic[n].name);
2139
2140 mysub = lws_mqtt_find_sub(nwsi->mqtt, sub->topic[n].name);
2141 if (mysub && mysub->ref_count) {
2142 mysub->ref_count++; /* another stream using it */
2143 exists[n] = 1;
2144 extant++;
2145 }
2146
2147 /*
2148 * Attach the topic we're subscribing to, to wsi->mqtt
2149 */
2150 if (!lws_mqtt_create_sub(wsi->mqtt, sub->topic[n].name)) {
2151 lwsl_err("%s: create sub fail\n", __func__);
2152 return 1;
2153 }
2154 }
2155
2156 if (extant == sub->num_topics) {
2157 /*
2158 * It turns out there's nothing to do here, the nwsi has
2159 * already subscribed to all the topics this stream
2160 * wanted. Just tell it it can have them.
2161 */
2162 lwsl_notice("%s: all topics already subscribed\n", __func__);
2163 if (user_callback_handle_rxflow(
2164 wsi->a.protocol->callback,
2165 wsi, LWS_CALLBACK_MQTT_SUBSCRIBED,
2166 wsi->user_space, NULL, 0) < 0) {
2167 lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
2168 __func__);
2169 return -1;
2170 }
2171
2172 return 0;
2173 }
2174
2175 #if defined(_DEBUG)
2176 /*
2177 * zero or more of the topics already existed, but not all,
2178 * so we must go to the server with a filtered list of the
2179 * new ones only
2180 */
2181
2182 tops = sub->num_topics - extant;
2183 #endif
2184
2185 /*
2186 * Pid + (Topic len field + Topic len + Req. QoS) x Num of Topics
2187 */
2188 rem_len = 2;
2189 for (n = 0; n < sub->num_topics; n++)
2190 if (!exists[n])
2191 rem_len += (2 + (uint32_t)strlen(sub->topic[n].name) + (uint32_t)1);
2192
2193 wsi->mqtt->sub_size = (uint16_t)rem_len;
2194
2195 #if defined(_DEBUG)
2196 lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
2197 __func__, (int)tops, (int)rem_len);
2198 #endif
2199
2200 p += lws_mqtt_vbi_encode(rem_len, p);
2201
2202 if ((rem_len + lws_ptr_diff_size_t(p, start)) >=
2203 wsi->a.context->pt_serv_buf_size) {
2204 lwsl_err("%s: Payload is too big\n", __func__);
2205 return 1;
2206 }
2207
2208 /* Init lws_mqtt_str */
2209 lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, (uint16_t)rem_len, 0);
2210 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2211
2212 /* Packet ID */
2213 wsi->mqtt->ack_pkt_id = sub->packet_id = ++nwsi->mqtt->pkt_id;
2214 lwsl_debug("%s: pkt_id = %d\n", __func__,
2215 (int)sub->packet_id);
2216 lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
2217
2218 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2219 return 1;
2220
2221 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2222
2223 for (n = 0; n < sub->num_topics; n++) {
2224 lwsl_info("%s: topics[%d] = %s\n", __func__,
2225 (int)n, sub->topic[n].name);
2226
2227 /* if the nwsi already has it, don't ask server for it */
2228 if (exists[n]) {
2229 lwsl_info("%s: topics[%d] \"%s\" exists in nwsi\n",
2230 __func__, (int)n, sub->topic[n].name);
2231 continue;
2232 }
2233
2234 /*
2235 * Attach the topic we're subscribing to, to nwsi->mqtt
2236 * so we know the nwsi itself has a subscription to it
2237 */
2238
2239 if (!lws_mqtt_create_sub(nwsi->mqtt, sub->topic[n].name))
2240 return 1;
2241
2242 /* Topic's Len */
2243 lws_ser_wu16be(p, (uint16_t)strlen(sub->topic[n].name));
2244 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2245 return 1;
2246 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2247
2248 /* Topic Name */
2249 lws_strncpy((char *)p, sub->topic[n].name,
2250 strlen(sub->topic[n].name) + 1);
2251 if (lws_mqtt_str_advance(&mqtt_vh_payload,
2252 (int)strlen(sub->topic[n].name)))
2253 return 1;
2254 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2255
2256 /* QoS */
2257 *p = (uint8_t)sub->topic[n].qos;
2258 if (lws_mqtt_str_advance(&mqtt_vh_payload, 1))
2259 return 1;
2260 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2261 }
2262 break;
2263
2264 default:
2265 return 1;
2266 }
2267
2268 if (wsi->mqtt->inside_resume_session)
2269 return 0;
2270
2271 if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
2272 lws_ptr_diff(p, start))
2273 return 1;
2274
2275 wsi->mqtt->inside_subscribe = 1;
2276
2277 return 0;
2278 }
2279
2280 int
lws_mqtt_client_send_unsubcribe(struct lws * wsi,const lws_mqtt_subscribe_param_t * unsub)2281 lws_mqtt_client_send_unsubcribe(struct lws *wsi,
2282 const lws_mqtt_subscribe_param_t *unsub)
2283 {
2284 struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
2285 uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
2286 struct lws *nwsi = lws_get_network_wsi(wsi);
2287 lws_mqtt_str_t mqtt_vh_payload;
2288 uint8_t send_unsub[8], orphaned;
2289 uint32_t rem_len, n;
2290 lws_mqtt_subs_t *mysub;
2291 #if defined(_DEBUG)
2292 uint32_t tops;
2293 #endif
2294
2295 lwsl_info("%s: Enter\n", __func__);
2296
2297 switch (lwsi_state(wsi)) {
2298 case LRS_ESTABLISHED: /* Protocol connection established */
2299 orphaned = 0;
2300 memset(&send_unsub, 0, sizeof(send_unsub));
2301 for (n = 0; n < unsub->num_topics; n++) {
2302 mysub = lws_mqtt_find_sub(nwsi->mqtt,
2303 unsub->topic[n].name);
2304 assert(mysub);
2305
2306 if (mysub && --mysub->ref_count == 0) {
2307 lwsl_notice("%s: Need to send UNSUB\n", __func__);
2308 send_unsub[n] = 1;
2309 orphaned++;
2310 }
2311 }
2312
2313 if (!orphaned) {
2314 /*
2315 * The nwsi still has other subscribers bound to the
2316 * topics.
2317 *
2318 * So, don't send UNSUB to server, and just fake the
2319 * UNSUB ACK event for the guy going away.
2320 */
2321 lwsl_notice("%s: unsubscribed!\n", __func__);
2322 if (user_callback_handle_rxflow(
2323 wsi->a.protocol->callback,
2324 wsi, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
2325 wsi->user_space, NULL, 0) < 0) {
2326 /*
2327 * We can't directly close here, because the
2328 * caller still has the wsi. Inform the
2329 * caller that we want to close
2330 */
2331
2332 return 1;
2333 }
2334
2335 return 0;
2336 }
2337 #if defined(_DEBUG)
2338 /*
2339 * one or more of the topics needs to be unsubscribed
2340 * from, so we must go to the server with a filtered
2341 * list of the new ones only
2342 */
2343
2344 tops = orphaned;
2345 #endif
2346
2347 if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_UNSUBSCRIBE,
2348 0, 0, 0)) {
2349 lwsl_err("%s: Failed to fill fixed header\n", __func__);
2350 return 1;
2351 }
2352
2353 /*
2354 * Pid + (Topic len field + Topic len) x Num of Topics
2355 */
2356 rem_len = 2;
2357 for (n = 0; n < unsub->num_topics; n++)
2358 if (send_unsub[n])
2359 rem_len += (2 + (uint32_t)strlen(unsub->topic[n].name));
2360
2361 wsi->mqtt->sub_size = (uint16_t)rem_len;
2362
2363 #if defined(_DEBUG)
2364 lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
2365 __func__, (int)tops, (int)rem_len);
2366 #endif
2367
2368 p += lws_mqtt_vbi_encode(rem_len, p);
2369
2370 if ((rem_len + lws_ptr_diff_size_t(p, start)) >=
2371 wsi->a.context->pt_serv_buf_size) {
2372 lwsl_err("%s: Payload is too big\n", __func__);
2373 return 1;
2374 }
2375
2376 /* Init lws_mqtt_str */
2377 lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, (uint16_t)rem_len, 0);
2378 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2379
2380 /* Packet ID */
2381 wsi->mqtt->ack_pkt_id = ++nwsi->mqtt->pkt_id;
2382 lwsl_debug("%s: pkt_id = %d\n", __func__,
2383 (int)wsi->mqtt->ack_pkt_id);
2384 lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
2385
2386 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2387 return 1;
2388
2389 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2390
2391 for (n = 0; n < unsub->num_topics; n++) {
2392 lwsl_info("%s: topics[%d] = %s\n", __func__,
2393 (int)n, unsub->topic[n].name);
2394
2395 /*
2396 * Subscriber still bound to it, don't UBSUB
2397 * from the server
2398 */
2399 if (!send_unsub[n])
2400 continue;
2401
2402 /* Topic's Len */
2403 lws_ser_wu16be(p, (uint16_t)strlen(unsub->topic[n].name));
2404 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2405 return 1;
2406 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2407
2408 /* Topic Name */
2409 lws_strncpy((char *)p, unsub->topic[n].name,
2410 strlen(unsub->topic[n].name) + 1);
2411 if (lws_mqtt_str_advance(&mqtt_vh_payload,
2412 (int)strlen(unsub->topic[n].name)))
2413 return 1;
2414 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2415 }
2416 break;
2417
2418 default:
2419 return 1;
2420 }
2421
2422 if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
2423 lws_ptr_diff(p, start))
2424 return 1;
2425
2426 wsi->mqtt->inside_unsubscribe = 1;
2427
2428 wsi->mqtt->sul_unsuback_wait.cb = lws_mqtt_unsuback_timeout;
2429 __lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
2430 &wsi->mqtt->sul_unsuback_wait,
2431 3 * LWS_USEC_PER_SEC);
2432
2433 return 0;
2434 }
2435
2436 /*
2437 * This is called when child streams bind to an already-existing and compatible
2438 * MQTT stream
2439 */
2440
2441 struct lws *
lws_wsi_mqtt_adopt(struct lws * parent_wsi,struct lws * wsi)2442 lws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi)
2443 {
2444 /* no more children allowed by parent? */
2445
2446 if (parent_wsi->mux.child_count + 1 > LWS_MQTT_MAX_CHILDREN) {
2447 lwsl_err("%s: reached concurrent stream limit\n", __func__);
2448 return NULL;
2449 }
2450
2451 #if defined(LWS_WITH_CLIENT)
2452 wsi->client_mux_substream = 1;
2453 #endif
2454
2455 lws_wsi_mux_insert(wsi, parent_wsi, wsi->mux.my_sid);
2456
2457 if (lws_ensure_user_space(wsi))
2458 goto bail1;
2459
2460 lws_mqtt_set_client_established(wsi);
2461 lws_callback_on_writable(wsi);
2462
2463 return wsi;
2464
2465 bail1:
2466 /* undo the insert */
2467 parent_wsi->mux.child_list = wsi->mux.sibling_list;
2468 parent_wsi->mux.child_count--;
2469
2470 if (wsi->user_space)
2471 lws_free_set_NULL(wsi->user_space);
2472
2473 wsi->a.protocol->callback(wsi, LWS_CALLBACK_WSI_DESTROY, NULL, NULL, 0);
2474 lws_free(wsi);
2475
2476 return NULL;
2477 }
2478
2479