1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/threading/thread.h"
6
7 #include <memory>
8 #include <type_traits>
9 #include <utility>
10
11 #include "base/dcheck_is_on.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback_helpers.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/memory/ptr_util.h"
17 #include "base/memory/scoped_refptr.h"
18 #include "base/message_loop/message_pump.h"
19 #include "base/run_loop.h"
20 #include "base/synchronization/waitable_event.h"
21 #include "base/task/current_thread.h"
22 #include "base/task/sequence_manager/sequence_manager_impl.h"
23 #include "base/task/sequence_manager/task_queue.h"
24 #include "base/task/single_thread_task_runner.h"
25 #include "base/threading/thread_id_name_manager.h"
26 #include "base/threading/thread_restrictions.h"
27 #include "base/types/pass_key.h"
28 #include "build/build_config.h"
29 #include "third_party/abseil-cpp/absl/base/dynamic_annotations.h"
30
31 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
32 #include <optional>
33
34 #include "base/files/file_descriptor_watcher_posix.h"
35 #endif
36
37 #if BUILDFLAG(IS_WIN)
38 #include "base/win/scoped_com_initializer.h"
39 #endif
40
41 namespace base {
42
43 #if DCHECK_IS_ON()
namespace {

// Thread-local flag recording whether the current thread exited because
// Thread::Stop()/StopSoon() was used. Thread::ThreadQuitHelper() sets it to
// true and Thread::ThreadMain() DCHECKs it once Run() has returned. This allows
// us to catch cases where the run loop is quit directly (e.g. via
// QuitWhenIdle()), which is unexpected when a Thread is used to set up and run
// the loop.
constinit thread_local bool was_quit_properly = false;

}  // namespace
53 #endif
54
55 namespace internal {
56
57 class SequenceManagerThreadDelegate : public Thread::Delegate {
58 public:
SequenceManagerThreadDelegate(MessagePumpType message_pump_type,OnceCallback<std::unique_ptr<MessagePump> ()> message_pump_factory)59 explicit SequenceManagerThreadDelegate(
60 MessagePumpType message_pump_type,
61 OnceCallback<std::unique_ptr<MessagePump>()> message_pump_factory)
62 : sequence_manager_(
63 sequence_manager::internal::CreateUnboundSequenceManagerImpl(
64 PassKey<base::internal::SequenceManagerThreadDelegate>(),
65 sequence_manager::SequenceManager::Settings::Builder()
66 .SetMessagePumpType(message_pump_type)
67 .Build())),
68 default_task_queue_(sequence_manager_->CreateTaskQueue(
69 sequence_manager::TaskQueue::Spec(
70 sequence_manager::QueueName::DEFAULT_TQ))),
71 message_pump_factory_(std::move(message_pump_factory)) {
72 sequence_manager_->SetDefaultTaskRunner(default_task_queue_->task_runner());
73 }
74
75 ~SequenceManagerThreadDelegate() override = default;
76
GetDefaultTaskRunner()77 scoped_refptr<SingleThreadTaskRunner> GetDefaultTaskRunner() override {
78 // Surprisingly this might not be default_task_queue_->task_runner() which
79 // we set in the constructor. The Thread::Init() method could create a
80 // SequenceManager on top of the current one and call
81 // SequenceManager::SetDefaultTaskRunner which would propagate the new
82 // TaskRunner down to our SequenceManager. Turns out, code actually relies
83 // on this and somehow relies on
84 // SequenceManagerThreadDelegate::GetDefaultTaskRunner returning this new
85 // TaskRunner. So instead of returning default_task_queue_->task_runner() we
86 // need to query the SequenceManager for it.
87 // The underlying problem here is that Subclasses of Thread can do crazy
88 // stuff in Init() but they are not really in control of what happens in the
89 // Thread::Delegate, as this is passed in on calling StartWithOptions which
90 // could happen far away from where the Thread is created. We should
91 // consider getting rid of StartWithOptions, and pass them as a constructor
92 // argument instead.
93 return sequence_manager_->GetTaskRunner();
94 }
95
BindToCurrentThread()96 void BindToCurrentThread() override {
97 sequence_manager_->BindToMessagePump(
98 std::move(message_pump_factory_).Run());
99 }
100
101 private:
102 std::unique_ptr<sequence_manager::internal::SequenceManagerImpl>
103 sequence_manager_;
104 sequence_manager::TaskQueue::Handle default_task_queue_;
105 OnceCallback<std::unique_ptr<MessagePump>()> message_pump_factory_;
106 };
107
108 } // namespace internal
109
// Defaulted: every field keeps whatever default its declaration specifies.
Thread::Options::Options() = default;
111
Options(MessagePumpType type,size_t size)112 Thread::Options::Options(MessagePumpType type, size_t size)
113 : message_pump_type(type), stack_size(size) {}
114
Options(ThreadType thread_type)115 Thread::Options::Options(ThreadType thread_type) : thread_type(thread_type) {}
116
Options(Options && other)117 Thread::Options::Options(Options&& other)
118 : message_pump_type(std::move(other.message_pump_type)),
119 delegate(std::move(other.delegate)),
120 message_pump_factory(std::move(other.message_pump_factory)),
121 stack_size(std::move(other.stack_size)),
122 thread_type(std::move(other.thread_type)),
123 joinable(std::move(other.joinable)) {
124 other.moved_from = true;
125 }
126
operator =(Thread::Options && other)127 Thread::Options& Thread::Options::operator=(Thread::Options&& other) {
128 DCHECK_NE(this, &other);
129
130 message_pump_type = std::move(other.message_pump_type);
131 delegate = std::move(other.delegate);
132 message_pump_factory = std::move(other.message_pump_factory);
133 stack_size = std::move(other.stack_size);
134 thread_type = std::move(other.thread_type);
135 joinable = std::move(other.joinable);
136 other.moved_from = true;
137
138 return *this;
139 }
140
// Defaulted: members are destroyed by their own destructors.
Thread::Options::~Options() = default;
142
// Constructs a Thread called |name|. Nothing is started here: both events
// begin unsignaled and the platform thread is only created by
// StartWithOptions().
Thread::Thread(const std::string& name)
    : id_event_(WaitableEvent::ResetPolicy::MANUAL,
                WaitableEvent::InitialState::NOT_SIGNALED),
      name_(name),
      start_event_(WaitableEvent::ResetPolicy::MANUAL,
                   WaitableEvent::InitialState::NOT_SIGNALED) {
  // Only bind the sequence on Start(): the state is constant between
  // construction and Start() and it's thus valid for Start() to be called on
  // another sequence as long as every other operation is then performed on that
  // sequence.
  owning_sequence_checker_.DetachFromSequence();
}
155
// Stops the thread on destruction. Note that Stop() DCHECKs |joinable_|.
Thread::~Thread() {
  Stop();
}
159
Start()160 bool Thread::Start() {
161 DCHECK(owning_sequence_checker_.CalledOnValidSequence());
162
163 Options options;
164 #if BUILDFLAG(IS_WIN)
165 if (com_status_ == STA)
166 options.message_pump_type = MessagePumpType::UI;
167 #endif
168 return StartWithOptions(std::move(options));
169 }
170
StartWithOptions(Options options)171 bool Thread::StartWithOptions(Options options) {
172 DCHECK(options.IsValid());
173 DCHECK(owning_sequence_checker_.CalledOnValidSequence());
174 DCHECK(!delegate_);
175 DCHECK(!IsRunning());
176 DCHECK(!stopping_) << "Starting a non-joinable thread a second time? That's "
177 << "not allowed!";
178 #if BUILDFLAG(IS_WIN)
179 DCHECK((com_status_ != STA) ||
180 (options.message_pump_type == MessagePumpType::UI));
181 #endif
182
183 // Reset |id_| here to support restarting the thread.
184 id_event_.Reset();
185 id_ = kInvalidThreadId;
186
187 SetThreadWasQuitProperly(false);
188
189 if (options.delegate) {
190 DCHECK(!options.message_pump_factory);
191 delegate_ = std::move(options.delegate);
192 } else if (options.message_pump_factory) {
193 delegate_ = std::make_unique<internal::SequenceManagerThreadDelegate>(
194 MessagePumpType::CUSTOM, options.message_pump_factory);
195 } else {
196 delegate_ = std::make_unique<internal::SequenceManagerThreadDelegate>(
197 options.message_pump_type,
198 BindOnce([](MessagePumpType type) { return MessagePump::Create(type); },
199 options.message_pump_type));
200 }
201
202 start_event_.Reset();
203
204 // Hold |thread_lock_| while starting the new thread to synchronize with
205 // Stop() while it's not guaranteed to be sequenced (until crbug/629139 is
206 // fixed).
207 {
208 AutoLock lock(thread_lock_);
209 bool success = options.joinable
210 ? PlatformThread::CreateWithType(
211 options.stack_size, this, &thread_,
212 options.thread_type, options.message_pump_type)
213 : PlatformThread::CreateNonJoinableWithType(
214 options.stack_size, this, options.thread_type,
215 options.message_pump_type);
216 if (!success) {
217 DLOG(ERROR) << "failed to create thread";
218 return false;
219 }
220 }
221
222 joinable_ = options.joinable;
223
224 return true;
225 }
226
StartAndWaitForTesting()227 bool Thread::StartAndWaitForTesting() {
228 DCHECK(owning_sequence_checker_.CalledOnValidSequence());
229 bool result = Start();
230 if (!result)
231 return false;
232 WaitUntilThreadStarted();
233 return true;
234 }
235
WaitUntilThreadStarted() const236 bool Thread::WaitUntilThreadStarted() const {
237 DCHECK(owning_sequence_checker_.CalledOnValidSequence());
238 if (!delegate_)
239 return false;
240 // https://crbug.com/918039
241 base::ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait;
242 start_event_.Wait();
243 return true;
244 }
245
FlushForTesting()246 void Thread::FlushForTesting() {
247 DCHECK(owning_sequence_checker_.CalledOnValidSequence());
248 if (!delegate_)
249 return;
250
251 WaitableEvent done(WaitableEvent::ResetPolicy::AUTOMATIC,
252 WaitableEvent::InitialState::NOT_SIGNALED);
253 task_runner()->PostTask(FROM_HERE,
254 BindOnce(&WaitableEvent::Signal, Unretained(&done)));
255 done.Wait();
256 }
257
Stop()258 void Thread::Stop() {
259 DCHECK(joinable_);
260
261 // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and
262 // enable this check, until then synchronization with Start() via
263 // |thread_lock_| is required...
264 // DCHECK(owning_sequence_checker_.CalledOnValidSequence());
265 AutoLock lock(thread_lock_);
266
267 StopSoon();
268
269 // Can't join if the |thread_| is either already gone or is non-joinable.
270 if (thread_.is_null())
271 return;
272
273 // Wait for the thread to exit.
274 //
275 // TODO(darin): Unfortunately, we need to keep |delegate_| around
276 // until the thread exits. Some consumers are abusing the API. Make them stop.
277 PlatformThread::Join(thread_);
278 thread_ = base::PlatformThreadHandle();
279
280 // The thread should release |delegate_| on exit (note: Join() adds
281 // an implicit memory barrier and no lock is thus required for this check).
282 DCHECK(!delegate_);
283
284 stopping_ = false;
285 }
286
StopSoon()287 void Thread::StopSoon() {
288 // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and
289 // enable this check.
290 // DCHECK(owning_sequence_checker_.CalledOnValidSequence());
291
292 if (stopping_ || !delegate_)
293 return;
294
295 stopping_ = true;
296
297 task_runner()->PostTask(
298 FROM_HERE, base::BindOnce(&Thread::ThreadQuitHelper, Unretained(this)));
299 }
300
// Detaches |owning_sequence_checker_| from its current sequence. Must itself
// be called on the currently valid sequence (DCHECKed).
void Thread::DetachFromSequence() {
  DCHECK(owning_sequence_checker_.CalledOnValidSequence());
  owning_sequence_checker_.DetachFromSequence();
}
305
GetThreadId() const306 PlatformThreadId Thread::GetThreadId() const {
307 if (!id_event_.IsSignaled()) {
308 // If the thread is created but not started yet, wait for |id_| being ready.
309 base::ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait;
310 id_event_.Wait();
311 }
312 return id_;
313 }
314
IsRunning() const315 bool Thread::IsRunning() const {
316 // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and
317 // enable this check.
318 // DCHECK(owning_sequence_checker_.CalledOnValidSequence());
319
320 // If the thread's already started (i.e. |delegate_| is non-null) and
321 // not yet requested to stop (i.e. |stopping_| is false) we can just return
322 // true. (Note that |stopping_| is touched only on the same sequence that
323 // starts / started the new thread so we need no locking here.)
324 if (delegate_ && !stopping_)
325 return true;
326 // Otherwise check the |running_| flag, which is set to true by the new thread
327 // only while it is inside Run().
328 AutoLock lock(running_lock_);
329 return running_;
330 }
331
// Runs |run_loop| until it quits. ThreadMain() calls this with the RunLoop it
// created on its own stack.
void Thread::Run(RunLoop* run_loop) {
  // Overridable protected method to be called from our |thread_| only.
  DCHECK(id_event_.IsSignaled());
  DCHECK_EQ(id_, PlatformThread::CurrentId());

  run_loop->Run();
}
339
340 // static
// Records |flag| in the calling thread's |was_quit_properly| thread-local.
// Compiles to a no-op when DCHECKs are off.
void Thread::SetThreadWasQuitProperly(bool flag) {
#if DCHECK_IS_ON()
  was_quit_properly = flag;
#endif
}
346
347 // static
// Returns the calling thread's |was_quit_properly| thread-local. Always
// returns true when DCHECKs are off, since the flag is not tracked then.
bool Thread::GetThreadWasQuitProperly() {
#if DCHECK_IS_ON()
  return was_quit_properly;
#else
  return true;
#endif
}
355
// Body of the platform thread created by StartWithOptions(), which hands
// |this| to PlatformThread. In order: publishes the thread id, names the
// thread, binds |delegate_| to this thread, performs platform-specific setup,
// calls Init(), signals |start_event_|, runs the RunLoop via Run(), and finally
// calls CleanUp() and releases |delegate_|.
void Thread::ThreadMain() {
  // First, make GetThreadId() available to avoid deadlocks. It could be called
  // any place in the following thread initialization code.
  DCHECK(!id_event_.IsSignaled());
  // Note: this read of |id_| while |id_event_| isn't signaled is exceptionally
  // okay because ThreadMain has a happens-after relationship with the other
  // write in StartWithOptions().
  DCHECK_EQ(kInvalidThreadId, id_);
  id_ = PlatformThread::CurrentId();
  DCHECK_NE(kInvalidThreadId, id_);
  id_event_.Signal();

  // Complete the initialization of our Thread object.
  PlatformThread::SetName(name_.c_str());
  ABSL_ANNOTATE_THREAD_NAME(name_.c_str());  // Tell the name to race detector.

  // |delegate_| was installed by StartWithOptions(); bind it to this thread
  // now that we are running on it.
  DCHECK(delegate_);
  // This binds CurrentThread and SingleThreadTaskRunner::CurrentDefaultHandle.
  delegate_->BindToCurrentThread();
  DCHECK(CurrentThread::Get());
  DCHECK(SingleThreadTaskRunner::HasCurrentDefault());
#if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
  // Allow threads running a MessageLoopForIO to use FileDescriptorWatcher API.
  // The watcher stays alive until the end of this function.
  std::unique_ptr<FileDescriptorWatcher> file_descriptor_watcher;
  if (CurrentIOThread::IsSet()) {
    file_descriptor_watcher = std::make_unique<FileDescriptorWatcher>(
        delegate_->GetDefaultTaskRunner());
  }
#endif  // (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)

#if BUILDFLAG(IS_WIN)
  // Initialize COM on this thread when requested via |com_status_|; it is
  // uninitialized again after CleanUp() below.
  std::unique_ptr<win::ScopedCOMInitializer> com_initializer;
  if (com_status_ != NONE) {
    com_initializer.reset(
        (com_status_ == STA)
            ? new win::ScopedCOMInitializer()
            : new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA));
  }
#endif

  // Let the thread do extra initialization.
  Init();

  {
    AutoLock lock(running_lock_);
    running_ = true;
  }

  // Unblocks WaitUntilThreadStarted(); Init() has completed at this point.
  start_event_.Signal();

  // |run_loop_| is what ThreadQuitHelper() uses to quit this thread.
  RunLoop run_loop;
  run_loop_ = &run_loop;
  Run(run_loop_);

  {
    AutoLock lock(running_lock_);
    running_ = false;
  }

  // Let the thread do extra cleanup.
  CleanUp();

#if BUILDFLAG(IS_WIN)
  com_initializer.reset();
#endif

  // Catches the run loop having been quit by something other than
  // ThreadQuitHelper().
  DCHECK(GetThreadWasQuitProperly());

  // We can't receive messages anymore. Release |delegate_|; Stop() DCHECKs
  // that it is null once the thread has been joined.
  delegate_.reset();
  run_loop_ = nullptr;
}
430
// Task posted to this thread by StopSoon(). Asks the RunLoop installed by
// ThreadMain() to quit once idle and records that the quit came through the
// expected path (checked by ThreadMain() after Run() returns).
void Thread::ThreadQuitHelper() {
  DCHECK(run_loop_);
  run_loop_->QuitWhenIdle();
  SetThreadWasQuitProperly(true);
}
436
437 } // namespace base
438