/*
 * lws-minimal-secure-streams-client
 *
 * Written in 2010-2020 by Andy Green <andy@warmcat.com>
 *
 * This file is made available under the Creative Commons CC0 1.0
 * Universal Public Domain Dedication.
 *
 *
 * This client does not perform any INET networking... instead it opens a unix
 * domain socket on a proxy that is listening for it, and that creates the
 * actual secure stream connection.
 *
 * We are able to use the usual secure streams api in the client process, with
 * payloads and connection state information proxied over the unix domain
 * socket and fulfilled in the proxy process.
 *
 * The public client helper pieces are built as part of lws
 */
#include <private-lib-core.h>
21 
22 extern const uint32_t ss_state_txn_validity[17];
23 
24 int
lws_ss_check_next_state_sspc(lws_sspc_handle_t * ss,uint8_t * prevstate,lws_ss_constate_t cs)25 lws_ss_check_next_state_sspc(lws_sspc_handle_t *ss, uint8_t *prevstate,
26 			     lws_ss_constate_t cs)
27 {
28 	if (cs >= LWSSSCS_USER_BASE || cs == LWSSSCS_EVENT_WAIT_CANCELLED)
29 		/*
30 		 * we can't judge user or transient states, leave the old state
31 		 * and just wave them through
32 		 */
33 		return 0;
34 
35 	if (cs >= LWS_ARRAY_SIZE(ss_state_txn_validity)) {
36 		/* we don't recognize this state as usable */
37 		lwsl_sspc_err(ss, "bad new state %u", cs);
38 		assert(0);
39 		return 1;
40 	}
41 
42 	if (*prevstate >= LWS_ARRAY_SIZE(ss_state_txn_validity)) {
43 		/* existing state is broken */
44 		lwsl_sspc_err(ss, "bad existing state %u",
45 				(unsigned int)*prevstate);
46 		assert(0);
47 		return 1;
48 	}
49 
50 	if (ss_state_txn_validity[*prevstate] & (1u << cs)) {
51 
52 		lwsl_sspc_notice(ss, "%s -> %s",
53 			       lws_ss_state_name((int)*prevstate),
54 			       lws_ss_state_name((int)cs));
55 
56 		/* this is explicitly allowed, update old state to new */
57 		*prevstate = (uint8_t)cs;
58 
59 		return 0;
60 	}
61 
62 	lwsl_sspc_err(ss, "transition from %s -> %s is illegal",
63 		    lws_ss_state_name((int)*prevstate),
64 		    lws_ss_state_name((int)cs));
65 
66 	assert(0);
67 
68 	return 1;
69 }
70 
71 lws_ss_state_return_t
lws_sspc_event_helper(lws_sspc_handle_t * h,lws_ss_constate_t cs,lws_ss_tx_ordinal_t flags)72 lws_sspc_event_helper(lws_sspc_handle_t *h, lws_ss_constate_t cs,
73 		      lws_ss_tx_ordinal_t flags)
74 {
75 	lws_ss_state_return_t ret;
76 
77 	if (!h)
78 		return LWSSSSRET_OK;
79 
80 	if (lws_ss_check_next_state_sspc(h, &h->prev_ss_state, cs))
81 		return LWSSSSRET_DESTROY_ME;
82 
83 	if (!h->ssi.state)
84 		return LWSSSSRET_OK;
85 
86 	h->h_in_svc = h;
87 	ret = h->ssi.state((void *)((uint8_t *)(h + 1)), NULL, cs, flags);
88 	h->h_in_svc = NULL;
89 
90 	return ret;
91 }
92 
93 static void
lws_sspc_sul_retry_cb(lws_sorted_usec_list_t * sul)94 lws_sspc_sul_retry_cb(lws_sorted_usec_list_t *sul)
95 {
96 	lws_sspc_handle_t *h = lws_container_of(sul, lws_sspc_handle_t, sul_retry);
97 	static struct lws_client_connect_info i;
98 
99 	/*
100 	 * We may have started up before the system proxy, so be prepared with
101 	 * a sul to retry at 1Hz
102 	 */
103 
104 	memset(&i, 0, sizeof i);
105 	i.context = h->context;
106 	if (h->context->ss_proxy_port) { /* tcp */
107 		i.address = h->context->ss_proxy_address;
108 		i.port = h->context->ss_proxy_port;
109 		i.iface = h->context->ss_proxy_bind;
110 	} else {
111 		if (h->context->ss_proxy_bind)
112 			i.address = h->context->ss_proxy_bind;
113 		else
114 #if defined(__linux__)
115 			i.address = "+@proxy.ss.lws";
116 #else
117 			i.address = "+/tmp/proxy.ss.lws";
118 #endif
119 	}
120 	i.host = i.address;
121 	i.origin = i.address;
122 	i.method = "RAW";
123 	i.protocol = lws_sspc_protocols[0].name;
124 	i.local_protocol_name = lws_sspc_protocols[0].name;
125 	i.path = "";
126 	i.pwsi = &h->cwsi;
127 	i.opaque_user_data = (void *)h;
128 	i.ssl_connection = LCCSCF_SECSTREAM_PROXY_LINK;
129 
130 	lws_metrics_caliper_bind(h->cal_txn, h->context->mt_ss_cliprox_conn);
131 #if defined(LWS_WITH_SYS_METRICS)
132 	lws_metrics_tag_add(&h->cal_txn.mtags_owner, "ss", h->ssi.streamtype);
133 #endif
134 
135 	/* this wsi is the link to the proxy */
136 
137 	if (!lws_client_connect_via_info(&i)) {
138 
139 #if defined(LWS_WITH_SYS_METRICS)
140 		/*
141 		 * If any hanging caliper measurement, dump it, and free any tags
142 		 */
143 		lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
144 #endif
145 
146 		lws_sul_schedule(h->context, 0, &h->sul_retry,
147 				 lws_sspc_sul_retry_cb, LWS_US_PER_SEC);
148 
149 		return;
150 	}
151 
152 	lwsl_sspc_notice(h, "%s", h->cwsi->lc.gutag);
153 }
154 
155 static int
lws_sspc_serialize_metadata(lws_sspc_handle_t * h,lws_sspc_metadata_t * md,uint8_t * p,uint8_t * end)156 lws_sspc_serialize_metadata(lws_sspc_handle_t *h, lws_sspc_metadata_t *md,
157 				uint8_t *p, uint8_t *end)
158 {
159 	int n, txc;
160 
161 	if (md->name[0] == '\0') {
162 
163 		lwsl_info("sending tx credit update %d\n",
164 				md->tx_cr_adjust);
165 
166 		p[0] = LWSSS_SER_TXPRE_TXCR_UPDATE;
167 		lws_ser_wu16be(&p[1], 4);
168 		lws_ser_wu32be(&p[3], (uint32_t)md->tx_cr_adjust);
169 
170 		n = 7;
171 
172 	} else {
173 
174 		lwsl_sspc_info(h, "sending metadata");
175 
176 		p[0] = LWSSS_SER_TXPRE_METADATA;
177 		txc = (int)strlen(md->name);
178 		n = txc + 1 + (int)md->len;
179 		if (n > 0xffff)
180 			/* we can't serialize this metadata in 16b length */
181 			return -1;
182 		if (n > lws_ptr_diff(end, &p[4]))
183 			/* we don't have space for this metadata */
184 			return -1;
185 		lws_ser_wu16be(&p[1], (uint16_t)n);
186 		p[3] = (uint8_t)txc;
187 		memcpy(&p[4], md->name, (unsigned int)txc);
188 		memcpy(&p[4 + txc], &md[1], md->len);
189 		n = 4 + txc + (int)md->len;
190 	}
191 
192 	lws_dll2_remove(&md->list);
193 	lws_free(md);
194 
195 	return n;
196 }
197 
198 static int
callback_sspc_client(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)199 callback_sspc_client(struct lws *wsi, enum lws_callback_reasons reason,
200 		     void *user, void *in, size_t len)
201 {
202 	lws_sspc_handle_t *h = (lws_sspc_handle_t *)lws_get_opaque_user_data(wsi);
203 	size_t pktsize = wsi->a.context->max_http_header_data;
204 	void *m = (void *)((uint8_t *)(h + 1));
205 	uint8_t *pkt = NULL, *p = NULL, *end = NULL;
206 	lws_ss_state_return_t r;
207 	uint64_t interval;
208 	const uint8_t *cp;
209 	uint8_t s[64];
210 	lws_usec_t us;
211 	int flags, n;
212 
213 	switch (reason) {
214 
215 	case LWS_CALLBACK_CONNECTING:
216 		/*
217 		 * In our particular case, we want CCEs even inside the
218 		 * initial connect loop time
219 		 */
220 		wsi->client_suppress_CONNECTION_ERROR = 0;
221 		break;
222 
223 	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
224 		lwsl_warn("%s: CCE: %s\n", __func__,
225 			  in ? (const char *)in : "null");
226 #if defined(LWS_WITH_SYS_METRICS)
227 		/*
228 		 * If any hanging caliper measurement, dump it, and free any tags
229 		 */
230 		lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
231 #endif
232 		lws_set_opaque_user_data(wsi, NULL);
233 		h->cwsi = NULL;
234 		lws_sul_schedule(h->context, 0, &h->sul_retry,
235 				 lws_sspc_sul_retry_cb, LWS_US_PER_SEC);
236 		if (h->ssi.state) {
237 			interval = (uint64_t)(lws_now_usecs() - h->us_start_upstream) /
238 								LWS_US_PER_MS;
239 			if (interval > 0xffffffffull)
240 				interval = 0xffffffffull;
241 			r = h->ssi.state(lws_sspc_to_user_object(h), NULL,
242 					  LWSSSCS_UPSTREAM_LINK_RETRY,
243 					  (uint32_t)interval);
244 			if (r == LWSSSSRET_DESTROY_ME)
245 				lws_sspc_destroy(&h);
246 		}
247 		break;
248 
249         case LWS_CALLBACK_RAW_CONNECTED:
250 		if (!h || lws_fi(&h->fic, "sspc_fail_on_linkup"))
251 			return -1;
252 		lwsl_sspc_info(h, "CONNECTED (%s)", h->ssi.streamtype);
253 
254 		h->state = LPCSCLI_SENDING_INITIAL_TX;
255 		/*
256 		 * We create the dsh at the response to the initial tx, which
257 		 * will let us know the policy's max size for it... let's
258 		 * protect the connection with a promise to complete the
259 		 * SS serialization streamtype negotation within a short period,
260 		 * we will cancel this timeout when we have the proxy's ack
261 		 * of the streamtype serialization, eg, it exists in the proxy
262 		 * policy etc
263 		 */
264 		lws_set_timeout(wsi, PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND, 3);
265 		lws_callback_on_writable(wsi);
266 		h->us_start_upstream = 0;
267                 break;
268 
269 	case LWS_CALLBACK_RAW_CLOSE:
270 		/*
271 		 * our ss proxy Unix Domain socket has closed...
272 		 */
273 		if (!h) {
274 			lwsl_info("%s: no sspc on client proxy link close", __func__);
275 			break;
276 		}
277 		lwsl_sspc_info(h, "LWS_CALLBACK_RAW_CLOSE: proxy conn down, wsi %s",
278 				lws_wsi_tag(wsi));
279 
280 		lws_dsh_destroy(&h->dsh);
281 		if (h->ss_dangling_connected && h->ssi.state) {
282 
283 			lwsl_sspc_notice(h, "setting _DISCONNECTED");
284 			h->ss_dangling_connected = 0;
285 			h->prev_ss_state = LWSSSCS_DISCONNECTED;
286 			r = h->ssi.state(ss_to_userobj(h), NULL,
287 						 LWSSSCS_DISCONNECTED, 0);
288 			if (r == LWSSSSRET_DESTROY_ME) {
289 				h->cwsi = NULL;
290 				lws_set_opaque_user_data(wsi, NULL);
291 				lws_sspc_destroy(&h);
292 				break;
293 			}
294 		}
295 
296 		h->cwsi = NULL;
297 		/*
298 		 * schedule a reconnect in 1s
299 		 */
300 		lws_sul_schedule(h->context, 0, &h->sul_retry,
301 				 lws_sspc_sul_retry_cb, LWS_US_PER_SEC);
302 
303 		break;
304 
305 	case LWS_CALLBACK_RAW_RX:
306 		/*
307 		 * ie, the proxy has sent us something
308 		 */
309 
310 		if (!h || !h->cwsi) {
311 			lwsl_info("%s: rx when client ss destroyed\n", __func__);
312 
313 			return -1;
314 		}
315 
316 		lwsl_sspc_info(h, "%s: RAW_RX: rx %d\n", __func__, (int)len);
317 
318 		if (!len) {
319 			lwsl_sspc_notice(h, "RAW_RX: zero len");
320 
321 			return -1;
322 		}
323 
324 		if (lws_fi(&h->fic, "sspc_fake_rxparse_disconnect_me"))
325 			n = LWSSSSRET_DISCONNECT_ME;
326 		else
327 			if (lws_fi(&h->fic, "sspc_fake_rxparse_destroy_me"))
328 				n = LWSSSSRET_DESTROY_ME;
329 			else
330 				n = lws_ss_deserialize_parse(&h->parser,
331 							     lws_get_context(wsi),
332 							     h->dsh, in, len,
333 							     &h->state, h,
334 							     (lws_ss_handle_t **)m,
335 							     &h->ssi, 1);
336 		switch (n) {
337 		case LWSSSSRET_OK:
338 			break;
339 		case LWSSSSRET_DISCONNECT_ME:
340 			lwsl_info("%s: proxlicent RX ended with DISCONNECT_ME\n",
341 					__func__);
342 			return -1;
343 		case LWSSSSRET_DESTROY_ME:
344 			lwsl_info("%s: proxlicent RX ended with DESTROY_ME\n",
345 					__func__);
346 			lws_set_opaque_user_data(wsi, NULL);
347 			lws_sspc_destroy(&h);
348 			return -1;
349 		}
350 
351 		if (h->state == LPCSCLI_LOCAL_CONNECTED ||
352 		    h->state == LPCSCLI_ONWARD_CONNECT)
353 			lws_set_timeout(wsi, 0, 0);
354 
355 		break;
356 
357 	case LWS_CALLBACK_RAW_WRITEABLE:
358 
359 		/*
360 		 * We can transmit something to the proxy...
361 		 */
362 
363 		if (!h)
364 			break;
365 
366 		lwsl_sspc_debug(h, "WRITEABLE %s, state %d",
367 				wsi->lc.gutag, h->state);
368 
369 		/*
370 		 * Management of ss timeout can happen any time and doesn't
371 		 * depend on wsi existence or state
372 		 */
373 
374 		n = 0;
375 		cp = s;
376 
377 		if (h->pending_timeout_update) {
378 			s[0] = LWSSS_SER_TXPRE_TIMEOUT_UPDATE;
379 			s[1] = 0;
380 			s[2] = 4;
381 			/*
382 			 *          0: use policy timeout value
383 			 * 0xffffffff: cancel the timeout
384 			 */
385 			lws_ser_wu32be(&s[3], h->timeout_ms);
386 			/* in case anything else to write */
387 			lws_callback_on_writable(h->cwsi);
388 			h->pending_timeout_update = 0;
389 			n = 7;
390 			goto do_write;
391 		}
392 
393 		s[1] = 0;
394 		/*
395 		 * This is the state of the link that connects us to the onward
396 		 * proxy
397 		 */
398 		switch (h->state) {
399 		case LPCSCLI_SENDING_INITIAL_TX:
400 			/*
401 			 * We are negotating the opening of a particular
402 			 * streamtype
403 			 */
404 			n = (int)strlen(h->ssi.streamtype) + 1 + 4 + 4;
405 
406 			s[0] = LWSSS_SER_TXPRE_STREAMTYPE;
407 			lws_ser_wu16be(&s[1], (uint16_t)n);
408 			/* SSSv1: add protocol version byte (initially 1) */
409 			s[3] = (uint8_t)LWS_SSS_CLIENT_PROTOCOL_VERSION;
410 			lws_ser_wu32be(&s[4], (uint32_t)getpid());
411 			lws_ser_wu32be(&s[8], (uint32_t)h->txc.peer_tx_cr_est);
412 			//h->txcr_out = txc;
413 			lws_strncpy((char *)&s[12], h->ssi.streamtype, sizeof(s) - 12);
414 			n += 3;
415 			h->state = LPCSCLI_WAITING_CREATE_RESULT;
416 
417 			break;
418 
419 		case LPCSCLI_LOCAL_CONNECTED:
420 
421 			// lwsl_notice("%s: LPCSCLI_LOCAL_CONNECTED\n", __func__);
422 
423 			/*
424 			 * Do we need to prioritize sending any metadata
425 			 * changes?
426 			 */
427 
428 			if (h->metadata_owner.count) {
429 				lws_sspc_metadata_t *md = lws_container_of(
430 					lws_dll2_get_tail(&h->metadata_owner),
431 					lws_sspc_metadata_t, list);
432 
433 				pkt = lws_malloc(pktsize + LWS_PRE, __func__);
434 				if (!pkt)
435 					goto hangup;
436 				cp = p = pkt + LWS_PRE;
437 				end = p + pktsize;
438 
439 				n = lws_sspc_serialize_metadata(h, md, p, end);
440 				if (n < 0)
441 					goto metadata_hangup;
442 
443 				lwsl_sspc_debug(h, "(local_conn) metadata");
444 
445 				goto req_write_and_issue;
446 			}
447 
448 			if (h->pending_writeable_len) {
449 				lwsl_sspc_debug(h, "(local_conn) PAYLOAD_LENGTH_HINT %u",
450 					   (unsigned int)h->writeable_len);
451 				s[0] = LWSSS_SER_TXPRE_PAYLOAD_LENGTH_HINT;
452 				lws_ser_wu16be(&s[1], 4);
453 				lws_ser_wu32be(&s[3], (uint32_t)h->writeable_len);
454 				h->pending_writeable_len = 0;
455 				n = 7;
456 				goto req_write_and_issue;
457 			}
458 
459 			if (h->conn_req_state >= LWSSSPC_ONW_ONGOING) {
460 				lwsl_sspc_info(h, "conn_req_state %d",
461 						h->conn_req_state);
462 				break;
463 			}
464 
465 			lwsl_sspc_info(h, "(local_conn) onward connect");
466 
467 			h->conn_req_state = LWSSSPC_ONW_ONGOING;
468 
469 			s[0] = LWSSS_SER_TXPRE_ONWARD_CONNECT;
470 			s[1] = 0;
471 			s[2] = 0;
472 			n = 3;
473 			break;
474 
475 		case LPCSCLI_OPERATIONAL:
476 
477 			/*
478 			 *
479 			 * - Do we need to prioritize sending any metadata
480 			 *   changes?  (includes txcr updates)
481 			 *
482 			 * - Do we need to forward a hint about the payload
483 			 *   length?
484 			 */
485 
486 			pkt = lws_malloc(pktsize + LWS_PRE, __func__);
487 			if (!pkt)
488 				goto hangup;
489 			cp = p = pkt + LWS_PRE;
490 			end = p + pktsize;
491 
492 			if (h->metadata_owner.count) {
493 				lws_sspc_metadata_t *md = lws_container_of(
494 					lws_dll2_get_tail(&h->metadata_owner),
495 					lws_sspc_metadata_t, list);
496 
497 				n = lws_sspc_serialize_metadata(h, md, p, end);
498 				if (n < 0)
499 					goto metadata_hangup;
500 
501 				goto req_write_and_issue;
502 			}
503 
504 			if (h->pending_writeable_len) {
505 				lwsl_sspc_info(h, "PAYLOAD_LENGTH_HINT %u",
506 					  (unsigned int)h->writeable_len);
507 				s[0] = LWSSS_SER_TXPRE_PAYLOAD_LENGTH_HINT;
508 				lws_ser_wu16be(&s[1], 4);
509 				lws_ser_wu32be(&s[3], (uint32_t)h->writeable_len);
510 				h->pending_writeable_len = 0;
511 				n = 7;
512 				goto req_write_and_issue;
513 			}
514 
515 			/* we can't write anything if we don't have credit */
516 			if (!h->ignore_txc && h->txc.tx_cr <= 0) {
517 				lwsl_sspc_info(h, "WRITEABLE / OPERATIONAL:"
518 					    " lack credit (%d)",
519 					    h->txc.tx_cr);
520 				// break;
521 			}
522 
523 			len = pktsize - LWS_PRE - 19;
524 			flags = 0;
525 			if (!h->ssi.tx) {
526 				n = 0;
527 				goto do_write_nz;
528 			}
529 
530 			n = h->ssi.tx(m, h->ord++, pkt + LWS_PRE + 19, &len,
531 				      &flags);
532 			switch (n) {
533 			case LWSSSSRET_TX_DONT_SEND:
534 				n = 0;
535 				goto do_write_nz;
536 
537 			case LWSSSSRET_DISCONNECT_ME:
538 			case LWSSSSRET_DESTROY_ME:
539 				lwsl_notice("%s: sspc tx DISCONNECT/DESTROY unimplemented\n", __func__);
540 				break;
541 			default:
542 				break;
543 			}
544 
545 			h->txc.tx_cr = h->txc.tx_cr - (int)len;
546 
547 			cp = p;
548 			n = (int)(len + 19);
549 			us = lws_now_usecs();
550 			p[0] = LWSSS_SER_TXPRE_TX_PAYLOAD;
551 			lws_ser_wu16be(&p[1], (uint16_t)(len + 19 - 3));
552 			lws_ser_wu32be(&p[3], (uint32_t)flags);
553 			/* time spent here waiting to send this */
554 			lws_ser_wu32be(&p[7], (uint32_t)(us - h->us_earliest_write_req));
555 			/* ust that the client write happened */
556 			lws_ser_wu64be(&p[11], (uint64_t)us);
557 			h->us_earliest_write_req = 0;
558 
559 			if (flags & LWSSS_FLAG_EOM)
560 				if (h->rsidx + 1 < (int)LWS_ARRAY_SIZE(h->rideshare_ofs) &&
561 				    h->rideshare_ofs[h->rsidx + 1])
562 					h->rsidx++;
563 
564 			break;
565 		default:
566 			break;
567 		}
568 
569 do_write_nz:
570 
571 		if (!n)
572 			break;
573 
574 do_write:
575 		if (lws_fi(&h->fic, "sspc_link_write_fail"))
576 			n = -1;
577 		else
578 			n = lws_write(wsi, (uint8_t *)cp, (unsigned int)n, LWS_WRITE_RAW);
579 		if (n < 0) {
580 			lwsl_sspc_notice(h, "WRITEABLE: %d", n);
581 
582 			goto hangup;
583 		}
584 		break;
585 
586 	default:
587 		break;
588 	}
589 
590 	lws_free(pkt);
591 
592 	return lws_callback_http_dummy(wsi, reason, user, in, len);
593 
594 metadata_hangup:
595 	lwsl_sspc_err(h, "metadata too large");
596 
597 hangup:
598 	lws_free(pkt);
599 	lwsl_warn("hangup\n");
600 	/* hang up on him */
601 	return -1;
602 
603 req_write_and_issue:
604 	/* in case anything else to write */
605 	lws_callback_on_writable(h->cwsi);
606 	goto do_write_nz;
607 }
608 
609 const struct lws_protocols lws_sspc_protocols[] = {
610 	{
611 		"ssproxy-protocol",
612 		callback_sspc_client,
613 		0,
614 		2048, 2048, NULL, 0
615 	},
616 	{ NULL, NULL, 0, 0, 0, NULL, 0 }
617 };
618 
619 int
lws_sspc_create(struct lws_context * context,int tsi,const lws_ss_info_t * ssi,void * opaque_user_data,lws_sspc_handle_t ** ppss,struct lws_sequencer * seq_owner,const char ** ppayload_fmt)620 lws_sspc_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
621 	        void *opaque_user_data, lws_sspc_handle_t **ppss,
622 	        struct lws_sequencer *seq_owner, const char **ppayload_fmt)
623 {
624 	lws_sspc_handle_t *h;
625 	uint8_t *ua;
626 	char *p;
627 
628 	lws_service_assert_loop_thread(context, tsi);
629 
630 	/* allocate the handle (including ssi), the user alloc,
631 	 * and the streamname */
632 
633 	h = malloc(sizeof(lws_sspc_handle_t) + ssi->user_alloc +
634 				strlen(ssi->streamtype) + 1);
635 	if (!h)
636 		return 1;
637 	memset(h, 0, sizeof(*h));
638 
639 	h->lc.log_cx = context->log_cx;
640 
641 #if defined(LWS_WITH_SYS_FAULT_INJECTION)
642 	h->fic.name = "sspc";
643 	lws_xos_init(&h->fic.xos, lws_xos(&context->fic.xos));
644 	if (ssi->fic.fi_owner.count)
645 		lws_fi_import(&h->fic, &ssi->fic);
646 
647 	lws_fi_inherit_copy(&h->fic, &context->fic, "ss", ssi->streamtype);
648 #endif
649 
650 	if (lws_fi(&h->fic, "sspc_create_oom")) {
651 		/*
652 		 * We have to do this a litte later, so we can cleanly inherit
653 		 * the OOM pieces and drain the info fic
654 		 */
655 		lws_fi_destroy(&h->fic);
656 		free(h);
657 		return 1;
658 	}
659 
660 	__lws_lc_tag(context, &context->lcg[LWSLCG_SSP_CLIENT], &h->lc,
661 			ssi->streamtype);
662 
663 	memcpy(&h->ssi, ssi, sizeof(*ssi));
664 	ua = (uint8_t *)(h + 1);
665 	memset(ua, 0, ssi->user_alloc);
666 	p = (char *)ua + ssi->user_alloc;
667 	memcpy(p, ssi->streamtype, strlen(ssi->streamtype) + 1);
668 	h->ssi.streamtype = (const char *)p;
669 	h->context = context;
670 	h->us_start_upstream = lws_now_usecs();
671 
672 	if (!ssi->manual_initial_tx_credit)
673 		h->txc.peer_tx_cr_est = 500000000;
674 	else
675 		h->txc.peer_tx_cr_est = ssi->manual_initial_tx_credit;
676 
677 	if (!strcmp(ssi->streamtype, LWS_SMD_STREAMTYPENAME))
678 		h->ignore_txc = 1;
679 
680 	lws_dll2_add_head(&h->client_list, &context->pt[tsi].ss_client_owner);
681 
682 	/* fill in the things the real api does for the caller */
683 
684 	*((void **)(ua + ssi->opaque_user_data_offset)) = opaque_user_data;
685 	*((void **)(ua + ssi->handle_offset)) = h;
686 
687 	if (ppss)
688 		*ppss = h;
689 
690 	/* try the actual connect */
691 
692 	lws_sspc_sul_retry_cb(&h->sul_retry);
693 
694 	return 0;
695 }
696 
697 /* used on context destroy when iterating listed lws_ss on a pt */
698 
699 int
lws_sspc_destroy_dll(struct lws_dll2 * d,void * user)700 lws_sspc_destroy_dll(struct lws_dll2 *d, void *user)
701 {
702 	lws_sspc_handle_t *h = lws_container_of(d, lws_sspc_handle_t, client_list);
703 
704 	lws_sspc_destroy(&h);
705 
706 	return 0;
707 }
708 
709 void
lws_sspc_rxmetadata_destroy(lws_sspc_handle_t * h)710 lws_sspc_rxmetadata_destroy(lws_sspc_handle_t *h)
711 {
712 	lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
713 			lws_dll2_get_head(&h->metadata_owner_rx)) {
714 		lws_sspc_metadata_t *md =
715 				lws_container_of(d, lws_sspc_metadata_t, list);
716 
717 		lws_dll2_remove(&md->list);
718 		lws_free(md);
719 
720 	} lws_end_foreach_dll_safe(d, d1);
721 }
722 
723 void
lws_sspc_destroy(lws_sspc_handle_t ** ph)724 lws_sspc_destroy(lws_sspc_handle_t **ph)
725 {
726 	lws_sspc_handle_t *h;
727 
728 	if (!*ph)
729 		return;
730 
731 	h = *ph;
732 	if (h == h->h_in_svc) {
733 		lwsl_err("%s: illegal destroy, return LWSSSSRET_DESTROY_ME instead\n",
734 				__func__);
735 		assert(0);
736 		return;
737 	}
738 
739 	lws_service_assert_loop_thread(h->context, 0);
740 
741 	if (h->destroying)
742 		return;
743 
744 	h->destroying = 1;
745 
746 	/* if this caliper is still dangling at destroy, we failed */
747 #if defined(LWS_WITH_SYS_METRICS)
748 	/*
749 	 * If any hanging caliper measurement, dump it, and free any tags
750 	 */
751 	lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
752 #endif
753 	if (h->ss_dangling_connected && h->ssi.state) {
754 		lws_sspc_event_helper(h, LWSSSCS_DISCONNECTED, 0);
755 		h->ss_dangling_connected = 0;
756 	}
757 
758 #if defined(LWS_WITH_SYS_FAULT_INJECTION)
759 	lws_fi_destroy(&h->fic);
760 #endif
761 
762 	lws_sul_cancel(&h->sul_retry);
763 	lws_dll2_remove(&h->client_list);
764 
765 	if (h->dsh)
766 		lws_dsh_destroy(&h->dsh);
767 	if (h->cwsi) {
768 		lws_set_opaque_user_data(h->cwsi, NULL);
769 		lws_wsi_close(h->cwsi, LWS_TO_KILL_ASYNC);
770 		h->cwsi = NULL;
771 	}
772 
773 	/* clean out any pending metadata changes that didn't make it */
774 
775 	lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
776 			lws_dll2_get_head(&(*ph)->metadata_owner)) {
777 		lws_sspc_metadata_t *md =
778 				lws_container_of(d, lws_sspc_metadata_t, list);
779 
780 		lws_dll2_remove(&md->list);
781 		lws_free(md);
782 
783 	} lws_end_foreach_dll_safe(d, d1);
784 
785 	lws_sspc_rxmetadata_destroy(h);
786 
787 	lws_sspc_event_helper(h, LWSSSCS_DESTROYING, 0);
788 	*ph = NULL;
789 
790 	lws_sul_cancel(&h->sul_retry);
791 
792 
793 	/* confirm no sul left scheduled in handle or user allocation object */
794 	lws_sul_debug_zombies(h->context, h, sizeof(*h) + h->ssi.user_alloc,
795 			      __func__);
796 
797 	__lws_lc_untag(h->context, &h->lc);
798 
799 	free(h);
800 }
801 
802 lws_ss_state_return_t
lws_sspc_request_tx(lws_sspc_handle_t * h)803 lws_sspc_request_tx(lws_sspc_handle_t *h)
804 {
805 	if (!h || !h->cwsi)
806 		return LWSSSSRET_OK;
807 
808 	lws_service_assert_loop_thread(h->context, 0);
809 
810 	if (!h->us_earliest_write_req)
811 		h->us_earliest_write_req = lws_now_usecs();
812 
813 	if (h->state == LPCSCLI_LOCAL_CONNECTED &&
814 	    h->conn_req_state == LWSSSPC_ONW_NONE)
815 		h->conn_req_state = LWSSSPC_ONW_REQ;
816 
817 	lws_callback_on_writable(h->cwsi);
818 
819 	return LWSSSSRET_OK;
820 }
821 
822 /*
823  * Currently we fulfil the writeable part locally by just enabling POLLOUT on
824  * the UDS link, without serialization footprint, which is reasonable as far as
825  * it goes.
826  *
827  * But for the ..._len() variant, the expected payload length hint we are being
828  * told is something that must be serialized to the onward peer, since either
829  * that guy or someone upstream of him is the guy who will compose the framing
830  * with it that actually goes out.
831  *
832  * This information is needed at the upstream guy before we have sent any
833  * payload, eg, for http POST, he has to prepare the content-length in the
834  * headers, before any payload.  So we have to issue a serialization of the
835  * length at this point.
836  */
837 
838 lws_ss_state_return_t
lws_sspc_request_tx_len(lws_sspc_handle_t * h,unsigned long len)839 lws_sspc_request_tx_len(lws_sspc_handle_t *h, unsigned long len)
840 {
841 	/*
842 	 * for client conns, they cannot even complete creation of the handle
843 	 * without the onwared connection to the proxy, it's not legal to start
844 	 * using it until it's operation and has the onward connection (and the
845 	 * link has called CREATED state)
846 	 */
847 
848 	if (!h)
849 		return LWSSSSRET_OK;
850 
851 	lws_service_assert_loop_thread(h->context, 0);
852 
853 	lwsl_sspc_notice(h, "setting writeable_len %u", (unsigned int)len);
854 	h->writeable_len = len;
855 	h->pending_writeable_len = 1;
856 
857 	if (!h->us_earliest_write_req)
858 		h->us_earliest_write_req = lws_now_usecs();
859 
860 	if (h->state == LPCSCLI_LOCAL_CONNECTED &&
861 	    h->conn_req_state == LWSSSPC_ONW_NONE)
862 		h->conn_req_state = LWSSSPC_ONW_REQ;
863 
864 	/*
865 	 * We're going to use this up with serializing h->writeable_len... that
866 	 * will request again.
867 	 */
868 
869 	if (h->cwsi)
870 		lws_callback_on_writable(h->cwsi);
871 
872 	return LWSSSSRET_OK;
873 }
874 
875 int
lws_sspc_client_connect(lws_sspc_handle_t * h)876 lws_sspc_client_connect(lws_sspc_handle_t *h)
877 {
878 	if (!h || h->state == LPCSCLI_OPERATIONAL)
879 		return 0;
880 
881 	lws_service_assert_loop_thread(h->context, 0);
882 
883 	assert(h->state == LPCSCLI_LOCAL_CONNECTED);
884 	if (h->state == LPCSCLI_LOCAL_CONNECTED &&
885 	    h->conn_req_state == LWSSSPC_ONW_NONE)
886 		h->conn_req_state = LWSSSPC_ONW_REQ;
887 	if (h->cwsi)
888 		lws_callback_on_writable(h->cwsi);
889 
890 	return 0;
891 }
892 
893 struct lws_context *
lws_sspc_get_context(struct lws_sspc_handle * h)894 lws_sspc_get_context(struct lws_sspc_handle *h)
895 {
896 	return h->context;
897 }
898 
899 const char *
lws_sspc_rideshare(struct lws_sspc_handle * h)900 lws_sspc_rideshare(struct lws_sspc_handle *h)
901 {
902 	/*
903 	 * ...the serialized RX rideshare name if any...
904 	 */
905 
906 	if (h->parser.rideshare[0]) {
907 		lwsl_sspc_info(h, "parser %s", h->parser.rideshare);
908 
909 		return h->parser.rideshare;
910 	}
911 
912 	/*
913 	 * The tx rideshare index
914 	 */
915 
916 	if (h->rideshare_list[0]) {
917 		lwsl_sspc_info(h, "tx list %s",
918 			  &h->rideshare_list[h->rideshare_ofs[h->rsidx]]);
919 		return &h->rideshare_list[h->rideshare_ofs[h->rsidx]];
920 	}
921 
922 	/*
923 	 * ... otherwise default to our stream type name
924 	 */
925 
926 	lwsl_sspc_info(h, "def %s\n", h->ssi.streamtype);
927 
928 	return h->ssi.streamtype;
929 }
930 
931 static int
_lws_sspc_set_metadata(struct lws_sspc_handle * h,const char * name,const void * value,size_t len,int tx_cr_adjust)932 _lws_sspc_set_metadata(struct lws_sspc_handle *h, const char *name,
933 		       const void *value, size_t len, int tx_cr_adjust)
934 {
935 	lws_sspc_metadata_t *md;
936 
937 	lws_service_assert_loop_thread(h->context, 0);
938 
939 	/*
940 	 * Are we replacing a pending metadata of the same name?  It's not
941 	 * efficient to do this but user code can do what it likes... let's
942 	 * optimize away the old one.
943 	 *
944 	 * Tx credit adjust always has name ""
945 	 */
946 
947 	lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
948 				   lws_dll2_get_head(&h->metadata_owner)) {
949 		md = lws_container_of(d, lws_sspc_metadata_t, list);
950 
951 		if (!strcmp(name, md->name)) {
952 			lws_dll2_remove(&md->list);
953 			lws_free(md);
954 			break;
955 		}
956 
957 	} lws_end_foreach_dll_safe(d, d1);
958 
959 	/*
960 	 * We have to stash the metadata and pass it to the proxy
961 	 */
962 
963 	if (lws_fi(&h->fic, "sspc_fail_metadata_set"))
964 		md = NULL;
965 	else
966 		md = lws_malloc(sizeof(*md) + len, "set metadata");
967 	if (!md) {
968 		lwsl_sspc_err(h, "OOM");
969 
970 		return 1;
971 	}
972 
973 	memset(md, 0, sizeof(*md));
974 
975 	md->tx_cr_adjust = tx_cr_adjust;
976 	h->txc.peer_tx_cr_est += tx_cr_adjust;
977 
978 	lws_strncpy(md->name, name, sizeof(md->name));
979 	md->len = len;
980 	if (len)
981 		memcpy(&md[1], value, len);
982 
983 	lws_dll2_add_tail(&md->list, &h->metadata_owner);
984 
985 	if (len) {
986 		lwsl_sspc_info(h, "set metadata %s", name);
987 		lwsl_hexdump_sspc_info(h, value, len);
988 	} else
989 		lwsl_sspc_info(h, "serializing tx cr adj %d",
990 			    (int)tx_cr_adjust);
991 
992 	if (h->cwsi)
993 		lws_callback_on_writable(h->cwsi);
994 
995 	return 0;
996 }
997 
/*
 * Public entry point for queueing a named metadata value for the proxy; it
 * carries no tx credit adjustment.
 *
 * Returns the result of _lws_sspc_set_metadata(): 0 on success, 1 on failure.
 */
int
lws_sspc_set_metadata(struct lws_sspc_handle *h, const char *name,
		      const void *value, size_t len)
{
	int ret = _lws_sspc_set_metadata(h, name, value, len, 0);

	return ret;
}
1004 
1005 int
lws_sspc_get_metadata(struct lws_sspc_handle * h,const char * name,const void ** value,size_t * len)1006 lws_sspc_get_metadata(struct lws_sspc_handle *h, const char *name,
1007 		      const void **value, size_t *len)
1008 {
1009 	lws_sspc_metadata_t *md;
1010 
1011 	/*
1012 	 * client side does not have access to policy
1013 	 * and any metadata are new to it each time,
1014 	 * we allocate them, removing any existing with
1015 	 * the same name first
1016 	 */
1017 
1018 	lws_service_assert_loop_thread(h->context, 0);
1019 
1020 	lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
1021 			lws_dll2_get_head(&h->metadata_owner_rx)) {
1022 		md = lws_container_of(d,
1023 			   lws_sspc_metadata_t, list);
1024 
1025 		if (!strcmp(md->name, name)) {
1026 			*len = md->len;
1027 			*value = &md[1];
1028 
1029 			return 0;
1030 		}
1031 
1032 	} lws_end_foreach_dll_safe(d, d1);
1033 
1034 	return 1;
1035 }
1036 
1037 int
lws_sspc_add_peer_tx_credit(struct lws_sspc_handle * h,int32_t bump)1038 lws_sspc_add_peer_tx_credit(struct lws_sspc_handle *h, int32_t bump)
1039 {
1040 	lws_service_assert_loop_thread(h->context, 0);
1041 	lwsl_sspc_notice(h, "%d\n", bump);
1042 	return _lws_sspc_set_metadata(h, "", NULL, 0, (int)bump);
1043 }
1044 
1045 int
lws_sspc_get_est_peer_tx_credit(struct lws_sspc_handle * h)1046 lws_sspc_get_est_peer_tx_credit(struct lws_sspc_handle *h)
1047 {
1048 	lws_service_assert_loop_thread(h->context, 0);
1049 	return h->txc.peer_tx_cr_est;
1050 }
1051 
1052 void
lws_sspc_start_timeout(struct lws_sspc_handle * h,unsigned int timeout_ms)1053 lws_sspc_start_timeout(struct lws_sspc_handle *h, unsigned int timeout_ms)
1054 {
1055 	lws_service_assert_loop_thread(h->context, 0);
1056 	if (!h->cwsi)
1057 		/* we can't fulfil it */
1058 		return;
1059 	h->timeout_ms = (uint32_t)timeout_ms;
1060 	h->pending_timeout_update = 1;
1061 	lws_callback_on_writable(h->cwsi);
1062 }
1063 
/*
 * Cancel the stream timeout: this is a timeout update carrying the
 * all-bits-set value.
 */
void
lws_sspc_cancel_timeout(struct lws_sspc_handle *h)
{
	const unsigned int cancel = ~0u;

	lws_sspc_start_timeout(h, cancel);
}
1069 
1070 void *
lws_sspc_to_user_object(struct lws_sspc_handle * h)1071 lws_sspc_to_user_object(struct lws_sspc_handle *h)
1072 {
1073 	return (void *)(h + 1);
1074 }
1075 
1076 void
lws_sspc_change_handlers(struct lws_sspc_handle * h,lws_ss_state_return_t (* rx)(void * userobj,const uint8_t * buf,size_t len,int flags),lws_ss_state_return_t (* tx)(void * userobj,lws_ss_tx_ordinal_t ord,uint8_t * buf,size_t * len,int * flags),lws_ss_state_return_t (* state)(void * userobj,void * h_src,lws_ss_constate_t state,lws_ss_tx_ordinal_t ack))1077 lws_sspc_change_handlers(struct lws_sspc_handle *h,
1078 	lws_ss_state_return_t (*rx)(void *userobj, const uint8_t *buf, size_t len, int flags),
1079 	lws_ss_state_return_t (*tx)(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
1080 		  size_t *len, int *flags),
1081 	lws_ss_state_return_t (*state)(void *userobj, void *h_src /* ss handle type */,
1082 		     lws_ss_constate_t state, lws_ss_tx_ordinal_t ack))
1083 {
1084 	if (rx)
1085 		h->ssi.rx = rx;
1086 	if (tx)
1087 		h->ssi.tx = tx;
1088 	if (state)
1089 		h->ssi.state = state;
1090 }
1091 
1092 const char *
lws_sspc_tag(struct lws_sspc_handle * h)1093 lws_sspc_tag(struct lws_sspc_handle *h)
1094 {
1095 	if (!h)
1096 		return "[null sspc]";
1097 	return lws_lc_tag(&h->lc);
1098 }
1099 
1100 int
lws_sspc_cancel_notify_dll(struct lws_dll2 * d,void * user)1101 lws_sspc_cancel_notify_dll(struct lws_dll2 *d, void *user)
1102 {
1103 	lws_sspc_handle_t *h = lws_container_of(d, lws_sspc_handle_t, client_list);
1104 
1105 	lws_sspc_event_helper(h, LWSSSCS_EVENT_WAIT_CANCELLED, 0);
1106 
1107 	return 0;
1108 }
1109 
1110