• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * ws protocol handler plugin for "lws-minimal-client-echo"
3  *
4  * Written in 2010-2019 by Andy Green <andy@warmcat.com>
5  *
6  * This file is made available under the Creative Commons CC0 1.0
7  * Universal Public Domain Dedication.
8  *
9  * The protocol shows how to send and receive bulk messages over a ws connection
10  * that optionally may have the permessage-deflate extension negotiated on it.
11  */
12 
13 #if !defined (LWS_PLUGIN_STATIC)
14 #define LWS_DLL
15 #define LWS_INTERNAL
16 #include <libwebsockets.h>
17 #endif
18 
19 #include <string.h>
20 
21 #define RING_DEPTH 1024
22 
23 /* one of these created for each message */
24 
25 struct msg {
26 	void *payload; /* is malloc'd */
27 	size_t len;
28 	char binary;
29 	char first;
30 	char final;
31 };
32 
33 struct per_session_data__minimal_client_echo {
34 	struct lws_ring *ring;
35 	uint32_t tail;
36 	char flow_controlled;
37 	uint8_t completed:1;
38 	uint8_t write_consume_pending:1;
39 };
40 
41 struct vhd_minimal_client_echo {
42 	struct lws_context *context;
43 	struct lws_vhost *vhost;
44 	struct lws *client_wsi;
45 
46 	lws_sorted_usec_list_t sul;
47 
48 	int *interrupted;
49 	int *options;
50 	const char **url;
51 	const char **ads;
52 	const char **iface;
53 	int *port;
54 };
55 
56 static void
sul_connect_attempt(struct lws_sorted_usec_list * sul)57 sul_connect_attempt(struct lws_sorted_usec_list *sul)
58 {
59 	struct vhd_minimal_client_echo *vhd =
60 		lws_container_of(sul, struct vhd_minimal_client_echo, sul);
61 	struct lws_client_connect_info i;
62 	char host[128];
63 
64 	lws_snprintf(host, sizeof(host), "%s:%u", *vhd->ads, *vhd->port);
65 
66 	memset(&i, 0, sizeof(i));
67 
68 	i.context = vhd->context;
69 	i.port = *vhd->port;
70 	i.address = *vhd->ads;
71 	i.path = *vhd->url;
72 	i.host = host;
73 	i.origin = host;
74 	i.ssl_connection = 0;
75 	if ((*vhd->options) & 2)
76 		i.ssl_connection |= LCCSCF_USE_SSL;
77 	i.vhost = vhd->vhost;
78 	i.iface = *vhd->iface;
79 	//i.protocol = ;
80 	i.pwsi = &vhd->client_wsi;
81 
82 	lwsl_user("connecting to %s:%d/%s\n", i.address, i.port, i.path);
83 
84 	if (!lws_client_connect_via_info(&i))
85 		lws_sul_schedule(vhd->context, 0, &vhd->sul,
86 				 sul_connect_attempt, 10 * LWS_US_PER_SEC);
87 }
88 
89 static void
__minimal_destroy_message(void * _msg)90 __minimal_destroy_message(void *_msg)
91 {
92 	struct msg *msg = _msg;
93 
94 	free(msg->payload);
95 	msg->payload = NULL;
96 	msg->len = 0;
97 }
98 
99 static int
callback_minimal_client_echo(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)100 callback_minimal_client_echo(struct lws *wsi, enum lws_callback_reasons reason,
101 			  void *user, void *in, size_t len)
102 {
103 	struct per_session_data__minimal_client_echo *pss =
104 			(struct per_session_data__minimal_client_echo *)user;
105 	struct vhd_minimal_client_echo *vhd = (struct vhd_minimal_client_echo *)
106 			lws_protocol_vh_priv_get(lws_get_vhost(wsi),
107 				lws_get_protocol(wsi));
108 	const struct msg *pmsg;
109 	struct msg amsg;
110 	int n, m, flags;
111 
112 	switch (reason) {
113 
114 	case LWS_CALLBACK_PROTOCOL_INIT:
115 		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
116 				lws_get_protocol(wsi),
117 				sizeof(struct vhd_minimal_client_echo));
118 		if (!vhd)
119 			return -1;
120 
121 		vhd->context = lws_get_context(wsi);
122 		vhd->vhost = lws_get_vhost(wsi);
123 
124 		/* get the pointer to "interrupted" we were passed in pvo */
125 		vhd->interrupted = (int *)lws_pvo_search(
126 			(const struct lws_protocol_vhost_options *)in,
127 			"interrupted")->value;
128 		vhd->port = (int *)lws_pvo_search(
129 			(const struct lws_protocol_vhost_options *)in,
130 			"port")->value;
131 		vhd->options = (int *)lws_pvo_search(
132 			(const struct lws_protocol_vhost_options *)in,
133 			"options")->value;
134 		vhd->ads = (const char **)lws_pvo_search(
135 			(const struct lws_protocol_vhost_options *)in,
136 			"ads")->value;
137 		vhd->url = (const char **)lws_pvo_search(
138 			(const struct lws_protocol_vhost_options *)in,
139 			"url")->value;
140 		vhd->iface = (const char **)lws_pvo_search(
141 			(const struct lws_protocol_vhost_options *)in,
142 			"iface")->value;
143 
144 		sul_connect_attempt(&vhd->sul);
145 		break;
146 
147 	case LWS_CALLBACK_PROTOCOL_DESTROY:
148 		lws_sul_cancel(&vhd->sul);
149 		break;
150 
151 	case LWS_CALLBACK_CLIENT_ESTABLISHED:
152 		lwsl_user("LWS_CALLBACK_CLIENT_ESTABLISHED\n");
153 		pss->ring = lws_ring_create(sizeof(struct msg), RING_DEPTH,
154 					    __minimal_destroy_message);
155 		if (!pss->ring)
156 			return 1;
157 		pss->tail = 0;
158 		break;
159 
160 	case LWS_CALLBACK_CLIENT_WRITEABLE:
161 
162 		lwsl_user("LWS_CALLBACK_CLIENT_WRITEABLE\n");
163 
164 		if (pss->write_consume_pending) {
165 			/* perform the deferred fifo consume */
166 			lws_ring_consume_single_tail(pss->ring, &pss->tail, 1);
167 			pss->write_consume_pending = 0;
168 		}
169 		pmsg = lws_ring_get_element(pss->ring, &pss->tail);
170 		if (!pmsg) {
171 			lwsl_user(" (nothing in ring)\n");
172 			break;
173 		}
174 
175 		flags = lws_write_ws_flags(
176 			    pmsg->binary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT,
177 			    pmsg->first, pmsg->final);
178 
179 		/* notice we allowed for LWS_PRE in the payload already */
180 		m = lws_write(wsi, ((unsigned char *)pmsg->payload) +
181 			      LWS_PRE, pmsg->len, (enum lws_write_protocol)flags);
182 		if (m < (int)pmsg->len) {
183 			lwsl_err("ERROR %d writing to ws socket\n", m);
184 			return -1;
185 		}
186 
187 		lwsl_user(" wrote %d: flags: 0x%x first: %d final %d\n",
188 				m, flags, pmsg->first, pmsg->final);
189 
190 		if ((*vhd->options & 1) && pmsg && pmsg->final)
191 			pss->completed = 1;
192 
193 		/*
194 		 * Workaround deferred deflate in pmd extension by only
195 		 * consuming the fifo entry when we are certain it has been
196 		 * fully deflated at the next WRITABLE callback.  You only need
197 		 * this if you're using pmd.
198 		 */
199 		pss->write_consume_pending = 1;
200 		lws_callback_on_writable(wsi);
201 
202 		if (pss->flow_controlled &&
203 		    (int)lws_ring_get_count_free_elements(pss->ring) > RING_DEPTH - 5) {
204 			lws_rx_flow_control(wsi, 1);
205 			pss->flow_controlled = 0;
206 		}
207 
208 		break;
209 
210 	case LWS_CALLBACK_CLIENT_RECEIVE:
211 
212 		lwsl_user("LWS_CALLBACK_CLIENT_RECEIVE: %4d (rpp %5d, first %d, last %d, bin %d)\n",
213 			(int)len, (int)lws_remaining_packet_payload(wsi),
214 			lws_is_first_fragment(wsi),
215 			lws_is_final_fragment(wsi),
216 			lws_frame_is_binary(wsi));
217 
218 		// lwsl_hexdump_notice(in, len);
219 
220 		amsg.first = (char)lws_is_first_fragment(wsi);
221 		amsg.final = (char)lws_is_final_fragment(wsi);
222 		amsg.binary = (char)lws_frame_is_binary(wsi);
223 		n = (int)lws_ring_get_count_free_elements(pss->ring);
224 		if (!n) {
225 			lwsl_user("dropping!\n");
226 			break;
227 		}
228 
229 		amsg.len = len;
230 		/* notice we over-allocate by LWS_PRE */
231 		amsg.payload = malloc(LWS_PRE + len);
232 		if (!amsg.payload) {
233 			lwsl_user("OOM: dropping\n");
234 			break;
235 		}
236 
237 		memcpy((char *)amsg.payload + LWS_PRE, in, len);
238 		if (!lws_ring_insert(pss->ring, &amsg, 1)) {
239 			__minimal_destroy_message(&amsg);
240 			lwsl_user("dropping!\n");
241 			break;
242 		}
243 		lws_callback_on_writable(wsi);
244 
245 		if (!pss->flow_controlled && n < 3) {
246 			pss->flow_controlled = 1;
247 			lws_rx_flow_control(wsi, 0);
248 		}
249 		break;
250 
251 	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
252 		lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
253 			 in ? (char *)in : "(null)");
254 		vhd->client_wsi = NULL;
255 		if (!*vhd->interrupted)
256 			*vhd->interrupted = 3;
257 		lws_cancel_service(lws_get_context(wsi));
258 		break;
259 
260 	case LWS_CALLBACK_CLIENT_CLOSED:
261 		lwsl_user("LWS_CALLBACK_CLIENT_CLOSED\n");
262 		lws_ring_destroy(pss->ring);
263 		vhd->client_wsi = NULL;
264 		if (!*vhd->interrupted)
265 			*vhd->interrupted = 1 + pss->completed;
266 		lws_cancel_service(lws_get_context(wsi));
267 		break;
268 
269 	default:
270 		break;
271 	}
272 
273 	return 0;
274 }
275 
276 #define LWS_PLUGIN_PROTOCOL_MINIMAL_CLIENT_ECHO \
277 	{ \
278 		"lws-minimal-client-echo", \
279 		callback_minimal_client_echo, \
280 		sizeof(struct per_session_data__minimal_client_echo), \
281 		1024, \
282 		0, NULL, 0 \
283 	}
284