1 /*
2 * libwebsockets - small server side websockets and web server implementation
3 *
4 * Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 * IN THE SOFTWARE.
23 *
24 * MQTT v5
25 *
26 * http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
27 *
28 * Control Packet structure
29 *
30 * - Always: 2+ byte: Fixed Hdr
31 * - Required in some: variable: Variable Hdr + [(CONNECT)Will Props] + Props
32 * - Required in some: variable: Payload
33 *
34 * For CONNECT, the props if present MUST be in the order [MQTT-3.1.3-1]
35 *
36 * - Client Identifier
37 * - Will Properties
38 * - Will Topic
39 * - Will Payload
40 * - User Name
41 * - Password
42 */
43
44 #include "private-lib-core.h"
45 /* #include "lws-mqtt.h" */
46
47 #include <string.h>
48 #include <sys/types.h>
49 #include <fcntl.h>
50 #include <assert.h>
51
52 typedef enum {
53 LMQPRS_AWAITING_CONNECT,
54
55 } lws_mqtt_protocol_server_connstate_t;
56
57 const char * const reason_names_g1[] = {
58 "Success / Normal disconnection / QoS0",
59 "QoS1",
60 "QoS2",
61 "Disconnect Will",
62 "No matching subscriber",
63 "No subscription existed",
64 "Continue authentication",
65 "Re-authenticate"
66 };
67
68 const char * const reason_names_g2[] = {
69 "Unspecified error",
70 "Malformed packet",
71 "Protocol error",
72 "Implementation specific error",
73 "Unsupported protocol",
74 "Client ID invalid",
75 "Bad credentials",
76 "Not Authorized",
77 "Server Unavailable",
78 "Server Busy",
79 "Banned",
80 "Server Shutting Down",
81 "Bad Authentication Method",
82 "Keepalive Timeout",
83 "Session taken over",
84 "Topic Filter Invalid",
85 "Packet ID in use",
86 "Packet ID not found",
87 "Max RX Exceeded",
88 "Topic Alias Invalid",
89 "Packet too large",
90 "Ratelimit",
91 "Quota Exceeded",
92 "Administrative Action",
93 "Payload format invalid",
94 "Retain not supported",
95 "QoS not supported",
96 "Use another server",
97 "Server Moved",
98 "Shared subscriptions not supported",
99 "Connection rate exceeded",
100 "Maximum Connect Time",
101 "Subscription IDs not supported",
102 "Wildcard subscriptions not supported"
103 };
104
105 #define LMQCP_WILL_PROPERTIES 0
106
107 /* For each property, a bitmap describing which commands it is valid for */
108
109 static const uint16_t property_valid[] = {
110 [LMQPROP_PAYLOAD_FORMAT_INDICATOR] = (1 << LMQCP_PUBLISH) |
111 (1 << LMQCP_WILL_PROPERTIES),
112 [LMQPROP_MESSAGE_EXPIRY_INTERVAL] = (1 << LMQCP_PUBLISH) |
113 (1 << LMQCP_WILL_PROPERTIES),
114 [LMQPROP_CONTENT_TYPE] = (1 << LMQCP_PUBLISH) |
115 (1 << LMQCP_WILL_PROPERTIES),
116 [LMQPROP_RESPONSE_TOPIC] = (1 << LMQCP_PUBLISH) |
117 (1 << LMQCP_WILL_PROPERTIES),
118 [LMQPROP_CORRELATION_DATA] = (1 << LMQCP_PUBLISH) |
119 (1 << LMQCP_WILL_PROPERTIES),
120 [LMQPROP_SUBSCRIPTION_IDENTIFIER] = (1 << LMQCP_PUBLISH) |
121 (1 << LMQCP_CTOS_SUBSCRIBE),
122 [LMQPROP_SESSION_EXPIRY_INTERVAL] = (1 << LMQCP_CTOS_CONNECT) |
123 (1 << LMQCP_STOC_CONNACK) |
124 (1 << LMQCP_DISCONNECT),
125 [LMQPROP_ASSIGNED_CLIENT_IDENTIFIER] = (1 << LMQCP_STOC_CONNACK),
126 [LMQPROP_SERVER_KEEP_ALIVE] = (1 << LMQCP_STOC_CONNACK),
127 [LMQPROP_AUTHENTICATION_METHOD] = (1 << LMQCP_CTOS_CONNECT) |
128 (1 << LMQCP_STOC_CONNACK) |
129 (1 << LMQCP_AUTH),
130 [LMQPROP_AUTHENTICATION_DATA] = (1 << LMQCP_CTOS_CONNECT) |
131 (1 << LMQCP_STOC_CONNACK) |
132 (1 << LMQCP_AUTH),
133 [LMQPROP_REQUEST_PROBLEM_INFORMATION] = (1 << LMQCP_CTOS_CONNECT),
134 [LMQPROP_WILL_DELAY_INTERVAL] = (1 << LMQCP_WILL_PROPERTIES),
135 [LMQPROP_REQUEST_RESPONSE_INFORMATION] = (1 << LMQCP_CTOS_CONNECT),
136 [LMQPROP_RESPONSE_INFORMATION] = (1 << LMQCP_STOC_CONNACK),
137 [LMQPROP_SERVER_REFERENCE] = (1 << LMQCP_STOC_CONNACK) |
138 (1 << LMQCP_DISCONNECT),
139 [LMQPROP_REASON_STRING] = (1 << LMQCP_STOC_CONNACK) |
140 (1 << LMQCP_PUBACK) |
141 (1 << LMQCP_PUBREC) |
142 (1 << LMQCP_PUBREL) |
143 (1 << LMQCP_PUBCOMP) |
144 (1 << LMQCP_STOC_SUBACK) |
145 (1 << LMQCP_STOC_UNSUBACK) |
146 (1 << LMQCP_DISCONNECT) |
147 (1 << LMQCP_AUTH),
148 [LMQPROP_RECEIVE_MAXIMUM] = (1 << LMQCP_CTOS_CONNECT) |
149 (1 << LMQCP_STOC_CONNACK),
150 [LMQPROP_TOPIC_ALIAS_MAXIMUM] = (1 << LMQCP_CTOS_CONNECT) |
151 (1 << LMQCP_STOC_CONNACK),
152 [LMQPROP_TOPIC_ALIAS] = (1 << LMQCP_PUBLISH),
153 [LMQPROP_MAXIMUM_QOS] = (1 << LMQCP_STOC_CONNACK),
154 [LMQPROP_RETAIN_AVAILABLE] = (1 << LMQCP_STOC_CONNACK),
155 [LMQPROP_USER_PROPERTY] = (1 << LMQCP_CTOS_CONNECT) |
156 (1 << LMQCP_STOC_CONNACK) |
157 (1 << LMQCP_PUBLISH) |
158 (1 << LMQCP_WILL_PROPERTIES) |
159 (1 << LMQCP_PUBACK) |
160 (1 << LMQCP_PUBREC) |
161 (1 << LMQCP_PUBREL) |
162 (1 << LMQCP_PUBCOMP) |
163 (1 << LMQCP_CTOS_SUBSCRIBE) |
164 (1 << LMQCP_STOC_SUBACK) |
165 (1 << LMQCP_CTOS_UNSUBSCRIBE) |
166 (1 << LMQCP_STOC_UNSUBACK) |
167 (1 << LMQCP_DISCONNECT) |
168 (1 << LMQCP_AUTH),
169 [LMQPROP_MAXIMUM_PACKET_SIZE] = (1 << LMQCP_CTOS_CONNECT) |
170 (1 << LMQCP_STOC_CONNACK),
171 [LMQPROP_WILDCARD_SUBSCRIPTION_AVAIL] = (1 << LMQCP_STOC_CONNACK),
172 [LMQPROP_SUBSCRIPTION_IDENTIFIER_AVAIL] = (1 << LMQCP_STOC_CONNACK),
173 [LMQPROP_SHARED_SUBSCRIPTION_AVAIL] = (1 << LMQCP_STOC_CONNACK)
174 };
175
176
177 /*
178 * For each command index, maps flags, id, qos and payload legality
179 * notice in most cases PUBLISH requires further processing
180 */
181 static const uint8_t map_flags[] = {
182 [LMQCP_RESERVED] = 0x00,
183 [LMQCP_CTOS_CONNECT] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
184 LMQCP_LUT_FLAG_PAYLOAD |
185 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
186 [LMQCP_STOC_CONNACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
187 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
188 [LMQCP_PUBLISH] = LMQCP_LUT_FLAG_PAYLOAD | /* option */
189 LMQCP_LUT_FLAG_PACKET_ID_QOS12 | 0x00,
190 [LMQCP_PUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
191 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
192 [LMQCP_PUBREC] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
193 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
194 [LMQCP_PUBREL] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
195 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
196 [LMQCP_PUBCOMP] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
197 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
198 [LMQCP_CTOS_SUBSCRIBE] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
199 LMQCP_LUT_FLAG_PAYLOAD |
200 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
201 [LMQCP_STOC_SUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
202 LMQCP_LUT_FLAG_PAYLOAD |
203 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
204 [LMQCP_CTOS_UNSUBSCRIBE] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
205 LMQCP_LUT_FLAG_PAYLOAD |
206 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
207 [LMQCP_STOC_UNSUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
208 LMQCP_LUT_FLAG_PAYLOAD |
209 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
210 [LMQCP_CTOS_PINGREQ] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
211 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
212 [LMQCP_STOC_PINGRESP] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
213 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
214 [LMQCP_DISCONNECT] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
215 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
216 [LMQCP_AUTH] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
217 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
218 };
219
220 void
lws_mqttc_state_transition(lws_mqttc_t * c,lwsgs_mqtt_states_t s)221 lws_mqttc_state_transition(lws_mqttc_t *c, lwsgs_mqtt_states_t s)
222 {
223 lwsl_debug("%s: ep %p: state %d -> %d\n", __func__, c, c->estate, s);
224 c->estate = s;
225 }
226
227 static int
lws_mqtt_pconsume(lws_mqtt_parser_t * par,int consumed)228 lws_mqtt_pconsume(lws_mqtt_parser_t *par, int consumed)
229 {
230 par->consumed += consumed;
231
232 if (par->consumed > par->props_len)
233 return -1;
234
235 /* more properties coming */
236
237 if (par->consumed < par->props_len) {
238 par->state = LMQCPP_PROP_ID_VBI;
239 return 0;
240 }
241
242 /* properties finished: are we headed for payload or idle? */
243
244 if ((map_flags[ctl_pkt_type(par)] & LMQCP_LUT_FLAG_PAYLOAD) &&
245 /* A PUBLISH packet MUST NOT contain a Packet Identifier if
246 * its QoS value is set to 0 [MQTT-2.2.1-2]. */
247 (ctl_pkt_type(par) != LMQCP_PUBLISH ||
248 (par->packet_type_flags & 6))) {
249 par->state = LMQCPP_PAYLOAD;
250 return 0;
251 }
252
253 par->state = LMQCPP_IDLE;
254
255 return 0;
256 }
257
258 static int
lws_mqtt_set_client_established(struct lws * wsi)259 lws_mqtt_set_client_established(struct lws *wsi)
260 {
261 lws_role_transition(wsi, LWSIFR_CLIENT, LRS_ESTABLISHED,
262 &role_ops_mqtt);
263
264 if (user_callback_handle_rxflow(wsi->protocol->callback,
265 wsi, LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED,
266 wsi->user_space, NULL, 0) < 0) {
267 lwsl_err("%s: MQTT_ESTABLISHED failed\n", __func__);
268
269 return -1;
270 }
271 /*
272 * If we made a new connection and got the ACK, our connection is
273 * definitely working in both directions at the moment
274 */
275 lws_validity_confirmed(wsi);
276
277 /* clear connection timeout */
278 lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
279
280 return 0;
281 }
282
283 lws_mqtt_subs_t *
lws_mqtt_find_sub(struct _lws_mqtt_related * mqtt,const char * topic)284 lws_mqtt_find_sub(struct _lws_mqtt_related *mqtt, const char *topic)
285 {
286 lws_mqtt_subs_t *s = mqtt->subs_head;
287
288 while (s) {
289 if (!strcmp((const char *)s->topic, topic))
290 return s;
291 s = s->next;
292 }
293
294 return NULL;
295 }
296
297 static lws_mqtt_subs_t *
lws_mqtt_create_sub(struct _lws_mqtt_related * mqtt,const char * topic)298 lws_mqtt_create_sub(struct _lws_mqtt_related *mqtt, const char *topic)
299 {
300 lws_mqtt_subs_t *mysub;
301
302 mysub = lws_malloc(sizeof(*mysub) + strlen(topic) + 1, "sub");
303 if (!mysub)
304 return NULL;
305
306 mysub->next = mqtt->subs_head;
307 mqtt->subs_head = mysub;
308 memcpy(mysub->topic, topic, strlen(topic) + 1);
309 mysub->ref_count = 1;
310
311 lwsl_info("%s: Created mysub %p for wsi->mqtt %p\n",
312 __func__, mysub, mqtt);
313
314 return mysub;
315 }
316
317 static int
lws_mqtt_client_remove_subs(struct _lws_mqtt_related * mqtt)318 lws_mqtt_client_remove_subs(struct _lws_mqtt_related *mqtt)
319 {
320 lws_mqtt_subs_t *s = mqtt->subs_head;
321 lws_mqtt_subs_t *temp = NULL;
322
323
324 lwsl_info("%s: Called to remove subs from wsi->mqtt %p\n",
325 __func__, mqtt);
326
327 while (s && s->next) {
328 if (s->next->ref_count == 0)
329 break;
330 s = s->next;
331 }
332
333 if (s && s->next) {
334 temp = s->next;
335 lwsl_info("%s: Removing sub %p from wsi->mqtt %p\n",
336 __func__, temp, mqtt);
337 s->next = temp->next;
338 lws_free(temp);
339 return 0;
340 }
341 return 1;
342 }
343
344 int
_lws_mqtt_rx_parser(struct lws * wsi,lws_mqtt_parser_t * par,const uint8_t * buf,size_t len)345 _lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par,
346 const uint8_t *buf, size_t len)
347 {
348 struct lws *w;
349 int n;
350
351 if (par->flag_pending_send_reason_close)
352 return 0;
353
354 /*
355 * Stateful, fragmentation-immune parser
356 *
357 * Notice that len can always be 1 if under attack, even over tls if
358 * the server is compromised or malicious.
359 */
360
361 while (len) {
362 lwsl_debug("%s: %d, len = %d\n", __func__, par->state, (int)len);
363 switch (par->state) {
364 case LMQCPP_IDLE:
365 par->packet_type_flags = *buf++;
366 len--;
367
368 #if defined(LWS_WITH_CLIENT)
369 /*
370 * The case where we sent the connect, but we received
371 * something else before any CONNACK
372 */
373 if (lwsi_state(wsi) == LRS_MQTTC_AWAIT_CONNACK &&
374 par->packet_type_flags >> 4 != LMQCP_STOC_CONNACK) {
375 lwsl_notice("%s: server sent non-CONNACK\n",
376 __func__);
377 goto send_protocol_error_and_close;
378 }
379 #endif /* LWS_WITH_CLIENT */
380
381 n = map_flags[par->packet_type_flags >> 4];
382 /*
383 * Where a flag bit is marked as “Reserved”, it is
384 * reserved for future use and MUST be set to the value
385 * listed [MQTT-2.1.3-1].
386 */
387 if ((n & LMQCP_LUT_FLAG_RESERVED_FLAGS) &&
388 ((par->packet_type_flags & 0x0f) != (n & 0x0f))) {
389 lwsl_notice("%s: wsi %p: bad flags, 0x%02x mask 0x%02x (len %d)\n",
390 __func__, wsi, par->packet_type_flags, n, (int)len + 1);
391 lwsl_hexdump_err(buf - 1, len + 1);
392 goto send_protocol_error_and_close;
393 }
394
395 lwsl_debug("%s: received pkt type 0x%x / flags 0x%x\n",
396 __func__, par->packet_type_flags >> 4,
397 par->packet_type_flags & 0xf);
398
399 /* allows us to know if a property that can only be
400 * given once, appears twice */
401 memset(par->props_seen, 0, sizeof(par->props_seen));
402 par->state = par->packet_type_flags & 0xf0;
403 break;
404
405 case LMQCPP_CONNECT_PACKET:
406 lwsl_debug("%s: received CONNECT pkt\n", __func__);
407 par->state = LMQCPP_CONNECT_REMAINING_LEN_VBI;
408 lws_mqtt_vbi_init(&par->vbit);
409 break;
410
411 case LMQCPP_CONNECT_REMAINING_LEN_VBI:
412 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
413 case LMSPR_NEED_MORE:
414 break;
415 case LMSPR_COMPLETED:
416 par->cpkt_remlen = par->vbit.value;
417 n = map_flags[ctl_pkt_type(par)];
418 lws_mqtt_str_init(&par->s_temp, par->temp,
419 sizeof(par->temp), 0);
420 par->state = LMQCPP_CONNECT_VH_PNAME;
421 break;
422 default:
423 lwsl_notice("%s: bad vbi\n", __func__);
424 goto send_protocol_error_and_close;
425 }
426 break;
427
428 case LMQCPP_CONNECT_VH_PNAME:
429 switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
430 case LMSPR_NEED_MORE:
431 break;
432 case LMSPR_COMPLETED:
433 if (par->s_temp.len != 4 ||
434 memcmp(par->s_temp.buf, "MQTT",
435 par->s_temp.len)) {
436 lwsl_notice("%s: protocol name: %.*s\n",
437 __func__, par->s_temp.len,
438 par->s_temp.buf);
439 goto send_unsupp_connack_and_close;
440 }
441 par->state = LMQCPP_CONNECT_VH_PVERSION;
442 break;
443 default:
444 lwsl_notice("%s: bad protocol name\n", __func__);
445 goto send_protocol_error_and_close;
446 }
447 break;
448
449 case LMQCPP_CONNECT_VH_PVERSION:
450 par->conn_protocol_version = *buf++;
451 len--;
452 if (par->conn_protocol_version != 5) {
453 lwsl_info("%s: unsupported MQTT version %d\n",
454 __func__, par->conn_protocol_version);
455 goto send_unsupp_connack_and_close;
456 }
457 par->state = LMQCPP_CONNECT_VH_FLAGS;
458 break;
459
460 case LMQCPP_CONNECT_VH_FLAGS:
461 par->cpkt_flags = *buf++;
462 len--;
463 if (par->cpkt_flags & 1) {
464 /*
465 * The Server MUST validate that the reserved
466 * flag in the CONNECT packet is set to 0
467 * [MQTT-3.1.2-3].
468 */
469 par->reason = LMQCP_REASON_MALFORMED_PACKET;
470 goto send_reason_and_close;
471 }
472 /*
473 * conn_flags specifies the Will Properties that should
474 * appear in the payload section
475 */
476 lws_mqtt_2byte_init(&par->vbit);
477 par->state = LMQCPP_CONNECT_VH_KEEPALIVE;
478 break;
479
480 case LMQCPP_CONNECT_VH_KEEPALIVE:
481 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
482 case LMSPR_NEED_MORE:
483 break;
484 case LMSPR_COMPLETED:
485 par->keepalive = (uint16_t)par->vbit.value;
486 lws_mqtt_vbi_init(&par->vbit);
487 par->state = LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN;
488 break;
489 default:
490 lwsl_notice("%s: ka bad vbi\n", __func__);
491 goto send_protocol_error_and_close;
492 }
493 break;
494
495 case LMQCPP_PINGRESP_ZERO:
496 len--;
497 /* second byte of PINGRESP must be zero */
498 if (*buf++)
499 goto send_protocol_error_and_close;
500 goto cmd_completion;
501
502 case LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN:
503 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
504 case LMSPR_NEED_MORE:
505 break;
506 case LMSPR_COMPLETED:
507 /* reset consumption counter */
508 par->consumed = 0;
509 par->props_len = par->vbit.value;
510 lws_mqtt_vbi_init(&par->vbit);
511 par->state = LMQCPP_PROP_ID_VBI;
512 break;
513 default:
514 lwsl_notice("%s: connpr bad vbi\n", __func__);
515 goto send_protocol_error_and_close;
516 }
517 break;
518
519 case LMQCPP_PUBLISH_PACKET:
520 if (lwsi_role_client(wsi) && wsi->mqtt->inside_subscribe) {
521 lwsl_notice("%s: Topic rx before subscribing\n",
522 __func__);
523 goto send_protocol_error_and_close;
524 }
525 lwsl_info("%s: received PUBLISH pkt\n", __func__);
526 par->state = LMQCPP_PUBLISH_REMAINING_LEN_VBI;
527 lws_mqtt_vbi_init(&par->vbit);
528 break;
529 case LMQCPP_PUBLISH_REMAINING_LEN_VBI:
530 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
531 case LMSPR_NEED_MORE:
532 break;
533 case LMSPR_COMPLETED:
534 par->cpkt_remlen = par->vbit.value;
535 lwsl_debug("%s: PUBLISH pkt len = %d\n",
536 __func__, (int)par->cpkt_remlen);
537 /* Move on to PUBLISH's variable header */
538 par->state = LMQCPP_PUBLISH_VH_TOPIC;
539 break;
540 default:
541 lwsl_notice("%s: pubrem bad vbi\n", __func__);
542 goto send_protocol_error_and_close;
543 }
544 break;
545
546 case LMQCPP_PUBLISH_VH_TOPIC:
547 {
548 lws_mqtt_publish_param_t *pub = NULL;
549
550 if (len < 2) {
551 lwsl_notice("%s: topic too short\n", __func__);
552 return -1;
553 }
554
555 /* Topic len */
556 par->n = lws_ser_ru16be(buf);
557 buf += 2;
558 len -= 2;
559
560 if (len < par->n) {/* the way this is written... */
561 lwsl_notice("%s: len breakage\n", __func__);
562 return -1;
563 }
564
565 /* Invalid topic len */
566 if (par->n == 0) {
567 lwsl_notice("%s: zero topic len\n", __func__);
568 par->reason = LMQCP_REASON_MALFORMED_PACKET;
569 goto send_reason_and_close;
570 }
571 lwsl_debug("%s: PUBLISH topic len %d\n",
572 __func__, (int)par->n);
573 assert(!wsi->mqtt->rx_cpkt_param);
574 wsi->mqtt->rx_cpkt_param = lws_zalloc(
575 sizeof(lws_mqtt_publish_param_t), "rx pub param");
576 if (!wsi->mqtt->rx_cpkt_param)
577 goto oom;
578 pub = (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
579
580 pub->topic_len = par->n;
581
582 /* Topic Name */
583 pub->topic = (char *)lws_zalloc((size_t)pub->topic_len + 1,
584 "rx publish topic");
585 if (!pub->topic)
586 goto oom;
587 lws_strncpy(pub->topic, (const char *)buf,
588 (size_t)pub->topic_len + 1);
589 buf += pub->topic_len;
590 len -= pub->topic_len;
591
592 /* Extract QoS Level from Fixed Header Flags */
593 pub->qos = (lws_mqtt_qos_levels_t)
594 ((par->packet_type_flags >> 1) & 0x3);
595
596 pub->payload_pos = 0;
597
598 pub->payload_len = par->cpkt_remlen -
599 (2 + pub->topic_len + ((pub->qos) ? 2 : 0));
600
601 switch (pub->qos) {
602 case QOS0:
603 par->state = LMQCPP_PAYLOAD;
604 if (pub->payload_len == 0)
605 goto cmd_completion;
606
607 break;
608 case QOS1:
609 case QOS2:
610 par->state = LMQCPP_PUBLISH_VH_PKT_ID;
611 break;
612 default:
613 par->reason = LMQCP_REASON_MALFORMED_PACKET;
614 lws_free_set_NULL(pub->topic);
615 lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
616 goto send_reason_and_close;
617 }
618 break;
619 }
620 case LMQCPP_PUBLISH_VH_PKT_ID:
621 {
622 lws_mqtt_publish_param_t *pub =
623 (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
624
625 if (len < 2) {
626 lwsl_notice("%s: len breakage 2\n", __func__);
627 return -1;
628 }
629
630 par->cpkt_id = lws_ser_ru16be(buf);
631 buf += 2;
632 len -= 2;
633 wsi->mqtt->ack_pkt_id = par->cpkt_id;
634 lwsl_debug("%s: Packet ID %d\n",
635 __func__, (int)par->cpkt_id);
636 par->state = LMQCPP_PAYLOAD;
637 pub->payload_pos = 0;
638 pub->payload_len = par->cpkt_remlen -
639 (2 + pub->topic_len + ((pub->qos) ? 2 : 0));
640 if (pub->payload_len == 0)
641 goto cmd_completion;
642
643 break;
644 }
645 case LMQCPP_PAYLOAD:
646 {
647 lws_mqtt_publish_param_t *pub =
648 (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
649 if (pub == NULL) {
650 lwsl_err("%s: Uninitialized pub_param\n",
651 __func__);
652 goto send_protocol_error_and_close;
653 }
654
655 pub->payload = buf;
656 goto cmd_completion;
657 }
658
659 case LMQCPP_CONNACK_PACKET:
660 if (!lwsi_role_client(wsi)) {
661 lwsl_err("%s: CONNACK is only Server to Client",
662 __func__);
663 goto send_unsupp_connack_and_close;
664 }
665
666 lwsl_debug("%s: received CONNACK pkt\n", __func__);
667 lws_mqtt_vbi_init(&par->vbit);
668 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
669 case LMSPR_NEED_MORE:
670 break;
671 case LMSPR_COMPLETED:
672 par->cpkt_remlen = par->vbit.value;
673 lwsl_debug("%s: CONNACK pkt len = %d\n",
674 __func__, (int)par->cpkt_remlen);
675 if (par->cpkt_remlen != 2)
676 goto send_protocol_error_and_close;
677
678 par->state = LMQCPP_CONNACK_VH_FLAGS;
679 break;
680 default:
681 lwsl_notice("%s: connack bad vbi\n", __func__);
682 goto send_protocol_error_and_close;
683 }
684 break;
685
686 case LMQCPP_CONNACK_VH_FLAGS:
687 {
688 lws_mqttc_t *c = &wsi->mqtt->client;
689 par->cpkt_flags = *buf++;
690 len--;
691
692 if (par->cpkt_flags & ~LMQCFT_SESSION_PRESENT) {
693 /*
694 * Byte 1 is the "Connect Acknowledge
695 * Flags". Bits 7-1 are reserved and
696 * MUST be set to 0.
697 */
698 par->reason = LMQCP_REASON_MALFORMED_PACKET;
699 goto send_reason_and_close;
700 }
701 /*
702 * If the Server accepts a connection with
703 * CleanSession set to 1, the Server MUST set
704 * Session Present to 0 in the CONNACK packet
705 * in addition to setting a zero return code
706 * in the CONNACK packet [MQTT-3.2.2-1]. If
707 * the Server accepts a connection with
708 * CleanSession set to 0, the value set in
709 * Session Present depends on whether the
710 * Server already has stored Session state for
711 * the supplied client ID. If the Server has
712 * stored Session state, it MUST set
713 * SessionPresent to 1 in the CONNACK packet
714 * [MQTT-3.2.2-2]. If the Server does not have
715 * stored Session state, it MUST set Session
716 * Present to 0 in the CONNACK packet. This is
717 * in addition to setting a zero return code
718 * in the CONNACK packet [MQTT-3.2.2-3].
719 */
720 if ((c->conn_flags & LMQCFT_CLEAN_START) &&
721 (par->cpkt_flags & LMQCFT_SESSION_PRESENT))
722 goto send_protocol_error_and_close;
723
724 wsi->mqtt->session_resumed = (par->cpkt_flags &
725 LMQCFT_SESSION_PRESENT);
726
727 /* Move on to Connect Return Code */
728 par->state = LMQCPP_CONNACK_VH_RETURN_CODE;
729 break;
730 }
731 case LMQCPP_CONNACK_VH_RETURN_CODE:
732 par->conn_rc = *buf++;
733 len--;
734 /*
735 * If a server sends a CONNACK packet containing a
736 * non-zero return code it MUST then close the Network
737 * Connection [MQTT-3.2.2-5]
738 */
739 switch (par->conn_rc) {
740 case 0:
741 goto cmd_completion;
742 case 1:
743 case 2:
744 case 3:
745 case 4:
746 case 5:
747 par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL +
748 par->conn_rc - 1;
749 goto send_reason_and_close;
750 default:
751 lwsl_notice("%s: bad connack retcode\n", __func__);
752 goto send_protocol_error_and_close;
753 }
754 break;
755
756 /* SUBACK */
757 case LMQCPP_SUBACK_PACKET:
758 if (!lwsi_role_client(wsi)) {
759 lwsl_err("%s: SUBACK is only Server to Client",
760 __func__);
761 goto send_unsupp_connack_and_close;
762 }
763
764 lwsl_debug("%s: received SUBACK pkt\n", __func__);
765 lws_mqtt_vbi_init(&par->vbit);
766 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
767 case LMSPR_NEED_MORE:
768 break;
769 case LMSPR_COMPLETED:
770 par->cpkt_remlen = par->vbit.value;
771 lwsl_debug("%s: SUBACK pkt len = %d\n",
772 __func__, (int)par->cpkt_remlen);
773 if (par->cpkt_remlen <= 2)
774 goto send_protocol_error_and_close;
775 par->state = LMQCPP_SUBACK_VH_PKT_ID;
776 break;
777 default:
778 lwsl_notice("%s: suback bad vbi\n", __func__);
779 goto send_protocol_error_and_close;
780 }
781 break;
782
783 case LMQCPP_SUBACK_VH_PKT_ID:
784
785 if (len < 2) {
786 lwsl_notice("%s: len breakage 4\n", __func__);
787 return -1;
788 }
789
790 par->cpkt_id = lws_ser_ru16be(buf);
791 wsi->mqtt->ack_pkt_id = par->cpkt_id;
792 buf += 2;
793 len -= 2;
794 par->cpkt_remlen -= 2;
795 par->n = 0;
796 par->state = LMQCPP_SUBACK_PAYLOAD;
797 *par->temp = 0;
798 break;
799
800 case LMQCPP_SUBACK_PAYLOAD:
801 {
802 lws_mqtt_qos_levels_t qos = (lws_mqtt_qos_levels_t)*buf++;
803
804 len--;
805 switch (qos) {
806 case QOS0:
807 case QOS1:
808 case QOS2:
809 break;
810 case FAILURE_QOS_LEVEL:
811 goto send_protocol_error_and_close;
812
813 default:
814 par->reason = LMQCP_REASON_MALFORMED_PACKET;
815 goto send_reason_and_close;
816 }
817
818 if (++(par->n) == par->cpkt_remlen) {
819 par->n = 0;
820 goto cmd_completion;
821 }
822
823 break;
824 }
825
826 /* UNSUBACK */
827 case LMQCPP_UNSUBACK_PACKET:
828 if (!lwsi_role_client(wsi)) {
829 lwsl_err("%s: UNSUBACK is only Server to Client",
830 __func__);
831 goto send_unsupp_connack_and_close;
832 }
833
834 lwsl_debug("%s: received UNSUBACK pkt\n", __func__);
835 lws_mqtt_vbi_init(&par->vbit);
836 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
837 case LMSPR_NEED_MORE:
838 break;
839 case LMSPR_COMPLETED:
840 par->cpkt_remlen = par->vbit.value;
841 lwsl_debug("%s: UNSUBACK pkt len = %d\n",
842 __func__, (int)par->cpkt_remlen);
843 if (par->cpkt_remlen < 2)
844 goto send_protocol_error_and_close;
845 par->state = LMQCPP_UNSUBACK_VH_PKT_ID;
846 break;
847 default:
848 lwsl_notice("%s: unsuback bad vbi\n", __func__);
849 goto send_protocol_error_and_close;
850 }
851 break;
852
853 case LMQCPP_UNSUBACK_VH_PKT_ID:
854
855 if (len < 2) {
856 lwsl_notice("%s: len breakage 3\n", __func__);
857 return -1;
858 }
859
860 par->cpkt_id = lws_ser_ru16be(buf);
861 wsi->mqtt->ack_pkt_id = par->cpkt_id;
862 buf += 2;
863 len -= 2;
864 par->cpkt_remlen -= 2;
865 par->n = 0;
866
867 goto cmd_completion;
868
869 case LMQCPP_PUBACK_PACKET:
870 lws_mqtt_vbi_init(&par->vbit);
871 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
872 case LMSPR_NEED_MORE:
873 break;
874 case LMSPR_COMPLETED:
875 par->cpkt_remlen = par->vbit.value;
876 lwsl_info("%s: PUBACK pkt len = %d\n", __func__,
877 (int)par->cpkt_remlen);
878 /*
879 * must be 4 or more, with special case that 2
880 * means success with no reason code or props
881 */
882 if (par->cpkt_remlen <= 1 ||
883 par->cpkt_remlen == 3)
884 goto send_protocol_error_and_close;
885
886 par->state = LMQCPP_PUBACK_VH_PKT_ID;
887 par->fixed_seen[2] = par->fixed_seen[3] = 0;
888 par->fixed = 0;
889 par->n = 0;
890 break;
891 default:
892 lwsl_notice("%s: puback bad vbi\n", __func__);
893 goto send_protocol_error_and_close;
894 }
895 break;
896
897 case LMQCPP_PUBACK_VH_PKT_ID:
898 /*
899 * There are 3 fixed bytes and then a VBI for the
900 * property section length
901 */
902 par->fixed_seen[par->fixed++] = *buf++;
903 if (len < par->cpkt_remlen - par->n) {
904 lwsl_notice("%s: len breakage 4\n", __func__);
905 return -1;
906 }
907 len--;
908 par->n++;
909 if (par->fixed == 2)
910 par->cpkt_id = lws_ser_ru16be(par->fixed_seen);
911
912 if (par->fixed == 3) {
913 lws_mqtt_vbi_init(&par->vbit);
914 par->props_consumed = 0;
915 par->state = LMQCPP_PUBACK_PROPERTIES_LEN_VBI;
916 }
917 /* length of 2 is truncated packet and we completed it */
918 if (par->cpkt_remlen == par->fixed)
919 goto cmd_completion;
920 break;
921
922 case LMQCPP_PUBACK_PROPERTIES_LEN_VBI:
923 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
924 case LMSPR_NEED_MORE:
925 break;
926 case LMSPR_COMPLETED:
927 par->props_len = par->vbit.value;
928 lwsl_info("%s: PUBACK props len = %d\n",
929 __func__, (int)par->cpkt_remlen);
930 /*
931 * If there are no properties, this is a
932 * command completion event in itself
933 */
934 if (!par->props_len)
935 goto cmd_completion;
936
937 /*
938 * Otherwise consume the properties before
939 * completing the command
940 */
941 lws_mqtt_vbi_init(&par->vbit);
942 par->state = LMQCPP_PUBACK_VH_PKT_ID;
943 break;
944 default:
945 lwsl_notice("%s: puback pr bad vbi\n", __func__);
946 goto send_protocol_error_and_close;
947 }
948 break;
949
950 case LMQCPP_EAT_PROPERTIES_AND_COMPLETE:
951 /*
952 * TODO: stash the props
953 */
954 par->props_consumed++;
955 len--;
956 buf++;
957 if (par->props_len != par->props_consumed)
958 break;
959
960 cmd_completion:
961 /*
962 * We come here when we understood we just processed
963 * the last byte of a command packet, regardless of the
964 * packet type
965 */
966 par->state = LMQCPP_IDLE;
967
968 switch (par->packet_type_flags >> 4) {
969 case LMQCP_STOC_CONNACK:
970 lwsl_info("%s: cmd_completion: CONNACK\n",
971 __func__);
972
973 /*
974 * Getting the CONNACK means we are the first,
975 * the nwsi, and we succeeded to create a new
976 * network connection ourselves.
977 *
978 * Since others may join us sharing the nwsi,
979 * and we may close while they still want to use
980 * it, our wsi lifecycle alone can no longer
981 * define the lifecycle of the nwsi... it means
982 * we need to do a "magic trick" and instead of
983 * being both the nwsi and act like a child
984 * stream, create a new wsi to take over the
985 * nwsi duties and turn our wsi into a child of
986 * the nwsi with its own lifecycle.
987 *
988 * The nwsi gets a mostly empty wsi->nwsi used
989 * to track already-subscribed topics globally
990 * for the connection.
991 */
992
993 /* we were under SENT_CLIENT_HANDSHAKE timeout */
994 lws_set_timeout(wsi, 0, 0);
995
996 w = lws_create_new_server_wsi(wsi->vhost,
997 wsi->tsi);
998 if (!w) {
999 lwsl_notice("%s: sid 1 migrate failed\n",
1000 __func__);
1001 return -1;
1002 }
1003
1004 wsi->mux.highest_sid = 1;
1005 lws_wsi_mux_insert(w, wsi, wsi->mux.highest_sid++);
1006
1007 wsi->mux_substream = 1;
1008 w->mux_substream = 1;
1009 w->client_mux_substream = 1;
1010 wsi->client_mux_migrated = 1;
1011 wsi->told_user_closed = 1; /* don't tell nwsi closed */
1012
1013 lwsi_set_state(w, LRS_ESTABLISHED);
1014 lwsi_set_state(wsi, LRS_ESTABLISHED);
1015 lwsi_set_role(w, lwsi_role(wsi));
1016
1017 #if defined(LWS_WITH_CLIENT)
1018 w->flags = wsi->flags;
1019 #endif
1020
1021 w->mqtt = wsi->mqtt;
1022 wsi->mqtt = lws_zalloc(sizeof(*wsi->mqtt), "nwsi mqtt");
1023 if (!wsi->mqtt)
1024 return -1;
1025 w->mqtt->wsi = w;
1026 w->protocol = wsi->protocol;
1027 if (w->user_space &&
1028 !w->user_space_externally_allocated)
1029 lws_free_set_NULL(w->user_space);
1030 w->user_space = wsi->user_space;
1031 wsi->user_space = NULL;
1032 w->user_space_externally_allocated =
1033 wsi->user_space_externally_allocated;
1034 if (lws_ensure_user_space(w))
1035 goto bail1;
1036 w->opaque_user_data = wsi->opaque_user_data;
1037 wsi->opaque_user_data = NULL;
1038 w->stash = wsi->stash;
1039 wsi->stash = NULL;
1040
1041 lws_mux_mark_immortal(w);
1042
1043 lwsl_notice("%s: migrated nwsi %p to sid 1 %p\n",
1044 __func__, wsi, w);
1045
1046 #if defined(LWS_WITH_SERVER_STATUS)
1047 wsi->vhost->conn_stats.h2_subs++;
1048 #endif
1049
1050 /*
1051 * It was the last thing we were waiting for
1052 * before we can be fully ESTABLISHED
1053 */
1054 if (lws_mqtt_set_client_established(w)) {
1055 lwsl_notice("%s: set EST fail\n", __func__);
1056 return -1;
1057 }
1058
1059 /* get the ball rolling */
1060 lws_validity_confirmed(wsi);
1061
1062 /* well, add the queued guys as children */
1063 lws_wsi_mux_apply_queue(wsi);
1064 break;
1065
1066 bail1:
1067 /* undo the insert */
1068 wsi->mux.child_list = w->mux.sibling_list;
1069 wsi->mux.child_count--;
1070
1071 w->context->count_wsi_allocated--;
1072
1073 if (w->user_space)
1074 lws_free_set_NULL(w->user_space);
1075 w->vhost->protocols[0].callback(w,
1076 LWS_CALLBACK_WSI_DESTROY,
1077 NULL, NULL, 0);
1078 lws_vhost_unbind_wsi(w);
1079 lws_free(w);
1080
1081 return 0;
1082
1083 case LMQCP_PUBACK:
1084 lwsl_info("%s: cmd_completion: PUBACK\n",
1085 __func__);
1086
1087 /*
1088 * Figure out which child asked for this
1089 */
1090
1091 n = 0;
1092 lws_start_foreach_ll(struct lws *, w,
1093 wsi->mux.child_list) {
1094 if (w->mqtt->unacked_publish &&
1095 w->mqtt->ack_pkt_id == par->cpkt_id) {
1096 char requested_close = 0;
1097
1098 w->mqtt->unacked_publish = 0;
1099 if (user_callback_handle_rxflow(
1100 w->protocol->callback,
1101 w, LWS_CALLBACK_MQTT_ACK,
1102 w->user_space, NULL, 0) < 0) {
1103 lwsl_info("%s: MQTT_ACK requests close\n",
1104 __func__);
1105 requested_close = 1;
1106 }
1107 n = 1;
1108
1109 /*
1110 * We got an assertive PUBACK,
1111 * no need for ACK timeout wait
1112 * any more
1113 */
1114 lws_sul_schedule(lws_get_context(w), 0,
1115 &w->mqtt->sul_qos1_puback_wait, NULL,
1116 LWS_SET_TIMER_USEC_CANCEL);
1117
1118 if (requested_close) {
1119 __lws_close_free_wsi(w,
1120 0, "ack cb");
1121 break;
1122 }
1123
1124 break;
1125 }
1126 } lws_end_foreach_ll(w, mux.sibling_list);
1127
1128 if (!n) {
1129 lwsl_err("%s: unsolicited PUBACK\n",
1130 __func__);
1131 return -1;
1132 }
1133
1134 /*
1135 * If we published something and it was acked,
1136 * our connection is definitely working in both
1137 * directions at the moment.
1138 */
1139 lws_validity_confirmed(wsi);
1140 break;
1141
1142 case LMQCP_STOC_PINGRESP:
1143 lwsl_info("%s: cmd_completion: PINGRESP\n",
1144 __func__);
1145 /*
1146 * If we asked for a PINGRESP and it came,
1147 * our connection is definitely working in both
1148 * directions at the moment.
1149 */
1150 lws_validity_confirmed(wsi);
1151 break;
1152
1153 case LMQCP_STOC_SUBACK:
1154 lwsl_info("%s: cmd_completion: SUBACK\n",
1155 __func__);
1156
1157 /*
1158 * Figure out which child asked for this
1159 */
1160
1161 n = 0;
1162 lws_start_foreach_ll(struct lws *, w,
1163 wsi->mux.child_list) {
1164 if (w->mqtt->inside_subscribe &&
1165 w->mqtt->ack_pkt_id == par->cpkt_id) {
1166 w->mqtt->inside_subscribe = 0;
1167 if (user_callback_handle_rxflow(
1168 w->protocol->callback,
1169 w, LWS_CALLBACK_MQTT_SUBSCRIBED,
1170 w->user_space, NULL, 0) < 0) {
1171 lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
1172 __func__);
1173 return -1;
1174 }
1175 n = 1;
1176 break;
1177 }
1178 } lws_end_foreach_ll(w, mux.sibling_list);
1179
1180 if (!n) {
1181 lwsl_err("%s: unsolicited SUBACK\n",
1182 __func__);
1183 return -1;
1184 }
1185
1186 /*
1187 * If we subscribed to something and SUBACK came,
1188 * our connection is definitely working in both
1189 * directions at the moment.
1190 */
1191 lws_validity_confirmed(wsi);
1192
1193 break;
1194
1195 case LMQCP_STOC_UNSUBACK:
1196 {
1197 char requested_close = 0;
1198 lwsl_info("%s: cmd_completion: UNSUBACK\n",
1199 __func__);
1200 /*
1201 * Figure out which child asked for this
1202 */
1203 n = 0;
1204 lws_start_foreach_ll(struct lws *, w,
1205 wsi->mux.child_list) {
1206 if (w->mqtt->inside_unsubscribe &&
1207 w->mqtt->ack_pkt_id == par->cpkt_id) {
1208 struct lws *nwsi = lws_get_network_wsi(w);
1209
1210 /*
1211 * No more subscribers left,
1212 * remove the topic from nwsi
1213 */
1214 lws_mqtt_client_remove_subs(nwsi->mqtt);
1215
1216 w->mqtt->inside_unsubscribe = 0;
1217 if (user_callback_handle_rxflow(
1218 w->protocol->callback,
1219 w, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
1220 w->user_space, NULL, 0) < 0) {
1221 lwsl_info("%s: MQTT_UNSUBACK requests close\n",
1222 __func__);
1223 requested_close = 1;
1224 }
1225 n = 1;
1226
1227 if (requested_close) {
1228 __lws_close_free_wsi(w,
1229 0, "unsub ack cb");
1230 break;
1231 }
1232 break;
1233 }
1234 } lws_end_foreach_ll(w, mux.sibling_list);
1235
1236 if (!n) {
1237 lwsl_err("%s: unsolicited UNSUBACK\n",
1238 __func__);
1239 return -1;
1240 }
1241
1242
1243 /*
1244 * If we unsubscribed to something and
1245 * UNSUBACK came, our connection is
1246 * definitely working in both
1247 * directions at the moment.
1248 */
1249 lws_validity_confirmed(wsi);
1250
1251 break;
1252 }
1253 case LMQCP_PUBLISH:
1254 {
1255 lws_mqtt_publish_param_t *pub =
1256 (lws_mqtt_publish_param_t *)
1257 wsi->mqtt->rx_cpkt_param;
1258 size_t chunk;
1259
1260 if (pub == NULL) {
1261 lwsl_notice("%s: no pub\n", __func__);
1262 return -1;
1263 }
1264
1265 /*
1266 * RX PUBLISH is delivered to any children that
1267 * registered for the related topic
1268 */
1269
1270 n = wsi->role_ops->rx_cb[lwsi_role_server(wsi)];
1271
1272 chunk = pub->payload_len - pub->payload_pos;
1273 if (chunk > len)
1274 chunk = len;
1275
1276 lws_start_foreach_ll(struct lws *, w,
1277 wsi->mux.child_list) {
1278 if (lws_mqtt_find_sub(w->mqtt,
1279 pub->topic))
1280 if (w->protocol->callback(
1281 w, n,
1282 w->user_space,
1283 (void *)pub,
1284 chunk))
1285 return 1;
1286 } lws_end_foreach_ll(w, mux.sibling_list);
1287
1288
1289 pub->payload_pos += (uint32_t)chunk;
1290 len -= chunk;
1291 buf += chunk;
1292
1293 lwsl_debug("%s: post pos %d, plen %d, len %d\n",
1294 __func__, (int)pub->payload_pos,
1295 (int)pub->payload_len, (int)len);
1296
1297 if (pub->payload_pos != pub->payload_len) {
1298 /*
1299 * More chunks of the payload pending,
1300 * blocking this connection from doing
1301 * anything else
1302 */
1303 par->state = LMQCPP_PAYLOAD;
1304 break;
1305 }
1306
1307 /* For QOS>0, send out PUBACK */
1308 if (pub->qos) {
1309 wsi->mqtt->send_puback = 1;
1310 lws_callback_on_writable(wsi);
1311 }
1312
1313 par->payload_consumed = 0;
1314 lws_free_set_NULL(pub->topic);
1315 lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
1316
1317 break;
1318 }
1319 default:
1320 break;
1321 }
1322
1323 break;
1324
1325
1326 case LMQCPP_PROP_ID_VBI:
1327 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1328 case LMSPR_NEED_MORE:
1329 break;
1330 case LMSPR_COMPLETED:
1331 par->consumed += par->vbit.consumed;
1332 if (par->vbit.value >
1333 LWS_ARRAY_SIZE(property_valid)) {
1334 lwsl_notice("%s: undef prop id 0x%x\n",
1335 __func__, (int)par->vbit.value);
1336 goto send_protocol_error_and_close;
1337 }
1338 if (!(property_valid[par->vbit.value] &
1339 (1 << ctl_pkt_type(par)))) {
1340 lwsl_notice("%s: prop id 0x%x invalid for"
1341 " control pkt %d\n", __func__,
1342 (int)par->vbit.value,
1343 ctl_pkt_type(par));
1344 goto send_protocol_error_and_close;
1345 }
1346 par->prop_id = par->vbit.value;
1347 par->flag_prop_multi =
1348 par->props_seen[par->prop_id >> 3] &
1349 (1 << (par->prop_id & 7));
1350 par->props_seen[par->prop_id >> 3] |=
1351 (1 << (par->prop_id & 7));
1352 /*
1353 * even if it's not a vbi property arg,
1354 * .consumed of this will be zero the first time
1355 */
1356 lws_mqtt_vbi_init(&par->vbit);
1357 /*
1358 * if it's a string, next state must set the
1359 * destination and size limit itself. But
1360 * resetting it generically here lets it use
1361 * lws_mqtt_str_first() to understand it's the
1362 * first time around.
1363 */
1364 lws_mqtt_str_init(&par->s_temp, NULL, 0, 0);
1365
1366 /* property arg state enums are so encoded */
1367 par->state = 0x100 | par->vbit.value;
1368 break;
1369 default:
1370 lwsl_notice("%s: prop id bad vbi\n", __func__);
1371 goto send_protocol_error_and_close;
1372 }
1373 break;
1374
1375 /*
1376 * All possible property payloads... restricting which ones
1377 * can appear in which control packets is already done above
1378 * in LMQCPP_PROP_ID_VBI
1379 */
1380
1381 case LMQCPP_PROP_REQUEST_PROBLEM_INFO_1BYTE:
1382 case LMQCPP_PROP_REQUEST_REPSONSE_INFO_1BYTE:
1383 case LMQCPP_PROP_MAXIMUM_QOS_1BYTE:
1384 case LMQCPP_PROP_RETAIN_AVAILABLE_1BYTE:
1385 case LMQCPP_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE_1BYTE:
1386 case LMQCPP_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE_1BYTE:
1387 case LMQCPP_PROP_SHARED_SUBSCRIPTION_AVAILABLE_1BYTE:
1388 case LMQCPP_PROP_PAYLOAD_FORMAT_INDICATOR_1BYTE: /* 3.3.2.3.2 */
1389 if (par->flag_prop_multi)
1390 goto singular_prop_seen_twice;
1391 par->payload_format = *buf++;
1392 len--;
1393 if (lws_mqtt_pconsume(par, 1))
1394 goto send_protocol_error_and_close;
1395 break;
1396
1397 case LMQCPP_PROP_MAXIMUM_PACKET_SIZE_4BYTE:
1398 case LMQCPP_PROP_WILL_DELAY_INTERVAL_4BYTE:
1399 case LMQCPP_PROP_SESSION_EXPIRY_INTERVAL_4BYTE:
1400 case LMQCPP_PROP_MSG_EXPIRY_INTERVAL_4BYTE:
1401 if (par->flag_prop_multi)
1402 goto singular_prop_seen_twice;
1403
1404 if (lws_mqtt_mb_first(&par->vbit))
1405 lws_mqtt_4byte_init(&par->vbit);
1406
1407 switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
1408 case LMSPR_NEED_MORE:
1409 break;
1410 case LMSPR_COMPLETED:
1411 if (lws_mqtt_pconsume(par, par->vbit.consumed))
1412 goto send_protocol_error_and_close;
1413 break;
1414 default:
1415 goto send_protocol_error_and_close;
1416 }
1417 break;
1418
1419 case LMQCPP_PROP_SERVER_KEEPALIVE_2BYTE:
1420 case LMQCPP_PROP_RECEIVE_MAXIMUM_2BYTE:
1421 case LMQCPP_PROP_TOPIC_MAXIMUM_2BYTE:
1422 case LMQCPP_PROP_TOPIC_ALIAS_2BYTE:
1423 if (par->flag_prop_multi)
1424 goto singular_prop_seen_twice;
1425
1426 if (lws_mqtt_mb_first(&par->vbit))
1427 lws_mqtt_2byte_init(&par->vbit);
1428
1429 switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
1430 case LMSPR_NEED_MORE:
1431 break;
1432 case LMSPR_COMPLETED:
1433 if (lws_mqtt_pconsume(par, par->vbit.consumed))
1434 goto send_protocol_error_and_close;
1435 break;
1436 default:
1437 goto send_protocol_error_and_close;
1438 }
1439 break;
1440
1441 case LMQCPP_PROP_ASSIGNED_CLIENTID_UTF8S:
1442 case LMQCPP_PROP_AUTH_METHOD_UTF8S:
1443 case LMQCPP_PROP_USER_PROPERTY_NAME_UTF8S:
1444 case LMQCPP_PROP_USER_PROPERTY_VALUE_UTF8S:
1445 case LMQCPP_PROP_RESPONSE_INFO_UTF8S:
1446 case LMQCPP_PROP_SERVER_REFERENCE_UTF8S:
1447 case LMQCPP_PROP_REASON_STRING_UTF8S:
1448 case LMQCPP_PROP_RESPONSE_TOPIC_UTF8S:
1449 case LMQCPP_PROP_CONTENT_TYPE_UTF8S:
1450 if (par->flag_prop_multi)
1451 goto singular_prop_seen_twice;
1452
1453 if (lws_mqtt_str_first(&par->s_temp))
1454 lws_mqtt_str_init(&par->s_temp, par->temp,
1455 sizeof(par->temp), 0);
1456
1457 switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
1458 case LMSPR_NEED_MORE:
1459 break;
1460 case LMSPR_COMPLETED:
1461 if (lws_mqtt_pconsume(par, par->s_temp.len))
1462 goto send_protocol_error_and_close;
1463 break;
1464
1465 default:
1466 lwsl_info("%s: bad protocol name\n", __func__);
1467 goto send_protocol_error_and_close;
1468 }
1469 break;
1470
1471 case LMQCPP_PROP_SUBSCRIPTION_ID_VBI:
1472
1473 case LMQCPP_PROP_CORRELATION_BINDATA:
1474 case LMQCPP_PROP_AUTH_DATA_BINDATA:
1475
1476 /* TODO */
1477 lwsl_err("%s: Unimplemented packet state 0x%x\n",
1478 __func__, par->state);
1479 return -1;
1480 }
1481 }
1482
1483 return 0;
1484
1485 oom:
1486 lwsl_err("%s: OOM!\n", __func__);
1487 goto send_protocol_error_and_close;
1488
1489 singular_prop_seen_twice:
1490 lwsl_info("%s: property appears twice\n", __func__);
1491
1492 send_protocol_error_and_close:
1493 lwsl_notice("%s: peac\n", __func__);
1494 par->reason = LMQCP_REASON_PROTOCOL_ERROR;
1495
1496 send_reason_and_close:
1497 lwsl_notice("%s: srac\n", __func__);
1498 par->flag_pending_send_reason_close = 1;
1499 goto ask;
1500
1501 send_unsupp_connack_and_close:
1502 lwsl_notice("%s: unsupac\n", __func__);
1503 par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL;
1504 par->flag_pending_send_connack_close = 1;
1505
1506 ask:
1507 /* Should we ask for clients? */
1508 lws_callback_on_writable(wsi);
1509
1510 return -1;
1511 }
1512
1513 int
lws_mqtt_fill_fixed_header(uint8_t * p,lws_mqtt_control_packet_t ctrl_pkt_type,uint8_t dup,lws_mqtt_qos_levels_t qos,uint8_t retain)1514 lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type,
1515 uint8_t dup, lws_mqtt_qos_levels_t qos,
1516 uint8_t retain)
1517 {
1518 lws_mqtt_fixed_hdr_t hdr;
1519
1520 hdr.bits = 0;
1521 hdr.flags.ctrl_pkt_type = (uint8_t) ctrl_pkt_type;
1522
1523 switch(ctrl_pkt_type) {
1524 case LMQCP_PUBLISH:
1525 hdr.flags.dup = !!dup;
1526 /*
1527 * A PUBLISH Packet MUST NOT have both QoS bits set to
1528 * 1. If a Server or Client receives a PUBLISH Packet
1529 * which has both QoS bits set to 1 it MUST close the
1530 * Network Connection [MQTT-3.3.1-4].
1531 */
1532 if (qos >= RESERVED_QOS_LEVEL) {
1533 lwsl_err("%s: Unsupport QoS level 0x%x\n",
1534 __func__, qos);
1535 return -1;
1536 }
1537 hdr.flags.qos = (uint8_t)qos;
1538 hdr.flags.retain = !!retain;
1539 break;
1540
1541 case LMQCP_CTOS_CONNECT:
1542 case LMQCP_STOC_CONNACK:
1543 case LMQCP_PUBACK:
1544 case LMQCP_PUBREC:
1545 case LMQCP_PUBCOMP:
1546 case LMQCP_STOC_SUBACK:
1547 case LMQCP_STOC_UNSUBACK:
1548 case LMQCP_CTOS_PINGREQ:
1549 case LMQCP_STOC_PINGRESP:
1550 case LMQCP_DISCONNECT:
1551 case LMQCP_AUTH:
1552 hdr.bits &= 0xf0;
1553 break;
1554
1555 /*
1556 * Bits 3,2,1 and 0 of the fixed header of the PUBREL,
1557 * SUBSCRIBE, UNSUBSCRIBE Control Packets are reserved and
1558 * MUST be set to 0,0,1 and 0 respectively. The Server MUST
1559 * treat any other value as malformed and close the Network
1560 * Connection [MQTT-3.6.1-1], [MQTT-3.8.1-1], [MQTT-3.10.1-1].
1561 */
1562 case LMQCP_PUBREL:
1563 case LMQCP_CTOS_SUBSCRIBE:
1564 case LMQCP_CTOS_UNSUBSCRIBE:
1565 hdr.bits |= 0x02;
1566 break;
1567
1568 default:
1569 return -1;
1570 }
1571
1572 *p = hdr.bits;
1573
1574 return 0;
1575 }
1576
1577 /*
1578 * This fires if the wsi did a PUBLISH under QoS1, but no PUBACK came before
1579 * the timeout period
1580 */
1581
1582 static void
lws_mqtt_publish_resend(struct lws_sorted_usec_list * sul)1583 lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul)
1584 {
1585 struct _lws_mqtt_related *mqtt = lws_container_of(sul,
1586 struct _lws_mqtt_related, sul_qos1_puback_wait);
1587
1588 lwsl_notice("%s: wsi %p\n", __func__, mqtt->wsi);
1589
1590 if (mqtt->wsi->protocol->callback(mqtt->wsi, LWS_CALLBACK_MQTT_RESEND,
1591 mqtt->wsi->user_space, NULL, 0))
1592 lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
1593 }
1594
1595 int
lws_mqtt_client_send_publish(struct lws * wsi,lws_mqtt_publish_param_t * pub,const void * buf,uint32_t len,int is_complete)1596 lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
1597 const void *buf, uint32_t len, int is_complete)
1598 {
1599 struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
1600 uint8_t *b = (uint8_t *)pt->serv_buf, *start, *p;
1601 struct lws *nwsi = lws_get_network_wsi(wsi);
1602 lws_mqtt_str_t mqtt_vh_payload;
1603 uint32_t vh_len, rem_len;
1604
1605 assert(pub->topic);
1606
1607 lwsl_debug("%s: len = %d, is_complete = %d\n",
1608 __func__, (int)len, (int)is_complete);
1609
1610 if (lwsi_state(wsi) != LRS_ESTABLISHED) {
1611 lwsl_err("%s: wsi %p: unknown state 0x%x\n", __func__, wsi,
1612 lwsi_state(wsi));
1613 assert(0);
1614 return 1;
1615 }
1616
1617 if (wsi->mqtt->inside_payload) {
1618 /*
1619 * Headers are filled, we are sending
1620 * the payload - a buffer with LWS_PRE
1621 * in front it.
1622 */
1623 start = (uint8_t *)buf;
1624 p = start + len;
1625 if (is_complete)
1626 wsi->mqtt->inside_payload = 0;
1627 goto do_write;
1628 }
1629
1630 start = b + LWS_PRE;
1631 p = start;
1632 /*
1633 * Fill headers and the first chunk of the
1634 * payload (if any)
1635 */
1636 if (lws_mqtt_fill_fixed_header(p++, LMQCP_PUBLISH,
1637 0, pub->qos, 0)) {
1638 lwsl_err("%s: Failed to fill fixed header\n", __func__);
1639 return 1;
1640 }
1641
1642 /*
1643 * Topic len field + Topic len + Packet ID
1644 * (for QOS>0) + Payload len
1645 */
1646 vh_len = 2 + pub->topic_len + ((pub->qos) ? 2 : 0);
1647 rem_len = vh_len + pub->payload_len;
1648 lwsl_debug("%s: Remaining len = %d\n", __func__, (int) rem_len);
1649
1650 /* Will the chunk of payload fit? */
1651 if ((vh_len + len) >=
1652 (wsi->context->pt_serv_buf_size - LWS_PRE)) {
1653 lwsl_err("%s: Payload is too big\n", __func__);
1654 return 1;
1655 }
1656
1657 p += lws_mqtt_vbi_encode(rem_len, p);
1658
1659 /* Topic's Len */
1660 lws_ser_wu16be(p, pub->topic_len);
1661 p += 2;
1662
1663 /*
1664 * Init lws_mqtt_str for "MQTT Variable
1665 * Headers + payload" (only the supplied
1666 * chuncked payload)
1667 */
1668 lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p,
1669 (pub->topic_len + ((pub->qos) ? 2 : 0) + len),
1670 0);
1671
1672 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
1673 lws_strncpy((char *)p, pub->topic, (size_t)pub->topic_len+1);
1674 if (lws_mqtt_str_advance(&mqtt_vh_payload, pub->topic_len)) {
1675 lwsl_err("%s: a\n", __func__);
1676 return 1;
1677 }
1678
1679 /* Packet ID */
1680 if (pub->qos != QOS0) {
1681 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
1682 wsi->mqtt->ack_pkt_id = pub->packet_id = ++nwsi->mqtt->pkt_id;
1683 lwsl_debug("%s: pkt_id = %d\n", __func__,
1684 (int)wsi->mqtt->ack_pkt_id);
1685 lws_ser_wu16be(p, pub->packet_id);
1686 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) {
1687 lwsl_err("%s: b\n", __func__);
1688 return 1;
1689 }
1690 }
1691 /*
1692 * A non-empty Payload is expected and a chunk
1693 * is present
1694 */
1695 if (pub->payload_len && len) {
1696 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
1697 memcpy(p, buf, len);
1698 if (lws_mqtt_str_advance(&mqtt_vh_payload, len))
1699 return 1;
1700 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
1701 }
1702
1703 if (!is_complete)
1704 nwsi->mqtt->inside_payload = wsi->mqtt->inside_payload = 1;
1705
1706 do_write:
1707
1708 // lwsl_hexdump_err(start, lws_ptr_diff(p, start));
1709
1710 if (lws_write(nwsi, start, lws_ptr_diff(p, start), LWS_WRITE_BINARY) !=
1711 lws_ptr_diff(p, start)) {
1712 lwsl_err("%s: write failed\n", __func__);
1713 return 1;
1714 }
1715
1716 if (!is_complete) {
1717 /* still some more chunks to come... */
1718 lws_callback_on_writable(wsi);
1719
1720 return 0;
1721 }
1722
1723 wsi->mqtt->inside_payload = nwsi->mqtt->inside_payload = 0;
1724
1725 if (pub->qos != QOS0)
1726 wsi->mqtt->unacked_publish = 1;
1727
1728 /* this was the last part of the publish message */
1729
1730 if (pub->qos == QOS0) {
1731 /*
1732 * There won't be any real PUBACK, act like we got one
1733 * so the user callback logic is the same for QoS0 or
1734 * QoS1
1735 */
1736 if (wsi->protocol->callback(wsi, LWS_CALLBACK_MQTT_ACK,
1737 wsi->user_space, NULL, 0)) {
1738 lwsl_err("%s: ACK callback exited\n", __func__);
1739 return 1;
1740 }
1741
1742 return 0;
1743 }
1744
1745 /* For QoS1, if no PUBACK coming after 3s, we must RETRY the publish */
1746
1747 wsi->mqtt->sul_qos1_puback_wait.cb = lws_mqtt_publish_resend;
1748 __lws_sul_insert(&pt->pt_sul_owner, &wsi->mqtt->sul_qos1_puback_wait,
1749 3 * LWS_USEC_PER_SEC);
1750
1751 return 0;
1752 }
1753
1754 int
lws_mqtt_client_send_subcribe(struct lws * wsi,lws_mqtt_subscribe_param_t * sub)1755 lws_mqtt_client_send_subcribe(struct lws *wsi, lws_mqtt_subscribe_param_t *sub)
1756 {
1757 struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
1758 uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
1759 struct lws *nwsi = lws_get_network_wsi(wsi);
1760 lws_mqtt_str_t mqtt_vh_payload;
1761 uint8_t exists[8], extant;
1762 lws_mqtt_subs_t *mysub;
1763 uint32_t rem_len;
1764 #if defined(_DEBUG)
1765 uint32_t tops;
1766 #endif
1767 uint32_t n;
1768
1769 assert(sub->num_topics);
1770 assert(sub->num_topics < sizeof(exists));
1771
1772 switch (lwsi_state(wsi)) {
1773 case LRS_ESTABLISHED: /* Protocol connection established */
1774 if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_SUBSCRIBE,
1775 0, 0, 0)) {
1776 lwsl_err("%s: Failed to fill fixed header\n", __func__);
1777 return 1;
1778 }
1779
1780 /*
1781 * The stream wants to subscribe to one or more topic, but
1782 * the shared nwsi may already be subscribed to some or all of
1783 * them from interactions with other streams. For those cases,
1784 * we filter them from the list the child wants until we just
1785 * have ones that are new to the nwsi. If nothing left, we just
1786 * synthesize the callback to the child as if SUBACK had come
1787 * and we're done, otherwise just ask the server for topics that
1788 * are new to the wsi.
1789 */
1790
1791 extant = 0;
1792 memset(&exists, 0, sizeof(exists));
1793 for (n = 0; n < sub->num_topics; n++) {
1794 lwsl_info("%s: Subscribing to topic[%d] = \"%s\"\n",
1795 __func__, (int)n, sub->topic[n].name);
1796
1797 mysub = lws_mqtt_find_sub(nwsi->mqtt, sub->topic[n].name);
1798 if (mysub && mysub->ref_count) {
1799 mysub->ref_count++; /* another stream using it */
1800 exists[n] = 1;
1801 extant++;
1802 }
1803
1804 /*
1805 * Attach the topic we're subscribing to, to wsi->mqtt
1806 */
1807 if (!lws_mqtt_create_sub(wsi->mqtt, sub->topic[n].name)) {
1808 lwsl_err("%s: create sub fail\n", __func__);
1809 return 1;
1810 }
1811 }
1812
1813 if (extant == sub->num_topics) {
1814 /*
1815 * It turns out there's nothing to do here, the nwsi has
1816 * already subscribed to all the topics this stream
1817 * wanted. Just tell it it can have them.
1818 */
1819 lwsl_notice("%s: all topics already subscribed\n", __func__);
1820 if (user_callback_handle_rxflow(
1821 wsi->protocol->callback,
1822 wsi, LWS_CALLBACK_MQTT_SUBSCRIBED,
1823 wsi->user_space, NULL, 0) < 0) {
1824 lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
1825 __func__);
1826 return -1;
1827 }
1828
1829 return 0;
1830 }
1831
1832 #if defined(_DEBUG)
1833 /*
1834 * zero or more of the topics already existed, but not all,
1835 * so we must go to the server with a filtered list of the
1836 * new ones only
1837 */
1838
1839 tops = sub->num_topics - extant;
1840 #endif
1841
1842 /*
1843 * Pid + (Topic len field + Topic len + Req. QoS) x Num of Topics
1844 */
1845 rem_len = 2;
1846 for (n = 0; n < sub->num_topics; n++)
1847 if (!exists[n])
1848 rem_len += (2 + (uint32_t)strlen(sub->topic[n].name) + (uint32_t)1);
1849
1850 wsi->mqtt->sub_size = rem_len;
1851
1852 #if defined(_DEBUG)
1853 lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
1854 __func__, (int)tops, (int)rem_len);
1855 #endif
1856
1857 p += lws_mqtt_vbi_encode(rem_len, p);
1858
1859 if ((rem_len + lws_ptr_diff(p, start)) >=
1860 wsi->context->pt_serv_buf_size) {
1861 lwsl_err("%s: Payload is too big\n", __func__);
1862 return 1;
1863 }
1864
1865 /* Init lws_mqtt_str */
1866 lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, rem_len, 0);
1867 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
1868
1869 /* Packet ID */
1870 wsi->mqtt->ack_pkt_id = ++nwsi->mqtt->pkt_id;
1871 lwsl_debug("%s: pkt_id = %d\n", __func__,
1872 (int)wsi->mqtt->ack_pkt_id);
1873 lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
1874
1875 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
1876 return 1;
1877
1878 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
1879
1880 for (n = 0; n < sub->num_topics; n++) {
1881 lwsl_info("%s: topics[%d] = %s\n", __func__,
1882 (int)n, sub->topic[n].name);
1883
1884 /* if the nwsi already has it, don't ask server for it */
1885 if (exists[n]) {
1886 lwsl_info("%s: topics[%d] \"%s\" exists in nwsi\n",
1887 __func__, (int)n, sub->topic[n].name);
1888 continue;
1889 }
1890
1891 /*
1892 * Attach the topic we're subscribing to, to nwsi->mqtt
1893 * so we know the nwsi itself has a subscription to it
1894 */
1895
1896 if (!lws_mqtt_create_sub(nwsi->mqtt, sub->topic[n].name))
1897 return 1;
1898
1899 /* Topic's Len */
1900 lws_ser_wu16be(p, (uint16_t)strlen(sub->topic[n].name));
1901 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
1902 return 1;
1903 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
1904
1905 /* Topic Name */
1906 lws_strncpy((char *)p, sub->topic[n].name,
1907 strlen(sub->topic[n].name) + 1);
1908 if (lws_mqtt_str_advance(&mqtt_vh_payload,
1909 (int)strlen(sub->topic[n].name)))
1910 return 1;
1911 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
1912
1913 /* QoS */
1914 *p = sub->topic[n].qos;
1915 if (lws_mqtt_str_advance(&mqtt_vh_payload, 1))
1916 return 1;
1917 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
1918 }
1919 break;
1920
1921 default:
1922 return 1;
1923 }
1924
1925 if (lws_write(nwsi, start, lws_ptr_diff(p, start), LWS_WRITE_BINARY) !=
1926 lws_ptr_diff(p, start))
1927 return 1;
1928
1929 wsi->mqtt->inside_subscribe = 1;
1930
1931 return 0;
1932 }
1933
1934 int
lws_mqtt_client_send_unsubcribe(struct lws * wsi,const lws_mqtt_subscribe_param_t * unsub)1935 lws_mqtt_client_send_unsubcribe(struct lws *wsi,
1936 const lws_mqtt_subscribe_param_t *unsub)
1937 {
1938 struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
1939 uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
1940 struct lws *nwsi = lws_get_network_wsi(wsi);
1941 lws_mqtt_str_t mqtt_vh_payload;
1942 uint8_t send_unsub[8], orphaned;
1943 uint32_t rem_len, n;
1944 lws_mqtt_subs_t *mysub;
1945 #if defined(_DEBUG)
1946 uint32_t tops;
1947 #endif
1948
1949 lwsl_info("%s: Enter\n", __func__);
1950
1951 switch (lwsi_state(wsi)) {
1952 case LRS_ESTABLISHED: /* Protocol connection established */
1953 orphaned = 0;
1954 memset(&send_unsub, 0, sizeof(send_unsub));
1955 for (n = 0; n < unsub->num_topics; n++) {
1956 mysub = lws_mqtt_find_sub(nwsi->mqtt,
1957 unsub->topic[n].name);
1958 assert(mysub);
1959
1960 if (--mysub->ref_count == 0) {
1961 lwsl_notice("%s: Need to send UNSUB\n", __func__);
1962 send_unsub[n] = 1;
1963 orphaned++;
1964 }
1965 }
1966
1967 if (!orphaned) {
1968 /*
1969 * The nwsi still has other subscribers bound to the
1970 * topics.
1971 *
1972 * So, don't send UNSUB to server, and just fake the
1973 * UNSUB ACK event for the guy going away.
1974 */
1975 lwsl_notice("%s: unsubscribed!\n", __func__);
1976 if (user_callback_handle_rxflow(
1977 wsi->protocol->callback,
1978 wsi, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
1979 wsi->user_space, NULL, 0) < 0) {
1980 /*
1981 * We can't directly close here, because the
1982 * caller still has the wsi. Inform the
1983 * caller that we want to close
1984 */
1985
1986 return 1;
1987 }
1988
1989 return 0;
1990 }
1991 #if defined(_DEBUG)
1992 /*
1993 * one or more of the topics needs to be unsubscribed
1994 * from, so we must go to the server with a filtered
1995 * list of the new ones only
1996 */
1997
1998 tops = orphaned;
1999 #endif
2000
2001 if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_UNSUBSCRIBE,
2002 0, 0, 0)) {
2003 lwsl_err("%s: Failed to fill fixed header\n", __func__);
2004 return 1;
2005 }
2006
2007 /*
2008 * Pid + (Topic len field + Topic len) x Num of Topics
2009 */
2010 rem_len = 2;
2011 for (n = 0; n < unsub->num_topics; n++)
2012 if (send_unsub[n])
2013 rem_len += (2 + (uint32_t)strlen(unsub->topic[n].name));
2014
2015 wsi->mqtt->sub_size = rem_len;
2016
2017 lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
2018 __func__, (int)tops, (int)rem_len);
2019
2020 p += lws_mqtt_vbi_encode(rem_len, p);
2021
2022 if ((rem_len + lws_ptr_diff(p, start)) >=
2023 wsi->context->pt_serv_buf_size) {
2024 lwsl_err("%s: Payload is too big\n", __func__);
2025 return 1;
2026 }
2027
2028 /* Init lws_mqtt_str */
2029 lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, rem_len, 0);
2030 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2031
2032 /* Packet ID */
2033 wsi->mqtt->ack_pkt_id = ++nwsi->mqtt->pkt_id;
2034 lwsl_debug("%s: pkt_id = %d\n", __func__,
2035 (int)wsi->mqtt->ack_pkt_id);
2036 lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
2037
2038 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2039 return 1;
2040
2041 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2042
2043 for (n = 0; n < unsub->num_topics; n++) {
2044 lwsl_info("%s: topics[%d] = %s\n", __func__,
2045 (int)n, unsub->topic[n].name);
2046
2047 /*
2048 * Subscriber still bound to it, don't UBSUB
2049 * from the server
2050 */
2051 if (!send_unsub[n])
2052 continue;
2053
2054 /* Topic's Len */
2055 lws_ser_wu16be(p, (uint16_t)strlen(unsub->topic[n].name));
2056 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2057 return 1;
2058 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2059
2060 /* Topic Name */
2061 lws_strncpy((char *)p, unsub->topic[n].name,
2062 strlen(unsub->topic[n].name) + 1);
2063 if (lws_mqtt_str_advance(&mqtt_vh_payload,
2064 (int)strlen(unsub->topic[n].name)))
2065 return 1;
2066 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2067 }
2068 break;
2069
2070 default:
2071 return 1;
2072 }
2073
2074 if (lws_write(nwsi, start, lws_ptr_diff(p, start), LWS_WRITE_BINARY) !=
2075 lws_ptr_diff(p, start))
2076 return 1;
2077
2078 wsi->mqtt->inside_unsubscribe = 1;
2079
2080 return 0;
2081 }
2082
2083 /*
2084 * This is called when child streams bind to an already-existing and compatible
2085 * MQTT stream
2086 */
2087
2088 struct lws *
lws_wsi_mqtt_adopt(struct lws * parent_wsi,struct lws * wsi)2089 lws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi)
2090 {
2091 /* no more children allowed by parent? */
2092
2093 if (parent_wsi->mux.child_count + 1 > LWS_MQTT_MAX_CHILDREN) {
2094 lwsl_err("%s: reached concurrent stream limit\n", __func__);
2095 return NULL;
2096 }
2097
2098 #if defined(LWS_WITH_CLIENT)
2099 wsi->client_mux_substream = 1;
2100 #endif
2101
2102 lws_wsi_mux_insert(wsi, parent_wsi, wsi->mux.my_sid);
2103
2104 if (lws_ensure_user_space(wsi))
2105 goto bail1;
2106
2107 lws_mqtt_set_client_established(wsi);
2108 lws_callback_on_writable(wsi);
2109
2110 #if defined(LWS_WITH_SERVER_STATUS)
2111 wsi->vhost->conn_stats.mqtt_subs++;
2112 #endif
2113
2114 return wsi;
2115
2116 bail1:
2117 /* undo the insert */
2118 parent_wsi->mux.child_list = wsi->mux.sibling_list;
2119 parent_wsi->mux.child_count--;
2120
2121 if (wsi->user_space)
2122 lws_free_set_NULL(wsi->user_space);
2123
2124 wsi->protocol->callback(wsi, LWS_CALLBACK_WSI_DESTROY, NULL, NULL, 0);
2125 lws_free(wsi);
2126
2127 return NULL;
2128 }
2129
2130