1 /*
2 * Copyright 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "message_loop_thread.h"
18
19 #include <base/functional/callback.h>
20 #include <base/location.h>
21 #include <base/time/time.h>
22 #include <bluetooth/log.h>
23 #include <sys/syscall.h>
24 #include <unistd.h>
25
26 #include <future>
27 #include <mutex>
28 #include <string>
29 #include <thread>
30
31 #include "common/postable_context.h"
32
33 namespace bluetooth {
34 namespace common {
35
// Scheduling priority requested together with SCHED_FIFO when
// EnableRealTimeScheduling() calls sched_setscheduler().
static constexpr int kRealTimeFifoSchedulingPriority{1};
37
timeDeltaFromMicroseconds(std::chrono::microseconds t)38 static base::TimeDelta timeDeltaFromMicroseconds(std::chrono::microseconds t) {
39 #if BASE_VER < 931007
40 return base::TimeDelta::FromMicroseconds(t.count());
41 #else
42 return base::Microseconds(t.count());
43 #endif
44 }
45
MessageLoopThread(const std::string & thread_name)46 MessageLoopThread::MessageLoopThread(const std::string& thread_name)
47 : thread_name_(thread_name),
48 message_loop_(nullptr),
49 run_loop_(nullptr),
50 thread_(nullptr),
51 thread_id_(-1),
52 linux_tid_(-1),
53 weak_ptr_factory_(this),
54 shutting_down_(false) {}
55
~MessageLoopThread()56 MessageLoopThread::~MessageLoopThread() { ShutDown(); }
57
StartUp()58 void MessageLoopThread::StartUp() {
59 std::promise<void> start_up_promise;
60 std::future<void> start_up_future = start_up_promise.get_future();
61 {
62 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
63 if (thread_ != nullptr) {
64 log::warn("thread {} is already started", *this);
65
66 return;
67 }
68 thread_ = new std::thread(&MessageLoopThread::RunThread, this, std::move(start_up_promise));
69 }
70 start_up_future.wait();
71 }
72
DoInThread(base::OnceClosure task)73 bool MessageLoopThread::DoInThread(base::OnceClosure task) {
74 return DoInThreadDelayed(std::move(task), std::chrono::microseconds(0));
75 }
76
DoInThreadDelayed(base::OnceClosure task,std::chrono::microseconds delay)77 bool MessageLoopThread::DoInThreadDelayed(base::OnceClosure task, std::chrono::microseconds delay) {
78 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
79
80 if (message_loop_ == nullptr) {
81 log::error("message loop is null for thread {}", *this);
82 return false;
83 }
84 if (!message_loop_->task_runner()->PostDelayedTask(FROM_HERE, std::move(task),
85 timeDeltaFromMicroseconds(delay))) {
86 log::error("failed to post task to message loop for thread {}", *this);
87 return false;
88 }
89 return true;
90 }
91
ShutDown()92 void MessageLoopThread::ShutDown() {
93 {
94 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
95 if (thread_ == nullptr) {
96 log::info("thread {} is already stopped", *this);
97 return;
98 }
99 if (message_loop_ == nullptr) {
100 log::info("message_loop_ is null. Already stopping");
101 return;
102 }
103 if (shutting_down_) {
104 log::info("waiting for thread to join");
105 return;
106 }
107 shutting_down_ = true;
108 log::assert_that(thread_id_ != base::PlatformThread::CurrentId(),
109 "should not be called on the thread itself. Otherwise, deadlock may happen.");
110 run_loop_->QuitWhenIdle();
111 }
112 thread_->join();
113 {
114 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
115 delete thread_;
116 thread_ = nullptr;
117 shutting_down_ = false;
118 }
119 }
120
GetThreadId() const121 base::PlatformThreadId MessageLoopThread::GetThreadId() const {
122 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
123 return thread_id_;
124 }
125
GetName() const126 std::string MessageLoopThread::GetName() const { return thread_name_; }
127
// Formats this thread as "<name>(<thread id>)", reading the id under the API
// mutex.
std::string MessageLoopThread::ToString() const {
  std::lock_guard<std::recursive_mutex> guard(api_mutex_);
  std::string description = std::format("{}({})", thread_name_, thread_id_);
  return description;
}
132
IsRunning() const133 bool MessageLoopThread::IsRunning() const {
134 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
135 return thread_id_ != -1;
136 }
137
138 // Non API method, should not be protected by API mutex
RunThread(MessageLoopThread * thread,std::promise<void> start_up_promise)139 void MessageLoopThread::RunThread(MessageLoopThread* thread, std::promise<void> start_up_promise) {
140 thread->Run(std::move(start_up_promise));
141 }
142
143 // This is only for use in tests.
message_loop() const144 btbase::AbstractMessageLoop* MessageLoopThread::message_loop() const {
145 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
146 return message_loop_;
147 }
148
EnableRealTimeScheduling()149 bool MessageLoopThread::EnableRealTimeScheduling() {
150 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
151
152 if (!IsRunning()) {
153 log::error("thread {} is not running", *this);
154 return false;
155 }
156
157 struct sched_param rt_params = {.sched_priority = kRealTimeFifoSchedulingPriority};
158 int rc = sched_setscheduler(linux_tid_, SCHED_FIFO, &rt_params);
159 if (rc != 0) {
160 log::error("unable to set SCHED_FIFO priority {} for linux_tid {}, thread {}, error: {}",
161 kRealTimeFifoSchedulingPriority, linux_tid_, *this, strerror(errno));
162 return false;
163 }
164 return true;
165 }
166
GetWeakPtr()167 base::WeakPtr<MessageLoopThread> MessageLoopThread::GetWeakPtr() {
168 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
169 return weak_ptr_factory_.GetWeakPtr();
170 }
171
Run(std::promise<void> start_up_promise)172 void MessageLoopThread::Run(std::promise<void> start_up_promise) {
173 {
174 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
175
176 log::info("message loop starting for thread {}", thread_name_);
177 base::PlatformThread::SetName(thread_name_);
178 message_loop_ = new btbase::AbstractMessageLoop();
179 run_loop_ = new base::RunLoop();
180 thread_id_ = base::PlatformThread::CurrentId();
181 linux_tid_ = static_cast<pid_t>(syscall(SYS_gettid));
182 start_up_promise.set_value();
183 }
184
185 // Blocking until ShutDown() is called
186 run_loop_->Run();
187
188 {
189 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
190 thread_id_ = -1;
191 linux_tid_ = -1;
192 delete message_loop_;
193 message_loop_ = nullptr;
194 delete run_loop_;
195 run_loop_ = nullptr;
196 log::info("message loop finished for thread {}", thread_name_);
197 }
198 }
199
Post(base::OnceClosure closure)200 void MessageLoopThread::Post(base::OnceClosure closure) { DoInThread(std::move(closure)); }
201
Postable()202 PostableContext* MessageLoopThread::Postable() { return this; }
203
204 } // namespace common
205 } // namespace bluetooth
206