• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libwebsockets - small server side websockets and web server implementation
3  *
4  * Copyright (C) 2010 - 2019 Andy Green <andy@warmcat.com>
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to
8  * deal in the Software without restriction, including without limitation the
9  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10  * sell copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22  * IN THE SOFTWARE.
23  */
24 
25 #if !defined (LWS_PLUGIN_STATIC)
26 #define LWS_DLL
27 #define LWS_INTERNAL
28 #include <libwebsockets.h>
29 #endif
30 
31 #include <string.h>
32 #include <sys/types.h>
33 #include <fcntl.h>
34 
35 #define RING_DEPTH 8
36 
/*
 * One chunk of received data waiting in a ring to be forwarded to the
 * opposite side of the proxy.
 */
struct packet {
	void *payload;		/* heap copy of the rx data, free()d on destroy */
	uint32_t len;		/* count of bytes at payload */
	uint32_t ticket;	/* sequence number taken from conn->ticket_next */
};
42 
/* indexes into the per-side arrays held in struct conn */
enum {
	ACC = 0,	/* the accepted (inbound) side */
	ONW = 1		/* the onward (outbound client) side */
};
47 
48 /*
49  * Because both sides of the connection want to share this, we allocate it
50  * during accepted adoption and both sides have a pss that is just a wrapper
51  * pointing to this.
52  *
53  * The last one of the accepted side and the onward side to close frees it.
54  * This removes any chance of one side or the other having an invalidated
55  * pointer to the pss.
56  */
57 
/*
 * State shared by the accepted and onward wsi of one proxied connection.
 * Every two-element array is indexed by ACC / ONW.
 */
struct conn {
	/* the wsi for each side */
	struct lws *wsi[2];

	/* ring of not-yet-forwarded rx for each side, and that ring's tail */
	struct lws_ring *r[2];
	uint32_t t[2];

	/* ticket given to the next queued packet / ticket last written out */
	uint32_t ticket_next;
	uint32_t ticket_retired;

	/* per-side flags */
	char rx_enabled[2];
	char closed[2];
	char established[2];
};
72 
/*
 * Per-session data for either side: nothing more than a pointer to the
 * conn both sides share.
 */
struct raw_pss {
	struct conn *conn;	/* NULL until attached, and again after destroy_conn() */
};
76 
77 /* one of these is created for each vhost our protocol is used with */
78 
/* onward destination parsed from the vhost's "onward" pvo */
struct raw_vhd {
	char addr[128];		/* onward address string */
	uint16_t port;		/* onward port, left at 0 when the pvo gives none */
	char ipv6;		/* nonzero if the pvo selected "ipv6" */
};
84 
85 static void
__destroy_packet(void * _pkt)86 __destroy_packet(void *_pkt)
87 {
88 	struct packet *pkt = _pkt;
89 
90 	free(pkt->payload);
91 	pkt->payload = NULL;
92 	pkt->len = 0;
93 }
94 
95 static void
destroy_conn(struct raw_vhd * vhd,struct raw_pss * pss)96 destroy_conn(struct raw_vhd *vhd, struct raw_pss *pss)
97 {
98 	struct conn *conn = pss->conn;
99 
100 	if (conn->r[ACC])
101 		lws_ring_destroy(conn->r[ACC]);
102 	if (conn->r[ONW])
103 		lws_ring_destroy(conn->r[ONW]);
104 
105 	pss->conn = NULL;
106 
107 	free(conn);
108 }
109 
110 static int
connect_client(struct raw_vhd * vhd,struct raw_pss * pss)111 connect_client(struct raw_vhd *vhd, struct raw_pss *pss)
112 {
113 	struct lws_client_connect_info i;
114 	char host[128];
115 	struct lws *cwsi;
116 
117 	lws_snprintf(host, sizeof(host), "%s:%u", vhd->addr, vhd->port);
118 
119 	memset(&i, 0, sizeof(i));
120 
121 	i.method = "RAW";
122 	i.context = lws_get_context(pss->conn->wsi[ACC]);
123 	i.port = vhd->port;
124 	i.address = vhd->addr;
125 	i.host = host;
126 	i.origin = host;
127 	i.ssl_connection = 0;
128 	i.vhost = lws_get_vhost(pss->conn->wsi[ACC]);
129 	i.local_protocol_name = "raw-proxy";
130 	i.protocol = "raw-proxy";
131 	i.path = "/";
132 	/*
133 	 * The "onward" client wsi has its own pss but shares the "conn"
134 	 * created when the inbound connection was accepted.  We need to stash
135 	 * the address of the shared conn and apply it to the client psss
136 	 * when the client connection completes.
137 	 */
138 	i.opaque_user_data = pss->conn;
139 	i.pwsi = &pss->conn->wsi[ONW];
140 
141 	lwsl_info("%s: onward: %s:%d%s\n", __func__, i.address, i.port, i.path);
142 
143 	cwsi = lws_client_connect_via_info(&i);
144 	if (!cwsi)
145 		lwsl_err("%s: client connect failed early\n", __func__);
146 
147 	return !cwsi;
148 }
149 
150 static int
flow_control(struct conn * conn,int side,int enable)151 flow_control(struct conn *conn, int side, int enable)
152 {
153 	if (conn->closed[side] ||
154 	    enable == conn->rx_enabled[side] ||
155 	    !conn->established[side])
156 		return 0;
157 
158 	if (lws_rx_flow_control(conn->wsi[side], enable))
159 		return 1;
160 
161 	conn->rx_enabled[side] = enable;
162 	lwsl_info("%s: %s side: %s\n", __func__, side ? "ONW" : "ACC",
163 		  enable ? "rx enabled" : "rx flow controlled");
164 
165 	return 0;
166 }
167 
168 static int
callback_raw_proxy(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)169 callback_raw_proxy(struct lws *wsi, enum lws_callback_reasons reason,
170 		   void *user, void *in, size_t len)
171 {
172 	struct raw_pss *pss = (struct raw_pss *)user;
173 	struct raw_vhd *vhd = (struct raw_vhd *)lws_protocol_vh_priv_get(
174 				     lws_get_vhost(wsi), lws_get_protocol(wsi));
175 	const struct packet *ppkt;
176 	struct conn *conn = NULL;
177 	struct lws_tokenize ts;
178 	lws_tokenize_elem e;
179 	struct packet pkt;
180 	const char *cp;
181 	int n;
182 
183 	if (pss)
184 		conn = pss->conn;
185 
186 	switch (reason) {
187 	case LWS_CALLBACK_PROTOCOL_INIT:
188 		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
189 				lws_get_protocol(wsi), sizeof(struct raw_vhd));
190 		if (lws_pvo_get_str(in, "onward", &cp)) {
191 			lwsl_err("%s: vh %s: pvo 'onward' required\n", __func__,
192 				 lws_get_vhost_name(lws_get_vhost(wsi)));
193 
194 			return -1;
195 		}
196 		lws_tokenize_init(&ts, cp, LWS_TOKENIZE_F_DOT_NONTERM |
197 					   LWS_TOKENIZE_F_MINUS_NONTERM |
198 					   LWS_TOKENIZE_F_NO_FLOATS);
199 		ts.len = strlen(cp);
200 
201 		if (lws_tokenize(&ts) != LWS_TOKZE_TOKEN)
202 			goto bad_onward;
203 		if (!strncmp(ts.token, "ipv6", ts.token_len))
204 			vhd->ipv6 = 1;
205 		else
206 			if (strncmp(ts.token, "ipv4", ts.token_len))
207 				goto bad_onward;
208 
209 		/* then the colon */
210 		if (lws_tokenize(&ts) != LWS_TOKZE_DELIMITER)
211 			goto bad_onward;
212 
213 		e = lws_tokenize(&ts);
214 		if (!vhd->ipv6) {
215 			if (e != LWS_TOKZE_TOKEN ||
216 			    ts.token_len + 1 >= (int)sizeof(vhd->addr))
217 				goto bad_onward;
218 
219 			lws_strncpy(vhd->addr, ts.token, ts.token_len + 1);
220 			e = lws_tokenize(&ts);
221 			if (e == LWS_TOKZE_DELIMITER) {
222 				/* there should be a port then */
223 				e = lws_tokenize(&ts);
224 				if (e != LWS_TOKZE_INTEGER)
225 					goto bad_onward;
226 				vhd->port = atoi(ts.token);
227 				e = lws_tokenize(&ts);
228 			}
229 			if (e != LWS_TOKZE_ENDED)
230 				goto bad_onward;
231 		} else
232 			lws_strncpy(vhd->addr, ts.token, sizeof(vhd->addr));
233 
234 		lwsl_notice("%s: vh %s: onward %s:%s:%d\n", __func__,
235 			    lws_get_vhost_name(lws_get_vhost(wsi)),
236 			    vhd->ipv6 ? "ipv6": "ipv4", vhd->addr, vhd->port);
237 		break;
238 
239 bad_onward:
240 		lwsl_err("%s: onward pvo format must be ipv4:addr[:port] "
241 			 " or ipv6:addr, not '%s'\n", __func__, cp);
242 		return -1;
243 
244 	case LWS_CALLBACK_PROTOCOL_DESTROY:
245 		break;
246 
247 	/* callbacks related to client "onward side" */
248 
249 	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
250 		lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
251 			 in ? (char *)in : "(null)");
252 		break;
253 
254         case LWS_CALLBACK_RAW_PROXY_CLI_ADOPT:
255 		lwsl_debug("%s: %p: LWS_CALLBACK_RAW_CLI_ADOPT: pss %p\n", __func__, wsi, pss);
256 		if (conn || !pss)
257 			break;
258 		conn = pss->conn = lws_get_opaque_user_data(wsi);
259 		if (!conn)
260 			break;
261 		conn->established[ONW] = 1;
262 		/* they start enabled */
263 		conn->rx_enabled[ACC] = 1;
264 		conn->rx_enabled[ONW] = 1;
265 
266 		/* he disabled his rx while waiting for use to be established */
267 		flow_control(conn, ACC, 1);
268 
269 		lws_callback_on_writable(wsi);
270 		lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
271 		break;
272 
273 	case LWS_CALLBACK_RAW_PROXY_CLI_CLOSE:
274 		lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_CLOSE\n");
275 		if (!conn)
276 			break;
277 
278 		conn->closed[ONW] = 1;
279 
280 		if (conn->closed[ACC])
281 			destroy_conn(vhd, pss);
282 
283 		break;
284 
285 	case LWS_CALLBACK_RAW_PROXY_CLI_RX:
286 		lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_RX: %d\n", (int)len);
287 
288 		if (!conn)
289 			return 0;
290 
291 		if (!pss || !conn->wsi[ACC] || conn->closed[ACC]) {
292 			lwsl_info(" pss %p, wsi[ACC] %p, closed[ACC] %d\n",
293 				  pss, conn->wsi[ACC], conn->closed[ACC]);
294 			return -1;
295 		}
296 		pkt.payload = malloc(len);
297 		if (!pkt.payload) {
298 			lwsl_notice("OOM: dropping\n");
299 			return -1;
300 		}
301 		pkt.len = len;
302 		pkt.ticket = conn->ticket_next++;
303 
304 		memcpy(pkt.payload, in, len);
305 		if (!lws_ring_insert(conn->r[ONW], &pkt, 1)) {
306 			__destroy_packet(&pkt);
307 			lwsl_notice("dropping!\n");
308 			return -1;
309 		}
310 
311 		lwsl_debug("After onward RX: acc free: %d...\n",
312 			   (int)lws_ring_get_count_free_elements(conn->r[ONW]));
313 
314 		if (conn->rx_enabled[ONW] &&
315 		    lws_ring_get_count_free_elements(conn->r[ONW]) < 2)
316 			flow_control(conn, ONW, 0);
317 
318 		if (!conn->closed[ACC])
319 			lws_callback_on_writable(conn->wsi[ACC]);
320 		break;
321 
322 	case LWS_CALLBACK_RAW_PROXY_CLI_WRITEABLE:
323 		lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_WRITEABLE\n");
324 
325 		if (!conn)
326 			break;
327 
328 		ppkt = lws_ring_get_element(conn->r[ACC], &conn->t[ACC]);
329 		if (!ppkt) {
330 			lwsl_info("%s: CLI_WRITABLE had nothing in acc ring\n",
331 				  __func__);
332 			break;
333 		}
334 
335 		if (ppkt->ticket != conn->ticket_retired + 1) {
336 			lwsl_info("%s: acc ring has %d but next %d\n", __func__,
337 				  ppkt->ticket, conn->ticket_retired + 1);
338 			lws_callback_on_writable(conn->wsi[ACC]);
339 			break;
340 		}
341 
342 		n = lws_write(wsi, ppkt->payload, ppkt->len, LWS_WRITE_RAW);
343 		if (n < 0) {
344 			lwsl_info("%s: WRITEABLE: %d\n", __func__, n);
345 
346 			return -1;
347 		}
348 
349 		conn->ticket_retired = ppkt->ticket;
350 		lws_ring_consume(conn->r[ACC], &conn->t[ACC], NULL, 1);
351 		lws_ring_update_oldest_tail(conn->r[ACC], conn->t[ACC]);
352 
353 		lwsl_debug("acc free: %d...\n",
354 			  (int)lws_ring_get_count_free_elements(conn->r[ACC]));
355 
356 		if (!conn->rx_enabled[ACC] &&
357 		    lws_ring_get_count_free_elements(conn->r[ACC]) > 2)
358 			flow_control(conn, ACC, 1);
359 
360 		ppkt = lws_ring_get_element(conn->r[ACC], &conn->t[ACC]);
361 		lwsl_debug("%s: CLI_WRITABLE: next acc pkt %p idx %d vs %d\n",
362 			   __func__, ppkt, ppkt ? ppkt->ticket : 0,
363 					   conn->ticket_retired + 1);
364 
365 		if (ppkt && ppkt->ticket == conn->ticket_retired + 1)
366 			lws_callback_on_writable(wsi);
367 		else {
368 			/*
369 			 * defer checking for accepted side closing until we
370 			 * sent everything in the ring to onward
371 			 */
372 			if (conn->closed[ACC])
373 				/*
374 				 * there is never going to be any more... but
375 				 * we may have some tx still in tx buflist /
376 				 * partial
377 				 */
378 				return lws_raw_transaction_completed(wsi);
379 
380 			if (lws_ring_get_element(conn->r[ONW], &conn->t[ONW]))
381 				lws_callback_on_writable(conn->wsi[ACC]);
382 		}
383 		break;
384 
385 	/* callbacks related to raw socket descriptor "accepted side" */
386 
387         case LWS_CALLBACK_RAW_PROXY_SRV_ADOPT:
388 		lwsl_debug("LWS_CALLBACK_RAW_SRV_ADOPT\n");
389 		if (!pss)
390 			return -1;
391 		conn = pss->conn = malloc(sizeof(struct conn));
392 		if (!pss->conn)
393 			return -1;
394 		memset(conn, 0, sizeof(*conn));
395 
396 		conn->wsi[ACC] = wsi;
397 		conn->ticket_next = 1;
398 
399 		conn->r[ACC] = lws_ring_create(sizeof(struct packet),
400 					       RING_DEPTH, __destroy_packet);
401 		if (!conn->r[ACC]) {
402 			lwsl_err("%s: OOM\n", __func__);
403 			return -1;
404 		}
405 		conn->r[ONW] = lws_ring_create(sizeof(struct packet),
406 					       RING_DEPTH, __destroy_packet);
407 		if (!conn->r[ONW]) {
408 			lws_ring_destroy(conn->r[ACC]);
409 			conn->r[ACC] = NULL;
410 			lwsl_err("%s: OOM\n", __func__);
411 
412 			return -1;
413 		}
414 
415 		conn->established[ACC] = 1;
416 
417 		/* disable any rx until the client side is up */
418 		flow_control(conn, ACC, 0);
419 
420 		lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
421 
422 		/* try to create the onward client connection */
423 		connect_client(vhd, pss);
424                 break;
425 
426 	case LWS_CALLBACK_RAW_PROXY_SRV_CLOSE:
427 		lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_CLOSE:\n");
428 
429 		if (!conn)
430 			break;
431 
432 		conn->closed[ACC] = 1;
433 		if (conn->closed[ONW])
434 			destroy_conn(vhd, pss);
435 		break;
436 
437 	case LWS_CALLBACK_RAW_PROXY_SRV_RX:
438 		lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_RX: rx %d\n", (int)len);
439 
440 		if (!conn || !conn->wsi[ONW]) {
441 			lwsl_err("%s: LWS_CALLBACK_RAW_PROXY_SRV_RX: "
442 				 "conn->wsi[ONW] NULL\n", __func__);
443 			return -1;
444 		}
445 		if (conn->closed[ONW]) {
446 			lwsl_info(" closed[ONW] %d\n", conn->closed[ONW]);
447 			return -1;
448 		}
449 
450 		if (!len)
451 			return 0;
452 
453 		pkt.payload = malloc(len);
454 		if (!pkt.payload) {
455 			lwsl_notice("OOM: dropping\n");
456 			return -1;
457 		}
458 		pkt.len = len;
459 		pkt.ticket = conn->ticket_next++;
460 
461 		memcpy(pkt.payload, in, len);
462 		if (!lws_ring_insert(conn->r[ACC], &pkt, 1)) {
463 			__destroy_packet(&pkt);
464 			lwsl_notice("dropping!\n");
465 			return -1;
466 		}
467 
468 		lwsl_debug("After acc RX: acc free: %d...\n",
469 			   (int)lws_ring_get_count_free_elements(conn->r[ACC]));
470 
471 		if (conn->rx_enabled[ACC] &&
472 		    lws_ring_get_count_free_elements(conn->r[ACC]) <= 2)
473 			flow_control(conn, ACC, 0);
474 
475 		if (conn->established[ONW] && !conn->closed[ONW])
476 			lws_callback_on_writable(conn->wsi[ONW]);
477 		break;
478 
479 	case LWS_CALLBACK_RAW_PROXY_SRV_WRITEABLE:
480 		lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_WRITEABLE\n");
481 
482 		if (!conn || !conn->established[ONW] || conn->closed[ONW])
483 			break;
484 
485 		ppkt = lws_ring_get_element(conn->r[ONW], &conn->t[ONW]);
486 		if (!ppkt) {
487 			lwsl_info("%s: SRV_WRITABLE nothing in onw ring\n",
488 				  __func__);
489 			break;
490 		}
491 
492 		if (ppkt->ticket != conn->ticket_retired + 1) {
493 			lwsl_info("%s: onw ring has %d but next %d\n", __func__,
494 				  ppkt->ticket, conn->ticket_retired + 1);
495 			lws_callback_on_writable(conn->wsi[ONW]);
496 			break;
497 		}
498 
499 		n = lws_write(wsi, ppkt->payload, ppkt->len, LWS_WRITE_RAW);
500 		if (n < 0) {
501 			lwsl_info("%s: WRITEABLE: %d\n", __func__, n);
502 
503 			return -1;
504 		}
505 
506 		conn->ticket_retired = ppkt->ticket;
507 		lws_ring_consume(conn->r[ONW], &conn->t[ONW], NULL, 1);
508 		lws_ring_update_oldest_tail(conn->r[ONW], conn->t[ONW]);
509 
510 		lwsl_debug("onward free: %d... waiting %d\n",
511 			  (int)lws_ring_get_count_free_elements(conn->r[ONW]),
512 			  (int)lws_ring_get_count_waiting_elements(conn->r[ONW],
513 								&conn->t[ONW]));
514 
515 		if (!conn->rx_enabled[ONW] &&
516 		    lws_ring_get_count_free_elements(conn->r[ONW]) > 2)
517 			flow_control(conn, ONW, 1);
518 
519 		ppkt = lws_ring_get_element(conn->r[ONW], &conn->t[ONW]);
520 		lwsl_debug("%s: SRV_WRITABLE: next onw pkt %p idx %d vs %d\n",
521 			   __func__, ppkt, ppkt ? ppkt->ticket : 0,
522 					   conn->ticket_retired + 1);
523 
524 		if (ppkt && ppkt->ticket == conn->ticket_retired + 1)
525 			lws_callback_on_writable(wsi);
526 		else {
527 			/*
528 			 * defer checking for onward side closing until we
529 			 * sent everything in the ring to accepted side
530 			 */
531 			if (conn->closed[ONW])
532 				/*
533 				 * there is never going to be any more... but
534 				 * we may have some tx still in tx buflist /
535 				 * partial
536 				 */
537 				return lws_raw_transaction_completed(wsi);
538 
539 		if (lws_ring_get_element(conn->r[ACC], &conn->t[ACC]))
540 			lws_callback_on_writable(conn->wsi[ONW]);
541 		}
542 		break;
543 
544 	default:
545 		break;
546 	}
547 
548 	return lws_callback_http_dummy(wsi, reason, user, in, len);
549 }
550 
/*
 * Initializer for this plugin's struct lws_protocols entry.  The values are
 * positional, so their order must follow the struct lws_protocols declaration.
 */
#define LWS_PLUGIN_PROTOCOL_RAW_PROXY \
	{ \
		"raw-proxy",		/* protocol name */ \
		callback_raw_proxy,	/* protocol callback */ \
		sizeof(struct raw_pss),	/* per-session data */ \
		8192, \
		8192, \
		NULL, \
		0 \
	}
558 
559 #if !defined (LWS_PLUGIN_STATIC)
560 
561 static const struct lws_protocols protocols[] = {
562 	LWS_PLUGIN_PROTOCOL_RAW_PROXY
563 };
564 
565 LWS_VISIBLE int
init_protocol_lws_raw_proxy(struct lws_context * context,struct lws_plugin_capability * c)566 init_protocol_lws_raw_proxy(struct lws_context *context,
567 			    struct lws_plugin_capability *c)
568 {
569 	if (c->api_magic != LWS_PLUGIN_API_MAGIC) {
570 		lwsl_err("Plugin API %d, library API %d", LWS_PLUGIN_API_MAGIC,
571 			 c->api_magic);
572 		return 1;
573 	}
574 
575 	c->protocols = protocols;
576 	c->count_protocols = LWS_ARRAY_SIZE(protocols);
577 	c->extensions = NULL;
578 	c->count_extensions = 0;
579 
580 	return 0;
581 }
582 
583 LWS_VISIBLE int
destroy_protocol_lws_raw_proxy(struct lws_context * context)584 destroy_protocol_lws_raw_proxy(struct lws_context *context)
585 {
586 	return 0;
587 }
588 #endif
589 
590 
591