1 /*
2 * libwebsockets-test-server - libwebsockets test implementation
3 *
4 * Written in 2010-2019 by Andy Green <andy@warmcat.com>
5 *
6 * This file is made available under the Creative Commons CC0 1.0
7 * Universal Public Domain Dedication.
8 *
9 * The person who associated a work with this deed has dedicated
10 * the work to the public domain by waiving all of his or her rights
11 * to the work worldwide under copyright law, including all related
12 * and neighboring rights, to the extent allowed by law. You can copy,
13 * modify, distribute and perform the work, even for commercial purposes,
14 * all without asking permission.
15 *
16 * The test apps are intended to be adapted for use in your code, which
17 * may be proprietary. So unlike the library itself, they are licensed
18 * Public Domain.
19 *
20 * Notice that the lws_pthread... locking apis are all zero-footprint
21 * NOPs in the case LWS_MAX_SMP == 1, which is the default. When lws
22 * is built for multiple service threads though, they resolve to their
23 * pthreads equivalents.
24 */
25
26 #if !defined (LWS_PLUGIN_STATIC)
27 #if !defined(LWS_DLL)
28 #define LWS_DLL
29 #endif
30 #if !defined(LWS_INTERNAL)
31 #define LWS_INTERNAL
32 #endif
33 #include <libwebsockets.h>
34 #endif
35
36 #include <string.h>
37 #include <stdlib.h>
38
39 #define QUEUELEN 32
40 /* queue free space below this, rx flow is disabled */
41 #define RXFLOW_MIN (4)
42 /* queue free space above this, rx flow is enabled */
43 #define RXFLOW_MAX ((2 * QUEUELEN) / 3)
44
45 #define MAX_MIRROR_INSTANCES 3
46
47 struct mirror_instance;
48
/* per-connection state: binds one wsi to one mirror instance */
struct per_session_data__lws_mirror {
	struct lws *wsi;	/* the connection this pss belongs to */
	struct mirror_instance *mi;	/* mirror instance we have joined, or NULL */
	struct per_session_data__lws_mirror *same_mi_pss_list; /* next pss on same mi */
	uint32_t tail;		/* our private read position in mi->ring */
};
55
/* this is the element in the ring */
struct a_message {
	void *payload;	/* heap buffer; first LWS_PRE bytes are lws headroom */
	size_t len;	/* length of the message data after LWS_PRE */
};
61
/* one named mirror "room" shared by all connections using the same name */
struct mirror_instance {
	struct mirror_instance *next;	/* next mi on the vhost's mi_list */
	lws_pthread_mutex(lock) /* protects all mirror instance data */
	struct per_session_data__lws_mirror *same_mi_pss_list;
	/**< must hold the per_vhost_data__lws_mirror.lock as well
	 * to change mi list membership */
	struct lws_ring *ring;	/* shared fifo of struct a_message */
	int messages_allocated;
	char name[30];		/* instance name from "?mirror=" url arg */
	char rx_enabled;	/* nonzero while rx flow is enabled on members */
};
73
/* per-vhost state: the list of live mirror instances on this vhost */
struct per_vhost_data__lws_mirror {
	lws_pthread_mutex(lock) /* protects mi_list membership changes */
	struct mirror_instance *mi_list;	/* singly-linked list of instances */
};
78
79
80 /* enable or disable rx from all connections to this mirror instance */
81 static void
__mirror_rxflow_instance(struct mirror_instance * mi,int enable)82 __mirror_rxflow_instance(struct mirror_instance *mi, int enable)
83 {
84 lws_start_foreach_ll(struct per_session_data__lws_mirror *,
85 pss, mi->same_mi_pss_list) {
86 lws_rx_flow_control(pss->wsi, enable);
87 } lws_end_foreach_ll(pss, same_mi_pss_list);
88
89 mi->rx_enabled = (char)enable;
90 }
91
92 /*
93 * Find out which connection to this mirror instance has the longest number
94 * of still unread elements in the ringbuffer and update the lws_ring "oldest
95 * tail" with it. Elements behind the "oldest tail" are freed and recycled for
96 * new head content. Elements after the "oldest tail" are still waiting to be
97 * read by somebody.
98 *
99 * If the oldest tail moved on from before, check if it created enough space
100 * in the queue to re-enable RX flow control for the mirror instance.
101 *
102 * Mark connections that are at the oldest tail as being on a 3s timeout to
103 * transmit something, otherwise the connection will be closed. Without this,
104 * a choked or nonresponsive connection can block the FIFO from freeing up any
105 * new space for new data.
106 *
107 * You can skip calling this if on your connection, before processing, the tail
108 * was not equal to the current worst, ie, if the tail you will work on is !=
109 * lws_ring_get_oldest_tail(ring) then no need to call this when the tail
110 * has changed; it wasn't the oldest so it won't change the oldest.
111 *
112 * Returns 0 if oldest unchanged or 1 if oldest changed from this call.
113 */
static int
__mirror_update_worst_tail(struct mirror_instance *mi)
{
	uint32_t wai, worst = 0, worst_tail = 0, oldest;
	struct per_session_data__lws_mirror *worst_pss = NULL;

	/* remember where the oldest tail was before we recompute it */
	oldest = lws_ring_get_oldest_tail(mi->ring);

	/* find the pss with the most still-unread elements ("worst" tail) */
	lws_start_foreach_ll(struct per_session_data__lws_mirror *,
			     pss, mi->same_mi_pss_list) {
		wai = (uint32_t)lws_ring_get_count_waiting_elements(mi->ring,
								&pss->tail);
		if (wai >= worst) {
			worst = wai;
			worst_tail = pss->tail;
			worst_pss = pss;
		}
	} lws_end_foreach_ll(pss, same_mi_pss_list);

	/* no connections bound to this mi: nothing to update */
	if (!worst_pss)
		return 0;

	/* recycle any elements now behind every reader's tail */
	lws_ring_update_oldest_tail(mi->ring, worst_tail);
	if (oldest == lws_ring_get_oldest_tail(mi->ring))
		return 0;
	/*
	 * The oldest tail did move on.  Check if we should re-enable rx flow
	 * for the mirror instance since we made some space now.
	 */
	if (!mi->rx_enabled && /* rx is disabled */
	    lws_ring_get_count_free_elements(mi->ring) >= RXFLOW_MAX)
		/* there is enough space, let's re-enable rx for our instance */
		__mirror_rxflow_instance(mi, 1);

	/* if nothing in queue, no timeout needed */
	if (!worst)
		return 1;

	/*
	 * The guy(s) with the oldest tail block the ringbuffer from recycling
	 * the FIFO entries he has not read yet.  Don't allow those guys to
	 * block the FIFO operation for very long.
	 */
	lws_start_foreach_ll(struct per_session_data__lws_mirror *,
			     pss, mi->same_mi_pss_list) {
		if (pss->tail == worst_tail)
			/*
			 * Our policy is if you are the slowest connection,
			 * you had better transmit something to help with that
			 * within 3s, or we will hang up on you to stop you
			 * blocking the FIFO for everyone else.
			 */
			lws_set_timeout(pss->wsi,
					PENDING_TIMEOUT_USER_REASON_BASE, 3);
	} lws_end_foreach_ll(pss, same_mi_pss_list);

	return 1;
}
172
173 static void
__mirror_callback_all_in_mi_on_writable(struct mirror_instance * mi)174 __mirror_callback_all_in_mi_on_writable(struct mirror_instance *mi)
175 {
176 /* ask for WRITABLE callback for every wsi on this mi */
177 lws_start_foreach_ll(struct per_session_data__lws_mirror *,
178 pss, mi->same_mi_pss_list) {
179 lws_callback_on_writable(pss->wsi);
180 } lws_end_foreach_ll(pss, same_mi_pss_list);
181 }
182
183 static void
__mirror_destroy_message(void * _msg)184 __mirror_destroy_message(void *_msg)
185 {
186 struct a_message *msg = _msg;
187
188 free(msg->payload);
189 msg->payload = NULL;
190 msg->len = 0;
191 }
192
193 static int
callback_lws_mirror(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)194 callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,
195 void *user, void *in, size_t len)
196 {
197 struct per_session_data__lws_mirror *pss =
198 (struct per_session_data__lws_mirror *)user;
199 struct per_vhost_data__lws_mirror *v =
200 (struct per_vhost_data__lws_mirror *)
201 lws_protocol_vh_priv_get(lws_get_vhost(wsi),
202 lws_get_protocol(wsi));
203 char name[300], update_worst, sent_something, *pn = name;
204 struct mirror_instance *mi = NULL;
205 const struct a_message *msg;
206 struct a_message amsg;
207 uint32_t oldest_tail;
208 int n, count_mi = 0;
209
210 switch (reason) {
211 case LWS_CALLBACK_ESTABLISHED:
212 lwsl_info("%s: LWS_CALLBACK_ESTABLISHED\n", __func__);
213 if (!v) {
214 lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
215 lws_get_protocol(wsi),
216 sizeof(struct per_vhost_data__lws_mirror));
217 v = (struct per_vhost_data__lws_mirror *)
218 lws_protocol_vh_priv_get(lws_get_vhost(wsi),
219 lws_get_protocol(wsi));
220 lws_pthread_mutex_init(&v->lock);
221 }
222
223 /*
224 * mirror instance name... defaults to "", but if URL includes
225 * "?mirror=xxx", will be "xxx"
226 */
227
228 if (lws_get_urlarg_by_name_safe(wsi, "mirror", name,
229 sizeof(name) - 1) < 0) {
230 lwsl_debug("get urlarg failed\n");
231 name[0] = '\0';
232 }
233
234 //lwsl_notice("%s: mirror name '%s'\n", __func__, pn);
235
236 /* is there already a mirror instance of this name? */
237
238 lws_pthread_mutex_lock(&v->lock); /* vhost lock { */
239
240 lws_start_foreach_ll(struct mirror_instance *, mi1,
241 v->mi_list) {
242 count_mi++;
243 if (!strcmp(pn, mi1->name)) {
244 /* yes... we will join it */
245 mi = mi1;
246 break;
247 }
248 } lws_end_foreach_ll(mi1, next);
249
250 if (!mi) {
251
252 /* no existing mirror instance for name */
253 if (count_mi == MAX_MIRROR_INSTANCES) {
254 lws_pthread_mutex_unlock(&v->lock); /* } vh lock */
255 return -1;
256 }
257
258 /* create one with this name, and join it */
259 mi = malloc(sizeof(*mi));
260 if (!mi)
261 goto bail1;
262 memset(mi, 0, sizeof(*mi));
263 mi->ring = lws_ring_create(sizeof(struct a_message),
264 QUEUELEN,
265 __mirror_destroy_message);
266 if (!mi->ring) {
267 free(mi);
268 goto bail1;
269 }
270
271 mi->next = v->mi_list;
272 v->mi_list = mi;
273 lws_snprintf(mi->name, sizeof(mi->name) - 1, "%s", pn);
274 mi->rx_enabled = 1;
275
276 lws_pthread_mutex_init(&mi->lock);
277
278 lwsl_notice("Created new mi %p '%s'\n", mi, pn);
279 }
280
281 /* add our pss to list of guys bound to this mi */
282
283 lws_ll_fwd_insert(pss, same_mi_pss_list, mi->same_mi_pss_list);
284
285 /* init the pss */
286
287 pss->mi = mi;
288 pss->tail = lws_ring_get_oldest_tail(mi->ring);
289 pss->wsi = wsi;
290
291 lws_pthread_mutex_unlock(&v->lock); /* } vhost lock */
292 break;
293
294 bail1:
295 lws_pthread_mutex_unlock(&v->lock); /* } vhost lock */
296 return 1;
297
298 case LWS_CALLBACK_CLOSED:
299 /* detach our pss from the mirror instance */
300 mi = pss->mi;
301 if (!mi)
302 break;
303
304 lws_pthread_mutex_lock(&v->lock); /* vhost lock { */
305
306 /* remove our closing pss from its mirror instance list */
307 lws_ll_fwd_remove(struct per_session_data__lws_mirror,
308 same_mi_pss_list, pss, mi->same_mi_pss_list);
309 pss->mi = NULL;
310
311 if (mi->same_mi_pss_list) {
312 /*
313 * Still other pss using the mirror instance. The pss
314 * going away may have had the oldest tail, reconfirm
315 * using the remaining pss what is the current oldest
316 * tail. If the oldest tail moves on, this call also
317 * will re-enable rx flow control when appropriate.
318 */
319 lws_pthread_mutex_lock(&mi->lock); /* mi lock { */
320 __mirror_update_worst_tail(mi);
321 lws_pthread_mutex_unlock(&mi->lock); /* } mi lock */
322 lws_pthread_mutex_unlock(&v->lock); /* } vhost lock */
323 break;
324 }
325
326 /* No more pss using the mirror instance... delete mi */
327
328 lws_start_foreach_llp(struct mirror_instance **,
329 pmi, v->mi_list) {
330 if (*pmi == mi) {
331 *pmi = (*pmi)->next;
332
333 lws_ring_destroy(mi->ring);
334 lws_pthread_mutex_destroy(&mi->lock);
335
336 free(mi);
337 break;
338 }
339 } lws_end_foreach_llp(pmi, next);
340
341 lws_pthread_mutex_unlock(&v->lock); /* } vhost lock */
342 break;
343
344 case LWS_CALLBACK_CONFIRM_EXTENSION_OKAY:
345 return 1; /* disallow compression */
346
347 case LWS_CALLBACK_PROTOCOL_INIT: /* per vhost */
348 if (!v) {
349 lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
350 lws_get_protocol(wsi),
351 sizeof(struct per_vhost_data__lws_mirror));
352 v = (struct per_vhost_data__lws_mirror *)
353 lws_protocol_vh_priv_get(lws_get_vhost(wsi),
354 lws_get_protocol(wsi));
355 if (!v)
356 return 0;
357 lws_pthread_mutex_init(&v->lock);
358 }
359 break;
360
361 case LWS_CALLBACK_PROTOCOL_DESTROY:
362 lws_pthread_mutex_destroy(&v->lock);
363 break;
364
365 case LWS_CALLBACK_SERVER_WRITEABLE:
366 lws_pthread_mutex_lock(&pss->mi->lock); /* instance lock { */
367 oldest_tail = lws_ring_get_oldest_tail(pss->mi->ring);
368 update_worst = oldest_tail == pss->tail;
369 sent_something = 0;
370
371 do {
372 msg = lws_ring_get_element(pss->mi->ring, &pss->tail);
373 if (!msg)
374 break;
375
376 if (!msg->payload) {
377 lwsl_err("%s: NULL payload: worst = %d,"
378 " pss->tail = %d\n", __func__,
379 oldest_tail, pss->tail);
380 if (lws_ring_consume(pss->mi->ring, &pss->tail,
381 NULL, 1))
382 continue;
383 break;
384 }
385
386 n = lws_write(wsi, (unsigned char *)msg->payload +
387 LWS_PRE, msg->len, LWS_WRITE_TEXT);
388 if (n < 0) {
389 lwsl_info("%s: WRITEABLE: %d\n", __func__, n);
390
391 goto bail2;
392 }
393 sent_something = 1;
394 lws_ring_consume(pss->mi->ring, &pss->tail, NULL, 1);
395
396 } while (!lws_send_pipe_choked(wsi));
397
398 /* if any left for us to send, ask for writeable again */
399 if (lws_ring_get_count_waiting_elements(pss->mi->ring,
400 &pss->tail))
401 lws_callback_on_writable(wsi);
402
403 if (!sent_something || !update_worst)
404 goto done1;
405
406 /*
407 * We are no longer holding the oldest tail (since we sent
408 * something. So free us of the timeout related to hogging the
409 * oldest tail.
410 */
411 lws_set_timeout(pss->wsi, NO_PENDING_TIMEOUT, 0);
412 /*
413 * If we were originally at the oldest fifo position of
414 * all the tails, now we used some up we may have
415 * changed the oldest fifo position and made some space.
416 */
417 __mirror_update_worst_tail(pss->mi);
418
419 done1:
420 lws_pthread_mutex_unlock(&pss->mi->lock); /* } instance lock */
421 break;
422
423 bail2:
424 lws_pthread_mutex_unlock(&pss->mi->lock); /* } instance lock */
425
426 return -1;
427
428 case LWS_CALLBACK_RECEIVE:
429 lws_pthread_mutex_lock(&pss->mi->lock); /* mi lock { */
430 n = (int)lws_ring_get_count_free_elements(pss->mi->ring);
431 if (!n) {
432 lwsl_notice("dropping!\n");
433 if (pss->mi->rx_enabled)
434 __mirror_rxflow_instance(pss->mi, 0);
435 goto req_writable;
436 }
437
438 amsg.payload = malloc(LWS_PRE + len);
439 amsg.len = len;
440 if (!amsg.payload) {
441 lwsl_notice("OOM: dropping\n");
442 goto done2;
443 }
444
445 memcpy((char *)amsg.payload + LWS_PRE, in, len);
446 if (!lws_ring_insert(pss->mi->ring, &amsg, 1)) {
447 __mirror_destroy_message(&amsg);
448 lwsl_notice("dropping!\n");
449 if (pss->mi->rx_enabled)
450 __mirror_rxflow_instance(pss->mi, 0);
451 goto req_writable;
452 }
453
454 if (pss->mi->rx_enabled &&
455 lws_ring_get_count_free_elements(pss->mi->ring) <
456 RXFLOW_MIN)
457 __mirror_rxflow_instance(pss->mi, 0);
458
459 req_writable:
460 __mirror_callback_all_in_mi_on_writable(pss->mi);
461
462 done2:
463 lws_pthread_mutex_unlock(&pss->mi->lock); /* } mi lock */
464 break;
465
466 case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
467 lwsl_info("LWS_CALLBACK_EVENT_WAIT_CANCELLED\n");
468 break;
469
470 default:
471 break;
472 }
473
474 return 0;
475 }
476
/* struct lws_protocols initializer for this protocol */
#define LWS_PLUGIN_PROTOCOL_MIRROR { \
		"lws-mirror-protocol", /* protocol name */ \
		callback_lws_mirror, \
		sizeof(struct per_session_data__lws_mirror), \
		4096, /* rx buf size must be >= permessage-deflate rx size */ \
		0, NULL, 0 \
	}
484
485 #if !defined (LWS_PLUGIN_STATIC)
486
/* protocol table exported when built as a standalone plugin */
LWS_VISIBLE const struct lws_protocols lws_mirror_protocols[] = {
	LWS_PLUGIN_PROTOCOL_MIRROR
};
490
/* plugin export header: lets lws discover and load this protocol plugin */
LWS_VISIBLE const lws_plugin_protocol_t lws_mirror = {
	.hdr = {
		"lws mirror",			/* plugin name */
		"lws_protocol_plugin",		/* plugin class */
		LWS_BUILD_HASH,
		LWS_PLUGIN_API_MAGIC
	},

	.protocols		= lws_mirror_protocols,
	.count_protocols	= LWS_ARRAY_SIZE(lws_mirror_protocols),
	.extensions		= NULL,
	.count_extensions	= 0,
};
504
505 #endif
506