• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "rtc_base/thread.h"
12 
13 #include "absl/strings/string_view.h"
14 #include "api/units/time_delta.h"
15 #include "rtc_base/socket_server.h"
16 
17 #if defined(WEBRTC_WIN)
18 #include <comdef.h>
19 #elif defined(WEBRTC_POSIX)
20 #include <time.h>
21 #else
22 #error "Either WEBRTC_WIN or WEBRTC_POSIX needs to be defined."
23 #endif
24 
25 #if defined(WEBRTC_WIN)
26 // Disable warning that we don't care about:
27 // warning C4722: destructor never returns, potential memory leak
28 #pragma warning(disable : 4722)
29 #endif
30 
31 #include <stdio.h>
32 
33 #include <utility>
34 
35 #include "absl/algorithm/container.h"
36 #include "absl/cleanup/cleanup.h"
37 #include "api/sequence_checker.h"
38 #include "rtc_base/checks.h"
39 #include "rtc_base/deprecated/recursive_critical_section.h"
40 #include "rtc_base/event.h"
41 #include "rtc_base/internal/default_socket_server.h"
42 #include "rtc_base/logging.h"
43 #include "rtc_base/null_socket_server.h"
44 #include "rtc_base/synchronization/mutex.h"
45 #include "rtc_base/time_utils.h"
46 #include "rtc_base/trace_event.h"
47 
48 #if defined(WEBRTC_MAC)
49 #include "rtc_base/system/cocoa_threading.h"
50 
51 /*
52  * These are forward-declarations for methods that are part of the
53  * ObjC runtime. They are declared in the private header objc-internal.h.
54  * These calls are what clang inserts when using @autoreleasepool in ObjC,
55  * but here they are used directly in order to keep this file C++.
56  * https://clang.llvm.org/docs/AutomaticReferenceCounting.html#runtime-support
57  */
58 extern "C" {
59 void* objc_autoreleasePoolPush(void);
60 void objc_autoreleasePoolPop(void* pool);
61 }
62 
63 namespace {
64 class ScopedAutoReleasePool {
65  public:
ScopedAutoReleasePool()66   ScopedAutoReleasePool() : pool_(objc_autoreleasePoolPush()) {}
~ScopedAutoReleasePool()67   ~ScopedAutoReleasePool() { objc_autoreleasePoolPop(pool_); }
68 
69  private:
70   void* const pool_;
71 };
72 }  // namespace
73 #endif
74 
75 namespace rtc {
76 namespace {
77 
78 using ::webrtc::MutexLock;
79 using ::webrtc::TimeDelta;
80 
81 class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
82  public:
MarkProcessingCritScope(const RecursiveCriticalSection * cs,size_t * processing)83   MarkProcessingCritScope(const RecursiveCriticalSection* cs,
84                           size_t* processing) RTC_EXCLUSIVE_LOCK_FUNCTION(cs)
85       : cs_(cs), processing_(processing) {
86     cs_->Enter();
87     *processing_ += 1;
88   }
89 
RTC_UNLOCK_FUNCTION()90   ~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() {
91     *processing_ -= 1;
92     cs_->Leave();
93   }
94 
95   MarkProcessingCritScope(const MarkProcessingCritScope&) = delete;
96   MarkProcessingCritScope& operator=(const MarkProcessingCritScope&) = delete;
97 
98  private:
99   const RecursiveCriticalSection* const cs_;
100   size_t* processing_;
101 };
102 
103 }  // namespace
104 
Instance()105 ThreadManager* ThreadManager::Instance() {
106   static ThreadManager* const thread_manager = new ThreadManager();
107   return thread_manager;
108 }
109 
~ThreadManager()110 ThreadManager::~ThreadManager() {
111   // By above RTC_DEFINE_STATIC_LOCAL.
112   RTC_DCHECK_NOTREACHED() << "ThreadManager should never be destructed.";
113 }
114 
115 // static
Add(Thread * message_queue)116 void ThreadManager::Add(Thread* message_queue) {
117   return Instance()->AddInternal(message_queue);
118 }
AddInternal(Thread * message_queue)119 void ThreadManager::AddInternal(Thread* message_queue) {
120   CritScope cs(&crit_);
121   // Prevent changes while the list of message queues is processed.
122   RTC_DCHECK_EQ(processing_, 0);
123   message_queues_.push_back(message_queue);
124 }
125 
126 // static
Remove(Thread * message_queue)127 void ThreadManager::Remove(Thread* message_queue) {
128   return Instance()->RemoveInternal(message_queue);
129 }
RemoveInternal(Thread * message_queue)130 void ThreadManager::RemoveInternal(Thread* message_queue) {
131   {
132     CritScope cs(&crit_);
133     // Prevent changes while the list of message queues is processed.
134     RTC_DCHECK_EQ(processing_, 0);
135     std::vector<Thread*>::iterator iter;
136     iter = absl::c_find(message_queues_, message_queue);
137     if (iter != message_queues_.end()) {
138       message_queues_.erase(iter);
139     }
140 #if RTC_DCHECK_IS_ON
141     RemoveFromSendGraph(message_queue);
142 #endif
143   }
144 }
145 
146 #if RTC_DCHECK_IS_ON
RemoveFromSendGraph(Thread * thread)147 void ThreadManager::RemoveFromSendGraph(Thread* thread) {
148   for (auto it = send_graph_.begin(); it != send_graph_.end();) {
149     if (it->first == thread) {
150       it = send_graph_.erase(it);
151     } else {
152       it->second.erase(thread);
153       ++it;
154     }
155   }
156 }
157 
RegisterSendAndCheckForCycles(Thread * source,Thread * target)158 void ThreadManager::RegisterSendAndCheckForCycles(Thread* source,
159                                                   Thread* target) {
160   RTC_DCHECK(source);
161   RTC_DCHECK(target);
162 
163   CritScope cs(&crit_);
164   std::deque<Thread*> all_targets({target});
165   // We check the pre-existing who-sends-to-who graph for any path from target
166   // to source. This loop is guaranteed to terminate because per the send graph
167   // invariant, there are no cycles in the graph.
168   for (size_t i = 0; i < all_targets.size(); i++) {
169     const auto& targets = send_graph_[all_targets[i]];
170     all_targets.insert(all_targets.end(), targets.begin(), targets.end());
171   }
172   RTC_CHECK_EQ(absl::c_count(all_targets, source), 0)
173       << " send loop between " << source->name() << " and " << target->name();
174 
175   // We may now insert source -> target without creating a cycle, since there
176   // was no path from target to source per the prior CHECK.
177   send_graph_[source].insert(target);
178 }
179 #endif
180 
181 // static
ProcessAllMessageQueuesForTesting()182 void ThreadManager::ProcessAllMessageQueuesForTesting() {
183   return Instance()->ProcessAllMessageQueuesInternal();
184 }
185 
ProcessAllMessageQueuesInternal()186 void ThreadManager::ProcessAllMessageQueuesInternal() {
187   // This works by posting a delayed message at the current time and waiting
188   // for it to be dispatched on all queues, which will ensure that all messages
189   // that came before it were also dispatched.
190   std::atomic<int> queues_not_done(0);
191 
192   {
193     MarkProcessingCritScope cs(&crit_, &processing_);
194     for (Thread* queue : message_queues_) {
195       if (!queue->IsProcessingMessagesForTesting()) {
196         // If the queue is not processing messages, it can
197         // be ignored. If we tried to post a message to it, it would be dropped
198         // or ignored.
199         continue;
200       }
201       queues_not_done.fetch_add(1);
202       // Whether the task is processed, or the thread is simply cleared,
203       // queues_not_done gets decremented.
204       absl::Cleanup sub = [&queues_not_done] { queues_not_done.fetch_sub(1); };
205       // Post delayed task instead of regular task to wait for all delayed tasks
206       // that are ready for processing.
207       queue->PostDelayedTask([sub = std::move(sub)] {}, TimeDelta::Zero());
208     }
209   }
210 
211   rtc::Thread* current = rtc::Thread::Current();
212   // Note: One of the message queues may have been on this thread, which is
213   // why we can't synchronously wait for queues_not_done to go to 0; we need
214   // to process messages as well.
215   while (queues_not_done.load() > 0) {
216     if (current) {
217       current->ProcessMessages(0);
218     }
219   }
220 }
221 
222 // static
Current()223 Thread* Thread::Current() {
224   ThreadManager* manager = ThreadManager::Instance();
225   Thread* thread = manager->CurrentThread();
226 
227   return thread;
228 }
229 
230 #if defined(WEBRTC_POSIX)
ThreadManager()231 ThreadManager::ThreadManager() {
232 #if defined(WEBRTC_MAC)
233   InitCocoaMultiThreading();
234 #endif
235   pthread_key_create(&key_, nullptr);
236 }
237 
CurrentThread()238 Thread* ThreadManager::CurrentThread() {
239   return static_cast<Thread*>(pthread_getspecific(key_));
240 }
241 
SetCurrentThreadInternal(Thread * thread)242 void ThreadManager::SetCurrentThreadInternal(Thread* thread) {
243   pthread_setspecific(key_, thread);
244 }
245 #endif
246 
247 #if defined(WEBRTC_WIN)
ThreadManager()248 ThreadManager::ThreadManager() : key_(TlsAlloc()) {}
249 
CurrentThread()250 Thread* ThreadManager::CurrentThread() {
251   return static_cast<Thread*>(TlsGetValue(key_));
252 }
253 
SetCurrentThreadInternal(Thread * thread)254 void ThreadManager::SetCurrentThreadInternal(Thread* thread) {
255   TlsSetValue(key_, thread);
256 }
257 #endif
258 
SetCurrentThread(Thread * thread)259 void ThreadManager::SetCurrentThread(Thread* thread) {
260 #if RTC_DLOG_IS_ON
261   if (CurrentThread() && thread) {
262     RTC_DLOG(LS_ERROR) << "SetCurrentThread: Overwriting an existing value?";
263   }
264 #endif  // RTC_DLOG_IS_ON
265 
266   if (thread) {
267     thread->EnsureIsCurrentTaskQueue();
268   } else {
269     Thread* current = CurrentThread();
270     if (current) {
271       // The current thread is being cleared, e.g. as a result of
272       // UnwrapCurrent() being called or when a thread is being stopped
273       // (see PreRun()). This signals that the Thread instance is being detached
274       // from the thread, which also means that TaskQueue::Current() must not
275       // return a pointer to the Thread instance.
276       current->ClearCurrentTaskQueue();
277     }
278   }
279 
280   SetCurrentThreadInternal(thread);
281 }
282 
ChangeCurrentThreadForTest(rtc::Thread * thread)283 void rtc::ThreadManager::ChangeCurrentThreadForTest(rtc::Thread* thread) {
284   SetCurrentThreadInternal(thread);
285 }
286 
WrapCurrentThread()287 Thread* ThreadManager::WrapCurrentThread() {
288   Thread* result = CurrentThread();
289   if (nullptr == result) {
290     result = new Thread(CreateDefaultSocketServer());
291     result->WrapCurrentWithThreadManager(this, true);
292   }
293   return result;
294 }
295 
UnwrapCurrentThread()296 void ThreadManager::UnwrapCurrentThread() {
297   Thread* t = CurrentThread();
298   if (t && !(t->IsOwned())) {
299     t->UnwrapCurrent();
300     delete t;
301   }
302 }
303 
ScopedDisallowBlockingCalls()304 Thread::ScopedDisallowBlockingCalls::ScopedDisallowBlockingCalls()
305     : thread_(Thread::Current()),
306       previous_state_(thread_->SetAllowBlockingCalls(false)) {}
307 
~ScopedDisallowBlockingCalls()308 Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() {
309   RTC_DCHECK(thread_->IsCurrent());
310   thread_->SetAllowBlockingCalls(previous_state_);
311 }
312 
313 #if RTC_DCHECK_IS_ON
ScopedCountBlockingCalls(std::function<void (uint32_t,uint32_t)> callback)314 Thread::ScopedCountBlockingCalls::ScopedCountBlockingCalls(
315     std::function<void(uint32_t, uint32_t)> callback)
316     : thread_(Thread::Current()),
317       base_blocking_call_count_(thread_->GetBlockingCallCount()),
318       base_could_be_blocking_call_count_(
319           thread_->GetCouldBeBlockingCallCount()),
320       result_callback_(std::move(callback)) {}
321 
~ScopedCountBlockingCalls()322 Thread::ScopedCountBlockingCalls::~ScopedCountBlockingCalls() {
323   if (GetTotalBlockedCallCount() >= min_blocking_calls_for_callback_) {
324     result_callback_(GetBlockingCallCount(), GetCouldBeBlockingCallCount());
325   }
326 }
327 
GetBlockingCallCount() const328 uint32_t Thread::ScopedCountBlockingCalls::GetBlockingCallCount() const {
329   return thread_->GetBlockingCallCount() - base_blocking_call_count_;
330 }
331 
GetCouldBeBlockingCallCount() const332 uint32_t Thread::ScopedCountBlockingCalls::GetCouldBeBlockingCallCount() const {
333   return thread_->GetCouldBeBlockingCallCount() -
334          base_could_be_blocking_call_count_;
335 }
336 
GetTotalBlockedCallCount() const337 uint32_t Thread::ScopedCountBlockingCalls::GetTotalBlockedCallCount() const {
338   return GetBlockingCallCount() + GetCouldBeBlockingCallCount();
339 }
340 #endif
341 
Thread(SocketServer * ss)342 Thread::Thread(SocketServer* ss) : Thread(ss, /*do_init=*/true) {}
343 
Thread(std::unique_ptr<SocketServer> ss)344 Thread::Thread(std::unique_ptr<SocketServer> ss)
345     : Thread(std::move(ss), /*do_init=*/true) {}
346 
Thread(SocketServer * ss,bool do_init)347 Thread::Thread(SocketServer* ss, bool do_init)
348     : delayed_next_num_(0),
349       fInitialized_(false),
350       fDestroyed_(false),
351       stop_(0),
352       ss_(ss) {
353   RTC_DCHECK(ss);
354   ss_->SetMessageQueue(this);
355   SetName("Thread", this);  // default name
356   if (do_init) {
357     DoInit();
358   }
359 }
360 
Thread(std::unique_ptr<SocketServer> ss,bool do_init)361 Thread::Thread(std::unique_ptr<SocketServer> ss, bool do_init)
362     : Thread(ss.get(), do_init) {
363   own_ss_ = std::move(ss);
364 }
365 
~Thread()366 Thread::~Thread() {
367   Stop();
368   DoDestroy();
369 }
370 
DoInit()371 void Thread::DoInit() {
372   if (fInitialized_) {
373     return;
374   }
375 
376   fInitialized_ = true;
377   ThreadManager::Add(this);
378 }
379 
DoDestroy()380 void Thread::DoDestroy() {
381   if (fDestroyed_) {
382     return;
383   }
384 
385   fDestroyed_ = true;
386   // The signal is done from here to ensure
387   // that it always gets called when the queue
388   // is going away.
389   if (ss_) {
390     ss_->SetMessageQueue(nullptr);
391   }
392   ThreadManager::Remove(this);
393   // Clear.
394   messages_ = {};
395   delayed_messages_ = {};
396 }
397 
socketserver()398 SocketServer* Thread::socketserver() {
399   return ss_;
400 }
401 
WakeUpSocketServer()402 void Thread::WakeUpSocketServer() {
403   ss_->WakeUp();
404 }
405 
Quit()406 void Thread::Quit() {
407   stop_.store(1, std::memory_order_release);
408   WakeUpSocketServer();
409 }
410 
IsQuitting()411 bool Thread::IsQuitting() {
412   return stop_.load(std::memory_order_acquire) != 0;
413 }
414 
Restart()415 void Thread::Restart() {
416   stop_.store(0, std::memory_order_release);
417 }
418 
Get(int cmsWait)419 absl::AnyInvocable<void() &&> Thread::Get(int cmsWait) {
420   // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
421 
422   int64_t cmsTotal = cmsWait;
423   int64_t cmsElapsed = 0;
424   int64_t msStart = TimeMillis();
425   int64_t msCurrent = msStart;
426   while (true) {
427     // Check for posted events
428     int64_t cmsDelayNext = kForever;
429     {
430       // All queue operations need to be locked, but nothing else in this loop
431       // can happen while holding the `mutex_`.
432       MutexLock lock(&mutex_);
433       // Check for delayed messages that have been triggered and calculate the
434       // next trigger time.
435       while (!delayed_messages_.empty()) {
436         if (msCurrent < delayed_messages_.top().run_time_ms) {
437           cmsDelayNext =
438               TimeDiff(delayed_messages_.top().run_time_ms, msCurrent);
439           break;
440         }
441         messages_.push(std::move(delayed_messages_.top().functor));
442         delayed_messages_.pop();
443       }
444       // Pull a message off the message queue, if available.
445       if (!messages_.empty()) {
446         absl::AnyInvocable<void()&&> task = std::move(messages_.front());
447         messages_.pop();
448         return task;
449       }
450     }
451 
452     if (IsQuitting())
453       break;
454 
455     // Which is shorter, the delay wait or the asked wait?
456 
457     int64_t cmsNext;
458     if (cmsWait == kForever) {
459       cmsNext = cmsDelayNext;
460     } else {
461       cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
462       if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
463         cmsNext = cmsDelayNext;
464     }
465 
466     {
467       // Wait and multiplex in the meantime
468       if (!ss_->Wait(cmsNext == kForever ? SocketServer::kForever
469                                          : webrtc::TimeDelta::Millis(cmsNext),
470                      /*process_io=*/true))
471         return nullptr;
472     }
473 
474     // If the specified timeout expired, return
475 
476     msCurrent = TimeMillis();
477     cmsElapsed = TimeDiff(msCurrent, msStart);
478     if (cmsWait != kForever) {
479       if (cmsElapsed >= cmsWait)
480         return nullptr;
481     }
482   }
483   return nullptr;
484 }
485 
PostTask(absl::AnyInvocable<void ()&&> task)486 void Thread::PostTask(absl::AnyInvocable<void() &&> task) {
487   if (IsQuitting()) {
488     return;
489   }
490 
491   // Keep thread safe
492   // Add the message to the end of the queue
493   // Signal for the multiplexer to return
494 
495   {
496     MutexLock lock(&mutex_);
497     messages_.push(std::move(task));
498   }
499   WakeUpSocketServer();
500 }
501 
PostDelayedHighPrecisionTask(absl::AnyInvocable<void ()&&> task,webrtc::TimeDelta delay)502 void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
503                                           webrtc::TimeDelta delay) {
504   if (IsQuitting()) {
505     return;
506   }
507 
508   // Keep thread safe
509   // Add to the priority queue. Gets sorted soonest first.
510   // Signal for the multiplexer to return.
511 
512   int64_t delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms<int>();
513   int64_t run_time_ms = TimeAfter(delay_ms);
514   {
515     MutexLock lock(&mutex_);
516     delayed_messages_.push({.delay_ms = delay_ms,
517                             .run_time_ms = run_time_ms,
518                             .message_number = delayed_next_num_,
519                             .functor = std::move(task)});
520     // If this message queue processes 1 message every millisecond for 50 days,
521     // we will wrap this number.  Even then, only messages with identical times
522     // will be misordered, and then only briefly.  This is probably ok.
523     ++delayed_next_num_;
524     RTC_DCHECK_NE(0, delayed_next_num_);
525   }
526   WakeUpSocketServer();
527 }
528 
GetDelay()529 int Thread::GetDelay() {
530   MutexLock lock(&mutex_);
531 
532   if (!messages_.empty())
533     return 0;
534 
535   if (!delayed_messages_.empty()) {
536     int delay = TimeUntil(delayed_messages_.top().run_time_ms);
537     if (delay < 0)
538       delay = 0;
539     return delay;
540   }
541 
542   return kForever;
543 }
544 
Dispatch(absl::AnyInvocable<void ()&&> task)545 void Thread::Dispatch(absl::AnyInvocable<void() &&> task) {
546   TRACE_EVENT0("webrtc", "Thread::Dispatch");
547   RTC_DCHECK_RUN_ON(this);
548   int64_t start_time = TimeMillis();
549   std::move(task)();
550   int64_t end_time = TimeMillis();
551   int64_t diff = TimeDiff(end_time, start_time);
552   if (diff >= dispatch_warning_ms_) {
553     RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
554                      << "ms to dispatch.";
555     // To avoid log spew, move the warning limit to only give warning
556     // for delays that are larger than the one observed.
557     dispatch_warning_ms_ = diff + 1;
558   }
559 }
560 
IsCurrent() const561 bool Thread::IsCurrent() const {
562   return ThreadManager::Instance()->CurrentThread() == this;
563 }
564 
CreateWithSocketServer()565 std::unique_ptr<Thread> Thread::CreateWithSocketServer() {
566   return std::unique_ptr<Thread>(new Thread(CreateDefaultSocketServer()));
567 }
568 
Create()569 std::unique_ptr<Thread> Thread::Create() {
570   return std::unique_ptr<Thread>(
571       new Thread(std::unique_ptr<SocketServer>(new NullSocketServer())));
572 }
573 
SleepMs(int milliseconds)574 bool Thread::SleepMs(int milliseconds) {
575   AssertBlockingIsAllowedOnCurrentThread();
576 
577 #if defined(WEBRTC_WIN)
578   ::Sleep(milliseconds);
579   return true;
580 #else
581   // POSIX has both a usleep() and a nanosleep(), but the former is deprecated,
582   // so we use nanosleep() even though it has greater precision than necessary.
583   struct timespec ts;
584   ts.tv_sec = milliseconds / 1000;
585   ts.tv_nsec = (milliseconds % 1000) * 1000000;
586   int ret = nanosleep(&ts, nullptr);
587   if (ret != 0) {
588     RTC_LOG_ERR(LS_WARNING) << "nanosleep() returning early";
589     return false;
590   }
591   return true;
592 #endif
593 }
594 
SetName(absl::string_view name,const void * obj)595 bool Thread::SetName(absl::string_view name, const void* obj) {
596   RTC_DCHECK(!IsRunning());
597 
598   name_ = std::string(name);
599   if (obj) {
600     // The %p specifier typically produce at most 16 hex digits, possibly with a
601     // 0x prefix. But format is implementation defined, so add some margin.
602     char buf[30];
603     snprintf(buf, sizeof(buf), " 0x%p", obj);
604     name_ += buf;
605   }
606   return true;
607 }
608 
SetDispatchWarningMs(int deadline)609 void Thread::SetDispatchWarningMs(int deadline) {
610   if (!IsCurrent()) {
611     PostTask([this, deadline]() { SetDispatchWarningMs(deadline); });
612     return;
613   }
614   RTC_DCHECK_RUN_ON(this);
615   dispatch_warning_ms_ = deadline;
616 }
617 
Start()618 bool Thread::Start() {
619   RTC_DCHECK(!IsRunning());
620 
621   if (IsRunning())
622     return false;
623 
624   Restart();  // reset IsQuitting() if the thread is being restarted
625 
626   // Make sure that ThreadManager is created on the main thread before
627   // we start a new thread.
628   ThreadManager::Instance();
629 
630   owned_ = true;
631 
632 #if defined(WEBRTC_WIN)
633   thread_ = CreateThread(nullptr, 0, PreRun, this, 0, &thread_id_);
634   if (!thread_) {
635     return false;
636   }
637 #elif defined(WEBRTC_POSIX)
638   pthread_attr_t attr;
639   pthread_attr_init(&attr);
640 
641   int error_code = pthread_create(&thread_, &attr, PreRun, this);
642   if (0 != error_code) {
643     RTC_LOG(LS_ERROR) << "Unable to create pthread, error " << error_code;
644     thread_ = 0;
645     return false;
646   }
647   RTC_DCHECK(thread_);
648 #endif
649   return true;
650 }
651 
WrapCurrent()652 bool Thread::WrapCurrent() {
653   return WrapCurrentWithThreadManager(ThreadManager::Instance(), true);
654 }
655 
UnwrapCurrent()656 void Thread::UnwrapCurrent() {
657   // Clears the platform-specific thread-specific storage.
658   ThreadManager::Instance()->SetCurrentThread(nullptr);
659 #if defined(WEBRTC_WIN)
660   if (thread_ != nullptr) {
661     if (!CloseHandle(thread_)) {
662       RTC_LOG_GLE(LS_ERROR)
663           << "When unwrapping thread, failed to close handle.";
664     }
665     thread_ = nullptr;
666     thread_id_ = 0;
667   }
668 #elif defined(WEBRTC_POSIX)
669   thread_ = 0;
670 #endif
671 }
672 
SafeWrapCurrent()673 void Thread::SafeWrapCurrent() {
674   WrapCurrentWithThreadManager(ThreadManager::Instance(), false);
675 }
676 
Join()677 void Thread::Join() {
678   if (!IsRunning())
679     return;
680 
681   RTC_DCHECK(!IsCurrent());
682   if (Current() && !Current()->blocking_calls_allowed_) {
683     RTC_LOG(LS_WARNING) << "Waiting for the thread to join, "
684                            "but blocking calls have been disallowed";
685   }
686 
687 #if defined(WEBRTC_WIN)
688   RTC_DCHECK(thread_ != nullptr);
689   WaitForSingleObject(thread_, INFINITE);
690   CloseHandle(thread_);
691   thread_ = nullptr;
692   thread_id_ = 0;
693 #elif defined(WEBRTC_POSIX)
694   pthread_join(thread_, nullptr);
695   thread_ = 0;
696 #endif
697 }
698 
SetAllowBlockingCalls(bool allow)699 bool Thread::SetAllowBlockingCalls(bool allow) {
700   RTC_DCHECK(IsCurrent());
701   bool previous = blocking_calls_allowed_;
702   blocking_calls_allowed_ = allow;
703   return previous;
704 }
705 
706 // static
AssertBlockingIsAllowedOnCurrentThread()707 void Thread::AssertBlockingIsAllowedOnCurrentThread() {
708 #if !defined(NDEBUG)
709   Thread* current = Thread::Current();
710   RTC_DCHECK(!current || current->blocking_calls_allowed_);
711 #endif
712 }
713 
714 // static
715 #if defined(WEBRTC_WIN)
PreRun(LPVOID pv)716 DWORD WINAPI Thread::PreRun(LPVOID pv) {
717 #else
718 void* Thread::PreRun(void* pv) {
719 #endif
720   Thread* thread = static_cast<Thread*>(pv);
721   ThreadManager::Instance()->SetCurrentThread(thread);
722   rtc::SetCurrentThreadName(thread->name_.c_str());
723 #if defined(WEBRTC_MAC)
724   ScopedAutoReleasePool pool;
725 #endif
726   thread->Run();
727 
728   ThreadManager::Instance()->SetCurrentThread(nullptr);
729 #ifdef WEBRTC_WIN
730   return 0;
731 #else
732   return nullptr;
733 #endif
734 }  // namespace rtc
735 
736 void Thread::Run() {
737   ProcessMessages(kForever);
738 }
739 
740 bool Thread::IsOwned() {
741   RTC_DCHECK(IsRunning());
742   return owned_;
743 }
744 
745 void Thread::Stop() {
746   Thread::Quit();
747   Join();
748 }
749 
750 void Thread::BlockingCall(rtc::FunctionView<void()> functor) {
751   TRACE_EVENT0("webrtc", "Thread::BlockingCall");
752 
753   RTC_DCHECK(!IsQuitting());
754   if (IsQuitting())
755     return;
756 
757   if (IsCurrent()) {
758 #if RTC_DCHECK_IS_ON
759     RTC_DCHECK(this->IsInvokeToThreadAllowed(this));
760     RTC_DCHECK_RUN_ON(this);
761     could_be_blocking_call_count_++;
762 #endif
763     functor();
764     return;
765   }
766 
767   AssertBlockingIsAllowedOnCurrentThread();
768 
769   Thread* current_thread = Thread::Current();
770 
771 #if RTC_DCHECK_IS_ON
772   if (current_thread) {
773     RTC_DCHECK_RUN_ON(current_thread);
774     current_thread->blocking_call_count_++;
775     RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this));
776     ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
777                                                              this);
778   }
779 #endif
780 
781   // Perhaps down the line we can get rid of this workaround and always require
782   // current_thread to be valid when BlockingCall() is called.
783   std::unique_ptr<rtc::Event> done_event;
784   if (!current_thread)
785     done_event.reset(new rtc::Event());
786 
787   bool ready = false;
788   absl::Cleanup cleanup = [this, &ready, current_thread,
789                            done = done_event.get()] {
790     if (current_thread) {
791       {
792         MutexLock lock(&mutex_);
793         ready = true;
794       }
795       current_thread->socketserver()->WakeUp();
796     } else {
797       done->Set();
798     }
799   };
800   PostTask([functor, cleanup = std::move(cleanup)] { functor(); });
801   if (current_thread) {
802     bool waited = false;
803     mutex_.Lock();
804     while (!ready) {
805       mutex_.Unlock();
806       current_thread->socketserver()->Wait(SocketServer::kForever, false);
807       waited = true;
808       mutex_.Lock();
809     }
810     mutex_.Unlock();
811 
812     // Our Wait loop above may have consumed some WakeUp events for this
813     // Thread, that weren't relevant to this Send.  Losing these WakeUps can
814     // cause problems for some SocketServers.
815     //
816     // Concrete example:
817     // Win32SocketServer on thread A calls Send on thread B.  While processing
818     // the message, thread B Posts a message to A.  We consume the wakeup for
819     // that Post while waiting for the Send to complete, which means that when
820     // we exit this loop, we need to issue another WakeUp, or else the Posted
821     // message won't be processed in a timely manner.
822 
823     if (waited) {
824       current_thread->socketserver()->WakeUp();
825     }
826   } else {
827     done_event->Wait(rtc::Event::kForever);
828   }
829 }
830 
831 // Called by the ThreadManager when being set as the current thread.
832 void Thread::EnsureIsCurrentTaskQueue() {
833   task_queue_registration_ =
834       std::make_unique<TaskQueueBase::CurrentTaskQueueSetter>(this);
835 }
836 
837 // Called by the ThreadManager when being set as the current thread.
838 void Thread::ClearCurrentTaskQueue() {
839   task_queue_registration_.reset();
840 }
841 
842 void Thread::AllowInvokesToThread(Thread* thread) {
843 #if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
844   if (!IsCurrent()) {
845     PostTask([thread, this]() { AllowInvokesToThread(thread); });
846     return;
847   }
848   RTC_DCHECK_RUN_ON(this);
849   allowed_threads_.push_back(thread);
850   invoke_policy_enabled_ = true;
851 #endif
852 }
853 
854 void Thread::DisallowAllInvokes() {
855 #if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
856   if (!IsCurrent()) {
857     PostTask([this]() { DisallowAllInvokes(); });
858     return;
859   }
860   RTC_DCHECK_RUN_ON(this);
861   allowed_threads_.clear();
862   invoke_policy_enabled_ = true;
863 #endif
864 }
865 
866 #if RTC_DCHECK_IS_ON
867 uint32_t Thread::GetBlockingCallCount() const {
868   RTC_DCHECK_RUN_ON(this);
869   return blocking_call_count_;
870 }
871 uint32_t Thread::GetCouldBeBlockingCallCount() const {
872   RTC_DCHECK_RUN_ON(this);
873   return could_be_blocking_call_count_;
874 }
875 #endif
876 
877 // Returns true if no policies added or if there is at least one policy
878 // that permits invocation to `target` thread.
879 bool Thread::IsInvokeToThreadAllowed(rtc::Thread* target) {
880 #if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
881   RTC_DCHECK_RUN_ON(this);
882   if (!invoke_policy_enabled_) {
883     return true;
884   }
885   for (const auto* thread : allowed_threads_) {
886     if (thread == target) {
887       return true;
888     }
889   }
890   return false;
891 #else
892   return true;
893 #endif
894 }
895 
896 void Thread::Delete() {
897   Stop();
898   delete this;
899 }
900 
901 void Thread::PostDelayedTask(absl::AnyInvocable<void() &&> task,
902                              webrtc::TimeDelta delay) {
903   // This implementation does not support low precision yet.
904   PostDelayedHighPrecisionTask(std::move(task), delay);
905 }
906 
907 bool Thread::IsProcessingMessagesForTesting() {
908   return (owned_ || IsCurrent()) && !IsQuitting();
909 }
910 
911 bool Thread::ProcessMessages(int cmsLoop) {
912   // Using ProcessMessages with a custom clock for testing and a time greater
913   // than 0 doesn't work, since it's not guaranteed to advance the custom
914   // clock's time, and may get stuck in an infinite loop.
915   RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 ||
916              cmsLoop == kForever);
917   int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
918   int cmsNext = cmsLoop;
919 
920   while (true) {
921 #if defined(WEBRTC_MAC)
922     ScopedAutoReleasePool pool;
923 #endif
924     absl::AnyInvocable<void()&&> task = Get(cmsNext);
925     if (!task)
926       return !IsQuitting();
927     Dispatch(std::move(task));
928 
929     if (cmsLoop != kForever) {
930       cmsNext = static_cast<int>(TimeUntil(msEnd));
931       if (cmsNext < 0)
932         return true;
933     }
934   }
935 }
936 
937 bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager,
938                                           bool need_synchronize_access) {
939   RTC_DCHECK(!IsRunning());
940 
941 #if defined(WEBRTC_WIN)
942   if (need_synchronize_access) {
943     // We explicitly ask for no rights other than synchronization.
944     // This gives us the best chance of succeeding.
945     thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId());
946     if (!thread_) {
947       RTC_LOG_GLE(LS_ERROR) << "Unable to get handle to thread.";
948       return false;
949     }
950     thread_id_ = GetCurrentThreadId();
951   }
952 #elif defined(WEBRTC_POSIX)
953   thread_ = pthread_self();
954 #endif
955   owned_ = false;
956   thread_manager->SetCurrentThread(this);
957   return true;
958 }
959 
960 bool Thread::IsRunning() {
961 #if defined(WEBRTC_WIN)
962   return thread_ != nullptr;
963 #elif defined(WEBRTC_POSIX)
964   return thread_ != 0;
965 #endif
966 }
967 
968 AutoThread::AutoThread()
969     : Thread(CreateDefaultSocketServer(), /*do_init=*/false) {
970   if (!ThreadManager::Instance()->CurrentThread()) {
971     // DoInit registers with ThreadManager. Do that only if we intend to
972     // be rtc::Thread::Current(), otherwise ProcessAllMessageQueuesInternal will
973     // post a message to a queue that no running thread is serving.
974     DoInit();
975     ThreadManager::Instance()->SetCurrentThread(this);
976   }
977 }
978 
979 AutoThread::~AutoThread() {
980   Stop();
981   DoDestroy();
982   if (ThreadManager::Instance()->CurrentThread() == this) {
983     ThreadManager::Instance()->SetCurrentThread(nullptr);
984   }
985 }
986 
987 AutoSocketServerThread::AutoSocketServerThread(SocketServer* ss)
988     : Thread(ss, /*do_init=*/false) {
989   DoInit();
990   old_thread_ = ThreadManager::Instance()->CurrentThread();
991   // Temporarily set the current thread to nullptr so that we can keep checks
992   // around that catch unintentional pointer overwrites.
993   rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
994   rtc::ThreadManager::Instance()->SetCurrentThread(this);
995   if (old_thread_) {
996     ThreadManager::Remove(old_thread_);
997   }
998 }
999 
1000 AutoSocketServerThread::~AutoSocketServerThread() {
1001   RTC_DCHECK(ThreadManager::Instance()->CurrentThread() == this);
1002   // Stop and destroy the thread before clearing it as the current thread.
1003   // Sometimes there are messages left in the Thread that will be
1004   // destroyed by DoDestroy, and sometimes the destructors of the message and/or
1005   // its contents rely on this thread still being set as the current thread.
1006   Stop();
1007   DoDestroy();
1008   rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
1009   rtc::ThreadManager::Instance()->SetCurrentThread(old_thread_);
1010   if (old_thread_) {
1011     ThreadManager::Add(old_thread_);
1012   }
1013 }
1014 
1015 }  // namespace rtc
1016