1 /*
2 * libwebsockets - small server side websockets and web server implementation
3 *
4 * Copyright (C) 2019 - 2020 Andy Green <andy@warmcat.com>
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 * IN THE SOFTWARE.
23 */
24
25 #include <private-lib-core.h>
26
/*
 * Protocol-specific secure streams handlers, in the fixed order H1, H2, WS,
 * MQTT (see the LWSSSP_* tag on each entry).  The table is indexed with the
 * policy's protocol value.
 *
 * A role that is not compiled in leaves a NULL placeholder so the remaining
 * entries keep their positions; users must check for NULL before using an
 * entry.
 *
 * The table itself is const as well as the things it points to: nothing
 * modifies it after build time.
 */
static const struct ss_pcols * const ss_pcols[] = {
#if defined(LWS_ROLE_H1)
	&ss_pcol_h1,		/* LWSSSP_H1 */
#else
	NULL,
#endif
#if defined(LWS_ROLE_H2)
	&ss_pcol_h2,		/* LWSSSP_H2 */
#else
	NULL,
#endif
#if defined(LWS_ROLE_WS)
	&ss_pcol_ws,		/* LWSSSP_WS */
#else
	NULL,
#endif
#if defined(LWS_ROLE_MQTT)
	&ss_pcol_mqtt,		/* LWSSSP_MQTT */
#else
	NULL,
#endif
};
49
/*
 * Printable names for the connection states, in table-index order.
 * lws_ss_state_name() indexes this table directly with the state value, so
 * the order of the entries matters.
 *
 * Both the strings and the table of pointers are const.
 */
static const char * const state_names[] = {
	"LWSSSCS_CREATING",
	"LWSSSCS_DISCONNECTED",
	"LWSSSCS_UNREACHABLE",
	"LWSSSCS_AUTH_FAILED",
	"LWSSSCS_CONNECTED",
	"LWSSSCS_CONNECTING",
	"LWSSSCS_DESTROYING",
	"LWSSSCS_POLL",
	"LWSSSCS_ALL_RETRIES_FAILED",
	"LWSSSCS_QOS_ACK_REMOTE",
	"LWSSSCS_QOS_NACK_REMOTE",
	"LWSSSCS_QOS_ACK_LOCAL",
	"LWSSSCS_QOS_NACK_LOCAL",
};
65
66 const char *
lws_ss_state_name(int state)67 lws_ss_state_name(int state)
68 {
69 if (state >= (int)LWS_ARRAY_SIZE(state_names))
70 return "unknown";
71
72 return state_names[state];
73 }
74
75 int
lws_ss_event_helper(lws_ss_handle_t * h,lws_ss_constate_t cs)76 lws_ss_event_helper(lws_ss_handle_t *h, lws_ss_constate_t cs)
77 {
78 if (!h)
79 return 0;
80
81 #if defined(LWS_WITH_SEQUENCER)
82 /*
83 * A parent sequencer for the ss is optional, if we have one, keep it
84 * informed of state changes on the ss connection
85 */
86 if (h->seq && cs != LWSSSCS_DESTROYING)
87 lws_seq_queue_event(h->seq, LWSSEQ_SS_STATE_BASE + cs,
88 (void *)h, NULL);
89 #endif
90
91 if (h->h_sink &&h->h_sink->info.state(h->sink_obj, h->h_sink, cs, 0))
92 return 1;
93
94 return h->info.state(ss_to_userobj(h), NULL, cs, 0);
95 }
96
97 static void
lws_ss_timeout_sul_check_cb(lws_sorted_usec_list_t * sul)98 lws_ss_timeout_sul_check_cb(lws_sorted_usec_list_t *sul)
99 {
100 lws_ss_handle_t *h = lws_container_of(sul, lws_ss_handle_t, sul);
101
102 lwsl_err("%s: retrying ss h %p after backoff\n", __func__, h);
103 /* we want to retry... */
104 h->seqstate = SSSEQ_DO_RETRY;
105
106 lws_ss_request_tx(h);
107 }
108
109 int
lws_ss_exp_cb_metadata(void * priv,const char * name,char * out,size_t * pos,size_t olen,size_t * exp_ofs)110 lws_ss_exp_cb_metadata(void *priv, const char *name, char *out, size_t *pos,
111 size_t olen, size_t *exp_ofs)
112 {
113 lws_ss_handle_t *h = (lws_ss_handle_t *)priv;
114 const char *replace = NULL;
115 size_t total, budget;
116 lws_ss_metadata_t *md = lws_ss_policy_metadata(h->policy, name);
117
118 if (!md) {
119 lwsl_err("%s: Unknown metadata %s\n", __func__, name);
120
121 return LSTRX_FATAL_NAME_UNKNOWN;
122 }
123
124 lwsl_info("%s %s %d\n", __func__, name, (int)md->length);
125
126 replace = h->metadata[md->length].value;
127 total = h->metadata[md->length].length;
128 // lwsl_hexdump_err(replace, total);
129
130 budget = olen - *pos;
131 total -= *exp_ofs;
132 if (total < budget)
133 budget = total;
134
135 memcpy(out + *pos, replace + (*exp_ofs), budget);
136 *exp_ofs += budget;
137 *pos += budget;
138
139 if (budget == total)
140 return LSTRX_DONE;
141
142 return LSTRX_FILLED_OUT;
143 }
144
145 int
lws_ss_set_timeout_us(lws_ss_handle_t * h,lws_usec_t us)146 lws_ss_set_timeout_us(lws_ss_handle_t *h, lws_usec_t us)
147 {
148 struct lws_context_per_thread *pt = &h->context->pt[h->tsi];
149
150 h->sul.cb = lws_ss_timeout_sul_check_cb;
151 __lws_sul_insert(&pt->pt_sul_owner, &h->sul, us);
152
153 return 0;
154 }
155
156 int
lws_ss_backoff(lws_ss_handle_t * h)157 lws_ss_backoff(lws_ss_handle_t *h)
158 {
159 uint64_t ms;
160 char conceal;
161
162 if (h->seqstate == SSSEQ_RECONNECT_WAIT)
163 return 0;
164
165 /* figure out what we should do about another retry */
166
167 lwsl_info("%s: ss %p: retry backoff after failure\n", __func__, h);
168 ms = lws_retry_get_delay_ms(h->context, h->policy->retry_bo,
169 &h->retry, &conceal);
170 if (!conceal) {
171 lwsl_info("%s: ss %p: abandon conn attempt \n",__func__, h);
172 h->seqstate = SSSEQ_IDLE;
173 lws_ss_event_helper(h, LWSSSCS_ALL_RETRIES_FAILED);
174 return 1;
175 }
176
177 h->seqstate = SSSEQ_RECONNECT_WAIT;
178 lws_ss_set_timeout_us(h, ms * LWS_US_PER_MS);
179
180 lwsl_info("%s: ss %p: retry wait %"PRIu64"ms\n", __func__, h, ms);
181
182 return 0;
183 }
184
185 int
lws_ss_client_connect(lws_ss_handle_t * h)186 lws_ss_client_connect(lws_ss_handle_t *h)
187 {
188 struct lws_client_connect_info i;
189 const struct ss_pcols *ssp;
190 size_t used_in, used_out;
191 union lws_ss_contemp ct;
192 char path[128], ep[96];
193 lws_strexp_t exp;
194
195 if (!h->policy) {
196 lwsl_err("%s: ss with no policy\n", __func__);
197
198 return -1;
199 }
200
201 /*
202 * We are already bound to a sink?
203 */
204
205 if (h->h_sink)
206 return 0;
207
208 memset(&i, 0, sizeof i); /* otherwise uninitialized garbage */
209 i.context = h->context;
210
211 if (h->policy->flags & LWSSSPOLF_TLS) {
212 lwsl_info("%s: using tls\n", __func__);
213 i.ssl_connection = LCCSCF_USE_SSL;
214
215 if (!h->policy->trust_store)
216 lwsl_info("%s: using platform trust store\n", __func__);
217 else {
218
219 i.vhost = lws_get_vhost_by_name(h->context,
220 h->policy->trust_store->name);
221 if (!i.vhost) {
222 lwsl_err("%s: missing vh for policy ca\n", __func__);
223
224 return -1;
225 }
226 }
227 }
228
229 /* expand metadata ${symbols} that may be inside the endpoint string */
230
231 lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, ep, sizeof(ep));
232
233 if (lws_strexp_expand(&exp, h->policy->endpoint,
234 strlen(h->policy->endpoint),
235 &used_in, &used_out) != LSTRX_DONE) {
236 lwsl_err("%s: address strexp failed\n", __func__);
237
238 return -1;
239 }
240
241 i.address = ep;
242 i.port = h->policy->port;
243 i.host = i.address;
244 i.origin = i.address;
245 i.opaque_user_data = h;
246 i.seq = h->seq;
247 i.retry_and_idle_policy = h->policy->retry_bo;
248 i.sys_tls_client_cert = h->policy->client_cert;
249
250 i.path = "";
251
252 ssp = ss_pcols[(int)h->policy->protocol];
253 if (!ssp) {
254 lwsl_err("%s: unsupported protocol\n", __func__);
255
256 return -1;
257 }
258 i.alpn = ssp->alpn;
259
260 /*
261 * For http, we can get the method from the http object, override in
262 * the protocol-specific munge callback below if not http
263 */
264 i.method = h->policy->u.http.method;
265 i.protocol = ssp->protocol_name; /* lws protocol name */
266 i.local_protocol_name = i.protocol;
267
268 ssp->munge(h, path, sizeof(path), &i, &ct);
269
270 i.pwsi = &h->wsi;
271
272 if (h->policy->plugins[0] && h->policy->plugins[0]->munge)
273 h->policy->plugins[0]->munge(h, path, sizeof(path));
274
275 lwsl_info("%s: connecting %s, '%s' '%s' %s\n", __func__, i.method,
276 i.alpn, i.address, i.path);
277
278 h->txn_ok = 0;
279 if (lws_ss_event_helper(h, LWSSSCS_CONNECTING))
280 return -1;
281
282 if (!lws_client_connect_via_info(&i)) {
283 lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
284 lws_ss_backoff(h);
285
286 return 1;
287 }
288
289 return 0;
290 }
291
292
293 /*
294 * Public API
295 */
296
297 /*
298 * Create either a stream or a sink
299 */
300
301 int
lws_ss_create(struct lws_context * context,int tsi,const lws_ss_info_t * ssi,void * opaque_user_data,lws_ss_handle_t ** ppss,struct lws_sequencer * seq_owner,const char ** ppayload_fmt)302 lws_ss_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
303 void *opaque_user_data, lws_ss_handle_t **ppss,
304 struct lws_sequencer *seq_owner, const char **ppayload_fmt)
305 {
306 struct lws_context_per_thread *pt = &context->pt[tsi];
307 const lws_ss_policy_t *pol;
308 lws_ss_metadata_t *smd;
309 lws_ss_handle_t *h;
310 size_t size;
311 void **v;
312 char *p;
313 int n;
314
315 pol = lws_ss_policy_lookup(context, ssi->streamtype);
316 if (!pol) {
317 lwsl_info("%s: unknown stream type %s\n", __func__,
318 ssi->streamtype);
319 return 1;
320 }
321
322 if (ssi->register_sink) {
323 /*
324 * This can register a secure streams sink as well as normal
325 * secure streams connections. If that's what's happening,
326 * confirm the policy agrees that this streamtype should be
327 * directed to a sink.
328 */
329 if (!(pol->flags & LWSSSPOLF_LOCAL_SINK)) {
330 /*
331 * Caller wanted to create a sink for this streamtype,
332 * but the policy does not agree the streamtype should
333 * be routed to a local sink.
334 */
335 lwsl_err("%s: %s policy does not allow local sink\n",
336 __func__, ssi->streamtype);
337
338 return 1;
339 }
340 } else {
341
342 if (!(pol->flags & LWSSSPOLF_LOCAL_SINK)) {
343
344 }
345 // lws_dll2_foreach_safe(&pt->ss_owner, NULL, lws_ss_destroy_dll);
346 }
347
348 /*
349 * We overallocate and point to things in the overallocation...
350 *
351 * 1) the user_alloc from the stream info
352 * 2) network auth plugin instantiation data
353 * 3) stream auth plugin instantiation data
354 * 4) as many metadata pointer structs as the policy tells
355 * 5) the streamtype name (length is not aligned)
356 *
357 * ... when we come to destroy it, just one free to do.
358 */
359
360 size = sizeof(*h) + ssi->user_alloc + strlen(ssi->streamtype) + 1;
361 if (pol->plugins[0])
362 size += pol->plugins[0]->alloc;
363 if (pol->plugins[1])
364 size += pol->plugins[1]->alloc;
365 size += pol->metadata_count * sizeof(lws_ss_metadata_t);
366
367 h = lws_zalloc(size, __func__);
368 if (!h)
369 return 2;
370
371 h->info = *ssi;
372 h->policy = pol;
373 h->context = context;
374 h->tsi = tsi;
375 h->seq = seq_owner;
376
377 /* start of overallocated area */
378 p = (char *)&h[1];
379
380 /* set the handle pointer in the user data struct */
381 v = (void **)(p + ssi->handle_offset);
382 *v = h;
383
384 /* set the opaque user data in the user data struct */
385 v = (void **)(p + ssi->opaque_user_data_offset);
386 *v = opaque_user_data;
387
388 p += ssi->user_alloc;
389
390 if (pol->plugins[0]) {
391 h->nauthi = p;
392 p += pol->plugins[0]->alloc;
393 }
394 if (pol->plugins[1]) {
395 h->sauthi = p;
396 p += pol->plugins[1]->alloc;
397 }
398
399 if (pol->metadata_count) {
400 h->metadata = (lws_ss_metadata_t *)p;
401 p += pol->metadata_count * sizeof(lws_ss_metadata_t);
402
403 lwsl_info("%s: %s metadata count %d\n", __func__,
404 pol->streamtype, pol->metadata_count);
405 }
406
407 smd = pol->metadata;
408 for (n = 0; n < pol->metadata_count; n++) {
409 h->metadata[n].name = smd->name;
410 if (n + 1 == pol->metadata_count)
411 h->metadata[n].next = NULL;
412 else
413 h->metadata[n].next = &h->metadata[n + 1];
414 smd = smd->next;
415 }
416
417 memcpy(p, ssi->streamtype, strlen(ssi->streamtype) + 1);
418 h->info.streamtype = p;
419
420 lws_pt_lock(pt, __func__);
421 lws_dll2_add_head(&h->list, &pt->ss_owner);
422 lws_pt_unlock(pt);
423
424 if (ppss)
425 *ppss = h;
426
427 if (ppayload_fmt)
428 *ppayload_fmt = pol->payload_fmt;
429
430 if (ssi->register_sink) {
431 /*
432 *
433 */
434 }
435
436 lws_ss_event_helper(h, LWSSSCS_CREATING);
437
438 if (!ssi->register_sink && (h->policy->flags & LWSSSPOLF_NAILED_UP))
439 if (lws_ss_client_connect(h))
440 lws_ss_backoff(h);
441
442 return 0;
443 }
444
445 void
lws_ss_destroy(lws_ss_handle_t ** ppss)446 lws_ss_destroy(lws_ss_handle_t **ppss)
447 {
448 struct lws_context_per_thread *pt;
449 lws_ss_handle_t *h = *ppss;
450 lws_ss_metadata_t *pmd;
451
452 if (!h)
453 return;
454
455 if (h->wsi) {
456 /*
457 * Don't let the wsi point to us any more,
458 * we (the ss object bound to the wsi) are going away now
459 */
460 // lws_set_opaque_user_data(h->wsi, NULL);
461 lws_set_timeout(h->wsi, 1, LWS_TO_KILL_SYNC);
462 }
463
464 pt = &h->context->pt[h->tsi];
465
466 lws_pt_lock(pt, __func__);
467 *ppss = NULL;
468 lws_dll2_remove(&h->list);
469 lws_dll2_remove(&h->to_list);
470 lws_ss_event_helper(h, LWSSSCS_DESTROYING);
471 lws_pt_unlock(pt);
472
473 /* in proxy case, metadata value on heap may need cleaning up */
474
475 pmd = h->metadata;
476 while (pmd) {
477 lwsl_info("%s: pmd %p\n", __func__, pmd);
478 if (pmd->value_on_lws_heap)
479 lws_free_set_NULL(pmd->value);
480 pmd = pmd->next;
481 }
482
483 lws_sul_schedule(h->context, 0, &h->sul, NULL, LWS_SET_TIMER_USEC_CANCEL);
484
485 lws_free_set_NULL(h);
486 }
487
488 void
lws_ss_request_tx(lws_ss_handle_t * h)489 lws_ss_request_tx(lws_ss_handle_t *h)
490 {
491 lwsl_info("%s: wsi %p\n", __func__, h->wsi);
492
493 if (h->wsi) {
494 lws_callback_on_writable(h->wsi);
495
496 return;
497 }
498
499 if (h->seqstate != SSSEQ_IDLE &&
500 h->seqstate != SSSEQ_DO_RETRY)
501 return;
502
503 h->seqstate = SSSEQ_TRY_CONNECT;
504 lws_ss_event_helper(h, LWSSSCS_POLL);
505
506 if (lws_ss_client_connect(h))
507 lws_ss_backoff(h);
508 }
509
510 void
lws_ss_request_tx_len(lws_ss_handle_t * h,unsigned long len)511 lws_ss_request_tx_len(lws_ss_handle_t *h, unsigned long len)
512 {
513 if (h->wsi)
514 h->wsi->http.writeable_len = len;
515 else
516 h->writeable_len = len;
517 lws_ss_request_tx(h);
518 }
519
520 /*
521 * private helpers
522 */
523
524 /* used on context destroy when iterating listed lws_ss on a pt */
525
526 int
lws_ss_destroy_dll(struct lws_dll2 * d,void * user)527 lws_ss_destroy_dll(struct lws_dll2 *d, void *user)
528 {
529 lws_ss_handle_t *h = lws_container_of(d, lws_ss_handle_t, list);
530
531 lws_ss_destroy(&h);
532
533 return 0;
534 }
535
536 struct lws_sequencer *
lws_ss_get_sequencer(lws_ss_handle_t * h)537 lws_ss_get_sequencer(lws_ss_handle_t *h)
538 {
539 return h->seq;
540 }
541
542 struct lws_context *
lws_ss_get_context(struct lws_ss_handle * h)543 lws_ss_get_context(struct lws_ss_handle *h)
544 {
545 return h->context;
546 }
547
548 const char *
lws_ss_rideshare(struct lws_ss_handle * h)549 lws_ss_rideshare(struct lws_ss_handle *h)
550 {
551 if (!h->rideshare)
552 return h->policy->streamtype;
553
554 return h->rideshare->streamtype;
555 }
556
557 int
lws_ss_add_peer_tx_credit(struct lws_ss_handle * h,int32_t bump)558 lws_ss_add_peer_tx_credit(struct lws_ss_handle *h, int32_t bump)
559 {
560 const struct ss_pcols *ssp;
561
562 ssp = ss_pcols[(int)h->policy->protocol];
563
564 if (h->wsi && ssp && ssp->tx_cr_add)
565 return ssp->tx_cr_add(h, bump);
566
567 return 0;
568 }
569
570 int
lws_ss_get_est_peer_tx_credit(struct lws_ss_handle * h)571 lws_ss_get_est_peer_tx_credit(struct lws_ss_handle *h)
572 {
573 const struct ss_pcols *ssp;
574
575 ssp = ss_pcols[(int)h->policy->protocol];
576
577 if (h->wsi && ssp && ssp->tx_cr_add)
578 return ssp->tx_cr_est(h);
579
580 return 0;
581 }
582