• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "message_loop_thread.h"
18 
19 #include <sys/syscall.h>
20 #include <unistd.h>
21 #include <thread>
22 
23 #include <base/strings/stringprintf.h>
24 
25 namespace bluetooth {
26 
27 namespace common {
28 
// Scheduling priority handed to sched_setscheduler() together with SCHED_FIFO
// by MessageLoopThread::EnableRealTimeScheduling().
static constexpr int kRealTimeFifoSchedulingPriority{1};
30 
MessageLoopThread(const std::string & thread_name)31 MessageLoopThread::MessageLoopThread(const std::string& thread_name)
32     : thread_name_(thread_name),
33       message_loop_(nullptr),
34       run_loop_(nullptr),
35       thread_(nullptr),
36       thread_id_(-1),
37       linux_tid_(-1),
38       weak_ptr_factory_(this),
39       shutting_down_(false) {}
40 
~MessageLoopThread()41 MessageLoopThread::~MessageLoopThread() { ShutDown(); }
42 
StartUp()43 void MessageLoopThread::StartUp() {
44   std::promise<void> start_up_promise;
45   std::future<void> start_up_future = start_up_promise.get_future();
46   {
47     std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
48     if (thread_ != nullptr) {
49       LOG(WARNING) << __func__ << ": thread " << *this << " is already started";
50 
51       return;
52     }
53     thread_ = new std::thread(&MessageLoopThread::RunThread, this,
54                               std::move(start_up_promise));
55   }
56   start_up_future.wait();
57 }
58 
DoInThread(const base::Location & from_here,base::OnceClosure task)59 bool MessageLoopThread::DoInThread(const base::Location& from_here,
60                                    base::OnceClosure task) {
61   return DoInThreadDelayed(from_here, std::move(task), base::TimeDelta());
62 }
63 
DoInThreadDelayed(const base::Location & from_here,base::OnceClosure task,const base::TimeDelta & delay)64 bool MessageLoopThread::DoInThreadDelayed(const base::Location& from_here,
65                                           base::OnceClosure task,
66                                           const base::TimeDelta& delay) {
67   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
68   if (message_loop_ == nullptr) {
69     LOG(ERROR) << __func__ << ": message loop is null for thread " << *this
70                << ", from " << from_here.ToString();
71     return false;
72   }
73   if (!message_loop_->task_runner()->PostDelayedTask(from_here, std::move(task),
74                                                      delay)) {
75     LOG(ERROR) << __func__
76                << ": failed to post task to message loop for thread " << *this
77                << ", from " << from_here.ToString();
78     return false;
79   }
80   return true;
81 }
82 
ShutDown()83 void MessageLoopThread::ShutDown() {
84   {
85     std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
86     if (thread_ == nullptr) {
87       LOG(INFO) << __func__ << ": thread " << *this << " is already stopped";
88       return;
89     }
90     if (message_loop_ == nullptr) {
91       LOG(INFO) << __func__ << ": message_loop_ is null. Already stopping";
92       return;
93     }
94     if (shutting_down_) {
95       LOG(INFO) << __func__ << ": waiting for thread to join";
96       return;
97     }
98     shutting_down_ = true;
99     CHECK_NE(thread_id_, base::PlatformThread::CurrentId())
100         << __func__ << " should not be called on the thread itself. "
101         << "Otherwise, deadlock may happen.";
102     if (!message_loop_->task_runner()->PostTask(
103             FROM_HERE, run_loop_->QuitWhenIdleClosure())) {
104       LOG(FATAL) << __func__
105                  << ": failed to post task to message loop for thread "
106                  << *this;
107     }
108   }
109   thread_->join();
110   {
111     std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
112     delete thread_;
113     thread_ = nullptr;
114     shutting_down_ = false;
115   }
116 }
117 
GetThreadId() const118 base::PlatformThreadId MessageLoopThread::GetThreadId() const {
119   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
120   return thread_id_;
121 }
122 
GetName() const123 std::string MessageLoopThread::GetName() const {
124   return thread_name_;
125 }
126 
ToString() const127 std::string MessageLoopThread::ToString() const {
128   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
129   return base::StringPrintf("%s(%d)", thread_name_.c_str(), thread_id_);
130 }
131 
IsRunning() const132 bool MessageLoopThread::IsRunning() const {
133   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
134   return message_loop_ != nullptr;
135 }
136 
137 // Non API method, should not be protected by API mutex
RunThread(MessageLoopThread * thread,std::promise<void> start_up_promise)138 void MessageLoopThread::RunThread(MessageLoopThread* thread,
139                                   std::promise<void> start_up_promise) {
140   thread->Run(std::move(start_up_promise));
141 }
142 
message_loop() const143 base::MessageLoop* MessageLoopThread::message_loop() const {
144   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
145   return message_loop_;
146 }
147 
EnableRealTimeScheduling()148 bool MessageLoopThread::EnableRealTimeScheduling() {
149   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
150   if (!IsRunning()) {
151     LOG(ERROR) << __func__ << ": thread " << *this << " is not running";
152     return false;
153   }
154   struct sched_param rt_params = {.sched_priority =
155                                       kRealTimeFifoSchedulingPriority};
156   int rc = sched_setscheduler(linux_tid_, SCHED_FIFO, &rt_params);
157   if (rc != 0) {
158     LOG(ERROR) << __func__ << ": unable to set SCHED_FIFO priority "
159                << kRealTimeFifoSchedulingPriority << " for linux_tid "
160                << std::to_string(linux_tid_) << ", thread " << *this
161                << ", error: " << strerror(errno);
162     return false;
163   }
164   return true;
165 }
166 
GetWeakPtr()167 base::WeakPtr<MessageLoopThread> MessageLoopThread::GetWeakPtr() {
168   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
169   return weak_ptr_factory_.GetWeakPtr();
170 }
171 
Run(std::promise<void> start_up_promise)172 void MessageLoopThread::Run(std::promise<void> start_up_promise) {
173   {
174     std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
175     LOG(INFO) << __func__ << ": message loop starting for thread "
176               << thread_name_;
177     base::PlatformThread::SetName(thread_name_);
178     message_loop_ = new base::MessageLoop();
179     run_loop_ = new base::RunLoop();
180     thread_id_ = base::PlatformThread::CurrentId();
181     linux_tid_ = static_cast<pid_t>(syscall(SYS_gettid));
182     start_up_promise.set_value();
183   }
184 
185   // Blocking until ShutDown() is called
186   run_loop_->Run();
187 
188   {
189     std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
190     thread_id_ = -1;
191     linux_tid_ = -1;
192     delete message_loop_;
193     message_loop_ = nullptr;
194     delete run_loop_;
195     run_loop_ = nullptr;
196     LOG(INFO) << __func__ << ": message loop finished for thread "
197               << thread_name_;
198   }
199 }
200 
201 }  // namespace common
202 
203 }  // namespace bluetooth
204