1 /*
2 * libwebsockets-test-server - libwebsockets test implementation
3 *
4 * Written in 2010-2021 by Andy Green <andy@warmcat.com>
5 *
6 * This file is made available under the Creative Commons CC0 1.0
7 * Universal Public Domain Dedication.
8 *
9 * The person who associated a work with this deed has dedicated
10 * the work to the public domain by waiving all of his or her rights
11 * to the work worldwide under copyright law, including all related
12 * and neighboring rights, to the extent allowed by law. You can copy,
13 * modify, distribute and perform the work, even for commercial purposes,
14 * all without asking permission.
15 *
16 * The test apps are intended to be adapted for use in your code, which
17 * may be proprietary. So unlike the library itself, they are licensed
18 * Public Domain.
19 *
20 * Scrapeable, proxiable OpenMetrics metrics (compatible with Prometheus)
21 *
22 * https://tools.ietf.org/html/draft-richih-opsawg-openmetrics-00
23 *
24 * This plugin provides four protocols related to openmetrics handling:
25 *
26 * 1) "lws-openmetrics" direct http listener so scraper can directly get metrics
27 *
28 * 2) "lws-openmetrics-prox-agg" metrics proxy server that scraper can connect
29 * to locally to proxy through to connected remote clients at 3)
30 *
31 * 3) "lws-openmetrics-prox-server" metrics proxy server that remote clients can
32 * connect to, providing a path where scrapers at 2) can get metrics from
 * clients connected to us
34 *
35 * 4) "lws-openmetrics-prox-client" nailed-up metrics proxy client that tries to
 * keep up a connection to the server at 3), allowing the scraper to reach
37 * clients that have no reachable way to serve.
38 *
39 * These are provided like this to maximize flexibility in being able to add
40 * openmetrics serving, proxying, or client->proxy to existing lws code.
41 *
42 * Openmetrics supports a "metric" at the top of its report that describes the
43 * source aka "target metadata".
44 *
45 * Since we want to enable collection from devices that are not externally
46 * reachable, we must provide a reachable server that the clients can attach to
47 * and have their stats aggregated and then read by Prometheus or whatever.
48 * Openmetrics says that it wants to present the aggregated stats in a flat
49 * summary with only the aggregator's "target metadata" and contributor targets
 * getting their data tagged with the source:
51 *
52 * "The above discussion is in the context of individual exposers. An
53 * exposition from a general purpose monitoring system may contain
54 * metrics from many individual targets, and thus may expose multiple
55 * target info Metrics. The metrics may already have had target
56 * metadata added to them as labels as part of ingestion. The metric
57 * names MUST NOT be varied based on target metadata. For example it
58 * would be incorrect for all metrics to end up being prefixed with
59 * staging_ even if they all originated from targets in a staging
60 * environment)."
61 */
62
63 #if !defined (LWS_PLUGIN_STATIC)
64 #if !defined(LWS_DLL)
65 #define LWS_DLL
66 #endif
67 #if !defined(LWS_INTERNAL)
68 #define LWS_INTERNAL
69 #endif
70 #include <libwebsockets.h>
71 #endif
72 #include <string.h>
73 #include <stdlib.h>
74 #include <sys/stat.h>
75 #include <fcntl.h>
76 #if !defined(WIN32)
77 #include <unistd.h>
78 #endif
79 #include <assert.h>
80
/*
 * Per-vhost data, allocated at PROTOCOL_INIT by "lws-openmetrics-prox-agg",
 * "lws-openmetrics-prox-server" and "lws-openmetrics-prox-client".  Not every
 * member is used by every protocol.
 */
struct vhd {
	struct lws_context *cx;
	struct lws_vhost *vhost;	/* only set by prox-client */

	char ws_server_uri[128];	/* prox-client: "ws-server-uri" pvo */
	char metrics_proxy_path[128];	/* prox-client: "metrics-proxy-path" pvo */
	char ba_secret[128];		/* prox-client: "ba-secret" pvo */

	const char *proxy_side_bind_name;
	/**< name used to bind the two halves of the proxy together, must be
	 * the same name given in a pvo for both "lws-openmetrics-prox-agg"
	 * (the side local to the scraper) and "lws-openmetrics-prox-server"
	 * (the side the clients connect to)
	 */

	char sanity[8];
	/**< "isagg" or "isws"; checked by assert() when the halves bind */

	lws_dll2_owner_t clients;
	/**< list of struct pss: matched scrapers (agg) or greeted ws clients
	 * (server) */

	lws_sorted_usec_list_t sul; /* schedule connection retry */

	struct vhd *bind_partner_vhd;
	/**< vhd of the other half of the proxy, NULL until bound */

	struct lws *wsi; /* related wsi if any */
	uint16_t retry_count; /* count of consecutive retries */
};
107
/*
 * Per-session data, the same layout for all four protocols; not every member
 * is used by every protocol.
 */
struct pss {
	lws_dll2_t list;	/* on vhd->clients (prox-agg / prox-server) */
	char proxy_path[64];	/* name pairing a scraper with a ws client */
	struct lwsac *ac; /* the translated metrics, one lwsac chunk per stash,
			   * each chunk being LWS_PRE headroom, a 2-byte
			   * big-endian length, then the payload */
	struct lwsac *walk; /* iterator for ac when writing; on prox-agg this
			     * walks the partner prox-server pss's ac */
	size_t tot; /* content-length computation */
	struct lws *wsi;

	uint8_t greet:1; /* set if client needs to send proxy path */
	uint8_t trigger:1; /* we want to ask the client to dump */
};
119
120 #if defined(LWS_WITH_CLIENT)
/* Reconnect backoff steps for the prox-client's link to the proxy server */
static const uint32_t backoff_ms[] = { 1000, 2000, 3000, 4000, 5000 };

/*
 * Retry / idle policy for the prox-client connection.  When
 * lws_retry_sul_schedule() reports the retries are exhausted (presumably after
 * conceal_count attempts), the callers reset retry_count and schedule again,
 * so reconnection attempts continue indefinitely.
 */
static const lws_retry_bo_t retry = {
	.retry_ms_table = backoff_ms,
	.retry_ms_table_count = LWS_ARRAY_SIZE(backoff_ms),
	.conceal_count = LWS_ARRAY_SIZE(backoff_ms),

	/*
	 * NOTE(review): ping and hangup intervals are equal; confirm lws still
	 * gets a PING out before hanging up in that case.
	 */
	.secs_since_valid_ping = 400, /* force PINGs after secs idle */
	.secs_since_valid_hangup = 400, /* hangup after secs idle */

	.jitter_percent = 0,
};
133
134 static void
omc_connect_client(lws_sorted_usec_list_t * sul)135 omc_connect_client(lws_sorted_usec_list_t *sul)
136 {
137 struct vhd *vhd = lws_container_of(sul, struct vhd, sul);
138 struct lws_client_connect_info i;
139 const char *prot;
140 char url[128];
141
142 memset(&i, 0, sizeof(i));
143
144 lwsl_notice("%s: %s %s %s\n", __func__, vhd->ws_server_uri, vhd->metrics_proxy_path, vhd->ba_secret);
145
146 lws_strncpy(url, vhd->ws_server_uri, sizeof(url));
147
148 if (lws_parse_uri(url, &prot, &i.address, &i.port, &i.path)) {
149 lwsl_err("%s: unable to parse uri %s\n", __func__,
150 vhd->ws_server_uri);
151 return;
152 }
153
154 i.context = vhd->cx;
155 i.origin = i.address;
156 i.host = i.address;
157 i.ssl_connection = LCCSCF_USE_SSL;
158 i.protocol = "lws-openmetrics-prox-server"; /* public subprot */
159 i.local_protocol_name = "lws-openmetrics-prox-client";
160 i.pwsi = &vhd->wsi;
161 i.retry_and_idle_policy = &retry;
162 i.userdata = vhd;
163 i.vhost = vhd->vhost;
164
165 lwsl_notice("%s: %s %u %s\n", __func__, i.address, i.port, i.path);
166
167 if (lws_client_connect_via_info(&i))
168 return;
169
170 /*
171 * Failed... schedule a retry... we can't use the _retry_wsi()
172 * convenience wrapper api here because no valid wsi at this
173 * point.
174 */
175 if (!lws_retry_sul_schedule(vhd->cx, 0, sul, &retry,
176 omc_connect_client, &vhd->retry_count))
177 return;
178
179 vhd->retry_count = 0;
180 lws_retry_sul_schedule(vhd->cx, 0, sul, &retry,
181 omc_connect_client, &vhd->retry_count);
182 }
183 #endif
184
/*
 * openmetrics_san() - sanitize a metric name in place for OpenMetrics
 *
 * @nm: name to modify in place
 * @nl: number of bytes of @nm to process
 *
 * OpenMetrics metric names have a very restricted token charset.  Every byte
 * outside [A-Za-z0-9_] is replaced with '_'.  A name also may not start with
 * a digit, so a leading digit is replaced with '_' too.
 */
static void
openmetrics_san(char *nm, size_t nl)
{
	size_t m;

	for (m = 0; m < nl; m++)
		if ((nm[m] < 'A' || nm[m] > 'Z') &&
		    (nm[m] < 'a' || nm[m] > 'z') &&
		    (nm[m] < '0' || nm[m] > '9') &&
		    nm[m] != '_')
			nm[m] = '_';

	/* the initial character of a metric name cannot be a digit */
	if (nl && nm[0] >= '0' && nm[0] <= '9')
		nm[0] = '_';
}
199
200 static int
lws_metrics_om_format_agg(lws_metric_pub_t * pub,const char * nm,lws_usec_t now,int gng,char * buf,size_t len)201 lws_metrics_om_format_agg(lws_metric_pub_t *pub, const char *nm, lws_usec_t now,
202 int gng, char *buf, size_t len)
203 {
204 const char *_gng = gng ? "_nogo" : "_go";
205 char *end = buf + len - 1, *obuf = buf;
206
207 if (pub->flags & LWSMTFL_REPORT_ONLY_GO)
208 _gng = "";
209
210 if (!(pub->flags & LWSMTFL_REPORT_MEAN)) {
211 /* only the sum is meaningful */
212 if (pub->flags & LWSMTFL_REPORT_DUTY_WALLCLOCK_US) {
213 buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
214 "%s_count %u\n"
215 "%s_us_sum %llu\n"
216 "%s_created %lu.%06u\n",
217 nm, (unsigned int)pub->u.agg.count[gng],
218 nm, (unsigned long long)pub->u.agg.sum[gng],
219 nm, (unsigned long)(pub->us_first / 1000000),
220 (unsigned int)(pub->us_first % 1000000));
221
222 return lws_ptr_diff(buf, obuf);
223 }
224
225 /* it's a monotonic ordinal, like total tx */
226 buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
227 "%s%s_count %u\n"
228 "%s%s_sum %llu\n",
229 nm, _gng,
230 (unsigned int)pub->u.agg.count[gng],
231 nm, _gng,
232 (unsigned long long)pub->u.agg.sum[gng]);
233
234 } else
235 buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
236 "%s%s_count %u\n"
237 "%s%s_mean %llu\n",
238 nm, _gng,
239 (unsigned int)pub->u.agg.count[gng],
240 nm, _gng, (unsigned long long)
241 (pub->u.agg.count[gng] ?
242 pub->u.agg.sum[gng] /
243 pub->u.agg.count[gng] : 0));
244
245 return lws_ptr_diff(buf, obuf);
246 }
247
248 static int
lws_metrics_om_ac_stash(struct pss * pss,const char * buf,size_t len)249 lws_metrics_om_ac_stash(struct pss *pss, const char *buf, size_t len)
250 {
251 char *q;
252
253 q = lwsac_use(&pss->ac, LWS_PRE + len + 2, LWS_PRE + len + 2);
254 if (!q) {
255 lwsac_free(&pss->ac);
256
257 return -1;
258 }
259 q[LWS_PRE] = (char)((len >> 8) & 0xff);
260 q[LWS_PRE + 1] = (char)(len & 0xff);
261 memcpy(q + LWS_PRE + 2, buf, len);
262 pss->tot += len;
263
264 return 0;
265 }
266
267 /*
 * We have to do the ac listing at this level, because a metric can have too
 * many tags (eg, histogram buckets) for all of them to fit in a reasonable
 * buffer.
270 */
271
272 static int
lws_metrics_om_format(struct pss * pss,lws_metric_pub_t * pub,const char * nm)273 lws_metrics_om_format(struct pss *pss, lws_metric_pub_t *pub, const char *nm)
274 {
275 char buf[1200], *p = buf, *end = buf + sizeof(buf) - 1, tmp[512];
276 lws_usec_t t = lws_now_usecs();
277
278 if (pub->flags & LWSMTFL_REPORT_HIST) {
279 lws_metric_bucket_t *buck = pub->u.hist.head;
280
281 p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
282 "%s_count %llu\n",
283 nm, (unsigned long long)
284 pub->u.hist.total_count);
285
286 while (buck) {
287 lws_strncpy(tmp, lws_metric_bucket_name(buck),
288 sizeof(tmp));
289
290 p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
291 "%s{%s} %llu\n", nm, tmp,
292 (unsigned long long)buck->count);
293
294 lws_metrics_om_ac_stash(pss, buf,
295 lws_ptr_diff_size_t(p, buf));
296 p = buf;
297
298 buck = buck->next;
299 }
300
301 goto happy;
302 }
303
304 if (!pub->u.agg.count[METRES_GO] && !pub->u.agg.count[METRES_NOGO])
305 return 0;
306
307 if (pub->u.agg.count[METRES_GO])
308 p += lws_metrics_om_format_agg(pub, nm, t, METRES_GO, p,
309 lws_ptr_diff_size_t(end, p));
310
311 if (!(pub->flags & LWSMTFL_REPORT_ONLY_GO) &&
312 pub->u.agg.count[METRES_NOGO])
313 p += lws_metrics_om_format_agg(pub, nm, t, METRES_NOGO, p,
314 lws_ptr_diff_size_t(end, p));
315
316 if (pub->flags & LWSMTFL_REPORT_MEAN)
317 p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
318 "%s_min %llu\n"
319 "%s_max %llu\n",
320 nm, (unsigned long long)pub->u.agg.min,
321 nm, (unsigned long long)pub->u.agg.max);
322
323 happy:
324 return lws_metrics_om_ac_stash(pss, buf, lws_ptr_diff_size_t(p, buf));
325 }
326
327 static int
append_om_metric(lws_metric_pub_t * pub,void * user)328 append_om_metric(lws_metric_pub_t *pub, void *user)
329 {
330 struct pss *pss = (struct pss *)user;
331 char nm[64];
332 size_t nl;
333
334 /*
335 * Convert lws_metrics to openmetrics metrics data, stashing into an
336 * lwsac without backfill. Since it's not backfilling, use areas are in
337 * linear sequence simplifying walking them. Limiting the lwsac alloc
338 * to less than a typical mtu means we can write one per write
339 * efficiently
340 */
341
342 lws_strncpy(nm, pub->name, sizeof(nm));
343 nl = strlen(nm);
344
345 openmetrics_san(nm, nl);
346
347 return lws_metrics_om_format(pss, pub, nm);
348 }
349
350 #if defined(__linux__)
351 static int
grabfile(const char * fi,char * buf,size_t len)352 grabfile(const char *fi, char *buf, size_t len)
353 {
354 int n, fd = lws_open(fi, LWS_O_RDONLY);
355
356 buf[0] = '\0';
357 if (fd < 0)
358 return -1;
359
360 n = (int)read(fd, buf, len - 1);
361 close(fd);
362 if (n < 0) {
363 buf[0] = '\0';
364 return -1;
365 }
366
367 buf[n] = '\0';
368 if (n > 0 && buf[n - 1] == '\n')
369 buf[--n] = '\0';
370
371 return n;
372 }
373 #endif
374
375 /*
376 * Let's pregenerate the output into an lwsac all at once and
377 * then spool it back to the peer afterwards
378 *
379 * - there's not going to be that much of it (a few kB)
380 * - we then know the content-length for the headers
381 * - it's stretchy to arbitrary numbers of metrics
382 * - lwsac block list provides the per-metric structure to
383 * hold the data in a way we can walk to write it simply
384 */
385
386 int
ome_prepare(struct lws_context * ctx,struct pss * pss)387 ome_prepare(struct lws_context *ctx, struct pss *pss)
388 {
389 char buf[1224], *start = buf + LWS_PRE, *p = start,
390 *end = buf + sizeof(buf) - 1;
391 char hn[64];
392
393 pss->tot = 0;
394
395 /*
396 * Target metadata
397 */
398
399 hn[0] = '\0';
400 gethostname(hn, sizeof(hn) - 1);
401 p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
402 "# TYPE target info\n"
403 "# HELP target Target metadata\n"
404 "target_info{hostname=\"%s\"", hn);
405
406 #if defined(__linux__)
407 if (grabfile("/proc/self/cmdline", hn, sizeof(hn)))
408 p += lws_snprintf((char *)p, lws_ptr_diff_size_t(end, p),
409 ",cmdline=\"%s\"", hn);
410 #endif
411
412 p += lws_snprintf(p, lws_ptr_diff_size_t(end, p), "} 1\n");
413
414 if (lws_metrics_om_ac_stash(pss, (const char *)buf + LWS_PRE,
415 lws_ptr_diff_size_t(p, buf + LWS_PRE)))
416 return 1;
417
418 /* lws version */
419
420 p = start;
421 p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
422 "# TYPE lws_info info\n"
423 "# HELP lws_info Version of lws producing this\n"
424 "lws_info{version=\"%s\"} 1\n", LWS_BUILD_HASH);
425 if (lws_metrics_om_ac_stash(pss, (const char *)buf + LWS_PRE,
426 lws_ptr_diff_size_t(p, buf + LWS_PRE)))
427 return 1;
428
429 /* system scalars */
430
431 #if defined(__linux__)
432 if (grabfile("/proc/loadavg", hn, sizeof(hn))) {
433 char *sp = strchr(hn, ' ');
434 if (sp) {
435 p = start;
436 p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
437 "load_1m %.*s\n",
438 lws_ptr_diff(sp, hn), hn);
439 if (lws_metrics_om_ac_stash(pss,
440 (char *)buf + LWS_PRE,
441 lws_ptr_diff_size_t(p,
442 start)))
443 return 1;
444 }
445 }
446 #endif
447
448 if (lws_metrics_foreach(ctx, pss, append_om_metric))
449 return 1;
450
451 p = start;
452 p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
453 "# EOF\n");
454 if (lws_metrics_om_ac_stash(pss, (char *)buf + LWS_PRE,
455 lws_ptr_diff_size_t(p, buf + LWS_PRE)))
456 return 1;
457
458 pss->walk = pss->ac;
459
460 return 0;
461 }
462
463 #if defined(LWS_WITH_SERVER)
464
465 /* 1) direct http export for scraper */
466
467 static int
callback_lws_openmetrics_export(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)468 callback_lws_openmetrics_export(struct lws *wsi,
469 enum lws_callback_reasons reason,
470 void *user, void *in, size_t len)
471 {
472 unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
473 *end = buf + sizeof(buf) - 1, *ip;
474 struct lws_context *cx = lws_get_context(wsi);
475 struct pss *pss = (struct pss *)user;
476 unsigned int m, wm;
477
478 switch (reason) {
479 case LWS_CALLBACK_HTTP:
480
481 ome_prepare(cx, pss);
482
483 p = start;
484 if (lws_add_http_common_headers(wsi, HTTP_STATUS_OK,
485 "application/openmetrics-text; "
486 "version=1.0.0; charset=utf-8",
487 pss->tot, &p, end) ||
488 lws_finalize_write_http_header(wsi, start, &p, end))
489 return 1;
490
491 lws_callback_on_writable(wsi);
492
493 return 0;
494
495 case LWS_CALLBACK_CLOSED_HTTP:
496 lwsac_free(&pss->ac);
497 break;
498
499 case LWS_CALLBACK_HTTP_WRITEABLE:
500 if (!pss->walk)
501 return 0;
502
503 do {
504 ip = (uint8_t *)pss->walk +
505 lwsac_sizeof(pss->walk == pss->ac) + LWS_PRE;
506 m = (unsigned int)((ip[0] << 8) | ip[1]);
507
508 /* coverity */
509 if (m > lwsac_get_tail_pos(pss->walk) -
510 lwsac_sizeof(pss->walk == pss->ac))
511 return -1;
512
513 if (lws_ptr_diff_size_t(end, p) < m)
514 break;
515
516 memcpy(p, ip + 2, m);
517 p += m;
518
519 pss->walk = lwsac_get_next(pss->walk);
520 } while (pss->walk);
521
522 if (!lws_ptr_diff_size_t(p, start)) {
523 lwsl_err("%s: stuck\n", __func__);
524 return -1;
525 }
526
527 wm = pss->walk ? LWS_WRITE_HTTP : LWS_WRITE_HTTP_FINAL;
528
529 if (lws_write(wsi, start, lws_ptr_diff_size_t(p, start),
530 (enum lws_write_protocol)wm) < 0)
531 return 1;
532
533 if (!pss->walk) {
534 if (lws_http_transaction_completed(wsi))
535 return -1;
536 } else
537 lws_callback_on_writable(wsi);
538
539 return 0;
540
541 default:
542 break;
543 }
544
545 return lws_callback_http_dummy(wsi, reason, user, in, len);
546 }
547
548 static struct pss *
omc_lws_om_get_other_side_pss_client(struct vhd * vhd,struct pss * pss)549 omc_lws_om_get_other_side_pss_client(struct vhd *vhd, struct pss *pss)
550 {
551 /*
552 * Search through our partner's clients list looking for one with the
553 * same proxy path
554 */
555 lws_start_foreach_dll(struct lws_dll2 *, d,
556 vhd->bind_partner_vhd->clients.head) {
557 struct pss *apss = lws_container_of(d, struct pss, list);
558
559 if (!strcmp(pss->proxy_path, apss->proxy_path))
560 return apss;
561
562 } lws_end_foreach_dll(d);
563
564 return NULL;
565 }
566
567 /* 2) "lws-openmetrics-prox-agg": http server export via proxy to connected clients */
568
569 static int
callback_lws_openmetrics_prox_agg(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)570 callback_lws_openmetrics_prox_agg(struct lws *wsi,
571 enum lws_callback_reasons reason,
572 void *user, void *in, size_t len)
573 {
574 unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
575 *end = buf + sizeof(buf) - 1, *ip;
576 struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get(
577 lws_get_vhost(wsi), lws_get_protocol(wsi));
578 struct lws_context *cx = lws_get_context(wsi);
579 struct pss *pss = (struct pss *)user, *partner_pss;
580 unsigned int m, wm;
581
582 switch (reason) {
583
584 case LWS_CALLBACK_PROTOCOL_INIT:
585 lwsl_notice("%s: PROTOCOL_INIT on %s\n", __func__, lws_vh_tag(lws_get_vhost(wsi)));
586 /*
587 * We get told what to do when we are bound to the vhost
588 */
589 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
590 lws_get_protocol(wsi), sizeof(struct vhd));
591 if (!vhd) {
592 lwsl_err("%s: vhd alloc failed\n", __func__);
593 return 0;
594 }
595
596 vhd->cx = cx;
597
598 /*
599 * Try to bind to the counterpart server in the proxy, binding
600 * to the right one by having a common bind name set in a pvo.
601 * We don't know who will get instantiated last, so both parts
602 * try to bind if not already bound
603 */
604
605 if (!lws_pvo_get_str(in, "proxy-side-bind-name",
606 &vhd->proxy_side_bind_name)) {
607 /*
608 * Attempt to find the vhd that belongs to a vhost
609 * that has instantiated protocol
610 * "lws-openmetrics-prox-server", and has set pvo
611 * "proxy-side-bind-name" on it to whatever our
612 * vhd->proxy_side_bind_name was also set to.
613 *
614 * If found, inform the two sides of the same proxy
615 * what their partner vhd is
616 */
617 lws_strncpy(vhd->sanity, "isagg", sizeof(vhd->sanity));
618 vhd->bind_partner_vhd = lws_vhd_find_by_pvo(cx,
619 "lws-openmetrics-prox-server",
620 "proxy-side-bind-name",
621 vhd->proxy_side_bind_name);
622 if (vhd->bind_partner_vhd) {
623 assert(!strcmp(vhd->bind_partner_vhd->sanity, "isws"));
624 lwsl_notice("%s: proxy binding OK\n", __func__);
625 vhd->bind_partner_vhd->bind_partner_vhd = vhd;
626 }
627 } else {
628 lwsl_warn("%s: proxy-side-bind-name required\n", __func__);
629 return 0;
630 }
631
632 break;
633
634 case LWS_CALLBACK_PROTOCOL_DESTROY:
635 if (vhd)
636 lws_sul_cancel(&vhd->sul);
637 break;
638
639 case LWS_CALLBACK_HTTP:
640
641 /*
642 * The scraper has connected to us, the local side of the proxy,
643 * we need to match what it wants to
644 */
645
646 if (!vhd->bind_partner_vhd)
647 return 0;
648
649 lws_strnncpy(pss->proxy_path, (const char *)in, len,
650 sizeof(pss->proxy_path));
651
652 if (pss->list.owner) {
653 lwsl_warn("%s: double HTTP?\n", __func__);
654 return 0;
655 }
656
657 pss->wsi = wsi;
658
659 lws_start_foreach_dll(struct lws_dll2 *, d,
660 vhd->bind_partner_vhd->clients.head) {
661 struct pss *apss = lws_container_of(d, struct pss, list);
662
663 if (!strcmp((const char *)in, apss->proxy_path)) {
664 apss->trigger = 1;
665 lws_callback_on_writable(apss->wsi);
666
667 /* let's add him on the http server vhd list */
668
669 lws_dll2_add_tail(&pss->list, &vhd->clients);
670 return 0;
671 }
672
673 } lws_end_foreach_dll(d);
674
675 return 0;
676
677 case LWS_CALLBACK_CLOSED_HTTP:
678 lwsac_free(&pss->ac);
679 lws_dll2_remove(&pss->list);
680 break;
681
682 case LWS_CALLBACK_HTTP_WRITEABLE:
683
684 if (!pss->walk)
685 return 0;
686
687 /* locate the wss side if it's still around */
688
689 partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
690 if (!partner_pss)
691 return -1;
692
693 do {
694 ip = (uint8_t *)pss->walk +
695 lwsac_sizeof(pss->walk == partner_pss->ac) + LWS_PRE;
696 m = (unsigned int)((ip[0] << 8) | ip[1]);
697
698 /* coverity */
699 if (m > lwsac_get_tail_pos(pss->walk) -
700 lwsac_sizeof(pss->walk == partner_pss->ac))
701 return -1;
702
703 if (lws_ptr_diff_size_t(end, p) < m)
704 break;
705
706 memcpy(p, ip + 2, m);
707 p += m;
708
709 pss->walk = lwsac_get_next(pss->walk);
710 } while (pss->walk);
711
712 if (!lws_ptr_diff_size_t(p, start)) {
713 lwsl_err("%s: stuck\n", __func__);
714 return -1;
715 }
716
717 wm = pss->walk ? LWS_WRITE_HTTP : LWS_WRITE_HTTP_FINAL;
718
719 if (lws_write(wsi, start, lws_ptr_diff_size_t(p, start),
720 (enum lws_write_protocol)wm) < 0)
721 return 1;
722
723 if (!pss->walk) {
724 lwsl_info("%s: whole msg proxied to scraper\n", __func__);
725 lws_dll2_remove(&pss->list);
726 lwsac_free(&partner_pss->ac);
727 // if (lws_http_transaction_completed(wsi))
728 return -1;
729 } else
730 lws_callback_on_writable(wsi);
731
732 return 0;
733
734 default:
735 break;
736 }
737
738 return lws_callback_http_dummy(wsi, reason, user, in, len);
739 }
740
741 /* 3) "lws-openmetrics-prox-server": ws server side of metrics proxy, for
742 * ws clients to connect to */
743
744 static int
callback_lws_openmetrics_prox_server(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)745 callback_lws_openmetrics_prox_server(struct lws *wsi,
746 enum lws_callback_reasons reason,
747 void *user, void *in, size_t len)
748 {
749 unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
750 *end = buf + sizeof(buf) - 1;
751 struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get(
752 lws_get_vhost(wsi), lws_get_protocol(wsi));
753 struct lws_context *cx = lws_get_context(wsi);
754 struct pss *pss = (struct pss *)user, *partner_pss;
755
756 switch (reason) {
757
758 case LWS_CALLBACK_PROTOCOL_INIT:
759 /*
760 * We get told what to do when we are bound to the vhost
761 */
762
763 lwsl_notice("%s: PROTOCOL_INIT on %s\n", __func__, lws_vh_tag(lws_get_vhost(wsi)));
764
765 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
766 lws_get_protocol(wsi), sizeof(struct vhd));
767 if (!vhd) {
768 lwsl_err("%s: vhd alloc failed\n", __func__);
769 return 0;
770 }
771
772 vhd->cx = cx;
773
774 /*
775 * Try to bind to the counterpart server in the proxy, binding
776 * to the right one by having a common bind name set in a pvo.
777 * We don't know who will get instantiated last, so both parts
778 * try to bind if not already bound
779 */
780
781 if (!lws_pvo_get_str(in, "proxy-side-bind-name",
782 &vhd->proxy_side_bind_name)) {
783 /*
784 * Attempt to find the vhd that belongs to a vhost
785 * that has instantiated protocol
786 * "lws-openmetrics-prox-server", and has set pvo
787 * "proxy-side-bind-name" on it to whatever our
788 * vhd->proxy_side_bind_name was also set to.
789 *
790 * If found, inform the two sides of the same proxy
791 * what their partner vhd is
792 */
793 lws_strncpy(vhd->sanity, "isws", sizeof(vhd->sanity));
794 vhd->bind_partner_vhd = lws_vhd_find_by_pvo(cx,
795 "lws-openmetrics-prox-agg",
796 "proxy-side-bind-name",
797 vhd->proxy_side_bind_name);
798 if (vhd->bind_partner_vhd) {
799 assert(!strcmp(vhd->bind_partner_vhd->sanity, "isagg"));
800 lwsl_notice("%s: proxy binding OK\n", __func__);
801 vhd->bind_partner_vhd->bind_partner_vhd = vhd;
802 }
803 } else {
804 lwsl_warn("%s: proxy-side-bind-name required\n", __func__);
805 return 0;
806 }
807
808 break;
809
810 case LWS_CALLBACK_PROTOCOL_DESTROY:
811 break;
812
813 case LWS_CALLBACK_ESTABLISHED:
814 /*
815 * a client has joined... we need to add his pss to our list
816 * of live, joined clients
817 */
818
819 /* mark us as waiting for the reference name from the client */
820 pss->greet = 1;
821 pss->wsi = wsi;
822 lws_validity_confirmed(wsi);
823
824 return 0;
825
826 case LWS_CALLBACK_CLOSED:
827 /*
828 * a client has parted
829 */
830 lws_dll2_remove(&pss->list);
831 lwsl_warn("%s: client %s left (%u)\n", __func__,
832 pss->proxy_path,
833 (unsigned int)vhd->clients.count);
834 lwsac_free(&pss->ac);
835
836 /* let's kill the scraper connection accordingly, if still up */
837 partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
838 if (partner_pss)
839 lws_wsi_close(partner_pss->wsi, LWS_TO_KILL_ASYNC);
840 break;
841
842 case LWS_CALLBACK_RECEIVE:
843 if (pss->greet) {
844 pss->greet = 0;
845 lws_strnncpy(pss->proxy_path, (const char *)in, len,
846 sizeof(pss->proxy_path));
847
848 lws_validity_confirmed(wsi);
849 lwsl_notice("%s: received greet '%s'\n", __func__,
850 pss->proxy_path);
851 /*
852 * we need to add his pss to our list of configured,
853 * live, joined clients
854 */
855 lws_dll2_add_tail(&pss->list, &vhd->clients);
856 return 0;
857 }
858
859 /*
860 * He's sending us his results... let's collect chunks into the
861 * pss lwsac before worrying about anything else
862 */
863
864 if (lws_is_first_fragment(wsi))
865 pss->tot = 0;
866
867 lws_metrics_om_ac_stash(pss, (const char *)in, len);
868
869 if (lws_is_final_fragment(wsi)) {
870 struct pss *partner_pss;
871
872 lwsl_info("%s: ws side received complete msg\n",
873 __func__);
874
875 /* the lwsac is complete */
876 pss->walk = pss->ac;
877 partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
878 if (!partner_pss) {
879 lwsl_notice("%s: no partner A\n", __func__);
880 return -1;
881 }
882
883 /* indicate to scraper side we want to issue now */
884
885 p = start;
886 if (lws_add_http_common_headers(partner_pss->wsi, HTTP_STATUS_OK,
887 "application/openmetrics-text; "
888 "version=1.0.0; charset=utf-8",
889 pss->tot, &p, end) ||
890 lws_finalize_write_http_header(partner_pss->wsi,
891 start, &p, end))
892 return -1;
893
894 /* indicate to scraper side we want to issue now */
895
896 partner_pss->walk = pss->ac;
897 partner_pss->trigger = 1;
898 lws_callback_on_writable(partner_pss->wsi);
899 }
900
901 return 0;
902
903 case LWS_CALLBACK_SERVER_WRITEABLE:
904 if (!pss->trigger)
905 return 0;
906
907 pss->trigger = 0;
908
909 partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
910 if (!partner_pss) {
911 lwsl_err("%s: no partner\n", __func__);
912 return 0;
913 }
914
915 lwsl_info("%s: sending trigger to client\n", __func__);
916
917 *start = 'x';
918 if (lws_write(wsi, start, 1,
919 (enum lws_write_protocol)LWS_WRITE_TEXT) < 0)
920 return 1;
921
922 lws_validity_confirmed(wsi);
923
924 return 0;
925
926 default:
927 break;
928 }
929
930 return lws_callback_http_dummy(wsi, reason, user, in, len);
931 }
932 #endif
933
934 #if defined(LWS_WITH_CLIENT) && defined(LWS_ROLE_WS)
935
936 /* 4) ws client that keeps wss connection up to metrics proxy ws server */
937
938 static int
callback_lws_openmetrics_prox_client(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)939 callback_lws_openmetrics_prox_client(struct lws *wsi,
940 enum lws_callback_reasons reason,
941 void *user, void *in, size_t len)
942 {
943 unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
944 *end = buf + sizeof(buf) - 1, *ip;
945 struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get(
946 lws_get_vhost(wsi), lws_get_protocol(wsi));
947 struct lws_context *cx = lws_get_context(wsi);
948 struct pss *pss = (struct pss *)user;
949 unsigned int m, wm;
950 const char *cp;
951 char first;
952
953 switch (reason) {
954
955 case LWS_CALLBACK_PROTOCOL_INIT:
956
957 lwsl_notice("%s: PROTOCOL_INIT on %s\n", __func__,
958 lws_vh_tag(lws_get_vhost(wsi)));
959
960
961 /*
962 * We get told what to do when we are bound to the vhost
963 */
964 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
965 lws_get_protocol(wsi), sizeof(struct vhd));
966 if (!vhd)
967 return 0;
968
969 vhd->cx = cx;
970 vhd->vhost = lws_get_vhost(wsi);
971
972 /* the proxy server uri */
973
974 if (lws_pvo_get_str(in, "ws-server-uri", &cp)) {
975 lwsl_warn("%s: ws-server-uri pvo required\n", __func__);
976
977 return 0;
978 }
979 lws_strncpy(vhd->ws_server_uri, cp, sizeof(vhd->ws_server_uri));
980
981 /* how we should be referenced at the proxy */
982
983 if (lws_pvo_get_str(in, "metrics-proxy-path", &cp)) {
984 lwsl_err("%s: metrics-proxy-path pvo required\n", __func__);
985
986 return 1;
987 }
988 lws_strncpy(vhd->metrics_proxy_path, cp, sizeof(vhd->metrics_proxy_path));
989
990 /* the shared secret to authenticate us as allowed to join */
991
992 if (lws_pvo_get_str(in, "ba-secret", &cp)) {
993 lwsl_err("%s: ba-secret pvo required\n", __func__);
994
995 return 1;
996 }
997 lws_strncpy(vhd->ba_secret, cp, sizeof(vhd->ba_secret));
998
999 lwsl_notice("%s: scheduling connect %s %s %s\n", __func__,
1000 vhd->ws_server_uri, vhd->metrics_proxy_path, vhd->ba_secret);
1001
1002 lws_validity_confirmed(wsi);
1003 lws_sul_schedule(cx, 0, &vhd->sul, omc_connect_client, 1);
1004 break;
1005
1006 case LWS_CALLBACK_PROTOCOL_DESTROY:
1007 if (vhd)
1008 lws_sul_cancel(&vhd->sul);
1009 break;
1010
1011 case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
1012 {
1013 unsigned char **pp = (unsigned char **)in, *pend = (*pp) + len;
1014 char b[128];
1015
1016 /* authorize ourselves to the metrics proxy using basic auth */
1017
1018 if (lws_http_basic_auth_gen("metricsclient", vhd->ba_secret,
1019 b, sizeof(b)))
1020 break;
1021
1022 if (lws_add_http_header_by_token(wsi,
1023 WSI_TOKEN_HTTP_AUTHORIZATION,
1024 (unsigned char *)b,
1025 (int)strlen(b), pp, pend))
1026 return -1;
1027
1028 break;
1029 }
1030
1031 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
1032 lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
1033 in ? (char *)in : "(null)");
1034 goto do_retry;
1035
1036 case LWS_CALLBACK_CLIENT_ESTABLISHED:
1037 lwsl_warn("%s: connected to ws metrics agg server\n", __func__);
1038 pss->greet = 1;
1039 lws_callback_on_writable(wsi);
1040 lws_validity_confirmed(wsi);
1041 return 0;
1042
1043 case LWS_CALLBACK_CLIENT_CLOSED:
1044 lwsl_notice("%s: client closed\n", __func__);
1045 lwsac_free(&pss->ac);
1046 goto do_retry;
1047
1048 case LWS_CALLBACK_CLIENT_RECEIVE:
1049 /*
1050 * Proxy serverside sends us something to trigger us to create
1051 * our metrics message and send it back over the ws link
1052 */
1053 ome_prepare(cx, pss);
1054 pss->walk = pss->ac;
1055 lws_callback_on_writable(wsi);
1056 lwsl_info("%s: dump requested\n", __func__);
1057 break;
1058
1059 case LWS_CALLBACK_CLIENT_WRITEABLE:
1060 if (pss->greet) {
1061 /*
1062 * At first after establishing the we link, we send a
1063 * message indicating to the metrics proxy how we
1064 * should be referred to by the scraper to particularly
1065 * select to talk to us
1066 */
1067 lwsl_info("%s: sending greet '%s'\n", __func__,
1068 vhd->metrics_proxy_path);
1069 lws_strncpy((char *)start, vhd->metrics_proxy_path,
1070 sizeof(buf) - LWS_PRE);
1071 if (lws_write(wsi, start,
1072 strlen(vhd->metrics_proxy_path),
1073 LWS_WRITE_TEXT) < 0)
1074 return 1;
1075
1076 lws_validity_confirmed(wsi);
1077
1078 pss->greet = 0;
1079 return 0;
1080 }
1081
1082 if (!pss->walk)
1083 return 0;
1084
1085 /*
1086 * We send the metrics dump in a single logical ws message,
1087 * using ws fragmentation to split it around 1 mtu boundary
1088 * and keep coming back until it's finished
1089 */
1090
1091 first = pss->walk == pss->ac;
1092
1093 do {
1094 ip = (uint8_t *)pss->walk +
1095 lwsac_sizeof(pss->walk == pss->ac) + LWS_PRE;
1096 m = (unsigned int)((ip[0] << 8) | ip[1]);
1097
1098 /* coverity */
1099 if (m > lwsac_get_tail_pos(pss->walk) -
1100 lwsac_sizeof(pss->walk == pss->ac)) {
1101 lwsl_err("%s: size blow\n", __func__);
1102 return -1;
1103 }
1104
1105 if (lws_ptr_diff_size_t(end, p) < m)
1106 break;
1107
1108 memcpy(p, ip + 2, m);
1109 p += m;
1110
1111 pss->walk = lwsac_get_next(pss->walk);
1112 } while (pss->walk);
1113
1114 if (!lws_ptr_diff_size_t(p, start)) {
1115 lwsl_err("%s: stuck\n", __func__);
1116 return -1;
1117 }
1118
1119 wm = (unsigned int)lws_write_ws_flags(LWS_WRITE_TEXT, first,
1120 !pss->walk);
1121
1122 if (lws_write(wsi, start, lws_ptr_diff_size_t(p, start),
1123 (enum lws_write_protocol)wm) < 0) {
1124 lwsl_notice("%s: write fail\n", __func__);
1125 return 1;
1126 }
1127
1128 lws_validity_confirmed(wsi);
1129 lwsl_info("%s: forwarded %d\n", __func__, lws_ptr_diff(p, start));
1130
1131 if (!pss->walk) {
1132 lwsl_info("%s: dump send completed\n", __func__);
1133 lwsac_free(&pss->ac);
1134 } else
1135 lws_callback_on_writable(wsi);
1136
1137 return 0;
1138
1139 default:
1140 break;
1141 }
1142
1143 return lws_callback_http_dummy(wsi, reason, user, in, len);
1144
1145 do_retry:
1146 if (!lws_retry_sul_schedule(cx, 0, &vhd->sul, &retry,
1147 omc_connect_client, &vhd->retry_count))
1148 return 0;
1149
1150 vhd->retry_count = 0;
1151 lws_retry_sul_schedule(cx, 0, &vhd->sul, &retry,
1152 omc_connect_client, &vhd->retry_count);
1153
1154 return 0;
1155 }
1156 #endif
1157
1158
1159 LWS_VISIBLE const struct lws_protocols lws_openmetrics_export_protocols[] = {
1160 #if defined(LWS_WITH_SERVER)
1161 { /* for scraper directly: http export on listen socket */
1162 "lws-openmetrics",
1163 callback_lws_openmetrics_export,
1164 sizeof(struct pss),
1165 1024, 0, NULL, 0
1166 },
1167 { /* for scraper via ws proxy: http export on listen socket */
1168 "lws-openmetrics-prox-agg",
1169 callback_lws_openmetrics_prox_agg,
1170 sizeof(struct pss),
1171 1024, 0, NULL, 0
1172 },
1173 { /* metrics proxy server side: ws server for clients to connect to */
1174 "lws-openmetrics-prox-server",
1175 callback_lws_openmetrics_prox_server,
1176 sizeof(struct pss),
1177 1024, 0, NULL, 0
1178 },
1179 #endif
1180 #if defined(LWS_WITH_CLIENT) && defined(LWS_ROLE_WS)
1181 { /* client to metrics proxy: ws client to connect to metrics proxy*/
1182 "lws-openmetrics-prox-client",
1183 callback_lws_openmetrics_prox_client,
1184 sizeof(struct pss),
1185 1024, 0, NULL, 0
1186 },
1187 #endif
1188 };
1189
/*
 * Plugin export: a plugin header carrying the plugin name, its class
 * ("lws_protocol_plugin"), LWS_BUILD_HASH and LWS_PLUGIN_API_MAGIC, followed
 * by the protocol table above and its length.
 */
LWS_VISIBLE const lws_plugin_protocol_t lws_openmetrics_export = {
	.hdr = {
		"lws OpenMetrics export",
		"lws_protocol_plugin",
		LWS_BUILD_HASH,
		LWS_PLUGIN_API_MAGIC
	},

	.protocols = lws_openmetrics_export_protocols,
	.count_protocols = LWS_ARRAY_SIZE(lws_openmetrics_export_protocols),
};
1201