• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libwebsockets-test-server - libwebsockets test implementation
3  *
4  * Written in 2010-2021 by Andy Green <andy@warmcat.com>
5  *
6  * This file is made available under the Creative Commons CC0 1.0
7  * Universal Public Domain Dedication.
8  *
9  * The person who associated a work with this deed has dedicated
10  * the work to the public domain by waiving all of his or her rights
11  * to the work worldwide under copyright law, including all related
12  * and neighboring rights, to the extent allowed by law. You can copy,
13  * modify, distribute and perform the work, even for commercial purposes,
14  * all without asking permission.
15  *
16  * The test apps are intended to be adapted for use in your code, which
17  * may be proprietary.  So unlike the library itself, they are licensed
18  * Public Domain.
19  *
20  * Scrapeable, proxiable OpenMetrics metrics (compatible with Prometheus)
21  *
22  * https://tools.ietf.org/html/draft-richih-opsawg-openmetrics-00
23  *
24  * This plugin provides four protocols related to openmetrics handling:
25  *
26  * 1) "lws-openmetrics" direct http listener so scraper can directly get metrics
27  *
28  * 2) "lws-openmetrics-prox-agg" metrics proxy server that scraper can connect
29  *    to locally to proxy through to connected remote clients at 3)
30  *
31  * 3) "lws-openmetrics-prox-server" metrics proxy server that remote clients can
32  *    connect to, providing a path where scrapers at 2) can get metrics from
 *    clients connected to us
34  *
35  * 4) "lws-openmetrics-prox-client" nailed-up metrics proxy client that tries to
 *    keep up a connection to the server at 3), allowing the scraper to reach
37  *    clients that have no reachable way to serve.
38  *
39  * These are provided like this to maximize flexibility in being able to add
40  * openmetrics serving, proxying, or client->proxy to existing lws code.
41  *
42  * Openmetrics supports a "metric" at the top of its report that describes the
43  * source aka "target metadata".
44  *
45  * Since we want to enable collection from devices that are not externally
46  * reachable, we must provide a reachable server that the clients can attach to
47  * and have their stats aggregated and then read by Prometheus or whatever.
48  * Openmetrics says that it wants to present the aggregated stats in a flat
49  * summary with only the aggregator's "target metadata" and contributor targets
50  * getting their data tagged with the source
51  *
52  * "The above discussion is in the context of individual exposers.  An
53  *  exposition from a general purpose monitoring system may contain
54  *  metrics from many individual targets, and thus may expose multiple
55  *  target info Metrics.  The metrics may already have had target
56  *  metadata added to them as labels as part of ingestion.  The metric
57  *  names MUST NOT be varied based on target metadata.  For example it
58  *  would be incorrect for all metrics to end up being prefixed with
59  *  staging_ even if they all originated from targets in a staging
60  *  environment)."
61  */
62 
63 #if !defined (LWS_PLUGIN_STATIC)
64 #if !defined(LWS_DLL)
65 #define LWS_DLL
66 #endif
67 #if !defined(LWS_INTERNAL)
68 #define LWS_INTERNAL
69 #endif
70 #include <libwebsockets.h>
71 #endif
72 #include <string.h>
73 #include <stdlib.h>
74 #include <sys/stat.h>
75 #include <fcntl.h>
76 #if !defined(WIN32)
77 #include <unistd.h>
78 #endif
79 #include <assert.h>
80 
81 struct vhd {
82 	struct lws_context	*cx;
83 	struct lws_vhost	*vhost;
84 
85 	char			ws_server_uri[128];
86 	char			metrics_proxy_path[128];
87 	char			ba_secret[128];
88 
89 	const char		*proxy_side_bind_name;
90 	/**< name used to bind the two halves of the proxy together, must be
91 	 * the same name given in a pvo for both "lws-openmetrics-prox-agg"
92 	 * (the side local to the scraper) and "lws-openmetrics-prox-server"
93 	 * (the side the clients connect to)
94 	 */
95 
96 	char			sanity[8];
97 
98 	lws_dll2_owner_t	clients;
99 
100 	lws_sorted_usec_list_t	sul;	     /* schedule connection retry */
101 
102 	struct vhd		*bind_partner_vhd;
103 
104 	struct lws		*wsi;	     /* related wsi if any */
105 	uint16_t		retry_count; /* count of consequetive retries */
106 };
107 
108 struct pss {
109 	lws_dll2_t		list;
110 	char			proxy_path[64];
111 	struct lwsac		*ac;	/* the translated metrics, one ac per line */
112 	struct lwsac		*walk;	/* iterator for ac when writing */
113 	size_t			tot;	/* content-length computation */
114 	struct lws		*wsi;
115 
116 	uint8_t			greet:1; /* set if client needs to send proxy path */
117 	uint8_t			trigger:1; /* we want to ask the client to dump */
118 };
119 
120 #if defined(LWS_WITH_CLIENT)
121 static const uint32_t backoff_ms[] = { 1000, 2000, 3000, 4000, 5000 };
122 
123 static const lws_retry_bo_t retry = {
124 	.retry_ms_table			= backoff_ms,
125 	.retry_ms_table_count		= LWS_ARRAY_SIZE(backoff_ms),
126 	.conceal_count			= LWS_ARRAY_SIZE(backoff_ms),
127 
128 	.secs_since_valid_ping		= 400,  /* force PINGs after secs idle */
129 	.secs_since_valid_hangup	= 400, /* hangup after secs idle */
130 
131 	.jitter_percent			= 0,
132 };
133 
134 static void
omc_connect_client(lws_sorted_usec_list_t * sul)135 omc_connect_client(lws_sorted_usec_list_t *sul)
136 {
137 	struct vhd *vhd = lws_container_of(sul, struct vhd, sul);
138 	struct lws_client_connect_info i;
139 	const char *prot;
140 	char url[128];
141 
142 	memset(&i, 0, sizeof(i));
143 
144 	lwsl_notice("%s: %s %s %s\n", __func__, vhd->ws_server_uri, vhd->metrics_proxy_path, vhd->ba_secret);
145 
146 	lws_strncpy(url, vhd->ws_server_uri, sizeof(url));
147 
148 	if (lws_parse_uri(url, &prot, &i.address, &i.port, &i.path)) {
149 		lwsl_err("%s: unable to parse uri %s\n", __func__,
150 			 vhd->ws_server_uri);
151 		return;
152 	}
153 
154 	i.context		= vhd->cx;
155 	i.origin		= i.address;
156 	i.host			= i.address;
157 	i.ssl_connection	= LCCSCF_USE_SSL;
158 	i.protocol		= "lws-openmetrics-prox-server"; /* public subprot */
159 	i.local_protocol_name	= "lws-openmetrics-prox-client";
160 	i.pwsi			= &vhd->wsi;
161 	i.retry_and_idle_policy = &retry;
162 	i.userdata		= vhd;
163 	i.vhost			= vhd->vhost;
164 
165 	lwsl_notice("%s: %s %u %s\n", __func__, i.address, i.port, i.path);
166 
167 	if (lws_client_connect_via_info(&i))
168 		return;
169 
170 	/*
171 	 * Failed... schedule a retry... we can't use the _retry_wsi()
172 	 * convenience wrapper api here because no valid wsi at this
173 	 * point.
174 	 */
175 	if (!lws_retry_sul_schedule(vhd->cx, 0, sul, &retry,
176 				    omc_connect_client, &vhd->retry_count))
177 		return;
178 
179 	vhd->retry_count = 0;
180 	lws_retry_sul_schedule(vhd->cx, 0, sul, &retry,
181 			       omc_connect_client, &vhd->retry_count);
182 }
183 #endif
184 
/*
 * Sanitize a metric name in place for OpenMetrics, which has a very
 * restricted token charset: every byte among the first nl bytes of nm that
 * is not in [A-Za-z0-9_] is replaced by '_'.
 */
static void
openmetrics_san(char *nm, size_t nl)
{
	while (nl--) {
		const char c = *nm;
		const int is_upper = c >= 'A' && c <= 'Z';
		const int is_lower = c >= 'a' && c <= 'z';
		const int is_digit = c >= '0' && c <= '9';

		if (!is_upper && !is_lower && !is_digit && c != '_')
			*nm = '_';

		nm++;
	}
}
199 
200 static int
lws_metrics_om_format_agg(lws_metric_pub_t * pub,const char * nm,lws_usec_t now,int gng,char * buf,size_t len)201 lws_metrics_om_format_agg(lws_metric_pub_t *pub, const char *nm, lws_usec_t now,
202 			  int gng, char *buf, size_t len)
203 {
204 	const char *_gng = gng ? "_nogo" : "_go";
205 	char *end = buf + len - 1, *obuf = buf;
206 
207 	if (pub->flags & LWSMTFL_REPORT_ONLY_GO)
208 		_gng = "";
209 
210 	if (!(pub->flags & LWSMTFL_REPORT_MEAN)) {
211 		/* only the sum is meaningful */
212 		if (pub->flags & LWSMTFL_REPORT_DUTY_WALLCLOCK_US) {
213 			buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
214 				"%s_count %u\n"
215 				"%s_us_sum %llu\n"
216 				"%s_created %lu.%06u\n",
217 				nm, (unsigned int)pub->u.agg.count[gng],
218 				nm, (unsigned long long)pub->u.agg.sum[gng],
219 				nm, (unsigned long)(pub->us_first / 1000000),
220 				    (unsigned int)(pub->us_first % 1000000));
221 
222 			return lws_ptr_diff(buf, obuf);
223 		}
224 
225 		/* it's a monotonic ordinal, like total tx */
226 		buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
227 				    "%s%s_count %u\n"
228 				    "%s%s_sum %llu\n",
229 				    nm, _gng,
230 				    (unsigned int)pub->u.agg.count[gng],
231 				    nm, _gng,
232 				    (unsigned long long)pub->u.agg.sum[gng]);
233 
234 	} else
235 		buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
236 				    "%s%s_count %u\n"
237 				    "%s%s_mean %llu\n",
238 				    nm, _gng,
239 				    (unsigned int)pub->u.agg.count[gng],
240 				    nm, _gng, (unsigned long long)
241 				    (pub->u.agg.count[gng] ?
242 						pub->u.agg.sum[gng] /
243 						pub->u.agg.count[gng] : 0));
244 
245 	return lws_ptr_diff(buf, obuf);
246 }
247 
248 static int
lws_metrics_om_ac_stash(struct pss * pss,const char * buf,size_t len)249 lws_metrics_om_ac_stash(struct pss *pss, const char *buf, size_t len)
250 {
251 	char *q;
252 
253 	q = lwsac_use(&pss->ac, LWS_PRE + len + 2, LWS_PRE + len + 2);
254 	if (!q) {
255 		lwsac_free(&pss->ac);
256 
257 		return -1;
258 	}
259 	q[LWS_PRE] = (char)((len >> 8) & 0xff);
260 	q[LWS_PRE + 1] = (char)(len & 0xff);
261 	memcpy(q + LWS_PRE + 2, buf, len);
262 	pss->tot += len;
263 
264 	return 0;
265 }
266 
/*
 * We have to do the ac listing at this level, because the number of metrics
 * tags to iterate can be too large to fit in a reasonably-sized buffer.
 */
271 
272 static int
lws_metrics_om_format(struct pss * pss,lws_metric_pub_t * pub,const char * nm)273 lws_metrics_om_format(struct pss *pss, lws_metric_pub_t *pub, const char *nm)
274 {
275 	char buf[1200], *p = buf, *end = buf + sizeof(buf) - 1, tmp[512];
276 	lws_usec_t t = lws_now_usecs();
277 
278 	if (pub->flags & LWSMTFL_REPORT_HIST) {
279 		lws_metric_bucket_t *buck = pub->u.hist.head;
280 
281 		p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
282 				  "%s_count %llu\n",
283 				  nm, (unsigned long long)
284 				  pub->u.hist.total_count);
285 
286 		while (buck) {
287 			lws_strncpy(tmp, lws_metric_bucket_name(buck),
288 				    sizeof(tmp));
289 
290 			p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
291 					  "%s{%s} %llu\n", nm, tmp,
292 					  (unsigned long long)buck->count);
293 
294 			lws_metrics_om_ac_stash(pss, buf,
295 						lws_ptr_diff_size_t(p, buf));
296 			p = buf;
297 
298 			buck = buck->next;
299 		}
300 
301 		goto happy;
302 	}
303 
304 	if (!pub->u.agg.count[METRES_GO] && !pub->u.agg.count[METRES_NOGO])
305 		return 0;
306 
307 	if (pub->u.agg.count[METRES_GO])
308 		p += lws_metrics_om_format_agg(pub, nm, t, METRES_GO, p,
309 					       lws_ptr_diff_size_t(end, p));
310 
311 	if (!(pub->flags & LWSMTFL_REPORT_ONLY_GO) &&
312 	    pub->u.agg.count[METRES_NOGO])
313 		p += lws_metrics_om_format_agg(pub, nm, t, METRES_NOGO, p,
314 					       lws_ptr_diff_size_t(end, p));
315 
316 	if (pub->flags & LWSMTFL_REPORT_MEAN)
317 		p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
318 				  "%s_min %llu\n"
319 				  "%s_max %llu\n",
320 				  nm, (unsigned long long)pub->u.agg.min,
321 				  nm, (unsigned long long)pub->u.agg.max);
322 
323 happy:
324 	return lws_metrics_om_ac_stash(pss, buf, lws_ptr_diff_size_t(p, buf));
325 }
326 
327 static int
append_om_metric(lws_metric_pub_t * pub,void * user)328 append_om_metric(lws_metric_pub_t *pub, void *user)
329 {
330 	struct pss *pss = (struct pss *)user;
331 	char nm[64];
332 	size_t nl;
333 
334 	/*
335 	 * Convert lws_metrics to openmetrics metrics data, stashing into an
336 	 * lwsac without backfill.  Since it's not backfilling, use areas are in
337 	 * linear sequence simplifying walking them.  Limiting the lwsac alloc
338 	 * to less than a typical mtu means we can write one per write
339 	 * efficiently
340 	 */
341 
342 	lws_strncpy(nm, pub->name, sizeof(nm));
343 	nl = strlen(nm);
344 
345 	openmetrics_san(nm, nl);
346 
347 	return lws_metrics_om_format(pss, pub, nm);
348 }
349 
350 #if defined(__linux__)
351 static int
grabfile(const char * fi,char * buf,size_t len)352 grabfile(const char *fi, char *buf, size_t len)
353 {
354 	int n, fd = lws_open(fi, LWS_O_RDONLY);
355 
356 	buf[0] = '\0';
357 	if (fd < 0)
358 		return -1;
359 
360 	n = (int)read(fd, buf, len - 1);
361 	close(fd);
362 	if (n < 0) {
363 		buf[0] = '\0';
364 		return -1;
365 	}
366 
367 	buf[n] = '\0';
368 	if (n > 0 && buf[n - 1] == '\n')
369 		buf[--n] = '\0';
370 
371 	return n;
372 }
373 #endif
374 
375 /*
376  * Let's pregenerate the output into an lwsac all at once and
377  * then spool it back to the peer afterwards
378  *
379  * - there's not going to be that much of it (a few kB)
380  * - we then know the content-length for the headers
381  * - it's stretchy to arbitrary numbers of metrics
382  * - lwsac block list provides the per-metric structure to
383  *   hold the data in a way we can walk to write it simply
384  */
385 
386 int
ome_prepare(struct lws_context * ctx,struct pss * pss)387 ome_prepare(struct lws_context *ctx, struct pss *pss)
388 {
389 	char buf[1224], *start = buf + LWS_PRE, *p = start,
390 	     *end = buf + sizeof(buf) - 1;
391 	char hn[64];
392 
393 	pss->tot = 0;
394 
395 	/*
396 	 * Target metadata
397 	 */
398 
399 	hn[0] = '\0';
400 	gethostname(hn, sizeof(hn) - 1);
401 	p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
402 			  "# TYPE target info\n"
403 			  "# HELP target Target metadata\n"
404 			  "target_info{hostname=\"%s\"", hn);
405 
406 #if defined(__linux__)
407 	if (grabfile("/proc/self/cmdline", hn, sizeof(hn)))
408 		p += lws_snprintf((char *)p, lws_ptr_diff_size_t(end, p),
409 				  ",cmdline=\"%s\"", hn);
410 #endif
411 
412 	p += lws_snprintf(p, lws_ptr_diff_size_t(end, p), "} 1\n");
413 
414 	if (lws_metrics_om_ac_stash(pss, (const char *)buf + LWS_PRE,
415 				    lws_ptr_diff_size_t(p, buf + LWS_PRE)))
416 		return 1;
417 
418 	/* lws version */
419 
420 	p = start;
421 	p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
422 			  "# TYPE lws_info info\n"
423 			  "# HELP lws_info Version of lws producing this\n"
424 			  "lws_info{version=\"%s\"} 1\n", LWS_BUILD_HASH);
425 	if (lws_metrics_om_ac_stash(pss, (const char *)buf + LWS_PRE,
426 				    lws_ptr_diff_size_t(p, buf + LWS_PRE)))
427 		return 1;
428 
429 	/* system scalars */
430 
431 #if defined(__linux__)
432 	if (grabfile("/proc/loadavg", hn, sizeof(hn))) {
433 		char *sp = strchr(hn, ' ');
434 		if (sp) {
435 			p = start;
436 			p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
437 					  "load_1m %.*s\n",
438 					  lws_ptr_diff(sp, hn), hn);
439 			if (lws_metrics_om_ac_stash(pss,
440 						    (char *)buf + LWS_PRE,
441 						    lws_ptr_diff_size_t(p,
442 								start)))
443 				return 1;
444 		}
445 	}
446 #endif
447 
448 	if (lws_metrics_foreach(ctx, pss, append_om_metric))
449 		return 1;
450 
451 	p = start;
452 	p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
453 			  "# EOF\n");
454 	if (lws_metrics_om_ac_stash(pss, (char *)buf + LWS_PRE,
455 				    lws_ptr_diff_size_t(p, buf + LWS_PRE)))
456 		return 1;
457 
458 	pss->walk = pss->ac;
459 
460 	return 0;
461 }
462 
463 #if defined(LWS_WITH_SERVER)
464 
465 /* 1) direct http export for scraper */
466 
467 static int
callback_lws_openmetrics_export(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)468 callback_lws_openmetrics_export(struct lws *wsi,
469 				enum lws_callback_reasons reason,
470 				void *user, void *in, size_t len)
471 {
472 	unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
473 		      *end = buf + sizeof(buf) - 1, *ip;
474 	struct lws_context *cx = lws_get_context(wsi);
475 	struct pss *pss = (struct pss *)user;
476 	unsigned int m, wm;
477 
478 	switch (reason) {
479 	case LWS_CALLBACK_HTTP:
480 
481 		ome_prepare(cx, pss);
482 
483 		p = start;
484 		if (lws_add_http_common_headers(wsi, HTTP_STATUS_OK,
485 						"application/openmetrics-text; "
486 						"version=1.0.0; charset=utf-8",
487 						pss->tot, &p, end) ||
488 		    lws_finalize_write_http_header(wsi, start, &p, end))
489 			return 1;
490 
491 		lws_callback_on_writable(wsi);
492 
493 		return 0;
494 
495 	case LWS_CALLBACK_CLOSED_HTTP:
496 		lwsac_free(&pss->ac);
497 		break;
498 
499 	case LWS_CALLBACK_HTTP_WRITEABLE:
500 		if (!pss->walk)
501 			return 0;
502 
503 		do {
504 			ip = (uint8_t *)pss->walk +
505 				lwsac_sizeof(pss->walk == pss->ac) + LWS_PRE;
506 			m = (unsigned int)((ip[0] << 8) | ip[1]);
507 
508 			/* coverity */
509 			if (m > lwsac_get_tail_pos(pss->walk) -
510 				lwsac_sizeof(pss->walk == pss->ac))
511 				return -1;
512 
513 			if (lws_ptr_diff_size_t(end, p) < m)
514 				break;
515 
516 			memcpy(p, ip + 2, m);
517 			p += m;
518 
519 			pss->walk = lwsac_get_next(pss->walk);
520 		} while (pss->walk);
521 
522 		if (!lws_ptr_diff_size_t(p, start)) {
523 			lwsl_err("%s: stuck\n", __func__);
524 			return -1;
525 		}
526 
527 		wm = pss->walk ? LWS_WRITE_HTTP : LWS_WRITE_HTTP_FINAL;
528 
529 		if (lws_write(wsi, start, lws_ptr_diff_size_t(p, start),
530 			      (enum lws_write_protocol)wm) < 0)
531 			return 1;
532 
533 		if (!pss->walk) {
534 			 if (lws_http_transaction_completed(wsi))
535 				return -1;
536 		} else
537 			lws_callback_on_writable(wsi);
538 
539 		return 0;
540 
541 	default:
542 		break;
543 	}
544 
545 	return lws_callback_http_dummy(wsi, reason, user, in, len);
546 }
547 
548 static struct pss *
omc_lws_om_get_other_side_pss_client(struct vhd * vhd,struct pss * pss)549 omc_lws_om_get_other_side_pss_client(struct vhd *vhd, struct pss *pss)
550 {
551 	/*
552 	 * Search through our partner's clients list looking for one with the
553 	 * same proxy path
554 	 */
555 	lws_start_foreach_dll(struct lws_dll2 *, d,
556 			vhd->bind_partner_vhd->clients.head) {
557 		struct pss *apss = lws_container_of(d, struct pss, list);
558 
559 		if (!strcmp(pss->proxy_path, apss->proxy_path))
560 			return apss;
561 
562 	} lws_end_foreach_dll(d);
563 
564 	return NULL;
565 }
566 
567 /* 2) "lws-openmetrics-prox-agg": http server export via proxy to connected clients */
568 
569 static int
callback_lws_openmetrics_prox_agg(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)570 callback_lws_openmetrics_prox_agg(struct lws *wsi,
571 				  enum lws_callback_reasons reason,
572 				  void *user, void *in, size_t len)
573 {
574 	unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
575 		      *end = buf + sizeof(buf) - 1, *ip;
576 	struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get(
577 				lws_get_vhost(wsi), lws_get_protocol(wsi));
578 	struct lws_context *cx = lws_get_context(wsi);
579 	struct pss *pss = (struct pss *)user, *partner_pss;
580 	unsigned int m, wm;
581 
582 	switch (reason) {
583 
584 	case LWS_CALLBACK_PROTOCOL_INIT:
585 		lwsl_notice("%s: PROTOCOL_INIT on %s\n", __func__, lws_vh_tag(lws_get_vhost(wsi)));
586 		/*
587 		 * We get told what to do when we are bound to the vhost
588 		 */
589 		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
590 				lws_get_protocol(wsi), sizeof(struct vhd));
591 		if (!vhd) {
592 			lwsl_err("%s: vhd alloc failed\n", __func__);
593 			return 0;
594 		}
595 
596 		vhd->cx = cx;
597 
598 		/*
599 		 * Try to bind to the counterpart server in the proxy, binding
600 		 * to the right one by having a common bind name set in a pvo.
601 		 * We don't know who will get instantiated last, so both parts
602 		 * try to bind if not already bound
603 		 */
604 
605 		if (!lws_pvo_get_str(in, "proxy-side-bind-name",
606 				     &vhd->proxy_side_bind_name)) {
607 			/*
608 			 * Attempt to find the vhd that belongs to a vhost
609 			 * that has instantiated protocol
610 			 * "lws-openmetrics-prox-server", and has set pvo
611 			 * "proxy-side-bind-name" on it to whatever our
612 			 * vhd->proxy_side_bind_name was also set to.
613 			 *
614 			 * If found, inform the two sides of the same proxy
615 			 * what their partner vhd is
616 			 */
617 			lws_strncpy(vhd->sanity, "isagg", sizeof(vhd->sanity));
618 			vhd->bind_partner_vhd = lws_vhd_find_by_pvo(cx,
619 						"lws-openmetrics-prox-server",
620 						"proxy-side-bind-name",
621 						vhd->proxy_side_bind_name);
622 			if (vhd->bind_partner_vhd) {
623 				assert(!strcmp(vhd->bind_partner_vhd->sanity, "isws"));
624 				lwsl_notice("%s: proxy binding OK\n", __func__);
625 				vhd->bind_partner_vhd->bind_partner_vhd = vhd;
626 			}
627 		} else {
628 			lwsl_warn("%s: proxy-side-bind-name required\n", __func__);
629 			return 0;
630 		}
631 
632 		break;
633 
634 	case LWS_CALLBACK_PROTOCOL_DESTROY:
635 		if (vhd)
636 			lws_sul_cancel(&vhd->sul);
637 		break;
638 
639 	case LWS_CALLBACK_HTTP:
640 
641 		/*
642 		 * The scraper has connected to us, the local side of the proxy,
643 		 * we need to match what it wants to
644 		 */
645 
646 		if (!vhd->bind_partner_vhd)
647 			return 0;
648 
649 		lws_strnncpy(pss->proxy_path, (const char *)in, len,
650 			     sizeof(pss->proxy_path));
651 
652 		if (pss->list.owner) {
653 			lwsl_warn("%s: double HTTP?\n", __func__);
654 			return 0;
655 		}
656 
657 		pss->wsi = wsi;
658 
659 		lws_start_foreach_dll(struct lws_dll2 *, d,
660 				      vhd->bind_partner_vhd->clients.head) {
661 			struct pss *apss = lws_container_of(d, struct pss, list);
662 
663 			if (!strcmp((const char *)in, apss->proxy_path)) {
664 				apss->trigger = 1;
665 				lws_callback_on_writable(apss->wsi);
666 
667 				/* let's add him on the http server vhd list */
668 
669 				lws_dll2_add_tail(&pss->list, &vhd->clients);
670 				return 0;
671 			}
672 
673 		} lws_end_foreach_dll(d);
674 
675 		return 0;
676 
677 	case LWS_CALLBACK_CLOSED_HTTP:
678 		lwsac_free(&pss->ac);
679 		lws_dll2_remove(&pss->list);
680 		break;
681 
682 	case LWS_CALLBACK_HTTP_WRITEABLE:
683 
684 		if (!pss->walk)
685 			return 0;
686 
687 		/* locate the wss side if it's still around */
688 
689 		partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
690 		if (!partner_pss)
691 			return -1;
692 
693 		do {
694 			ip = (uint8_t *)pss->walk +
695 				lwsac_sizeof(pss->walk == partner_pss->ac) + LWS_PRE;
696 			m = (unsigned int)((ip[0] << 8) | ip[1]);
697 
698 			/* coverity */
699 			if (m > lwsac_get_tail_pos(pss->walk) -
700 				lwsac_sizeof(pss->walk == partner_pss->ac))
701 				return -1;
702 
703 			if (lws_ptr_diff_size_t(end, p) < m)
704 				break;
705 
706 			memcpy(p, ip + 2, m);
707 			p += m;
708 
709 			pss->walk = lwsac_get_next(pss->walk);
710 		} while (pss->walk);
711 
712 		if (!lws_ptr_diff_size_t(p, start)) {
713 			lwsl_err("%s: stuck\n", __func__);
714 			return -1;
715 		}
716 
717 		wm = pss->walk ? LWS_WRITE_HTTP : LWS_WRITE_HTTP_FINAL;
718 
719 		if (lws_write(wsi, start, lws_ptr_diff_size_t(p, start),
720 			      (enum lws_write_protocol)wm) < 0)
721 			return 1;
722 
723 		if (!pss->walk) {
724 			lwsl_info("%s: whole msg proxied to scraper\n", __func__);
725 			lws_dll2_remove(&pss->list);
726 			lwsac_free(&partner_pss->ac);
727 //			if (lws_http_transaction_completed(wsi))
728 			return -1;
729 		} else
730 			lws_callback_on_writable(wsi);
731 
732 		return 0;
733 
734 	default:
735 		break;
736 	}
737 
738 	return lws_callback_http_dummy(wsi, reason, user, in, len);
739 }
740 
741 /* 3) "lws-openmetrics-prox-server": ws server side of metrics proxy, for
742  *    ws clients to connect to */
743 
744 static int
callback_lws_openmetrics_prox_server(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)745 callback_lws_openmetrics_prox_server(struct lws *wsi,
746 				     enum lws_callback_reasons reason,
747 				     void *user, void *in, size_t len)
748 {
749 	unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
750 		      *end = buf + sizeof(buf) - 1;
751 	struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get(
752 				lws_get_vhost(wsi), lws_get_protocol(wsi));
753 	struct lws_context *cx = lws_get_context(wsi);
754 	struct pss *pss = (struct pss *)user, *partner_pss;
755 
756 	switch (reason) {
757 
758 	case LWS_CALLBACK_PROTOCOL_INIT:
759 		/*
760 		 * We get told what to do when we are bound to the vhost
761 		 */
762 
763 		lwsl_notice("%s: PROTOCOL_INIT on %s\n", __func__, lws_vh_tag(lws_get_vhost(wsi)));
764 
765 		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
766 				lws_get_protocol(wsi), sizeof(struct vhd));
767 		if (!vhd) {
768 			lwsl_err("%s: vhd alloc failed\n", __func__);
769 			return 0;
770 		}
771 
772 		vhd->cx = cx;
773 
774 		/*
775 		 * Try to bind to the counterpart server in the proxy, binding
776 		 * to the right one by having a common bind name set in a pvo.
777 		 * We don't know who will get instantiated last, so both parts
778 		 * try to bind if not already bound
779 		 */
780 
781 		if (!lws_pvo_get_str(in, "proxy-side-bind-name",
782 				     &vhd->proxy_side_bind_name)) {
783 			/*
784 			 * Attempt to find the vhd that belongs to a vhost
785 			 * that has instantiated protocol
786 			 * "lws-openmetrics-prox-server", and has set pvo
787 			 * "proxy-side-bind-name" on it to whatever our
788 			 * vhd->proxy_side_bind_name was also set to.
789 			 *
790 			 * If found, inform the two sides of the same proxy
791 			 * what their partner vhd is
792 			 */
793 			lws_strncpy(vhd->sanity, "isws", sizeof(vhd->sanity));
794 			vhd->bind_partner_vhd = lws_vhd_find_by_pvo(cx,
795 						"lws-openmetrics-prox-agg",
796 						"proxy-side-bind-name",
797 						vhd->proxy_side_bind_name);
798 			if (vhd->bind_partner_vhd) {
799 				assert(!strcmp(vhd->bind_partner_vhd->sanity, "isagg"));
800 				lwsl_notice("%s: proxy binding OK\n", __func__);
801 				vhd->bind_partner_vhd->bind_partner_vhd = vhd;
802 			}
803 		} else {
804 			lwsl_warn("%s: proxy-side-bind-name required\n", __func__);
805 			return 0;
806 		}
807 
808 		break;
809 
810 	case LWS_CALLBACK_PROTOCOL_DESTROY:
811 		break;
812 
813 	case LWS_CALLBACK_ESTABLISHED:
814 		/*
815 		 * a client has joined... we need to add his pss to our list
816 		 * of live, joined clients
817 		 */
818 
819 		/* mark us as waiting for the reference name from the client */
820 		pss->greet = 1;
821 		pss->wsi = wsi;
822 		lws_validity_confirmed(wsi);
823 
824 		return 0;
825 
826 	case LWS_CALLBACK_CLOSED:
827 		/*
828 		 * a client has parted
829 		 */
830 		lws_dll2_remove(&pss->list);
831 		lwsl_warn("%s: client %s left (%u)\n", __func__,
832 				pss->proxy_path,
833 				(unsigned int)vhd->clients.count);
834 		lwsac_free(&pss->ac);
835 
836 		/* let's kill the scraper connection accordingly, if still up */
837 		partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
838 		if (partner_pss)
839 			lws_wsi_close(partner_pss->wsi, LWS_TO_KILL_ASYNC);
840 		break;
841 
842 	case LWS_CALLBACK_RECEIVE:
843 		if (pss->greet) {
844 			pss->greet = 0;
845 			lws_strnncpy(pss->proxy_path, (const char *)in, len,
846 				     sizeof(pss->proxy_path));
847 
848 			lws_validity_confirmed(wsi);
849 			lwsl_notice("%s: received greet '%s'\n", __func__,
850 				    pss->proxy_path);
851 			/*
852 			 * we need to add his pss to our list of configured,
853 			 * live, joined clients
854 			 */
855 			lws_dll2_add_tail(&pss->list, &vhd->clients);
856 			return 0;
857 		}
858 
859 		/*
860 		 * He's sending us his results... let's collect chunks into the
861 		 * pss lwsac before worrying about anything else
862 		 */
863 
864 		if (lws_is_first_fragment(wsi))
865 			pss->tot = 0;
866 
867 		lws_metrics_om_ac_stash(pss, (const char *)in, len);
868 
869 		if (lws_is_final_fragment(wsi)) {
870 			struct pss *partner_pss;
871 
872 			lwsl_info("%s: ws side received complete msg\n",
873 					__func__);
874 
875 			/* the lwsac is complete */
876 			pss->walk = pss->ac;
877 			partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
878 			if (!partner_pss) {
879 				lwsl_notice("%s: no partner A\n", __func__);
880 				return -1;
881 			}
882 
883 			/* indicate to scraper side we want to issue now */
884 
885 			p = start;
886 			if (lws_add_http_common_headers(partner_pss->wsi, HTTP_STATUS_OK,
887 							"application/openmetrics-text; "
888 							"version=1.0.0; charset=utf-8",
889 							pss->tot, &p, end) ||
890 			    lws_finalize_write_http_header(partner_pss->wsi,
891 							    start, &p, end))
892 				return -1;
893 
894 			/* indicate to scraper side we want to issue now */
895 
896 			partner_pss->walk = pss->ac;
897 			partner_pss->trigger = 1;
898 			lws_callback_on_writable(partner_pss->wsi);
899 		}
900 
901 		return 0;
902 
903 	case LWS_CALLBACK_SERVER_WRITEABLE:
904 		if (!pss->trigger)
905 			return 0;
906 
907 		pss->trigger = 0;
908 
909 		partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
910 		if (!partner_pss) {
911 			lwsl_err("%s: no partner\n", __func__);
912 			return 0;
913 		}
914 
915 		lwsl_info("%s: sending trigger to client\n", __func__);
916 
917 		*start = 'x';
918 		if (lws_write(wsi, start, 1,
919 			      (enum lws_write_protocol)LWS_WRITE_TEXT) < 0)
920 			return 1;
921 
922 		lws_validity_confirmed(wsi);
923 
924 		return 0;
925 
926 	default:
927 		break;
928 	}
929 
930 	return lws_callback_http_dummy(wsi, reason, user, in, len);
931 }
932 #endif
933 
934 #if defined(LWS_WITH_CLIENT) && defined(LWS_ROLE_WS)
935 
936 /* 4) ws client that keeps wss connection up to metrics proxy ws server */
937 
938 static int
callback_lws_openmetrics_prox_client(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)939 callback_lws_openmetrics_prox_client(struct lws *wsi,
940 				     enum lws_callback_reasons reason,
941 				     void *user, void *in, size_t len)
942 {
943 	unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
944 		      *end = buf + sizeof(buf) - 1, *ip;
945 	struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get(
946 				lws_get_vhost(wsi), lws_get_protocol(wsi));
947 	struct lws_context *cx = lws_get_context(wsi);
948 	struct pss *pss = (struct pss *)user;
949 	unsigned int m, wm;
950 	const char *cp;
951 	char first;
952 
953 	switch (reason) {
954 
955 	case LWS_CALLBACK_PROTOCOL_INIT:
956 
957 		lwsl_notice("%s: PROTOCOL_INIT on %s\n", __func__,
958 					lws_vh_tag(lws_get_vhost(wsi)));
959 
960 
961 		/*
962 		 * We get told what to do when we are bound to the vhost
963 		 */
964 		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
965 				lws_get_protocol(wsi), sizeof(struct vhd));
966 		if (!vhd)
967 			return 0;
968 
969 		vhd->cx = cx;
970 		vhd->vhost = lws_get_vhost(wsi);
971 
972 		/* the proxy server uri */
973 
974 		if (lws_pvo_get_str(in, "ws-server-uri", &cp)) {
975 			lwsl_warn("%s: ws-server-uri pvo required\n", __func__);
976 
977 			return 0;
978 		}
979 		lws_strncpy(vhd->ws_server_uri, cp, sizeof(vhd->ws_server_uri));
980 
981 		/* how we should be referenced at the proxy */
982 
983 		if (lws_pvo_get_str(in, "metrics-proxy-path", &cp)) {
984 			lwsl_err("%s: metrics-proxy-path pvo required\n", __func__);
985 
986 			return 1;
987 		}
988 		lws_strncpy(vhd->metrics_proxy_path, cp, sizeof(vhd->metrics_proxy_path));
989 
990 		/* the shared secret to authenticate us as allowed to join */
991 
992 		if (lws_pvo_get_str(in, "ba-secret", &cp)) {
993 			lwsl_err("%s: ba-secret pvo required\n", __func__);
994 
995 			return 1;
996 		}
997 		lws_strncpy(vhd->ba_secret, cp, sizeof(vhd->ba_secret));
998 
999 		lwsl_notice("%s: scheduling connect %s %s %s\n", __func__,
1000 				vhd->ws_server_uri, vhd->metrics_proxy_path, vhd->ba_secret);
1001 
1002 		lws_validity_confirmed(wsi);
1003 		lws_sul_schedule(cx, 0, &vhd->sul, omc_connect_client, 1);
1004 		break;
1005 
1006 	case LWS_CALLBACK_PROTOCOL_DESTROY:
1007 		if (vhd)
1008 			lws_sul_cancel(&vhd->sul);
1009 		break;
1010 
1011 	case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
1012 	{
1013 		unsigned char **pp = (unsigned char **)in, *pend = (*pp) + len;
1014 		char b[128];
1015 
1016 		/* authorize ourselves to the metrics proxy using basic auth */
1017 
1018 		if (lws_http_basic_auth_gen("metricsclient", vhd->ba_secret,
1019 					    b, sizeof(b)))
1020 			break;
1021 
1022 		if (lws_add_http_header_by_token(wsi,
1023 						 WSI_TOKEN_HTTP_AUTHORIZATION,
1024 						 (unsigned char *)b,
1025 						 (int)strlen(b), pp, pend))
1026 			return -1;
1027 
1028 		break;
1029 	}
1030 
1031 	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
1032 		lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
1033 			 in ? (char *)in : "(null)");
1034 		goto do_retry;
1035 
1036 	case LWS_CALLBACK_CLIENT_ESTABLISHED:
1037 		lwsl_warn("%s: connected to ws metrics agg server\n", __func__);
1038 		pss->greet = 1;
1039 		lws_callback_on_writable(wsi);
1040 		lws_validity_confirmed(wsi);
1041 		return 0;
1042 
1043 	case LWS_CALLBACK_CLIENT_CLOSED:
1044 		lwsl_notice("%s: client closed\n", __func__);
1045 		lwsac_free(&pss->ac);
1046 		goto do_retry;
1047 
1048 	case LWS_CALLBACK_CLIENT_RECEIVE:
1049 		/*
1050 		 * Proxy serverside sends us something to trigger us to create
1051 		 * our metrics message and send it back over the ws link
1052 		 */
1053 		ome_prepare(cx, pss);
1054 		pss->walk = pss->ac;
1055 		lws_callback_on_writable(wsi);
1056 		lwsl_info("%s: dump requested\n", __func__);
1057 		break;
1058 
1059 	case LWS_CALLBACK_CLIENT_WRITEABLE:
1060 		if (pss->greet) {
1061 			/*
1062 			 * At first after establishing the we link, we send a
1063 			 * message indicating to the metrics proxy how we
1064 			 * should be referred to by the scraper to particularly
1065 			 * select to talk to us
1066 			 */
1067 			lwsl_info("%s: sending greet '%s'\n", __func__,
1068 					vhd->metrics_proxy_path);
1069 			lws_strncpy((char *)start, vhd->metrics_proxy_path,
1070 					sizeof(buf) - LWS_PRE);
1071 			if (lws_write(wsi, start,
1072 				      strlen(vhd->metrics_proxy_path),
1073 				      LWS_WRITE_TEXT) < 0)
1074 				return 1;
1075 
1076 			lws_validity_confirmed(wsi);
1077 
1078 			pss->greet = 0;
1079 			return 0;
1080 		}
1081 
1082 		if (!pss->walk)
1083 			return 0;
1084 
1085 		/*
1086 		 * We send the metrics dump in a single logical ws message,
1087 		 * using ws fragmentation to split it around 1 mtu boundary
1088 		 * and keep coming back until it's finished
1089 		 */
1090 
1091 		first = pss->walk == pss->ac;
1092 
1093 		do {
1094 			ip = (uint8_t *)pss->walk +
1095 				lwsac_sizeof(pss->walk == pss->ac) + LWS_PRE;
1096 			m = (unsigned int)((ip[0] << 8) | ip[1]);
1097 
1098 			/* coverity */
1099 			if (m > lwsac_get_tail_pos(pss->walk) -
1100 				lwsac_sizeof(pss->walk == pss->ac)) {
1101 				lwsl_err("%s: size blow\n", __func__);
1102 				return -1;
1103 			}
1104 
1105 			if (lws_ptr_diff_size_t(end, p) < m)
1106 				break;
1107 
1108 			memcpy(p, ip + 2, m);
1109 			p += m;
1110 
1111 			pss->walk = lwsac_get_next(pss->walk);
1112 		} while (pss->walk);
1113 
1114 		if (!lws_ptr_diff_size_t(p, start)) {
1115 			lwsl_err("%s: stuck\n", __func__);
1116 			return -1;
1117 		}
1118 
1119 		wm = (unsigned int)lws_write_ws_flags(LWS_WRITE_TEXT, first,
1120 						      !pss->walk);
1121 
1122 		if (lws_write(wsi, start, lws_ptr_diff_size_t(p, start),
1123 			      (enum lws_write_protocol)wm) < 0) {
1124 			lwsl_notice("%s: write fail\n", __func__);
1125 			return 1;
1126 		}
1127 
1128 		lws_validity_confirmed(wsi);
1129 		lwsl_info("%s: forwarded %d\n", __func__, lws_ptr_diff(p, start));
1130 
1131 		if (!pss->walk) {
1132 			lwsl_info("%s: dump send completed\n", __func__);
1133 			lwsac_free(&pss->ac);
1134 		} else
1135 			lws_callback_on_writable(wsi);
1136 
1137 		return 0;
1138 
1139 	default:
1140 		break;
1141 	}
1142 
1143 	return lws_callback_http_dummy(wsi, reason, user, in, len);
1144 
1145 do_retry:
1146 	if (!lws_retry_sul_schedule(cx, 0, &vhd->sul, &retry,
1147 				    omc_connect_client, &vhd->retry_count))
1148 		return 0;
1149 
1150 	vhd->retry_count = 0;
1151 	lws_retry_sul_schedule(cx, 0, &vhd->sul, &retry,
1152 			       omc_connect_client, &vhd->retry_count);
1153 
1154 	return 0;
1155 }
1156 #endif
1157 
1158 
1159 LWS_VISIBLE const struct lws_protocols lws_openmetrics_export_protocols[] = {
1160 #if defined(LWS_WITH_SERVER)
1161 	{ /* for scraper directly: http export on listen socket */
1162 		"lws-openmetrics",
1163 		callback_lws_openmetrics_export,
1164 		sizeof(struct pss),
1165 		1024, 0, NULL, 0
1166 	},
1167 	{ /* for scraper via ws proxy: http export on listen socket */
1168 		"lws-openmetrics-prox-agg",
1169 		callback_lws_openmetrics_prox_agg,
1170 		sizeof(struct pss),
1171 		1024, 0, NULL, 0
1172 	},
1173 	{ /* metrics proxy server side: ws server for clients to connect to */
1174 		"lws-openmetrics-prox-server",
1175 		callback_lws_openmetrics_prox_server,
1176 		sizeof(struct pss),
1177 		1024, 0, NULL, 0
1178 	},
1179 #endif
1180 #if defined(LWS_WITH_CLIENT) && defined(LWS_ROLE_WS)
1181 	{ /* client to metrics proxy: ws client to connect to metrics proxy*/
1182 		"lws-openmetrics-prox-client",
1183 		callback_lws_openmetrics_prox_client,
1184 		sizeof(struct pss),
1185 		1024, 0, NULL, 0
1186 	},
1187 #endif
1188 };
1189 
1190 LWS_VISIBLE const lws_plugin_protocol_t lws_openmetrics_export = {
1191 	.hdr = {
1192 		"lws OpenMetrics export",
1193 		"lws_protocol_plugin",
1194 		LWS_BUILD_HASH,
1195 		LWS_PLUGIN_API_MAGIC
1196 	},
1197 
1198 	.protocols = lws_openmetrics_export_protocols,
1199 	.count_protocols = LWS_ARRAY_SIZE(lws_openmetrics_export_protocols),
1200 };
1201