• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/threading/thread.h"
6 
7 #include <memory>
8 #include <type_traits>
9 #include <utility>
10 
11 #include "base/dcheck_is_on.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback_helpers.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/memory/ptr_util.h"
17 #include "base/memory/scoped_refptr.h"
18 #include "base/message_loop/message_pump.h"
19 #include "base/run_loop.h"
20 #include "base/synchronization/waitable_event.h"
21 #include "base/task/current_thread.h"
22 #include "base/task/sequence_manager/sequence_manager_impl.h"
23 #include "base/task/sequence_manager/task_queue.h"
24 #include "base/task/single_thread_task_runner.h"
25 #include "base/threading/thread_id_name_manager.h"
26 #include "base/threading/thread_restrictions.h"
27 #include "base/types/pass_key.h"
28 #include "build/build_config.h"
29 #include "third_party/abseil-cpp/absl/base/dynamic_annotations.h"
30 
31 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
32 #include <optional>
33 
34 #include "base/files/file_descriptor_watcher_posix.h"
35 #endif
36 
37 #if BUILDFLAG(IS_WIN)
38 #include "base/win/scoped_com_initializer.h"
39 #endif
40 
41 namespace base {
42 
43 #if DCHECK_IS_ON()
44 namespace {
45 
46 // We use this thread-local variable to record whether or not a thread exited
47 // because its Stop method was called.  This allows us to catch cases where
48 // MessageLoop::QuitWhenIdle() is called directly, which is unexpected when
49 // using a Thread to setup and run a MessageLoop.
50 constinit thread_local bool was_quit_properly = false;
51 
52 }  // namespace
53 #endif
54 
55 namespace internal {
56 
57 class SequenceManagerThreadDelegate : public Thread::Delegate {
58  public:
SequenceManagerThreadDelegate(MessagePumpType message_pump_type,OnceCallback<std::unique_ptr<MessagePump> ()> message_pump_factory)59   explicit SequenceManagerThreadDelegate(
60       MessagePumpType message_pump_type,
61       OnceCallback<std::unique_ptr<MessagePump>()> message_pump_factory)
62       : sequence_manager_(
63             sequence_manager::internal::CreateUnboundSequenceManagerImpl(
64                 PassKey<base::internal::SequenceManagerThreadDelegate>(),
65                 sequence_manager::SequenceManager::Settings::Builder()
66                     .SetMessagePumpType(message_pump_type)
67                     .Build())),
68         default_task_queue_(sequence_manager_->CreateTaskQueue(
69             sequence_manager::TaskQueue::Spec(
70                 sequence_manager::QueueName::DEFAULT_TQ))),
71         message_pump_factory_(std::move(message_pump_factory)) {
72     sequence_manager_->SetDefaultTaskRunner(default_task_queue_->task_runner());
73   }
74 
75   ~SequenceManagerThreadDelegate() override = default;
76 
GetDefaultTaskRunner()77   scoped_refptr<SingleThreadTaskRunner> GetDefaultTaskRunner() override {
78     // Surprisingly this might not be default_task_queue_->task_runner() which
79     // we set in the constructor. The Thread::Init() method could create a
80     // SequenceManager on top of the current one and call
81     // SequenceManager::SetDefaultTaskRunner which would propagate the new
82     // TaskRunner down to our SequenceManager. Turns out, code actually relies
83     // on this and somehow relies on
84     // SequenceManagerThreadDelegate::GetDefaultTaskRunner returning this new
85     // TaskRunner. So instead of returning default_task_queue_->task_runner() we
86     // need to query the SequenceManager for it.
87     // The underlying problem here is that Subclasses of Thread can do crazy
88     // stuff in Init() but they are not really in control of what happens in the
89     // Thread::Delegate, as this is passed in on calling StartWithOptions which
90     // could happen far away from where the Thread is created. We should
91     // consider getting rid of StartWithOptions, and pass them as a constructor
92     // argument instead.
93     return sequence_manager_->GetTaskRunner();
94   }
95 
BindToCurrentThread()96   void BindToCurrentThread() override {
97     sequence_manager_->BindToMessagePump(
98         std::move(message_pump_factory_).Run());
99   }
100 
101  private:
102   std::unique_ptr<sequence_manager::internal::SequenceManagerImpl>
103       sequence_manager_;
104   sequence_manager::TaskQueue::Handle default_task_queue_;
105   OnceCallback<std::unique_ptr<MessagePump>()> message_pump_factory_;
106 };
107 
108 }  // namespace internal
109 
110 Thread::Options::Options() = default;
111 
Options(MessagePumpType type,size_t size)112 Thread::Options::Options(MessagePumpType type, size_t size)
113     : message_pump_type(type), stack_size(size) {}
114 
Options(ThreadType thread_type)115 Thread::Options::Options(ThreadType thread_type) : thread_type(thread_type) {}
116 
Options(Options && other)117 Thread::Options::Options(Options&& other)
118     : message_pump_type(std::move(other.message_pump_type)),
119       delegate(std::move(other.delegate)),
120       message_pump_factory(std::move(other.message_pump_factory)),
121       stack_size(std::move(other.stack_size)),
122       thread_type(std::move(other.thread_type)),
123       joinable(std::move(other.joinable)) {
124   other.moved_from = true;
125 }
126 
operator =(Thread::Options && other)127 Thread::Options& Thread::Options::operator=(Thread::Options&& other) {
128   DCHECK_NE(this, &other);
129 
130   message_pump_type = std::move(other.message_pump_type);
131   delegate = std::move(other.delegate);
132   message_pump_factory = std::move(other.message_pump_factory);
133   stack_size = std::move(other.stack_size);
134   thread_type = std::move(other.thread_type);
135   joinable = std::move(other.joinable);
136   other.moved_from = true;
137 
138   return *this;
139 }
140 
141 Thread::Options::~Options() = default;
142 
Thread(const std::string & name)143 Thread::Thread(const std::string& name)
144     : id_event_(WaitableEvent::ResetPolicy::MANUAL,
145                 WaitableEvent::InitialState::NOT_SIGNALED),
146       name_(name),
147       start_event_(WaitableEvent::ResetPolicy::MANUAL,
148                    WaitableEvent::InitialState::NOT_SIGNALED) {
149   // Only bind the sequence on Start(): the state is constant between
150   // construction and Start() and it's thus valid for Start() to be called on
151   // another sequence as long as every other operation is then performed on that
152   // sequence.
153   owning_sequence_checker_.DetachFromSequence();
154 }
155 
~Thread()156 Thread::~Thread() {
157   Stop();
158 }
159 
Start()160 bool Thread::Start() {
161   DCHECK(owning_sequence_checker_.CalledOnValidSequence());
162 
163   Options options;
164 #if BUILDFLAG(IS_WIN)
165   if (com_status_ == STA)
166     options.message_pump_type = MessagePumpType::UI;
167 #endif
168   return StartWithOptions(std::move(options));
169 }
170 
StartWithOptions(Options options)171 bool Thread::StartWithOptions(Options options) {
172   DCHECK(options.IsValid());
173   DCHECK(owning_sequence_checker_.CalledOnValidSequence());
174   DCHECK(!delegate_);
175   DCHECK(!IsRunning());
176   DCHECK(!stopping_) << "Starting a non-joinable thread a second time? That's "
177                      << "not allowed!";
178 #if BUILDFLAG(IS_WIN)
179   DCHECK((com_status_ != STA) ||
180          (options.message_pump_type == MessagePumpType::UI));
181 #endif
182 
183   // Reset |id_| here to support restarting the thread.
184   id_event_.Reset();
185   id_ = kInvalidThreadId;
186 
187   SetThreadWasQuitProperly(false);
188 
189   if (options.delegate) {
190     DCHECK(!options.message_pump_factory);
191     delegate_ = std::move(options.delegate);
192   } else if (options.message_pump_factory) {
193     delegate_ = std::make_unique<internal::SequenceManagerThreadDelegate>(
194         MessagePumpType::CUSTOM, options.message_pump_factory);
195   } else {
196     delegate_ = std::make_unique<internal::SequenceManagerThreadDelegate>(
197         options.message_pump_type,
198         BindOnce([](MessagePumpType type) { return MessagePump::Create(type); },
199                  options.message_pump_type));
200   }
201 
202   start_event_.Reset();
203 
204   // Hold |thread_lock_| while starting the new thread to synchronize with
205   // Stop() while it's not guaranteed to be sequenced (until crbug/629139 is
206   // fixed).
207   {
208     AutoLock lock(thread_lock_);
209     bool success = options.joinable
210                        ? PlatformThread::CreateWithType(
211                              options.stack_size, this, &thread_,
212                              options.thread_type, options.message_pump_type)
213                        : PlatformThread::CreateNonJoinableWithType(
214                              options.stack_size, this, options.thread_type,
215                              options.message_pump_type);
216     if (!success) {
217       DLOG(ERROR) << "failed to create thread";
218       return false;
219     }
220   }
221 
222   joinable_ = options.joinable;
223 
224   return true;
225 }
226 
StartAndWaitForTesting()227 bool Thread::StartAndWaitForTesting() {
228   DCHECK(owning_sequence_checker_.CalledOnValidSequence());
229   bool result = Start();
230   if (!result)
231     return false;
232   WaitUntilThreadStarted();
233   return true;
234 }
235 
WaitUntilThreadStarted() const236 bool Thread::WaitUntilThreadStarted() const {
237   DCHECK(owning_sequence_checker_.CalledOnValidSequence());
238   if (!delegate_)
239     return false;
240   // https://crbug.com/918039
241   base::ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait;
242   start_event_.Wait();
243   return true;
244 }
245 
FlushForTesting()246 void Thread::FlushForTesting() {
247   DCHECK(owning_sequence_checker_.CalledOnValidSequence());
248   if (!delegate_)
249     return;
250 
251   WaitableEvent done(WaitableEvent::ResetPolicy::AUTOMATIC,
252                      WaitableEvent::InitialState::NOT_SIGNALED);
253   task_runner()->PostTask(FROM_HERE,
254                           BindOnce(&WaitableEvent::Signal, Unretained(&done)));
255   done.Wait();
256 }
257 
Stop()258 void Thread::Stop() {
259   DCHECK(joinable_);
260 
261   // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and
262   // enable this check, until then synchronization with Start() via
263   // |thread_lock_| is required...
264   // DCHECK(owning_sequence_checker_.CalledOnValidSequence());
265   AutoLock lock(thread_lock_);
266 
267   StopSoon();
268 
269   // Can't join if the |thread_| is either already gone or is non-joinable.
270   if (thread_.is_null())
271     return;
272 
273   // Wait for the thread to exit.
274   //
275   // TODO(darin): Unfortunately, we need to keep |delegate_| around
276   // until the thread exits. Some consumers are abusing the API. Make them stop.
277   PlatformThread::Join(thread_);
278   thread_ = base::PlatformThreadHandle();
279 
280   // The thread should release |delegate_| on exit (note: Join() adds
281   // an implicit memory barrier and no lock is thus required for this check).
282   DCHECK(!delegate_);
283 
284   stopping_ = false;
285 }
286 
StopSoon()287 void Thread::StopSoon() {
288   // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and
289   // enable this check.
290   // DCHECK(owning_sequence_checker_.CalledOnValidSequence());
291 
292   if (stopping_ || !delegate_)
293     return;
294 
295   stopping_ = true;
296 
297   task_runner()->PostTask(
298       FROM_HERE, base::BindOnce(&Thread::ThreadQuitHelper, Unretained(this)));
299 }
300 
DetachFromSequence()301 void Thread::DetachFromSequence() {
302   DCHECK(owning_sequence_checker_.CalledOnValidSequence());
303   owning_sequence_checker_.DetachFromSequence();
304 }
305 
GetThreadId() const306 PlatformThreadId Thread::GetThreadId() const {
307   if (!id_event_.IsSignaled()) {
308     // If the thread is created but not started yet, wait for |id_| being ready.
309     base::ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait;
310     id_event_.Wait();
311   }
312   return id_;
313 }
314 
IsRunning() const315 bool Thread::IsRunning() const {
316   // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and
317   // enable this check.
318   // DCHECK(owning_sequence_checker_.CalledOnValidSequence());
319 
320   // If the thread's already started (i.e. |delegate_| is non-null) and
321   // not yet requested to stop (i.e. |stopping_| is false) we can just return
322   // true. (Note that |stopping_| is touched only on the same sequence that
323   // starts / started the new thread so we need no locking here.)
324   if (delegate_ && !stopping_)
325     return true;
326   // Otherwise check the |running_| flag, which is set to true by the new thread
327   // only while it is inside Run().
328   AutoLock lock(running_lock_);
329   return running_;
330 }
331 
Run(RunLoop * run_loop)332 void Thread::Run(RunLoop* run_loop) {
333   // Overridable protected method to be called from our |thread_| only.
334   DCHECK(id_event_.IsSignaled());
335   DCHECK_EQ(id_, PlatformThread::CurrentId());
336 
337   run_loop->Run();
338 }
339 
340 // static
SetThreadWasQuitProperly(bool flag)341 void Thread::SetThreadWasQuitProperly(bool flag) {
342 #if DCHECK_IS_ON()
343   was_quit_properly = flag;
344 #endif
345 }
346 
347 // static
GetThreadWasQuitProperly()348 bool Thread::GetThreadWasQuitProperly() {
349 #if DCHECK_IS_ON()
350   return was_quit_properly;
351 #else
352   return true;
353 #endif
354 }
355 
ThreadMain()356 void Thread::ThreadMain() {
357   // First, make GetThreadId() available to avoid deadlocks. It could be called
358   // any place in the following thread initialization code.
359   DCHECK(!id_event_.IsSignaled());
360   // Note: this read of |id_| while |id_event_| isn't signaled is exceptionally
361   // okay because ThreadMain has a happens-after relationship with the other
362   // write in StartWithOptions().
363   DCHECK_EQ(kInvalidThreadId, id_);
364   id_ = PlatformThread::CurrentId();
365   DCHECK_NE(kInvalidThreadId, id_);
366   id_event_.Signal();
367 
368   // Complete the initialization of our Thread object.
369   PlatformThread::SetName(name_.c_str());
370   ABSL_ANNOTATE_THREAD_NAME(name_.c_str());  // Tell the name to race detector.
371 
372   // Lazily initialize the |message_loop| so that it can run on this thread.
373   DCHECK(delegate_);
374   // This binds CurrentThread and SingleThreadTaskRunner::CurrentDefaultHandle.
375   delegate_->BindToCurrentThread();
376   DCHECK(CurrentThread::Get());
377   DCHECK(SingleThreadTaskRunner::HasCurrentDefault());
378 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
379   // Allow threads running a MessageLoopForIO to use FileDescriptorWatcher API.
380   std::unique_ptr<FileDescriptorWatcher> file_descriptor_watcher;
381   if (CurrentIOThread::IsSet()) {
382     file_descriptor_watcher = std::make_unique<FileDescriptorWatcher>(
383         delegate_->GetDefaultTaskRunner());
384   }
385 #endif  // (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
386 
387 #if BUILDFLAG(IS_WIN)
388   std::unique_ptr<win::ScopedCOMInitializer> com_initializer;
389   if (com_status_ != NONE) {
390     com_initializer.reset(
391         (com_status_ == STA)
392             ? new win::ScopedCOMInitializer()
393             : new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA));
394   }
395 #endif
396 
397   // Let the thread do extra initialization.
398   Init();
399 
400   {
401     AutoLock lock(running_lock_);
402     running_ = true;
403   }
404 
405   start_event_.Signal();
406 
407   RunLoop run_loop;
408   run_loop_ = &run_loop;
409   Run(run_loop_);
410 
411   {
412     AutoLock lock(running_lock_);
413     running_ = false;
414   }
415 
416   // Let the thread do extra cleanup.
417   CleanUp();
418 
419 #if BUILDFLAG(IS_WIN)
420   com_initializer.reset();
421 #endif
422 
423   DCHECK(GetThreadWasQuitProperly());
424 
425   // We can't receive messages anymore.
426   // (The message loop is destructed at the end of this block)
427   delegate_.reset();
428   run_loop_ = nullptr;
429 }
430 
ThreadQuitHelper()431 void Thread::ThreadQuitHelper() {
432   DCHECK(run_loop_);
433   run_loop_->QuitWhenIdle();
434   SetThreadWasQuitProperly(true);
435 }
436 
437 }  // namespace base
438