1 /* GStreamer
2 * Copyright (C) 2017 Matthew Waters <matthew@centricular.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
13 *
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
18 */
19
20 #ifdef HAVE_CONFIG_H
21 # include "config.h"
22 #endif
23
24 #define GST_DISABLE_MINIOBJECT_INLINE_FUNCTIONS
25 #include "gst_private.h"
26
27 #include "gstpromise.h"
28
/* Debug category used by the GST_LOG() calls in this file. The category is
 * declared here and initialised once in gst_promise_init(). */
#define GST_CAT_DEFAULT gst_promise_debug
GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
31
32 /**
33 * SECTION:gstpromise
34 * @title: GstPromise
35 * @short_description: a miniobject for future/promise-like functionality
36 *
37 * The #GstPromise object implements the container for values that may
38 * be available later. i.e. a Future or a Promise in
39 * <https://en.wikipedia.org/wiki/Futures_and_promises>.
40 * As with all Future/Promise-like functionality, there is the concept of the
41 * producer of the value and the consumer of the value.
42 *
43 * A #GstPromise is created with gst_promise_new() by the consumer and passed
44 * to the producer to avoid thread safety issues with the change callback.
45 * A #GstPromise can be replied to with a value (or an error) by the producer
46 * with gst_promise_reply(). The exact value returned is defined by the API
47 * contract of the producer and %NULL may be a valid reply.
48 * gst_promise_interrupt() is for the consumer to
49 * indicate to the producer that the value is not needed anymore and producing
 * that value can stop. The %GST_PROMISE_RESULT_EXPIRED state set by a call
51 * to gst_promise_expire() indicates to the consumer that a value will never
52 * be produced and is intended to be called by a third party that implements
53 * some notion of message handling such as #GstBus.
54 * A callback can also be installed at #GstPromise creation for
55 * result changes with gst_promise_new_with_change_func().
 * The change callback can be used to chain #GstPromise objects together as
 * in the following example.
58 * |[<!-- language="C" -->
59 * const GstStructure *reply;
60 * GstPromise *p;
61 * if (gst_promise_wait (promise) != GST_PROMISE_RESULT_REPLIED)
62 * return; // interrupted or expired value
63 * reply = gst_promise_get_reply (promise);
64 * if (error in reply)
65 * return; // propagate error
66 * p = gst_promise_new_with_change_func (another_promise_change_func, user_data, notify);
67 * pass p to promise-using API
68 * ]|
69 *
70 * Each #GstPromise starts out with a #GstPromiseResult of
71 * %GST_PROMISE_RESULT_PENDING and only ever transitions once
72 * into one of the other #GstPromiseResult's.
73 *
 * In order to support multi-threaded code, gst_promise_reply(),
 * gst_promise_interrupt() and gst_promise_expire() may all be called from
 * different threads with some restrictions and the final result of the promise
 * is whichever call is made first. There are two restrictions on ordering:
78 *
79 * 1. That gst_promise_reply() and gst_promise_interrupt() cannot be called
80 * after gst_promise_expire()
81 * 2. That gst_promise_reply() and gst_promise_interrupt()
82 * cannot be called twice.
83 *
84 * The change function set with gst_promise_new_with_change_func() is
85 * called directly from either the gst_promise_reply(),
86 * gst_promise_interrupt() or gst_promise_expire() and can be called
87 * from an arbitrary thread. #GstPromise using APIs can restrict this to
88 * a single thread or a subset of threads but that is entirely up to the API
89 * that uses #GstPromise.
90 */
91
/* Its address is handed to gst_structure_set_parent_refcount() when a reply
 * structure is attached to a promise in gst_promise_reply().
 * NOTE(review): presumably a value > 1 makes the attached structure appear
 * shared/non-writable to its users -- confirm against GstStructure. */
static const int immutable_structure_refcount = 2;

/* Accessors for the private GstPromiseImpl fields behind a public
 * GstPromise pointer. LOCK and COND yield pointers to the embedded
 * GMutex/GCond; the others yield the field itself (usable as an lvalue). */
#define GST_PROMISE_REPLY(p)            (((GstPromiseImpl *)(p))->reply)
#define GST_PROMISE_RESULT(p)           (((GstPromiseImpl *)(p))->result)
#define GST_PROMISE_LOCK(p)             (&(((GstPromiseImpl *)(p))->lock))
#define GST_PROMISE_COND(p)             (&(((GstPromiseImpl *)(p))->cond))
#define GST_PROMISE_CHANGE_FUNC(p)      (((GstPromiseImpl *)(p))->change_func)
#define GST_PROMISE_CHANGE_DATA(p)      (((GstPromiseImpl *)(p))->user_data)
#define GST_PROMISE_CHANGE_NOTIFY(p)    (((GstPromiseImpl *)(p))->notify)
101
/* Private implementation of a promise. Instances are allocated in
 * gst_promise_new() and handed out as GstPromise pointers; the
 * GST_PROMISE_* macros cast back to this type to reach the fields. */
typedef struct
{
  /* public part; first member so the GstPromise * <-> GstPromiseImpl * casts
   * used throughout this file are valid */
  GstPromise promise;

  /* current state; set to GST_PROMISE_RESULT_PENDING in gst_promise_init() */
  GstPromiseResult result;
  /* reply stored by gst_promise_reply(); released in gst_promise_free() */
  GstStructure *reply;

  /* held while reading or changing the state */
  GMutex lock;
  /* broadcast when the state changes; waited on in gst_promise_wait() */
  GCond cond;
  /* optional callback invoked (outside the lock) after a state change */
  GstPromiseChangeFunc change_func;
  /* argument passed to change_func and to notify */
  gpointer user_data;
  /* optional destroy notify, called from gst_promise_free() */
  GDestroyNotify notify;
} GstPromiseImpl;
115
116 /**
117 * gst_promise_wait:
118 * @promise: a #GstPromise
119 *
120 * Wait for @promise to move out of the %GST_PROMISE_RESULT_PENDING state.
121 * If @promise is not in %GST_PROMISE_RESULT_PENDING then it will return
122 * immediately with the current result.
123 *
124 * Returns: the result of the promise
125 *
126 * Since: 1.14
127 */
128 GstPromiseResult
gst_promise_wait(GstPromise * promise)129 gst_promise_wait (GstPromise * promise)
130 {
131 GstPromiseResult ret;
132
133 g_return_val_if_fail (promise != NULL, GST_PROMISE_RESULT_EXPIRED);
134
135 g_mutex_lock (GST_PROMISE_LOCK (promise));
136 ret = GST_PROMISE_RESULT (promise);
137
138 while (ret == GST_PROMISE_RESULT_PENDING) {
139 GST_LOG ("%p waiting", promise);
140 g_cond_wait (GST_PROMISE_COND (promise), GST_PROMISE_LOCK (promise));
141 ret = GST_PROMISE_RESULT (promise);
142 }
143 GST_LOG ("%p waited", promise);
144
145 g_mutex_unlock (GST_PROMISE_LOCK (promise));
146
147 return ret;
148 }
149
150 /**
151 * gst_promise_reply:
152 * @promise: (allow-none): a #GstPromise
153 * @s: (transfer full) (nullable): a #GstStructure with the the reply contents
154 *
155 * Set a reply on @promise. This will wake up any waiters with
156 * %GST_PROMISE_RESULT_REPLIED. Called by the producer of the value to
157 * indicate success (or failure).
158 *
159 * If @promise has already been interrupted by the consumer, then this reply
160 * is not visible to the consumer.
161 *
162 * Since: 1.14
163 */
164 void
gst_promise_reply(GstPromise * promise,GstStructure * s)165 gst_promise_reply (GstPromise * promise, GstStructure * s)
166 {
167 GstPromiseChangeFunc change_func = NULL;
168 gpointer change_data = NULL;
169
170 /* Caller requested that no reply is necessary */
171 if (promise == NULL)
172 return;
173
174 g_mutex_lock (GST_PROMISE_LOCK (promise));
175 if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING &&
176 GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_INTERRUPTED) {
177 GstPromiseResult result = GST_PROMISE_RESULT (promise);
178 g_mutex_unlock (GST_PROMISE_LOCK (promise));
179 g_return_if_fail (result == GST_PROMISE_RESULT_PENDING ||
180 result == GST_PROMISE_RESULT_INTERRUPTED);
181 }
182
183 /* XXX: is this necessary and valid? */
184 if (GST_PROMISE_REPLY (promise) && GST_PROMISE_REPLY (promise) != s)
185 gst_structure_free (GST_PROMISE_REPLY (promise));
186
187 /* Only reply iff we are currently in pending */
188 if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
189 if (s
190 && !gst_structure_set_parent_refcount (s,
191 (int *) &immutable_structure_refcount)) {
192 g_critical ("Input structure has a parent already!");
193 g_mutex_unlock (GST_PROMISE_LOCK (promise));
194 return;
195 }
196
197 GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_REPLIED;
198 GST_LOG ("%p replied", promise);
199
200 GST_PROMISE_REPLY (promise) = s;
201
202 change_func = GST_PROMISE_CHANGE_FUNC (promise);
203 change_data = GST_PROMISE_CHANGE_DATA (promise);
204 } else {
205 /* eat the value */
206 if (s)
207 gst_structure_free (s);
208 }
209
210 g_cond_broadcast (GST_PROMISE_COND (promise));
211 g_mutex_unlock (GST_PROMISE_LOCK (promise));
212
213 if (change_func)
214 change_func (promise, change_data);
215 }
216
217 /**
218 * gst_promise_get_reply:
219 * @promise: a #GstPromise
220 *
221 * Retrieve the reply set on @promise. @promise must be in
222 * %GST_PROMISE_RESULT_REPLIED and the returned structure is owned by @promise
223 *
224 * Returns: (transfer none) (nullable): The reply set on @promise
225 *
226 * Since: 1.14
227 */
228 const GstStructure *
gst_promise_get_reply(GstPromise * promise)229 gst_promise_get_reply (GstPromise * promise)
230 {
231 g_return_val_if_fail (promise != NULL, NULL);
232
233 g_mutex_lock (GST_PROMISE_LOCK (promise));
234 if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_REPLIED) {
235 GstPromiseResult result = GST_PROMISE_RESULT (promise);
236 g_mutex_unlock (GST_PROMISE_LOCK (promise));
237 g_return_val_if_fail (result == GST_PROMISE_RESULT_REPLIED, NULL);
238 }
239
240 g_mutex_unlock (GST_PROMISE_LOCK (promise));
241
242 return GST_PROMISE_REPLY (promise);
243 }
244
245 /**
246 * gst_promise_interrupt:
247 * @promise: a #GstPromise
248 *
249 * Interrupt waiting for a @promise. This will wake up any waiters with
250 * %GST_PROMISE_RESULT_INTERRUPTED. Called when the consumer does not want
251 * the value produced anymore.
252 *
253 * Since: 1.14
254 */
255 void
gst_promise_interrupt(GstPromise * promise)256 gst_promise_interrupt (GstPromise * promise)
257 {
258 GstPromiseChangeFunc change_func = NULL;
259 gpointer change_data = NULL;
260
261 g_return_if_fail (promise != NULL);
262
263 g_mutex_lock (GST_PROMISE_LOCK (promise));
264 if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING &&
265 GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_REPLIED) {
266 GstPromiseResult result = GST_PROMISE_RESULT (promise);
267 g_mutex_unlock (GST_PROMISE_LOCK (promise));
268 g_return_if_fail (result == GST_PROMISE_RESULT_PENDING ||
269 result == GST_PROMISE_RESULT_REPLIED);
270 }
271 /* only interrupt if we are currently in pending */
272 if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
273 GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_INTERRUPTED;
274 g_cond_broadcast (GST_PROMISE_COND (promise));
275 GST_LOG ("%p interrupted", promise);
276
277 change_func = GST_PROMISE_CHANGE_FUNC (promise);
278 change_data = GST_PROMISE_CHANGE_DATA (promise);
279 }
280 g_mutex_unlock (GST_PROMISE_LOCK (promise));
281
282 if (change_func)
283 change_func (promise, change_data);
284 }
285
286 /**
287 * gst_promise_expire:
288 * @promise: a #GstPromise
289 *
290 * Expire a @promise. This will wake up any waiters with
291 * %GST_PROMISE_RESULT_EXPIRED. Called by a message loop when the parent
292 * message is handled and/or destroyed (possibly unanswered).
293 *
294 * Since: 1.14
295 */
296 void
gst_promise_expire(GstPromise * promise)297 gst_promise_expire (GstPromise * promise)
298 {
299 GstPromiseChangeFunc change_func = NULL;
300 gpointer change_data = NULL;
301
302 g_return_if_fail (promise != NULL);
303
304 g_mutex_lock (GST_PROMISE_LOCK (promise));
305 if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
306 GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_EXPIRED;
307 g_cond_broadcast (GST_PROMISE_COND (promise));
308 GST_LOG ("%p expired", promise);
309
310 change_func = GST_PROMISE_CHANGE_FUNC (promise);
311 change_data = GST_PROMISE_CHANGE_DATA (promise);
312 GST_PROMISE_CHANGE_FUNC (promise) = NULL;
313 GST_PROMISE_CHANGE_DATA (promise) = NULL;
314 }
315 g_mutex_unlock (GST_PROMISE_LOCK (promise));
316
317 if (change_func)
318 change_func (promise, change_data);
319 }
320
321 static void
gst_promise_free(GstMiniObject * object)322 gst_promise_free (GstMiniObject * object)
323 {
324 GstPromise *promise = (GstPromise *) object;
325
326 /* the promise *must* be dealt with in some way before destruction */
327 g_warn_if_fail (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING);
328
329 if (GST_PROMISE_CHANGE_NOTIFY (promise))
330 GST_PROMISE_CHANGE_NOTIFY (promise) (GST_PROMISE_CHANGE_DATA (promise));
331
332 if (GST_PROMISE_REPLY (promise)) {
333 gst_structure_set_parent_refcount (GST_PROMISE_REPLY (promise), NULL);
334 gst_structure_free (GST_PROMISE_REPLY (promise));
335 }
336 g_mutex_clear (GST_PROMISE_LOCK (promise));
337 g_cond_clear (GST_PROMISE_COND (promise));
338 GST_LOG ("%p finalized", promise);
339
340 #ifdef USE_POISONING
341 memset (promise, 0xff, sizeof (GstPromiseImpl));
342 #endif
343
344 g_free (promise);
345 }
346
347 static void
gst_promise_init(GstPromise * promise)348 gst_promise_init (GstPromise * promise)
349 {
350 static gsize _init = 0;
351
352 if (g_once_init_enter (&_init)) {
353 GST_DEBUG_CATEGORY_INIT (gst_promise_debug, "gstpromise", 0, "gstpromise");
354 g_once_init_leave (&_init, 1);
355 }
356
357 gst_mini_object_init (GST_MINI_OBJECT (promise), 0, GST_TYPE_PROMISE, NULL,
358 NULL, gst_promise_free);
359
360 GST_PROMISE_REPLY (promise) = NULL;
361 GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_PENDING;
362 g_mutex_init (GST_PROMISE_LOCK (promise));
363 g_cond_init (GST_PROMISE_COND (promise));
364 }
365
366 /**
367 * gst_promise_new:
368 *
369 * Returns: a new #GstPromise
370 *
371 * Since: 1.14
372 */
373 GstPromise *
gst_promise_new(void)374 gst_promise_new (void)
375 {
376 GstPromise *promise = GST_PROMISE (g_new0 (GstPromiseImpl, 1));
377
378 gst_promise_init (promise);
379 GST_LOG ("new promise %p", promise);
380
381 return promise;
382 }
383
384 /**
385 * gst_promise_new_with_change_func:
386 * @func: (scope notified): a #GstPromiseChangeFunc to call
387 * @user_data: (closure): argument to call @func with
388 * @notify: notification function that @user_data is no longer needed
389 *
390 * @func will be called exactly once when transitioning out of
391 * %GST_PROMISE_RESULT_PENDING into any of the other #GstPromiseResult
392 * states.
393 *
394 * Returns: a new #GstPromise
395 *
396 * Since: 1.14
397 */
398 GstPromise *
gst_promise_new_with_change_func(GstPromiseChangeFunc func,gpointer user_data,GDestroyNotify notify)399 gst_promise_new_with_change_func (GstPromiseChangeFunc func, gpointer user_data,
400 GDestroyNotify notify)
401 {
402 GstPromise *promise = gst_promise_new ();
403
404 GST_PROMISE_CHANGE_FUNC (promise) = func;
405 GST_PROMISE_CHANGE_DATA (promise) = user_data;
406 GST_PROMISE_CHANGE_NOTIFY (promise) = notify;
407
408 return promise;
409 }
410
/* Type boilerplate for GstPromise.
 * NOTE(review): presumably expands to the gst_promise_get_type() behind
 * GST_TYPE_PROMISE -- confirm against the macro definition in
 * gstminiobject.h. */
GST_DEFINE_MINI_OBJECT_TYPE (GstPromise, gst_promise);
412
413 /**
414 * gst_promise_ref:
415 * @promise: a #GstPromise.
416 *
417 * Increases the refcount of the given @promise by one.
418 *
419 * Returns: (transfer full): @promise
420 *
421 * Since: 1.14
422 */
423 GstPromise *
gst_promise_ref(GstPromise * promise)424 gst_promise_ref (GstPromise * promise)
425 {
426 return (GstPromise *) gst_mini_object_ref (GST_MINI_OBJECT_CAST (promise));
427 }
428
429 /**
430 * gst_promise_unref:
431 * @promise: (transfer full): a #GstPromise.
432 *
433 * Decreases the refcount of the promise. If the refcount reaches 0, the
434 * promise will be freed.
435 *
436 * Since: 1.14
437 */
438 void
gst_promise_unref(GstPromise * promise)439 gst_promise_unref (GstPromise * promise)
440 {
441 gst_mini_object_unref (GST_MINI_OBJECT_CAST (promise));
442 }
443