1 /*
2 * lws-minimal-http-server-sse
3 *
4 * Written in 2010-2019 by Andy Green <andy@warmcat.com>
5 *
6 * This file is made available under the Creative Commons CC0 1.0
7 * Universal Public Domain Dedication.
8 *
9 * This demonstrates a minimal http server that can serve both normal static
10 * content and server-side event connections.
11 *
12 * To keep it simple, it serves the static stuff from the subdirectory
13 * "./mount-origin" of the directory it was started in.
14 *
15 * You can change that by changing mount.origin below.
16 */
17
18 #include <libwebsockets.h>
19 #include <string.h>
20 #include <stdlib.h>
21 #include <signal.h>
22 #if defined(WIN32)
23 #define HAVE_STRUCT_TIMESPEC
24 #if defined(pid_t)
25 #undef pid_t
26 #endif
27 #endif
28 #include <pthread.h>
29 #include <time.h>
30
/*
 * One of these is created for each message in the ringbuffer.
 *
 * The producer threads (thread_spam) fill one in and copy it into the ring;
 * __minimal_destroy_message() is registered with the ring to release it.
 */

struct msg {
	void *payload; /* is malloc'd; holds a NUL-terminated text message */
	size_t len;    /* length reported by lws_snprintf() for payload */
};
37
/*
 * Unlike ws, http is a stateless protocol.  This pss only exists for the
 * duration of a single http transaction.  With http/1.1 keep-alive and http/2,
 * that is unrelated to (shorter than) the lifetime of the network connection.
 *
 * callback_sse() receives a pointer to one of these as its `user` argument.
 */
struct pss {
	struct pss *pss_list; /* next live pss in the list headed by vhd->pss_list */
	struct lws *wsi;      /* connection this pss belongs to, set in LWS_CALLBACK_HTTP */
	uint32_t tail;        /* this connection's own read position in vhd->ring */
};
48
/*
 * One of these is created for each vhost our protocol is used with.
 *
 * It is allocated zeroed in LWS_CALLBACK_PROTOCOL_INIT and shared between the
 * lws service thread (callback_sse) and the producer threads (thread_spam).
 */

struct vhd {
	struct lws_context *context;           /* passed to lws_cancel_service() by producers */
	struct lws_vhost *vhost;
	const struct lws_protocols *protocol;

	struct pss *pss_list; /* linked-list of live pss*/
	pthread_t pthread_spam[2];             /* the message-producing threads */

	pthread_mutex_t lock_ring; /* serialize access to the ring buffer */
	struct lws_ring *ring; /* ringbuffer holding unsent messages */
	char finished;                         /* set nonzero to ask the producer threads to exit */
};
63
/*
 * Set to 1 by sigint_handler() and polled by the service loop in main().
 *
 * It is written from an asynchronous signal handler, so it has to be a
 * volatile sig_atomic_t: that is the only object type the C standard lets a
 * handler store to with defined behaviour.
 */
static volatile sig_atomic_t interrupted;
65
#if defined(WIN32)
/*
 * Stand-in for usleep() on WIN32 builds: converts the microsecond count to
 * whole milliseconds (integer division) and hands it to Sleep().
 */
static void
usleep(unsigned long l)
{
	const unsigned long ms = l / 1000;

	Sleep(ms);
}
#endif
69
70
71 /* destroys the message when everyone has had a copy of it */
72
73 static void
__minimal_destroy_message(void * _msg)74 __minimal_destroy_message(void *_msg)
75 {
76 struct msg *msg = _msg;
77
78 free(msg->payload);
79 msg->payload = NULL;
80 msg->len = 0;
81 }
82
83 /*
84 * This runs under the "spam thread" thread context only.
85 *
86 * We spawn two threads that generate messages with this.
87 *
88 */
89
90 static void *
thread_spam(void * d)91 thread_spam(void *d)
92 {
93 struct vhd *vhd = (struct vhd *)d;
94 struct msg amsg;
95 int len = 128, index = 1, n, whoami = 0;
96
97 for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++)
98 if (pthread_equal(pthread_self(), vhd->pthread_spam[n]))
99 whoami = n + 1;
100
101 do {
102 /* don't generate output if nobody connected */
103 if (!vhd->pss_list)
104 goto wait;
105
106 pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */
107
108 /* only create if space in ringbuffer */
109 n = (int)lws_ring_get_count_free_elements(vhd->ring);
110 if (!n) {
111 lwsl_user("dropping!\n");
112 goto wait_unlock;
113 }
114
115 amsg.payload = malloc((unsigned int)len);
116 if (!amsg.payload) {
117 lwsl_user("OOM: dropping\n");
118 goto wait_unlock;
119 }
120 n = lws_snprintf((char *)amsg.payload, (unsigned int)len,
121 "%s: tid: %d, msg: %d", __func__, whoami, index++);
122 amsg.len = (unsigned int)n;
123 n = (int)lws_ring_insert(vhd->ring, &amsg, 1);
124 if (n != 1) {
125 __minimal_destroy_message(&amsg);
126 lwsl_user("dropping!\n");
127 } else
128 /*
129 * This will cause a LWS_CALLBACK_EVENT_WAIT_CANCELLED
130 * in the lws service thread context.
131 */
132 lws_cancel_service(vhd->context);
133
134 wait_unlock:
135 pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */
136
137 wait:
138 /* rand() would make more sense but coverity shrieks */
139 usleep((useconds_t)(100000 + (time(NULL) & 0xffff)));
140
141 } while (!vhd->finished);
142
143 lwsl_notice("thread_spam %d exiting\n", whoami);
144
145 pthread_exit(NULL);
146
147 return NULL;
148 }
149
150
151 static int
callback_sse(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)152 callback_sse(struct lws *wsi, enum lws_callback_reasons reason, void *user,
153 void *in, size_t len)
154 {
155 struct pss *pss = (struct pss *)user;
156 struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get(
157 lws_get_vhost(wsi), lws_get_protocol(wsi));
158 uint8_t buf[LWS_PRE + LWS_RECOMMENDED_MIN_HEADER_SPACE],
159 *start = &buf[LWS_PRE], *p = start,
160 *end = &buf[sizeof(buf) - 1];
161 const struct msg *pmsg;
162 void *retval;
163 int n;
164
165 switch (reason) {
166
167 /* --- vhost protocol lifecycle --- */
168
169 case LWS_CALLBACK_PROTOCOL_INIT:
170 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
171 lws_get_protocol(wsi), sizeof(struct vhd));
172 vhd->context = lws_get_context(wsi);
173 vhd->protocol = lws_get_protocol(wsi);
174 vhd->vhost = lws_get_vhost(wsi);
175
176 vhd->ring = lws_ring_create(sizeof(struct msg), 8,
177 __minimal_destroy_message);
178 if (!vhd->ring)
179 return 1;
180
181 pthread_mutex_init(&vhd->lock_ring, NULL);
182
183 /* start the content-creating threads */
184
185 for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++)
186 if (pthread_create(&vhd->pthread_spam[n], NULL,
187 thread_spam, vhd)) {
188 lwsl_err("thread creation failed\n");
189 goto init_fail;
190 }
191
192 return 0;
193
194 case LWS_CALLBACK_PROTOCOL_DESTROY:
195 init_fail:
196 vhd->finished = 1;
197 for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++)
198 pthread_join(vhd->pthread_spam[n], &retval);
199
200 if (vhd->ring)
201 lws_ring_destroy(vhd->ring);
202
203 pthread_mutex_destroy(&vhd->lock_ring);
204 return 0;
205
206 /* --- http connection lifecycle --- */
207
208 case LWS_CALLBACK_HTTP:
209 /*
210 * `in` contains the url part after our mountpoint /sse, if any
211 * you can use this to determine what data to return and store
212 * that in the pss
213 */
214 lwsl_info("%s: LWS_CALLBACK_HTTP: '%s'\n", __func__,
215 (const char *)in);
216
217 /* SSE requires a http OK response with this content-type */
218
219 if (lws_add_http_common_headers(wsi, HTTP_STATUS_OK,
220 "text/event-stream",
221 LWS_ILLEGAL_HTTP_CONTENT_LEN,
222 &p, end))
223 return 1;
224
225 if (lws_finalize_write_http_header(wsi, start, &p, end))
226 return 1;
227
228 /* add ourselves to the list of live pss held in the vhd */
229
230 lws_ll_fwd_insert(pss, pss_list, vhd->pss_list);
231 pss->tail = lws_ring_get_oldest_tail(vhd->ring);
232 pss->wsi = wsi;
233
234 /*
235 * This tells lws we are no longer a normal http stream,
236 * but are an "immortal" (plus or minus whatever timeout you
237 * set on it afterwards) SSE stream. In http/2 case that also
238 * stops idle timeouts being applied to the network connection
239 * while this wsi is still open.
240 */
241 lws_http_mark_sse(wsi);
242
243 /* write the body separately */
244
245 lws_callback_on_writable(wsi);
246
247 return 0;
248
249 case LWS_CALLBACK_CLOSED_HTTP:
250 /* remove our closing pss from the list of live pss */
251
252 lws_ll_fwd_remove(struct pss, pss_list, pss, vhd->pss_list);
253 return 0;
254
255 /* --- data transfer --- */
256
257 case LWS_CALLBACK_HTTP_WRITEABLE:
258
259 lwsl_info("%s: LWS_CALLBACK_HTTP_WRITEABLE\n", __func__);
260
261 pmsg = lws_ring_get_element(vhd->ring, &pss->tail);
262 if (!pmsg)
263 break;
264
265 p += lws_snprintf((char *)p, lws_ptr_diff_size_t(end, p),
266 "data: %s\x0d\x0a\x0d\x0a",
267 (const char *)pmsg->payload);
268
269 if (lws_write(wsi, (uint8_t *)start, lws_ptr_diff_size_t(p, start),
270 LWS_WRITE_HTTP) != lws_ptr_diff(p, start))
271 return 1;
272
273 lws_ring_consume_and_update_oldest_tail(
274 vhd->ring, /* lws_ring object */
275 struct pss, /* type of objects with tails */
276 &pss->tail, /* tail of guy doing the consuming */
277 1, /* number of payload objects being consumed */
278 vhd->pss_list, /* head of list of objects with tails */
279 tail, /* member name of tail in objects with tails */
280 pss_list /* member name of next object in objects with tails */
281 );
282
283 if (lws_ring_get_element(vhd->ring, &pss->tail))
284 /* come back as soon as we can write more */
285 lws_callback_on_writable(pss->wsi);
286
287 return 0;
288
289 case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
290 if (!vhd)
291 break;
292 /*
293 * let everybody know we want to write something on them
294 * as soon as they are ready
295 */
296 lws_start_foreach_llp(struct pss **, ppss, vhd->pss_list) {
297 lws_callback_on_writable((*ppss)->wsi);
298 } lws_end_foreach_llp(ppss, pss_list);
299 return 0;
300
301 default:
302 break;
303 }
304
305 return lws_callback_http_dummy(wsi, reason, user, in, len);
306 }
307
308 static struct lws_protocols protocols[] = {
309 { "http", lws_callback_http_dummy, 0, 0, 0, NULL, 0 },
310 { "sse", callback_sse, sizeof(struct pss), 0, 0, NULL, 0 },
311 LWS_PROTOCOL_LIST_TERM
312 };
313
314 /* override the default mount for /sse in the URL space */
315
316 static const struct lws_http_mount mount_sse = {
317 /* .mount_next */ NULL, /* linked-list "next" */
318 /* .mountpoint */ "/sse", /* mountpoint URL */
319 /* .origin */ NULL, /* protocol */
320 /* .def */ NULL,
321 /* .protocol */ "sse",
322 /* .cgienv */ NULL,
323 /* .extra_mimetypes */ NULL,
324 /* .interpret */ NULL,
325 /* .cgi_timeout */ 0,
326 /* .cache_max_age */ 0,
327 /* .auth_mask */ 0,
328 /* .cache_reusable */ 0,
329 /* .cache_revalidate */ 0,
330 /* .cache_intermediaries */ 0,
331 /* .origin_protocol */ LWSMPRO_CALLBACK, /* dynamic */
332 /* .mountpoint_len */ 4, /* char count */
333 /* .basic_auth_login_file */ NULL,
334 };
335
336 /* default mount serves the URL space from ./mount-origin */
337
338 static const struct lws_http_mount mount = {
339 /* .mount_next */ &mount_sse, /* linked-list "next" */
340 /* .mountpoint */ "/", /* mountpoint URL */
341 /* .origin */ "./mount-origin", /* serve from dir */
342 /* .def */ "index.html", /* default filename */
343 /* .protocol */ NULL,
344 /* .cgienv */ NULL,
345 /* .extra_mimetypes */ NULL,
346 /* .interpret */ NULL,
347 /* .cgi_timeout */ 0,
348 /* .cache_max_age */ 0,
349 /* .auth_mask */ 0,
350 /* .cache_reusable */ 0,
351 /* .cache_revalidate */ 0,
352 /* .cache_intermediaries */ 0,
353 /* .origin_protocol */ LWSMPRO_FILE, /* files in a dir */
354 /* .mountpoint_len */ 1, /* char count */
355 /* .basic_auth_login_file */ NULL,
356 };
357
sigint_handler(int sig)358 void sigint_handler(int sig)
359 {
360 interrupted = 1;
361 }
362
main(int argc,const char ** argv)363 int main(int argc, const char **argv)
364 {
365 struct lws_context_creation_info info;
366 struct lws_context *context;
367 const char *p;
368 int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE
369 /* for LLL_ verbosity above NOTICE to be built into lws,
370 * lws must have been configured and built with
371 * -DCMAKE_BUILD_TYPE=DEBUG instead of =RELEASE */
372 /* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */
373 /* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */
374 /* | LLL_DEBUG */;
375
376 signal(SIGINT, sigint_handler);
377
378 if ((p = lws_cmdline_option(argc, argv, "-d")))
379 logs = atoi(p);
380
381 lws_set_log_level(logs, NULL);
382 lwsl_user("LWS minimal http Server-Side Events + ring | visit http://localhost:7681\n");
383
384 memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
385 info.port = 7681;
386 info.protocols = protocols;
387 info.mounts = &mount;
388 info.options =
389 LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE;
390
391 context = lws_create_context(&info);
392 if (!context) {
393 lwsl_err("lws init failed\n");
394 return 1;
395 }
396
397 while (n >= 0 && !interrupted)
398 n = lws_service(context, 0);
399
400 lws_context_destroy(context);
401
402 return 0;
403 }
404