• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "message_loop_thread.h"
18 
19 #include <base/functional/callback.h>
20 #include <base/location.h>
21 #include <base/time/time.h>
22 #include <bluetooth/log.h>
23 #include <sys/syscall.h>
24 #include <unistd.h>
25 
26 #include <future>
27 #include <mutex>
28 #include <string>
29 #include <thread>
30 
31 #include "common/postable_context.h"
32 
33 namespace bluetooth {
34 namespace common {
35 
// Scheduling priority requested when a thread is switched to SCHED_FIFO
// in MessageLoopThread::EnableRealTimeScheduling().
static constexpr int kRealTimeFifoSchedulingPriority{1};
37 
timeDeltaFromMicroseconds(std::chrono::microseconds t)38 static base::TimeDelta timeDeltaFromMicroseconds(std::chrono::microseconds t) {
39 #if BASE_VER < 931007
40   return base::TimeDelta::FromMicroseconds(t.count());
41 #else
42   return base::Microseconds(t.count());
43 #endif
44 }
45 
MessageLoopThread(const std::string & thread_name)46 MessageLoopThread::MessageLoopThread(const std::string& thread_name)
47     : thread_name_(thread_name),
48       message_loop_(nullptr),
49       run_loop_(nullptr),
50       thread_(nullptr),
51       thread_id_(-1),
52       linux_tid_(-1),
53       weak_ptr_factory_(this),
54       shutting_down_(false) {}
55 
~MessageLoopThread()56 MessageLoopThread::~MessageLoopThread() { ShutDown(); }
57 
StartUp()58 void MessageLoopThread::StartUp() {
59   std::promise<void> start_up_promise;
60   std::future<void> start_up_future = start_up_promise.get_future();
61   {
62     std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
63     if (thread_ != nullptr) {
64       log::warn("thread {} is already started", *this);
65 
66       return;
67     }
68     thread_ = new std::thread(&MessageLoopThread::RunThread, this, std::move(start_up_promise));
69   }
70   start_up_future.wait();
71 }
72 
DoInThread(base::OnceClosure task)73 bool MessageLoopThread::DoInThread(base::OnceClosure task) {
74   return DoInThreadDelayed(std::move(task), std::chrono::microseconds(0));
75 }
76 
DoInThreadDelayed(base::OnceClosure task,std::chrono::microseconds delay)77 bool MessageLoopThread::DoInThreadDelayed(base::OnceClosure task, std::chrono::microseconds delay) {
78   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
79 
80   if (message_loop_ == nullptr) {
81     log::error("message loop is null for thread {}", *this);
82     return false;
83   }
84   if (!message_loop_->task_runner()->PostDelayedTask(FROM_HERE, std::move(task),
85                                                      timeDeltaFromMicroseconds(delay))) {
86     log::error("failed to post task to message loop for thread {}", *this);
87     return false;
88   }
89   return true;
90 }
91 
ShutDown()92 void MessageLoopThread::ShutDown() {
93   {
94     std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
95     if (thread_ == nullptr) {
96       log::info("thread {} is already stopped", *this);
97       return;
98     }
99     if (message_loop_ == nullptr) {
100       log::info("message_loop_ is null. Already stopping");
101       return;
102     }
103     if (shutting_down_) {
104       log::info("waiting for thread to join");
105       return;
106     }
107     shutting_down_ = true;
108     log::assert_that(thread_id_ != base::PlatformThread::CurrentId(),
109                      "should not be called on the thread itself. Otherwise, deadlock may happen.");
110     run_loop_->QuitWhenIdle();
111   }
112   thread_->join();
113   {
114     std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
115     delete thread_;
116     thread_ = nullptr;
117     shutting_down_ = false;
118   }
119 }
120 
GetThreadId() const121 base::PlatformThreadId MessageLoopThread::GetThreadId() const {
122   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
123   return thread_id_;
124 }
125 
GetName() const126 std::string MessageLoopThread::GetName() const { return thread_name_; }
127 
ToString() const128 std::string MessageLoopThread::ToString() const {
129   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
130   return std::format("{}({})", thread_name_, thread_id_);
131 }
132 
IsRunning() const133 bool MessageLoopThread::IsRunning() const {
134   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
135   return thread_id_ != -1;
136 }
137 
138 // Non API method, should not be protected by API mutex
RunThread(MessageLoopThread * thread,std::promise<void> start_up_promise)139 void MessageLoopThread::RunThread(MessageLoopThread* thread, std::promise<void> start_up_promise) {
140   thread->Run(std::move(start_up_promise));
141 }
142 
143 // This is only for use in tests.
message_loop() const144 btbase::AbstractMessageLoop* MessageLoopThread::message_loop() const {
145   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
146   return message_loop_;
147 }
148 
EnableRealTimeScheduling()149 bool MessageLoopThread::EnableRealTimeScheduling() {
150   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
151 
152   if (!IsRunning()) {
153     log::error("thread {} is not running", *this);
154     return false;
155   }
156 
157   struct sched_param rt_params = {.sched_priority = kRealTimeFifoSchedulingPriority};
158   int rc = sched_setscheduler(linux_tid_, SCHED_FIFO, &rt_params);
159   if (rc != 0) {
160     log::error("unable to set SCHED_FIFO priority {} for linux_tid {}, thread {}, error: {}",
161                kRealTimeFifoSchedulingPriority, linux_tid_, *this, strerror(errno));
162     return false;
163   }
164   return true;
165 }
166 
GetWeakPtr()167 base::WeakPtr<MessageLoopThread> MessageLoopThread::GetWeakPtr() {
168   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
169   return weak_ptr_factory_.GetWeakPtr();
170 }
171 
Run(std::promise<void> start_up_promise)172 void MessageLoopThread::Run(std::promise<void> start_up_promise) {
173   {
174     std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
175 
176     log::info("message loop starting for thread {}", thread_name_);
177     base::PlatformThread::SetName(thread_name_);
178     message_loop_ = new btbase::AbstractMessageLoop();
179     run_loop_ = new base::RunLoop();
180     thread_id_ = base::PlatformThread::CurrentId();
181     linux_tid_ = static_cast<pid_t>(syscall(SYS_gettid));
182     start_up_promise.set_value();
183   }
184 
185   // Blocking until ShutDown() is called
186   run_loop_->Run();
187 
188   {
189     std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
190     thread_id_ = -1;
191     linux_tid_ = -1;
192     delete message_loop_;
193     message_loop_ = nullptr;
194     delete run_loop_;
195     run_loop_ = nullptr;
196     log::info("message loop finished for thread {}", thread_name_);
197   }
198 }
199 
Post(base::OnceClosure closure)200 void MessageLoopThread::Post(base::OnceClosure closure) { DoInThread(std::move(closure)); }
201 
Postable()202 PostableContext* MessageLoopThread::Postable() { return this; }
203 
204 }  // namespace common
205 }  // namespace bluetooth
206