• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * lws System Message Distribution
3  *
4  * Copyright (C) 2019 - 2021 Andy Green <andy@warmcat.com>
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to
8  * deal in the Software without restriction, including without limitation the
9  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10  * sell copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22  * IN THE SOFTWARE.
23  */
24 
25 #include "private-lib-core.h"
26 #include <assert.h>
27 
28 /* comment me to remove extra debug and sanity checks */
29 // #define LWS_SMD_DEBUG
30 
31 
32 #if defined(LWS_SMD_DEBUG)
33 #define lwsl_smd lwsl_notice
34 #else
35 #define lwsl_smd(_s, ...)
36 #endif
37 
38 void *
lws_smd_msg_alloc(struct lws_context * ctx,lws_smd_class_t _class,size_t len)39 lws_smd_msg_alloc(struct lws_context *ctx, lws_smd_class_t _class, size_t len)
40 {
41 	lws_smd_msg_t *msg;
42 
43 	/* only allow it if someone wants to consume this class of event */
44 
45 	if (!(ctx->smd._class_filter & _class)) {
46 		lwsl_cx_info(ctx, "rejecting class 0x%x as no participant wants",
47 				(unsigned int)_class);
48 		return NULL;
49 	}
50 
51 	assert(len <= LWS_SMD_MAX_PAYLOAD);
52 
53 
54 	/*
55 	 * If SS configured, over-allocate LWS_SMD_SS_RX_HEADER_LEN behind
56 	 * payload, ie,  msg_t (gap LWS_SMD_SS_RX_HEADER_LEN) payload
57 	 */
58 	msg = lws_malloc(sizeof(*msg) + LWS_SMD_SS_RX_HEADER_LEN_EFF + len,
59 			 __func__);
60 	if (!msg)
61 		return NULL;
62 
63 	memset(msg, 0, sizeof(*msg));
64 	msg->timestamp = lws_now_usecs();
65 	msg->length = (uint16_t)len;
66 	msg->_class = _class;
67 
68 	return ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF;
69 }
70 
71 void
lws_smd_msg_free(void ** ppay)72 lws_smd_msg_free(void **ppay)
73 {
74 	lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)*ppay) -
75 				LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));
76 
77 	/* if SS configured, actual alloc is LWS_SMD_SS_RX_HEADER_LEN behind */
78 	lws_free(msg);
79 	*ppay = NULL;
80 }
81 
82 #if defined(LWS_SMD_DEBUG)
83 static void
lws_smd_dump(lws_smd_t * smd)84 lws_smd_dump(lws_smd_t *smd)
85 {
86 	int n = 1;
87 
88 	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
89 				   smd->owner_messages.head) {
90 		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
91 
92 		lwsl_info(" msg %d: %p: ref %d, lat %dms, cls: 0x%x, len %u: '%s'\n",
93 			    n++, msg, msg->refcount,
94 			    (unsigned int)((lws_now_usecs() - msg->timestamp) / 1000),
95 			    msg->length, msg->_class,
96 			    (const char *)&msg[1] + LWS_SMD_SS_RX_HEADER_LEN_EFF);
97 
98 	} lws_end_foreach_dll_safe(p, p1);
99 
100 	n = 1;
101 	lws_start_foreach_dll(struct lws_dll2 *, p, smd->owner_peers.head) {
102 		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
103 
104 		lwsl_info(" peer %d: %p: tail: %p, filt 0x%x\n",
105 			    n++, pr, pr->tail, pr->_class_filter);
106 	} lws_end_foreach_dll(p);
107 }
108 #endif
109 
110 static int
_lws_smd_msg_peer_interested_in_msg(lws_smd_peer_t * pr,lws_smd_msg_t * msg)111 _lws_smd_msg_peer_interested_in_msg(lws_smd_peer_t *pr, lws_smd_msg_t *msg)
112 {
113     return !!(msg->_class & pr->_class_filter);
114 }
115 
116 /*
117  * Figure out what to set the initial refcount for the message to
118  */
119 
120 static int
_lws_smd_msg_assess_peers_interested(lws_smd_t * smd,lws_smd_msg_t * msg,struct lws_smd_peer * exc)121 _lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg,
122 				     struct lws_smd_peer *exc)
123 {
124 	struct lws_context *ctx = lws_container_of(smd, struct lws_context, smd);
125 	int interested = 0;
126 
127 	lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
128 		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
129 
130 		if (pr != exc && _lws_smd_msg_peer_interested_in_msg(pr, msg))
131 			/*
132 			 * This peer wants to consume it
133 			 */
134 			interested++;
135 
136 	} lws_end_foreach_dll(p);
137 
138 	return interested;
139 }
140 
141 static int
_lws_smd_class_mask_union(lws_smd_t * smd)142 _lws_smd_class_mask_union(lws_smd_t *smd)
143 {
144 	uint32_t mask = 0;
145 
146 	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
147 				   smd->owner_peers.head) {
148 		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
149 
150 		mask |= pr->_class_filter;
151 
152 	} lws_end_foreach_dll_safe(p, p1);
153 
154 	smd->_class_filter = mask;
155 
156 	return 0;
157 }
158 
159 /* Call with message lock held */
160 
161 static void
_lws_smd_msg_destroy(struct lws_context * cx,lws_smd_t * smd,lws_smd_msg_t * msg)162 _lws_smd_msg_destroy(struct lws_context *cx, lws_smd_t *smd, lws_smd_msg_t *msg)
163 {
164 	/*
165 	 * We think we gave the message to everyone and can destroy it.
166 	 * Sanity check that no peer holds a pointer to this guy
167 	 */
168 
169 	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
170 				   smd->owner_peers.head) {
171 		lws_smd_peer_t *xpr = lws_container_of(p, lws_smd_peer_t, list);
172 
173 		if (xpr->tail == msg) {
174 			lwsl_cx_err(cx, "peer %p has msg %p "
175 				 "we are about to destroy as tail", xpr, msg);
176 #if !defined(LWS_PLAT_FREERTOS)
177 			assert(0);
178 #endif
179 		}
180 
181 	} lws_end_foreach_dll_safe(p, p1);
182 
183 	/*
184 	 * We have fully delivered the message now, it
185 	 * can be unlinked and destroyed
186 	 */
187 	lwsl_cx_info(cx, "destroy msg %p", msg);
188 	lws_dll2_remove(&msg->list);
189 	lws_free(msg);
190 }
191 
192 /*
193  * This is wanting to be threadsafe, limiting the apis we can call
194  */
195 
196 int
_lws_smd_msg_send(struct lws_context * ctx,void * pay,struct lws_smd_peer * exc)197 _lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc)
198 {
199 	lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)pay) -
200 				LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));
201 
202 	if (ctx->smd.owner_messages.count >= ctx->smd_queue_depth) {
203 		lwsl_cx_warn(ctx, "rejecting message on queue depth %d",
204 				  (int)ctx->smd.owner_messages.count);
205 		/* reject the message due to max queue depth reached */
206 		return 1;
207 	}
208 
209 	if (!ctx->smd.delivering &&
210 	    lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++ peers */
211 		return 1; /* For Coverity */
212 
213 	if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++++++++++ messages */
214 		goto bail;
215 
216 	msg->refcount = (uint16_t)_lws_smd_msg_assess_peers_interested(
217 							&ctx->smd, msg, exc);
218 	if (!msg->refcount) {
219 		/* possible, condsidering exc and no other participants */
220 		lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */
221 
222 		lws_free(msg);
223 		if (!ctx->smd.delivering)
224 			lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
225 
226 		return 0;
227 	}
228 
229 	msg->exc = exc;
230 
231 	/* let's add him on the queue... */
232 
233 	lws_dll2_add_tail(&msg->list, &ctx->smd.owner_messages);
234 
235 	/*
236 	 * Any peer with no active tail needs to check our class to see if we
237 	 * should become his tail
238 	 */
239 
240 	lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
241 		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
242 
243 		if (pr != exc &&
244                    !pr->tail && _lws_smd_msg_peer_interested_in_msg(pr, msg)) {
245 			pr->tail = msg;
246 			/* tail message has to actually be of interest to the peer */
247 			assert(!pr->tail || (pr->tail->_class & pr->_class_filter));
248 		}
249 
250 	} lws_end_foreach_dll(p);
251 
252 #if defined(LWS_SMD_DEBUG)
253 	lwsl_smd("%s: added %p (refc %u) depth now %d\n", __func__,
254 		 msg, msg->refcount, ctx->smd.owner_messages.count);
255 	lws_smd_dump(&ctx->smd);
256 #endif
257 
258 	lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */
259 
260 bail:
261 	if (!ctx->smd.delivering)
262 		lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
263 
264 	/* we may be happening from another thread context */
265 	lws_cancel_service(ctx);
266 
267 	return 0;
268 }
269 
270 /*
271  * This is wanting to be threadsafe, limiting the apis we can call
272  */
273 
274 int
lws_smd_msg_send(struct lws_context * ctx,void * pay)275 lws_smd_msg_send(struct lws_context *ctx, void *pay)
276 {
277 	return _lws_smd_msg_send(ctx, pay, NULL);
278 }
279 
280 /*
281  * This is wanting to be threadsafe, limiting the apis we can call
282  */
283 
284 int
lws_smd_msg_printf(struct lws_context * ctx,lws_smd_class_t _class,const char * format,...)285 lws_smd_msg_printf(struct lws_context *ctx, lws_smd_class_t _class,
286 		   const char *format, ...)
287 {
288 	lws_smd_msg_t *msg;
289 	va_list ap;
290 	void *p;
291 	int n;
292 
293 	if (!(ctx->smd._class_filter & _class))
294 		/*
295 		 * There's nobody interested in messages of this class atm.
296 		 * Don't bother generating it, and act like all is well.
297 		 */
298 		return 0;
299 
300 	va_start(ap, format);
301 	n = vsnprintf(NULL, 0, format, ap);
302 	va_end(ap);
303 	if (n > LWS_SMD_MAX_PAYLOAD)
304 		/* too large to send */
305 		return 1;
306 
307 	p = lws_smd_msg_alloc(ctx, _class, (size_t)n + 2);
308 	if (!p)
309 		return 1;
310 	msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF -
311 								sizeof(*msg));
312 	msg->length = (uint16_t)n;
313 	va_start(ap, format);
314 	vsnprintf((char *)p, (unsigned int)n + 2, format, ap);
315 	va_end(ap);
316 
317 	/*
318 	 * locks taken and released in here
319 	 */
320 
321 	if (lws_smd_msg_send(ctx, p)) {
322 		lws_smd_msg_free(&p);
323 		return 1;
324 	}
325 
326 	return 0;
327 }
328 
329 #if defined(LWS_WITH_SECURE_STREAMS)
330 int
lws_smd_ss_msg_printf(const char * tag,uint8_t * buf,size_t * len,lws_smd_class_t _class,const char * format,...)331 lws_smd_ss_msg_printf(const char *tag, uint8_t *buf, size_t *len,
332 		      lws_smd_class_t _class, const char *format, ...)
333 {
334 	char *content = (char *)buf + LWS_SMD_SS_RX_HEADER_LEN;
335 	va_list ap;
336 	int n;
337 
338 	if (*len < LWS_SMD_SS_RX_HEADER_LEN)
339 		return 1;
340 
341 	lws_ser_wu64be(buf, _class);
342 	lws_ser_wu64be(buf + 8, 0); /* valgrind notices uninitialized if left */
343 
344 	va_start(ap, format);
345 	n = vsnprintf(content, (*len) - LWS_SMD_SS_RX_HEADER_LEN, format, ap);
346 	va_end(ap);
347 
348 	if (n > LWS_SMD_MAX_PAYLOAD ||
349 	    (unsigned int)n > (*len) - LWS_SMD_SS_RX_HEADER_LEN)
350 		/* too large to send */
351 		return 1;
352 
353 	*len = LWS_SMD_SS_RX_HEADER_LEN + (unsigned int)n;
354 
355 	lwsl_info("%s: %s send cl 0x%x, len %u\n", __func__, tag, (unsigned int)_class,
356 			(unsigned int)n);
357 
358 	return 0;
359 }
360 
361 /*
362  * This is a helper that user rx handler for LWS_SMD_STREAMTYPENAME SS can
363  * call through to with the payload it received from the proxy.  It will then
364  * forward the recieved SMD message to all local (same-context) participants
365  * that are interested in that class (except ones with callback skip_cb, so
366  * we don't loop).
367  */
368 
369 static int
_lws_smd_ss_rx_forward(struct lws_context * ctx,const char * tag,struct lws_smd_peer * pr,const uint8_t * buf,size_t len)370 _lws_smd_ss_rx_forward(struct lws_context *ctx, const char *tag,
371 		       struct lws_smd_peer *pr, const uint8_t *buf, size_t len)
372 {
373 	lws_smd_class_t _class;
374 	lws_smd_msg_t *msg;
375 	void *p;
376 
377 	if (len < LWS_SMD_SS_RX_HEADER_LEN_EFF)
378 		return 1;
379 
380 	if (len >= LWS_SMD_MAX_PAYLOAD + LWS_SMD_SS_RX_HEADER_LEN_EFF)
381 		return 1;
382 
383 	_class = (lws_smd_class_t)lws_ser_ru64be(buf);
384 
385 	if (_class == LWSSMDCL_METRICS) {
386 
387 	}
388 
389 	/* only locally forward messages that we care about in this process */
390 
391 	if (!(ctx->smd._class_filter & _class))
392 		/*
393 		 * There's nobody interested in messages of this class atm.
394 		 * Don't bother generating it, and act like all is well.
395 		 */
396 		return 0;
397 
398 	p = lws_smd_msg_alloc(ctx, _class, len);
399 	if (!p)
400 		return 1;
401 
402 	msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF -
403 								sizeof(*msg));
404 	msg->length = (uint16_t)(len - LWS_SMD_SS_RX_HEADER_LEN_EFF);
405 	/* adopt the original source timestamp, not time we forwarded it */
406 	msg->timestamp = (lws_usec_t)lws_ser_ru64be(buf + 8);
407 
408 	/* copy the message payload in */
409 	memcpy(p, buf + LWS_SMD_SS_RX_HEADER_LEN_EFF, msg->length);
410 
411 	/*
412 	 * locks taken and released in here
413 	 */
414 
415 	if (_lws_smd_msg_send(ctx, p, pr)) {
416 		/* we couldn't send it after all that... */
417 		lws_smd_msg_free(&p);
418 
419 		return 1;
420 	}
421 
422 	lwsl_info("%s: %s send cl 0x%x, len %u, ts %llu\n", __func__,
423 		    tag, (unsigned int)_class, msg->length,
424 		    (unsigned long long)msg->timestamp);
425 
426 	return 0;
427 }
428 
429 int
lws_smd_ss_rx_forward(void * ss_user,const uint8_t * buf,size_t len)430 lws_smd_ss_rx_forward(void *ss_user, const uint8_t *buf, size_t len)
431 {
432 	struct lws_ss_handle *h = (struct lws_ss_handle *)
433 					(((char *)ss_user) - sizeof(*h));
434 	struct lws_context *ctx = lws_ss_get_context(h);
435 
436 	return _lws_smd_ss_rx_forward(ctx, lws_ss_tag(h), h->u.smd.smd_peer, buf, len);
437 }
438 
439 #if defined(LWS_WITH_SECURE_STREAMS_PROXY_API)
440 int
lws_smd_sspc_rx_forward(void * ss_user,const uint8_t * buf,size_t len)441 lws_smd_sspc_rx_forward(void *ss_user, const uint8_t *buf, size_t len)
442 {
443 	struct lws_sspc_handle *h = (struct lws_sspc_handle *)
444 					(((char *)ss_user) - sizeof(*h));
445 	struct lws_context *ctx = lws_sspc_get_context(h);
446 
447 	return _lws_smd_ss_rx_forward(ctx, lws_sspc_tag(h), NULL, buf, len);
448 }
449 #endif
450 
451 #endif
452 
453 /*
454  * Peers that deregister need to adjust the refcount of messages they would
455  * have been interested in, but didn't take delivery of yet
456  */
457 
458 static void
_lws_smd_peer_destroy(lws_smd_peer_t * pr)459 _lws_smd_peer_destroy(lws_smd_peer_t *pr)
460 {
461 	lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t,
462 					  owner_peers);
463 
464 	if (lws_mutex_lock(smd->lock_messages)) /* +++++++++ messages */
465 		return; /* For Coverity */
466 
467 	lws_dll2_remove(&pr->list);
468 
469 	/*
470 	 * We take the approach to adjust the refcount of every would-have-been
471 	 * delivered message we were interested in
472 	 */
473 
474 	while (pr->tail) {
475 
476 		lws_smd_msg_t *m1 = lws_container_of(pr->tail->list.next,
477 							lws_smd_msg_t, list);
478 
479 		if (_lws_smd_msg_peer_interested_in_msg(pr, pr->tail)) {
480 			if (!--pr->tail->refcount)
481 				_lws_smd_msg_destroy(pr->ctx, smd, pr->tail);
482 		}
483 
484 		pr->tail = m1;
485 	}
486 
487 	lws_free(pr);
488 
489 	lws_mutex_unlock(smd->lock_messages); /* messages ------- */
490 }
491 
492 static lws_smd_msg_t *
_lws_smd_msg_next_matching_filter(lws_smd_peer_t * pr)493 _lws_smd_msg_next_matching_filter(lws_smd_peer_t *pr)
494 {
495 	lws_dll2_t *tail = &pr->tail->list;
496 	lws_smd_msg_t *msg;
497 
498 	do {
499 		tail = tail->next;
500 		if (!tail)
501 			return NULL;
502 
503 		msg = lws_container_of(tail, lws_smd_msg_t, list);
504 		if (msg->exc != pr &&
505 		    _lws_smd_msg_peer_interested_in_msg(pr, msg))
506 			return msg;
507 	} while (1);
508 
509 	return NULL;
510 }
511 
512 /*
513  * Delivers only one message to the peer and advances the tail, or sets to NULL
514  * if no more filtered queued messages.  Returns nonzero if tail non-NULL.
515  *
516  * For Proxied SS, only asks for writeable and does not advance or change the
517  * tail.
518  *
519  * This is done so if multiple messages queued, we don't get a situation where
520  * one participant gets them all spammed, then the next etc.  Instead they are
521  * delivered round-robin.
522  *
523  * Requires peer lock, may take message lock
524  */
525 
526 static int
_lws_smd_msg_deliver_peer(struct lws_context * ctx,lws_smd_peer_t * pr)527 _lws_smd_msg_deliver_peer(struct lws_context *ctx, lws_smd_peer_t *pr)
528 {
529 	lws_smd_msg_t *msg;
530 
531 	if (!pr->tail)
532 		return 0;
533 
534 	msg = lws_container_of(pr->tail, lws_smd_msg_t, list);
535 
536 
537 	lwsl_cx_info(ctx, "deliver cl 0x%x, len %d, refc %d, to peer %p",
538 		    (unsigned int)msg->_class, (int)msg->length,
539 		    (int)msg->refcount, pr);
540 
541 	pr->cb(pr->opaque, msg->_class, msg->timestamp,
542 	       ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF,
543 	       (size_t)msg->length);
544 
545 	assert(msg->refcount);
546 
547 	/*
548 	 * If there is one, move forward to the next queued
549 	 * message that meets the filters of this peer
550 	 */
551 	pr->tail = _lws_smd_msg_next_matching_filter(pr);
552 
553 	/* tail message has to actually be of interest to the peer */
554 	assert(!pr->tail || (pr->tail->_class & pr->_class_filter));
555 
556 	if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++ messages */
557 		return 1; /* For Coverity */
558 
559 	if (!--msg->refcount)
560 		_lws_smd_msg_destroy(ctx, &ctx->smd, msg);
561 	lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */
562 
563 	return !!pr->tail;
564 }
565 
566 /*
567  * Called when the event loop could deliver messages synchronously, eg, on
568  * entry to idle
569  */
570 
571 int
lws_smd_msg_distribute(struct lws_context * ctx)572 lws_smd_msg_distribute(struct lws_context *ctx)
573 {
574 	char more;
575 
576 	/* commonly, no messages and nothing to do... */
577 
578 	if (!ctx->smd.owner_messages.count)
579 		return 0;
580 
581 	ctx->smd.delivering = 1;
582 
583 	do {
584 		more = 0;
585 		if (lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++ peers */
586 			return 1; /* For Coverity */
587 
588 		lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
589 					   ctx->smd.owner_peers.head) {
590 			lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
591 
592 			more = (char)(more | !!_lws_smd_msg_deliver_peer(ctx, pr));
593 
594 		} lws_end_foreach_dll_safe(p, p1);
595 
596 		lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
597 	} while (more);
598 
599 	ctx->smd.delivering = 0;
600 
601 	return 0;
602 }
603 
604 struct lws_smd_peer *
lws_smd_register(struct lws_context * ctx,void * opaque,int flags,lws_smd_class_t _class_filter,lws_smd_notification_cb_t cb)605 lws_smd_register(struct lws_context *ctx, void *opaque, int flags,
606 		 lws_smd_class_t _class_filter, lws_smd_notification_cb_t cb)
607 {
608 	lws_smd_peer_t *pr = lws_zalloc(sizeof(*pr), __func__);
609 
610 	if (!pr)
611 		return NULL;
612 
613 	pr->cb = cb;
614 	pr->opaque = opaque;
615 	pr->_class_filter = _class_filter;
616 	pr->ctx = ctx;
617 
618 	if (!ctx->smd.delivering &&
619 	    lws_mutex_lock(ctx->smd.lock_peers)) { /* +++++++++++++++ peers */
620 			lws_free(pr);
621 			return NULL; /* For Coverity */
622 		}
623 
624 	/*
625 	 * Let's lock the message list before adding this peer... because...
626 	 */
627 
628 	if (lws_mutex_lock(ctx->smd.lock_messages)) { /* +++++++++ messages */
629 		lws_free(pr);
630 		pr = NULL;
631 		goto bail1; /* For Coverity */
632 	}
633 
634 	lws_dll2_add_tail(&pr->list, &ctx->smd.owner_peers);
635 
636 	/* update the global class mask union to account for new peer mask */
637 	_lws_smd_class_mask_union(&ctx->smd);
638 
639 	/*
640 	 * Now there's a new peer added, any messages we have stashed will try
641 	 * to deliver to this guy too, if he's interested in that class.  So we
642 	 * have to update the message refcounts for queued messages-he's-
643 	 * interested-in accordingly.
644 	 */
645 
646 	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
647 				   ctx->smd.owner_messages.head) {
648 		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
649 
650 		if (_lws_smd_msg_peer_interested_in_msg(pr, msg))
651 			msg->refcount++;
652 
653 	} lws_end_foreach_dll_safe(p, p1);
654 
655 	/* ... ok we are done adding the peer */
656 
657 	lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */
658 
659 	lwsl_cx_info(ctx, "peer %p (count %u) registered", pr,
660 			(unsigned int)ctx->smd.owner_peers.count);
661 
662 bail1:
663 	if (!ctx->smd.delivering)
664 		lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
665 
666 	return pr;
667 }
668 
669 void
lws_smd_unregister(struct lws_smd_peer * pr)670 lws_smd_unregister(struct lws_smd_peer *pr)
671 {
672 	lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t, owner_peers);
673 
674 	if (!smd->delivering &&
675 	    lws_mutex_lock(smd->lock_peers)) /* +++++++++++++++++++ peers */
676 		return; /* For Coverity */
677 	lwsl_cx_notice(pr->ctx, "destroying peer %p", pr);
678 	_lws_smd_peer_destroy(pr);
679 	if (!smd->delivering)
680 		lws_mutex_unlock(smd->lock_peers); /* ----------------- peers */
681 }
682 
683 int
lws_smd_message_pending(struct lws_context * ctx)684 lws_smd_message_pending(struct lws_context *ctx)
685 {
686 	int ret = 1;
687 
688 	/*
689 	 * First cheaply check the common case no messages pending, so there's
690 	 * definitely nothing for this tsi or anything else
691 	 */
692 
693 	if (!ctx->smd.owner_messages.count)
694 		return 0;
695 
696 	/*
697 	 * If there are any messages, check their age and expire ones that
698 	 * have been hanging around too long
699 	 */
700 
701 	if (lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++++++++++ peers */
702 		return 1; /* For Coverity */
703 	if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++++++++++ messages */
704 		goto bail; /* For Coverity */
705 
706 	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
707 				   ctx->smd.owner_messages.head) {
708 		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
709 
710 		if ((lws_now_usecs() - msg->timestamp) > ctx->smd_ttl_us) {
711 			lwsl_cx_warn(ctx, "timing out queued message %p",
712 					msg);
713 
714 			/*
715 			 * We're forcibly yanking this guy, we can expect that
716 			 * there might be peers that point to it as their tail.
717 			 *
718 			 * In that case, move their tails on to the next guy
719 			 * they are interested in, if any.
720 			 */
721 
722 			lws_start_foreach_dll_safe(struct lws_dll2 *, pp, pp1,
723 						   ctx->smd.owner_peers.head) {
724 				lws_smd_peer_t *pr = lws_container_of(pp,
725 							lws_smd_peer_t, list);
726 
727 				if (pr->tail == msg)
728 					pr->tail = _lws_smd_msg_next_matching_filter(pr);
729 
730 			} lws_end_foreach_dll_safe(pp, pp1);
731 
732 			/*
733 			 * No peer should fall foul of the peer tail checks
734 			 * when destroying the message now.
735 			 */
736 
737 			_lws_smd_msg_destroy(ctx, &ctx->smd, msg);
738 		}
739 	} lws_end_foreach_dll_safe(p, p1);
740 
741 	lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */
742 
743 	/*
744 	 * Walk the peer list
745 	 */
746 
747 	lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
748 		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
749 
750 		if (pr->tail)
751 			goto bail;
752 
753 	} lws_end_foreach_dll(p);
754 
755 	/*
756 	 * There's no message pending that we need to handle
757 	 */
758 
759 	ret = 0;
760 
761 bail:
762 	lws_mutex_unlock(ctx->smd.lock_peers); /* --------------------- peers */
763 
764 	return ret;
765 }
766 
767 int
_lws_smd_destroy(struct lws_context * ctx)768 _lws_smd_destroy(struct lws_context *ctx)
769 {
770 	/* stop any message creation */
771 
772 	ctx->smd._class_filter = 0;
773 
774 	/*
775 	 * Walk the message list, destroying them
776 	 */
777 
778 	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
779 				   ctx->smd.owner_messages.head) {
780 		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
781 
782 		lws_dll2_remove(&msg->list);
783 		lws_free(msg);
784 
785 	} lws_end_foreach_dll_safe(p, p1);
786 
787 	/*
788 	 * Walk the peer list, destroying them
789 	 */
790 
791 	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
792 				   ctx->smd.owner_peers.head) {
793 		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
794 
795 		pr->tail = NULL; /* we just nuked all the messages, ignore */
796 		_lws_smd_peer_destroy(pr);
797 
798 	} lws_end_foreach_dll_safe(p, p1);
799 
800 	lws_mutex_destroy(ctx->smd.lock_messages);
801 	lws_mutex_destroy(ctx->smd.lock_peers);
802 
803 	return 0;
804 }
805