1 /*
2 * ws protocol handler plugin for "lws-minimal-client-echo"
3 *
4 * Written in 2010-2019 by Andy Green <andy@warmcat.com>
5 *
6 * This file is made available under the Creative Commons CC0 1.0
7 * Universal Public Domain Dedication.
8 *
9 * The protocol shows how to send and receive bulk messages over a ws connection
10 * that optionally may have the permessage-deflate extension negotiated on it.
11 */
12
13 #if !defined (LWS_PLUGIN_STATIC)
14 #define LWS_DLL
15 #define LWS_INTERNAL
16 #include <libwebsockets.h>
17 #endif
18
19 #include <string.h>
20
21 #define RING_DEPTH 1024
22
23 /* one of these created for each message */
24
25 struct msg {
26 void *payload; /* is malloc'd */
27 size_t len;
28 char binary;
29 char first;
30 char final;
31 };
32
33 struct per_session_data__minimal_client_echo {
34 struct lws_ring *ring;
35 uint32_t tail;
36 char flow_controlled;
37 uint8_t completed:1;
38 uint8_t write_consume_pending:1;
39 };
40
41 struct vhd_minimal_client_echo {
42 struct lws_context *context;
43 struct lws_vhost *vhost;
44 struct lws *client_wsi;
45
46 lws_sorted_usec_list_t sul;
47
48 int *interrupted;
49 int *options;
50 const char **url;
51 const char **ads;
52 const char **iface;
53 int *port;
54 };
55
56 static void
sul_connect_attempt(struct lws_sorted_usec_list * sul)57 sul_connect_attempt(struct lws_sorted_usec_list *sul)
58 {
59 struct vhd_minimal_client_echo *vhd =
60 lws_container_of(sul, struct vhd_minimal_client_echo, sul);
61 struct lws_client_connect_info i;
62 char host[128];
63
64 lws_snprintf(host, sizeof(host), "%s:%u", *vhd->ads, *vhd->port);
65
66 memset(&i, 0, sizeof(i));
67
68 i.context = vhd->context;
69 i.port = *vhd->port;
70 i.address = *vhd->ads;
71 i.path = *vhd->url;
72 i.host = host;
73 i.origin = host;
74 i.ssl_connection = 0;
75 if ((*vhd->options) & 2)
76 i.ssl_connection |= LCCSCF_USE_SSL;
77 i.vhost = vhd->vhost;
78 i.iface = *vhd->iface;
79 //i.protocol = ;
80 i.pwsi = &vhd->client_wsi;
81
82 lwsl_user("connecting to %s:%d/%s\n", i.address, i.port, i.path);
83
84 if (!lws_client_connect_via_info(&i))
85 lws_sul_schedule(vhd->context, 0, &vhd->sul,
86 sul_connect_attempt, 10 * LWS_US_PER_SEC);
87 }
88
89 static void
__minimal_destroy_message(void * _msg)90 __minimal_destroy_message(void *_msg)
91 {
92 struct msg *msg = _msg;
93
94 free(msg->payload);
95 msg->payload = NULL;
96 msg->len = 0;
97 }
98
99 static int
callback_minimal_client_echo(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)100 callback_minimal_client_echo(struct lws *wsi, enum lws_callback_reasons reason,
101 void *user, void *in, size_t len)
102 {
103 struct per_session_data__minimal_client_echo *pss =
104 (struct per_session_data__minimal_client_echo *)user;
105 struct vhd_minimal_client_echo *vhd = (struct vhd_minimal_client_echo *)
106 lws_protocol_vh_priv_get(lws_get_vhost(wsi),
107 lws_get_protocol(wsi));
108 const struct msg *pmsg;
109 struct msg amsg;
110 int n, m, flags;
111
112 switch (reason) {
113
114 case LWS_CALLBACK_PROTOCOL_INIT:
115 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
116 lws_get_protocol(wsi),
117 sizeof(struct vhd_minimal_client_echo));
118 if (!vhd)
119 return -1;
120
121 vhd->context = lws_get_context(wsi);
122 vhd->vhost = lws_get_vhost(wsi);
123
124 /* get the pointer to "interrupted" we were passed in pvo */
125 vhd->interrupted = (int *)lws_pvo_search(
126 (const struct lws_protocol_vhost_options *)in,
127 "interrupted")->value;
128 vhd->port = (int *)lws_pvo_search(
129 (const struct lws_protocol_vhost_options *)in,
130 "port")->value;
131 vhd->options = (int *)lws_pvo_search(
132 (const struct lws_protocol_vhost_options *)in,
133 "options")->value;
134 vhd->ads = (const char **)lws_pvo_search(
135 (const struct lws_protocol_vhost_options *)in,
136 "ads")->value;
137 vhd->url = (const char **)lws_pvo_search(
138 (const struct lws_protocol_vhost_options *)in,
139 "url")->value;
140 vhd->iface = (const char **)lws_pvo_search(
141 (const struct lws_protocol_vhost_options *)in,
142 "iface")->value;
143
144 sul_connect_attempt(&vhd->sul);
145 break;
146
147 case LWS_CALLBACK_PROTOCOL_DESTROY:
148 lws_sul_cancel(&vhd->sul);
149 break;
150
151 case LWS_CALLBACK_CLIENT_ESTABLISHED:
152 lwsl_user("LWS_CALLBACK_CLIENT_ESTABLISHED\n");
153 pss->ring = lws_ring_create(sizeof(struct msg), RING_DEPTH,
154 __minimal_destroy_message);
155 if (!pss->ring)
156 return 1;
157 pss->tail = 0;
158 break;
159
160 case LWS_CALLBACK_CLIENT_WRITEABLE:
161
162 lwsl_user("LWS_CALLBACK_CLIENT_WRITEABLE\n");
163
164 if (pss->write_consume_pending) {
165 /* perform the deferred fifo consume */
166 lws_ring_consume_single_tail(pss->ring, &pss->tail, 1);
167 pss->write_consume_pending = 0;
168 }
169 pmsg = lws_ring_get_element(pss->ring, &pss->tail);
170 if (!pmsg) {
171 lwsl_user(" (nothing in ring)\n");
172 break;
173 }
174
175 flags = lws_write_ws_flags(
176 pmsg->binary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT,
177 pmsg->first, pmsg->final);
178
179 /* notice we allowed for LWS_PRE in the payload already */
180 m = lws_write(wsi, ((unsigned char *)pmsg->payload) +
181 LWS_PRE, pmsg->len, (enum lws_write_protocol)flags);
182 if (m < (int)pmsg->len) {
183 lwsl_err("ERROR %d writing to ws socket\n", m);
184 return -1;
185 }
186
187 lwsl_user(" wrote %d: flags: 0x%x first: %d final %d\n",
188 m, flags, pmsg->first, pmsg->final);
189
190 if ((*vhd->options & 1) && pmsg && pmsg->final)
191 pss->completed = 1;
192
193 /*
194 * Workaround deferred deflate in pmd extension by only
195 * consuming the fifo entry when we are certain it has been
196 * fully deflated at the next WRITABLE callback. You only need
197 * this if you're using pmd.
198 */
199 pss->write_consume_pending = 1;
200 lws_callback_on_writable(wsi);
201
202 if (pss->flow_controlled &&
203 (int)lws_ring_get_count_free_elements(pss->ring) > RING_DEPTH - 5) {
204 lws_rx_flow_control(wsi, 1);
205 pss->flow_controlled = 0;
206 }
207
208 break;
209
210 case LWS_CALLBACK_CLIENT_RECEIVE:
211
212 lwsl_user("LWS_CALLBACK_CLIENT_RECEIVE: %4d (rpp %5d, first %d, last %d, bin %d)\n",
213 (int)len, (int)lws_remaining_packet_payload(wsi),
214 lws_is_first_fragment(wsi),
215 lws_is_final_fragment(wsi),
216 lws_frame_is_binary(wsi));
217
218 // lwsl_hexdump_notice(in, len);
219
220 amsg.first = (char)lws_is_first_fragment(wsi);
221 amsg.final = (char)lws_is_final_fragment(wsi);
222 amsg.binary = (char)lws_frame_is_binary(wsi);
223 n = (int)lws_ring_get_count_free_elements(pss->ring);
224 if (!n) {
225 lwsl_user("dropping!\n");
226 break;
227 }
228
229 amsg.len = len;
230 /* notice we over-allocate by LWS_PRE */
231 amsg.payload = malloc(LWS_PRE + len);
232 if (!amsg.payload) {
233 lwsl_user("OOM: dropping\n");
234 break;
235 }
236
237 memcpy((char *)amsg.payload + LWS_PRE, in, len);
238 if (!lws_ring_insert(pss->ring, &amsg, 1)) {
239 __minimal_destroy_message(&amsg);
240 lwsl_user("dropping!\n");
241 break;
242 }
243 lws_callback_on_writable(wsi);
244
245 if (!pss->flow_controlled && n < 3) {
246 pss->flow_controlled = 1;
247 lws_rx_flow_control(wsi, 0);
248 }
249 break;
250
251 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
252 lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
253 in ? (char *)in : "(null)");
254 vhd->client_wsi = NULL;
255 if (!*vhd->interrupted)
256 *vhd->interrupted = 3;
257 lws_cancel_service(lws_get_context(wsi));
258 break;
259
260 case LWS_CALLBACK_CLIENT_CLOSED:
261 lwsl_user("LWS_CALLBACK_CLIENT_CLOSED\n");
262 lws_ring_destroy(pss->ring);
263 vhd->client_wsi = NULL;
264 if (!*vhd->interrupted)
265 *vhd->interrupted = 1 + pss->completed;
266 lws_cancel_service(lws_get_context(wsi));
267 break;
268
269 default:
270 break;
271 }
272
273 return 0;
274 }
275
276 #define LWS_PLUGIN_PROTOCOL_MINIMAL_CLIENT_ECHO \
277 { \
278 "lws-minimal-client-echo", \
279 callback_minimal_client_echo, \
280 sizeof(struct per_session_data__minimal_client_echo), \
281 1024, \
282 0, NULL, 0 \
283 }
284