• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 2017 Matthew Waters <matthew@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 
20 #ifdef HAVE_CONFIG_H
21 # include "config.h"
22 #endif
23 
24 #define GST_DISABLE_MINIOBJECT_INLINE_FUNCTIONS
25 #include "gst_private.h"
26 
27 #include "gstpromise.h"
28 
29 #define GST_CAT_DEFAULT gst_promise_debug
30 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
31 
/**
 * SECTION:gstpromise
 * @title: GstPromise
 * @short_description: a miniobject for future/promise-like functionality
 *
 * The #GstPromise object implements the container for values that may
 * be available later, i.e. a Future or a Promise as described in
 * <https://en.wikipedia.org/wiki/Futures_and_promises>.
 * As with all Future/Promise-like functionality, there is the concept of the
 * producer of the value and the consumer of the value.
 *
 * A #GstPromise is created with gst_promise_new() by the consumer and passed
 * to the producer to avoid thread safety issues with the change callback.
 * A #GstPromise can be replied to with a value (or an error) by the producer
 * with gst_promise_reply(). The exact value returned is defined by the API
 * contract of the producer and %NULL may be a valid reply.
 * gst_promise_interrupt() is for the consumer to
 * indicate to the producer that the value is not needed anymore and producing
 * that value can stop.  The %GST_PROMISE_RESULT_EXPIRED state set by a call
 * to gst_promise_expire() indicates to the consumer that a value will never
 * be produced and is intended to be called by a third party that implements
 * some notion of message handling such as #GstBus.
 * A callback can also be installed at #GstPromise creation for
 * result changes with gst_promise_new_with_change_func().
 * The change callback can be used to chain #GstPromise's together as in the
 * following example.
 * |[<!-- language="C" -->
 * const GstStructure *reply;
 * GstPromise *p;
 * if (gst_promise_wait (promise) != GST_PROMISE_RESULT_REPLIED)
 *   return; // interrupted or expired value
 * reply = gst_promise_get_reply (promise);
 * if (error in reply)
 *   return; // propagate error
 * p = gst_promise_new_with_change_func (another_promise_change_func, user_data, notify);
 * pass p to promise-using API
 * ]|
 *
 * Each #GstPromise starts out with a #GstPromiseResult of
 * %GST_PROMISE_RESULT_PENDING and only ever transitions once
 * into one of the other #GstPromiseResult's.
 *
 * In order to support multi-threaded code, gst_promise_reply(),
 * gst_promise_interrupt() and gst_promise_expire() may all be called from
 * different threads with some restrictions and the final result of the promise
 * is whichever call is made first.  There are two restrictions on ordering:
 *
 * 1. That gst_promise_reply() and gst_promise_interrupt() cannot be called
 * after gst_promise_expire()
 * 2. That gst_promise_reply() and gst_promise_interrupt()
 * cannot be called twice.
 *
 * The change function set with gst_promise_new_with_change_func() is
 * called directly from either the gst_promise_reply(),
 * gst_promise_interrupt() or gst_promise_expire() and can be called
 * from an arbitrary thread.  #GstPromise-using APIs can restrict this to
 * a single thread or a subset of threads but that is entirely up to the API
 * that uses #GstPromise.
 */
91 
92 static const int immutable_structure_refcount = 2;
93 
94 #define GST_PROMISE_REPLY(p)            (((GstPromiseImpl *)(p))->reply)
95 #define GST_PROMISE_RESULT(p)           (((GstPromiseImpl *)(p))->result)
96 #define GST_PROMISE_LOCK(p)             (&(((GstPromiseImpl *)(p))->lock))
97 #define GST_PROMISE_COND(p)             (&(((GstPromiseImpl *)(p))->cond))
98 #define GST_PROMISE_CHANGE_FUNC(p)      (((GstPromiseImpl *)(p))->change_func)
99 #define GST_PROMISE_CHANGE_DATA(p)      (((GstPromiseImpl *)(p))->user_data)
100 #define GST_PROMISE_CHANGE_NOTIFY(p)    (((GstPromiseImpl *)(p))->notify)
101 
102 typedef struct
103 {
104   GstPromise promise;
105 
106   GstPromiseResult result;
107   GstStructure *reply;
108 
109   GMutex lock;
110   GCond cond;
111   GstPromiseChangeFunc change_func;
112   gpointer user_data;
113   GDestroyNotify notify;
114 } GstPromiseImpl;
115 
116 /**
117  * gst_promise_wait:
118  * @promise: a #GstPromise
119  *
120  * Wait for @promise to move out of the %GST_PROMISE_RESULT_PENDING state.
121  * If @promise is not in %GST_PROMISE_RESULT_PENDING then it will return
122  * immediately with the current result.
123  *
124  * Returns: the result of the promise
125  *
126  * Since: 1.14
127  */
128 GstPromiseResult
gst_promise_wait(GstPromise * promise)129 gst_promise_wait (GstPromise * promise)
130 {
131   GstPromiseResult ret;
132 
133   g_return_val_if_fail (promise != NULL, GST_PROMISE_RESULT_EXPIRED);
134 
135   g_mutex_lock (GST_PROMISE_LOCK (promise));
136   ret = GST_PROMISE_RESULT (promise);
137 
138   while (ret == GST_PROMISE_RESULT_PENDING) {
139     GST_LOG ("%p waiting", promise);
140     g_cond_wait (GST_PROMISE_COND (promise), GST_PROMISE_LOCK (promise));
141     ret = GST_PROMISE_RESULT (promise);
142   }
143   GST_LOG ("%p waited", promise);
144 
145   g_mutex_unlock (GST_PROMISE_LOCK (promise));
146 
147   return ret;
148 }
149 
150 /**
151  * gst_promise_reply:
152  * @promise: (allow-none): a #GstPromise
153  * @s: (transfer full) (nullable): a #GstStructure with the the reply contents
154  *
155  * Set a reply on @promise.  This will wake up any waiters with
156  * %GST_PROMISE_RESULT_REPLIED.  Called by the producer of the value to
157  * indicate success (or failure).
158  *
159  * If @promise has already been interrupted by the consumer, then this reply
160  * is not visible to the consumer.
161  *
162  * Since: 1.14
163  */
164 void
gst_promise_reply(GstPromise * promise,GstStructure * s)165 gst_promise_reply (GstPromise * promise, GstStructure * s)
166 {
167   GstPromiseChangeFunc change_func = NULL;
168   gpointer change_data = NULL;
169 
170   /* Caller requested that no reply is necessary */
171   if (promise == NULL)
172     return;
173 
174   g_mutex_lock (GST_PROMISE_LOCK (promise));
175   if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING &&
176       GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_INTERRUPTED) {
177     GstPromiseResult result = GST_PROMISE_RESULT (promise);
178     g_mutex_unlock (GST_PROMISE_LOCK (promise));
179     g_return_if_fail (result == GST_PROMISE_RESULT_PENDING ||
180         result == GST_PROMISE_RESULT_INTERRUPTED);
181   }
182 
183   /* XXX: is this necessary and valid? */
184   if (GST_PROMISE_REPLY (promise) && GST_PROMISE_REPLY (promise) != s)
185     gst_structure_free (GST_PROMISE_REPLY (promise));
186 
187   /* Only reply iff we are currently in pending */
188   if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
189     if (s
190         && !gst_structure_set_parent_refcount (s,
191             (int *) &immutable_structure_refcount)) {
192       g_critical ("Input structure has a parent already!");
193       g_mutex_unlock (GST_PROMISE_LOCK (promise));
194       return;
195     }
196 
197     GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_REPLIED;
198     GST_LOG ("%p replied", promise);
199 
200     GST_PROMISE_REPLY (promise) = s;
201 
202     change_func = GST_PROMISE_CHANGE_FUNC (promise);
203     change_data = GST_PROMISE_CHANGE_DATA (promise);
204   } else {
205     /* eat the value */
206     if (s)
207       gst_structure_free (s);
208   }
209 
210   g_cond_broadcast (GST_PROMISE_COND (promise));
211   g_mutex_unlock (GST_PROMISE_LOCK (promise));
212 
213   if (change_func)
214     change_func (promise, change_data);
215 }
216 
217 /**
218  * gst_promise_get_reply:
219  * @promise: a #GstPromise
220  *
221  * Retrieve the reply set on @promise.  @promise must be in
222  * %GST_PROMISE_RESULT_REPLIED and the returned structure is owned by @promise
223  *
224  * Returns: (transfer none) (nullable): The reply set on @promise
225  *
226  * Since: 1.14
227  */
228 const GstStructure *
gst_promise_get_reply(GstPromise * promise)229 gst_promise_get_reply (GstPromise * promise)
230 {
231   g_return_val_if_fail (promise != NULL, NULL);
232 
233   g_mutex_lock (GST_PROMISE_LOCK (promise));
234   if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_REPLIED) {
235     GstPromiseResult result = GST_PROMISE_RESULT (promise);
236     g_mutex_unlock (GST_PROMISE_LOCK (promise));
237     g_return_val_if_fail (result == GST_PROMISE_RESULT_REPLIED, NULL);
238   }
239 
240   g_mutex_unlock (GST_PROMISE_LOCK (promise));
241 
242   return GST_PROMISE_REPLY (promise);
243 }
244 
245 /**
246  * gst_promise_interrupt:
247  * @promise: a #GstPromise
248  *
249  * Interrupt waiting for a @promise.  This will wake up any waiters with
250  * %GST_PROMISE_RESULT_INTERRUPTED.  Called when the consumer does not want
251  * the value produced anymore.
252  *
253  * Since: 1.14
254  */
255 void
gst_promise_interrupt(GstPromise * promise)256 gst_promise_interrupt (GstPromise * promise)
257 {
258   GstPromiseChangeFunc change_func = NULL;
259   gpointer change_data = NULL;
260 
261   g_return_if_fail (promise != NULL);
262 
263   g_mutex_lock (GST_PROMISE_LOCK (promise));
264   if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING &&
265       GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_REPLIED) {
266     GstPromiseResult result = GST_PROMISE_RESULT (promise);
267     g_mutex_unlock (GST_PROMISE_LOCK (promise));
268     g_return_if_fail (result == GST_PROMISE_RESULT_PENDING ||
269         result == GST_PROMISE_RESULT_REPLIED);
270   }
271   /* only interrupt if we are currently in pending */
272   if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
273     GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_INTERRUPTED;
274     g_cond_broadcast (GST_PROMISE_COND (promise));
275     GST_LOG ("%p interrupted", promise);
276 
277     change_func = GST_PROMISE_CHANGE_FUNC (promise);
278     change_data = GST_PROMISE_CHANGE_DATA (promise);
279   }
280   g_mutex_unlock (GST_PROMISE_LOCK (promise));
281 
282   if (change_func)
283     change_func (promise, change_data);
284 }
285 
286 /**
287  * gst_promise_expire:
288  * @promise: a #GstPromise
289  *
290  * Expire a @promise.  This will wake up any waiters with
291  * %GST_PROMISE_RESULT_EXPIRED.  Called by a message loop when the parent
292  * message is handled and/or destroyed (possibly unanswered).
293  *
294  * Since: 1.14
295  */
296 void
gst_promise_expire(GstPromise * promise)297 gst_promise_expire (GstPromise * promise)
298 {
299   GstPromiseChangeFunc change_func = NULL;
300   gpointer change_data = NULL;
301 
302   g_return_if_fail (promise != NULL);
303 
304   g_mutex_lock (GST_PROMISE_LOCK (promise));
305   if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
306     GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_EXPIRED;
307     g_cond_broadcast (GST_PROMISE_COND (promise));
308     GST_LOG ("%p expired", promise);
309 
310     change_func = GST_PROMISE_CHANGE_FUNC (promise);
311     change_data = GST_PROMISE_CHANGE_DATA (promise);
312     GST_PROMISE_CHANGE_FUNC (promise) = NULL;
313     GST_PROMISE_CHANGE_DATA (promise) = NULL;
314   }
315   g_mutex_unlock (GST_PROMISE_LOCK (promise));
316 
317   if (change_func)
318     change_func (promise, change_data);
319 }
320 
321 static void
gst_promise_free(GstMiniObject * object)322 gst_promise_free (GstMiniObject * object)
323 {
324   GstPromise *promise = (GstPromise *) object;
325 
326   /* the promise *must* be dealt with in some way before destruction */
327   g_warn_if_fail (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING);
328 
329   if (GST_PROMISE_CHANGE_NOTIFY (promise))
330     GST_PROMISE_CHANGE_NOTIFY (promise) (GST_PROMISE_CHANGE_DATA (promise));
331 
332   if (GST_PROMISE_REPLY (promise)) {
333     gst_structure_set_parent_refcount (GST_PROMISE_REPLY (promise), NULL);
334     gst_structure_free (GST_PROMISE_REPLY (promise));
335   }
336   g_mutex_clear (GST_PROMISE_LOCK (promise));
337   g_cond_clear (GST_PROMISE_COND (promise));
338   GST_LOG ("%p finalized", promise);
339 
340 #ifdef USE_POISONING
341   memset (promise, 0xff, sizeof (GstPromiseImpl));
342 #endif
343 
344   g_free (promise);
345 }
346 
347 static void
gst_promise_init(GstPromise * promise)348 gst_promise_init (GstPromise * promise)
349 {
350   static gsize _init = 0;
351 
352   if (g_once_init_enter (&_init)) {
353     GST_DEBUG_CATEGORY_INIT (gst_promise_debug, "gstpromise", 0, "gstpromise");
354     g_once_init_leave (&_init, 1);
355   }
356 
357   gst_mini_object_init (GST_MINI_OBJECT (promise), 0, GST_TYPE_PROMISE, NULL,
358       NULL, gst_promise_free);
359 
360   GST_PROMISE_REPLY (promise) = NULL;
361   GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_PENDING;
362   g_mutex_init (GST_PROMISE_LOCK (promise));
363   g_cond_init (GST_PROMISE_COND (promise));
364 }
365 
366 /**
367  * gst_promise_new:
368  *
369  * Returns: a new #GstPromise
370  *
371  * Since: 1.14
372  */
373 GstPromise *
gst_promise_new(void)374 gst_promise_new (void)
375 {
376   GstPromise *promise = GST_PROMISE (g_new0 (GstPromiseImpl, 1));
377 
378   gst_promise_init (promise);
379   GST_LOG ("new promise %p", promise);
380 
381   return promise;
382 }
383 
384 /**
385  * gst_promise_new_with_change_func:
386  * @func: (scope notified): a #GstPromiseChangeFunc to call
387  * @user_data: (closure): argument to call @func with
388  * @notify: notification function that @user_data is no longer needed
389  *
390  * @func will be called exactly once when transitioning out of
391  * %GST_PROMISE_RESULT_PENDING into any of the other #GstPromiseResult
392  * states.
393  *
394  * Returns: a new #GstPromise
395  *
396  * Since: 1.14
397  */
398 GstPromise *
gst_promise_new_with_change_func(GstPromiseChangeFunc func,gpointer user_data,GDestroyNotify notify)399 gst_promise_new_with_change_func (GstPromiseChangeFunc func, gpointer user_data,
400     GDestroyNotify notify)
401 {
402   GstPromise *promise = gst_promise_new ();
403 
404   GST_PROMISE_CHANGE_FUNC (promise) = func;
405   GST_PROMISE_CHANGE_DATA (promise) = user_data;
406   GST_PROMISE_CHANGE_NOTIFY (promise) = notify;
407 
408   return promise;
409 }
410 
/* Type boilerplate for #GstPromise.
 * NOTE(review): presumably this expands to the gst_promise_get_type()
 * definition behind GST_TYPE_PROMISE -- confirm in gstminiobject.h. */
GST_DEFINE_MINI_OBJECT_TYPE (GstPromise, gst_promise);
412 
413 /**
414  * gst_promise_ref:
415  * @promise: a #GstPromise.
416  *
417  * Increases the refcount of the given @promise by one.
418  *
419  * Returns: (transfer full): @promise
420  *
421  * Since: 1.14
422  */
423 GstPromise *
gst_promise_ref(GstPromise * promise)424 gst_promise_ref (GstPromise * promise)
425 {
426   return (GstPromise *) gst_mini_object_ref (GST_MINI_OBJECT_CAST (promise));
427 }
428 
429 /**
430  * gst_promise_unref:
431  * @promise: (transfer full): a #GstPromise.
432  *
433  * Decreases the refcount of the promise. If the refcount reaches 0, the
434  * promise will be freed.
435  *
436  * Since: 1.14
437  */
438 void
gst_promise_unref(GstPromise * promise)439 gst_promise_unref (GstPromise * promise)
440 {
441   gst_mini_object_unref (GST_MINI_OBJECT_CAST (promise));
442 }
443