1 /* GStreamer
2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2005 Wim Taymans <wim@fluendo.com>
4 *
5 * gsttask.c: Streaming tasks
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
16 *
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
21 */
22
23 /**
24 * SECTION:gsttask
25 * @title: GstTask
26 * @short_description: Abstraction of GStreamer streaming threads.
27 * @see_also: #GstElement, #GstPad
28 *
29 * #GstTask is used by #GstElement and #GstPad to provide the data passing
30 * threads in a #GstPipeline.
31 *
32 * A #GstPad will typically start a #GstTask to push or pull data to/from the
33 * peer pads. Most source elements start a #GstTask to push data. In some cases
34 * a demuxer element can start a #GstTask to pull data from a peer element. This
35 * is typically done when the demuxer can perform random access on the upstream
36 * peer element for improved performance.
37 *
38 * Although convenience functions exist on #GstPad to start/pause/stop tasks, it
39 * might sometimes be needed to create a #GstTask manually if it is not related to
40 * a #GstPad.
41 *
42 * Before the #GstTask can be run, it needs a #GRecMutex that can be set with
43 * gst_task_set_lock().
44 *
45 * The task can be started, paused and stopped with gst_task_start(), gst_task_pause()
46 * and gst_task_stop() respectively or with the gst_task_set_state() function.
47 *
48 * A #GstTask will repeatedly call the #GstTaskFunction with the user data
49 * that was provided when creating the task with gst_task_new(). While calling
50 * the function it will acquire the provided lock. The provided lock is released
51 * when the task pauses or stops.
52 *
53 * Stopping a task with gst_task_stop() will not immediately make sure the task is
54 * not running anymore. Use gst_task_join() to make sure the task is completely
55 * stopped and the thread is stopped.
56 *
57 * After creating a #GstTask, use gst_object_unref() to free its resources. This can
58 * only be done when the task is not running anymore.
59 *
60 * Task functions can send a #GstMessage to send out-of-band data to the
61 * application. The application can receive messages from the #GstBus in its
62 * mainloop.
63 *
 * For debugging purposes, the task will configure its object name as the thread
 * name on Linux. Please note that the object name should be configured before the
 * task is started; changing the object name after the task has been started has
 * no effect on the thread name.
68 */
69
70 #include "gst_private.h"
71
72 #include "gstinfo.h"
73 #include "gsttask.h"
74 #include "glib-compat-private.h"
75
76 #include <stdio.h>
77
78 #ifdef HAVE_SYS_PRCTL_H
79 #include <sys/prctl.h>
80 #endif
81
82 #ifdef HAVE_PTHREAD_SETNAME_NP_WITHOUT_TID
83 #include <pthread.h>
84 #endif
85
/* Debug category that the GST_* logging calls in this file report to. */
GST_DEBUG_CATEGORY_STATIC (task_debug);
#define GST_CAT_DEFAULT (task_debug)

/* Atomic setter/getter for the task state field. gst_task_get_state() uses
 * the getter without taking the object lock. */
#define SET_TASK_STATE(t,s) (g_atomic_int_set (&GST_TASK_STATE(t), (s)))
#define GET_TASK_STATE(t)   ((GstTaskState) g_atomic_int_get (&GST_TASK_STATE(t)))
91
92 struct _GstTaskPrivate
93 {
94 /* callbacks for managing the thread of this task */
95 GstTaskThreadFunc enter_func;
96 gpointer enter_user_data;
97 GDestroyNotify enter_notify;
98
99 GstTaskThreadFunc leave_func;
100 gpointer leave_user_data;
101 GDestroyNotify leave_notify;
102
103 /* configured pool */
104 GstTaskPool *pool;
105
106 /* remember the pool and id that is currently running. */
107 gpointer id;
108 GstTaskPool *pool_id;
109 };
110
111 #ifdef _MSC_VER
112 #define WIN32_LEAN_AND_MEAN
113 #include <windows.h>
114
115 typedef HRESULT (WINAPI * pSetThreadDescription) (HANDLE hThread,
116 PCWSTR lpThreadDescription);
117 static pSetThreadDescription SetThreadDescriptionFunc = NULL;
118 HMODULE kernel32_module = NULL;
119
120 struct _THREADNAME_INFO
121 {
122 DWORD dwType; // must be 0x1000
123 LPCSTR szName; // pointer to name (in user addr space)
124 DWORD dwThreadID; // thread ID (-1=caller thread)
125 DWORD dwFlags; // reserved for future use, must be zero
126 };
127 typedef struct _THREADNAME_INFO THREADNAME_INFO;
128
129 static void
SetThreadName(DWORD dwThreadID,LPCSTR szThreadName)130 SetThreadName (DWORD dwThreadID, LPCSTR szThreadName)
131 {
132 THREADNAME_INFO info;
133 info.dwType = 0x1000;
134 info.szName = szThreadName;
135 info.dwThreadID = dwThreadID;
136 info.dwFlags = 0;
137
138 __try {
139 RaiseException (0x406D1388, 0, sizeof (info) / sizeof (DWORD),
140 (const ULONG_PTR *) &info);
141 }
142 __except (EXCEPTION_CONTINUE_EXECUTION) {
143 }
144 }
145
146 static gboolean
gst_task_win32_load_library(void)147 gst_task_win32_load_library (void)
148 {
149 /* FIXME: Add support for UWP app */
150 #if WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP)
151 static gsize _init_once = 0;
152 if (g_once_init_enter (&_init_once)) {
153 kernel32_module = LoadLibraryW (L"kernel32.dll");
154 if (kernel32_module) {
155 SetThreadDescriptionFunc =
156 (pSetThreadDescription) GetProcAddress (kernel32_module,
157 "SetThreadDescription");
158 if (!SetThreadDescriptionFunc)
159 FreeLibrary (kernel32_module);
160 }
161 g_once_init_leave (&_init_once, 1);
162 }
163 #endif
164
165 return ! !SetThreadDescriptionFunc;
166 }
167
168 static gboolean
gst_task_win32_set_thread_desc(const gchar * name)169 gst_task_win32_set_thread_desc (const gchar * name)
170 {
171 HRESULT hr;
172 wchar_t *namew;
173
174 if (!gst_task_win32_load_library () || !name)
175 return FALSE;
176
177 namew = g_utf8_to_utf16 (name, -1, NULL, NULL, NULL);
178 if (!namew)
179 return FALSE;
180
181 hr = SetThreadDescriptionFunc (GetCurrentThread (), namew);
182
183 g_free (namew);
184 return SUCCEEDED (hr);
185 }
186
187 static void
gst_task_win32_set_thread_name(const gchar * name)188 gst_task_win32_set_thread_name (const gchar * name)
189 {
190 /* Prefer SetThreadDescription over exception based way if available,
191 * since thread description set by SetThreadDescription will be preserved
192 * in dump file */
193 if (!gst_task_win32_set_thread_desc (name))
194 SetThreadName ((DWORD) - 1, name);
195 }
196 #endif
197
198 static void gst_task_finalize (GObject * object);
199
200 static void gst_task_func (GstTask * task);
201
202 static GMutex pool_lock;
203
204 static GstTaskPool *_global_task_pool = NULL;
205
206 #define _do_init \
207 { \
208 GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks"); \
209 }
210
211 G_DEFINE_TYPE_WITH_CODE (GstTask, gst_task, GST_TYPE_OBJECT,
212 G_ADD_PRIVATE (GstTask) _do_init);
213
214 /* Called with pool_lock */
215 static void
ensure_klass_pool(GstTaskClass * klass)216 ensure_klass_pool (GstTaskClass * klass)
217 {
218 if (G_UNLIKELY (_global_task_pool == NULL)) {
219 _global_task_pool = gst_task_pool_new ();
220 gst_task_pool_prepare (_global_task_pool, NULL);
221
222 /* Classes are never destroyed so this ref will never be dropped */
223 GST_OBJECT_FLAG_SET (_global_task_pool, GST_OBJECT_FLAG_MAY_BE_LEAKED);
224 }
225 klass->pool = _global_task_pool;
226 }
227
228 static void
gst_task_class_init(GstTaskClass * klass)229 gst_task_class_init (GstTaskClass * klass)
230 {
231 GObjectClass *gobject_class;
232
233 gobject_class = (GObjectClass *) klass;
234
235 gobject_class->finalize = gst_task_finalize;
236 }
237
238 static void
gst_task_init(GstTask * task)239 gst_task_init (GstTask * task)
240 {
241 GstTaskClass *klass;
242
243 klass = GST_TASK_GET_CLASS (task);
244
245 task->priv = gst_task_get_instance_private (task);
246 task->running = FALSE;
247 task->thread = NULL;
248 task->lock = NULL;
249 g_cond_init (&task->cond);
250 SET_TASK_STATE (task, GST_TASK_STOPPED);
251
252 /* use the default klass pool for this task, users can
253 * override this later */
254 g_mutex_lock (&pool_lock);
255 ensure_klass_pool (klass);
256 task->priv->pool = gst_object_ref (klass->pool);
257 g_mutex_unlock (&pool_lock);
258 }
259
260 static void
gst_task_finalize(GObject * object)261 gst_task_finalize (GObject * object)
262 {
263 GstTask *task = GST_TASK (object);
264 GstTaskPrivate *priv = task->priv;
265
266 GST_DEBUG ("task %p finalize", task);
267
268 if (priv->enter_notify)
269 priv->enter_notify (priv->enter_user_data);
270
271 if (priv->leave_notify)
272 priv->leave_notify (priv->leave_user_data);
273
274 if (task->notify)
275 task->notify (task->user_data);
276
277 gst_object_unref (priv->pool);
278
279 /* task thread cannot be running here since it holds a ref
280 * to the task so that the finalize could not have happened */
281 g_cond_clear (&task->cond);
282
283 G_OBJECT_CLASS (gst_task_parent_class)->finalize (object);
284 }
285
286 /* should be called with the object LOCK */
287 static void
gst_task_configure_name(GstTask * task)288 gst_task_configure_name (GstTask * task)
289 {
290 #if defined(HAVE_SYS_PRCTL_H) && defined(PR_SET_NAME)
291 const gchar *name;
292 gchar thread_name[17] = { 0, };
293
294 GST_OBJECT_LOCK (task);
295 name = GST_OBJECT_NAME (task);
296
297 /* set the thread name to something easily identifiable */
298 if (!snprintf (thread_name, 17, "%s", GST_STR_NULL (name))) {
299 GST_DEBUG_OBJECT (task, "Could not create thread name for '%s'", name);
300 } else {
301 GST_DEBUG_OBJECT (task, "Setting thread name to '%s'", thread_name);
302 if (prctl (PR_SET_NAME, (unsigned long int) thread_name, 0, 0, 0))
303 GST_DEBUG_OBJECT (task, "Failed to set thread name");
304 }
305 GST_OBJECT_UNLOCK (task);
306 #elif defined(HAVE_PTHREAD_SETNAME_NP_WITHOUT_TID)
307 const gchar *name;
308
309 GST_OBJECT_LOCK (task);
310 name = GST_OBJECT_NAME (task);
311
312 /* set the thread name to something easily identifiable */
313 GST_DEBUG_OBJECT (task, "Setting thread name to '%s'", name);
314 if (pthread_setname_np (name))
315 GST_DEBUG_OBJECT (task, "Failed to set thread name");
316
317 GST_OBJECT_UNLOCK (task);
318 #elif defined (_MSC_VER)
319 const gchar *name;
320 name = GST_OBJECT_NAME (task);
321
322 /* set the thread name to something easily identifiable */
323 GST_DEBUG_OBJECT (task, "Setting thread name to '%s'", name);
324 gst_task_win32_set_thread_name (name);
325 #endif
326 }
327
328 static void
gst_task_func(GstTask * task)329 gst_task_func (GstTask * task)
330 {
331 GRecMutex *lock;
332 GThread *tself;
333 GstTaskPrivate *priv;
334
335 priv = task->priv;
336
337 tself = g_thread_self ();
338
339 GST_DEBUG ("Entering task %p, thread %p", task, tself);
340
341 /* we have to grab the lock to get the mutex. We also
342 * mark our state running so that nobody can mess with
343 * the mutex. */
344 GST_OBJECT_LOCK (task);
345 if (GET_TASK_STATE (task) == GST_TASK_STOPPED)
346 goto exit;
347 lock = GST_TASK_GET_LOCK (task);
348 if (G_UNLIKELY (lock == NULL))
349 goto no_lock;
350 task->thread = tself;
351 GST_OBJECT_UNLOCK (task);
352
353 /* fire the enter_func callback when we need to */
354 if (priv->enter_func)
355 priv->enter_func (task, tself, priv->enter_user_data);
356
357 /* locking order is TASK_LOCK, LOCK */
358 g_rec_mutex_lock (lock);
359 /* configure the thread name now */
360 gst_task_configure_name (task);
361
362 while (G_LIKELY (GET_TASK_STATE (task) != GST_TASK_STOPPED)) {
363 GST_OBJECT_LOCK (task);
364 while (G_UNLIKELY (GST_TASK_STATE (task) == GST_TASK_PAUSED)) {
365 g_rec_mutex_unlock (lock);
366
367 GST_TASK_SIGNAL (task);
368 GST_INFO_OBJECT (task, "Task going to paused");
369 GST_TASK_WAIT (task);
370 GST_INFO_OBJECT (task, "Task resume from paused");
371 GST_OBJECT_UNLOCK (task);
372 /* locking order.. */
373 g_rec_mutex_lock (lock);
374 GST_OBJECT_LOCK (task);
375 }
376
377 if (G_UNLIKELY (GET_TASK_STATE (task) == GST_TASK_STOPPED)) {
378 GST_OBJECT_UNLOCK (task);
379 break;
380 } else {
381 GST_OBJECT_UNLOCK (task);
382 }
383
384 task->func (task->user_data);
385 }
386
387 g_rec_mutex_unlock (lock);
388
389 GST_OBJECT_LOCK (task);
390 task->thread = NULL;
391
392 exit:
393 if (priv->leave_func) {
394 /* fire the leave_func callback when we need to. We need to do this before
395 * we signal the task and with the task lock released. */
396 GST_OBJECT_UNLOCK (task);
397 priv->leave_func (task, tself, priv->leave_user_data);
398 GST_OBJECT_LOCK (task);
399 }
400 /* now we allow messing with the lock again by setting the running flag to
401 * %FALSE. Together with the SIGNAL this is the sign for the _join() to
402 * complete.
403 * Note that we still have not dropped the final ref on the task. We could
404 * check here if there is a pending join() going on and drop the last ref
405 * before releasing the lock as we can be sure that a ref is held by the
406 * caller of the join(). */
407 task->running = FALSE;
408 GST_TASK_SIGNAL (task);
409 GST_OBJECT_UNLOCK (task);
410
411 GST_DEBUG ("Exit task %p, thread %p", task, g_thread_self ());
412
413 gst_object_unref (task);
414 return;
415
416 no_lock:
417 {
418 g_warning ("starting task without a lock");
419 goto exit;
420 }
421 }
422
423 /**
424 * gst_task_cleanup_all:
425 *
426 * Wait for all tasks to be stopped. This is mainly used internally
427 * to ensure proper cleanup of internal data structures in test suites.
428 *
429 * MT safe.
430 */
431 void
gst_task_cleanup_all(void)432 gst_task_cleanup_all (void)
433 {
434 GstTaskClass *klass;
435
436 if ((klass = g_type_class_peek (GST_TYPE_TASK))) {
437 if (klass->pool) {
438 g_mutex_lock (&pool_lock);
439 gst_task_pool_cleanup (klass->pool);
440 gst_object_unref (klass->pool);
441 klass->pool = NULL;
442 _global_task_pool = NULL;
443 g_mutex_unlock (&pool_lock);
444 }
445 }
446
447 /* GstElement owns a GThreadPool */
448 _priv_gst_element_cleanup ();
449 }
450
451 /**
452 * gst_task_new:
453 * @func: The #GstTaskFunction to use
454 * @user_data: User data to pass to @func
455 * @notify: the function to call when @user_data is no longer needed.
456 *
457 * Create a new Task that will repeatedly call the provided @func
458 * with @user_data as a parameter. Typically the task will run in
459 * a new thread.
460 *
461 * The function cannot be changed after the task has been created. You
462 * must create a new #GstTask to change the function.
463 *
464 * This function will not yet create and start a thread. Use gst_task_start() or
465 * gst_task_pause() to create and start the GThread.
466 *
467 * Before the task can be used, a #GRecMutex must be configured using the
468 * gst_task_set_lock() function. This lock will always be acquired while
469 * @func is called.
470 *
471 * Returns: (transfer full): A new #GstTask.
472 *
473 * MT safe.
474 */
475 GstTask *
gst_task_new(GstTaskFunction func,gpointer user_data,GDestroyNotify notify)476 gst_task_new (GstTaskFunction func, gpointer user_data, GDestroyNotify notify)
477 {
478 GstTask *task;
479
480 g_return_val_if_fail (func != NULL, NULL);
481
482 task = g_object_new (GST_TYPE_TASK, NULL);
483 task->func = func;
484 task->user_data = user_data;
485 task->notify = notify;
486
487 GST_DEBUG ("Created task %p", task);
488
489 /* clear floating flag */
490 gst_object_ref_sink (task);
491
492 return task;
493 }
494
495 /**
496 * gst_task_set_lock:
497 * @task: The #GstTask to use
498 * @mutex: The #GRecMutex to use
499 *
500 * Set the mutex used by the task. The mutex will be acquired before
501 * calling the #GstTaskFunction.
502 *
503 * This function has to be called before calling gst_task_pause() or
504 * gst_task_start().
505 *
506 * MT safe.
507 */
508 void
gst_task_set_lock(GstTask * task,GRecMutex * mutex)509 gst_task_set_lock (GstTask * task, GRecMutex * mutex)
510 {
511 g_return_if_fail (GST_IS_TASK (task));
512
513 GST_OBJECT_LOCK (task);
514 if (G_UNLIKELY (task->running))
515 goto is_running;
516 GST_INFO ("setting stream lock %p on task %p", mutex, task);
517 GST_TASK_GET_LOCK (task) = mutex;
518 GST_OBJECT_UNLOCK (task);
519
520 return;
521
522 /* ERRORS */
523 is_running:
524 {
525 GST_OBJECT_UNLOCK (task);
526 g_warning ("cannot call set_lock on a running task");
527 }
528 }
529
530 /**
531 * gst_task_get_pool:
532 * @task: a #GstTask
533 *
534 * Get the #GstTaskPool that this task will use for its streaming
535 * threads.
536 *
537 * MT safe.
538 *
539 * Returns: (transfer full): the #GstTaskPool used by @task. gst_object_unref()
540 * after usage.
541 */
542 GstTaskPool *
gst_task_get_pool(GstTask * task)543 gst_task_get_pool (GstTask * task)
544 {
545 GstTaskPool *result;
546 GstTaskPrivate *priv;
547
548 g_return_val_if_fail (GST_IS_TASK (task), NULL);
549
550 priv = task->priv;
551
552 GST_OBJECT_LOCK (task);
553 result = gst_object_ref (priv->pool);
554 GST_OBJECT_UNLOCK (task);
555
556 return result;
557 }
558
559 /**
560 * gst_task_set_pool:
561 * @task: a #GstTask
562 * @pool: (transfer none): a #GstTaskPool
563 *
564 * Set @pool as the new GstTaskPool for @task. Any new streaming threads that
565 * will be created by @task will now use @pool.
566 *
567 * MT safe.
568 */
569 void
gst_task_set_pool(GstTask * task,GstTaskPool * pool)570 gst_task_set_pool (GstTask * task, GstTaskPool * pool)
571 {
572 GstTaskPool *old;
573 GstTaskPrivate *priv;
574
575 g_return_if_fail (GST_IS_TASK (task));
576 g_return_if_fail (GST_IS_TASK_POOL (pool));
577
578 priv = task->priv;
579
580 GST_OBJECT_LOCK (task);
581 if (priv->pool != pool) {
582 old = priv->pool;
583 priv->pool = gst_object_ref (pool);
584 } else
585 old = NULL;
586 GST_OBJECT_UNLOCK (task);
587
588 if (old)
589 gst_object_unref (old);
590 }
591
592 /**
593 * gst_task_set_enter_callback:
594 * @task: The #GstTask to use
595 * @enter_func: (in): a #GstTaskThreadFunc
596 * @user_data: user data passed to @enter_func
597 * @notify: called when @user_data is no longer referenced
598 *
599 * Call @enter_func when the task function of @task is entered. @user_data will
600 * be passed to @enter_func and @notify will be called when @user_data is no
601 * longer referenced.
602 */
603 void
gst_task_set_enter_callback(GstTask * task,GstTaskThreadFunc enter_func,gpointer user_data,GDestroyNotify notify)604 gst_task_set_enter_callback (GstTask * task, GstTaskThreadFunc enter_func,
605 gpointer user_data, GDestroyNotify notify)
606 {
607 GDestroyNotify old_notify;
608
609 g_return_if_fail (task != NULL);
610 g_return_if_fail (GST_IS_TASK (task));
611
612 GST_OBJECT_LOCK (task);
613 if ((old_notify = task->priv->enter_notify)) {
614 gpointer old_data = task->priv->enter_user_data;
615
616 task->priv->enter_user_data = NULL;
617 task->priv->enter_notify = NULL;
618 GST_OBJECT_UNLOCK (task);
619
620 old_notify (old_data);
621
622 GST_OBJECT_LOCK (task);
623 }
624 task->priv->enter_func = enter_func;
625 task->priv->enter_user_data = user_data;
626 task->priv->enter_notify = notify;
627 GST_OBJECT_UNLOCK (task);
628 }
629
630 /**
631 * gst_task_set_leave_callback:
632 * @task: The #GstTask to use
633 * @leave_func: (in): a #GstTaskThreadFunc
634 * @user_data: user data passed to @leave_func
635 * @notify: called when @user_data is no longer referenced
636 *
637 * Call @leave_func when the task function of @task is left. @user_data will
638 * be passed to @leave_func and @notify will be called when @user_data is no
639 * longer referenced.
640 */
641 void
gst_task_set_leave_callback(GstTask * task,GstTaskThreadFunc leave_func,gpointer user_data,GDestroyNotify notify)642 gst_task_set_leave_callback (GstTask * task, GstTaskThreadFunc leave_func,
643 gpointer user_data, GDestroyNotify notify)
644 {
645 GDestroyNotify old_notify;
646
647 g_return_if_fail (task != NULL);
648 g_return_if_fail (GST_IS_TASK (task));
649
650 GST_OBJECT_LOCK (task);
651 if ((old_notify = task->priv->leave_notify)) {
652 gpointer old_data = task->priv->leave_user_data;
653
654 task->priv->leave_user_data = NULL;
655 task->priv->leave_notify = NULL;
656 GST_OBJECT_UNLOCK (task);
657
658 old_notify (old_data);
659
660 GST_OBJECT_LOCK (task);
661 }
662 task->priv->leave_func = leave_func;
663 task->priv->leave_user_data = user_data;
664 task->priv->leave_notify = notify;
665 GST_OBJECT_UNLOCK (task);
666 }
667
668 /**
669 * gst_task_get_state:
670 * @task: The #GstTask to query
671 *
672 * Get the current state of the task.
673 *
674 * Returns: The #GstTaskState of the task
675 *
676 * MT safe.
677 */
678 GstTaskState
gst_task_get_state(GstTask * task)679 gst_task_get_state (GstTask * task)
680 {
681 GstTaskState result;
682
683 g_return_val_if_fail (GST_IS_TASK (task), GST_TASK_STOPPED);
684
685 result = GET_TASK_STATE (task);
686
687 return result;
688 }
689
690 /* make sure the task is running and start a thread if it's not.
691 * This function must be called with the task LOCK. */
692 static gboolean
start_task(GstTask * task)693 start_task (GstTask * task)
694 {
695 gboolean res = TRUE;
696 GError *error = NULL;
697 GstTaskPrivate *priv;
698
699 priv = task->priv;
700
701 /* new task, We ref before so that it remains alive while
702 * the thread is running. */
703 gst_object_ref (task);
704 /* mark task as running so that a join will wait until we schedule
705 * and exit the task function. */
706 task->running = TRUE;
707
708 /* push on the thread pool, we remember the original pool because the user
709 * could change it later on and then we join to the wrong pool. */
710 priv->pool_id = gst_object_ref (priv->pool);
711 priv->id =
712 gst_task_pool_push (priv->pool_id, (GstTaskPoolFunction) gst_task_func,
713 task, &error);
714
715 if (error != NULL) {
716 g_warning ("failed to create thread: %s", error->message);
717 g_error_free (error);
718 res = FALSE;
719 }
720 return res;
721 }
722
723 static inline gboolean
gst_task_set_state_unlocked(GstTask * task,GstTaskState state)724 gst_task_set_state_unlocked (GstTask * task, GstTaskState state)
725 {
726 GstTaskState old;
727 gboolean res = TRUE;
728
729 GST_DEBUG_OBJECT (task, "Changing task %p to state %d", task, state);
730
731 if (state != GST_TASK_STOPPED)
732 if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL))
733 goto no_lock;
734
735 /* if the state changed, do our thing */
736 old = GET_TASK_STATE (task);
737 if (old != state) {
738 SET_TASK_STATE (task, state);
739 switch (old) {
740 case GST_TASK_STOPPED:
741 /* If the task already has a thread scheduled we don't have to do
742 * anything. */
743 if (G_UNLIKELY (!task->running))
744 res = start_task (task);
745 break;
746 case GST_TASK_PAUSED:
747 /* when we are paused, signal to go to the new state */
748 GST_TASK_SIGNAL (task);
749 break;
750 case GST_TASK_STARTED:
751 /* if we were started, we'll go to the new state after the next
752 * iteration. */
753 break;
754 }
755 }
756
757 return res;
758
759 /* ERRORS */
760 no_lock:
761 {
762 GST_WARNING_OBJECT (task, "state %d set on task without a lock", state);
763 g_warning ("task without a lock can't be set to state %d", state);
764 return FALSE;
765 }
766 }
767
768
769 /**
770 * gst_task_set_state:
771 * @task: a #GstTask
772 * @state: the new task state
773 *
774 * Sets the state of @task to @state.
775 *
776 * The @task must have a lock associated with it using
777 * gst_task_set_lock() when going to GST_TASK_STARTED or GST_TASK_PAUSED or
778 * this function will return %FALSE.
779 *
780 * MT safe.
781 *
782 * Returns: %TRUE if the state could be changed.
783 */
784 gboolean
gst_task_set_state(GstTask * task,GstTaskState state)785 gst_task_set_state (GstTask * task, GstTaskState state)
786 {
787 gboolean res = TRUE;
788
789 g_return_val_if_fail (GST_IS_TASK (task), FALSE);
790
791 GST_OBJECT_LOCK (task);
792 res = gst_task_set_state_unlocked (task, state);
793 GST_OBJECT_UNLOCK (task);
794
795 return res;
796 }
797
798 /**
799 * gst_task_start:
800 * @task: The #GstTask to start
801 *
802 * Starts @task. The @task must have a lock associated with it using
803 * gst_task_set_lock() or this function will return %FALSE.
804 *
805 * Returns: %TRUE if the task could be started.
806 *
807 * MT safe.
808 */
809 gboolean
gst_task_start(GstTask * task)810 gst_task_start (GstTask * task)
811 {
812 return gst_task_set_state (task, GST_TASK_STARTED);
813 }
814
815 /**
816 * gst_task_stop:
817 * @task: The #GstTask to stop
818 *
819 * Stops @task. This method merely schedules the task to stop and
820 * will not wait for the task to have completely stopped. Use
821 * gst_task_join() to stop and wait for completion.
822 *
823 * Returns: %TRUE if the task could be stopped.
824 *
825 * MT safe.
826 */
827 gboolean
gst_task_stop(GstTask * task)828 gst_task_stop (GstTask * task)
829 {
830 return gst_task_set_state (task, GST_TASK_STOPPED);
831 }
832
833 /**
834 * gst_task_pause:
835 * @task: The #GstTask to pause
836 *
837 * Pauses @task. This method can also be called on a task in the
838 * stopped state, in which case a thread will be started and will remain
839 * in the paused state. This function does not wait for the task to complete
840 * the paused state.
841 *
842 * Returns: %TRUE if the task could be paused.
843 *
844 * MT safe.
845 */
846 gboolean
gst_task_pause(GstTask * task)847 gst_task_pause (GstTask * task)
848 {
849 return gst_task_set_state (task, GST_TASK_PAUSED);
850 }
851
852 /**
853 * gst_task_resume:
854 * @task: The #GstTask to resume
855 *
856 * Resume @task in case it was paused. If the task was stopped, it will
857 * remain in that state and this function will return %FALSE.
858 *
859 * Returns: %TRUE if the task could be resumed.
860 *
861 * MT safe.
862 * Since: 1.18
863 */
864 gboolean
gst_task_resume(GstTask * task)865 gst_task_resume (GstTask * task)
866 {
867 gboolean res = FALSE;
868 g_return_val_if_fail (GST_IS_TASK (task), FALSE);
869
870 GST_OBJECT_LOCK (task);
871 if (GET_TASK_STATE (task) != GST_TASK_STOPPED)
872 res = gst_task_set_state_unlocked (task, GST_TASK_STARTED);
873 GST_OBJECT_UNLOCK (task);
874
875 return res;
876 }
877
878 /**
879 * gst_task_join:
880 * @task: The #GstTask to join
881 *
882 * Joins @task. After this call, it is safe to unref the task
883 * and clean up the lock set with gst_task_set_lock().
884 *
885 * The task will automatically be stopped with this call.
886 *
887 * This function cannot be called from within a task function as this
888 * would cause a deadlock. The function will detect this and print a
889 * g_warning.
890 *
891 * Returns: %TRUE if the task could be joined.
892 *
893 * MT safe.
894 */
895 gboolean
gst_task_join(GstTask * task)896 gst_task_join (GstTask * task)
897 {
898 GThread *tself;
899 GstTaskPrivate *priv;
900 gpointer id;
901 GstTaskPool *pool = NULL;
902
903 g_return_val_if_fail (GST_IS_TASK (task), FALSE);
904
905 priv = task->priv;
906
907 tself = g_thread_self ();
908
909 GST_DEBUG_OBJECT (task, "Joining task %p, thread %p", task, tself);
910
911 /* we don't use a real thread join here because we are using
912 * thread pools */
913 GST_OBJECT_LOCK (task);
914 if (G_UNLIKELY (tself == task->thread))
915 goto joining_self;
916 SET_TASK_STATE (task, GST_TASK_STOPPED);
917 /* signal the state change for when it was blocked in PAUSED. */
918 GST_TASK_SIGNAL (task);
919 /* we set the running flag when pushing the task on the thread pool.
920 * This means that the task function might not be called when we try
921 * to join it here. */
922 while (G_LIKELY (task->running))
923 GST_TASK_WAIT (task);
924 /* clean the thread */
925 task->thread = NULL;
926 /* get the id and pool to join */
927 pool = priv->pool_id;
928 id = priv->id;
929 priv->pool_id = NULL;
930 priv->id = NULL;
931 GST_OBJECT_UNLOCK (task);
932
933 if (pool) {
934 if (id)
935 gst_task_pool_join (pool, id);
936 gst_object_unref (pool);
937 }
938
939 GST_DEBUG_OBJECT (task, "Joined task %p", task);
940
941 return TRUE;
942
943 /* ERRORS */
944 joining_self:
945 {
946 GST_WARNING_OBJECT (task, "trying to join task from its thread");
947 GST_OBJECT_UNLOCK (task);
948 g_warning ("\nTrying to join task %p from its thread would deadlock.\n"
949 "You cannot change the state of an element from its streaming\n"
950 "thread. Use g_idle_add() or post a GstMessage on the bus to\n"
951 "schedule the state change from the main thread.\n", task);
952 return FALSE;
953 }
954 }
955