1 /*
2 * lws-minimal-secure-streams-client
3 *
4 * Written in 2010-2020 by Andy Green <andy@warmcat.com>
5 *
6 * This file is made available under the Creative Commons CC0 1.0
7 * Universal Public Domain Dedication.
8 *
9 *
10 * This client does not perform any INET networking... instead it opens a unix
11 * domain socket on a proxy that is listening for it, and that creates the
12 * actual secure stream connection.
13 *
14 * We are able to use the usual secure streams api in the client process, with
15 * payloads and connection state information proxied over the unix domain
16 * socket and fulfilled in the proxy process.
17 *
18 * The public client helper pieces are built as part of lws
19 */
20 #include <private-lib-core.h>
21
22 extern const uint32_t ss_state_txn_validity[17];
23
24 int
lws_ss_check_next_state_sspc(lws_sspc_handle_t * ss,uint8_t * prevstate,lws_ss_constate_t cs)25 lws_ss_check_next_state_sspc(lws_sspc_handle_t *ss, uint8_t *prevstate,
26 lws_ss_constate_t cs)
27 {
28 if (cs >= LWSSSCS_USER_BASE || cs == LWSSSCS_EVENT_WAIT_CANCELLED)
29 /*
30 * we can't judge user or transient states, leave the old state
31 * and just wave them through
32 */
33 return 0;
34
35 if (cs >= LWS_ARRAY_SIZE(ss_state_txn_validity)) {
36 /* we don't recognize this state as usable */
37 lwsl_sspc_err(ss, "bad new state %u", cs);
38 assert(0);
39 return 1;
40 }
41
42 if (*prevstate >= LWS_ARRAY_SIZE(ss_state_txn_validity)) {
43 /* existing state is broken */
44 lwsl_sspc_err(ss, "bad existing state %u",
45 (unsigned int)*prevstate);
46 assert(0);
47 return 1;
48 }
49
50 if (ss_state_txn_validity[*prevstate] & (1u << cs)) {
51
52 lwsl_sspc_notice(ss, "%s -> %s",
53 lws_ss_state_name((int)*prevstate),
54 lws_ss_state_name((int)cs));
55
56 /* this is explicitly allowed, update old state to new */
57 *prevstate = (uint8_t)cs;
58
59 return 0;
60 }
61
62 lwsl_sspc_err(ss, "transition from %s -> %s is illegal",
63 lws_ss_state_name((int)*prevstate),
64 lws_ss_state_name((int)cs));
65
66 assert(0);
67
68 return 1;
69 }
70
71 lws_ss_state_return_t
lws_sspc_event_helper(lws_sspc_handle_t * h,lws_ss_constate_t cs,lws_ss_tx_ordinal_t flags)72 lws_sspc_event_helper(lws_sspc_handle_t *h, lws_ss_constate_t cs,
73 lws_ss_tx_ordinal_t flags)
74 {
75 lws_ss_state_return_t ret;
76
77 if (!h)
78 return LWSSSSRET_OK;
79
80 if (lws_ss_check_next_state_sspc(h, &h->prev_ss_state, cs))
81 return LWSSSSRET_DESTROY_ME;
82
83 if (!h->ssi.state)
84 return LWSSSSRET_OK;
85
86 h->h_in_svc = h;
87 ret = h->ssi.state((void *)((uint8_t *)(h + 1)), NULL, cs, flags);
88 h->h_in_svc = NULL;
89
90 return ret;
91 }
92
93 static void
lws_sspc_sul_retry_cb(lws_sorted_usec_list_t * sul)94 lws_sspc_sul_retry_cb(lws_sorted_usec_list_t *sul)
95 {
96 lws_sspc_handle_t *h = lws_container_of(sul, lws_sspc_handle_t, sul_retry);
97 static struct lws_client_connect_info i;
98
99 /*
100 * We may have started up before the system proxy, so be prepared with
101 * a sul to retry at 1Hz
102 */
103
104 memset(&i, 0, sizeof i);
105 i.context = h->context;
106 if (h->context->ss_proxy_port) { /* tcp */
107 i.address = h->context->ss_proxy_address;
108 i.port = h->context->ss_proxy_port;
109 i.iface = h->context->ss_proxy_bind;
110 } else {
111 if (h->context->ss_proxy_bind)
112 i.address = h->context->ss_proxy_bind;
113 else
114 #if defined(__linux__)
115 i.address = "+@proxy.ss.lws";
116 #else
117 i.address = "+/tmp/proxy.ss.lws";
118 #endif
119 }
120 i.host = i.address;
121 i.origin = i.address;
122 i.method = "RAW";
123 i.protocol = lws_sspc_protocols[0].name;
124 i.local_protocol_name = lws_sspc_protocols[0].name;
125 i.path = "";
126 i.pwsi = &h->cwsi;
127 i.opaque_user_data = (void *)h;
128 i.ssl_connection = LCCSCF_SECSTREAM_PROXY_LINK;
129
130 lws_metrics_caliper_bind(h->cal_txn, h->context->mt_ss_cliprox_conn);
131 #if defined(LWS_WITH_SYS_METRICS)
132 lws_metrics_tag_add(&h->cal_txn.mtags_owner, "ss", h->ssi.streamtype);
133 #endif
134
135 /* this wsi is the link to the proxy */
136
137 if (!lws_client_connect_via_info(&i)) {
138
139 #if defined(LWS_WITH_SYS_METRICS)
140 /*
141 * If any hanging caliper measurement, dump it, and free any tags
142 */
143 lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
144 #endif
145
146 lws_sul_schedule(h->context, 0, &h->sul_retry,
147 lws_sspc_sul_retry_cb, LWS_US_PER_SEC);
148
149 return;
150 }
151
152 lwsl_sspc_notice(h, "%s", h->cwsi->lc.gutag);
153 }
154
155 static int
lws_sspc_serialize_metadata(lws_sspc_handle_t * h,lws_sspc_metadata_t * md,uint8_t * p,uint8_t * end)156 lws_sspc_serialize_metadata(lws_sspc_handle_t *h, lws_sspc_metadata_t *md,
157 uint8_t *p, uint8_t *end)
158 {
159 int n, txc;
160
161 if (md->name[0] == '\0') {
162
163 lwsl_info("sending tx credit update %d\n",
164 md->tx_cr_adjust);
165
166 p[0] = LWSSS_SER_TXPRE_TXCR_UPDATE;
167 lws_ser_wu16be(&p[1], 4);
168 lws_ser_wu32be(&p[3], (uint32_t)md->tx_cr_adjust);
169
170 n = 7;
171
172 } else {
173
174 lwsl_sspc_info(h, "sending metadata");
175
176 p[0] = LWSSS_SER_TXPRE_METADATA;
177 txc = (int)strlen(md->name);
178 n = txc + 1 + (int)md->len;
179 if (n > 0xffff)
180 /* we can't serialize this metadata in 16b length */
181 return -1;
182 if (n > lws_ptr_diff(end, &p[4]))
183 /* we don't have space for this metadata */
184 return -1;
185 lws_ser_wu16be(&p[1], (uint16_t)n);
186 p[3] = (uint8_t)txc;
187 memcpy(&p[4], md->name, (unsigned int)txc);
188 memcpy(&p[4 + txc], &md[1], md->len);
189 n = 4 + txc + (int)md->len;
190 }
191
192 lws_dll2_remove(&md->list);
193 lws_free(md);
194
195 return n;
196 }
197
198 static int
callback_sspc_client(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)199 callback_sspc_client(struct lws *wsi, enum lws_callback_reasons reason,
200 void *user, void *in, size_t len)
201 {
202 lws_sspc_handle_t *h = (lws_sspc_handle_t *)lws_get_opaque_user_data(wsi);
203 size_t pktsize = wsi->a.context->max_http_header_data;
204 void *m = (void *)((uint8_t *)(h + 1));
205 uint8_t *pkt = NULL, *p = NULL, *end = NULL;
206 lws_ss_state_return_t r;
207 uint64_t interval;
208 const uint8_t *cp;
209 uint8_t s[64];
210 lws_usec_t us;
211 int flags, n;
212
213 switch (reason) {
214
215 case LWS_CALLBACK_CONNECTING:
216 /*
217 * In our particular case, we want CCEs even inside the
218 * initial connect loop time
219 */
220 wsi->client_suppress_CONNECTION_ERROR = 0;
221 break;
222
223 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
224 lwsl_warn("%s: CCE: %s\n", __func__,
225 in ? (const char *)in : "null");
226 #if defined(LWS_WITH_SYS_METRICS)
227 /*
228 * If any hanging caliper measurement, dump it, and free any tags
229 */
230 lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
231 #endif
232 lws_set_opaque_user_data(wsi, NULL);
233 h->cwsi = NULL;
234 lws_sul_schedule(h->context, 0, &h->sul_retry,
235 lws_sspc_sul_retry_cb, LWS_US_PER_SEC);
236 if (h->ssi.state) {
237 interval = (uint64_t)(lws_now_usecs() - h->us_start_upstream) /
238 LWS_US_PER_MS;
239 if (interval > 0xffffffffull)
240 interval = 0xffffffffull;
241 r = h->ssi.state(lws_sspc_to_user_object(h), NULL,
242 LWSSSCS_UPSTREAM_LINK_RETRY,
243 (uint32_t)interval);
244 if (r == LWSSSSRET_DESTROY_ME)
245 lws_sspc_destroy(&h);
246 }
247 break;
248
249 case LWS_CALLBACK_RAW_CONNECTED:
250 if (!h || lws_fi(&h->fic, "sspc_fail_on_linkup"))
251 return -1;
252 lwsl_sspc_info(h, "CONNECTED (%s)", h->ssi.streamtype);
253
254 h->state = LPCSCLI_SENDING_INITIAL_TX;
255 /*
256 * We create the dsh at the response to the initial tx, which
257 * will let us know the policy's max size for it... let's
258 * protect the connection with a promise to complete the
259 * SS serialization streamtype negotation within a short period,
260 * we will cancel this timeout when we have the proxy's ack
261 * of the streamtype serialization, eg, it exists in the proxy
262 * policy etc
263 */
264 lws_set_timeout(wsi, PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND, 3);
265 lws_callback_on_writable(wsi);
266 h->us_start_upstream = 0;
267 break;
268
269 case LWS_CALLBACK_RAW_CLOSE:
270 /*
271 * our ss proxy Unix Domain socket has closed...
272 */
273 if (!h) {
274 lwsl_info("%s: no sspc on client proxy link close", __func__);
275 break;
276 }
277 lwsl_sspc_info(h, "LWS_CALLBACK_RAW_CLOSE: proxy conn down, wsi %s",
278 lws_wsi_tag(wsi));
279
280 lws_dsh_destroy(&h->dsh);
281 if (h->ss_dangling_connected && h->ssi.state) {
282
283 lwsl_sspc_notice(h, "setting _DISCONNECTED");
284 h->ss_dangling_connected = 0;
285 h->prev_ss_state = LWSSSCS_DISCONNECTED;
286 r = h->ssi.state(ss_to_userobj(h), NULL,
287 LWSSSCS_DISCONNECTED, 0);
288 if (r == LWSSSSRET_DESTROY_ME) {
289 h->cwsi = NULL;
290 lws_set_opaque_user_data(wsi, NULL);
291 lws_sspc_destroy(&h);
292 break;
293 }
294 }
295
296 h->cwsi = NULL;
297 /*
298 * schedule a reconnect in 1s
299 */
300 lws_sul_schedule(h->context, 0, &h->sul_retry,
301 lws_sspc_sul_retry_cb, LWS_US_PER_SEC);
302
303 break;
304
305 case LWS_CALLBACK_RAW_RX:
306 /*
307 * ie, the proxy has sent us something
308 */
309
310 if (!h || !h->cwsi) {
311 lwsl_info("%s: rx when client ss destroyed\n", __func__);
312
313 return -1;
314 }
315
316 lwsl_sspc_info(h, "%s: RAW_RX: rx %d\n", __func__, (int)len);
317
318 if (!len) {
319 lwsl_sspc_notice(h, "RAW_RX: zero len");
320
321 return -1;
322 }
323
324 if (lws_fi(&h->fic, "sspc_fake_rxparse_disconnect_me"))
325 n = LWSSSSRET_DISCONNECT_ME;
326 else
327 if (lws_fi(&h->fic, "sspc_fake_rxparse_destroy_me"))
328 n = LWSSSSRET_DESTROY_ME;
329 else
330 n = lws_ss_deserialize_parse(&h->parser,
331 lws_get_context(wsi),
332 h->dsh, in, len,
333 &h->state, h,
334 (lws_ss_handle_t **)m,
335 &h->ssi, 1);
336 switch (n) {
337 case LWSSSSRET_OK:
338 break;
339 case LWSSSSRET_DISCONNECT_ME:
340 lwsl_info("%s: proxlicent RX ended with DISCONNECT_ME\n",
341 __func__);
342 return -1;
343 case LWSSSSRET_DESTROY_ME:
344 lwsl_info("%s: proxlicent RX ended with DESTROY_ME\n",
345 __func__);
346 lws_set_opaque_user_data(wsi, NULL);
347 lws_sspc_destroy(&h);
348 return -1;
349 }
350
351 if (h->state == LPCSCLI_LOCAL_CONNECTED ||
352 h->state == LPCSCLI_ONWARD_CONNECT)
353 lws_set_timeout(wsi, 0, 0);
354
355 break;
356
357 case LWS_CALLBACK_RAW_WRITEABLE:
358
359 /*
360 * We can transmit something to the proxy...
361 */
362
363 if (!h)
364 break;
365
366 lwsl_sspc_debug(h, "WRITEABLE %s, state %d",
367 wsi->lc.gutag, h->state);
368
369 /*
370 * Management of ss timeout can happen any time and doesn't
371 * depend on wsi existence or state
372 */
373
374 n = 0;
375 cp = s;
376
377 if (h->pending_timeout_update) {
378 s[0] = LWSSS_SER_TXPRE_TIMEOUT_UPDATE;
379 s[1] = 0;
380 s[2] = 4;
381 /*
382 * 0: use policy timeout value
383 * 0xffffffff: cancel the timeout
384 */
385 lws_ser_wu32be(&s[3], h->timeout_ms);
386 /* in case anything else to write */
387 lws_callback_on_writable(h->cwsi);
388 h->pending_timeout_update = 0;
389 n = 7;
390 goto do_write;
391 }
392
393 s[1] = 0;
394 /*
395 * This is the state of the link that connects us to the onward
396 * proxy
397 */
398 switch (h->state) {
399 case LPCSCLI_SENDING_INITIAL_TX:
400 /*
401 * We are negotating the opening of a particular
402 * streamtype
403 */
404 n = (int)strlen(h->ssi.streamtype) + 1 + 4 + 4;
405
406 s[0] = LWSSS_SER_TXPRE_STREAMTYPE;
407 lws_ser_wu16be(&s[1], (uint16_t)n);
408 /* SSSv1: add protocol version byte (initially 1) */
409 s[3] = (uint8_t)LWS_SSS_CLIENT_PROTOCOL_VERSION;
410 lws_ser_wu32be(&s[4], (uint32_t)getpid());
411 lws_ser_wu32be(&s[8], (uint32_t)h->txc.peer_tx_cr_est);
412 //h->txcr_out = txc;
413 lws_strncpy((char *)&s[12], h->ssi.streamtype, sizeof(s) - 12);
414 n += 3;
415 h->state = LPCSCLI_WAITING_CREATE_RESULT;
416
417 break;
418
419 case LPCSCLI_LOCAL_CONNECTED:
420
421 // lwsl_notice("%s: LPCSCLI_LOCAL_CONNECTED\n", __func__);
422
423 /*
424 * Do we need to prioritize sending any metadata
425 * changes?
426 */
427
428 if (h->metadata_owner.count) {
429 lws_sspc_metadata_t *md = lws_container_of(
430 lws_dll2_get_tail(&h->metadata_owner),
431 lws_sspc_metadata_t, list);
432
433 pkt = lws_malloc(pktsize + LWS_PRE, __func__);
434 if (!pkt)
435 goto hangup;
436 cp = p = pkt + LWS_PRE;
437 end = p + pktsize;
438
439 n = lws_sspc_serialize_metadata(h, md, p, end);
440 if (n < 0)
441 goto metadata_hangup;
442
443 lwsl_sspc_debug(h, "(local_conn) metadata");
444
445 goto req_write_and_issue;
446 }
447
448 if (h->pending_writeable_len) {
449 lwsl_sspc_debug(h, "(local_conn) PAYLOAD_LENGTH_HINT %u",
450 (unsigned int)h->writeable_len);
451 s[0] = LWSSS_SER_TXPRE_PAYLOAD_LENGTH_HINT;
452 lws_ser_wu16be(&s[1], 4);
453 lws_ser_wu32be(&s[3], (uint32_t)h->writeable_len);
454 h->pending_writeable_len = 0;
455 n = 7;
456 goto req_write_and_issue;
457 }
458
459 if (h->conn_req_state >= LWSSSPC_ONW_ONGOING) {
460 lwsl_sspc_info(h, "conn_req_state %d",
461 h->conn_req_state);
462 break;
463 }
464
465 lwsl_sspc_info(h, "(local_conn) onward connect");
466
467 h->conn_req_state = LWSSSPC_ONW_ONGOING;
468
469 s[0] = LWSSS_SER_TXPRE_ONWARD_CONNECT;
470 s[1] = 0;
471 s[2] = 0;
472 n = 3;
473 break;
474
475 case LPCSCLI_OPERATIONAL:
476
477 /*
478 *
479 * - Do we need to prioritize sending any metadata
480 * changes? (includes txcr updates)
481 *
482 * - Do we need to forward a hint about the payload
483 * length?
484 */
485
486 pkt = lws_malloc(pktsize + LWS_PRE, __func__);
487 if (!pkt)
488 goto hangup;
489 cp = p = pkt + LWS_PRE;
490 end = p + pktsize;
491
492 if (h->metadata_owner.count) {
493 lws_sspc_metadata_t *md = lws_container_of(
494 lws_dll2_get_tail(&h->metadata_owner),
495 lws_sspc_metadata_t, list);
496
497 n = lws_sspc_serialize_metadata(h, md, p, end);
498 if (n < 0)
499 goto metadata_hangup;
500
501 goto req_write_and_issue;
502 }
503
504 if (h->pending_writeable_len) {
505 lwsl_sspc_info(h, "PAYLOAD_LENGTH_HINT %u",
506 (unsigned int)h->writeable_len);
507 s[0] = LWSSS_SER_TXPRE_PAYLOAD_LENGTH_HINT;
508 lws_ser_wu16be(&s[1], 4);
509 lws_ser_wu32be(&s[3], (uint32_t)h->writeable_len);
510 h->pending_writeable_len = 0;
511 n = 7;
512 goto req_write_and_issue;
513 }
514
515 /* we can't write anything if we don't have credit */
516 if (!h->ignore_txc && h->txc.tx_cr <= 0) {
517 lwsl_sspc_info(h, "WRITEABLE / OPERATIONAL:"
518 " lack credit (%d)",
519 h->txc.tx_cr);
520 // break;
521 }
522
523 len = pktsize - LWS_PRE - 19;
524 flags = 0;
525 if (!h->ssi.tx) {
526 n = 0;
527 goto do_write_nz;
528 }
529
530 n = h->ssi.tx(m, h->ord++, pkt + LWS_PRE + 19, &len,
531 &flags);
532 switch (n) {
533 case LWSSSSRET_TX_DONT_SEND:
534 n = 0;
535 goto do_write_nz;
536
537 case LWSSSSRET_DISCONNECT_ME:
538 case LWSSSSRET_DESTROY_ME:
539 lwsl_notice("%s: sspc tx DISCONNECT/DESTROY unimplemented\n", __func__);
540 break;
541 default:
542 break;
543 }
544
545 h->txc.tx_cr = h->txc.tx_cr - (int)len;
546
547 cp = p;
548 n = (int)(len + 19);
549 us = lws_now_usecs();
550 p[0] = LWSSS_SER_TXPRE_TX_PAYLOAD;
551 lws_ser_wu16be(&p[1], (uint16_t)(len + 19 - 3));
552 lws_ser_wu32be(&p[3], (uint32_t)flags);
553 /* time spent here waiting to send this */
554 lws_ser_wu32be(&p[7], (uint32_t)(us - h->us_earliest_write_req));
555 /* ust that the client write happened */
556 lws_ser_wu64be(&p[11], (uint64_t)us);
557 h->us_earliest_write_req = 0;
558
559 if (flags & LWSSS_FLAG_EOM)
560 if (h->rsidx + 1 < (int)LWS_ARRAY_SIZE(h->rideshare_ofs) &&
561 h->rideshare_ofs[h->rsidx + 1])
562 h->rsidx++;
563
564 break;
565 default:
566 break;
567 }
568
569 do_write_nz:
570
571 if (!n)
572 break;
573
574 do_write:
575 if (lws_fi(&h->fic, "sspc_link_write_fail"))
576 n = -1;
577 else
578 n = lws_write(wsi, (uint8_t *)cp, (unsigned int)n, LWS_WRITE_RAW);
579 if (n < 0) {
580 lwsl_sspc_notice(h, "WRITEABLE: %d", n);
581
582 goto hangup;
583 }
584 break;
585
586 default:
587 break;
588 }
589
590 lws_free(pkt);
591
592 return lws_callback_http_dummy(wsi, reason, user, in, len);
593
594 metadata_hangup:
595 lwsl_sspc_err(h, "metadata too large");
596
597 hangup:
598 lws_free(pkt);
599 lwsl_warn("hangup\n");
600 /* hang up on him */
601 return -1;
602
603 req_write_and_issue:
604 /* in case anything else to write */
605 lws_callback_on_writable(h->cwsi);
606 goto do_write_nz;
607 }
608
/*
 * Protocol table for the link to the proxy: a single protocol named
 * "ssproxy-protocol" handled by callback_sspc_client.  Entry [0]'s name is
 * what lws_sspc_sul_retry_cb() uses for both the protocol and the local
 * protocol name of the connection it creates.
 *
 * NOTE(review): the trailing all-NULL / zero entry is presumably the list
 * terminator, and the positional numeric initializers follow the member
 * order of struct lws_protocols -- confirm against its declaration.
 */
const struct lws_protocols lws_sspc_protocols[] = {
	{
		"ssproxy-protocol",
		callback_sspc_client,
		0,
		2048, 2048, NULL, 0
	},
	{ NULL, NULL, 0, 0, 0, NULL, 0 }
};
618
619 int
lws_sspc_create(struct lws_context * context,int tsi,const lws_ss_info_t * ssi,void * opaque_user_data,lws_sspc_handle_t ** ppss,struct lws_sequencer * seq_owner,const char ** ppayload_fmt)620 lws_sspc_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
621 void *opaque_user_data, lws_sspc_handle_t **ppss,
622 struct lws_sequencer *seq_owner, const char **ppayload_fmt)
623 {
624 lws_sspc_handle_t *h;
625 uint8_t *ua;
626 char *p;
627
628 lws_service_assert_loop_thread(context, tsi);
629
630 /* allocate the handle (including ssi), the user alloc,
631 * and the streamname */
632
633 h = malloc(sizeof(lws_sspc_handle_t) + ssi->user_alloc +
634 strlen(ssi->streamtype) + 1);
635 if (!h)
636 return 1;
637 memset(h, 0, sizeof(*h));
638
639 h->lc.log_cx = context->log_cx;
640
641 #if defined(LWS_WITH_SYS_FAULT_INJECTION)
642 h->fic.name = "sspc";
643 lws_xos_init(&h->fic.xos, lws_xos(&context->fic.xos));
644 if (ssi->fic.fi_owner.count)
645 lws_fi_import(&h->fic, &ssi->fic);
646
647 lws_fi_inherit_copy(&h->fic, &context->fic, "ss", ssi->streamtype);
648 #endif
649
650 if (lws_fi(&h->fic, "sspc_create_oom")) {
651 /*
652 * We have to do this a litte later, so we can cleanly inherit
653 * the OOM pieces and drain the info fic
654 */
655 lws_fi_destroy(&h->fic);
656 free(h);
657 return 1;
658 }
659
660 __lws_lc_tag(context, &context->lcg[LWSLCG_SSP_CLIENT], &h->lc,
661 ssi->streamtype);
662
663 memcpy(&h->ssi, ssi, sizeof(*ssi));
664 ua = (uint8_t *)(h + 1);
665 memset(ua, 0, ssi->user_alloc);
666 p = (char *)ua + ssi->user_alloc;
667 memcpy(p, ssi->streamtype, strlen(ssi->streamtype) + 1);
668 h->ssi.streamtype = (const char *)p;
669 h->context = context;
670 h->us_start_upstream = lws_now_usecs();
671
672 if (!ssi->manual_initial_tx_credit)
673 h->txc.peer_tx_cr_est = 500000000;
674 else
675 h->txc.peer_tx_cr_est = ssi->manual_initial_tx_credit;
676
677 if (!strcmp(ssi->streamtype, LWS_SMD_STREAMTYPENAME))
678 h->ignore_txc = 1;
679
680 lws_dll2_add_head(&h->client_list, &context->pt[tsi].ss_client_owner);
681
682 /* fill in the things the real api does for the caller */
683
684 *((void **)(ua + ssi->opaque_user_data_offset)) = opaque_user_data;
685 *((void **)(ua + ssi->handle_offset)) = h;
686
687 if (ppss)
688 *ppss = h;
689
690 /* try the actual connect */
691
692 lws_sspc_sul_retry_cb(&h->sul_retry);
693
694 return 0;
695 }
696
697 /* used on context destroy when iterating listed lws_ss on a pt */
698
699 int
lws_sspc_destroy_dll(struct lws_dll2 * d,void * user)700 lws_sspc_destroy_dll(struct lws_dll2 *d, void *user)
701 {
702 lws_sspc_handle_t *h = lws_container_of(d, lws_sspc_handle_t, client_list);
703
704 lws_sspc_destroy(&h);
705
706 return 0;
707 }
708
709 void
lws_sspc_rxmetadata_destroy(lws_sspc_handle_t * h)710 lws_sspc_rxmetadata_destroy(lws_sspc_handle_t *h)
711 {
712 lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
713 lws_dll2_get_head(&h->metadata_owner_rx)) {
714 lws_sspc_metadata_t *md =
715 lws_container_of(d, lws_sspc_metadata_t, list);
716
717 lws_dll2_remove(&md->list);
718 lws_free(md);
719
720 } lws_end_foreach_dll_safe(d, d1);
721 }
722
723 void
lws_sspc_destroy(lws_sspc_handle_t ** ph)724 lws_sspc_destroy(lws_sspc_handle_t **ph)
725 {
726 lws_sspc_handle_t *h;
727
728 if (!*ph)
729 return;
730
731 h = *ph;
732 if (h == h->h_in_svc) {
733 lwsl_err("%s: illegal destroy, return LWSSSSRET_DESTROY_ME instead\n",
734 __func__);
735 assert(0);
736 return;
737 }
738
739 lws_service_assert_loop_thread(h->context, 0);
740
741 if (h->destroying)
742 return;
743
744 h->destroying = 1;
745
746 /* if this caliper is still dangling at destroy, we failed */
747 #if defined(LWS_WITH_SYS_METRICS)
748 /*
749 * If any hanging caliper measurement, dump it, and free any tags
750 */
751 lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
752 #endif
753 if (h->ss_dangling_connected && h->ssi.state) {
754 lws_sspc_event_helper(h, LWSSSCS_DISCONNECTED, 0);
755 h->ss_dangling_connected = 0;
756 }
757
758 #if defined(LWS_WITH_SYS_FAULT_INJECTION)
759 lws_fi_destroy(&h->fic);
760 #endif
761
762 lws_sul_cancel(&h->sul_retry);
763 lws_dll2_remove(&h->client_list);
764
765 if (h->dsh)
766 lws_dsh_destroy(&h->dsh);
767 if (h->cwsi) {
768 lws_set_opaque_user_data(h->cwsi, NULL);
769 lws_wsi_close(h->cwsi, LWS_TO_KILL_ASYNC);
770 h->cwsi = NULL;
771 }
772
773 /* clean out any pending metadata changes that didn't make it */
774
775 lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
776 lws_dll2_get_head(&(*ph)->metadata_owner)) {
777 lws_sspc_metadata_t *md =
778 lws_container_of(d, lws_sspc_metadata_t, list);
779
780 lws_dll2_remove(&md->list);
781 lws_free(md);
782
783 } lws_end_foreach_dll_safe(d, d1);
784
785 lws_sspc_rxmetadata_destroy(h);
786
787 lws_sspc_event_helper(h, LWSSSCS_DESTROYING, 0);
788 *ph = NULL;
789
790 lws_sul_cancel(&h->sul_retry);
791
792
793 /* confirm no sul left scheduled in handle or user allocation object */
794 lws_sul_debug_zombies(h->context, h, sizeof(*h) + h->ssi.user_alloc,
795 __func__);
796
797 __lws_lc_untag(h->context, &h->lc);
798
799 free(h);
800 }
801
802 lws_ss_state_return_t
lws_sspc_request_tx(lws_sspc_handle_t * h)803 lws_sspc_request_tx(lws_sspc_handle_t *h)
804 {
805 if (!h || !h->cwsi)
806 return LWSSSSRET_OK;
807
808 lws_service_assert_loop_thread(h->context, 0);
809
810 if (!h->us_earliest_write_req)
811 h->us_earliest_write_req = lws_now_usecs();
812
813 if (h->state == LPCSCLI_LOCAL_CONNECTED &&
814 h->conn_req_state == LWSSSPC_ONW_NONE)
815 h->conn_req_state = LWSSSPC_ONW_REQ;
816
817 lws_callback_on_writable(h->cwsi);
818
819 return LWSSSSRET_OK;
820 }
821
822 /*
823 * Currently we fulfil the writeable part locally by just enabling POLLOUT on
824 * the UDS link, without serialization footprint, which is reasonable as far as
825 * it goes.
826 *
827 * But for the ..._len() variant, the expected payload length hint we are being
828 * told is something that must be serialized to the onward peer, since either
829 * that guy or someone upstream of him is the guy who will compose the framing
830 * with it that actually goes out.
831 *
832 * This information is needed at the upstream guy before we have sent any
833 * payload, eg, for http POST, he has to prepare the content-length in the
834 * headers, before any payload. So we have to issue a serialization of the
835 * length at this point.
836 */
837
838 lws_ss_state_return_t
lws_sspc_request_tx_len(lws_sspc_handle_t * h,unsigned long len)839 lws_sspc_request_tx_len(lws_sspc_handle_t *h, unsigned long len)
840 {
841 /*
842 * for client conns, they cannot even complete creation of the handle
843 * without the onwared connection to the proxy, it's not legal to start
844 * using it until it's operation and has the onward connection (and the
845 * link has called CREATED state)
846 */
847
848 if (!h)
849 return LWSSSSRET_OK;
850
851 lws_service_assert_loop_thread(h->context, 0);
852
853 lwsl_sspc_notice(h, "setting writeable_len %u", (unsigned int)len);
854 h->writeable_len = len;
855 h->pending_writeable_len = 1;
856
857 if (!h->us_earliest_write_req)
858 h->us_earliest_write_req = lws_now_usecs();
859
860 if (h->state == LPCSCLI_LOCAL_CONNECTED &&
861 h->conn_req_state == LWSSSPC_ONW_NONE)
862 h->conn_req_state = LWSSSPC_ONW_REQ;
863
864 /*
865 * We're going to use this up with serializing h->writeable_len... that
866 * will request again.
867 */
868
869 if (h->cwsi)
870 lws_callback_on_writable(h->cwsi);
871
872 return LWSSSSRET_OK;
873 }
874
875 int
lws_sspc_client_connect(lws_sspc_handle_t * h)876 lws_sspc_client_connect(lws_sspc_handle_t *h)
877 {
878 if (!h || h->state == LPCSCLI_OPERATIONAL)
879 return 0;
880
881 lws_service_assert_loop_thread(h->context, 0);
882
883 assert(h->state == LPCSCLI_LOCAL_CONNECTED);
884 if (h->state == LPCSCLI_LOCAL_CONNECTED &&
885 h->conn_req_state == LWSSSPC_ONW_NONE)
886 h->conn_req_state = LWSSSPC_ONW_REQ;
887 if (h->cwsi)
888 lws_callback_on_writable(h->cwsi);
889
890 return 0;
891 }
892
893 struct lws_context *
lws_sspc_get_context(struct lws_sspc_handle * h)894 lws_sspc_get_context(struct lws_sspc_handle *h)
895 {
896 return h->context;
897 }
898
899 const char *
lws_sspc_rideshare(struct lws_sspc_handle * h)900 lws_sspc_rideshare(struct lws_sspc_handle *h)
901 {
902 /*
903 * ...the serialized RX rideshare name if any...
904 */
905
906 if (h->parser.rideshare[0]) {
907 lwsl_sspc_info(h, "parser %s", h->parser.rideshare);
908
909 return h->parser.rideshare;
910 }
911
912 /*
913 * The tx rideshare index
914 */
915
916 if (h->rideshare_list[0]) {
917 lwsl_sspc_info(h, "tx list %s",
918 &h->rideshare_list[h->rideshare_ofs[h->rsidx]]);
919 return &h->rideshare_list[h->rideshare_ofs[h->rsidx]];
920 }
921
922 /*
923 * ... otherwise default to our stream type name
924 */
925
926 lwsl_sspc_info(h, "def %s\n", h->ssi.streamtype);
927
928 return h->ssi.streamtype;
929 }
930
931 static int
_lws_sspc_set_metadata(struct lws_sspc_handle * h,const char * name,const void * value,size_t len,int tx_cr_adjust)932 _lws_sspc_set_metadata(struct lws_sspc_handle *h, const char *name,
933 const void *value, size_t len, int tx_cr_adjust)
934 {
935 lws_sspc_metadata_t *md;
936
937 lws_service_assert_loop_thread(h->context, 0);
938
939 /*
940 * Are we replacing a pending metadata of the same name? It's not
941 * efficient to do this but user code can do what it likes... let's
942 * optimize away the old one.
943 *
944 * Tx credit adjust always has name ""
945 */
946
947 lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
948 lws_dll2_get_head(&h->metadata_owner)) {
949 md = lws_container_of(d, lws_sspc_metadata_t, list);
950
951 if (!strcmp(name, md->name)) {
952 lws_dll2_remove(&md->list);
953 lws_free(md);
954 break;
955 }
956
957 } lws_end_foreach_dll_safe(d, d1);
958
959 /*
960 * We have to stash the metadata and pass it to the proxy
961 */
962
963 if (lws_fi(&h->fic, "sspc_fail_metadata_set"))
964 md = NULL;
965 else
966 md = lws_malloc(sizeof(*md) + len, "set metadata");
967 if (!md) {
968 lwsl_sspc_err(h, "OOM");
969
970 return 1;
971 }
972
973 memset(md, 0, sizeof(*md));
974
975 md->tx_cr_adjust = tx_cr_adjust;
976 h->txc.peer_tx_cr_est += tx_cr_adjust;
977
978 lws_strncpy(md->name, name, sizeof(md->name));
979 md->len = len;
980 if (len)
981 memcpy(&md[1], value, len);
982
983 lws_dll2_add_tail(&md->list, &h->metadata_owner);
984
985 if (len) {
986 lwsl_sspc_info(h, "set metadata %s", name);
987 lwsl_hexdump_sspc_info(h, value, len);
988 } else
989 lwsl_sspc_info(h, "serializing tx cr adj %d",
990 (int)tx_cr_adjust);
991
992 if (h->cwsi)
993 lws_callback_on_writable(h->cwsi);
994
995 return 0;
996 }
997
/*
 * Queue metadata name = value (len bytes) to be sent to the proxy, with no
 * tx credit adjustment.  Returns 0 on success, 1 on failure.
 */
int
lws_sspc_set_metadata(struct lws_sspc_handle *h, const char *name,
		      const void *value, size_t len)
{
	const int no_credit_change = 0;

	return _lws_sspc_set_metadata(h, name, value, len, no_credit_change);
}
1004
1005 int
lws_sspc_get_metadata(struct lws_sspc_handle * h,const char * name,const void ** value,size_t * len)1006 lws_sspc_get_metadata(struct lws_sspc_handle *h, const char *name,
1007 const void **value, size_t *len)
1008 {
1009 lws_sspc_metadata_t *md;
1010
1011 /*
1012 * client side does not have access to policy
1013 * and any metadata are new to it each time,
1014 * we allocate them, removing any existing with
1015 * the same name first
1016 */
1017
1018 lws_service_assert_loop_thread(h->context, 0);
1019
1020 lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
1021 lws_dll2_get_head(&h->metadata_owner_rx)) {
1022 md = lws_container_of(d,
1023 lws_sspc_metadata_t, list);
1024
1025 if (!strcmp(md->name, name)) {
1026 *len = md->len;
1027 *value = &md[1];
1028
1029 return 0;
1030 }
1031
1032 } lws_end_foreach_dll_safe(d, d1);
1033
1034 return 1;
1035 }
1036
1037 int
lws_sspc_add_peer_tx_credit(struct lws_sspc_handle * h,int32_t bump)1038 lws_sspc_add_peer_tx_credit(struct lws_sspc_handle *h, int32_t bump)
1039 {
1040 lws_service_assert_loop_thread(h->context, 0);
1041 lwsl_sspc_notice(h, "%d\n", bump);
1042 return _lws_sspc_set_metadata(h, "", NULL, 0, (int)bump);
1043 }
1044
1045 int
lws_sspc_get_est_peer_tx_credit(struct lws_sspc_handle * h)1046 lws_sspc_get_est_peer_tx_credit(struct lws_sspc_handle *h)
1047 {
1048 lws_service_assert_loop_thread(h->context, 0);
1049 return h->txc.peer_tx_cr_est;
1050 }
1051
1052 void
lws_sspc_start_timeout(struct lws_sspc_handle * h,unsigned int timeout_ms)1053 lws_sspc_start_timeout(struct lws_sspc_handle *h, unsigned int timeout_ms)
1054 {
1055 lws_service_assert_loop_thread(h->context, 0);
1056 if (!h->cwsi)
1057 /* we can't fulfil it */
1058 return;
1059 h->timeout_ms = (uint32_t)timeout_ms;
1060 h->pending_timeout_update = 1;
1061 lws_callback_on_writable(h->cwsi);
1062 }
1063
/*
 * Cancel the stream timeout.  Implemented as a timeout update carrying the
 * all-ones value.
 */
void
lws_sspc_cancel_timeout(struct lws_sspc_handle *h)
{
	const unsigned int cancel_marker = (unsigned int)-1;

	lws_sspc_start_timeout(h, cancel_marker);
}
1069
1070 void *
lws_sspc_to_user_object(struct lws_sspc_handle * h)1071 lws_sspc_to_user_object(struct lws_sspc_handle *h)
1072 {
1073 return (void *)(h + 1);
1074 }
1075
1076 void
lws_sspc_change_handlers(struct lws_sspc_handle * h,lws_ss_state_return_t (* rx)(void * userobj,const uint8_t * buf,size_t len,int flags),lws_ss_state_return_t (* tx)(void * userobj,lws_ss_tx_ordinal_t ord,uint8_t * buf,size_t * len,int * flags),lws_ss_state_return_t (* state)(void * userobj,void * h_src,lws_ss_constate_t state,lws_ss_tx_ordinal_t ack))1077 lws_sspc_change_handlers(struct lws_sspc_handle *h,
1078 lws_ss_state_return_t (*rx)(void *userobj, const uint8_t *buf, size_t len, int flags),
1079 lws_ss_state_return_t (*tx)(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
1080 size_t *len, int *flags),
1081 lws_ss_state_return_t (*state)(void *userobj, void *h_src /* ss handle type */,
1082 lws_ss_constate_t state, lws_ss_tx_ordinal_t ack))
1083 {
1084 if (rx)
1085 h->ssi.rx = rx;
1086 if (tx)
1087 h->ssi.tx = tx;
1088 if (state)
1089 h->ssi.state = state;
1090 }
1091
1092 const char *
lws_sspc_tag(struct lws_sspc_handle * h)1093 lws_sspc_tag(struct lws_sspc_handle *h)
1094 {
1095 if (!h)
1096 return "[null sspc]";
1097 return lws_lc_tag(&h->lc);
1098 }
1099
1100 int
lws_sspc_cancel_notify_dll(struct lws_dll2 * d,void * user)1101 lws_sspc_cancel_notify_dll(struct lws_dll2 *d, void *user)
1102 {
1103 lws_sspc_handle_t *h = lws_container_of(d, lws_sspc_handle_t, client_list);
1104
1105 lws_sspc_event_helper(h, LWSSSCS_EVENT_WAIT_CANCELLED, 0);
1106
1107 return 0;
1108 }
1109
1110