• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libwebsockets - small server side websockets and web server implementation
3  *
4  * Copyright (C) 2019 - 2020 Andy Green <andy@warmcat.com>
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to
8  * deal in the Software without restriction, including without limitation the
9  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10  * sell copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22  * IN THE SOFTWARE.
23  */
24 
25 #include <private-lib-core.h>
26 
/*
 * Table of secure streams protocol bindings.  Each slot holds a pointer to
 * the binding for one role when that role is compiled in, and NULL when it is
 * not, so a slot must be NULL-checked before use.  The slots appear in the
 * order given by the per-entry comments; code in this file indexes the table
 * with the policy's protocol value.
 *
 * Both the bindings and the pointer array itself are const: the visible code
 * in this file only ever reads the table.
 */
static const struct ss_pcols * const ss_pcols[] = {
#if defined(LWS_ROLE_H1)
	&ss_pcol_h1,		/* LWSSSP_H1 */
#else
	NULL,
#endif
#if defined(LWS_ROLE_H2)
	&ss_pcol_h2,		/* LWSSSP_H2 */
#else
	NULL,
#endif
#if defined(LWS_ROLE_WS)
	&ss_pcol_ws,		/* LWSSSP_WS */
#else
	NULL,
#endif
#if defined(LWS_ROLE_MQTT)
	&ss_pcol_mqtt,		/* LWSSSP_MQTT */
#else
	NULL,
#endif
};
49 
/*
 * Printable names for connection states, looked up by numeric state value in
 * lws_ss_state_name().
 *
 * NOTE(review): the entries are presumably in the same order as the
 * lws_ss_constate_t enum -- confirm against the enum when adding a state.
 *
 * The array is const in both the strings and the pointers, since it is only
 * ever read.
 */
static const char * const state_names[] = {
	"LWSSSCS_CREATING",
	"LWSSSCS_DISCONNECTED",
	"LWSSSCS_UNREACHABLE",
	"LWSSSCS_AUTH_FAILED",
	"LWSSSCS_CONNECTED",
	"LWSSSCS_CONNECTING",
	"LWSSSCS_DESTROYING",
	"LWSSSCS_POLL",
	"LWSSSCS_ALL_RETRIES_FAILED",
	"LWSSSCS_QOS_ACK_REMOTE",
	"LWSSSCS_QOS_NACK_REMOTE",
	"LWSSSCS_QOS_ACK_LOCAL",
	"LWSSSCS_QOS_NACK_LOCAL",
};
65 
66 const char *
lws_ss_state_name(int state)67 lws_ss_state_name(int state)
68 {
69 	if (state >= (int)LWS_ARRAY_SIZE(state_names))
70 		return "unknown";
71 
72 	return state_names[state];
73 }
74 
75 int
lws_ss_event_helper(lws_ss_handle_t * h,lws_ss_constate_t cs)76 lws_ss_event_helper(lws_ss_handle_t *h, lws_ss_constate_t cs)
77 {
78 	if (!h)
79 		return 0;
80 
81 #if defined(LWS_WITH_SEQUENCER)
82 	/*
83 	 * A parent sequencer for the ss is optional, if we have one, keep it
84 	 * informed of state changes on the ss connection
85 	 */
86 	if (h->seq && cs != LWSSSCS_DESTROYING)
87 		lws_seq_queue_event(h->seq, LWSSEQ_SS_STATE_BASE + cs,
88 				    (void *)h, NULL);
89 #endif
90 
91 	if (h->h_sink &&h->h_sink->info.state(h->sink_obj, h->h_sink, cs, 0))
92 		return 1;
93 
94 	return h->info.state(ss_to_userobj(h), NULL, cs, 0);
95 }
96 
97 static void
lws_ss_timeout_sul_check_cb(lws_sorted_usec_list_t * sul)98 lws_ss_timeout_sul_check_cb(lws_sorted_usec_list_t *sul)
99 {
100 	lws_ss_handle_t *h = lws_container_of(sul, lws_ss_handle_t, sul);
101 
102 	lwsl_err("%s: retrying ss h %p after backoff\n", __func__, h);
103 	/* we want to retry... */
104 	h->seqstate = SSSEQ_DO_RETRY;
105 
106 	lws_ss_request_tx(h);
107 }
108 
109 int
lws_ss_exp_cb_metadata(void * priv,const char * name,char * out,size_t * pos,size_t olen,size_t * exp_ofs)110 lws_ss_exp_cb_metadata(void *priv, const char *name, char *out, size_t *pos,
111 			size_t olen, size_t *exp_ofs)
112 {
113 	lws_ss_handle_t *h = (lws_ss_handle_t *)priv;
114 	const char *replace = NULL;
115 	size_t total, budget;
116 	lws_ss_metadata_t *md = lws_ss_policy_metadata(h->policy, name);
117 
118 	if (!md) {
119 		lwsl_err("%s: Unknown metadata %s\n", __func__, name);
120 
121 		return LSTRX_FATAL_NAME_UNKNOWN;
122 	}
123 
124 	lwsl_info("%s %s %d\n", __func__, name, (int)md->length);
125 
126 	replace = h->metadata[md->length].value;
127 	total = h->metadata[md->length].length;
128 	// lwsl_hexdump_err(replace, total);
129 
130 	budget = olen - *pos;
131 	total -= *exp_ofs;
132 	if (total < budget)
133 		budget = total;
134 
135 	memcpy(out + *pos, replace + (*exp_ofs), budget);
136 	*exp_ofs += budget;
137 	*pos += budget;
138 
139 	if (budget == total)
140 		return LSTRX_DONE;
141 
142 	return LSTRX_FILLED_OUT;
143 }
144 
145 int
lws_ss_set_timeout_us(lws_ss_handle_t * h,lws_usec_t us)146 lws_ss_set_timeout_us(lws_ss_handle_t *h, lws_usec_t us)
147 {
148 	struct lws_context_per_thread *pt = &h->context->pt[h->tsi];
149 
150 	h->sul.cb = lws_ss_timeout_sul_check_cb;
151 	__lws_sul_insert(&pt->pt_sul_owner, &h->sul, us);
152 
153 	return 0;
154 }
155 
156 int
lws_ss_backoff(lws_ss_handle_t * h)157 lws_ss_backoff(lws_ss_handle_t *h)
158 {
159 	uint64_t ms;
160 	char conceal;
161 
162 	if (h->seqstate == SSSEQ_RECONNECT_WAIT)
163 		return 0;
164 
165 	/* figure out what we should do about another retry */
166 
167 	lwsl_info("%s: ss %p: retry backoff after failure\n", __func__, h);
168 	ms = lws_retry_get_delay_ms(h->context, h->policy->retry_bo,
169 				    &h->retry, &conceal);
170 	if (!conceal) {
171 		lwsl_info("%s: ss %p: abandon conn attempt \n",__func__, h);
172 		h->seqstate = SSSEQ_IDLE;
173 		lws_ss_event_helper(h, LWSSSCS_ALL_RETRIES_FAILED);
174 		return 1;
175 	}
176 
177 	h->seqstate = SSSEQ_RECONNECT_WAIT;
178 	lws_ss_set_timeout_us(h, ms * LWS_US_PER_MS);
179 
180 	lwsl_info("%s: ss %p: retry wait %"PRIu64"ms\n", __func__, h, ms);
181 
182 	return 0;
183 }
184 
185 int
lws_ss_client_connect(lws_ss_handle_t * h)186 lws_ss_client_connect(lws_ss_handle_t *h)
187 {
188 	struct lws_client_connect_info i;
189 	const struct ss_pcols *ssp;
190 	size_t used_in, used_out;
191 	union lws_ss_contemp ct;
192 	char path[128], ep[96];
193 	lws_strexp_t exp;
194 
195 	if (!h->policy) {
196 		lwsl_err("%s: ss with no policy\n", __func__);
197 
198 		return -1;
199 	}
200 
201 	/*
202 	 * We are already bound to a sink?
203 	 */
204 
205 	if (h->h_sink)
206 		return 0;
207 
208 	memset(&i, 0, sizeof i); /* otherwise uninitialized garbage */
209 	i.context = h->context;
210 
211 	if (h->policy->flags & LWSSSPOLF_TLS) {
212 		lwsl_info("%s: using tls\n", __func__);
213 		i.ssl_connection = LCCSCF_USE_SSL;
214 
215 		if (!h->policy->trust_store)
216 			lwsl_info("%s: using platform trust store\n", __func__);
217 		else {
218 
219 			i.vhost = lws_get_vhost_by_name(h->context,
220 							h->policy->trust_store->name);
221 			if (!i.vhost) {
222 				lwsl_err("%s: missing vh for policy ca\n", __func__);
223 
224 				return -1;
225 			}
226 		}
227 	}
228 
229 	/* expand metadata ${symbols} that may be inside the endpoint string */
230 
231 	lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, ep, sizeof(ep));
232 
233 	if (lws_strexp_expand(&exp, h->policy->endpoint,
234 			      strlen(h->policy->endpoint),
235 			      &used_in, &used_out) != LSTRX_DONE) {
236 		lwsl_err("%s: address strexp failed\n", __func__);
237 
238 		return -1;
239 	}
240 
241 	i.address = ep;
242 	i.port = h->policy->port;
243 	i.host = i.address;
244 	i.origin = i.address;
245 	i.opaque_user_data = h;
246 	i.seq = h->seq;
247 	i.retry_and_idle_policy = h->policy->retry_bo;
248 	i.sys_tls_client_cert = h->policy->client_cert;
249 
250 	i.path = "";
251 
252 	ssp = ss_pcols[(int)h->policy->protocol];
253 	if (!ssp) {
254 		lwsl_err("%s: unsupported protocol\n", __func__);
255 
256 		return -1;
257 	}
258 	i.alpn = ssp->alpn;
259 
260 	/*
261 	 * For http, we can get the method from the http object, override in
262 	 * the protocol-specific munge callback below if not http
263 	 */
264 	i.method = h->policy->u.http.method;
265 	i.protocol = ssp->protocol_name; /* lws protocol name */
266 	i.local_protocol_name = i.protocol;
267 
268 	ssp->munge(h, path, sizeof(path), &i, &ct);
269 
270 	i.pwsi = &h->wsi;
271 
272 	if (h->policy->plugins[0] && h->policy->plugins[0]->munge)
273 		h->policy->plugins[0]->munge(h, path, sizeof(path));
274 
275 	lwsl_info("%s: connecting %s, '%s' '%s' %s\n", __func__, i.method,
276 			i.alpn, i.address, i.path);
277 
278 	h->txn_ok = 0;
279 	if (lws_ss_event_helper(h, LWSSSCS_CONNECTING))
280 		return -1;
281 
282 	if (!lws_client_connect_via_info(&i)) {
283 		lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
284 		lws_ss_backoff(h);
285 
286 		return 1;
287 	}
288 
289 	return 0;
290 }
291 
292 
293 /*
294  * Public API
295  */
296 
297 /*
298  * Create either a stream or a sink
299  */
300 
301 int
lws_ss_create(struct lws_context * context,int tsi,const lws_ss_info_t * ssi,void * opaque_user_data,lws_ss_handle_t ** ppss,struct lws_sequencer * seq_owner,const char ** ppayload_fmt)302 lws_ss_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
303 	      void *opaque_user_data, lws_ss_handle_t **ppss,
304 	      struct lws_sequencer *seq_owner, const char **ppayload_fmt)
305 {
306 	struct lws_context_per_thread *pt = &context->pt[tsi];
307 	const lws_ss_policy_t *pol;
308 	lws_ss_metadata_t *smd;
309 	lws_ss_handle_t *h;
310 	size_t size;
311 	void **v;
312 	char *p;
313 	int n;
314 
315 	pol = lws_ss_policy_lookup(context, ssi->streamtype);
316 	if (!pol) {
317 		lwsl_info("%s: unknown stream type %s\n", __func__,
318 			  ssi->streamtype);
319 		return 1;
320 	}
321 
322 	if (ssi->register_sink) {
323 		/*
324 		 * This can register a secure streams sink as well as normal
325 		 * secure streams connections.  If that's what's happening,
326 		 * confirm the policy agrees that this streamtype should be
327 		 * directed to a sink.
328 		 */
329 		if (!(pol->flags & LWSSSPOLF_LOCAL_SINK)) {
330 			/*
331 			 * Caller wanted to create a sink for this streamtype,
332 			 * but the policy does not agree the streamtype should
333 			 * be routed to a local sink.
334 			 */
335 			lwsl_err("%s: %s policy does not allow local sink\n",
336 				 __func__, ssi->streamtype);
337 
338 			return 1;
339 		}
340 	} else {
341 
342 		if (!(pol->flags & LWSSSPOLF_LOCAL_SINK)) {
343 
344 		}
345 //		lws_dll2_foreach_safe(&pt->ss_owner, NULL, lws_ss_destroy_dll);
346 	}
347 
348 	/*
349 	 * We overallocate and point to things in the overallocation...
350 	 *
351 	 * 1) the user_alloc from the stream info
352 	 * 2) network auth plugin instantiation data
353 	 * 3) stream auth plugin instantiation data
354 	 * 4) as many metadata pointer structs as the policy tells
355 	 * 5) the streamtype name (length is not aligned)
356 	 *
357 	 * ... when we come to destroy it, just one free to do.
358 	 */
359 
360 	size = sizeof(*h) + ssi->user_alloc + strlen(ssi->streamtype) + 1;
361 	if (pol->plugins[0])
362 		size += pol->plugins[0]->alloc;
363 	if (pol->plugins[1])
364 		size += pol->plugins[1]->alloc;
365 	size += pol->metadata_count * sizeof(lws_ss_metadata_t);
366 
367 	h = lws_zalloc(size, __func__);
368 	if (!h)
369 		return 2;
370 
371 	h->info = *ssi;
372 	h->policy = pol;
373 	h->context = context;
374 	h->tsi = tsi;
375 	h->seq = seq_owner;
376 
377 	/* start of overallocated area */
378 	p = (char *)&h[1];
379 
380 	/* set the handle pointer in the user data struct */
381 	v = (void **)(p + ssi->handle_offset);
382 	*v = h;
383 
384 	/* set the opaque user data in the user data struct */
385 	v = (void **)(p + ssi->opaque_user_data_offset);
386 	*v = opaque_user_data;
387 
388 	p += ssi->user_alloc;
389 
390 	if (pol->plugins[0]) {
391 		h->nauthi = p;
392 		p += pol->plugins[0]->alloc;
393 	}
394 	if (pol->plugins[1]) {
395 		h->sauthi = p;
396 		p += pol->plugins[1]->alloc;
397 	}
398 
399 	if (pol->metadata_count) {
400 		h->metadata = (lws_ss_metadata_t *)p;
401 		p += pol->metadata_count * sizeof(lws_ss_metadata_t);
402 
403 		lwsl_info("%s: %s metadata count %d\n", __func__,
404 			  pol->streamtype, pol->metadata_count);
405 	}
406 
407 	smd = pol->metadata;
408 	for (n = 0; n < pol->metadata_count; n++) {
409 		h->metadata[n].name = smd->name;
410 		if (n + 1 == pol->metadata_count)
411 			h->metadata[n].next = NULL;
412 		else
413 			h->metadata[n].next = &h->metadata[n + 1];
414 		smd = smd->next;
415 	}
416 
417 	memcpy(p, ssi->streamtype, strlen(ssi->streamtype) + 1);
418 	h->info.streamtype = p;
419 
420 	lws_pt_lock(pt, __func__);
421 	lws_dll2_add_head(&h->list, &pt->ss_owner);
422 	lws_pt_unlock(pt);
423 
424 	if (ppss)
425 		*ppss = h;
426 
427 	if (ppayload_fmt)
428 		*ppayload_fmt = pol->payload_fmt;
429 
430 	if (ssi->register_sink) {
431 		/*
432 		 *
433 		 */
434 	}
435 
436 	lws_ss_event_helper(h, LWSSSCS_CREATING);
437 
438 	if (!ssi->register_sink && (h->policy->flags & LWSSSPOLF_NAILED_UP))
439 		if (lws_ss_client_connect(h))
440 			lws_ss_backoff(h);
441 
442 	return 0;
443 }
444 
445 void
lws_ss_destroy(lws_ss_handle_t ** ppss)446 lws_ss_destroy(lws_ss_handle_t **ppss)
447 {
448 	struct lws_context_per_thread *pt;
449 	lws_ss_handle_t *h = *ppss;
450 	lws_ss_metadata_t *pmd;
451 
452 	if (!h)
453 		return;
454 
455 	if (h->wsi) {
456 		/*
457 		 * Don't let the wsi point to us any more,
458 		 * we (the ss object bound to the wsi) are going away now
459 		 */
460 //		lws_set_opaque_user_data(h->wsi, NULL);
461 		lws_set_timeout(h->wsi, 1, LWS_TO_KILL_SYNC);
462 	}
463 
464 	pt = &h->context->pt[h->tsi];
465 
466 	lws_pt_lock(pt, __func__);
467 	*ppss = NULL;
468 	lws_dll2_remove(&h->list);
469 	lws_dll2_remove(&h->to_list);
470 	lws_ss_event_helper(h, LWSSSCS_DESTROYING);
471 	lws_pt_unlock(pt);
472 
473 	/* in proxy case, metadata value on heap may need cleaning up */
474 
475 	pmd = h->metadata;
476 	while (pmd) {
477 		lwsl_info("%s: pmd %p\n", __func__, pmd);
478 		if (pmd->value_on_lws_heap)
479 			lws_free_set_NULL(pmd->value);
480 		pmd = pmd->next;
481 	}
482 
483 	lws_sul_schedule(h->context, 0, &h->sul, NULL, LWS_SET_TIMER_USEC_CANCEL);
484 
485 	lws_free_set_NULL(h);
486 }
487 
488 void
lws_ss_request_tx(lws_ss_handle_t * h)489 lws_ss_request_tx(lws_ss_handle_t *h)
490 {
491 	lwsl_info("%s: wsi %p\n", __func__, h->wsi);
492 
493 	if (h->wsi) {
494 		lws_callback_on_writable(h->wsi);
495 
496 		return;
497 	}
498 
499 	if (h->seqstate != SSSEQ_IDLE &&
500 	    h->seqstate != SSSEQ_DO_RETRY)
501 		return;
502 
503 	h->seqstate = SSSEQ_TRY_CONNECT;
504 	lws_ss_event_helper(h, LWSSSCS_POLL);
505 
506 	if (lws_ss_client_connect(h))
507 		lws_ss_backoff(h);
508 }
509 
510 void
lws_ss_request_tx_len(lws_ss_handle_t * h,unsigned long len)511 lws_ss_request_tx_len(lws_ss_handle_t *h, unsigned long len)
512 {
513 	if (h->wsi)
514 		h->wsi->http.writeable_len = len;
515 	else
516 		h->writeable_len = len;
517 	lws_ss_request_tx(h);
518 }
519 
520 /*
521  * private helpers
522  */
523 
524 /* used on context destroy when iterating listed lws_ss on a pt */
525 
526 int
lws_ss_destroy_dll(struct lws_dll2 * d,void * user)527 lws_ss_destroy_dll(struct lws_dll2 *d, void *user)
528 {
529 	lws_ss_handle_t *h = lws_container_of(d, lws_ss_handle_t, list);
530 
531 	lws_ss_destroy(&h);
532 
533 	return 0;
534 }
535 
536 struct lws_sequencer *
lws_ss_get_sequencer(lws_ss_handle_t * h)537 lws_ss_get_sequencer(lws_ss_handle_t *h)
538 {
539 	return h->seq;
540 }
541 
542 struct lws_context *
lws_ss_get_context(struct lws_ss_handle * h)543 lws_ss_get_context(struct lws_ss_handle *h)
544 {
545 	return h->context;
546 }
547 
548 const char *
lws_ss_rideshare(struct lws_ss_handle * h)549 lws_ss_rideshare(struct lws_ss_handle *h)
550 {
551 	if (!h->rideshare)
552 		return h->policy->streamtype;
553 
554 	return h->rideshare->streamtype;
555 }
556 
557 int
lws_ss_add_peer_tx_credit(struct lws_ss_handle * h,int32_t bump)558 lws_ss_add_peer_tx_credit(struct lws_ss_handle *h, int32_t bump)
559 {
560 	const struct ss_pcols *ssp;
561 
562 	ssp = ss_pcols[(int)h->policy->protocol];
563 
564 	if (h->wsi && ssp && ssp->tx_cr_add)
565 		return ssp->tx_cr_add(h, bump);
566 
567 	return 0;
568 }
569 
570 int
lws_ss_get_est_peer_tx_credit(struct lws_ss_handle * h)571 lws_ss_get_est_peer_tx_credit(struct lws_ss_handle *h)
572 {
573 	const struct ss_pcols *ssp;
574 
575 	ssp = ss_pcols[(int)h->policy->protocol];
576 
577 	if (h->wsi && ssp && ssp->tx_cr_add)
578 		return ssp->tx_cr_est(h);
579 
580 	return 0;
581 }
582