/* * lws System Message Distribution * * Copyright (C) 2019 - 2021 Andy Green * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to * deal in the Software without restriction, including without limitation the * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or * sell copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS * IN THE SOFTWARE. */ #include "private-lib-core.h" #include /* comment me to remove extra debug and sanity checks */ // #define LWS_SMD_DEBUG #if defined(LWS_SMD_DEBUG) #define lwsl_smd lwsl_notice #else #define lwsl_smd(_s, ...) #endif void * lws_smd_msg_alloc(struct lws_context *ctx, lws_smd_class_t _class, size_t len) { lws_smd_msg_t *msg; /* only allow it if someone wants to consume this class of event */ if (!(ctx->smd._class_filter & _class)) { lwsl_cx_info(ctx, "rejecting class 0x%x as no participant wants", (unsigned int)_class); return NULL; } assert(len <= LWS_SMD_MAX_PAYLOAD); /* * If SS configured, over-allocate LWS_SMD_SS_RX_HEADER_LEN behind * payload, ie, msg_t (gap LWS_SMD_SS_RX_HEADER_LEN) payload */ msg = lws_malloc(sizeof(*msg) + LWS_SMD_SS_RX_HEADER_LEN_EFF + len, __func__); if (!msg) return NULL; memset(msg, 0, sizeof(*msg)); msg->timestamp = lws_now_usecs(); msg->length = (uint16_t)len; msg->_class = _class; return ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF; } void lws_smd_msg_free(void **ppay) { lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)*ppay) - LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg)); /* if SS configured, actual alloc is LWS_SMD_SS_RX_HEADER_LEN behind */ lws_free(msg); *ppay = NULL; } #if defined(LWS_SMD_DEBUG) static void lws_smd_dump(lws_smd_t *smd) { int n = 1; lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, smd->owner_messages.head) { lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list); lwsl_info(" msg %d: %p: ref %d, lat %dms, cls: 0x%x, len %u: '%s'\n", n++, msg, msg->refcount, (unsigned int)((lws_now_usecs() - msg->timestamp) / 1000), msg->length, msg->_class, (const char *)&msg[1] + LWS_SMD_SS_RX_HEADER_LEN_EFF); } lws_end_foreach_dll_safe(p, p1); n = 1; lws_start_foreach_dll(struct lws_dll2 *, p, smd->owner_peers.head) { lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); lwsl_info(" peer %d: %p: tail: %p, filt 0x%x\n", n++, pr, pr->tail, pr->_class_filter); } lws_end_foreach_dll(p); } #endif static int _lws_smd_msg_peer_interested_in_msg(lws_smd_peer_t *pr, lws_smd_msg_t *msg) { return !!(msg->_class & pr->_class_filter); } /* * Figure out what to set the initial refcount for the message to */ static int _lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg, struct lws_smd_peer *exc) { struct lws_context *ctx = lws_container_of(smd, struct lws_context, smd); int interested = 0; lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) { lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); if (pr != exc && _lws_smd_msg_peer_interested_in_msg(pr, msg)) /* * This peer wants to consume it */ interested++; } lws_end_foreach_dll(p); return interested; } static int _lws_smd_class_mask_union(lws_smd_t *smd) { uint32_t mask = 0; lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, smd->owner_peers.head) { lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); mask |= pr->_class_filter; } lws_end_foreach_dll_safe(p, p1); smd->_class_filter = mask; return 0; } /* Call with message lock held */ static void _lws_smd_msg_destroy(struct lws_context *cx, lws_smd_t *smd, lws_smd_msg_t *msg) { /* * We think we gave the message to everyone and can destroy it. * Sanity check that no peer holds a pointer to this guy */ lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, smd->owner_peers.head) { lws_smd_peer_t *xpr = lws_container_of(p, lws_smd_peer_t, list); if (xpr->tail == msg) { lwsl_cx_err(cx, "peer %p has msg %p " "we are about to destroy as tail", xpr, msg); #if !defined(LWS_PLAT_FREERTOS) assert(0); #endif } } lws_end_foreach_dll_safe(p, p1); /* * We have fully delivered the message now, it * can be unlinked and destroyed */ lwsl_cx_info(cx, "destroy msg %p", msg); lws_dll2_remove(&msg->list); lws_free(msg); } /* * This is wanting to be threadsafe, limiting the apis we can call */ int _lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc) { lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)pay) - LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg)); if (ctx->smd.owner_messages.count >= ctx->smd_queue_depth) { lwsl_cx_warn(ctx, "rejecting message on queue depth %d", (int)ctx->smd.owner_messages.count); /* reject the message due to max queue depth reached */ return 1; } if (!ctx->smd.delivering && lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++ peers */ return 1; /* For Coverity */ if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++++++++++ messages */ goto bail; msg->refcount = (uint16_t)_lws_smd_msg_assess_peers_interested( &ctx->smd, msg, exc); if (!msg->refcount) { /* possible, condsidering exc and no other participants */ lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */ lws_free(msg); if (!ctx->smd.delivering) lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */ return 0; } msg->exc = exc; /* let's add him on the queue... */ lws_dll2_add_tail(&msg->list, &ctx->smd.owner_messages); /* * Any peer with no active tail needs to check our class to see if we * should become his tail */ lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) { lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); if (pr != exc && !pr->tail && _lws_smd_msg_peer_interested_in_msg(pr, msg)) { pr->tail = msg; /* tail message has to actually be of interest to the peer */ assert(!pr->tail || (pr->tail->_class & pr->_class_filter)); } } lws_end_foreach_dll(p); #if defined(LWS_SMD_DEBUG) lwsl_smd("%s: added %p (refc %u) depth now %d\n", __func__, msg, msg->refcount, ctx->smd.owner_messages.count); lws_smd_dump(&ctx->smd); #endif lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */ bail: if (!ctx->smd.delivering) lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */ /* we may be happening from another thread context */ lws_cancel_service(ctx); return 0; } /* * This is wanting to be threadsafe, limiting the apis we can call */ int lws_smd_msg_send(struct lws_context *ctx, void *pay) { return _lws_smd_msg_send(ctx, pay, NULL); } /* * This is wanting to be threadsafe, limiting the apis we can call */ int lws_smd_msg_printf(struct lws_context *ctx, lws_smd_class_t _class, const char *format, ...) { lws_smd_msg_t *msg; va_list ap; void *p; int n; if (!(ctx->smd._class_filter & _class)) /* * There's nobody interested in messages of this class atm. * Don't bother generating it, and act like all is well. */ return 0; va_start(ap, format); n = vsnprintf(NULL, 0, format, ap); va_end(ap); if (n > LWS_SMD_MAX_PAYLOAD) /* too large to send */ return 1; p = lws_smd_msg_alloc(ctx, _class, (size_t)n + 2); if (!p) return 1; msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg)); msg->length = (uint16_t)n; va_start(ap, format); vsnprintf((char *)p, (unsigned int)n + 2, format, ap); va_end(ap); /* * locks taken and released in here */ if (lws_smd_msg_send(ctx, p)) { lws_smd_msg_free(&p); return 1; } return 0; } #if defined(LWS_WITH_SECURE_STREAMS) int lws_smd_ss_msg_printf(const char *tag, uint8_t *buf, size_t *len, lws_smd_class_t _class, const char *format, ...) { char *content = (char *)buf + LWS_SMD_SS_RX_HEADER_LEN; va_list ap; int n; if (*len < LWS_SMD_SS_RX_HEADER_LEN) return 1; lws_ser_wu64be(buf, _class); lws_ser_wu64be(buf + 8, 0); /* valgrind notices uninitialized if left */ va_start(ap, format); n = vsnprintf(content, (*len) - LWS_SMD_SS_RX_HEADER_LEN, format, ap); va_end(ap); if (n > LWS_SMD_MAX_PAYLOAD || (unsigned int)n > (*len) - LWS_SMD_SS_RX_HEADER_LEN) /* too large to send */ return 1; *len = LWS_SMD_SS_RX_HEADER_LEN + (unsigned int)n; lwsl_info("%s: %s send cl 0x%x, len %u\n", __func__, tag, (unsigned int)_class, (unsigned int)n); return 0; } /* * This is a helper that user rx handler for LWS_SMD_STREAMTYPENAME SS can * call through to with the payload it received from the proxy. It will then * forward the recieved SMD message to all local (same-context) participants * that are interested in that class (except ones with callback skip_cb, so * we don't loop). */ static int _lws_smd_ss_rx_forward(struct lws_context *ctx, const char *tag, struct lws_smd_peer *pr, const uint8_t *buf, size_t len) { lws_smd_class_t _class; lws_smd_msg_t *msg; void *p; if (len < LWS_SMD_SS_RX_HEADER_LEN_EFF) return 1; if (len >= LWS_SMD_MAX_PAYLOAD + LWS_SMD_SS_RX_HEADER_LEN_EFF) return 1; _class = (lws_smd_class_t)lws_ser_ru64be(buf); if (_class == LWSSMDCL_METRICS) { } /* only locally forward messages that we care about in this process */ if (!(ctx->smd._class_filter & _class)) /* * There's nobody interested in messages of this class atm. * Don't bother generating it, and act like all is well. */ return 0; p = lws_smd_msg_alloc(ctx, _class, len); if (!p) return 1; msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg)); msg->length = (uint16_t)(len - LWS_SMD_SS_RX_HEADER_LEN_EFF); /* adopt the original source timestamp, not time we forwarded it */ msg->timestamp = (lws_usec_t)lws_ser_ru64be(buf + 8); /* copy the message payload in */ memcpy(p, buf + LWS_SMD_SS_RX_HEADER_LEN_EFF, msg->length); /* * locks taken and released in here */ if (_lws_smd_msg_send(ctx, p, pr)) { /* we couldn't send it after all that... */ lws_smd_msg_free(&p); return 1; } lwsl_info("%s: %s send cl 0x%x, len %u, ts %llu\n", __func__, tag, (unsigned int)_class, msg->length, (unsigned long long)msg->timestamp); return 0; } int lws_smd_ss_rx_forward(void *ss_user, const uint8_t *buf, size_t len) { struct lws_ss_handle *h = (struct lws_ss_handle *) (((char *)ss_user) - sizeof(*h)); struct lws_context *ctx = lws_ss_get_context(h); return _lws_smd_ss_rx_forward(ctx, lws_ss_tag(h), h->u.smd.smd_peer, buf, len); } #if defined(LWS_WITH_SECURE_STREAMS_PROXY_API) int lws_smd_sspc_rx_forward(void *ss_user, const uint8_t *buf, size_t len) { struct lws_sspc_handle *h = (struct lws_sspc_handle *) (((char *)ss_user) - sizeof(*h)); struct lws_context *ctx = lws_sspc_get_context(h); return _lws_smd_ss_rx_forward(ctx, lws_sspc_tag(h), NULL, buf, len); } #endif #endif /* * Peers that deregister need to adjust the refcount of messages they would * have been interested in, but didn't take delivery of yet */ static void _lws_smd_peer_destroy(lws_smd_peer_t *pr) { lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t, owner_peers); if (lws_mutex_lock(smd->lock_messages)) /* +++++++++ messages */ return; /* For Coverity */ lws_dll2_remove(&pr->list); /* * We take the approach to adjust the refcount of every would-have-been * delivered message we were interested in */ while (pr->tail) { lws_smd_msg_t *m1 = lws_container_of(pr->tail->list.next, lws_smd_msg_t, list); if (_lws_smd_msg_peer_interested_in_msg(pr, pr->tail)) { if (!--pr->tail->refcount) _lws_smd_msg_destroy(pr->ctx, smd, pr->tail); } pr->tail = m1; } lws_free(pr); lws_mutex_unlock(smd->lock_messages); /* messages ------- */ } static lws_smd_msg_t * _lws_smd_msg_next_matching_filter(lws_smd_peer_t *pr) { lws_dll2_t *tail = &pr->tail->list; lws_smd_msg_t *msg; do { tail = tail->next; if (!tail) return NULL; msg = lws_container_of(tail, lws_smd_msg_t, list); if (msg->exc != pr && _lws_smd_msg_peer_interested_in_msg(pr, msg)) return msg; } while (1); return NULL; } /* * Delivers only one message to the peer and advances the tail, or sets to NULL * if no more filtered queued messages. Returns nonzero if tail non-NULL. * * For Proxied SS, only asks for writeable and does not advance or change the * tail. * * This is done so if multiple messages queued, we don't get a situation where * one participant gets them all spammed, then the next etc. Instead they are * delivered round-robin. * * Requires peer lock, may take message lock */ static int _lws_smd_msg_deliver_peer(struct lws_context *ctx, lws_smd_peer_t *pr) { lws_smd_msg_t *msg; if (!pr->tail) return 0; msg = lws_container_of(pr->tail, lws_smd_msg_t, list); lwsl_cx_info(ctx, "deliver cl 0x%x, len %d, refc %d, to peer %p", (unsigned int)msg->_class, (int)msg->length, (int)msg->refcount, pr); pr->cb(pr->opaque, msg->_class, msg->timestamp, ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF, (size_t)msg->length); assert(msg->refcount); /* * If there is one, move forward to the next queued * message that meets the filters of this peer */ pr->tail = _lws_smd_msg_next_matching_filter(pr); /* tail message has to actually be of interest to the peer */ assert(!pr->tail || (pr->tail->_class & pr->_class_filter)); if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++ messages */ return 1; /* For Coverity */ if (!--msg->refcount) _lws_smd_msg_destroy(ctx, &ctx->smd, msg); lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */ return !!pr->tail; } /* * Called when the event loop could deliver messages synchronously, eg, on * entry to idle */ int lws_smd_msg_distribute(struct lws_context *ctx) { char more; /* commonly, no messages and nothing to do... */ if (!ctx->smd.owner_messages.count) return 0; ctx->smd.delivering = 1; do { more = 0; if (lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++ peers */ return 1; /* For Coverity */ lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, ctx->smd.owner_peers.head) { lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); more = (char)(more | !!_lws_smd_msg_deliver_peer(ctx, pr)); } lws_end_foreach_dll_safe(p, p1); lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */ } while (more); ctx->smd.delivering = 0; return 0; } struct lws_smd_peer * lws_smd_register(struct lws_context *ctx, void *opaque, int flags, lws_smd_class_t _class_filter, lws_smd_notification_cb_t cb) { lws_smd_peer_t *pr = lws_zalloc(sizeof(*pr), __func__); if (!pr) return NULL; pr->cb = cb; pr->opaque = opaque; pr->_class_filter = _class_filter; pr->ctx = ctx; if (!ctx->smd.delivering && lws_mutex_lock(ctx->smd.lock_peers)) { /* +++++++++++++++ peers */ lws_free(pr); return NULL; /* For Coverity */ } /* * Let's lock the message list before adding this peer... because... */ if (lws_mutex_lock(ctx->smd.lock_messages)) { /* +++++++++ messages */ lws_free(pr); pr = NULL; goto bail1; /* For Coverity */ } lws_dll2_add_tail(&pr->list, &ctx->smd.owner_peers); /* update the global class mask union to account for new peer mask */ _lws_smd_class_mask_union(&ctx->smd); /* * Now there's a new peer added, any messages we have stashed will try * to deliver to this guy too, if he's interested in that class. So we * have to update the message refcounts for queued messages-he's- * interested-in accordingly. */ lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, ctx->smd.owner_messages.head) { lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list); if (_lws_smd_msg_peer_interested_in_msg(pr, msg)) msg->refcount++; } lws_end_foreach_dll_safe(p, p1); /* ... ok we are done adding the peer */ lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */ lwsl_cx_info(ctx, "peer %p (count %u) registered", pr, (unsigned int)ctx->smd.owner_peers.count); bail1: if (!ctx->smd.delivering) lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */ return pr; } void lws_smd_unregister(struct lws_smd_peer *pr) { lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t, owner_peers); if (!smd->delivering && lws_mutex_lock(smd->lock_peers)) /* +++++++++++++++++++ peers */ return; /* For Coverity */ lwsl_cx_notice(pr->ctx, "destroying peer %p", pr); _lws_smd_peer_destroy(pr); if (!smd->delivering) lws_mutex_unlock(smd->lock_peers); /* ----------------- peers */ } int lws_smd_message_pending(struct lws_context *ctx) { int ret = 1; /* * First cheaply check the common case no messages pending, so there's * definitely nothing for this tsi or anything else */ if (!ctx->smd.owner_messages.count) return 0; /* * If there are any messages, check their age and expire ones that * have been hanging around too long */ if (lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++++++++++ peers */ return 1; /* For Coverity */ if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++++++++++ messages */ goto bail; /* For Coverity */ lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, ctx->smd.owner_messages.head) { lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list); if ((lws_now_usecs() - msg->timestamp) > ctx->smd_ttl_us) { lwsl_cx_warn(ctx, "timing out queued message %p", msg); /* * We're forcibly yanking this guy, we can expect that * there might be peers that point to it as their tail. * * In that case, move their tails on to the next guy * they are interested in, if any. */ lws_start_foreach_dll_safe(struct lws_dll2 *, pp, pp1, ctx->smd.owner_peers.head) { lws_smd_peer_t *pr = lws_container_of(pp, lws_smd_peer_t, list); if (pr->tail == msg) pr->tail = _lws_smd_msg_next_matching_filter(pr); } lws_end_foreach_dll_safe(pp, pp1); /* * No peer should fall foul of the peer tail checks * when destroying the message now. */ _lws_smd_msg_destroy(ctx, &ctx->smd, msg); } } lws_end_foreach_dll_safe(p, p1); lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */ /* * Walk the peer list */ lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) { lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); if (pr->tail) goto bail; } lws_end_foreach_dll(p); /* * There's no message pending that we need to handle */ ret = 0; bail: lws_mutex_unlock(ctx->smd.lock_peers); /* --------------------- peers */ return ret; } int _lws_smd_destroy(struct lws_context *ctx) { /* stop any message creation */ ctx->smd._class_filter = 0; /* * Walk the message list, destroying them */ lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, ctx->smd.owner_messages.head) { lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list); lws_dll2_remove(&msg->list); lws_free(msg); } lws_end_foreach_dll_safe(p, p1); /* * Walk the peer list, destroying them */ lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, ctx->smd.owner_peers.head) { lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); pr->tail = NULL; /* we just nuked all the messages, ignore */ _lws_smd_peer_destroy(pr); } lws_end_foreach_dll_safe(p, p1); lws_mutex_destroy(ctx->smd.lock_messages); lws_mutex_destroy(ctx->smd.lock_peers); return 0; }