• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 #include "modules/utility/maybe_worker_thread.h"
11 
12 #include <utility>
13 
14 #include "api/task_queue/pending_task_safety_flag.h"
15 #include "api/task_queue/task_queue_base.h"
16 #include "rtc_base/checks.h"
17 #include "rtc_base/event.h"
18 #include "rtc_base/logging.h"
19 #include "rtc_base/task_queue.h"
20 
21 namespace webrtc {
22 
MaybeWorkerThread(const FieldTrialsView & field_trials,absl::string_view task_queue_name,TaskQueueFactory * factory)23 MaybeWorkerThread::MaybeWorkerThread(const FieldTrialsView& field_trials,
24                                      absl::string_view task_queue_name,
25                                      TaskQueueFactory* factory)
26     : owned_task_queue_(
27           field_trials.IsEnabled("WebRTC-SendPacketsOnWorkerThread")
28               ? nullptr
29               : factory->CreateTaskQueue(task_queue_name,
30                                          rtc::TaskQueue::Priority::NORMAL)),
31       worker_thread_(TaskQueueBase::Current()) {
32   RTC_DCHECK(worker_thread_);
33   RTC_LOG(LS_INFO) << "WebRTC-SendPacketsOnWorkerThread"
34                    << (owned_task_queue_ ? " Disabled" : " Enabled");
35 }
36 
~MaybeWorkerThread()37 MaybeWorkerThread::~MaybeWorkerThread() {
38   RTC_DCHECK_RUN_ON(&sequence_checker_);
39 
40   if (owned_task_queue_) {
41     // owned_task_queue_ must be a valid pointer when the task queue is
42     // destroyed since there may be tasks that use this object that run when the
43     // task queue is deleted.
44     owned_task_queue_->Delete();
45     owned_task_queue_.release();
46   }
47 }
48 
RunSynchronous(absl::AnyInvocable<void ()&&> task)49 void MaybeWorkerThread::RunSynchronous(absl::AnyInvocable<void() &&> task) {
50   if (owned_task_queue_) {
51     rtc::Event thread_sync_event;
52     auto closure = [&thread_sync_event, task = std::move(task)]() mutable {
53       std::move(task)();
54       thread_sync_event.Set();
55     };
56     owned_task_queue_->PostTask(std::move(closure));
57     thread_sync_event.Wait(rtc::Event::kForever);
58   } else {
59     RTC_DCHECK_RUN_ON(&sequence_checker_);
60     std::move(task)();
61   }
62 }
63 
RunOrPost(absl::AnyInvocable<void ()&&> task)64 void MaybeWorkerThread::RunOrPost(absl::AnyInvocable<void() &&> task) {
65   if (owned_task_queue_) {
66     owned_task_queue_->PostTask(std::move(task));
67   } else {
68     RTC_DCHECK_RUN_ON(&sequence_checker_);
69     std::move(task)();
70   }
71 }
72 
TaskQueueForDelayedTasks() const73 TaskQueueBase* MaybeWorkerThread::TaskQueueForDelayedTasks() const {
74   RTC_DCHECK(IsCurrent());
75   return owned_task_queue_ ? owned_task_queue_.get() : worker_thread_;
76 }
77 
TaskQueueForPost() const78 TaskQueueBase* MaybeWorkerThread::TaskQueueForPost() const {
79   return owned_task_queue_ ? owned_task_queue_.get() : worker_thread_;
80 }
81 
IsCurrent() const82 bool MaybeWorkerThread::IsCurrent() const {
83   if (owned_task_queue_) {
84     return owned_task_queue_->IsCurrent();
85   }
86   return worker_thread_->IsCurrent();
87 }
88 
MaybeSafeTask(rtc::scoped_refptr<PendingTaskSafetyFlag> flag,absl::AnyInvocable<void ()&&> task)89 absl::AnyInvocable<void() &&> MaybeWorkerThread::MaybeSafeTask(
90     rtc::scoped_refptr<PendingTaskSafetyFlag> flag,
91     absl::AnyInvocable<void() &&> task) {
92   if (owned_task_queue_) {
93     return task;
94   } else {
95     return SafeTask(std::move(flag), std::move(task));
96   }
97 }
98 
99 }  // namespace webrtc
100