/*
 * libwebsockets - small server side websockets and web server implementation
 *
 * Copyright (C) 2010 - 2019 Andy Green <andy@warmcat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */
24 
#if !defined (LWS_PLUGIN_STATIC)
#if !defined(LWS_DLL)
#define LWS_DLL
#endif
#if !defined(LWS_INTERNAL)
#define LWS_INTERNAL
#endif
#include <libwebsockets.h>
#endif

#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <fcntl.h>
38 
/* number of packet slots in each per-direction ring (see lws_ring_create()) */
#define RING_DEPTH 8
40 
/*
 * One chunk of received data queued in a ring until the opposite side can
 * write it out.
 */
struct packet {
	void *payload;		/* malloc()'d copy of the rx data, freed by
				 * __destroy_packet() */
	uint32_t len;		/* bytes valid at payload */
	uint32_t ticket;	/* sequence number taken from
				 * conn->ticket_next when queued */
};
46 
/* indexes into the two-element per-side arrays in struct conn */
enum {
	ACC,	/* the accepted (inbound) side */
	ONW	/* the onward (outbound client) side */
};
51 
/*
 * Because both sides of the connection want to share this, we allocate it
 * during accepted adoption and both sides have a pss that is just a wrapper
 * pointing to this.
 *
 * The last one of the accepted side and the onward side to close frees it.
 * This removes any chance of one side or the other having an invalidated
 * pointer to the pss.
 *
 * All two-element arrays are indexed by ACC / ONW.
 */

struct conn {
	struct lws *wsi[2];	/* the wsi for each side */

	/* rings containing unsent rx from accepted and onward sides */
	struct lws_ring *r[2];
	uint32_t t[2]; /* ring tail */

	uint32_t ticket_next;	 /* ticket given to the next queued packet */
	uint32_t ticket_retired; /* ticket of the last packet written out */

	char rx_enabled[2];	/* nonzero while rx is not flow-controlled */
	char closed[2];		/* nonzero once that side's CLOSE was seen */
	char established[2];	/* nonzero once that side was adopted */
};
76 
/*
 * Per-session data: each side's pss only holds a pointer to the struct conn
 * shared by both sides.
 */
struct raw_pss {
	struct conn *conn;
};
80 
/* one of these is created for each vhost our protocol is used with */

struct raw_vhd {
	char addr[128];	/* onward address parsed from the "onward" pvo */
	uint16_t port;	/* onward port, 0 if the pvo did not give one */
	char ipv6;	/* nonzero if the pvo selected ipv6 */
};
88 
89 static void
__destroy_packet(void * _pkt)90 __destroy_packet(void *_pkt)
91 {
92 	struct packet *pkt = _pkt;
93 
94 	free(pkt->payload);
95 	pkt->payload = NULL;
96 	pkt->len = 0;
97 }
98 
99 static void
destroy_conn(struct raw_vhd * vhd,struct raw_pss * pss)100 destroy_conn(struct raw_vhd *vhd, struct raw_pss *pss)
101 {
102 	struct conn *conn = pss->conn;
103 
104 	if (conn->r[ACC])
105 		lws_ring_destroy(conn->r[ACC]);
106 	if (conn->r[ONW])
107 		lws_ring_destroy(conn->r[ONW]);
108 
109 	pss->conn = NULL;
110 
111 	free(conn);
112 }
113 
114 static int
connect_client(struct raw_vhd * vhd,struct raw_pss * pss)115 connect_client(struct raw_vhd *vhd, struct raw_pss *pss)
116 {
117 	struct lws_client_connect_info i;
118 	char host[128];
119 	struct lws *cwsi;
120 
121 	lws_snprintf(host, sizeof(host), "%s:%u", vhd->addr, vhd->port);
122 
123 	memset(&i, 0, sizeof(i));
124 
125 	i.method = "RAW";
126 	i.context = lws_get_context(pss->conn->wsi[ACC]);
127 	i.port = vhd->port;
128 	i.address = vhd->addr;
129 	i.host = host;
130 	i.origin = host;
131 	i.ssl_connection = 0;
132 	i.vhost = lws_get_vhost(pss->conn->wsi[ACC]);
133 	i.local_protocol_name = "raw-proxy";
134 	i.protocol = "raw-proxy";
135 	i.path = "/";
136 	/*
137 	 * The "onward" client wsi has its own pss but shares the "conn"
138 	 * created when the inbound connection was accepted.  We need to stash
139 	 * the address of the shared conn and apply it to the client psss
140 	 * when the client connection completes.
141 	 */
142 	i.opaque_user_data = pss->conn;
143 	i.pwsi = &pss->conn->wsi[ONW];
144 
145 	lwsl_info("%s: onward: %s:%d%s\n", __func__, i.address, i.port, i.path);
146 
147 	cwsi = lws_client_connect_via_info(&i);
148 	if (!cwsi)
149 		lwsl_err("%s: client connect failed early\n", __func__);
150 
151 	return !cwsi;
152 }
153 
154 static int
flow_control(struct conn * conn,int side,int enable)155 flow_control(struct conn *conn, int side, int enable)
156 {
157 	if (conn->closed[side] ||
158 	    enable == conn->rx_enabled[side] ||
159 	    !conn->established[side])
160 		return 0;
161 
162 	if (lws_rx_flow_control(conn->wsi[side], enable))
163 		return 1;
164 
165 	conn->rx_enabled[side] = (char)enable;
166 	lwsl_info("%s: %s side: %s\n", __func__, side ? "ONW" : "ACC",
167 		  enable ? "rx enabled" : "rx flow controlled");
168 
169 	return 0;
170 }
171 
172 static int
callback_raw_proxy(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)173 callback_raw_proxy(struct lws *wsi, enum lws_callback_reasons reason,
174 		   void *user, void *in, size_t len)
175 {
176 	struct raw_pss *pss = (struct raw_pss *)user;
177 	struct raw_vhd *vhd = (struct raw_vhd *)lws_protocol_vh_priv_get(
178 				     lws_get_vhost(wsi), lws_get_protocol(wsi));
179 	const struct packet *ppkt;
180 	struct conn *conn = NULL;
181 	struct lws_tokenize ts;
182 	lws_tokenize_elem e;
183 	struct packet pkt;
184 	const char *cp;
185 	int n;
186 
187 	if (pss)
188 		conn = pss->conn;
189 
190 	switch (reason) {
191 	case LWS_CALLBACK_PROTOCOL_INIT:
192 		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
193 				lws_get_protocol(wsi), sizeof(struct raw_vhd));
194 		if (!vhd)
195 			return 0;
196 		if (lws_pvo_get_str(in, "onward", &cp)) {
197 			lwsl_warn("%s: vh %s: pvo 'onward' required\n", __func__,
198 				 lws_get_vhost_name(lws_get_vhost(wsi)));
199 
200 			return 0;
201 		}
202 		lws_tokenize_init(&ts, cp, LWS_TOKENIZE_F_DOT_NONTERM |
203 					   LWS_TOKENIZE_F_MINUS_NONTERM |
204 					   LWS_TOKENIZE_F_NO_FLOATS);
205 		ts.len = strlen(cp);
206 
207 		if (lws_tokenize(&ts) != LWS_TOKZE_TOKEN)
208 			goto bad_onward;
209 		if (!strncmp(ts.token, "ipv6", ts.token_len))
210 			vhd->ipv6 = 1;
211 		else
212 			if (strncmp(ts.token, "ipv4", ts.token_len))
213 				goto bad_onward;
214 
215 		/* then the colon */
216 		if (lws_tokenize(&ts) != LWS_TOKZE_DELIMITER)
217 			goto bad_onward;
218 
219 		e = lws_tokenize(&ts);
220 		if (!vhd->ipv6) {
221 			if (e != LWS_TOKZE_TOKEN ||
222 			    ts.token_len + 1 >= (int)sizeof(vhd->addr))
223 				goto bad_onward;
224 
225 			lws_strncpy(vhd->addr, ts.token, ts.token_len + 1);
226 			e = lws_tokenize(&ts);
227 			if (e == LWS_TOKZE_DELIMITER) {
228 				/* there should be a port then */
229 				e = lws_tokenize(&ts);
230 				if (e != LWS_TOKZE_INTEGER)
231 					goto bad_onward;
232 				vhd->port = (uint16_t)atoi(ts.token);
233 				e = lws_tokenize(&ts);
234 			}
235 			if (e != LWS_TOKZE_ENDED)
236 				goto bad_onward;
237 		} else
238 			lws_strncpy(vhd->addr, ts.token, sizeof(vhd->addr));
239 
240 		lwsl_notice("%s: vh %s: onward %s:%s:%d\n", __func__,
241 			    lws_get_vhost_name(lws_get_vhost(wsi)),
242 			    vhd->ipv6 ? "ipv6": "ipv4", vhd->addr, vhd->port);
243 		break;
244 
245 bad_onward:
246 		lwsl_err("%s: onward pvo format must be ipv4:addr[:port] "
247 			 " or ipv6:addr, not '%s'\n", __func__, cp);
248 		return -1;
249 
250 	case LWS_CALLBACK_PROTOCOL_DESTROY:
251 		break;
252 
253 	/* callbacks related to client "onward side" */
254 
255 	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
256 		lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
257 			 in ? (char *)in : "(null)");
258 		break;
259 
260         case LWS_CALLBACK_RAW_PROXY_CLI_ADOPT:
261 		lwsl_debug("%s: %p: LWS_CALLBACK_RAW_CLI_ADOPT: pss %p\n", __func__, wsi, pss);
262 		if (conn || !pss)
263 			break;
264 		conn = pss->conn = lws_get_opaque_user_data(wsi);
265 		if (!conn)
266 			break;
267 		conn->established[ONW] = 1;
268 		/* they start enabled */
269 		conn->rx_enabled[ACC] = 1;
270 		conn->rx_enabled[ONW] = 1;
271 
272 		/* he disabled his rx while waiting for use to be established */
273 		flow_control(conn, ACC, 1);
274 
275 		lws_callback_on_writable(wsi);
276 		lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
277 		break;
278 
279 	case LWS_CALLBACK_RAW_PROXY_CLI_CLOSE:
280 		lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_CLOSE\n");
281 		if (!conn)
282 			break;
283 
284 		conn->closed[ONW] = 1;
285 
286 		if (conn->closed[ACC])
287 			destroy_conn(vhd, pss);
288 
289 		break;
290 
291 	case LWS_CALLBACK_RAW_PROXY_CLI_RX:
292 		lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_RX: %d\n", (int)len);
293 
294 		if (!conn)
295 			return 0;
296 
297 		if (!pss || !conn->wsi[ACC] || conn->closed[ACC]) {
298 			lwsl_info(" pss %p, wsi[ACC] %p, closed[ACC] %d\n",
299 				  pss, conn->wsi[ACC], conn->closed[ACC]);
300 			return -1;
301 		}
302 		pkt.payload = malloc(len);
303 		if (!pkt.payload) {
304 			lwsl_notice("OOM: dropping\n");
305 			return -1;
306 		}
307 		pkt.len = (uint32_t)len;
308 		pkt.ticket = conn->ticket_next++;
309 
310 		memcpy(pkt.payload, in, len);
311 		if (!lws_ring_insert(conn->r[ONW], &pkt, 1)) {
312 			__destroy_packet(&pkt);
313 			lwsl_notice("dropping!\n");
314 			return -1;
315 		}
316 
317 		lwsl_debug("After onward RX: acc free: %d...\n",
318 			   (int)lws_ring_get_count_free_elements(conn->r[ONW]));
319 
320 		if (conn->rx_enabled[ONW] &&
321 		    lws_ring_get_count_free_elements(conn->r[ONW]) < 2)
322 			flow_control(conn, ONW, 0);
323 
324 		if (!conn->closed[ACC])
325 			lws_callback_on_writable(conn->wsi[ACC]);
326 		break;
327 
328 	case LWS_CALLBACK_RAW_PROXY_CLI_WRITEABLE:
329 		lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_WRITEABLE\n");
330 
331 		if (!conn)
332 			break;
333 
334 		ppkt = lws_ring_get_element(conn->r[ACC], &conn->t[ACC]);
335 		if (!ppkt) {
336 			lwsl_info("%s: CLI_WRITABLE had nothing in acc ring\n",
337 				  __func__);
338 			break;
339 		}
340 
341 		if (ppkt->ticket != conn->ticket_retired + 1) {
342 			lwsl_info("%s: acc ring has %d but next %d\n", __func__,
343 				  ppkt->ticket, conn->ticket_retired + 1);
344 			lws_callback_on_writable(conn->wsi[ACC]);
345 			break;
346 		}
347 
348 		n = lws_write(wsi, ppkt->payload, ppkt->len, LWS_WRITE_RAW);
349 		if (n < 0) {
350 			lwsl_info("%s: WRITEABLE: %d\n", __func__, n);
351 
352 			return -1;
353 		}
354 
355 		conn->ticket_retired = ppkt->ticket;
356 		lws_ring_consume(conn->r[ACC], &conn->t[ACC], NULL, 1);
357 		lws_ring_update_oldest_tail(conn->r[ACC], conn->t[ACC]);
358 
359 		lwsl_debug("acc free: %d...\n",
360 			  (int)lws_ring_get_count_free_elements(conn->r[ACC]));
361 
362 		if (!conn->rx_enabled[ACC] &&
363 		    lws_ring_get_count_free_elements(conn->r[ACC]) > 2)
364 			flow_control(conn, ACC, 1);
365 
366 		ppkt = lws_ring_get_element(conn->r[ACC], &conn->t[ACC]);
367 		lwsl_debug("%s: CLI_WRITABLE: next acc pkt %p idx %d vs %d\n",
368 			   __func__, ppkt, ppkt ? ppkt->ticket : 0,
369 					   conn->ticket_retired + 1);
370 
371 		if (ppkt && ppkt->ticket == conn->ticket_retired + 1)
372 			lws_callback_on_writable(wsi);
373 		else {
374 			/*
375 			 * defer checking for accepted side closing until we
376 			 * sent everything in the ring to onward
377 			 */
378 			if (conn->closed[ACC])
379 				/*
380 				 * there is never going to be any more... but
381 				 * we may have some tx still in tx buflist /
382 				 * partial
383 				 */
384 				return lws_raw_transaction_completed(wsi);
385 
386 			if (lws_ring_get_element(conn->r[ONW], &conn->t[ONW]))
387 				lws_callback_on_writable(conn->wsi[ACC]);
388 		}
389 		break;
390 
391 	/* callbacks related to raw socket descriptor "accepted side" */
392 
393         case LWS_CALLBACK_RAW_PROXY_SRV_ADOPT:
394 		lwsl_debug("LWS_CALLBACK_RAW_SRV_ADOPT\n");
395 		if (!pss)
396 			return -1;
397 		conn = pss->conn = malloc(sizeof(struct conn));
398 		if (!pss->conn)
399 			return -1;
400 		memset(conn, 0, sizeof(*conn));
401 
402 		conn->wsi[ACC] = wsi;
403 		conn->ticket_next = 1;
404 
405 		conn->r[ACC] = lws_ring_create(sizeof(struct packet),
406 					       RING_DEPTH, __destroy_packet);
407 		if (!conn->r[ACC]) {
408 			lwsl_err("%s: OOM\n", __func__);
409 			return -1;
410 		}
411 		conn->r[ONW] = lws_ring_create(sizeof(struct packet),
412 					       RING_DEPTH, __destroy_packet);
413 		if (!conn->r[ONW]) {
414 			lws_ring_destroy(conn->r[ACC]);
415 			conn->r[ACC] = NULL;
416 			lwsl_err("%s: OOM\n", __func__);
417 
418 			return -1;
419 		}
420 
421 		conn->established[ACC] = 1;
422 
423 		/* disable any rx until the client side is up */
424 		flow_control(conn, ACC, 0);
425 
426 		lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
427 
428 		/* try to create the onward client connection */
429 		connect_client(vhd, pss);
430                 break;
431 
432 	case LWS_CALLBACK_RAW_PROXY_SRV_CLOSE:
433 		lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_CLOSE:\n");
434 
435 		if (!conn)
436 			break;
437 
438 		conn->closed[ACC] = 1;
439 		if (conn->closed[ONW])
440 			destroy_conn(vhd, pss);
441 		break;
442 
443 	case LWS_CALLBACK_RAW_PROXY_SRV_RX:
444 		lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_RX: rx %d\n", (int)len);
445 
446 		if (!conn || !conn->wsi[ONW]) {
447 			lwsl_err("%s: LWS_CALLBACK_RAW_PROXY_SRV_RX: "
448 				 "conn->wsi[ONW] NULL\n", __func__);
449 			return -1;
450 		}
451 		if (conn->closed[ONW]) {
452 			lwsl_info(" closed[ONW] %d\n", conn->closed[ONW]);
453 			return -1;
454 		}
455 
456 		if (!len)
457 			return 0;
458 
459 		pkt.payload = malloc(len);
460 		if (!pkt.payload) {
461 			lwsl_notice("OOM: dropping\n");
462 			return -1;
463 		}
464 		pkt.len = (uint32_t)len;
465 		pkt.ticket = conn->ticket_next++;
466 
467 		memcpy(pkt.payload, in, len);
468 		if (!lws_ring_insert(conn->r[ACC], &pkt, 1)) {
469 			__destroy_packet(&pkt);
470 			lwsl_notice("dropping!\n");
471 			return -1;
472 		}
473 
474 		lwsl_debug("After acc RX: acc free: %d...\n",
475 			   (int)lws_ring_get_count_free_elements(conn->r[ACC]));
476 
477 		if (conn->rx_enabled[ACC] &&
478 		    lws_ring_get_count_free_elements(conn->r[ACC]) <= 2)
479 			flow_control(conn, ACC, 0);
480 
481 		if (conn->established[ONW] && !conn->closed[ONW])
482 			lws_callback_on_writable(conn->wsi[ONW]);
483 		break;
484 
485 	case LWS_CALLBACK_RAW_PROXY_SRV_WRITEABLE:
486 		lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_WRITEABLE\n");
487 
488 		if (!conn || !conn->established[ONW] || conn->closed[ONW])
489 			break;
490 
491 		ppkt = lws_ring_get_element(conn->r[ONW], &conn->t[ONW]);
492 		if (!ppkt) {
493 			lwsl_info("%s: SRV_WRITABLE nothing in onw ring\n",
494 				  __func__);
495 			break;
496 		}
497 
498 		if (ppkt->ticket != conn->ticket_retired + 1) {
499 			lwsl_info("%s: onw ring has %d but next %d\n", __func__,
500 				  ppkt->ticket, conn->ticket_retired + 1);
501 			lws_callback_on_writable(conn->wsi[ONW]);
502 			break;
503 		}
504 
505 		n = lws_write(wsi, ppkt->payload, ppkt->len, LWS_WRITE_RAW);
506 		if (n < 0) {
507 			lwsl_info("%s: WRITEABLE: %d\n", __func__, n);
508 
509 			return -1;
510 		}
511 
512 		conn->ticket_retired = ppkt->ticket;
513 		lws_ring_consume(conn->r[ONW], &conn->t[ONW], NULL, 1);
514 		lws_ring_update_oldest_tail(conn->r[ONW], conn->t[ONW]);
515 
516 		lwsl_debug("onward free: %d... waiting %d\n",
517 			  (int)lws_ring_get_count_free_elements(conn->r[ONW]),
518 			  (int)lws_ring_get_count_waiting_elements(conn->r[ONW],
519 								&conn->t[ONW]));
520 
521 		if (!conn->rx_enabled[ONW] &&
522 		    lws_ring_get_count_free_elements(conn->r[ONW]) > 2)
523 			flow_control(conn, ONW, 1);
524 
525 		ppkt = lws_ring_get_element(conn->r[ONW], &conn->t[ONW]);
526 		lwsl_debug("%s: SRV_WRITABLE: next onw pkt %p idx %d vs %d\n",
527 			   __func__, ppkt, ppkt ? ppkt->ticket : 0,
528 					   conn->ticket_retired + 1);
529 
530 		if (ppkt && ppkt->ticket == conn->ticket_retired + 1)
531 			lws_callback_on_writable(wsi);
532 		else {
533 			/*
534 			 * defer checking for onward side closing until we
535 			 * sent everything in the ring to accepted side
536 			 */
537 			if (conn->closed[ONW])
538 				/*
539 				 * there is never going to be any more... but
540 				 * we may have some tx still in tx buflist /
541 				 * partial
542 				 */
543 				return lws_raw_transaction_completed(wsi);
544 
545 		if (lws_ring_get_element(conn->r[ACC], &conn->t[ACC]))
546 			lws_callback_on_writable(conn->wsi[ONW]);
547 		}
548 		break;
549 
550 	default:
551 		break;
552 	}
553 
554 	return lws_callback_http_dummy(wsi, reason, user, in, len);
555 }
556 
/*
 * Initializer for the "raw-proxy" entry of a struct lws_protocols table:
 * protocol name, callback and per-session data size, followed by the
 * remaining positional members (see struct lws_protocols for their meaning).
 */
#define LWS_PLUGIN_PROTOCOL_RAW_PROXY { \
		"raw-proxy", \
		callback_raw_proxy, \
		sizeof(struct raw_pss), \
		8192, \
		8192, NULL, 0 \
	}
564 
565 #if !defined (LWS_PLUGIN_STATIC)
566 
567 LWS_VISIBLE const struct lws_protocols lws_raw_proxy_protocols[] = {
568 	LWS_PLUGIN_PROTOCOL_RAW_PROXY
569 };
570 
571 LWS_VISIBLE const lws_plugin_protocol_t lws_raw_proxy = {
572 	.hdr = {
573 		"raw proxy",
574 		"lws_protocol_plugin",
575 		LWS_BUILD_HASH,
576 		LWS_PLUGIN_API_MAGIC
577 	},
578 
579 	.protocols = lws_raw_proxy_protocols,
580 	.count_protocols = LWS_ARRAY_SIZE(lws_raw_proxy_protocols),
581 	.extensions = NULL,
582 	.count_extensions = 0,
583 };
584 #endif
585 
586 
587