/*
 * lws System Message Distribution
 *
 * Copyright (C) 2019 - 2021 Andy Green <andy@warmcat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "private-lib-core.h"
#include <assert.h>
/* uncomment to enable extra debug and sanity checks */
// #define LWS_SMD_DEBUG

#if defined(LWS_SMD_DEBUG)
#define lwsl_smd lwsl_notice
#else
#define lwsl_smd(_s, ...)
#endif
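
/*
 * Overview sketch of how these apis are intended to hang together
 * (illustrative only; my_smd_cb and my_opaque are hypothetical names, see
 * lws_smd_register() below for the expected callback shape):
 *
 *   struct lws_smd_peer *pr = lws_smd_register(ctx, my_opaque, 0,
 *                                              LWSSMDCL_METRICS, my_smd_cb);
 *   ...
 *   lws_smd_msg_printf(ctx, LWSSMDCL_METRICS, "load %d", load);
 *   ...
 *   lws_smd_msg_distribute(ctx);   // from the event loop, eg, on idle
 *   ...
 *   lws_smd_unregister(pr);
 */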

void *
lws_smd_msg_alloc(struct lws_context *ctx, lws_smd_class_t _class, size_t len)
{
	lws_smd_msg_t *msg;

	/* only allow it if someone wants to consume this class of event */

	if (!(ctx->smd._class_filter & _class)) {
		lwsl_cx_info(ctx, "rejecting class 0x%x as no participant wants it",
			     (unsigned int)_class);
		return NULL;
	}

	assert(len <= LWS_SMD_MAX_PAYLOAD);

	/*
	 * If SS configured, over-allocate LWS_SMD_SS_RX_HEADER_LEN behind
	 * payload, ie, msg_t (gap LWS_SMD_SS_RX_HEADER_LEN) payload
	 */
	msg = lws_malloc(sizeof(*msg) + LWS_SMD_SS_RX_HEADER_LEN_EFF + len,
			 __func__);
	if (!msg)
		return NULL;

	memset(msg, 0, sizeof(*msg));
	msg->timestamp = lws_now_usecs();
	msg->length = (uint16_t)len;
	msg->_class = _class;

	return ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF;
}
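
/*
 * Illustrative lifecycle for the pointer returned above (a sketch; the
 * payload contents are arbitrary): the caller fills the payload and hands
 * the same pointer to lws_smd_msg_send(), or disposes of it with
 * lws_smd_msg_free() if sending is abandoned or fails.
 *
 *   void *p = lws_smd_msg_alloc(ctx, LWSSMDCL_METRICS, 6);
 *
 *   if (p) {
 *           memcpy(p, "hello", 6);
 *           if (lws_smd_msg_send(ctx, p))
 *                   lws_smd_msg_free(&p); // on failure we retain ownership
 *   }
 */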

void
lws_smd_msg_free(void **ppay)
{
	lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)*ppay) -
				LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));

	/* if SS configured, actual alloc is LWS_SMD_SS_RX_HEADER_LEN behind */
	lws_free(msg);
	*ppay = NULL;
}

#if defined(LWS_SMD_DEBUG)
static void
lws_smd_dump(lws_smd_t *smd)
{
	int n = 1;

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   smd->owner_messages.head) {
		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);

		/* args must follow the format order: class before length */
		lwsl_info(" msg %d: %p: ref %d, lat %dms, cls: 0x%x, len %u: '%s'\n",
			  n++, msg, msg->refcount,
			  (unsigned int)((lws_now_usecs() - msg->timestamp) / 1000),
			  msg->_class, msg->length,
			  (const char *)&msg[1] + LWS_SMD_SS_RX_HEADER_LEN_EFF);

	} lws_end_foreach_dll_safe(p, p1);

	n = 1;
	lws_start_foreach_dll(struct lws_dll2 *, p, smd->owner_peers.head) {
		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

		lwsl_info(" peer %d: %p: tail: %p, filt 0x%x\n",
			  n++, pr, pr->tail, pr->_class_filter);
	} lws_end_foreach_dll(p);
}
#endif

static int
_lws_smd_msg_peer_interested_in_msg(lws_smd_peer_t *pr, lws_smd_msg_t *msg)
{
	return !!(msg->_class & pr->_class_filter);
}
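
/*
 * Worked example of the filter test above: a peer whose _class_filter is
 * 0x5 (classes 0x1 | 0x4) is interested in a message of _class 0x4, since
 * 0x4 & 0x5 is nonzero, but not in one of _class 0x2, since 0x2 & 0x5 is 0.
 */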

/*
 * Figure out what to set the initial refcount for the message to
 */

static int
_lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg,
				     struct lws_smd_peer *exc)
{
	struct lws_context *ctx = lws_container_of(smd, struct lws_context, smd);
	int interested = 0;

	lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

		if (pr != exc && _lws_smd_msg_peer_interested_in_msg(pr, msg))
			/*
			 * This peer wants to consume it
			 */
			interested++;

	} lws_end_foreach_dll(p);

	return interested;
}

static int
_lws_smd_class_mask_union(lws_smd_t *smd)
{
	uint32_t mask = 0;

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   smd->owner_peers.head) {
		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

		mask |= pr->_class_filter;

	} lws_end_foreach_dll_safe(p, p1);

	smd->_class_filter = mask;

	return 0;
}
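
/*
 * Worked example of the union mask: peers with filters 0x1 and 0x6 yield
 * smd->_class_filter = 0x7, so lws_smd_msg_alloc() will accept messages of
 * class 0x1, 0x2 or 0x4, and reject, eg, 0x8, which nobody listens for.
 */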

/* Call with message lock held */

static void
_lws_smd_msg_destroy(struct lws_context *cx, lws_smd_t *smd, lws_smd_msg_t *msg)
{
	/*
	 * We think we gave the message to everyone and can destroy it.
	 * Sanity check that no peer holds a pointer to this guy
	 */

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   smd->owner_peers.head) {
		lws_smd_peer_t *xpr = lws_container_of(p, lws_smd_peer_t, list);

		if (xpr->tail == msg) {
			lwsl_cx_err(cx, "peer %p has msg %p "
					"we are about to destroy as tail", xpr, msg);
#if !defined(LWS_PLAT_FREERTOS)
			assert(0);
#endif
		}

	} lws_end_foreach_dll_safe(p, p1);

	/*
	 * We have fully delivered the message now, it
	 * can be unlinked and destroyed
	 */
	lwsl_cx_info(cx, "destroy msg %p", msg);
	lws_dll2_remove(&msg->list);
	lws_free(msg);
}

/*
 * This needs to be threadsafe, which limits the apis we can call
 */

int
_lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc)
{
	lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)pay) -
				LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));

	if (ctx->smd.owner_messages.count >= ctx->smd_queue_depth) {
		lwsl_cx_warn(ctx, "rejecting message on queue depth %d",
			     (int)ctx->smd.owner_messages.count);
		/* reject the message due to max queue depth reached */
		return 1;
	}

	if (!ctx->smd.delivering &&
	    lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++ peers */
		return 1; /* For Coverity */

	if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++++++++++ messages */
		goto bail;

	msg->refcount = (uint16_t)_lws_smd_msg_assess_peers_interested(
							&ctx->smd, msg, exc);
	if (!msg->refcount) {
		/* possible, considering exc and no other participants */
		lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */

		lws_free(msg);
		if (!ctx->smd.delivering)
			lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */

		return 0;
	}

	msg->exc = exc;

	/* let's add him on the queue... */

	lws_dll2_add_tail(&msg->list, &ctx->smd.owner_messages);

	/*
	 * Any peer with no active tail needs to check our class to see if we
	 * should become his tail
	 */

	lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

		if (pr != exc &&
		    !pr->tail && _lws_smd_msg_peer_interested_in_msg(pr, msg)) {
			pr->tail = msg;
			/* tail message has to actually be of interest to the peer */
			assert(!pr->tail || (pr->tail->_class & pr->_class_filter));
		}

	} lws_end_foreach_dll(p);

#if defined(LWS_SMD_DEBUG)
	lwsl_smd("%s: added %p (refc %u) depth now %d\n", __func__,
		 msg, msg->refcount, ctx->smd.owner_messages.count);
	lws_smd_dump(&ctx->smd);
#endif

	lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */

bail:
	if (!ctx->smd.delivering)
		lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */

	/* we may be happening from another thread context */
	lws_cancel_service(ctx);

	return 0;
}
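
/*
 * Worked example of the refcount lifecycle above: with three interested
 * peers, the message joins the queue with refcount 3, becoming the tail of
 * any of them that had none; each delivery by _lws_smd_msg_deliver_peer()
 * then decrements the refcount, and the third delivery takes it to zero
 * and _lws_smd_msg_destroy()s it.
 */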

/*
 * This needs to be threadsafe, which limits the apis we can call
 */

int
lws_smd_msg_send(struct lws_context *ctx, void *pay)
{
	return _lws_smd_msg_send(ctx, pay, NULL);
}

/*
 * This needs to be threadsafe, which limits the apis we can call
 */

int
lws_smd_msg_printf(struct lws_context *ctx, lws_smd_class_t _class,
		   const char *format, ...)
{
	lws_smd_msg_t *msg;
	va_list ap;
	void *p;
	int n;

	if (!(ctx->smd._class_filter & _class))
		/*
		 * There's nobody interested in messages of this class atm.
		 * Don't bother generating it, and act like all is well.
		 */
		return 0;

	va_start(ap, format);
	n = vsnprintf(NULL, 0, format, ap);
	va_end(ap);
	if (n > LWS_SMD_MAX_PAYLOAD)
		/* too large to send */
		return 1;

	p = lws_smd_msg_alloc(ctx, _class, (size_t)n + 2);
	if (!p)
		return 1;
	msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF -
								sizeof(*msg));
	msg->length = (uint16_t)n;
	va_start(ap, format);
	vsnprintf((char *)p, (unsigned int)n + 2, format, ap);
	va_end(ap);

	/*
	 * locks taken and released in here
	 */

	if (lws_smd_msg_send(ctx, p)) {
		lws_smd_msg_free(&p);
		return 1;
	}

	return 0;
}
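
/*
 * Typical usage sketch (the payload schema is illustrative only):
 *
 *   if (lws_smd_msg_printf(ctx, LWSSMDCL_METRICS,
 *                          "{\"type\":\"cpu\",\"load\":%d}", load))
 *           lwsl_warn("%s: smd msg send failed\n", __func__);
 *
 * On a 0 return, ownership of the generated message has passed to the smd
 * queue; on failure the message was already freed or never created.
 */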

#if defined(LWS_WITH_SECURE_STREAMS)
int
lws_smd_ss_msg_printf(const char *tag, uint8_t *buf, size_t *len,
		      lws_smd_class_t _class, const char *format, ...)
{
	char *content = (char *)buf + LWS_SMD_SS_RX_HEADER_LEN;
	va_list ap;
	int n;

	if (*len < LWS_SMD_SS_RX_HEADER_LEN)
		return 1;

	lws_ser_wu64be(buf, _class);
	lws_ser_wu64be(buf + 8, 0); /* valgrind notices uninitialized if left */

	va_start(ap, format);
	n = vsnprintf(content, (*len) - LWS_SMD_SS_RX_HEADER_LEN, format, ap);
	va_end(ap);

	if (n > LWS_SMD_MAX_PAYLOAD ||
	    (unsigned int)n > (*len) - LWS_SMD_SS_RX_HEADER_LEN)
		/* too large to send */
		return 1;

	*len = LWS_SMD_SS_RX_HEADER_LEN + (unsigned int)n;

	lwsl_info("%s: %s send cl 0x%x, len %u\n", __func__, tag,
		  (unsigned int)_class, (unsigned int)n);

	return 0;
}
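
/*
 * Resulting wire layout in buf, as serialized above (the header occupies
 * LWS_SMD_SS_RX_HEADER_LEN bytes):
 *
 *   [ 0.. 7]  u64 BE  message class
 *   [ 8..15]  u64 BE  timestamp, zeroed here and adopted on rx
 *   [16.. ]   n bytes of payload
 *
 * and *len is updated to LWS_SMD_SS_RX_HEADER_LEN + n.
 */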

/*
 * This is a helper that the user rx handler for the LWS_SMD_STREAMTYPENAME SS
 * can call through to with the payload it received from the proxy. It will
 * then forward the received SMD message to all local (same-context)
 * participants that are interested in that class (except the excluded peer
 * pr, so we don't loop).
 */

static int
_lws_smd_ss_rx_forward(struct lws_context *ctx, const char *tag,
		       struct lws_smd_peer *pr, const uint8_t *buf, size_t len)
{
	lws_smd_class_t _class;
	lws_smd_msg_t *msg;
	void *p;

	if (len < LWS_SMD_SS_RX_HEADER_LEN_EFF)
		return 1;

	if (len >= LWS_SMD_MAX_PAYLOAD + LWS_SMD_SS_RX_HEADER_LEN_EFF)
		return 1;

	_class = (lws_smd_class_t)lws_ser_ru64be(buf);

	/* only locally forward messages that we care about in this process */

	if (!(ctx->smd._class_filter & _class))
		/*
		 * There's nobody interested in messages of this class atm.
		 * Don't bother generating it, and act like all is well.
		 */
		return 0;

	p = lws_smd_msg_alloc(ctx, _class, len);
	if (!p)
		return 1;

	msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF -
								sizeof(*msg));
	msg->length = (uint16_t)(len - LWS_SMD_SS_RX_HEADER_LEN_EFF);
	/* adopt the original source timestamp, not time we forwarded it */
	msg->timestamp = (lws_usec_t)lws_ser_ru64be(buf + 8);

	/* copy the message payload in */
	memcpy(p, buf + LWS_SMD_SS_RX_HEADER_LEN_EFF, msg->length);

	/*
	 * locks taken and released in here
	 */

	if (_lws_smd_msg_send(ctx, p, pr)) {
		/* we couldn't send it after all that... */
		lws_smd_msg_free(&p);

		return 1;
	}

	lwsl_info("%s: %s send cl 0x%x, len %u, ts %llu\n", __func__,
		  tag, (unsigned int)_class, msg->length,
		  (unsigned long long)msg->timestamp);

	return 0;
}

int
lws_smd_ss_rx_forward(void *ss_user, const uint8_t *buf, size_t len)
{
	struct lws_ss_handle *h = (struct lws_ss_handle *)
					(((char *)ss_user) - sizeof(*h));
	struct lws_context *ctx = lws_ss_get_context(h);

	return _lws_smd_ss_rx_forward(ctx, lws_ss_tag(h), h->u.smd.smd_peer,
				      buf, len);
}
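
/*
 * Hedged sketch of calling the above from an SS rx handler for the
 * LWS_SMD_STREAMTYPENAME streamtype (the handler name and its place in a
 * policy are assumptions, not something defined in this file):
 *
 *   static lws_ss_state_return_t
 *   smd_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
 *   {
 *           if (lws_smd_ss_rx_forward(userobj, buf, len))
 *                   lwsl_info("smd forward failed\n");
 *
 *           return LWSSSSRET_OK;
 *   }
 */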

#if defined(LWS_WITH_SECURE_STREAMS_PROXY_API)
int
lws_smd_sspc_rx_forward(void *ss_user, const uint8_t *buf, size_t len)
{
	struct lws_sspc_handle *h = (struct lws_sspc_handle *)
					(((char *)ss_user) - sizeof(*h));
	struct lws_context *ctx = lws_sspc_get_context(h);

	return _lws_smd_ss_rx_forward(ctx, lws_sspc_tag(h), NULL, buf, len);
}
#endif

#endif

/*
 * Peers that deregister need to adjust the refcount of messages they would
 * have been interested in, but didn't take delivery of yet
 */

static void
_lws_smd_peer_destroy(lws_smd_peer_t *pr)
{
	lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t,
					  owner_peers);

	if (lws_mutex_lock(smd->lock_messages)) /* +++++++++ messages */
		return; /* For Coverity */

	lws_dll2_remove(&pr->list);

	/*
	 * We take the approach to adjust the refcount of every would-have-been
	 * delivered message we were interested in
	 */

	while (pr->tail) {

		lws_smd_msg_t *m1 = lws_container_of(pr->tail->list.next,
						     lws_smd_msg_t, list);

		if (_lws_smd_msg_peer_interested_in_msg(pr, pr->tail)) {
			if (!--pr->tail->refcount)
				_lws_smd_msg_destroy(pr->ctx, smd, pr->tail);
		}

		pr->tail = m1;
	}

	lws_free(pr);

	lws_mutex_unlock(smd->lock_messages); /* messages ------- */
}

static lws_smd_msg_t *
_lws_smd_msg_next_matching_filter(lws_smd_peer_t *pr)
{
	lws_dll2_t *tail = &pr->tail->list;
	lws_smd_msg_t *msg;

	do {
		tail = tail->next;
		if (!tail)
			return NULL;

		msg = lws_container_of(tail, lws_smd_msg_t, list);
		if (msg->exc != pr &&
		    _lws_smd_msg_peer_interested_in_msg(pr, msg))
			return msg;
	} while (1);

	return NULL;
}

/*
 * Delivers only one message to the peer and advances the tail, or sets it to
 * NULL if there are no more filtered queued messages. Returns nonzero if the
 * tail is still non-NULL afterwards.
 *
 * For proxied SS, only asks for writeable and does not advance or change the
 * tail.
 *
 * This is done so that if multiple messages are queued, we don't get a
 * situation where one participant gets them all spammed, then the next etc.
 * Instead they are delivered round-robin.
 *
 * Requires the peer lock, may take the message lock
 */

static int
_lws_smd_msg_deliver_peer(struct lws_context *ctx, lws_smd_peer_t *pr)
{
	lws_smd_msg_t *msg;

	if (!pr->tail)
		return 0;

	msg = lws_container_of(pr->tail, lws_smd_msg_t, list);

	lwsl_cx_info(ctx, "deliver cl 0x%x, len %d, refc %d, to peer %p",
		     (unsigned int)msg->_class, (int)msg->length,
		     (int)msg->refcount, pr);

	pr->cb(pr->opaque, msg->_class, msg->timestamp,
	       ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF,
	       (size_t)msg->length);

	assert(msg->refcount);

	/*
	 * If there is one, move forward to the next queued
	 * message that meets the filters of this peer
	 */
	pr->tail = _lws_smd_msg_next_matching_filter(pr);

	/* tail message has to actually be of interest to the peer */
	assert(!pr->tail || (pr->tail->_class & pr->_class_filter));

	if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++ messages */
		return 1; /* For Coverity */

	if (!--msg->refcount)
		_lws_smd_msg_destroy(ctx, &ctx->smd, msg);
	lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */

	return !!pr->tail;
}

/*
 * Called when the event loop could deliver messages synchronously, eg, on
 * entry to idle
 */

int
lws_smd_msg_distribute(struct lws_context *ctx)
{
	char more;

	/* commonly, no messages and nothing to do... */

	if (!ctx->smd.owner_messages.count)
		return 0;

	ctx->smd.delivering = 1;

	do {
		more = 0;
		if (lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++ peers */
			return 1; /* For Coverity */

		lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
					   ctx->smd.owner_peers.head) {
			lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

			more = (char)(more | !!_lws_smd_msg_deliver_peer(ctx, pr));

		} lws_end_foreach_dll_safe(p, p1);

		lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
	} while (more);

	ctx->smd.delivering = 0;

	return 0;
}
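
/*
 * Worked example of the round-robin behaviour: with peers A and B and three
 * queued messages both want, the first pass of the do..while delivers msg 1
 * to A then msg 1 to B, the second pass msg 2 to each, and so on, instead
 * of A being spammed with all three before B sees any.
 */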

struct lws_smd_peer *
lws_smd_register(struct lws_context *ctx, void *opaque, int flags,
		 lws_smd_class_t _class_filter, lws_smd_notification_cb_t cb)
{
	lws_smd_peer_t *pr = lws_zalloc(sizeof(*pr), __func__);

	if (!pr)
		return NULL;

	pr->cb = cb;
	pr->opaque = opaque;
	pr->_class_filter = _class_filter;
	pr->ctx = ctx;

	if (!ctx->smd.delivering &&
	    lws_mutex_lock(ctx->smd.lock_peers)) { /* +++++++++++++++ peers */
		lws_free(pr);
		return NULL; /* For Coverity */
	}

	/*
	 * Let's lock the message list before adding this peer... because...
	 */

	if (lws_mutex_lock(ctx->smd.lock_messages)) { /* +++++++++ messages */
		lws_free(pr);
		pr = NULL;
		goto bail1; /* For Coverity */
	}

	lws_dll2_add_tail(&pr->list, &ctx->smd.owner_peers);

	/* update the global class mask union to account for new peer mask */
	_lws_smd_class_mask_union(&ctx->smd);

	/*
	 * Now there's a new peer added, any messages we have stashed will try
	 * to deliver to this guy too, if he's interested in that class. So we
	 * have to update the refcounts of queued messages he's interested in
	 * accordingly.
	 */

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   ctx->smd.owner_messages.head) {
		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);

		if (_lws_smd_msg_peer_interested_in_msg(pr, msg))
			msg->refcount++;

	} lws_end_foreach_dll_safe(p, p1);

	/* ... ok, we are done adding the peer */

	lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */

	lwsl_cx_info(ctx, "peer %p (count %u) registered", pr,
		     (unsigned int)ctx->smd.owner_peers.count);

bail1:
	if (!ctx->smd.delivering)
		lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */

	return pr;
}
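
/*
 * Registration sketch matching the delivery call in
 * _lws_smd_msg_deliver_peer() (my_smd_cb and its body are illustrative):
 *
 *   static int
 *   my_smd_cb(void *opaque, lws_smd_class_t _class, lws_usec_t timestamp,
 *             void *buf, size_t len)
 *   {
 *           lwsl_notice("cl 0x%x: %.*s\n", (unsigned int)_class,
 *                       (int)len, (const char *)buf);
 *
 *           return 0;
 *   }
 *
 *   ...
 *
 *   if (!lws_smd_register(ctx, NULL, 0, LWSSMDCL_METRICS, my_smd_cb))
 *           lwsl_err("%s: smd register failed\n", __func__);
 */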

void
lws_smd_unregister(struct lws_smd_peer *pr)
{
	lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t, owner_peers);

	if (!smd->delivering &&
	    lws_mutex_lock(smd->lock_peers)) /* +++++++++++++++++++ peers */
		return; /* For Coverity */
	lwsl_cx_notice(pr->ctx, "destroying peer %p", pr);
	_lws_smd_peer_destroy(pr);
	if (!smd->delivering)
		lws_mutex_unlock(smd->lock_peers); /* ----------------- peers */
}

int
lws_smd_message_pending(struct lws_context *ctx)
{
	int ret = 1;

	/*
	 * First cheaply check the common case, no messages pending, so there's
	 * definitely nothing for this tsi or anything else
	 */

	if (!ctx->smd.owner_messages.count)
		return 0;

	/*
	 * If there are any messages, check their age and expire ones that
	 * have been hanging around too long
	 */

	if (lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++++++++++ peers */
		return 1; /* For Coverity */
	if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++++++++++ messages */
		goto bail; /* For Coverity */

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   ctx->smd.owner_messages.head) {
		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);

		if ((lws_now_usecs() - msg->timestamp) > ctx->smd_ttl_us) {
			lwsl_cx_warn(ctx, "timing out queued message %p",
				     msg);

			/*
			 * We're forcibly yanking this guy, we can expect that
			 * there might be peers that point to it as their tail.
			 *
			 * In that case, move their tails on to the next guy
			 * they are interested in, if any.
			 */

			lws_start_foreach_dll_safe(struct lws_dll2 *, pp, pp1,
						   ctx->smd.owner_peers.head) {
				lws_smd_peer_t *pr = lws_container_of(pp,
							lws_smd_peer_t, list);

				if (pr->tail == msg)
					pr->tail = _lws_smd_msg_next_matching_filter(pr);

			} lws_end_foreach_dll_safe(pp, pp1);

			/*
			 * No peer should fall foul of the peer tail checks
			 * when destroying the message now.
			 */

			_lws_smd_msg_destroy(ctx, &ctx->smd, msg);
		}
	} lws_end_foreach_dll_safe(p, p1);

	lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */

	/*
	 * Walk the peer list: if anyone still has a tail, something is
	 * pending to deliver
	 */

	lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

		if (pr->tail)
			goto bail;

	} lws_end_foreach_dll(p);

	/*
	 * There's no message pending that we need to handle
	 */

	ret = 0;

bail:
	lws_mutex_unlock(ctx->smd.lock_peers); /* --------------------- peers */

	return ret;
}

int
_lws_smd_destroy(struct lws_context *ctx)
{
	/* stop any message creation */

	ctx->smd._class_filter = 0;

	/*
	 * Walk the message list, destroying them
	 */

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   ctx->smd.owner_messages.head) {
		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);

		lws_dll2_remove(&msg->list);
		lws_free(msg);

	} lws_end_foreach_dll_safe(p, p1);

	/*
	 * Walk the peer list, destroying them
	 */

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   ctx->smd.owner_peers.head) {
		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

		pr->tail = NULL; /* we just nuked all the messages, ignore */
		_lws_smd_peer_destroy(pr);

	} lws_end_foreach_dll_safe(p, p1);

	lws_mutex_destroy(ctx->smd.lock_messages);
	lws_mutex_destroy(ctx->smd.lock_peers);

	return 0;
}