/*
 * libwebsockets - small server side websockets and web server implementation
 *
 * Copyright (C) 2010 - 2019 Andy Green <andy@warmcat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "private-lib-core.h"

/*
 * per pending event
 */
typedef struct lws_seq_event {
	struct lws_dll2			seq_event_list;

	void				*data;
	void				*aux;
	lws_seq_events_t		e;
} lws_seq_event_t;

/*
 * per sequencer
 */
typedef struct lws_sequencer {
	struct lws_dll2			seq_list;

	lws_sorted_usec_list_t		sul_timeout;
	lws_sorted_usec_list_t		sul_pending;

	struct lws_dll2_owner		seq_event_owner;
	struct lws_context_per_thread	*pt;
	lws_seq_event_cb		cb;
	const char			*name;
	const lws_retry_bo_t		*retry;

	lws_usec_t			time_created;
	lws_usec_t			timeout; /* 0 or time we timeout */

	uint8_t				going_down:1;
	uint8_t				wakesuspend:1;
} lws_seq_t;
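
/*
 * Note the allocation layout: the user object lives in the same allocation,
 * immediately after the lws_seq_t itself, so the seq and user pointers
 * convert by a fixed offset (see the *i->puser assignment in lws_seq_create()
 * and lws_seq_from_user() below).
 */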

#define QUEUE_SANITY_LIMIT 10

static void
lws_sul_seq_heartbeat_cb(lws_sorted_usec_list_t *sul)
{
	struct lws_context_per_thread *pt = lws_container_of(sul,
			struct lws_context_per_thread, sul_seq_heartbeat);

	/* send every sequencer a heartbeat message... it can ignore it */

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, tp,
				   lws_dll2_get_head(&pt->seq_owner)) {
		lws_seq_t *s = lws_container_of(p, lws_seq_t, seq_list);

		/* queue the message to inform the sequencer */
		lws_seq_queue_event(s, LWSSEQ_HEARTBEAT, NULL, NULL);

	} lws_end_foreach_dll_safe(p, tp);

	/* schedule the next one */

	__lws_sul_insert_us(&pt->pt_sul_owner[LWSSULLI_MISS_IF_SUSPENDED],
			    &pt->sul_seq_heartbeat, LWS_US_PER_SEC);
}

int
lws_seq_pt_init(struct lws_context_per_thread *pt)
{
	pt->sul_seq_heartbeat.cb = lws_sul_seq_heartbeat_cb;

	/* schedule the first heartbeat */
	__lws_sul_insert_us(&pt->pt_sul_owner[LWSSULLI_MISS_IF_SUSPENDED],
			    &pt->sul_seq_heartbeat, LWS_US_PER_SEC);

	return 0;
}

lws_seq_t *
lws_seq_create(lws_seq_info_t *i)
{
	struct lws_context_per_thread *pt = &i->context->pt[i->tsi];
	lws_seq_t *seq = lws_zalloc(sizeof(*seq) + i->user_size, __func__);

	if (!seq)
		return NULL;

	seq->cb = i->cb;
	seq->pt = pt;
	seq->name = i->name;
	seq->retry = i->retry;
	seq->wakesuspend = i->wakesuspend;

	*i->puser = (void *)&seq[1];

	/* add the sequencer to the pt */

	lws_pt_lock(pt, __func__); /* ---------------------------------- pt { */

	lws_dll2_add_tail(&seq->seq_list, &pt->seq_owner);

	lws_pt_unlock(pt); /* } pt ------------------------------------------ */

	seq->time_created = lws_now_usecs();

	/* try to queue the creation cb */

	if (lws_seq_queue_event(seq, LWSSEQ_CREATED, NULL, NULL)) {
		lws_dll2_remove(&seq->seq_list);
		lws_free(seq);

		return NULL;
	}

	return seq;
}
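
/*
 * Usage sketch, not part of the library: a caller fills an lws_seq_info_t
 * and gets its private allocation back through puser.  Only the members
 * consumed above are shown; "struct myobj" and myobj_seq_cb are hypothetical
 * user names.
 *
 *	struct myobj *obj;
 *	lws_seq_info_t i;
 *
 *	memset(&i, 0, sizeof(i));
 *	i.context   = context;
 *	i.user_size = sizeof(struct myobj);
 *	i.puser     = (void **)&obj;
 *	i.cb        = myobj_seq_cb;
 *	i.name      = "myseq";
 *
 *	if (!lws_seq_create(&i))
 *		goto bail;
 *
 * On success, the new sequencer has already queued its LWSSEQ_CREATED event.
 */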

static int
seq_ev_destroy(struct lws_dll2 *d, void *user)
{
	lws_seq_event_t *seqe = lws_container_of(d, lws_seq_event_t,
						 seq_event_list);

	lws_dll2_remove(&seqe->seq_event_list);
	lws_free(seqe);

	return 0;
}

void
lws_seq_destroy(lws_seq_t **pseq)
{
	lws_seq_t *seq = *pseq;

	/* defeat another thread racing to add events while we are destroying */
	seq->going_down = 1;

	seq->cb(seq, (void *)&seq[1], LWSSEQ_DESTROYED, NULL, NULL);

	lws_pt_lock(seq->pt, __func__); /* -------------------------- pt { */

	lws_dll2_remove(&seq->seq_list);
	lws_dll2_remove(&seq->sul_timeout.list);
	lws_dll2_remove(&seq->sul_pending.list);
	/* remove and destroy any pending events */
	lws_dll2_foreach_safe(&seq->seq_event_owner, NULL, seq_ev_destroy);

	lws_pt_unlock(seq->pt); /* } pt ---------------------------------- */

	lws_free_set_NULL(seq);
}

void
lws_seq_destroy_all_on_pt(struct lws_context_per_thread *pt)
{
	lws_start_foreach_dll_safe(struct lws_dll2 *, p, tp,
				   pt->seq_owner.head) {
		lws_seq_t *s = lws_container_of(p, lws_seq_t, seq_list);

		lws_seq_destroy(&s);

	} lws_end_foreach_dll_safe(p, tp);
}

static void
lws_seq_sul_pending_cb(lws_sorted_usec_list_t *sul)
{
	lws_seq_t *seq = lws_container_of(sul, lws_seq_t, sul_pending);
	lws_seq_event_t *seqe;
	struct lws_dll2 *dh;
	int n;

	if (!seq->seq_event_owner.count)
		return;

	/* events are only added at tail, so no race possible yet... */

	dh = lws_dll2_get_head(&seq->seq_event_owner);
	seqe = lws_container_of(dh, lws_seq_event_t, seq_event_list);

	n = (int)seq->cb(seq, (void *)&seq[1], (int)seqe->e, seqe->data,
			 seqe->aux);

	/* ... have to lock here though, because we will change the list */

	lws_pt_lock(seq->pt, __func__); /* ----------------------------- pt { */

	/* detach event from sequencer event list and free it */
	lws_dll2_remove(&seqe->seq_event_list);
	lws_free(seqe);

	lws_pt_unlock(seq->pt); /* } pt ------------------------------------- */

	if (n) {
		lwsl_info("%s: destroying seq '%s' by request\n", __func__,
			  seq->name);
		lws_seq_destroy(&seq);
	}
}
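
/*
 * Sketch of a user event callback cooperating with the dispatch above; the
 * names are hypothetical.  Returning nonzero from any event (in the public
 * api, LWSSEQ_RET_DESTROY rather than LWSSEQ_RET_CONTINUE) asks the core to
 * destroy the sequencer on the callback's behalf:
 *
 *	static lws_seq_cb_return_t
 *	myobj_seq_cb(struct lws_sequencer *seq, void *user, int event,
 *		     void *data, void *aux)
 *	{
 *		switch (event) {
 *		case LWSSEQ_CREATED:
 *			// kick off the first step of the sequence here
 *			break;
 *		case LWSSEQ_TIMED_OUT:
 *			return LWSSEQ_RET_DESTROY; // nonzero: destroy us
 *		default:
 *			break;
 *		}
 *
 *		return LWSSEQ_RET_CONTINUE;
 *	}
 */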

int
lws_seq_queue_event(lws_seq_t *seq, lws_seq_events_t e, void *data, void *aux)
{
	lws_seq_event_t *seqe;

	if (!seq || seq->going_down)
		return 1;

	seqe = lws_zalloc(sizeof(*seqe), __func__);
	if (!seqe)
		return 1;

	seqe->e = e;
	seqe->data = data;
	seqe->aux = aux;

	// lwsl_notice("%s: seq %s: event %d\n", __func__, seq->name, e);

	lws_pt_lock(seq->pt, __func__); /* ----------------------------- pt { */

	if (seq->seq_event_owner.count > QUEUE_SANITY_LIMIT) {
		lwsl_err("%s: more than %d events queued\n", __func__,
			 QUEUE_SANITY_LIMIT);
	}

	lws_dll2_add_tail(&seqe->seq_event_list, &seq->seq_event_owner);

	seq->sul_pending.cb = lws_seq_sul_pending_cb;
	__lws_sul_insert_us(&seq->pt->pt_sul_owner[seq->wakesuspend],
			    &seq->sul_pending, 1);

	lws_pt_unlock(seq->pt); /* } pt ------------------------------------- */

	return 0;
}
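
/*
 * Sketch: user code can also queue its own private messages, numbered from
 * LWSSEQ_USER_BASE upwards, eg (the event value and payload here are
 * hypothetical):
 *
 *	lws_seq_queue_event(seq, (lws_seq_events_t)(LWSSEQ_USER_BASE + 1),
 *			    result, NULL);
 *
 * A nonzero return only means the event could not be queued, because the
 * sequencer is going down or the allocation failed.
 */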

/*
 * Check whether the wsi is still extant, by peeking in the message queue for
 * an LWSSEQ_WSI_CONN_CLOSE message about it.  (There's no need to do the same
 * for CONN_FAIL, since that can never have produced any messages beforehand.)
 *
 * Use this to avoid trying to perform operations on a wsi that has already
 * closed, when we have not yet reached its close message in the queue.
 *
 * Returns 0 if not closed yet, or 1 if it has closed but we have not yet
 * processed the close message.
 */

int
lws_seq_check_wsi(lws_seq_t *seq, struct lws *wsi)
{
	lws_seq_event_t *seqe;
	struct lws_dll2 *dh;

	lws_pt_lock(seq->pt, __func__); /* ----------------------------- pt { */

	dh = lws_dll2_get_head(&seq->seq_event_owner);
	while (dh) {
		seqe = lws_container_of(dh, lws_seq_event_t, seq_event_list);

		if (seqe->e == LWSSEQ_WSI_CONN_CLOSE && seqe->data == wsi)
			break;

		dh = dh->next;
	}

	lws_pt_unlock(seq->pt); /* } pt ------------------------------------- */

	return !!dh;
}
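
/*
 * Sketch: a sequencer holding a wsi pointer can guard operations on it, eg
 *
 *	if (lws_seq_check_wsi(seq, m->wsi))
 *		return; // it already closed... wait for the CONN_CLOSE event
 *
 * (m->wsi here is hypothetical user state stashed when the connection was
 * handed to the sequencer.)
 */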

static void
lws_seq_sul_timeout_cb(lws_sorted_usec_list_t *sul)
{
	lws_seq_t *s = lws_container_of(sul, lws_seq_t, sul_timeout);

	lws_seq_queue_event(s, LWSSEQ_TIMED_OUT, NULL, NULL);
}

/* pass LWS_SET_TIMER_USEC_CANCEL as us to remove any pending timeout */

int
lws_seq_timeout_us(lws_seq_t *seq, lws_usec_t us)
{
	seq->sul_timeout.cb = lws_seq_sul_timeout_cb;
	/* the dll2 list member is always at the very top of the sul */
	__lws_sul_insert_us(&seq->pt->pt_sul_owner[seq->wakesuspend],
			    (lws_sorted_usec_list_t *)&seq->sul_timeout.list,
			    us);

	return 0;
}
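
/*
 * Sketch: a sequencer typically arms a timeout around each step, eg
 *
 *	lws_seq_timeout_us(seq, 3 * LWS_US_PER_SEC);	    // 3s to complete
 *	...
 *	lws_seq_timeout_us(seq, LWS_SET_TIMER_USEC_CANCEL); // step done: disarm
 *
 * If the timeout fires, the sequencer receives an LWSSEQ_TIMED_OUT event via
 * the cb above.
 */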

lws_seq_t *
lws_seq_from_user(void *u)
{
	return &((lws_seq_t *)u)[-1];
}
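
/*
 * Sketch: this is the inverse of the *i->puser assignment in lws_seq_create();
 * given only the user pointer, eg stashed in some other object's opaque user
 * data, the owning sequencer can be recovered:
 *
 *	lws_seq_t *seq = lws_seq_from_user(user);
 *
 * It relies on the user object always being allocated immediately after the
 * lws_seq_t in the same block.
 */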

const char *
lws_seq_name(lws_seq_t *seq)
{
	return seq->name;
}

lws_usec_t
lws_seq_us_since_creation(lws_seq_t *seq)
{
	return lws_now_usecs() - seq->time_created;
}

struct lws_context *
lws_seq_get_context(lws_seq_t *seq)
{
	return seq->pt->context;
}