1 /*
2 * libwebsockets - small server side websockets and web server implementation
3 *
4 * Copyright (C) 2010 - 2021 Andy Green <andy@warmcat.com>
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 * IN THE SOFTWARE.
23 */
24
25 #ifndef _PRIVATE_LIB_ROLES_MQTT
26 #define _PRIVATE_LIB_ROLES_MQTT 1
27
28 extern struct lws_role_ops role_ops_mqtt;
29
/*
 * Nonzero when wsi's role_ops pointer is &role_ops_mqtt, ie, the wsi is
 * bound to the MQTT role.
 *
 * The argument is parenthesized so that any pointer-valued expression
 * (eg, "base + 1") expands correctly.
 */
#define lwsi_role_mqtt(wsi) ((wsi)->role_ops == &role_ops_mqtt)
31
/* Upper bound on child streams sharing the same parent */
#define LWS_MQTT_MAX_CHILDREN		8

/*
 * LMQCP_LUT_FLAG_*: attribute bits.  The packet-id related values occupy
 * the two bits selected by LMQCP_LUT_FLAG_PACKET_ID_MASK.
 */
#define LMQCP_LUT_FLAG_RESERVED_FLAGS	0x10
#define LMQCP_LUT_FLAG_PACKET_ID_NONE	0x00
#define LMQCP_LUT_FLAG_PACKET_ID_HAS	0x20
#define LMQCP_LUT_FLAG_PACKET_ID_QOS12	0x40
#define LMQCP_LUT_FLAG_PACKET_ID_MASK	(LMQCP_LUT_FLAG_PACKET_ID_HAS | \
					 LMQCP_LUT_FLAG_PACKET_ID_QOS12)
/* payload required (optional for publish) */
#define LMQCP_LUT_FLAG_PAYLOAD		0x80
40
/*
 * Nonzero when s is a non-NULL pointer whose len is nonzero, whose buf is
 * non-NULL and whose first buffer byte is not zero.
 *
 * s is expanded several times: pass an expression without side effects.
 */
#define lws_mqtt_str_is_not_empty(s) \
	((s) && (s)->len && (s)->buf && *(s)->buf)
45
/*
 * Timing constants, both expressed as multiples of LWS_US_PER_SEC:
 * 3 for the response timeout and 60 for the retry ceiling.
 */
#define LWS_MQTT_RESPONSE_TIMEOUT	(3 * LWS_US_PER_SEC)
#define LWS_MQTT_RETRY_CEILING		(60 * LWS_US_PER_SEC)
48
/*
 * Result of feeding input to one of the stateful primitive parsers:
 * negative values are failures, zero and positive values report progress.
 */
typedef enum {
	/* failures */
	LMSPR_FAILED_ALREADY_COMPLETED	= -4,
	LMSPR_FAILED_FORMAT		= -3,
	LMSPR_FAILED_OVERSIZE		= -2,
	LMSPR_FAILED_OOM		= -1,

	/* progress */
	LMSPR_COMPLETED			= 0, /* primitive is done */
	LMSPR_NEED_MORE			= 1, /* call again with more bytes */
} lws_mqtt_stateful_primitive_return_t;
58
/*
 * Decode state shared by the variable byte integer (VBI) primitive and the
 * 2-byte and 4-byte fixed-length integer primitives.
 */
typedef struct {
	uint32_t value;	/* decoded result, valid once decoding completed */
	char budget;	/* NOTE(review): presumably the number of bytes the
			 * primitive may occupy -- confirm in the parser */
	char consumed;	/* zero until the first input was taken, see
			 * lws_mqtt_mb_first() */
} lws_mqtt_vbi;

/*
 * Works for vbi, 2-byte and 4-byte fixed length.
 *
 * Returns 1 while vbi->consumed is still zero, otherwise 0.
 */
static inline int
lws_mqtt_mb_first(lws_mqtt_vbi *vbi)
{
	return !vbi->consumed;
}

/*
 * Encode value as a VBI into buf.
 *
 * NOTE(review): the return value is presumably the encoded length or an
 * error indication -- confirm against the implementation.
 */
int
lws_mqtt_vbi_encode(uint32_t value, void *buf);
71
72 /*
73 * Decode is done statefully on an arbitrary amount of input data (which may
74 * be one byte). It's like this so it can continue seamlessly if a buffer ends
75 * partway through the primitive, and the api matches the bulk binary data case.
76 *
77 * VBI decode:
78 *
79 * Initialize the lws_mqtt_vbi state by calling lws_mqtt_vbi_init() on it, then
80 * feed lws_mqtt_vbi_r() bytes to decode.
81 *
82 * Returns <0 for error, LMSPR_COMPLETED if done (vbi->value is valid), or
83 * LMSPR_NEED_MORE if more calls to lws_mqtt_vbi_r() with subsequent bytes
84 * needed.
85 *
86 * *in and *len are updated accordingly.
87 *
88 * 2-byte and 4-byte decode:
89 *
90 * Initialize the lws_mqtt_vbi state by calling lws_mqtt_2byte_init() or
91 * lws_mqtt_4byte_init() on it, then feed lws_mqtt_mb_parse() bytes
92 * to decode.
93 *
94 * Returns <0 for error, LMSPR_COMPLETED if done (vbi->value is valid), or
95 * LMSPR_NEED_MORE if more calls to lws_mqtt_mb_parse() with subsequent
96 * bytes needed.
97 *
98 * *in and *len are updated accordingly.
99 */
100
101 void
102 lws_mqtt_vbi_init(lws_mqtt_vbi *vbi);
103
104 void
105 lws_mqtt_2byte_init(lws_mqtt_vbi *vbi);
106
107 void
108 lws_mqtt_4byte_init(lws_mqtt_vbi *vbi);
109
110 lws_mqtt_stateful_primitive_return_t
111 lws_mqtt_vbi_r(lws_mqtt_vbi *vbi, const uint8_t **in, size_t *len);
112
113 lws_mqtt_stateful_primitive_return_t
114 lws_mqtt_mb_parse(lws_mqtt_vbi *vbi, const uint8_t **in, size_t *len);
115
/*
 * String / binary data object that also carries its own parsing state.
 */
struct lws_mqtt_str_st {
	uint8_t		*buf;
	uint16_t	len;

	uint16_t	limit;	/* it's cheaper to add the state here than
				 * the pointer to point to it elsewhere */
	uint16_t	pos;
	char		len_valid;
	char		needs_freeing;
};

/*
 * Returns 1 when s has neither a buffer nor a nonzero position, ie, both
 * s->buf and s->pos are zero; otherwise returns 0.
 */
static inline int
lws_mqtt_str_first(struct lws_mqtt_str_st *s)
{
	return !s->buf && !s->pos;
}
129
130
131 lws_mqtt_stateful_primitive_return_t
132 lws_mqtt_str_parse(struct lws_mqtt_str_st *bd, const uint8_t **in, size_t *len);
133
134 typedef enum {
135 LMQCPP_IDLE,
136
137 /* receive packet type part of fixed header took us out of idle... */
138 LMQCPP_CONNECT_PACKET = LMQCP_CTOS_CONNECT << 4,
139 LMQCPP_CONNECT_REMAINING_LEN_VBI,
140 LMQCPP_CONNECT_VH_PNAME,
141 LMQCPP_CONNECT_VH_PVERSION,
142 LMQCPP_CONNECT_VH_FLAGS,
143 LMQCPP_CONNECT_VH_KEEPALIVE,
144 LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN,
145
146 LMQCPP_CONNACK_PACKET = LMQCP_STOC_CONNACK << 4,
147 LMQCPP_CONNACK_VH_FLAGS,
148 LMQCPP_CONNACK_VH_RETURN_CODE,
149
150 LMQCPP_PUBLISH_PACKET = LMQCP_PUBLISH << 4,
151 LMQCPP_PUBLISH_REMAINING_LEN_VBI,
152 LMQCPP_PUBLISH_VH_TOPIC,
153 LMQCPP_PUBLISH_VH_PKT_ID,
154
155 LMQCPP_PUBACK_PACKET = LMQCP_PUBACK << 4,
156 LMQCPP_PUBACK_VH_PKT_ID,
157 LMQCPP_PUBACK_PROPERTIES_LEN_VBI,
158
159 LMQCPP_PUBREC_PACKET = LMQCP_PUBREC << 4,
160 LMQCPP_PUBREC_VH_PKT_ID,
161
162 LMQCPP_PUBREL_PACKET = LMQCP_PUBREL << 4,
163 LMQCPP_PUBREL_VH_PKT_ID,
164
165 LMQCPP_PUBCOMP_PACKET = LMQCP_PUBCOMP << 4,
166 LMQCPP_PUBCOMP_VH_PKT_ID,
167
168 LMQCPP_SUBACK_PACKET = LMQCP_STOC_SUBACK << 4,
169 LMQCPP_SUBACK_VH_PKT_ID,
170 LMQCPP_SUBACK_PAYLOAD,
171
172 LMQCPP_UNSUBACK_PACKET = LMQCP_STOC_UNSUBACK << 4,
173 LMQCPP_UNSUBACK_VH_PKT_ID,
174
175 LMQCPP_PINGRESP_ZERO = LMQCP_STOC_PINGRESP << 4,
176
177 LMQCPP_PAYLOAD,
178
179 LMQCPP_EAT_PROPERTIES_AND_COMPLETE,
180
181 LMQCPP_PROP_ID_VBI,
182
183 /* all possible property payloads */
184
185 /* 3.3.2.3.2 */
186 LMQCPP_PROP_PAYLOAD_FORMAT_INDICATOR_1BYTE = 0x101,
187
188 LMQCPP_PROP_MSG_EXPIRY_INTERVAL_4BYTE = 0x102,
189
190 LMQCPP_PROP_CONTENT_TYPE_UTF8S = 0x103,
191
192 LMQCPP_PROP_RESPONSE_TOPIC_UTF8S = 0x108,
193
194 LMQCPP_PROP_CORRELATION_BINDATA = 0x109,
195
196 LMQCPP_PROP_SUBSCRIPTION_ID_VBI = 0x10b,
197
198 LMQCPP_PROP_SESSION_EXPIRY_INTERVAL_4BYTE = 0x111,
199
200 LMQCPP_PROP_ASSIGNED_CLIENTID_UTF8S = 0x112,
201
202 LMQCPP_PROP_SERVER_KEEPALIVE_2BYTE = 0x113,
203
204 LMQCPP_PROP_AUTH_METHOD_UTF8S = 0x115,
205
206 LMQCPP_PROP_AUTH_DATA_BINDATA = 0x116,
207
208 LMQCPP_PROP_REQUEST_PROBLEM_INFO_1BYTE = 0x117,
209
210 LMQCPP_PROP_WILL_DELAY_INTERVAL_4BYTE = 0x118,
211
212 LMQCPP_PROP_REQUEST_REPSONSE_INFO_1BYTE = 0x119,
213
214 LMQCPP_PROP_RESPONSE_INFO_UTF8S = 0x11a,
215
216 LMQCPP_PROP_SERVER_REFERENCE_UTF8S = 0x11c,
217
218 LMQCPP_PROP_REASON_STRING_UTF8S = 0x11f,
219
220 LMQCPP_PROP_RECEIVE_MAXIMUM_2BYTE = 0x121,
221
222 LMQCPP_PROP_TOPIC_MAXIMUM_2BYTE = 0x122,
223
224 LMQCPP_PROP_TOPIC_ALIAS_2BYTE = 0x123,
225
226 LMQCPP_PROP_MAXIMUM_QOS_1BYTE = 0x124,
227
228 LMQCPP_PROP_RETAIN_AVAILABLE_1BYTE = 0x125,
229
230 LMQCPP_PROP_USER_PROPERTY_NAME_UTF8S = 0x126,
231 LMQCPP_PROP_USER_PROPERTY_VALUE_UTF8S = 0x226,
232
233 LMQCPP_PROP_MAXIMUM_PACKET_SIZE_4BYTE = 0x127,
234
235 LMQCPP_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE_1BYTE = 0x128,
236
237 LMQCPP_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE_1BYTE = 0x129,
238
239 LMQCPP_PROP_SHARED_SUBSCRIPTION_AVAILABLE_1BYTE = 0x12a,
240
241 } lws_mqtt_packet_parse_state_t;
242
/*
 * the states an MQTT connection can be in
 */

typedef enum {
	LGSMQTT_UNKNOWN			= 0,
	LGSMQTT_IDLE			= 1,
	LGSMQTT_TRANSPORT_CONNECTED	= 2,

	LGSMQTT_SENT_CONNECT		= 3,
	LGSMQTT_ESTABLISHED		= 4,

	LGSMQTT_SENT_SUBSCRIBE		= 5,
	LGSMQTT_SUBSCRIBED		= 6,

} lwsgs_mqtt_states_t;
259
260 typedef struct lws_mqtt_parser_st {
261 /* struct lws_mqtt_str_st s_content_type; */
262 lws_mqtt_packet_parse_state_t state;
263 lws_mqtt_vbi vbit;
264
265 lws_mqtt_reason_t reason;
266
267 lws_mqtt_str_t s_temp;
268
269 uint8_t fixed_seen[4];
270 uint8_t props_seen[8];
271
272 uint8_t cpkt_flags;
273 uint32_t cpkt_remlen;
274
275 uint32_t props_len;
276 uint32_t consumed;
277 uint32_t prop_id;
278 uint32_t props_consumed;
279 uint32_t payload_consumed;
280
281 uint16_t keepalive;
282 uint16_t cpkt_id;
283 uint32_t n;
284
285 uint8_t temp[32];
286 uint8_t conn_rc;
287 uint8_t payload_format;
288 uint8_t packet_type_flags;
289 uint8_t conn_protocol_version;
290 uint8_t fixed;
291
292 uint8_t flag_pending_send_connack_close:1;
293 uint8_t flag_pending_send_reason_close:1;
294 uint8_t flag_prop_multi:1;
295 uint8_t flag_server:1;
296
297 } lws_mqtt_parser_t;
298
/*
 * Topic validation result: negative values are failures, zero and positive
 * values are the accepted cases.
 */
typedef enum {
	/* rejected */
	LMVTR_FAILED_SHADOW_FORMAT	= -3,
	LMVTR_FAILED_WILDCARD_FORMAT	= -2,
	LMVTR_FAILED_OVERSIZE		= -1,

	/* accepted */
	LMVTR_VALID			= 0,
	LMVTR_VALID_WILDCARD		= 1,
	LMVTR_VALID_SHADOW		= 2,
} lws_mqtt_validate_topic_return_t;
308
/* Topic match result: error is negative, no-match is zero, match is one */
typedef enum {
	LMMTR_TOPIC_MATCH_ERROR		= -1,

	LMMTR_TOPIC_NOMATCH		= 0,
	LMMTR_TOPIC_MATCH		= 1
} lws_mqtt_match_topic_return_t;
315
/*
 * One subscription, chained to the following one through "next".  The topic
 * string is stored in the flexible array at the end of the allocation.
 */
typedef struct lws_mqtt_subs {
	struct lws_mqtt_subs	*next;

	uint8_t			ref_count;	/* number of children referencing */

	/* Flags */
	uint8_t			wildcard:1;
	uint8_t			shadow:1;

	/* subscription name + NUL overallocated here */
	char			topic[];
} lws_mqtt_subs_t;
328
329 typedef struct lws_mqtts {
330 lws_mqtt_parser_t par;
331 lwsgs_mqtt_states_t estate;
332 struct lws_dll2 active_session_list_head;
333 struct lws_dll2 limbo_session_list_head;
334 } lws_mqtts_t;
335
336 typedef struct lws_mqttc {
337 lws_mqtt_parser_t par;
338 lwsgs_mqtt_states_t estate;
339 struct lws_mqtt_str_st *id;
340 struct lws_mqtt_str_st *username;
341 struct lws_mqtt_str_st *password;
342 struct {
343 struct lws_mqtt_str_st *topic;
344 struct lws_mqtt_str_st *message;
345 lws_mqtt_qos_levels_t qos;
346 uint8_t retain;
347 } will;
348 uint16_t keep_alive_secs;
349 uint16_t conn_flags;
350 uint8_t aws_iot;
351 } lws_mqttc_t;
352
353 struct _lws_mqtt_related {
354 lws_mqttc_t client;
355 lws_sorted_usec_list_t sul_qos_puback_pubrec_wait; /* QoS1 puback or QoS2 pubrec wait TO */
356 lws_sorted_usec_list_t sul_qos1_puback_wait; /* QoS1 puback wait TO */
357 lws_sorted_usec_list_t sul_unsuback_wait; /* QoS1 unsuback wait TO */
358 lws_sorted_usec_list_t sul_qos2_pubrec_wait; /* QoS2 pubrec wait TO */
359 struct lws *wsi; /**< so sul can use lws_container_of */
360 lws_mqtt_subs_t *subs_head; /**< Linked-list of heap-allocated subscription objects */
361 void *rx_cpkt_param;
362 uint16_t pkt_id;
363 uint16_t ack_pkt_id;
364 uint16_t peer_ack_pkt_id;
365 uint16_t sub_size;
366
367 #if defined(LWS_WITH_CLIENT)
368 uint8_t send_pingreq:1;
369 uint8_t session_resumed:1;
370 #endif
371 uint8_t inside_payload:1;
372 uint8_t inside_subscribe:1;
373 uint8_t inside_unsubscribe:1;
374 uint8_t inside_birth:1;
375 uint8_t inside_resume_session:1;
376 uint8_t send_puback:1;
377 uint8_t send_pubrel:1;
378 uint8_t send_pubrec:1;
379 uint8_t send_pubcomp:1;
380 uint8_t unacked_publish:1;
381 uint8_t unacked_pubrel:1;
382
383 uint8_t done_subscribe:1;
384 uint8_t done_birth:1;
385 };
386
387 /*
388 * New sessions are created by starting CONNECT. If the ClientID sent
389 * by the client matches a different, extant session, then the
390 * existing one is taken over and the new one created for duration of
391 * CONNECT processing is destroyed.
392 *
393 * On the server side, bearing in mind multiple simultaneous,
394 * fragmented CONNECTs may be interleaved ongoing, all state and
395 * parsing temps for a session must live in the session object.
396 */
397
398 struct lws_mqtt_endpoint_st;
399
400 typedef struct lws_mqtts_session_st {
401 struct lws_dll2 session_list;
402
403 } lws_mqtts_session_t;
404
/*
 * Yields the top four bits of x->packet_type_flags, shifted down into the
 * low nibble.
 *
 * The argument is parenthesized so that expressions such as "&par" expand
 * correctly.
 */
#define ctl_pkt_type(x) ((x)->packet_type_flags >> 4)
406
407
408 void
409 lws_mqttc_state_transition(lws_mqttc_t *ep, lwsgs_mqtt_states_t s);
410
411 int
412 _lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par,
413 const uint8_t *buf, size_t len);
414
415 int
416 lws_mqtt_client_socket_service(struct lws *wsi, struct lws_pollfd *pollfd,
417 struct lws *wsi_conn);
418
419 int
420 lws_create_client_mqtt_object(const struct lws_client_connect_info *i,
421 struct lws *wsi);
422
423 struct lws *
424 lws_mqtt_client_send_connect(struct lws *wsi);
425
426 struct lws *
427 lws_mqtt_client_send_disconnect(struct lws *wsi);
428
429 int
430 lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type,
431 uint8_t dup, lws_mqtt_qos_levels_t qos,
432 uint8_t retain);
433
434 struct lws *
435 lws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi);
436
437 lws_mqtt_subs_t *
438 lws_mqtt_find_sub(struct _lws_mqtt_related *mqtt, const char *topic);
439
440 #endif /* _PRIVATE_LIB_ROLES_MQTT */
441
442