1 /*
2 * ws protocol handler plugin for "lws-minimal" demonstrating lws threadpool
3 *
4 * Written in 2010-2019 by Andy Green <andy@warmcat.com>
5 *
6 * This file is made available under the Creative Commons CC0 1.0
7 * Universal Public Domain Dedication.
8 *
9 * The main reason some things are as they are is that the task lifecycle may
10 * be unrelated to the wsi lifecycle that queued that task.
11 *
12 * Consider the task may call an external library and run for 30s without
13 * "checking in" to see if it should stop. The wsi that started the task may
14 * have closed at any time before the 30s are up, with the browser window
15 * closing or whatever.
16 *
17 * So data shared between the asynchronous task and the wsi must have its
18 * lifecycle determined by the task, not the wsi. That means a separate struct
19 * that can be freed by the task.
20 *
21 * In the case the wsi outlives the task, the tasks do not get destroyed until
22 * the service thread has called lws_threadpool_task_status() on the completed
23 * task. So there is no danger of the shared task private data getting randomly
24 * freed.
25 */
26
27 #if !defined (LWS_PLUGIN_STATIC)
28 #define LWS_DLL
29 #define LWS_INTERNAL
30 #include <libwebsockets.h>
31 #endif
32
33 #include <string.h>
34
35 struct per_vhost_data__minimal {
36 struct lws_threadpool *tp;
37 struct lws_context *context;
38 lws_sorted_usec_list_t sul;
39 const char *config;
40 };
41
/*
 * Private data shared between one threadpool task and the service thread.
 * It is heap-allocated per task and released by the task cleanup callback.
 */
struct task_data {
	/*
	 * message the task formats for the service thread to send; the
	 * text itself starts LWS_PRE bytes into the buffer
	 */
	char result[64];

	/* current count reached by the task */
	uint64_t pos;
	/* count at which the task is finished */
	uint64_t end;
};
47
#if defined(WIN32)
/*
 * WIN32-only stand-in for usleep(): the microsecond count is converted to
 * milliseconds (rounding down) and handed to Sleep().
 */
static void
usleep(unsigned long l)
{
	unsigned long ms = l / 1000;

	Sleep(ms);
}
#endif
51
52 /*
53 * Create the private data for the task
54 *
55 * Notice we hand over responsibility for the cleanup and freeing of the
56 * allocated task_data to the threadpool, because the wsi it was originally
57 * bound to may close while the thread is still running. So we allocate
58 * something discrete for the task private data that can be definitively owned
59 * and freed by the threadpool, not the wsi... the pss won't do, as it only
60 * exists for the lifecycle of the wsi connection.
61 *
62 * When the task is created, we also tell it how to destroy the private data
63 * by giving it args.cleanup as cleanup_task_private_data() defined below.
64 */
65
66 static struct task_data *
create_task_private_data(void)67 create_task_private_data(void)
68 {
69 struct task_data *priv = malloc(sizeof(*priv));
70
71 return priv;
72 }
73
74 /*
75 * Destroy the private data for the task
76 *
77 * Notice the wsi the task was originally bound to may be long gone, in the
78 * case we are destroying the lws context and the thread was doing something
79 * for a long time without checking in.
80 */
static void
cleanup_task_private_data(struct lws *wsi, void *user)
{
	/*
	 * user is the struct task_data handed to the task when it was
	 * queued; wsi is not consulted, so it does not matter here whether
	 * it is still alive.
	 */
	free(user);
}
88
89 /*
90 * This runs in its own thread, from the threadpool.
91 *
92 * The implementation behind this in lws uses pthreads, but no pthreadisms are
93 * required in the user code.
94 *
95 * The example counts to 10M, "checking in" to see if it should stop after every
96 * 100K and pausing to sync with the service thread to send a ws message every
97 * 1M. It resumes after the service thread determines the wsi is writable and
98 * the LWS_CALLBACK_SERVER_WRITEABLE indicates the task thread can continue by
99 * calling lws_threadpool_task_sync().
100 */
101
102 static enum lws_threadpool_task_return
task_function(void * user,enum lws_threadpool_task_status s)103 task_function(void *user, enum lws_threadpool_task_status s)
104 {
105 struct task_data *priv = (struct task_data *)user;
106 int budget = 100 * 1000;
107
108 if (priv->pos == priv->end)
109 return LWS_TP_RETURN_FINISHED;
110
111 /*
112 * Preferably replace this with ~100ms of your real task, so it
113 * can "check in" at short intervals to see if it has been asked to
114 * stop.
115 *
116 * You can just run tasks atomically here with the thread dedicated
117 * to it, but it will cause odd delays while shutting down etc and
118 * the task will run to completion even if the wsi that started it
119 * has since closed.
120 */
121
122 while (budget--)
123 priv->pos++;
124
125 usleep(100000);
126
127 if (!(priv->pos % (1000 * 1000))) {
128 lws_snprintf(priv->result + LWS_PRE,
129 sizeof(priv->result) - LWS_PRE,
130 "pos %llu", (unsigned long long)priv->pos);
131
132 return LWS_TP_RETURN_SYNC;
133 }
134
135 return LWS_TP_RETURN_CHECKING_IN;
136 }
137
138
139 static void
sul_tp_dump(struct lws_sorted_usec_list * sul)140 sul_tp_dump(struct lws_sorted_usec_list *sul)
141 {
142 struct per_vhost_data__minimal *vhd =
143 lws_container_of(sul, struct per_vhost_data__minimal, sul);
144 /*
145 * in debug mode, dump the threadpool stat to the logs once
146 * a second
147 */
148 lws_threadpool_dump(vhd->tp);
149 lws_sul_schedule(vhd->context, 0, &vhd->sul,
150 sul_tp_dump, LWS_US_PER_SEC);
151 }
152
153
154 static int
callback_minimal(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)155 callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
156 void *user, void *in, size_t len)
157 {
158 struct per_vhost_data__minimal *vhd =
159 (struct per_vhost_data__minimal *)
160 lws_protocol_vh_priv_get(lws_get_vhost(wsi),
161 lws_get_protocol(wsi));
162 const struct lws_protocol_vhost_options *pvo;
163 struct lws_threadpool_create_args cargs;
164 struct lws_threadpool_task_args args;
165 struct lws_threadpool_task *task;
166 struct task_data *priv;
167 int n, m, r = 0;
168 char name[32];
169 void *_user;
170
171 switch (reason) {
172 case LWS_CALLBACK_PROTOCOL_INIT:
173 /* create our per-vhost struct */
174 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
175 lws_get_protocol(wsi),
176 sizeof(struct per_vhost_data__minimal));
177 if (!vhd)
178 return 1;
179
180 vhd->context = lws_get_context(wsi);
181
182 /* recover the pointer to the globals struct */
183 pvo = lws_pvo_search(
184 (const struct lws_protocol_vhost_options *)in,
185 "config");
186 if (!pvo || !pvo->value) {
187 lwsl_err("%s: Can't find \"config\" pvo\n", __func__);
188 return 1;
189 }
190 vhd->config = pvo->value;
191
192 memset(&cargs, 0, sizeof(cargs));
193
194 cargs.max_queue_depth = 8;
195 cargs.threads = 3;
196 vhd->tp = lws_threadpool_create(lws_get_context(wsi),
197 &cargs, "%s",
198 lws_get_vhost_name(lws_get_vhost(wsi)));
199 if (!vhd->tp)
200 return 1;
201
202 lws_sul_schedule(vhd->context, 0, &vhd->sul,
203 sul_tp_dump, LWS_US_PER_SEC);
204 break;
205
206 case LWS_CALLBACK_PROTOCOL_DESTROY:
207 lws_threadpool_finish(vhd->tp);
208 lws_threadpool_destroy(vhd->tp);
209 lws_sul_cancel(&vhd->sul);
210 break;
211
212 case LWS_CALLBACK_ESTABLISHED:
213
214 memset(&args, 0, sizeof(args));
215 priv = args.user = create_task_private_data();
216 if (!args.user)
217 return 1;
218
219 priv->pos = 0;
220 priv->end = 10 * 1000 * 1000;
221
222 /* queue the task... the task takes on responsibility for
223 * destroying args.user. pss->priv just has a copy of it */
224
225 args.wsi = wsi;
226 args.task = task_function;
227 args.cleanup = cleanup_task_private_data;
228
229 lws_get_peer_simple(wsi, name, sizeof(name));
230
231 if (!lws_threadpool_enqueue(vhd->tp, &args, "ws %s", name)) {
232 lwsl_user("%s: Couldn't enqueue task\n", __func__);
233 cleanup_task_private_data(wsi, priv);
234 return 1;
235 }
236
237 lws_set_timeout(wsi, PENDING_TIMEOUT_THREADPOOL, 30);
238
239 /*
240 * so the asynchronous worker will let us know the next step
241 * by causing LWS_CALLBACK_SERVER_WRITEABLE
242 */
243
244 break;
245
246 case LWS_CALLBACK_CLOSED:
247 break;
248
249 case LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL:
250 lwsl_debug("LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL: %p\n", wsi);
251 lws_threadpool_dequeue_task(lws_threadpool_get_task_wsi(wsi));
252 break;
253
254 case LWS_CALLBACK_SERVER_WRITEABLE:
255
256 /*
257 * even completed tasks wait in a queue until we call the
258 * below on them. Then they may destroy themselves and their
259 * args.user data (by calling the cleanup callback).
260 *
261 * If you need to get things from the still-valid private task
262 * data, copy it here before calling
263 * lws_threadpool_task_status() that may free the task and the
264 * private task data.
265 */
266
267 task = lws_threadpool_get_task_wsi(wsi);
268 if (!task)
269 break;
270 n = (int)lws_threadpool_task_status(task, &_user);
271 lwsl_debug("%s: LWS_CALLBACK_SERVER_WRITEABLE: status %d\n",
272 __func__, n);
273 switch(n) {
274
275 case LWS_TP_STATUS_FINISHED:
276 case LWS_TP_STATUS_STOPPED:
277 case LWS_TP_STATUS_QUEUED:
278 case LWS_TP_STATUS_RUNNING:
279 case LWS_TP_STATUS_STOPPING:
280 return 0;
281
282 case LWS_TP_STATUS_SYNCING:
283 /* the task has paused for us to do something */
284 break;
285 default:
286 return -1;
287 }
288
289 priv = (struct task_data *)_user;
290
291 lws_set_timeout(wsi, PENDING_TIMEOUT_THREADPOOL_TASK, 5);
292
293 n = (int)strlen(priv->result + LWS_PRE);
294 m = lws_write(wsi, (unsigned char *)priv->result + LWS_PRE,
295 (unsigned int)n, LWS_WRITE_TEXT);
296 if (m < n) {
297 lwsl_err("ERROR %d writing to ws socket\n", m);
298 lws_threadpool_task_sync(task, 1);
299 return -1;
300 }
301
302 /*
303 * service thread has done whatever it wanted to do with the
304 * data the task produced: if it's waiting to do more it can
305 * continue now.
306 */
307 lws_threadpool_task_sync(task, 0);
308 break;
309
310 default:
311 break;
312 }
313
314 return r;
315 }
316
/*
 * Initializer for this plugin's protocol table entry.
 *
 * NOTE(review): the per-field comments assume the member order of
 * struct lws_protocols -- confirm against libwebsockets.h.
 */
#define LWS_PLUGIN_PROTOCOL_MINIMAL \
	{ \
		"lws-minimal",		/* protocol name */ \
		callback_minimal,	/* protocol callback */ \
		0,			/* presumably per-session data size */ \
		128,			/* presumably rx buffer size */ \
		0,			/* presumably id */ \
		NULL,			/* presumably user pointer */ \
		0			/* presumably tx packet size */ \
	}
325