Lines Matching +full:no +full:- +full:ct
2 * libwebsockets - small server side websockets and web server implementation
4 * Copyright (C) 2019 - 2021 Andy Green <andy@warmcat.com>
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 #include <private-lib-core.h>
32 if (h->u.mqtt.heap_baggage) { in secstream_mqtt_cleanup()
33 lws_free(h->u.mqtt.heap_baggage); in secstream_mqtt_cleanup()
34 h->u.mqtt.heap_baggage = NULL; in secstream_mqtt_cleanup()
37 if (h->u.mqtt.sub_info.topic) { in secstream_mqtt_cleanup()
38 for (i = 0; i < h->u.mqtt.sub_info.num_topics; i++) { in secstream_mqtt_cleanup()
39 if (h->u.mqtt.sub_info.topic[i].name) { in secstream_mqtt_cleanup()
40 lws_free((void*)h->u.mqtt.sub_info.topic[i].name); in secstream_mqtt_cleanup()
41 h->u.mqtt.sub_info.topic[i].name = NULL; in secstream_mqtt_cleanup()
44 lws_free(h->u.mqtt.sub_info.topic); in secstream_mqtt_cleanup()
45 h->u.mqtt.sub_info.topic = NULL; in secstream_mqtt_cleanup()
57 if (!h || !h->policy) in secstream_mqtt_subscribe()
58 return -1; in secstream_mqtt_subscribe()
60 if (h->policy->u.mqtt.aws_iot) in secstream_mqtt_subscribe()
65 if (!h->policy->u.mqtt.subscribe || wsi->mqtt->done_subscribe) in secstream_mqtt_subscribe()
71 * Expand with no output first to calculate the size of in secstream_mqtt_subscribe()
75 if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe, in secstream_mqtt_subscribe()
76 strlen(h->policy->u.mqtt.subscribe), &used_in, in secstream_mqtt_subscribe()
80 " topic with no output\n", in secstream_mqtt_subscribe()
97 if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe, in secstream_mqtt_subscribe()
98 strlen(h->policy->u.mqtt.subscribe), &used_in, in secstream_mqtt_subscribe()
105 lwsl_notice("%s, expbuf - %s\n", __func__, expbuf); in secstream_mqtt_subscribe()
106 h->u.mqtt.sub_top.name = expbuf; in secstream_mqtt_subscribe()
110 * haven't done it yet. Do it using the pre-prepared in secstream_mqtt_subscribe()
111 * string-substituted version of the policy string. in secstream_mqtt_subscribe()
115 h->u.mqtt.sub_top.name); in secstream_mqtt_subscribe()
117 h->u.mqtt.sub_top.qos = h->policy->u.mqtt.qos; in secstream_mqtt_subscribe()
118 memset(&h->u.mqtt.sub_info, 0, sizeof(h->u.mqtt.sub_info)); in secstream_mqtt_subscribe()
119 h->u.mqtt.sub_info.num_topics = 1; in secstream_mqtt_subscribe()
120 h->u.mqtt.sub_info.topic = &h->u.mqtt.sub_top; in secstream_mqtt_subscribe()
121 h->u.mqtt.sub_info.topic = in secstream_mqtt_subscribe()
123 h->u.mqtt.sub_info.topic[0].name = lws_strdup(expbuf); in secstream_mqtt_subscribe()
124 h->u.mqtt.sub_info.topic[0].qos = h->policy->u.mqtt.qos; in secstream_mqtt_subscribe()
126 if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) { in secstream_mqtt_subscribe()
129 h->u.mqtt.sub_top.name = NULL; in secstream_mqtt_subscribe()
130 return -1; in secstream_mqtt_subscribe()
133 h->u.mqtt.sub_top.name = NULL; in secstream_mqtt_subscribe()
138 return -1; in secstream_mqtt_subscribe()
154 if (h->policy->u.mqtt.aws_iot) in secstream_mqtt_publish()
167 " topic with no output\n", __func__); in secstream_mqtt_publish()
185 lwsl_notice("%s, expbuf - %s\n", __func__, expbuf); in secstream_mqtt_publish()
189 mqpp.packet_id = (uint16_t)(h->txord - 1); in secstream_mqtt_publish()
191 if (h->writeable_len) in secstream_mqtt_publish()
192 mqpp.payload_len = (uint32_t)h->writeable_len; in secstream_mqtt_publish()
199 mqpp.qos = h->policy->u.mqtt.qos; in secstream_mqtt_publish()
207 return -1; in secstream_mqtt_publish()
221 size_t buflen = sizeof(buf) - LWS_PRE; in secstream_mqtt()
238 h->wsi = NULL; in secstream_mqtt()
254 lws_sul_cancel(&h->sul_timeout); in secstream_mqtt()
258 if (h->ss_dangling_connected) in secstream_mqtt()
262 if (h->wsi) in secstream_mqtt()
263 lws_set_opaque_user_data(h->wsi, NULL); in secstream_mqtt()
264 h->wsi = NULL; in secstream_mqtt()
271 if (h->policy && !(h->policy->flags & LWSSSPOLF_OPPORTUNISTIC) && in secstream_mqtt()
272 !h->txn_ok && !wsi->a.context->being_destroyed) { in secstream_mqtt()
284 h->wsi = wsi; in secstream_mqtt()
285 h->retry = 0; in secstream_mqtt()
286 h->seqstate = SSSEQ_CONNECTED; in secstream_mqtt()
288 if (!h->policy->u.mqtt.subscribe || in secstream_mqtt()
289 !h->policy->u.mqtt.subscribe[0]) { in secstream_mqtt()
295 wsi->mqtt->done_subscribe = 1; in secstream_mqtt()
296 } else if (!h->policy->u.mqtt.clean_start && in secstream_mqtt()
297 wsi->mqtt->session_resumed) { in secstream_mqtt()
298 wsi->mqtt->inside_resume_session = 1; in secstream_mqtt()
304 wsi->mqtt->done_subscribe = 1; in secstream_mqtt()
305 wsi->mqtt->inside_resume_session = 0; in secstream_mqtt()
306 } else if (h->policy->u.mqtt.subscribe && in secstream_mqtt()
307 !wsi->mqtt->done_subscribe) { in secstream_mqtt()
316 lws_sul_cancel(&h->sul); in secstream_mqtt()
321 lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL); in secstream_mqtt()
326 if (h->policy->u.mqtt.topic) in secstream_mqtt()
332 if (!h || !h->info.rx) in secstream_mqtt()
338 if (!pmqpp->payload_pos) in secstream_mqtt()
340 if (pmqpp->payload_pos + len == pmqpp->payload_len) in secstream_mqtt()
343 h->subseq = 1; in secstream_mqtt()
345 r = h->info.rx(ss_to_userobj(h), (const uint8_t *)pmqpp->payload, in secstream_mqtt()
357 if (wsi->mqtt->done_subscribe == 0) { in secstream_mqtt()
358 lws_sul_cancel(&h->sul); in secstream_mqtt()
363 wsi->mqtt->done_subscribe = 1; in secstream_mqtt()
368 lws_sul_cancel(&h->sul_timeout); in secstream_mqtt()
369 if (wsi->mqtt->inside_birth) { in secstream_mqtt()
373 wsi->mqtt->inside_birth = 0; in secstream_mqtt()
374 wsi->mqtt->done_birth = 1; in secstream_mqtt()
384 if (!h || !h->info.tx) in secstream_mqtt()
388 if (h->seqstate != SSSEQ_CONNECTED) { in secstream_mqtt()
389 lwsl_warn("%s: seqstate %d\n", __func__, h->seqstate); in secstream_mqtt()
393 if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe) in secstream_mqtt()
396 if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic) { in secstream_mqtt()
399 if (h->policy->u.mqtt.birth_message) { in secstream_mqtt()
402 if (lws_strexp_expand(&exp, h->policy->u.mqtt.birth_message, in secstream_mqtt()
403 strlen(h->policy->u.mqtt.birth_message), in secstream_mqtt()
408 wsi->mqtt->inside_birth = 1; in secstream_mqtt()
410 used_out, h->policy->u.mqtt.birth_topic, in secstream_mqtt()
411 h->policy->u.mqtt.birth_qos, LWSSS_FLAG_EOM); in secstream_mqtt()
413 r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE, in secstream_mqtt()
420 if (h->u.mqtt.sub_info.num_topics) { in secstream_mqtt()
421 lmsp.num_topics = h->u.mqtt.sub_info.num_topics; in secstream_mqtt()
422 lmsp.topic = h->u.mqtt.sub_info.topic; in secstream_mqtt()
423 lmsp.packet_id = (uint16_t)(h->txord - 1); in secstream_mqtt()
428 return -1; in secstream_mqtt()
438 h->policy->u.mqtt.topic, in secstream_mqtt()
439 h->policy->u.mqtt.qos, f); in secstream_mqtt()
445 if (nwsi && (nwsi->mux.child_count == 1)) in secstream_mqtt()
447 return -1; in secstream_mqtt()
451 if (wsi->mqtt->inside_unsubscribe) { in secstream_mqtt()
454 return -1; in secstream_mqtt()
466 "lws-secstream-mqtt",
471 * Munge connect info according to protocol-specific considerations... this
472 * usually means interpreting aux in a protocol-specific way and using the
493 union lws_ss_contemp *ct) in secstream_connect_munge_mqtt() argument
496 /* we're going to string-substitute these before use */ in secstream_connect_munge_mqtt()
497 h->policy->u.mqtt.will_topic, in secstream_connect_munge_mqtt()
498 h->policy->u.mqtt.will_message, in secstream_connect_munge_mqtt()
499 h->policy->u.mqtt.subscribe, in secstream_connect_munge_mqtt()
500 h->policy->u.mqtt.topic, in secstream_connect_munge_mqtt()
501 h->policy->u.mqtt.birth_topic, in secstream_connect_munge_mqtt()
502 h->policy->u.mqtt.birth_message in secstream_connect_munge_mqtt()
508 int n = -1; in secstream_connect_munge_mqtt()
512 memset(&ct->ccp, 0, sizeof(ct->ccp)); in secstream_connect_munge_mqtt()
513 b = lws_system_get_blob(i->context, in secstream_connect_munge_mqtt()
519 lwsl_err("%s - Client ID too long.\n", in secstream_connect_munge_mqtt()
521 return -1; in secstream_connect_munge_mqtt()
525 return -1; in secstream_connect_munge_mqtt()
528 ct->ccp.client_id = NULL; in secstream_connect_munge_mqtt()
530 ct->ccp.client_id = (const char *)p; in secstream_connect_munge_mqtt()
531 lwsl_notice("%s - Client ID = %s\n", in secstream_connect_munge_mqtt()
532 __func__, ct->ccp.client_id); in secstream_connect_munge_mqtt()
536 ct->ccp.client_id = NULL; in secstream_connect_munge_mqtt()
539 b = lws_system_get_blob(i->context, in secstream_connect_munge_mqtt()
546 return -1; in secstream_connect_munge_mqtt()
549 ct->ccp.username = NULL; in secstream_connect_munge_mqtt()
551 ct->ccp.username = (const char *)p; in secstream_connect_munge_mqtt()
552 lwsl_notice("%s - Username ID = %s\n", in secstream_connect_munge_mqtt()
553 __func__, ct->ccp.username); in secstream_connect_munge_mqtt()
557 b = lws_system_get_blob(i->context, in secstream_connect_munge_mqtt()
564 return -1; in secstream_connect_munge_mqtt()
567 ct->ccp.password = NULL; in secstream_connect_munge_mqtt()
569 ct->ccp.password = (const char *)p; in secstream_connect_munge_mqtt()
570 lwsl_notice("%s - Password ID = %s\n", in secstream_connect_munge_mqtt()
571 __func__, ct->ccp.password); in secstream_connect_munge_mqtt()
575 ct->ccp.keep_alive = h->policy->u.mqtt.keep_alive; in secstream_connect_munge_mqtt()
576 ct->ccp.clean_start = (h->policy->u.mqtt.clean_start & 1u); in secstream_connect_munge_mqtt()
577 ct->ccp.will_param.qos = h->policy->u.mqtt.will_qos; in secstream_connect_munge_mqtt()
578 ct->ccp.will_param.retain = h->policy->u.mqtt.will_retain; in secstream_connect_munge_mqtt()
579 ct->ccp.birth_param.qos = h->policy->u.mqtt.birth_qos; in secstream_connect_munge_mqtt()
580 ct->ccp.birth_param.retain = h->policy->u.mqtt.birth_retain; in secstream_connect_munge_mqtt()
581 ct->ccp.aws_iot = h->policy->u.mqtt.aws_iot; in secstream_connect_munge_mqtt()
582 h->u.mqtt.topic_qos.qos = h->policy->u.mqtt.qos; in secstream_connect_munge_mqtt()
585 * We're going to string-substitute several of these parameters, which in secstream_connect_munge_mqtt()
590 * Notice these allocations at h->u.mqtt.heap_baggage belong to the in secstream_connect_munge_mqtt()
605 NULL, (size_t)-1); in secstream_connect_munge_mqtt()
620 h->u.mqtt.heap_baggage = lws_malloc(tot, __func__); in secstream_connect_munge_mqtt()
621 if (!h->u.mqtt.heap_baggage) in secstream_connect_munge_mqtt()
629 p = h->u.mqtt.heap_baggage; in secstream_connect_munge_mqtt()
632 (char *)p, (size_t)-1); in secstream_connect_munge_mqtt()
650 ct->ccp.will_param.topic = ps[SSCMM_STRSUB_WILL_TOPIC]; in secstream_connect_munge_mqtt()
651 ct->ccp.will_param.message = ps[SSCMM_STRSUB_WILL_MESSAGE]; in secstream_connect_munge_mqtt()
652 h->u.mqtt.subscribe_to = ps[SSCMM_STRSUB_SUBSCRIBE]; in secstream_connect_munge_mqtt()
653 h->u.mqtt.subscribe_to_len = olen[SSCMM_STRSUB_SUBSCRIBE]; in secstream_connect_munge_mqtt()
654 h->u.mqtt.topic_qos.name = ps[SSCMM_STRSUB_TOPIC]; in secstream_connect_munge_mqtt()
655 ct->ccp.birth_param.topic = ps[SSCMM_STRSUB_BIRTH_TOPIC]; in secstream_connect_munge_mqtt()
656 ct->ccp.birth_param.message = ps[SSCMM_STRSUB_BIRTH_MESSAGE]; in secstream_connect_munge_mqtt()
658 i->method = "MQTT"; in secstream_connect_munge_mqtt()
659 i->mqtt_cp = &ct->ccp; in secstream_connect_munge_mqtt()
661 i->alpn = "x-amzn-mqtt-ca"; in secstream_connect_munge_mqtt()
664 i->ssl_connection |= LCCSCF_PIPELINE; in secstream_connect_munge_mqtt()
671 "x-amzn-mqtt-ca", //"mqtt/3.1.1",