1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "rtc_base/thread.h"
12
13 #include "absl/strings/string_view.h"
14 #include "api/units/time_delta.h"
15 #include "rtc_base/socket_server.h"
16
17 #if defined(WEBRTC_WIN)
18 #include <comdef.h>
19 #elif defined(WEBRTC_POSIX)
20 #include <time.h>
21 #else
22 #error "Either WEBRTC_WIN or WEBRTC_POSIX needs to be defined."
23 #endif
24
25 #if defined(WEBRTC_WIN)
26 // Disable warning that we don't care about:
27 // warning C4722: destructor never returns, potential memory leak
28 #pragma warning(disable : 4722)
29 #endif
30
31 #include <stdio.h>
32
33 #include <utility>
34
35 #include "absl/algorithm/container.h"
36 #include "absl/cleanup/cleanup.h"
37 #include "api/sequence_checker.h"
38 #include "rtc_base/checks.h"
39 #include "rtc_base/deprecated/recursive_critical_section.h"
40 #include "rtc_base/event.h"
41 #include "rtc_base/internal/default_socket_server.h"
42 #include "rtc_base/logging.h"
43 #include "rtc_base/null_socket_server.h"
44 #include "rtc_base/synchronization/mutex.h"
45 #include "rtc_base/time_utils.h"
46 #include "rtc_base/trace_event.h"
47
48 #if defined(WEBRTC_MAC)
49 #include "rtc_base/system/cocoa_threading.h"
50
51 /*
52 * These are forward-declarations for methods that are part of the
53 * ObjC runtime. They are declared in the private header objc-internal.h.
54 * These calls are what clang inserts when using @autoreleasepool in ObjC,
55 * but here they are used directly in order to keep this file C++.
56 * https://clang.llvm.org/docs/AutomaticReferenceCounting.html#runtime-support
57 */
58 extern "C" {
59 void* objc_autoreleasePoolPush(void);
60 void objc_autoreleasePoolPop(void* pool);
61 }
62
63 namespace {
64 class ScopedAutoReleasePool {
65 public:
ScopedAutoReleasePool()66 ScopedAutoReleasePool() : pool_(objc_autoreleasePoolPush()) {}
~ScopedAutoReleasePool()67 ~ScopedAutoReleasePool() { objc_autoreleasePoolPop(pool_); }
68
69 private:
70 void* const pool_;
71 };
72 } // namespace
73 #endif
74
75 namespace rtc {
76 namespace {
77
78 using ::webrtc::MutexLock;
79 using ::webrtc::TimeDelta;
80
81 class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
82 public:
MarkProcessingCritScope(const RecursiveCriticalSection * cs,size_t * processing)83 MarkProcessingCritScope(const RecursiveCriticalSection* cs,
84 size_t* processing) RTC_EXCLUSIVE_LOCK_FUNCTION(cs)
85 : cs_(cs), processing_(processing) {
86 cs_->Enter();
87 *processing_ += 1;
88 }
89
RTC_UNLOCK_FUNCTION()90 ~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() {
91 *processing_ -= 1;
92 cs_->Leave();
93 }
94
95 MarkProcessingCritScope(const MarkProcessingCritScope&) = delete;
96 MarkProcessingCritScope& operator=(const MarkProcessingCritScope&) = delete;
97
98 private:
99 const RecursiveCriticalSection* const cs_;
100 size_t* processing_;
101 };
102
103 } // namespace
104
Instance()105 ThreadManager* ThreadManager::Instance() {
106 static ThreadManager* const thread_manager = new ThreadManager();
107 return thread_manager;
108 }
109
~ThreadManager()110 ThreadManager::~ThreadManager() {
111 // By above RTC_DEFINE_STATIC_LOCAL.
112 RTC_DCHECK_NOTREACHED() << "ThreadManager should never be destructed.";
113 }
114
115 // static
Add(Thread * message_queue)116 void ThreadManager::Add(Thread* message_queue) {
117 return Instance()->AddInternal(message_queue);
118 }
AddInternal(Thread * message_queue)119 void ThreadManager::AddInternal(Thread* message_queue) {
120 CritScope cs(&crit_);
121 // Prevent changes while the list of message queues is processed.
122 RTC_DCHECK_EQ(processing_, 0);
123 message_queues_.push_back(message_queue);
124 }
125
126 // static
Remove(Thread * message_queue)127 void ThreadManager::Remove(Thread* message_queue) {
128 return Instance()->RemoveInternal(message_queue);
129 }
RemoveInternal(Thread * message_queue)130 void ThreadManager::RemoveInternal(Thread* message_queue) {
131 {
132 CritScope cs(&crit_);
133 // Prevent changes while the list of message queues is processed.
134 RTC_DCHECK_EQ(processing_, 0);
135 std::vector<Thread*>::iterator iter;
136 iter = absl::c_find(message_queues_, message_queue);
137 if (iter != message_queues_.end()) {
138 message_queues_.erase(iter);
139 }
140 #if RTC_DCHECK_IS_ON
141 RemoveFromSendGraph(message_queue);
142 #endif
143 }
144 }
145
146 #if RTC_DCHECK_IS_ON
RemoveFromSendGraph(Thread * thread)147 void ThreadManager::RemoveFromSendGraph(Thread* thread) {
148 for (auto it = send_graph_.begin(); it != send_graph_.end();) {
149 if (it->first == thread) {
150 it = send_graph_.erase(it);
151 } else {
152 it->second.erase(thread);
153 ++it;
154 }
155 }
156 }
157
RegisterSendAndCheckForCycles(Thread * source,Thread * target)158 void ThreadManager::RegisterSendAndCheckForCycles(Thread* source,
159 Thread* target) {
160 RTC_DCHECK(source);
161 RTC_DCHECK(target);
162
163 CritScope cs(&crit_);
164 std::deque<Thread*> all_targets({target});
165 // We check the pre-existing who-sends-to-who graph for any path from target
166 // to source. This loop is guaranteed to terminate because per the send graph
167 // invariant, there are no cycles in the graph.
168 for (size_t i = 0; i < all_targets.size(); i++) {
169 const auto& targets = send_graph_[all_targets[i]];
170 all_targets.insert(all_targets.end(), targets.begin(), targets.end());
171 }
172 RTC_CHECK_EQ(absl::c_count(all_targets, source), 0)
173 << " send loop between " << source->name() << " and " << target->name();
174
175 // We may now insert source -> target without creating a cycle, since there
176 // was no path from target to source per the prior CHECK.
177 send_graph_[source].insert(target);
178 }
179 #endif
180
181 // static
ProcessAllMessageQueuesForTesting()182 void ThreadManager::ProcessAllMessageQueuesForTesting() {
183 return Instance()->ProcessAllMessageQueuesInternal();
184 }
185
ProcessAllMessageQueuesInternal()186 void ThreadManager::ProcessAllMessageQueuesInternal() {
187 // This works by posting a delayed message at the current time and waiting
188 // for it to be dispatched on all queues, which will ensure that all messages
189 // that came before it were also dispatched.
190 std::atomic<int> queues_not_done(0);
191
192 {
193 MarkProcessingCritScope cs(&crit_, &processing_);
194 for (Thread* queue : message_queues_) {
195 if (!queue->IsProcessingMessagesForTesting()) {
196 // If the queue is not processing messages, it can
197 // be ignored. If we tried to post a message to it, it would be dropped
198 // or ignored.
199 continue;
200 }
201 queues_not_done.fetch_add(1);
202 // Whether the task is processed, or the thread is simply cleared,
203 // queues_not_done gets decremented.
204 absl::Cleanup sub = [&queues_not_done] { queues_not_done.fetch_sub(1); };
205 // Post delayed task instead of regular task to wait for all delayed tasks
206 // that are ready for processing.
207 queue->PostDelayedTask([sub = std::move(sub)] {}, TimeDelta::Zero());
208 }
209 }
210
211 rtc::Thread* current = rtc::Thread::Current();
212 // Note: One of the message queues may have been on this thread, which is
213 // why we can't synchronously wait for queues_not_done to go to 0; we need
214 // to process messages as well.
215 while (queues_not_done.load() > 0) {
216 if (current) {
217 current->ProcessMessages(0);
218 }
219 }
220 }
221
222 // static
Current()223 Thread* Thread::Current() {
224 ThreadManager* manager = ThreadManager::Instance();
225 Thread* thread = manager->CurrentThread();
226
227 return thread;
228 }
229
230 #if defined(WEBRTC_POSIX)
ThreadManager()231 ThreadManager::ThreadManager() {
232 #if defined(WEBRTC_MAC)
233 InitCocoaMultiThreading();
234 #endif
235 pthread_key_create(&key_, nullptr);
236 }
237
CurrentThread()238 Thread* ThreadManager::CurrentThread() {
239 return static_cast<Thread*>(pthread_getspecific(key_));
240 }
241
SetCurrentThreadInternal(Thread * thread)242 void ThreadManager::SetCurrentThreadInternal(Thread* thread) {
243 pthread_setspecific(key_, thread);
244 }
245 #endif
246
247 #if defined(WEBRTC_WIN)
ThreadManager()248 ThreadManager::ThreadManager() : key_(TlsAlloc()) {}
249
CurrentThread()250 Thread* ThreadManager::CurrentThread() {
251 return static_cast<Thread*>(TlsGetValue(key_));
252 }
253
SetCurrentThreadInternal(Thread * thread)254 void ThreadManager::SetCurrentThreadInternal(Thread* thread) {
255 TlsSetValue(key_, thread);
256 }
257 #endif
258
SetCurrentThread(Thread * thread)259 void ThreadManager::SetCurrentThread(Thread* thread) {
260 #if RTC_DLOG_IS_ON
261 if (CurrentThread() && thread) {
262 RTC_DLOG(LS_ERROR) << "SetCurrentThread: Overwriting an existing value?";
263 }
264 #endif // RTC_DLOG_IS_ON
265
266 if (thread) {
267 thread->EnsureIsCurrentTaskQueue();
268 } else {
269 Thread* current = CurrentThread();
270 if (current) {
271 // The current thread is being cleared, e.g. as a result of
272 // UnwrapCurrent() being called or when a thread is being stopped
273 // (see PreRun()). This signals that the Thread instance is being detached
274 // from the thread, which also means that TaskQueue::Current() must not
275 // return a pointer to the Thread instance.
276 current->ClearCurrentTaskQueue();
277 }
278 }
279
280 SetCurrentThreadInternal(thread);
281 }
282
ChangeCurrentThreadForTest(rtc::Thread * thread)283 void rtc::ThreadManager::ChangeCurrentThreadForTest(rtc::Thread* thread) {
284 SetCurrentThreadInternal(thread);
285 }
286
WrapCurrentThread()287 Thread* ThreadManager::WrapCurrentThread() {
288 Thread* result = CurrentThread();
289 if (nullptr == result) {
290 result = new Thread(CreateDefaultSocketServer());
291 result->WrapCurrentWithThreadManager(this, true);
292 }
293 return result;
294 }
295
UnwrapCurrentThread()296 void ThreadManager::UnwrapCurrentThread() {
297 Thread* t = CurrentThread();
298 if (t && !(t->IsOwned())) {
299 t->UnwrapCurrent();
300 delete t;
301 }
302 }
303
ScopedDisallowBlockingCalls()304 Thread::ScopedDisallowBlockingCalls::ScopedDisallowBlockingCalls()
305 : thread_(Thread::Current()),
306 previous_state_(thread_->SetAllowBlockingCalls(false)) {}
307
~ScopedDisallowBlockingCalls()308 Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() {
309 RTC_DCHECK(thread_->IsCurrent());
310 thread_->SetAllowBlockingCalls(previous_state_);
311 }
312
313 #if RTC_DCHECK_IS_ON
ScopedCountBlockingCalls(std::function<void (uint32_t,uint32_t)> callback)314 Thread::ScopedCountBlockingCalls::ScopedCountBlockingCalls(
315 std::function<void(uint32_t, uint32_t)> callback)
316 : thread_(Thread::Current()),
317 base_blocking_call_count_(thread_->GetBlockingCallCount()),
318 base_could_be_blocking_call_count_(
319 thread_->GetCouldBeBlockingCallCount()),
320 result_callback_(std::move(callback)) {}
321
~ScopedCountBlockingCalls()322 Thread::ScopedCountBlockingCalls::~ScopedCountBlockingCalls() {
323 if (GetTotalBlockedCallCount() >= min_blocking_calls_for_callback_) {
324 result_callback_(GetBlockingCallCount(), GetCouldBeBlockingCallCount());
325 }
326 }
327
GetBlockingCallCount() const328 uint32_t Thread::ScopedCountBlockingCalls::GetBlockingCallCount() const {
329 return thread_->GetBlockingCallCount() - base_blocking_call_count_;
330 }
331
GetCouldBeBlockingCallCount() const332 uint32_t Thread::ScopedCountBlockingCalls::GetCouldBeBlockingCallCount() const {
333 return thread_->GetCouldBeBlockingCallCount() -
334 base_could_be_blocking_call_count_;
335 }
336
GetTotalBlockedCallCount() const337 uint32_t Thread::ScopedCountBlockingCalls::GetTotalBlockedCallCount() const {
338 return GetBlockingCallCount() + GetCouldBeBlockingCallCount();
339 }
340 #endif
341
Thread(SocketServer * ss)342 Thread::Thread(SocketServer* ss) : Thread(ss, /*do_init=*/true) {}
343
Thread(std::unique_ptr<SocketServer> ss)344 Thread::Thread(std::unique_ptr<SocketServer> ss)
345 : Thread(std::move(ss), /*do_init=*/true) {}
346
Thread(SocketServer * ss,bool do_init)347 Thread::Thread(SocketServer* ss, bool do_init)
348 : delayed_next_num_(0),
349 fInitialized_(false),
350 fDestroyed_(false),
351 stop_(0),
352 ss_(ss) {
353 RTC_DCHECK(ss);
354 ss_->SetMessageQueue(this);
355 SetName("Thread", this); // default name
356 if (do_init) {
357 DoInit();
358 }
359 }
360
Thread(std::unique_ptr<SocketServer> ss,bool do_init)361 Thread::Thread(std::unique_ptr<SocketServer> ss, bool do_init)
362 : Thread(ss.get(), do_init) {
363 own_ss_ = std::move(ss);
364 }
365
~Thread()366 Thread::~Thread() {
367 Stop();
368 DoDestroy();
369 }
370
DoInit()371 void Thread::DoInit() {
372 if (fInitialized_) {
373 return;
374 }
375
376 fInitialized_ = true;
377 ThreadManager::Add(this);
378 }
379
DoDestroy()380 void Thread::DoDestroy() {
381 if (fDestroyed_) {
382 return;
383 }
384
385 fDestroyed_ = true;
386 // The signal is done from here to ensure
387 // that it always gets called when the queue
388 // is going away.
389 if (ss_) {
390 ss_->SetMessageQueue(nullptr);
391 }
392 ThreadManager::Remove(this);
393 // Clear.
394 messages_ = {};
395 delayed_messages_ = {};
396 }
397
socketserver()398 SocketServer* Thread::socketserver() {
399 return ss_;
400 }
401
WakeUpSocketServer()402 void Thread::WakeUpSocketServer() {
403 ss_->WakeUp();
404 }
405
Quit()406 void Thread::Quit() {
407 stop_.store(1, std::memory_order_release);
408 WakeUpSocketServer();
409 }
410
IsQuitting()411 bool Thread::IsQuitting() {
412 return stop_.load(std::memory_order_acquire) != 0;
413 }
414
Restart()415 void Thread::Restart() {
416 stop_.store(0, std::memory_order_release);
417 }
418
Get(int cmsWait)419 absl::AnyInvocable<void() &&> Thread::Get(int cmsWait) {
420 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
421
422 int64_t cmsTotal = cmsWait;
423 int64_t cmsElapsed = 0;
424 int64_t msStart = TimeMillis();
425 int64_t msCurrent = msStart;
426 while (true) {
427 // Check for posted events
428 int64_t cmsDelayNext = kForever;
429 {
430 // All queue operations need to be locked, but nothing else in this loop
431 // can happen while holding the `mutex_`.
432 MutexLock lock(&mutex_);
433 // Check for delayed messages that have been triggered and calculate the
434 // next trigger time.
435 while (!delayed_messages_.empty()) {
436 if (msCurrent < delayed_messages_.top().run_time_ms) {
437 cmsDelayNext =
438 TimeDiff(delayed_messages_.top().run_time_ms, msCurrent);
439 break;
440 }
441 messages_.push(std::move(delayed_messages_.top().functor));
442 delayed_messages_.pop();
443 }
444 // Pull a message off the message queue, if available.
445 if (!messages_.empty()) {
446 absl::AnyInvocable<void()&&> task = std::move(messages_.front());
447 messages_.pop();
448 return task;
449 }
450 }
451
452 if (IsQuitting())
453 break;
454
455 // Which is shorter, the delay wait or the asked wait?
456
457 int64_t cmsNext;
458 if (cmsWait == kForever) {
459 cmsNext = cmsDelayNext;
460 } else {
461 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
462 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
463 cmsNext = cmsDelayNext;
464 }
465
466 {
467 // Wait and multiplex in the meantime
468 if (!ss_->Wait(cmsNext == kForever ? SocketServer::kForever
469 : webrtc::TimeDelta::Millis(cmsNext),
470 /*process_io=*/true))
471 return nullptr;
472 }
473
474 // If the specified timeout expired, return
475
476 msCurrent = TimeMillis();
477 cmsElapsed = TimeDiff(msCurrent, msStart);
478 if (cmsWait != kForever) {
479 if (cmsElapsed >= cmsWait)
480 return nullptr;
481 }
482 }
483 return nullptr;
484 }
485
PostTask(absl::AnyInvocable<void ()&&> task)486 void Thread::PostTask(absl::AnyInvocable<void() &&> task) {
487 if (IsQuitting()) {
488 return;
489 }
490
491 // Keep thread safe
492 // Add the message to the end of the queue
493 // Signal for the multiplexer to return
494
495 {
496 MutexLock lock(&mutex_);
497 messages_.push(std::move(task));
498 }
499 WakeUpSocketServer();
500 }
501
PostDelayedHighPrecisionTask(absl::AnyInvocable<void ()&&> task,webrtc::TimeDelta delay)502 void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
503 webrtc::TimeDelta delay) {
504 if (IsQuitting()) {
505 return;
506 }
507
508 // Keep thread safe
509 // Add to the priority queue. Gets sorted soonest first.
510 // Signal for the multiplexer to return.
511
512 int64_t delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms<int>();
513 int64_t run_time_ms = TimeAfter(delay_ms);
514 {
515 MutexLock lock(&mutex_);
516 delayed_messages_.push({.delay_ms = delay_ms,
517 .run_time_ms = run_time_ms,
518 .message_number = delayed_next_num_,
519 .functor = std::move(task)});
520 // If this message queue processes 1 message every millisecond for 50 days,
521 // we will wrap this number. Even then, only messages with identical times
522 // will be misordered, and then only briefly. This is probably ok.
523 ++delayed_next_num_;
524 RTC_DCHECK_NE(0, delayed_next_num_);
525 }
526 WakeUpSocketServer();
527 }
528
GetDelay()529 int Thread::GetDelay() {
530 MutexLock lock(&mutex_);
531
532 if (!messages_.empty())
533 return 0;
534
535 if (!delayed_messages_.empty()) {
536 int delay = TimeUntil(delayed_messages_.top().run_time_ms);
537 if (delay < 0)
538 delay = 0;
539 return delay;
540 }
541
542 return kForever;
543 }
544
Dispatch(absl::AnyInvocable<void ()&&> task)545 void Thread::Dispatch(absl::AnyInvocable<void() &&> task) {
546 TRACE_EVENT0("webrtc", "Thread::Dispatch");
547 RTC_DCHECK_RUN_ON(this);
548 int64_t start_time = TimeMillis();
549 std::move(task)();
550 int64_t end_time = TimeMillis();
551 int64_t diff = TimeDiff(end_time, start_time);
552 if (diff >= dispatch_warning_ms_) {
553 RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
554 << "ms to dispatch.";
555 // To avoid log spew, move the warning limit to only give warning
556 // for delays that are larger than the one observed.
557 dispatch_warning_ms_ = diff + 1;
558 }
559 }
560
IsCurrent() const561 bool Thread::IsCurrent() const {
562 return ThreadManager::Instance()->CurrentThread() == this;
563 }
564
CreateWithSocketServer()565 std::unique_ptr<Thread> Thread::CreateWithSocketServer() {
566 return std::unique_ptr<Thread>(new Thread(CreateDefaultSocketServer()));
567 }
568
Create()569 std::unique_ptr<Thread> Thread::Create() {
570 return std::unique_ptr<Thread>(
571 new Thread(std::unique_ptr<SocketServer>(new NullSocketServer())));
572 }
573
SleepMs(int milliseconds)574 bool Thread::SleepMs(int milliseconds) {
575 AssertBlockingIsAllowedOnCurrentThread();
576
577 #if defined(WEBRTC_WIN)
578 ::Sleep(milliseconds);
579 return true;
580 #else
581 // POSIX has both a usleep() and a nanosleep(), but the former is deprecated,
582 // so we use nanosleep() even though it has greater precision than necessary.
583 struct timespec ts;
584 ts.tv_sec = milliseconds / 1000;
585 ts.tv_nsec = (milliseconds % 1000) * 1000000;
586 int ret = nanosleep(&ts, nullptr);
587 if (ret != 0) {
588 RTC_LOG_ERR(LS_WARNING) << "nanosleep() returning early";
589 return false;
590 }
591 return true;
592 #endif
593 }
594
SetName(absl::string_view name,const void * obj)595 bool Thread::SetName(absl::string_view name, const void* obj) {
596 RTC_DCHECK(!IsRunning());
597
598 name_ = std::string(name);
599 if (obj) {
600 // The %p specifier typically produce at most 16 hex digits, possibly with a
601 // 0x prefix. But format is implementation defined, so add some margin.
602 char buf[30];
603 snprintf(buf, sizeof(buf), " 0x%p", obj);
604 name_ += buf;
605 }
606 return true;
607 }
608
SetDispatchWarningMs(int deadline)609 void Thread::SetDispatchWarningMs(int deadline) {
610 if (!IsCurrent()) {
611 PostTask([this, deadline]() { SetDispatchWarningMs(deadline); });
612 return;
613 }
614 RTC_DCHECK_RUN_ON(this);
615 dispatch_warning_ms_ = deadline;
616 }
617
Start()618 bool Thread::Start() {
619 RTC_DCHECK(!IsRunning());
620
621 if (IsRunning())
622 return false;
623
624 Restart(); // reset IsQuitting() if the thread is being restarted
625
626 // Make sure that ThreadManager is created on the main thread before
627 // we start a new thread.
628 ThreadManager::Instance();
629
630 owned_ = true;
631
632 #if defined(WEBRTC_WIN)
633 thread_ = CreateThread(nullptr, 0, PreRun, this, 0, &thread_id_);
634 if (!thread_) {
635 return false;
636 }
637 #elif defined(WEBRTC_POSIX)
638 pthread_attr_t attr;
639 pthread_attr_init(&attr);
640
641 int error_code = pthread_create(&thread_, &attr, PreRun, this);
642 if (0 != error_code) {
643 RTC_LOG(LS_ERROR) << "Unable to create pthread, error " << error_code;
644 thread_ = 0;
645 return false;
646 }
647 RTC_DCHECK(thread_);
648 #endif
649 return true;
650 }
651
WrapCurrent()652 bool Thread::WrapCurrent() {
653 return WrapCurrentWithThreadManager(ThreadManager::Instance(), true);
654 }
655
UnwrapCurrent()656 void Thread::UnwrapCurrent() {
657 // Clears the platform-specific thread-specific storage.
658 ThreadManager::Instance()->SetCurrentThread(nullptr);
659 #if defined(WEBRTC_WIN)
660 if (thread_ != nullptr) {
661 if (!CloseHandle(thread_)) {
662 RTC_LOG_GLE(LS_ERROR)
663 << "When unwrapping thread, failed to close handle.";
664 }
665 thread_ = nullptr;
666 thread_id_ = 0;
667 }
668 #elif defined(WEBRTC_POSIX)
669 thread_ = 0;
670 #endif
671 }
672
SafeWrapCurrent()673 void Thread::SafeWrapCurrent() {
674 WrapCurrentWithThreadManager(ThreadManager::Instance(), false);
675 }
676
Join()677 void Thread::Join() {
678 if (!IsRunning())
679 return;
680
681 RTC_DCHECK(!IsCurrent());
682 if (Current() && !Current()->blocking_calls_allowed_) {
683 RTC_LOG(LS_WARNING) << "Waiting for the thread to join, "
684 "but blocking calls have been disallowed";
685 }
686
687 #if defined(WEBRTC_WIN)
688 RTC_DCHECK(thread_ != nullptr);
689 WaitForSingleObject(thread_, INFINITE);
690 CloseHandle(thread_);
691 thread_ = nullptr;
692 thread_id_ = 0;
693 #elif defined(WEBRTC_POSIX)
694 pthread_join(thread_, nullptr);
695 thread_ = 0;
696 #endif
697 }
698
SetAllowBlockingCalls(bool allow)699 bool Thread::SetAllowBlockingCalls(bool allow) {
700 RTC_DCHECK(IsCurrent());
701 bool previous = blocking_calls_allowed_;
702 blocking_calls_allowed_ = allow;
703 return previous;
704 }
705
706 // static
AssertBlockingIsAllowedOnCurrentThread()707 void Thread::AssertBlockingIsAllowedOnCurrentThread() {
708 #if !defined(NDEBUG)
709 Thread* current = Thread::Current();
710 RTC_DCHECK(!current || current->blocking_calls_allowed_);
711 #endif
712 }
713
714 // static
715 #if defined(WEBRTC_WIN)
PreRun(LPVOID pv)716 DWORD WINAPI Thread::PreRun(LPVOID pv) {
717 #else
718 void* Thread::PreRun(void* pv) {
719 #endif
720 Thread* thread = static_cast<Thread*>(pv);
721 ThreadManager::Instance()->SetCurrentThread(thread);
722 rtc::SetCurrentThreadName(thread->name_.c_str());
723 #if defined(WEBRTC_MAC)
724 ScopedAutoReleasePool pool;
725 #endif
726 thread->Run();
727
728 ThreadManager::Instance()->SetCurrentThread(nullptr);
729 #ifdef WEBRTC_WIN
730 return 0;
731 #else
732 return nullptr;
733 #endif
734 } // namespace rtc
735
736 void Thread::Run() {
737 ProcessMessages(kForever);
738 }
739
740 bool Thread::IsOwned() {
741 RTC_DCHECK(IsRunning());
742 return owned_;
743 }
744
745 void Thread::Stop() {
746 Thread::Quit();
747 Join();
748 }
749
750 void Thread::BlockingCall(rtc::FunctionView<void()> functor) {
751 TRACE_EVENT0("webrtc", "Thread::BlockingCall");
752
753 RTC_DCHECK(!IsQuitting());
754 if (IsQuitting())
755 return;
756
757 if (IsCurrent()) {
758 #if RTC_DCHECK_IS_ON
759 RTC_DCHECK(this->IsInvokeToThreadAllowed(this));
760 RTC_DCHECK_RUN_ON(this);
761 could_be_blocking_call_count_++;
762 #endif
763 functor();
764 return;
765 }
766
767 AssertBlockingIsAllowedOnCurrentThread();
768
769 Thread* current_thread = Thread::Current();
770
771 #if RTC_DCHECK_IS_ON
772 if (current_thread) {
773 RTC_DCHECK_RUN_ON(current_thread);
774 current_thread->blocking_call_count_++;
775 RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this));
776 ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
777 this);
778 }
779 #endif
780
781 // Perhaps down the line we can get rid of this workaround and always require
782 // current_thread to be valid when BlockingCall() is called.
783 std::unique_ptr<rtc::Event> done_event;
784 if (!current_thread)
785 done_event.reset(new rtc::Event());
786
787 bool ready = false;
788 absl::Cleanup cleanup = [this, &ready, current_thread,
789 done = done_event.get()] {
790 if (current_thread) {
791 {
792 MutexLock lock(&mutex_);
793 ready = true;
794 }
795 current_thread->socketserver()->WakeUp();
796 } else {
797 done->Set();
798 }
799 };
800 PostTask([functor, cleanup = std::move(cleanup)] { functor(); });
801 if (current_thread) {
802 bool waited = false;
803 mutex_.Lock();
804 while (!ready) {
805 mutex_.Unlock();
806 current_thread->socketserver()->Wait(SocketServer::kForever, false);
807 waited = true;
808 mutex_.Lock();
809 }
810 mutex_.Unlock();
811
812 // Our Wait loop above may have consumed some WakeUp events for this
813 // Thread, that weren't relevant to this Send. Losing these WakeUps can
814 // cause problems for some SocketServers.
815 //
816 // Concrete example:
817 // Win32SocketServer on thread A calls Send on thread B. While processing
818 // the message, thread B Posts a message to A. We consume the wakeup for
819 // that Post while waiting for the Send to complete, which means that when
820 // we exit this loop, we need to issue another WakeUp, or else the Posted
821 // message won't be processed in a timely manner.
822
823 if (waited) {
824 current_thread->socketserver()->WakeUp();
825 }
826 } else {
827 done_event->Wait(rtc::Event::kForever);
828 }
829 }
830
831 // Called by the ThreadManager when being set as the current thread.
832 void Thread::EnsureIsCurrentTaskQueue() {
833 task_queue_registration_ =
834 std::make_unique<TaskQueueBase::CurrentTaskQueueSetter>(this);
835 }
836
837 // Called by the ThreadManager when being set as the current thread.
838 void Thread::ClearCurrentTaskQueue() {
839 task_queue_registration_.reset();
840 }
841
842 void Thread::AllowInvokesToThread(Thread* thread) {
843 #if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
844 if (!IsCurrent()) {
845 PostTask([thread, this]() { AllowInvokesToThread(thread); });
846 return;
847 }
848 RTC_DCHECK_RUN_ON(this);
849 allowed_threads_.push_back(thread);
850 invoke_policy_enabled_ = true;
851 #endif
852 }
853
854 void Thread::DisallowAllInvokes() {
855 #if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
856 if (!IsCurrent()) {
857 PostTask([this]() { DisallowAllInvokes(); });
858 return;
859 }
860 RTC_DCHECK_RUN_ON(this);
861 allowed_threads_.clear();
862 invoke_policy_enabled_ = true;
863 #endif
864 }
865
866 #if RTC_DCHECK_IS_ON
867 uint32_t Thread::GetBlockingCallCount() const {
868 RTC_DCHECK_RUN_ON(this);
869 return blocking_call_count_;
870 }
871 uint32_t Thread::GetCouldBeBlockingCallCount() const {
872 RTC_DCHECK_RUN_ON(this);
873 return could_be_blocking_call_count_;
874 }
875 #endif
876
877 // Returns true if no policies added or if there is at least one policy
878 // that permits invocation to `target` thread.
879 bool Thread::IsInvokeToThreadAllowed(rtc::Thread* target) {
880 #if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
881 RTC_DCHECK_RUN_ON(this);
882 if (!invoke_policy_enabled_) {
883 return true;
884 }
885 for (const auto* thread : allowed_threads_) {
886 if (thread == target) {
887 return true;
888 }
889 }
890 return false;
891 #else
892 return true;
893 #endif
894 }
895
896 void Thread::Delete() {
897 Stop();
898 delete this;
899 }
900
901 void Thread::PostDelayedTask(absl::AnyInvocable<void() &&> task,
902 webrtc::TimeDelta delay) {
903 // This implementation does not support low precision yet.
904 PostDelayedHighPrecisionTask(std::move(task), delay);
905 }
906
907 bool Thread::IsProcessingMessagesForTesting() {
908 return (owned_ || IsCurrent()) && !IsQuitting();
909 }
910
911 bool Thread::ProcessMessages(int cmsLoop) {
912 // Using ProcessMessages with a custom clock for testing and a time greater
913 // than 0 doesn't work, since it's not guaranteed to advance the custom
914 // clock's time, and may get stuck in an infinite loop.
915 RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 ||
916 cmsLoop == kForever);
917 int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
918 int cmsNext = cmsLoop;
919
920 while (true) {
921 #if defined(WEBRTC_MAC)
922 ScopedAutoReleasePool pool;
923 #endif
924 absl::AnyInvocable<void()&&> task = Get(cmsNext);
925 if (!task)
926 return !IsQuitting();
927 Dispatch(std::move(task));
928
929 if (cmsLoop != kForever) {
930 cmsNext = static_cast<int>(TimeUntil(msEnd));
931 if (cmsNext < 0)
932 return true;
933 }
934 }
935 }
936
937 bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager,
938 bool need_synchronize_access) {
939 RTC_DCHECK(!IsRunning());
940
941 #if defined(WEBRTC_WIN)
942 if (need_synchronize_access) {
943 // We explicitly ask for no rights other than synchronization.
944 // This gives us the best chance of succeeding.
945 thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId());
946 if (!thread_) {
947 RTC_LOG_GLE(LS_ERROR) << "Unable to get handle to thread.";
948 return false;
949 }
950 thread_id_ = GetCurrentThreadId();
951 }
952 #elif defined(WEBRTC_POSIX)
953 thread_ = pthread_self();
954 #endif
955 owned_ = false;
956 thread_manager->SetCurrentThread(this);
957 return true;
958 }
959
960 bool Thread::IsRunning() {
961 #if defined(WEBRTC_WIN)
962 return thread_ != nullptr;
963 #elif defined(WEBRTC_POSIX)
964 return thread_ != 0;
965 #endif
966 }
967
968 AutoThread::AutoThread()
969 : Thread(CreateDefaultSocketServer(), /*do_init=*/false) {
970 if (!ThreadManager::Instance()->CurrentThread()) {
971 // DoInit registers with ThreadManager. Do that only if we intend to
972 // be rtc::Thread::Current(), otherwise ProcessAllMessageQueuesInternal will
973 // post a message to a queue that no running thread is serving.
974 DoInit();
975 ThreadManager::Instance()->SetCurrentThread(this);
976 }
977 }
978
979 AutoThread::~AutoThread() {
980 Stop();
981 DoDestroy();
982 if (ThreadManager::Instance()->CurrentThread() == this) {
983 ThreadManager::Instance()->SetCurrentThread(nullptr);
984 }
985 }
986
987 AutoSocketServerThread::AutoSocketServerThread(SocketServer* ss)
988 : Thread(ss, /*do_init=*/false) {
989 DoInit();
990 old_thread_ = ThreadManager::Instance()->CurrentThread();
991 // Temporarily set the current thread to nullptr so that we can keep checks
992 // around that catch unintentional pointer overwrites.
993 rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
994 rtc::ThreadManager::Instance()->SetCurrentThread(this);
995 if (old_thread_) {
996 ThreadManager::Remove(old_thread_);
997 }
998 }
999
1000 AutoSocketServerThread::~AutoSocketServerThread() {
1001 RTC_DCHECK(ThreadManager::Instance()->CurrentThread() == this);
1002 // Stop and destroy the thread before clearing it as the current thread.
1003 // Sometimes there are messages left in the Thread that will be
1004 // destroyed by DoDestroy, and sometimes the destructors of the message and/or
1005 // its contents rely on this thread still being set as the current thread.
1006 Stop();
1007 DoDestroy();
1008 rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
1009 rtc::ThreadManager::Instance()->SetCurrentThread(old_thread_);
1010 if (old_thread_) {
1011 ThreadManager::Add(old_thread_);
1012 }
1013 }
1014
1015 } // namespace rtc
1016