1 /*
2 * libwebsockets - small server side websockets and web server implementation
3 *
4 * Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 * IN THE SOFTWARE.
23 */
24
25 #include "private-lib-core.h"
26
27 static int
rops_handle_POLLIN_mqtt(struct lws_context_per_thread * pt,struct lws * wsi,struct lws_pollfd * pollfd)28 rops_handle_POLLIN_mqtt(struct lws_context_per_thread *pt, struct lws *wsi,
29 struct lws_pollfd *pollfd)
30 {
31 unsigned int pending = 0;
32 struct lws_tokens ebuf;
33 int n = 0;
34 char buffered = 0;
35
36 lwsl_debug("%s: wsistate 0x%x, %s pollout %d\n", __func__,
37 (unsigned int)wsi->wsistate, wsi->a.protocol->name,
38 pollfd->revents);
39
40 /*
41 * After the CONNACK and nwsi establishment, the first logical
42 * stream is migrated out of the nwsi to be child sid 1, and the
43 * nwsi no longer has a wsi->mqtt of its own.
44 *
45 * RX events on the nwsi must be converted to events seen or not
46 * seen by one or more child streams.
47 *
48 * SUBACK - reflected to child stream that asked for it
49 * PUBACK - routed to child that did the related publish
50 */
51
52 ebuf.token = NULL;
53 ebuf.len = 0;
54
55 if (lwsi_state(wsi) != LRS_ESTABLISHED) {
56 #if defined(LWS_WITH_CLIENT)
57
58 if (lwsi_state(wsi) == LRS_WAITING_SSL &&
59 ((pollfd->revents & LWS_POLLOUT)) &&
60 lws_change_pollfd(wsi, LWS_POLLOUT, 0)) {
61 lwsl_info("failed at set pollfd\n");
62 return LWS_HPI_RET_PLEASE_CLOSE_ME;
63 }
64
65 if ((pollfd->revents & LWS_POLLOUT) &&
66 lws_handle_POLLOUT_event(wsi, pollfd)) {
67 lwsl_debug("POLLOUT event closed it\n");
68 return LWS_HPI_RET_PLEASE_CLOSE_ME;
69 }
70
71 n = lws_mqtt_client_socket_service(wsi, pollfd, NULL);
72 if (n)
73 return LWS_HPI_RET_WSI_ALREADY_DIED;
74 #endif
75 return LWS_HPI_RET_HANDLED;
76 }
77
78 /* 1: something requested a callback when it was OK to write */
79
80 if ((pollfd->revents & LWS_POLLOUT) &&
81 lwsi_state_can_handle_POLLOUT(wsi) &&
82 lws_handle_POLLOUT_event(wsi, pollfd)) {
83 if (lwsi_state(wsi) == LRS_RETURNED_CLOSE)
84 lwsi_set_state(wsi, LRS_FLUSHING_BEFORE_CLOSE);
85
86 return LWS_HPI_RET_PLEASE_CLOSE_ME;
87 }
88
89 /* 3: buflist needs to be drained
90 */
91 read:
92 // lws_buflist_describe(&wsi->buflist, wsi, __func__);
93 ebuf.len = (int)lws_buflist_next_segment_len(&wsi->buflist, &ebuf.token);
94 if (ebuf.len) {
95 lwsl_info("draining buflist (len %d)\n", ebuf.len);
96 buffered = 1;
97 goto drain;
98 }
99
100 if (!(pollfd->revents & pollfd->events & LWS_POLLIN))
101 return LWS_HPI_RET_HANDLED;
102
103 /* if (lws_is_flowcontrolled(wsi)) { */
104 /* lwsl_info("%s: %p should be rxflow (bm 0x%x)..\n", */
105 /* __func__, wsi, wsi->rxflow_bitmap); */
106 /* return LWS_HPI_RET_HANDLED; */
107 /* } */
108
109 if (!(lwsi_role_client(wsi) && lwsi_state(wsi) != LRS_ESTABLISHED)) {
110 /*
111 * In case we are going to react to this rx by scheduling
112 * writes, we need to restrict the amount of rx to the size
113 * the protocol reported for rx buffer.
114 *
115 * Otherwise we get a situation we have to absorb possibly a
116 * lot of reads before we get a chance to drain them by writing
117 * them, eg, with echo type tests in autobahn.
118 */
119
120 buffered = 0;
121 ebuf.token = pt->serv_buf;
122 ebuf.len = (int)wsi->a.context->pt_serv_buf_size;
123
124 if ((unsigned int)ebuf.len > wsi->a.context->pt_serv_buf_size)
125 ebuf.len = (int)wsi->a.context->pt_serv_buf_size;
126
127 if ((int)pending > ebuf.len)
128 pending = (unsigned int)ebuf.len;
129
130 ebuf.len = lws_ssl_capable_read(wsi, ebuf.token,
131 pending ? pending :
132 (unsigned int)ebuf.len);
133 switch (ebuf.len) {
134 case 0:
135 lwsl_info("%s: zero length read\n",
136 __func__);
137 return LWS_HPI_RET_PLEASE_CLOSE_ME;
138 case LWS_SSL_CAPABLE_MORE_SERVICE:
139 lwsl_info("SSL Capable more service\n");
140 return LWS_HPI_RET_HANDLED;
141 case LWS_SSL_CAPABLE_ERROR:
142 lwsl_info("%s: LWS_SSL_CAPABLE_ERROR\n",
143 __func__);
144 return LWS_HPI_RET_PLEASE_CLOSE_ME;
145 }
146
147 /*
148 * coverity thinks ssl_capable_read() may read over
149 * 2GB. Dissuade it...
150 */
151 ebuf.len &= 0x7fffffff;
152 }
153
154 drain:
155 /* service incoming data */
156 //lws_buflist_describe(&wsi->buflist, wsi, __func__);
157 if (ebuf.len) {
158 n = lws_read_mqtt(wsi, ebuf.token, (unsigned int)ebuf.len);
159 if (n < 0) {
160 lwsl_notice("%s: lws_read_mqtt returned %d\n",
161 __func__, n);
162 /* we closed wsi */
163 goto fail;
164 }
165 // lws_buflist_describe(&wsi->buflist, wsi, __func__);
166 lwsl_debug("%s: consuming %d / %d\n", __func__, n, ebuf.len);
167 if (lws_buflist_aware_finished_consuming(wsi, &ebuf, ebuf.len,
168 buffered, __func__))
169 return LWS_HPI_RET_PLEASE_CLOSE_ME;
170 }
171
172 ebuf.token = NULL;
173 ebuf.len = 0;
174
175 pending = (unsigned int)lws_ssl_pending(wsi);
176 if (pending) {
177 pending = pending > wsi->a.context->pt_serv_buf_size ?
178 wsi->a.context->pt_serv_buf_size : pending;
179 goto read;
180 }
181
182 if (buffered && /* were draining, now nothing left */
183 !lws_buflist_next_segment_len(&wsi->buflist, NULL)) {
184 lwsl_info("%s: %s flow buf: drained\n", __func__, lws_wsi_tag(wsi));
185 /* having drained the rxflow buffer, can rearm POLLIN */
186 #if !defined(LWS_WITH_SERVER)
187 n =
188 #endif
189 __lws_rx_flow_control(wsi);
190 /* n ignored, needed for NO_SERVER case */
191 }
192
193 /* n = 0 */
194 return LWS_HPI_RET_HANDLED;
195
196 fail:
197 lwsl_err("%s: Failed, bailing\n", __func__);
198 lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS, "mqtt svc fail");
199
200 return LWS_HPI_RET_WSI_ALREADY_DIED;
201 }
202
203 #if 0 /* defined(LWS_WITH_SERVER) */
204
205 static int
206 rops_adoption_bind_mqtt(struct lws *wsi, int type, const char *vh_prot_name)
207 {
208 /* no http but socket... must be mqtt */
209 if ((type & LWS_ADOPT_HTTP) || !(type & LWS_ADOPT_SOCKET) ||
210 (type & _LWS_ADOPT_FINISH))
211 return 0; /* no match */
212
213 lws_role_transition(wsi, 0, (type & LWS_ADOPT_ALLOW_SSL) ? LRS_SSL_INIT :
214 LRS_ESTABLISHED, &role_ops_mqtt);
215
216 if (vh_prot_name)
217 lws_bind_protocol(wsi, wsi->a.protocol, __func__);
218 else
219 /* this is the only time he will transition */
220 lws_bind_protocol(wsi,
221 &wsi->a.vhost->protocols[wsi->a.vhost->mqtt_protocol_index],
222 __func__);
223
224 return 1; /* bound */
225 }
226 #endif
227
228 static int
rops_client_bind_mqtt(struct lws * wsi,const struct lws_client_connect_info * i)229 rops_client_bind_mqtt(struct lws *wsi, const struct lws_client_connect_info *i)
230 {
231 lwsl_debug("%s: i = %p\n", __func__, i);
232 if (!i) {
233
234 /* finalize */
235
236 if (!wsi->user_space && wsi->stash->cis[CIS_METHOD])
237 if (lws_ensure_user_space(wsi))
238 return 1;
239
240 if (!wsi->stash->cis[CIS_METHOD] && !wsi->stash->cis[CIS_ALPN])
241 wsi->stash->cis[CIS_ALPN] = "x-amzn-mqtt-ca";
242
243 /* if we went on the ah waiting list, it's ok, we can
244 * wait.
245 *
246 * When we do get the ah, now or later, he will end up
247 * at lws_http_client_connect_via_info2().
248 */
249 #if defined(LWS_WITH_CLIENT)
250 if (lws_header_table_attach(wsi, 0) < 0)
251 /*
252 * if we failed here, the connection is already closed
253 * and freed.
254 */
255 return -1;
256 #else
257 if (lws_header_table_attach(wsi, 0))
258 return 0;
259 #endif
260 return 0;
261 }
262
263 /* if a recognized mqtt method, bind to it */
264 if (strcmp(i->method, "MQTT"))
265 return 0; /* no match */
266
267 if (lws_create_client_mqtt_object(i, wsi))
268 return 1;
269
270 lws_role_transition(wsi, LWSIFR_CLIENT, LRS_UNCONNECTED,
271 &role_ops_mqtt);
272 return 1; /* matched */
273 }
274
275 static int
rops_handle_POLLOUT_mqtt(struct lws * wsi)276 rops_handle_POLLOUT_mqtt(struct lws *wsi)
277 {
278 struct lws **wsi2;
279
280 lwsl_debug("%s\n", __func__);
281
282 #if defined(LWS_WITH_CLIENT)
283 if (wsi->mqtt && wsi->mqtt->send_pingreq && !wsi->mqtt->inside_payload) {
284 uint8_t buf[LWS_PRE + 2];
285
286 /*
287 * We are swallowing this POLLOUT in order to send a PINGREQ
288 * autonomously
289 */
290
291 wsi->mqtt->send_pingreq = 0;
292
293 lwsl_notice("%s: issuing PINGREQ\n", __func__);
294
295 buf[LWS_PRE] = LMQCP_CTOS_PINGREQ << 4;
296 buf[LWS_PRE + 1] = 0;
297
298 if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 2,
299 LWS_WRITE_BINARY) != 2)
300 return LWS_HP_RET_BAIL_DIE;
301
302 return LWS_HP_RET_BAIL_OK;
303 }
304 #endif
305 if (wsi->mqtt && !wsi->mqtt->inside_payload &&
306 (wsi->mqtt->send_pubrec || wsi->mqtt->send_pubrel ||
307 wsi->mqtt->send_pubcomp)) {
308 uint8_t buf[LWS_PRE + 4];
309 /* Remaining len = 2 */
310 buf[LWS_PRE + 1] = 2;
311 if (wsi->mqtt->send_pubrec) {
312 lwsl_notice("%s: issuing PUBREC for pkt id: %d\n",
313 __func__, wsi->mqtt->peer_ack_pkt_id);
314 buf[LWS_PRE] = LMQCP_PUBREC << 4 | 0x2;
315 /* Packet ID */
316 lws_ser_wu16be(&buf[LWS_PRE + 2],
317 wsi->mqtt->peer_ack_pkt_id);
318 wsi->mqtt->send_pubrec = 0;
319 } else if (wsi->mqtt->send_pubrel) {
320 lwsl_notice("%s: issuing PUBREL for pkt id: %d\n",
321 __func__, wsi->mqtt->ack_pkt_id);
322 buf[LWS_PRE] = LMQCP_PUBREL << 4 | 0x2;
323 lws_ser_wu16be(&buf[LWS_PRE + 2],
324 wsi->mqtt->ack_pkt_id);
325 wsi->mqtt->send_pubrel = 0;
326 } else {
327 lwsl_notice("%s: issuing PUBCOMP for pkt id: %d\n",
328 __func__, wsi->mqtt->peer_ack_pkt_id);
329 buf[LWS_PRE] = LMQCP_PUBCOMP << 4 | 0x2;
330 lws_ser_wu16be(&buf[LWS_PRE + 2],
331 wsi->mqtt->peer_ack_pkt_id);
332 wsi->mqtt->send_pubcomp = 0;
333 }
334 if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 4,
335 LWS_WRITE_BINARY) != 4)
336 return LWS_HP_RET_BAIL_DIE;
337 return LWS_HP_RET_BAIL_OK;
338 }
339
340 wsi = lws_get_network_wsi(wsi);
341
342 wsi->mux.requested_POLLOUT = 0;
343
344 wsi2 = &wsi->mux.child_list;
345 if (!*wsi2) {
346 lwsl_debug("%s: no children\n", __func__);
347 return LWS_HP_RET_DROP_POLLOUT;
348 }
349
350 if (!wsi->mqtt)
351 return LWS_HP_RET_BAIL_DIE;
352
353 lws_wsi_mux_dump_waiting_children(wsi);
354
355 do {
356 struct lws *w, **wa;
357
358 wa = &(*wsi2)->mux.sibling_list;
359 if (!(*wsi2)->mux.requested_POLLOUT)
360 goto next_child;
361
362 if (!lwsi_state_can_handle_POLLOUT(wsi))
363 goto next_child;
364
365 /*
366 * If the nwsi is in the middle of a frame, we can only
367 * continue to send that
368 */
369
370 if (wsi->mqtt->inside_payload && !(*wsi2)->mqtt->inside_payload)
371 goto next_child;
372
373 /*
374 * we're going to do writable callback for this child.
375 * move him to be the last child
376 */
377 w = lws_wsi_mux_move_child_to_tail(wsi2);
378 if (!w) {
379 wa = &wsi->mux.child_list;
380 goto next_child;
381 }
382
383 lwsl_debug("%s: child %s (wsistate 0x%x)\n", __func__,
384 lws_wsi_tag(w), (unsigned int)w->wsistate);
385
386 if (lwsi_state(wsi) == LRS_ESTABLISHED &&
387 !wsi->mqtt->inside_payload &&
388 wsi->mqtt->send_puback) {
389 uint8_t buf[LWS_PRE + 4];
390 lwsl_notice("%s: issuing PUBACK for pkt id: %d\n",
391 __func__, wsi->mqtt->ack_pkt_id);
392
393 /* Fixed header */
394 buf[LWS_PRE] = LMQCP_PUBACK << 4;
395 /* Remaining len = 2 */
396 buf[LWS_PRE + 1] = 2;
397 /* Packet ID */
398 lws_ser_wu16be(&buf[LWS_PRE + 2], wsi->mqtt->peer_ack_pkt_id);
399
400 if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 4,
401 LWS_WRITE_BINARY) != 4)
402 return LWS_HP_RET_BAIL_DIE;
403
404 wsi->mqtt->send_puback = 0;
405 w->mux.requested_POLLOUT = 1;
406
407 wa = &wsi->mux.child_list;
408 goto next_child;
409 }
410
411 if (lws_callback_as_writeable(w)) {
412 lwsl_notice("%s: Closing child %s\n", __func__, lws_wsi_tag(w));
413 lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS,
414 "mqtt pollout handle");
415 wa = &wsi->mux.child_list;
416 }
417
418 next_child:
419 wsi2 = wa;
420 } while (wsi2 && *wsi2 && !lws_send_pipe_choked(wsi));
421
422 // lws_wsi_mux_dump_waiting_children(wsi);
423
424 if (lws_wsi_mux_action_pending_writeable_reqs(wsi))
425 return LWS_HP_RET_BAIL_DIE;
426
427 return LWS_HP_RET_BAIL_OK;
428 }
429
430 #if defined(LWS_WITH_CLIENT)
431 static int
rops_issue_keepalive_mqtt(struct lws * wsi,int isvalid)432 rops_issue_keepalive_mqtt(struct lws *wsi, int isvalid)
433 {
434 struct lws *nwsi = lws_get_network_wsi(wsi);
435
436 if (isvalid) {
437 _lws_validity_confirmed_role(nwsi);
438
439 return 0;
440 }
441
442 nwsi->mqtt->send_pingreq = 1;
443 lws_callback_on_writable(nwsi);
444
445 return 0;
446 }
447 #endif
448
449 static int
rops_close_role_mqtt(struct lws_context_per_thread * pt,struct lws * wsi)450 rops_close_role_mqtt(struct lws_context_per_thread *pt, struct lws *wsi)
451 {
452 struct lws *nwsi = lws_get_network_wsi(wsi);
453 lws_mqtt_subs_t *s, *s1, *mysub;
454 lws_mqttc_t *c;
455
456 if (!wsi->mqtt)
457 return 0;
458
459 c = &wsi->mqtt->client;
460
461 lws_sul_cancel(&wsi->mqtt->sul_qos_puback_pubrec_wait);
462
463 lws_mqtt_str_free(&c->username);
464 lws_mqtt_str_free(&c->password);
465 lws_mqtt_str_free(&c->will.message);
466 lws_mqtt_str_free(&c->will.topic);
467 lws_mqtt_str_free(&c->id);
468
469 /* clean up any subscription allocations */
470
471 s = wsi->mqtt->subs_head;
472 wsi->mqtt->subs_head = NULL;
473 while (s) {
474 s1 = s->next;
475 /*
476 * Account for children no longer using nwsi subscription
477 */
478 mysub = lws_mqtt_find_sub(nwsi->mqtt, (const char *)&s[1]);
479 // assert(mysub); /* if child subscribed, nwsi must feel the same */
480 if (mysub) {
481 assert(mysub->ref_count);
482 mysub->ref_count--;
483 }
484 lws_free(s);
485 s = s1;
486 }
487
488 lws_mqtt_publish_param_t *pub =
489 (lws_mqtt_publish_param_t *)
490 wsi->mqtt->rx_cpkt_param;
491
492 if (pub)
493 lws_free_set_NULL(pub->topic);
494
495 lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
496
497 lws_free_set_NULL(wsi->mqtt);
498
499 return 0;
500 }
501
502 static int
rops_callback_on_writable_mqtt(struct lws * wsi)503 rops_callback_on_writable_mqtt(struct lws *wsi)
504 {
505 #if defined(LWS_WITH_CLIENT)
506 struct lws *network_wsi;
507 #endif
508 int already;
509
510 lwsl_debug("%s: %s (wsistate 0x%x)\n", __func__, lws_wsi_tag(wsi),
511 (unsigned int)wsi->wsistate);
512
513 if (wsi->mux.requested_POLLOUT
514 #if defined(LWS_WITH_CLIENT)
515 && !wsi->client_h2_alpn
516 #endif
517 ) {
518 lwsl_debug("already pending writable\n");
519 return 1;
520 }
521 #if 0
522 /* is this for DATA or for control messages? */
523 if (wsi->upgraded_to_http2 && !wsi->h2.h2n->pps &&
524 !lws_h2_tx_cr_get(wsi)) {
525 /*
526 * other side is not able to cope with us sending DATA
527 * anything so no matter if we have POLLOUT on our side if it's
528 * DATA we want to send.
529 *
530 * Delay waiting for our POLLOUT until peer indicates he has
531 * space for more using tx window command in http2 layer
532 */
533 lwsl_notice("%s: %p: skint (%d)\n", __func__, wsi,
534 wsi->h2.tx_cr);
535 wsi->h2.skint = 1;
536 return 0;
537 }
538
539 wsi->h2.skint = 0;
540 #endif
541 #if defined(LWS_WITH_CLIENT)
542 network_wsi = lws_get_network_wsi(wsi);
543 #endif
544 already = lws_wsi_mux_mark_parents_needing_writeable(wsi);
545
546 /* for network action, act only on the network wsi */
547
548 if (already
549 #if defined(LWS_WITH_CLIENT)
550 && !network_wsi->client_mux_substream
551 #endif
552 )
553 return 1;
554
555 return 0;
556 }
557
558 static int
rops_close_kill_connection_mqtt(struct lws * wsi,enum lws_close_status reason)559 rops_close_kill_connection_mqtt(struct lws *wsi, enum lws_close_status reason)
560 {
561 lwsl_info(" %s, his parent %s: child list %p, siblings:\n",
562 lws_wsi_tag(wsi),
563 lws_wsi_tag(wsi->mux.parent_wsi), wsi->mux.child_list);
564 //lws_wsi_mux_dump_children(wsi);
565
566 if (wsi->mux_substream
567 #if defined(LWS_WITH_CLIENT)
568 || wsi->client_mux_substream
569 #endif
570 ) {
571 lwsl_info("closing %s: parent %s: first child %p\n",
572 lws_wsi_tag(wsi),
573 lws_wsi_tag(wsi->mux.parent_wsi),
574 wsi->mux.child_list);
575
576 if (wsi->mux.child_list && lwsl_visible(LLL_INFO)) {
577 lwsl_info(" parent %s: closing children: list:\n", lws_wsi_tag(wsi));
578 lws_wsi_mux_dump_children(wsi);
579 }
580
581 lws_wsi_mux_close_children(wsi, (int)reason);
582 }
583
584 if ((
585 #if defined(LWS_WITH_CLIENT)
586 wsi->client_mux_substream ||
587 #endif
588 wsi->mux_substream) &&
589 wsi->mux.parent_wsi) {
590 lws_wsi_mux_sibling_disconnect(wsi);
591 }
592
593 return 0;
594 }
595
596 static const lws_rops_t rops_table_mqtt[] = {
597 /* 1 */ { .handle_POLLIN = rops_handle_POLLIN_mqtt },
598 /* 2 */ { .handle_POLLOUT = rops_handle_POLLOUT_mqtt },
599 /* 3 */ { .callback_on_writable = rops_callback_on_writable_mqtt },
600 /* 4 */ { .close_role = rops_close_role_mqtt },
601 /* 5 */ { .close_kill_connection = rops_close_kill_connection_mqtt },
602 #if defined(LWS_WITH_CLIENT)
603 /* 6 */ { .client_bind = rops_client_bind_mqtt },
604 /* 7 */ { .issue_keepalive = rops_issue_keepalive_mqtt },
605 #endif
606 };
607
608 struct lws_role_ops role_ops_mqtt = {
609 /* role name */ "mqtt",
610 /* alpn id */ "x-amzn-mqtt-ca", /* "mqtt/3.1.1" */
611
612 /* rops_table */ rops_table_mqtt,
613 /* rops_idx */ {
614 /* LWS_ROPS_check_upgrades */
615 /* LWS_ROPS_pt_init_destroy */ 0x00,
616 /* LWS_ROPS_init_vhost */
617 /* LWS_ROPS_destroy_vhost */ 0x00,
618 /* LWS_ROPS_service_flag_pending */
619 /* LWS_ROPS_handle_POLLIN */ 0x01,
620 /* LWS_ROPS_handle_POLLOUT */
621 /* LWS_ROPS_perform_user_POLLOUT */ 0x20,
622 /* LWS_ROPS_callback_on_writable */
623 /* LWS_ROPS_tx_credit */ 0x30,
624 /* LWS_ROPS_write_role_protocol */
625 /* LWS_ROPS_encapsulation_parent */ 0x00,
626 /* LWS_ROPS_alpn_negotiated */
627 /* LWS_ROPS_close_via_role_protocol */ 0x00,
628 /* LWS_ROPS_close_role */
629 /* LWS_ROPS_close_kill_connection */ 0x45,
630 /* LWS_ROPS_destroy_role */
631 /* LWS_ROPS_adoption_bind */ 0x00,
632
633 /* LWS_ROPS_client_bind */
634 #if defined(LWS_WITH_CLIENT)
635 /* LWS_ROPS_issue_keepalive */ 0x67,
636 #else
637 /* LWS_ROPS_issue_keepalive */ 0x00,
638 #endif
639 },
640
641 .adoption_cb = { LWS_CALLBACK_MQTT_NEW_CLIENT_INSTANTIATED,
642 LWS_CALLBACK_MQTT_NEW_CLIENT_INSTANTIATED },
643 .rx_cb = { LWS_CALLBACK_MQTT_CLIENT_RX,
644 LWS_CALLBACK_MQTT_CLIENT_RX },
645 .writeable_cb = { LWS_CALLBACK_MQTT_CLIENT_WRITEABLE,
646 LWS_CALLBACK_MQTT_CLIENT_WRITEABLE },
647 .close_cb = { LWS_CALLBACK_MQTT_CLIENT_CLOSED,
648 LWS_CALLBACK_MQTT_CLIENT_CLOSED },
649 .protocol_bind_cb = { LWS_CALLBACK_MQTT_IDLE,
650 LWS_CALLBACK_MQTT_IDLE },
651 .protocol_unbind_cb = { LWS_CALLBACK_MQTT_DROP_PROTOCOL,
652 LWS_CALLBACK_MQTT_DROP_PROTOCOL },
653 .file_handle = 0,
654 };
655