• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * ws protocol handler plugin for "lws-minimal" demonstrating lws threadpool
3  *
4  * Written in 2010-2019 by Andy Green <andy@warmcat.com>
5  *
6  * This file is made available under the Creative Commons CC0 1.0
7  * Universal Public Domain Dedication.
8  *
9  * The main reason some things are as they are is that the task lifecycle may
10  * be unrelated to the wsi lifecycle that queued that task.
11  *
12  * Consider the task may call an external library and run for 30s without
13  * "checking in" to see if it should stop.  The wsi that started the task may
14  * have closed at any time before the 30s are up, with the browser window
15  * closing or whatever.
16  *
17  * So data shared between the asynchronous task and the wsi must have its
18  * lifecycle determined by the task, not the wsi.  That means a separate struct
19  * that can be freed by the task.
20  *
21  * In the case the wsi outlives the task, the tasks do not get destroyed until
22  * the service thread has called lws_threadpool_task_status() on the completed
23  * task.  So there is no danger of the shared task private data getting randomly
24  * freed.
25  */
26 
27 #if !defined (LWS_PLUGIN_STATIC)
28 #define LWS_DLL
29 #define LWS_INTERNAL
30 #include <libwebsockets.h>
31 #endif
32 
33 #include <string.h>
34 
35 struct per_vhost_data__minimal {
36 	struct lws_threadpool *tp;
37 	struct lws_context *context;
38 	lws_sorted_usec_list_t sul;
39 	const char *config;
40 };
41 
/*
 * Private data shared between one threadpool task and the service thread.
 * It is owned by the task (not by the wsi) and released through the task's
 * cleanup callback.
 */
struct task_data {
	/* LWS_PRE bytes of headroom followed by the NUL-terminated message */
	char result[64];

	/* current count reached by the task */
	uint64_t pos;
	/* count at which the task reports it has finished */
	uint64_t end;
};
47 
#if defined(WIN32)
/*
 * Stand-in for usleep() on Windows builds: the microsecond count is divided
 * down to whole milliseconds (truncating) and passed to Sleep().
 */
static void
usleep(unsigned long l)
{
	Sleep(l / 1000);
}
#endif
51 
52 /*
53  * Create the private data for the task
54  *
55  * Notice we hand over responsibility for the cleanup and freeing of the
56  * allocated task_data to the threadpool, because the wsi it was originally
57  * bound to may close while the thread is still running.  So we allocate
58  * something discrete for the task private data that can be definitively owned
59  * and freed by the threadpool, not the wsi... the pss won't do, as it only
60  * exists for the lifecycle of the wsi connection.
61  *
62  * When the task is created, we also tell it how to destroy the private data
63  * by giving it args.cleanup as cleanup_task_private_data() defined below.
64  */
65 
66 static struct task_data *
create_task_private_data(void)67 create_task_private_data(void)
68 {
69 	struct task_data *priv = malloc(sizeof(*priv));
70 
71 	return priv;
72 }
73 
/*
 * Release the private data belonging to a task.
 *
 * This is installed as the task's cleanup callback.  wsi is not touched
 * here: by the time this runs, the connection that queued the task may
 * already be gone (for example the lws context is being destroyed while the
 * thread was busy for a long time without checking in).
 *
 * user is the pointer obtained from create_task_private_data().
 */
static void
cleanup_task_private_data(struct lws *wsi, void *user)
{
	free(user);
}
88 
89 /*
90  * This runs in its own thread, from the threadpool.
91  *
92  * The implementation behind this in lws uses pthreads, but no pthreadisms are
93  * required in the user code.
94  *
95  * The example counts to 10M, "checking in" to see if it should stop after every
96  * 100K and pausing to sync with the service thread to send a ws message every
97  * 1M.  It resumes after the service thread determines the wsi is writable and
98  * the LWS_CALLBACK_SERVER_WRITEABLE indicates the task thread can continue by
99  * calling lws_threadpool_task_sync().
100  */
101 
102 static enum lws_threadpool_task_return
task_function(void * user,enum lws_threadpool_task_status s)103 task_function(void *user, enum lws_threadpool_task_status s)
104 {
105 	struct task_data *priv = (struct task_data *)user;
106 	int budget = 100 * 1000;
107 
108 	if (priv->pos == priv->end)
109 		return LWS_TP_RETURN_FINISHED;
110 
111 	/*
112 	 * Preferably replace this with ~100ms of your real task, so it
113 	 * can "check in" at short intervals to see if it has been asked to
114 	 * stop.
115 	 *
116 	 * You can just run tasks atomically here with the thread dedicated
117 	 * to it, but it will cause odd delays while shutting down etc and
118 	 * the task will run to completion even if the wsi that started it
119 	 * has since closed.
120 	 */
121 
122 	while (budget--)
123 		priv->pos++;
124 
125 	usleep(100000);
126 
127 	if (!(priv->pos % (1000 * 1000))) {
128 		lws_snprintf(priv->result + LWS_PRE,
129 			     sizeof(priv->result) - LWS_PRE,
130 			     "pos %llu", (unsigned long long)priv->pos);
131 
132 		return LWS_TP_RETURN_SYNC;
133 	}
134 
135 	return LWS_TP_RETURN_CHECKING_IN;
136 }
137 
138 
139 static void
sul_tp_dump(struct lws_sorted_usec_list * sul)140 sul_tp_dump(struct lws_sorted_usec_list *sul)
141 {
142 	struct per_vhost_data__minimal *vhd =
143 		lws_container_of(sul, struct per_vhost_data__minimal, sul);
144 	/*
145 	 * in debug mode, dump the threadpool stat to the logs once
146 	 * a second
147 	 */
148 	lws_threadpool_dump(vhd->tp);
149 	lws_sul_schedule(vhd->context, 0, &vhd->sul,
150 			 sul_tp_dump, LWS_US_PER_SEC);
151 }
152 
153 
154 static int
callback_minimal(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)155 callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
156 			void *user, void *in, size_t len)
157 {
158 	struct per_vhost_data__minimal *vhd =
159 			(struct per_vhost_data__minimal *)
160 			lws_protocol_vh_priv_get(lws_get_vhost(wsi),
161 					lws_get_protocol(wsi));
162 	const struct lws_protocol_vhost_options *pvo;
163 	struct lws_threadpool_create_args cargs;
164 	struct lws_threadpool_task_args args;
165 	struct lws_threadpool_task *task;
166 	struct task_data *priv;
167 	int n, m, r = 0;
168 	char name[32];
169 	void *_user;
170 
171 	switch (reason) {
172 	case LWS_CALLBACK_PROTOCOL_INIT:
173 		/* create our per-vhost struct */
174 		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
175 				lws_get_protocol(wsi),
176 				sizeof(struct per_vhost_data__minimal));
177 		if (!vhd)
178 			return 1;
179 
180 		vhd->context = lws_get_context(wsi);
181 
182 		/* recover the pointer to the globals struct */
183 		pvo = lws_pvo_search(
184 			(const struct lws_protocol_vhost_options *)in,
185 			"config");
186 		if (!pvo || !pvo->value) {
187 			lwsl_err("%s: Can't find \"config\" pvo\n", __func__);
188 			return 1;
189 		}
190 		vhd->config = pvo->value;
191 
192 		memset(&cargs, 0, sizeof(cargs));
193 
194 		cargs.max_queue_depth = 8;
195 		cargs.threads = 3;
196 		vhd->tp = lws_threadpool_create(lws_get_context(wsi),
197 				&cargs, "%s",
198 				lws_get_vhost_name(lws_get_vhost(wsi)));
199 		if (!vhd->tp)
200 			return 1;
201 
202 		lws_sul_schedule(vhd->context, 0, &vhd->sul,
203 				 sul_tp_dump, LWS_US_PER_SEC);
204 		break;
205 
206 	case LWS_CALLBACK_PROTOCOL_DESTROY:
207 		lws_threadpool_finish(vhd->tp);
208 		lws_threadpool_destroy(vhd->tp);
209 		lws_sul_cancel(&vhd->sul);
210 		break;
211 
212 	case LWS_CALLBACK_ESTABLISHED:
213 
214 		memset(&args, 0, sizeof(args));
215 		priv = args.user = create_task_private_data();
216 		if (!args.user)
217 			return 1;
218 
219 		priv->pos = 0;
220 		priv->end = 10 * 1000 * 1000;
221 
222 		/* queue the task... the task takes on responsibility for
223 		 * destroying args.user.  pss->priv just has a copy of it */
224 
225 		args.wsi = wsi;
226 		args.task = task_function;
227 		args.cleanup = cleanup_task_private_data;
228 
229 		lws_get_peer_simple(wsi, name, sizeof(name));
230 
231 		if (!lws_threadpool_enqueue(vhd->tp, &args, "ws %s", name)) {
232 			lwsl_user("%s: Couldn't enqueue task\n", __func__);
233 			cleanup_task_private_data(wsi, priv);
234 			return 1;
235 		}
236 
237 		lws_set_timeout(wsi, PENDING_TIMEOUT_THREADPOOL, 30);
238 
239 		/*
240 		 * so the asynchronous worker will let us know the next step
241 		 * by causing LWS_CALLBACK_SERVER_WRITEABLE
242 		 */
243 
244 		break;
245 
246 	case LWS_CALLBACK_CLOSED:
247 		break;
248 
249 	case LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL:
250 		lwsl_debug("LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL: %p\n", wsi);
251 		lws_threadpool_dequeue_task(lws_threadpool_get_task_wsi(wsi));
252 		break;
253 
254 	case LWS_CALLBACK_SERVER_WRITEABLE:
255 
256 		/*
257 		 * even completed tasks wait in a queue until we call the
258 		 * below on them.  Then they may destroy themselves and their
259 		 * args.user data (by calling the cleanup callback).
260 		 *
261 		 * If you need to get things from the still-valid private task
262 		 * data, copy it here before calling
263 		 * lws_threadpool_task_status() that may free the task and the
264 		 * private task data.
265 		 */
266 
267 		task = lws_threadpool_get_task_wsi(wsi);
268 		if (!task)
269 			break;
270 		n = (int)lws_threadpool_task_status(task, &_user);
271 		lwsl_debug("%s: LWS_CALLBACK_SERVER_WRITEABLE: status %d\n",
272 			   __func__, n);
273 		switch(n) {
274 
275 		case LWS_TP_STATUS_FINISHED:
276 		case LWS_TP_STATUS_STOPPED:
277 		case LWS_TP_STATUS_QUEUED:
278 		case LWS_TP_STATUS_RUNNING:
279 		case LWS_TP_STATUS_STOPPING:
280 			return 0;
281 
282 		case LWS_TP_STATUS_SYNCING:
283 			/* the task has paused for us to do something */
284 			break;
285 		default:
286 			return -1;
287 		}
288 
289 		priv = (struct task_data *)_user;
290 
291 		lws_set_timeout(wsi, PENDING_TIMEOUT_THREADPOOL_TASK, 5);
292 
293 		n = (int)strlen(priv->result + LWS_PRE);
294 		m = lws_write(wsi, (unsigned char *)priv->result + LWS_PRE,
295 			      (unsigned int)n, LWS_WRITE_TEXT);
296 		if (m < n) {
297 			lwsl_err("ERROR %d writing to ws socket\n", m);
298 			lws_threadpool_task_sync(task, 1);
299 			return -1;
300 		}
301 
302 		/*
303 		 * service thread has done whatever it wanted to do with the
304 		 * data the task produced: if it's waiting to do more it can
305 		 * continue now.
306 		 */
307 		lws_threadpool_task_sync(task, 0);
308 		break;
309 
310 	default:
311 		break;
312 	}
313 
314 	return r;
315 }
316 
/*
 * Positional initializer for this plugin's protocol table entry: protocol
 * name "lws-minimal" handled by callback_minimal().
 * NOTE(review): the remaining values presumably follow the member order of
 * struct lws_protocols - confirm against libwebsockets.h before changing.
 */
#define LWS_PLUGIN_PROTOCOL_MINIMAL { \
	"lws-minimal", \
	callback_minimal, \
	0, \
	128, \
	0, \
	NULL, \
	0 \
}
325