/*
 * libwebsockets - small server side websockets and web server implementation
 *
 * Copyright (C) 2010 - 2019 Andy Green <andy@warmcat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "private-lib-core.h"

/*
 * per pending event
 */
typedef struct lws_seq_event {
	struct lws_dll2			seq_event_list;

	void				*data;
	void				*aux;
	lws_seq_events_t		e;
} lws_seq_event_t;

/*
 * per sequencer
 */
typedef struct lws_sequencer {
	struct lws_dll2			seq_list;

	lws_sorted_usec_list_t		sul_timeout;
	lws_sorted_usec_list_t		sul_pending;

	struct lws_dll2_owner		seq_event_owner;
	struct lws_context_per_thread	*pt;
	lws_seq_event_cb		cb;
	const char			*name;
	const lws_retry_bo_t		*retry;

	lws_usec_t			time_created;
	lws_usec_t			timeout; /* 0 or time we timeout */

	char				going_down;
} lws_seq_t;

#define QUEUE_SANITY_LIMIT 10

static void
lws_sul_seq_heartbeat_cb(lws_sorted_usec_list_t *sul)
{
	struct lws_context_per_thread *pt = lws_container_of(sul,
			struct lws_context_per_thread, sul_seq_heartbeat);

	/* send every sequencer a heartbeat message... it can ignore it */

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, tp,
				   lws_dll2_get_head(&pt->seq_owner)) {
		lws_seq_t *s = lws_container_of(p, lws_seq_t, seq_list);

		/* queue the message to inform the sequencer */
		lws_seq_queue_event(s, LWSSEQ_HEARTBEAT, NULL, NULL);

	} lws_end_foreach_dll_safe(p, tp);

	/* schedule the next one */

	__lws_sul_insert(&pt->pt_sul_owner, &pt->sul_seq_heartbeat,
			 LWS_US_PER_SEC);
}

int
lws_seq_pt_init(struct lws_context_per_thread *pt)
{
	pt->sul_seq_heartbeat.cb = lws_sul_seq_heartbeat_cb;

	/* schedule the first heartbeat */
	__lws_sul_insert(&pt->pt_sul_owner, &pt->sul_seq_heartbeat,
			 LWS_US_PER_SEC);

	return 0;
}

lws_seq_t *
lws_seq_create(lws_seq_info_t *i)
{
	struct lws_context_per_thread *pt = &i->context->pt[i->tsi];
	lws_seq_t *seq = lws_zalloc(sizeof(*seq) + i->user_size, __func__);

	if (!seq)
		return NULL;

	seq->cb = i->cb;
	seq->pt = pt;
	seq->name = i->name;
	seq->retry = i->retry;

	*i->puser = (void *)&seq[1];

	/* add the sequencer to the pt */

	lws_pt_lock(pt, __func__); /* ---------------------------------- pt { */

	lws_dll2_add_tail(&seq->seq_list, &pt->seq_owner);

	lws_pt_unlock(pt); /* } pt ------------------------------------------ */

	seq->time_created = lws_now_usecs();

	/* try to queue the creation cb */

	if (lws_seq_queue_event(seq, LWSSEQ_CREATED, NULL, NULL)) {
		lws_dll2_remove(&seq->seq_list);
		lws_free(seq);

		return NULL;
	}

	return seq;
}
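
/*
 * Usage sketch (not part of this file): creating a sequencer and handling
 * events in its callback.  The names my_seq_cb and struct my_seq are
 * hypothetical; lws_seq_info_t, lws_seq_cb_return_t and the LWSSEQ_* /
 * LWSSEQ_RET_* constants come from the public lws sequencer api.
 *
 *	struct my_seq {
 *		int	state;
 *	};
 *
 *	static lws_seq_cb_return_t
 *	my_seq_cb(struct lws_sequencer *seq, void *user, int event,
 *		  void *data, void *aux)
 *	{
 *		struct my_seq *s = (struct my_seq *)user;
 *
 *		switch (event) {
 *		case LWSSEQ_CREATED:	// begin the sequence
 *			s->state = 1;
 *			break;
 *		case LWSSEQ_HEARTBEAT:	// 1Hz tick... may be ignored
 *			break;
 *		case LWSSEQ_TIMED_OUT:	// lws_seq_timeout_us() expired
 *			return LWSSEQ_RET_DESTROY;
 *		default:
 *			break;
 *		}
 *
 *		return LWSSEQ_RET_CONTINUE;
 *	}
 *
 *	lws_seq_info_t i;
 *	struct my_seq *ms;
 *
 *	memset(&i, 0, sizeof(i));
 *	i.context	= context;
 *	i.user_size	= sizeof(struct my_seq);
 *	i.puser		= (void **)&ms;
 *	i.cb		= my_seq_cb;
 *	i.name		= "my-seq";
 *
 *	if (!lws_seq_create(&i))
 *		goto bail;
 */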

static int
seq_ev_destroy(struct lws_dll2 *d, void *user)
{
	lws_seq_event_t *seqe = lws_container_of(d, lws_seq_event_t,
						 seq_event_list);

	lws_dll2_remove(&seqe->seq_event_list);
	lws_free(seqe);

	return 0;
}

void
lws_seq_destroy(lws_seq_t **pseq)
{
	lws_seq_t *seq = *pseq;

	/* defeat another thread racing to add events while we are destroying */
	seq->going_down = 1;

	seq->cb(seq, (void *)&seq[1], LWSSEQ_DESTROYED, NULL, NULL);

	lws_pt_lock(seq->pt, __func__); /* -------------------------- pt { */

	lws_dll2_remove(&seq->seq_list);
	lws_dll2_remove(&seq->sul_timeout.list);
	lws_dll2_remove(&seq->sul_pending.list);
	/* remove and destroy any pending events */
	lws_dll2_foreach_safe(&seq->seq_event_owner, NULL, seq_ev_destroy);

	lws_pt_unlock(seq->pt); /* } pt ---------------------------------- */

	lws_free_set_NULL(seq);
}

void
lws_seq_destroy_all_on_pt(struct lws_context_per_thread *pt)
{
	lws_start_foreach_dll_safe(struct lws_dll2 *, p, tp,
				   pt->seq_owner.head) {
		lws_seq_t *s = lws_container_of(p, lws_seq_t, seq_list);

		lws_seq_destroy(&s);

	} lws_end_foreach_dll_safe(p, tp);
}

static void
lws_seq_sul_pending_cb(lws_sorted_usec_list_t *sul)
{
	lws_seq_t *seq = lws_container_of(sul, lws_seq_t, sul_pending);
	lws_seq_event_t *seqe;
	struct lws_dll2 *dh;
	int n;

	if (!seq->seq_event_owner.count)
		return;

	/* events are only added at tail, so no race possible yet... */

	dh = lws_dll2_get_head(&seq->seq_event_owner);
	seqe = lws_container_of(dh, lws_seq_event_t, seq_event_list);

	n = seq->cb(seq, (void *)&seq[1], seqe->e, seqe->data, seqe->aux);

	/* ... have to lock here though, because we will change the list */

	lws_pt_lock(seq->pt, __func__); /* ----------------------------- pt { */

	/* detach event from sequencer event list and free it */
	lws_dll2_remove(&seqe->seq_event_list);
	lws_free(seqe);

	lws_pt_unlock(seq->pt); /* } pt ------------------------------------- */

	if (n) {
		lwsl_info("%s: destroying seq '%s' by request\n", __func__,
			  seq->name);
		lws_seq_destroy(&seq);
	}
}

int
lws_seq_queue_event(lws_seq_t *seq, lws_seq_events_t e, void *data, void *aux)
{
	lws_seq_event_t *seqe;

	if (!seq || seq->going_down)
		return 1;

	seqe = lws_zalloc(sizeof(*seqe), __func__);
	if (!seqe)
		return 1;

	seqe->e = e;
	seqe->data = data;
	seqe->aux = aux;

	// lwsl_notice("%s: seq %s: event %d\n", __func__, seq->name, e);

	lws_pt_lock(seq->pt, __func__); /* ----------------------------- pt { */

	if (seq->seq_event_owner.count > QUEUE_SANITY_LIMIT) {
		lwsl_err("%s: more than %d events queued\n", __func__,
			 QUEUE_SANITY_LIMIT);
	}

	lws_dll2_add_tail(&seqe->seq_event_list, &seq->seq_event_owner);

	seq->sul_pending.cb = lws_seq_sul_pending_cb;
	__lws_sul_insert(&seq->pt->pt_sul_owner, &seq->sul_pending, 1);

	lws_pt_unlock(seq->pt); /* } pt ------------------------------------- */

	return 0;
}
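
/*
 * Usage sketch (not part of this file): queuing a private event to a
 * sequencer from elsewhere.  User-defined events start at LWSSEQ_USER_BASE;
 * MYSEQ_EV_RESULT and result_buf are hypothetical names.
 *
 *	enum {
 *		MYSEQ_EV_RESULT = LWSSEQ_USER_BASE,
 *	};
 *
 *	// data / aux are opaque to lws and passed through to the callback
 *	if (lws_seq_queue_event(seq, (lws_seq_events_t)MYSEQ_EV_RESULT,
 *				result_buf, NULL))
 *		// seq was NULL, going down, or OOM: event was not queued
 *		handle_failure();
 */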

/*
 * Check if the wsi is still extant, by peeking in the message queue for a
 * LWSSEQ_WSI_CONN_CLOSE message about wsi.  (There's no need to do the same
 * for CONN_FAIL, since a connection that failed never produced any messages
 * before that.)
 *
 * Use this to avoid trying to perform operations on a wsi that has already
 * closed when we have not yet consumed its close message.
 *
 * Returns 0 if not closed yet, or 1 if it has closed but we did not process
 * the close message yet.
 */

int
lws_seq_check_wsi(lws_seq_t *seq, struct lws *wsi)
{
	lws_seq_event_t *seqe;
	struct lws_dll2 *dh;

	lws_pt_lock(seq->pt, __func__); /* ----------------------------- pt { */

	dh = lws_dll2_get_head(&seq->seq_event_owner);
	while (dh) {
		seqe = lws_container_of(dh, lws_seq_event_t, seq_event_list);

		if (seqe->e == LWSSEQ_WSI_CONN_CLOSE && seqe->data == wsi)
			break;

		dh = dh->next;
	}

	lws_pt_unlock(seq->pt); /* } pt ------------------------------------- */

	return !!dh;
}
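
/*
 * Usage sketch (not part of this file): guarding a wsi operation inside a
 * sequencer callback.  Assumes the wsi pointer was stashed in the user
 * object (here s->wsi) when LWSSEQ_WSI_CONNECTED was delivered.
 *
 *	if (!lws_seq_check_wsi(seq, s->wsi))
 *		// no pending CONN_CLOSE for this wsi: still safe to use
 *		lws_callback_on_writable(s->wsi);
 */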

static void
lws_seq_sul_timeout_cb(lws_sorted_usec_list_t *sul)
{
	lws_seq_t *s = lws_container_of(sul, lws_seq_t, sul_timeout);

	lws_seq_queue_event(s, LWSSEQ_TIMED_OUT, NULL, NULL);
}

/* pass us = LWS_SET_TIMER_USEC_CANCEL to remove any pending timeout */

int
lws_seq_timeout_us(lws_seq_t *seq, lws_usec_t us)
{
	seq->sul_timeout.cb = lws_seq_sul_timeout_cb;

	return __lws_sul_insert(&seq->pt->pt_sul_owner, &seq->sul_timeout, us);
}
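
/*
 * Usage sketch (not part of this file): arming and cancelling the timeout
 * from a sequencer callback.
 *
 *	// time out if nothing happens within 3s; LWSSEQ_TIMED_OUT is
 *	// delivered to the callback when it expires
 *	lws_seq_timeout_us(seq, 3 * LWS_US_PER_SEC);
 *
 *	// later, on success, remove the pending timeout
 *	lws_seq_timeout_us(seq, LWS_SET_TIMER_USEC_CANCEL);
 */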

lws_seq_t *
lws_seq_from_user(void *u)
{
	return &((lws_seq_t *)u)[-1];
}
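
/*
 * Usage sketch (not part of this file): code holding only the user object
 * pointer can recover the owning sequencer, since lws_seq_create() places
 * the user object directly after the lws_seq_t in the same allocation.
 *
 *	struct my_seq *s = ...;	// the *i->puser handed out at creation
 *	lws_seq_t *seq = lws_seq_from_user(s);
 */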

const char *
lws_seq_name(lws_seq_t *seq)
{
	return seq->name;
}

lws_usec_t
lws_seq_us_since_creation(lws_seq_t *seq)
{
	return lws_now_usecs() - seq->time_created;
}

struct lws_context *
lws_seq_get_context(lws_seq_t *seq)
{
	return seq->pt->context;
}