1 /*
2 * libwebsockets - small server side websockets and web server implementation
3 *
4 * Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 * IN THE SOFTWARE.
23 */
24
25 #include "private-lib-core.h"
26
27 static int
rops_handle_POLLIN_mqtt(struct lws_context_per_thread * pt,struct lws * wsi,struct lws_pollfd * pollfd)28 rops_handle_POLLIN_mqtt(struct lws_context_per_thread *pt, struct lws *wsi,
29 struct lws_pollfd *pollfd)
30 {
31 unsigned int pending = 0;
32 struct lws_tokens ebuf;
33 int n = 0;
34 char buffered = 0;
35
36 lwsl_debug("%s: wsistate 0x%x, %s pollout %d\n", __func__,
37 (unsigned int)wsi->wsistate, wsi->protocol->name,
38 pollfd->revents);
39
40 /*
41 * After the CONNACK and nwsi establishment, the first logical
42 * stream is migrated out of the nwsi to be child sid 1, and the
43 * nwsi no longer has a wsi->mqtt of its own.
44 *
45 * RX events on the nwsi must be converted to events seen or not
46 * seen by one or more child streams.
47 *
48 * SUBACK - reflected to child stream that asked for it
49 * PUBACK - routed to child that did the related publish
50 */
51
52 ebuf.token = NULL;
53 ebuf.len = 0;
54
55 if (lwsi_state(wsi) != LRS_ESTABLISHED) {
56 #if defined(LWS_WITH_CLIENT)
57
58 if (lwsi_state(wsi) == LRS_WAITING_SSL &&
59 ((pollfd->revents & LWS_POLLOUT)) &&
60 lws_change_pollfd(wsi, LWS_POLLOUT, 0)) {
61 lwsl_info("failed at set pollfd\n");
62 return LWS_HPI_RET_PLEASE_CLOSE_ME;
63 }
64
65 if ((pollfd->revents & LWS_POLLOUT) &&
66 lws_handle_POLLOUT_event(wsi, pollfd)) {
67 lwsl_debug("POLLOUT event closed it\n");
68 return LWS_HPI_RET_PLEASE_CLOSE_ME;
69 }
70
71 n = lws_mqtt_client_socket_service(wsi, pollfd, NULL);
72 if (n)
73 return LWS_HPI_RET_WSI_ALREADY_DIED;
74 #endif
75 return LWS_HPI_RET_HANDLED;
76 }
77
78 /* 1: something requested a callback when it was OK to write */
79
80 if ((pollfd->revents & LWS_POLLOUT) &&
81 lwsi_state_can_handle_POLLOUT(wsi) &&
82 lws_handle_POLLOUT_event(wsi, pollfd)) {
83 if (lwsi_state(wsi) == LRS_RETURNED_CLOSE)
84 lwsi_set_state(wsi, LRS_FLUSHING_BEFORE_CLOSE);
85
86 return LWS_HPI_RET_PLEASE_CLOSE_ME;
87 }
88
89 /* 3: buflist needs to be drained
90 */
91 read:
92 // lws_buflist_describe(&wsi->buflist, wsi, __func__);
93 ebuf.len = (int)lws_buflist_next_segment_len(&wsi->buflist, &ebuf.token);
94 if (ebuf.len) {
95 lwsl_info("draining buflist (len %d)\n", ebuf.len);
96 buffered = 1;
97 goto drain;
98 }
99
100 if (!(pollfd->revents & pollfd->events & LWS_POLLIN))
101 return LWS_HPI_RET_HANDLED;
102
103 /* if (lws_is_flowcontrolled(wsi)) { */
104 /* lwsl_info("%s: %p should be rxflow (bm 0x%x)..\n", */
105 /* __func__, wsi, wsi->rxflow_bitmap); */
106 /* return LWS_HPI_RET_HANDLED; */
107 /* } */
108
109 if (!(lwsi_role_client(wsi) && lwsi_state(wsi) != LRS_ESTABLISHED)) {
110 /*
111 * In case we are going to react to this rx by scheduling
112 * writes, we need to restrict the amount of rx to the size
113 * the protocol reported for rx buffer.
114 *
115 * Otherwise we get a situation we have to absorb possibly a
116 * lot of reads before we get a chance to drain them by writing
117 * them, eg, with echo type tests in autobahn.
118 */
119
120 buffered = 0;
121 ebuf.token = pt->serv_buf;
122 ebuf.len = wsi->context->pt_serv_buf_size;
123
124 if ((unsigned int)ebuf.len > wsi->context->pt_serv_buf_size)
125 ebuf.len = wsi->context->pt_serv_buf_size;
126
127 if ((int)pending > ebuf.len)
128 pending = ebuf.len;
129
130 ebuf.len = lws_ssl_capable_read(wsi, ebuf.token,
131 pending ? (int)pending :
132 ebuf.len);
133 switch (ebuf.len) {
134 case 0:
135 lwsl_info("%s: zero length read\n",
136 __func__);
137 return LWS_HPI_RET_PLEASE_CLOSE_ME;
138 case LWS_SSL_CAPABLE_MORE_SERVICE:
139 lwsl_info("SSL Capable more service\n");
140 return LWS_HPI_RET_HANDLED;
141 case LWS_SSL_CAPABLE_ERROR:
142 lwsl_info("%s: LWS_SSL_CAPABLE_ERROR\n",
143 __func__);
144 return LWS_HPI_RET_PLEASE_CLOSE_ME;
145 }
146
147 /*
148 * coverity thinks ssl_capable_read() may read over
149 * 2GB. Dissuade it...
150 */
151 ebuf.len &= 0x7fffffff;
152 }
153
154 drain:
155 /* service incoming data */
156 //lws_buflist_describe(&wsi->buflist, wsi, __func__);
157 if (ebuf.len) {
158 n = lws_read_mqtt(wsi, ebuf.token, ebuf.len);
159 if (n < 0) {
160 lwsl_notice("%s: lws_read_mqtt returned %d\n",
161 __func__, n);
162 /* we closed wsi */
163 n = 0;
164 goto fail;
165 }
166 // lws_buflist_describe(&wsi->buflist, wsi, __func__);
167 lwsl_debug("%s: consuming %d / %d\n", __func__, n, ebuf.len);
168 if (lws_buflist_aware_finished_consuming(wsi, &ebuf, ebuf.len,
169 buffered, __func__))
170 return LWS_HPI_RET_PLEASE_CLOSE_ME;
171 }
172
173 ebuf.token = NULL;
174 ebuf.len = 0;
175
176 pending = lws_ssl_pending(wsi);
177 if (pending) {
178 pending = pending > wsi->context->pt_serv_buf_size ?
179 wsi->context->pt_serv_buf_size : pending;
180 goto read;
181 }
182
183 if (buffered && /* were draining, now nothing left */
184 !lws_buflist_next_segment_len(&wsi->buflist, NULL)) {
185 lwsl_info("%s: %p flow buf: drained\n", __func__, wsi);
186 /* having drained the rxflow buffer, can rearm POLLIN */
187 #if !defined(LWS_WITH_SERVER)
188 n =
189 #endif
190 __lws_rx_flow_control(wsi);
191 /* n ignored, needed for NO_SERVER case */
192 }
193
194 /* n = 0 */
195 return LWS_HPI_RET_HANDLED;
196
197 fail:
198 lwsl_err("%s: Failed, bailing\n", __func__);
199 lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS, "mqtt svc fail");
200
201 return LWS_HPI_RET_WSI_ALREADY_DIED;
202 }
203
204 #if 0 /* defined(LWS_WITH_SERVER) */
205
206 static int
207 rops_adoption_bind_mqtt(struct lws *wsi, int type, const char *vh_prot_name)
208 {
209 /* no http but socket... must be mqtt */
210 if ((type & LWS_ADOPT_HTTP) || !(type & LWS_ADOPT_SOCKET) ||
211 (type & _LWS_ADOPT_FINISH))
212 return 0; /* no match */
213
214 lws_role_transition(wsi, 0, (type & LWS_ADOPT_ALLOW_SSL) ? LRS_SSL_INIT :
215 LRS_ESTABLISHED, &role_ops_mqtt);
216
217 if (vh_prot_name)
218 lws_bind_protocol(wsi, wsi->protocol, __func__);
219 else
220 /* this is the only time he will transition */
221 lws_bind_protocol(wsi,
222 &wsi->vhost->protocols[wsi->vhost->mqtt_protocol_index],
223 __func__);
224
225 return 1; /* bound */
226 }
227 #endif
228
229 static int
rops_client_bind_mqtt(struct lws * wsi,const struct lws_client_connect_info * i)230 rops_client_bind_mqtt(struct lws *wsi, const struct lws_client_connect_info *i)
231 {
232 lwsl_debug("%s: i = %p\n", __func__, i);
233 if (!i) {
234
235 /* finalize */
236
237 if (!wsi->user_space && wsi->stash->cis[CIS_METHOD])
238 if (lws_ensure_user_space(wsi))
239 return 1;
240
241 if (!wsi->stash->cis[CIS_METHOD] && !wsi->stash->cis[CIS_ALPN])
242 wsi->stash->cis[CIS_ALPN] = "x-amzn-mqtt-ca";
243
244 /* if we went on the ah waiting list, it's ok, we can
245 * wait.
246 *
247 * When we do get the ah, now or later, he will end up
248 * at lws_http_client_connect_via_info2().
249 */
250 #if defined(LWS_WITH_CLIENT)
251 if (lws_header_table_attach(wsi, 0) < 0)
252 /*
253 * if we failed here, the connection is already closed
254 * and freed.
255 */
256 return -1;
257 #else
258 if (lws_header_table_attach(wsi, 0))
259 return 0;
260 #endif
261 return 0;
262 }
263
264 /* if a recognized mqtt method, bind to it */
265 if (strcmp(i->method, "MQTT"))
266 return 0; /* no match */
267
268 if (lws_create_client_mqtt_object(i, wsi))
269 return 1;
270
271 lws_role_transition(wsi, LWSIFR_CLIENT, LRS_UNCONNECTED,
272 &role_ops_mqtt);
273 return 1; /* matched */
274 }
275
276 static int
rops_handle_POLLOUT_mqtt(struct lws * wsi)277 rops_handle_POLLOUT_mqtt(struct lws *wsi)
278 {
279 struct lws **wsi2;
280
281 lwsl_debug("%s\n", __func__);
282
283 #if defined(LWS_WITH_CLIENT)
284 if (wsi->mqtt && wsi->mqtt->send_pingreq && !wsi->mqtt->inside_payload) {
285 uint8_t buf[LWS_PRE + 2];
286
287 /*
288 * We are swallowing this POLLOUT in order to send a PINGREQ
289 * autonomously
290 */
291
292 wsi->mqtt->send_pingreq = 0;
293
294 lwsl_notice("%s: issuing PINGREQ\n", __func__);
295
296 buf[LWS_PRE] = LMQCP_CTOS_PINGREQ << 4;
297 buf[LWS_PRE + 1] = 0;
298
299 if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 2,
300 LWS_WRITE_BINARY) != 2)
301 return LWS_HP_RET_BAIL_DIE;
302
303 return LWS_HP_RET_BAIL_OK;
304 }
305 #endif
306
307 wsi = lws_get_network_wsi(wsi);
308
309 wsi->mux.requested_POLLOUT = 0;
310
311 wsi2 = &wsi->mux.child_list;
312 if (!*wsi2) {
313 lwsl_debug("%s: no children\n", __func__);
314 return LWS_HP_RET_DROP_POLLOUT;
315 }
316
317 lws_wsi_mux_dump_waiting_children(wsi);
318
319 do {
320 struct lws *w, **wa;
321
322 wa = &(*wsi2)->mux.sibling_list;
323 if (!(*wsi2)->mux.requested_POLLOUT)
324 goto next_child;
325
326 if (!lwsi_state_can_handle_POLLOUT(wsi))
327 goto next_child;
328
329 /*
330 * If the nwsi is in the middle of a frame, we can only
331 * continue to send that
332 */
333
334 if (wsi->mqtt->inside_payload && !(*wsi2)->mqtt->inside_payload)
335 goto next_child;
336
337 /*
338 * we're going to do writable callback for this child.
339 * move him to be the last child
340 */
341 w = lws_wsi_mux_move_child_to_tail(wsi2);
342 if (!w) {
343 wa = &wsi->mux.child_list;
344 goto next_child;
345 }
346
347 lwsl_debug("%s: child %p (wsistate 0x%x)\n", __func__, w,
348 (unsigned int)w->wsistate);
349
350 if (lwsi_state(wsi) == LRS_ESTABLISHED &&
351 !wsi->mqtt->inside_payload &&
352 wsi->mqtt->send_puback) {
353 uint8_t buf[LWS_PRE + 4];
354 lwsl_notice("%s: issuing PUBACK for pkt id: %d\n",
355 __func__, wsi->mqtt->ack_pkt_id);
356
357 /* Fixed header */
358 buf[LWS_PRE] = LMQCP_PUBACK << 4;
359 /* Remaining len = 2 */
360 buf[LWS_PRE + 1] = 2;
361 /* Packet ID */
362 lws_ser_wu16be(&buf[LWS_PRE + 2], wsi->mqtt->ack_pkt_id);
363
364 if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 4,
365 LWS_WRITE_BINARY) != 4)
366 return LWS_HP_RET_BAIL_DIE;
367
368 wsi->mqtt->send_puback = 0;
369 w->mux.requested_POLLOUT = 1;
370
371 wa = &wsi->mux.child_list;
372 goto next_child;
373 }
374
375 if (lws_callback_as_writeable(w)) {
376 lwsl_notice("%s: Closing child %p\n", __func__, w);
377 lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS,
378 "mqtt pollout handle");
379 wa = &wsi->mux.child_list;
380 }
381
382 next_child:
383 wsi2 = wa;
384 } while (wsi2 && *wsi2 && !lws_send_pipe_choked(wsi));
385
386 // lws_wsi_mux_dump_waiting_children(wsi);
387
388 if (lws_wsi_mux_action_pending_writeable_reqs(wsi))
389 return LWS_HP_RET_BAIL_DIE;
390
391 return LWS_HP_RET_BAIL_OK;
392 }
393
394 #if defined(LWS_WITH_CLIENT)
395 static int
rops_issue_keepalive_mqtt(struct lws * wsi,int isvalid)396 rops_issue_keepalive_mqtt(struct lws *wsi, int isvalid)
397 {
398 struct lws *nwsi = lws_get_network_wsi(wsi);
399
400 if (isvalid) {
401 _lws_validity_confirmed_role(nwsi);
402
403 return 0;
404 }
405
406 nwsi->mqtt->send_pingreq = 1;
407 lws_callback_on_writable(nwsi);
408
409 return 0;
410 }
411 #endif
412
413 static int
rops_close_role_mqtt(struct lws_context_per_thread * pt,struct lws * wsi)414 rops_close_role_mqtt(struct lws_context_per_thread *pt, struct lws *wsi)
415 {
416 struct lws *nwsi = lws_get_network_wsi(wsi);
417 lws_mqtt_subs_t *s, *s1, *mysub;
418 lws_mqttc_t *c;
419
420 if (!wsi->mqtt)
421 return 0;
422
423 c = &wsi->mqtt->client;
424
425 __lws_sul_insert(&pt->pt_sul_owner, &wsi->mqtt->sul_qos1_puback_wait,
426 LWS_SET_TIMER_USEC_CANCEL);
427
428 lws_mqtt_str_free(&c->username);
429 lws_mqtt_str_free(&c->password);
430 lws_mqtt_str_free(&c->will.message);
431 lws_mqtt_str_free(&c->will.topic);
432 lws_mqtt_str_free(&c->id);
433
434 /* clean up any subscription allocations */
435
436 s = wsi->mqtt->subs_head;
437 wsi->mqtt->subs_head = NULL;
438 while (s) {
439 s1 = s->next;
440 /*
441 * Account for children no longer using nwsi subscription
442 */
443 mysub = lws_mqtt_find_sub(nwsi->mqtt, (const char *)&s[1]);
444 // assert(mysub); /* if child subscribed, nwsi must feel the same */
445 if (mysub) {
446 assert(mysub->ref_count);
447 mysub->ref_count--;
448 }
449 lws_free(s);
450 s = s1;
451 }
452
453 lws_mqtt_publish_param_t *pub =
454 (lws_mqtt_publish_param_t *)
455 wsi->mqtt->rx_cpkt_param;
456
457 if (pub)
458 lws_free_set_NULL(pub->topic);
459
460 lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
461
462 lws_free_set_NULL(wsi->mqtt);
463
464 return 0;
465 }
466
467 static int
rops_callback_on_writable_mqtt(struct lws * wsi)468 rops_callback_on_writable_mqtt(struct lws *wsi)
469 {
470 #if defined(LWS_WITH_CLIENT)
471 struct lws *network_wsi;
472 #endif
473 int already;
474
475 lwsl_debug("%s: %p (wsistate 0x%x)\n", __func__, wsi, (unsigned int)wsi->wsistate);
476
477 if (wsi->mux.requested_POLLOUT
478 #if defined(LWS_WITH_CLIENT)
479 && !wsi->client_h2_alpn
480 #endif
481 ) {
482 lwsl_debug("already pending writable\n");
483 return 1;
484 }
485 #if 0
486 /* is this for DATA or for control messages? */
487 if (wsi->upgraded_to_http2 && !wsi->h2.h2n->pps &&
488 !lws_h2_tx_cr_get(wsi)) {
489 /*
490 * other side is not able to cope with us sending DATA
491 * anything so no matter if we have POLLOUT on our side if it's
492 * DATA we want to send.
493 *
494 * Delay waiting for our POLLOUT until peer indicates he has
495 * space for more using tx window command in http2 layer
496 */
497 lwsl_notice("%s: %p: skint (%d)\n", __func__, wsi,
498 wsi->h2.tx_cr);
499 wsi->h2.skint = 1;
500 return 0;
501 }
502
503 wsi->h2.skint = 0;
504 #endif
505 #if defined(LWS_WITH_CLIENT)
506 network_wsi = lws_get_network_wsi(wsi);
507 #endif
508 already = lws_wsi_mux_mark_parents_needing_writeable(wsi);
509
510 /* for network action, act only on the network wsi */
511
512 if (already
513 #if defined(LWS_WITH_CLIENT)
514 && !network_wsi->client_mux_substream
515 #endif
516 )
517 return 1;
518
519 return 0;
520 }
521
522 static int
rops_close_kill_connection_mqtt(struct lws * wsi,enum lws_close_status reason)523 rops_close_kill_connection_mqtt(struct lws *wsi, enum lws_close_status reason)
524 {
525 lwsl_info(" wsi: %p, his parent %p: child list %p, siblings:\n", wsi,
526 wsi->mux.parent_wsi, wsi->mux.child_list);
527 //lws_wsi_mux_dump_children(wsi);
528
529 if (wsi->mux_substream
530 #if defined(LWS_WITH_CLIENT)
531 || wsi->client_mux_substream
532 #endif
533 ) {
534 lwsl_info("closing %p: parent %p: first child %p\n", wsi,
535 wsi->mux.parent_wsi, wsi->mux.child_list);
536
537 if (wsi->mux.child_list && lwsl_visible(LLL_INFO)) {
538 lwsl_info(" parent %p: closing children: list:\n", wsi);
539 lws_wsi_mux_dump_children(wsi);
540 }
541
542 lws_wsi_mux_close_children(wsi, reason);
543 }
544
545 if ((
546 #if defined(LWS_WITH_CLIENT)
547 wsi->client_mux_substream ||
548 #endif
549 wsi->mux_substream) &&
550 wsi->mux.parent_wsi) {
551 lws_wsi_mux_sibling_disconnect(wsi);
552 }
553
554 return 0;
555 }
556
557
558 struct lws_role_ops role_ops_mqtt = {
559 /* role name */ "mqtt",
560 /* alpn id */ "x-amzn-mqtt-ca", /* "mqtt/3.1.1" */
561 /* check_upgrades */ NULL,
562 /* pt_init_destroy */ NULL,
563 /* init_vhost */ NULL,
564 /* destroy_vhost */ NULL,
565 /* service_flag_pending */ NULL,
566 .handle_POLLIN = rops_handle_POLLIN_mqtt,
567 .handle_POLLOUT = rops_handle_POLLOUT_mqtt,
568 /* perform_user_POLLOUT */ NULL,
569 /* callback_on_writable */ rops_callback_on_writable_mqtt,
570 /* tx_credit */ NULL,
571 .write_role_protocol = NULL,
572 /* encapsulation_parent */ NULL,
573 /* alpn_negotiated */ NULL,
574 /* close_via_role_protocol */ NULL,
575 .close_role = rops_close_role_mqtt,
576 .close_kill_connection = rops_close_kill_connection_mqtt,
577 /* destroy_role */ NULL,
578 #if 0 /* defined(LWS_WITH_SERVER) */
579 /* adoption_bind */ rops_adoption_bind_mqtt,
580 #else
581 NULL,
582 #endif
583 #if defined(LWS_WITH_CLIENT)
584 .client_bind = rops_client_bind_mqtt,
585 .issue_keepalive = rops_issue_keepalive_mqtt,
586 #else
587 .client_bind = NULL,
588 .issue_keepalive = NULL,
589 #endif
590 .adoption_cb = { LWS_CALLBACK_MQTT_NEW_CLIENT_INSTANTIATED,
591 LWS_CALLBACK_MQTT_NEW_CLIENT_INSTANTIATED },
592 .rx_cb = { LWS_CALLBACK_MQTT_CLIENT_RX,
593 LWS_CALLBACK_MQTT_CLIENT_RX },
594 .writeable_cb = { LWS_CALLBACK_MQTT_CLIENT_WRITEABLE,
595 LWS_CALLBACK_MQTT_CLIENT_WRITEABLE },
596 .close_cb = { LWS_CALLBACK_MQTT_CLIENT_CLOSED,
597 LWS_CALLBACK_MQTT_CLIENT_CLOSED },
598 .protocol_bind_cb = { LWS_CALLBACK_MQTT_IDLE,
599 LWS_CALLBACK_MQTT_IDLE },
600 .protocol_unbind_cb = { LWS_CALLBACK_MQTT_DROP_PROTOCOL,
601 LWS_CALLBACK_MQTT_DROP_PROTOCOL },
602 .file_handle = 0,
603 };
604