1 /*
2 * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10 #include "modules/utility/maybe_worker_thread.h"
11
12 #include <utility>
13
14 #include "api/task_queue/pending_task_safety_flag.h"
15 #include "api/task_queue/task_queue_base.h"
16 #include "rtc_base/checks.h"
17 #include "rtc_base/event.h"
18 #include "rtc_base/logging.h"
19 #include "rtc_base/task_queue.h"
20
21 namespace webrtc {
22
MaybeWorkerThread(const FieldTrialsView & field_trials,absl::string_view task_queue_name,TaskQueueFactory * factory)23 MaybeWorkerThread::MaybeWorkerThread(const FieldTrialsView& field_trials,
24 absl::string_view task_queue_name,
25 TaskQueueFactory* factory)
26 : owned_task_queue_(
27 field_trials.IsEnabled("WebRTC-SendPacketsOnWorkerThread")
28 ? nullptr
29 : factory->CreateTaskQueue(task_queue_name,
30 rtc::TaskQueue::Priority::NORMAL)),
31 worker_thread_(TaskQueueBase::Current()) {
32 RTC_DCHECK(worker_thread_);
33 RTC_LOG(LS_INFO) << "WebRTC-SendPacketsOnWorkerThread"
34 << (owned_task_queue_ ? " Disabled" : " Enabled");
35 }
36
~MaybeWorkerThread()37 MaybeWorkerThread::~MaybeWorkerThread() {
38 RTC_DCHECK_RUN_ON(&sequence_checker_);
39
40 if (owned_task_queue_) {
41 // owned_task_queue_ must be a valid pointer when the task queue is
42 // destroyed since there may be tasks that use this object that run when the
43 // task queue is deleted.
44 owned_task_queue_->Delete();
45 owned_task_queue_.release();
46 }
47 }
48
RunSynchronous(absl::AnyInvocable<void ()&&> task)49 void MaybeWorkerThread::RunSynchronous(absl::AnyInvocable<void() &&> task) {
50 if (owned_task_queue_) {
51 rtc::Event thread_sync_event;
52 auto closure = [&thread_sync_event, task = std::move(task)]() mutable {
53 std::move(task)();
54 thread_sync_event.Set();
55 };
56 owned_task_queue_->PostTask(std::move(closure));
57 thread_sync_event.Wait(rtc::Event::kForever);
58 } else {
59 RTC_DCHECK_RUN_ON(&sequence_checker_);
60 std::move(task)();
61 }
62 }
63
RunOrPost(absl::AnyInvocable<void ()&&> task)64 void MaybeWorkerThread::RunOrPost(absl::AnyInvocable<void() &&> task) {
65 if (owned_task_queue_) {
66 owned_task_queue_->PostTask(std::move(task));
67 } else {
68 RTC_DCHECK_RUN_ON(&sequence_checker_);
69 std::move(task)();
70 }
71 }
72
TaskQueueForDelayedTasks() const73 TaskQueueBase* MaybeWorkerThread::TaskQueueForDelayedTasks() const {
74 RTC_DCHECK(IsCurrent());
75 return owned_task_queue_ ? owned_task_queue_.get() : worker_thread_;
76 }
77
TaskQueueForPost() const78 TaskQueueBase* MaybeWorkerThread::TaskQueueForPost() const {
79 return owned_task_queue_ ? owned_task_queue_.get() : worker_thread_;
80 }
81
IsCurrent() const82 bool MaybeWorkerThread::IsCurrent() const {
83 if (owned_task_queue_) {
84 return owned_task_queue_->IsCurrent();
85 }
86 return worker_thread_->IsCurrent();
87 }
88
MaybeSafeTask(rtc::scoped_refptr<PendingTaskSafetyFlag> flag,absl::AnyInvocable<void ()&&> task)89 absl::AnyInvocable<void() &&> MaybeWorkerThread::MaybeSafeTask(
90 rtc::scoped_refptr<PendingTaskSafetyFlag> flag,
91 absl::AnyInvocable<void() &&> task) {
92 if (owned_task_queue_) {
93 return task;
94 } else {
95 return SafeTask(std::move(flag), std::move(task));
96 }
97 }
98
99 } // namespace webrtc
100