• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #ifndef RTC_BASE_THREAD_H_
12 #define RTC_BASE_THREAD_H_
13 
14 #include <stdint.h>
15 
16 #include <list>
17 #include <map>
18 #include <memory>
19 #include <queue>
20 #include <set>
21 #include <string>
22 #include <type_traits>
23 #include <utility>
24 #include <vector>
25 
26 #include "absl/strings/string_view.h"
27 
28 #if defined(WEBRTC_POSIX)
29 #include <pthread.h>
30 #endif
31 #include "absl/base/attributes.h"
32 #include "absl/functional/any_invocable.h"
33 #include "api/function_view.h"
34 #include "api/task_queue/task_queue_base.h"
35 #include "api/units/time_delta.h"
36 #include "rtc_base/checks.h"
37 #include "rtc_base/deprecated/recursive_critical_section.h"
38 #include "rtc_base/platform_thread_types.h"
39 #include "rtc_base/socket_server.h"
40 #include "rtc_base/synchronization/mutex.h"
41 #include "rtc_base/system/rtc_export.h"
42 #include "rtc_base/thread_annotations.h"
43 
44 #if defined(WEBRTC_WIN)
45 #include "rtc_base/win32.h"
46 #endif
47 
48 #if RTC_DCHECK_IS_ON
49 // Counts how many `Thread::BlockingCall` are made from within a scope and logs
50 // the number of blocking calls at the end of the scope.
51 #define RTC_LOG_THREAD_BLOCK_COUNT()                                        \
52   rtc::Thread::ScopedCountBlockingCalls blocked_call_count_printer(         \
53       [func = __func__](uint32_t actual_block, uint32_t could_block) {      \
54         auto total = actual_block + could_block;                            \
55         if (total) {                                                        \
56           RTC_LOG(LS_WARNING) << "Blocking " << func << ": total=" << total \
57                               << " (actual=" << actual_block                \
58                               << ", could=" << could_block << ")";          \
59         }                                                                   \
60       })
61 
62 // Adds an RTC_DCHECK_LE that checks that the number of blocking calls are
63 // less than or equal to a specific value. Use to avoid regressing in the
64 // number of blocking thread calls.
65 // Note: Use of this macro, requires RTC_LOG_THREAD_BLOCK_COUNT() to be called
66 // first.
67 #define RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(x)                               \
68   do {                                                                       \
69     blocked_call_count_printer.set_minimum_call_count_for_callback(x + 1);   \
70     RTC_DCHECK_LE(blocked_call_count_printer.GetTotalBlockedCallCount(), x); \
71   } while (0)
72 #else
73 #define RTC_LOG_THREAD_BLOCK_COUNT()
74 #define RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(x)
75 #endif
76 
77 namespace rtc {
78 
79 class Thread;
80 
81 class RTC_EXPORT ThreadManager {
82  public:
83   static const int kForever = -1;
84 
85   // Singleton, constructor and destructor are private.
86   static ThreadManager* Instance();
87 
88   static void Add(Thread* message_queue);
89   static void Remove(Thread* message_queue);
90 
91   // For testing purposes, for use with a simulated clock.
92   // Ensures that all message queues have processed delayed messages
93   // up until the current point in time.
94   static void ProcessAllMessageQueuesForTesting();
95 
96   Thread* CurrentThread();
97   void SetCurrentThread(Thread* thread);
98   // Allows changing the current thread, this is intended for tests where we
99   // want to simulate multiple threads running on a single physical thread.
100   void ChangeCurrentThreadForTest(Thread* thread);
101 
102   // Returns a thread object with its thread_ ivar set
103   // to whatever the OS uses to represent the thread.
104   // If there already *is* a Thread object corresponding to this thread,
105   // this method will return that.  Otherwise it creates a new Thread
106   // object whose wrapped() method will return true, and whose
107   // handle will, on Win32, be opened with only synchronization privileges -
108   // if you need more privilegs, rather than changing this method, please
109   // write additional code to adjust the privileges, or call a different
110   // factory method of your own devising, because this one gets used in
111   // unexpected contexts (like inside browser plugins) and it would be a
112   // shame to break it.  It is also conceivable on Win32 that we won't even
113   // be able to get synchronization privileges, in which case the result
114   // will have a null handle.
115   Thread* WrapCurrentThread();
116   void UnwrapCurrentThread();
117 
118 #if RTC_DCHECK_IS_ON
119   // Registers that a Send operation is to be performed between `source` and
120   // `target`, while checking that this does not cause a send cycle that could
121   // potentially cause a deadlock.
122   void RegisterSendAndCheckForCycles(Thread* source, Thread* target);
123 #endif
124 
125  private:
126   ThreadManager();
127   ~ThreadManager();
128 
129   ThreadManager(const ThreadManager&) = delete;
130   ThreadManager& operator=(const ThreadManager&) = delete;
131 
132   void SetCurrentThreadInternal(Thread* thread);
133   void AddInternal(Thread* message_queue);
134   void RemoveInternal(Thread* message_queue);
135   void ProcessAllMessageQueuesInternal();
136 #if RTC_DCHECK_IS_ON
137   void RemoveFromSendGraph(Thread* thread) RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
138 #endif
139 
140   // This list contains all live Threads.
141   std::vector<Thread*> message_queues_ RTC_GUARDED_BY(crit_);
142 
143   // Methods that don't modify the list of message queues may be called in a
144   // re-entrant fashion. "processing_" keeps track of the depth of re-entrant
145   // calls.
146   RecursiveCriticalSection crit_;
147   size_t processing_ RTC_GUARDED_BY(crit_) = 0;
148 #if RTC_DCHECK_IS_ON
149   // Represents all thread seand actions by storing all send targets per thread.
150   // This is used by RegisterSendAndCheckForCycles. This graph has no cycles
151   // since we will trigger a CHECK failure if a cycle is introduced.
152   std::map<Thread*, std::set<Thread*>> send_graph_ RTC_GUARDED_BY(crit_);
153 #endif
154 
155 #if defined(WEBRTC_POSIX)
156   pthread_key_t key_;
157 #endif
158 
159 #if defined(WEBRTC_WIN)
160   const DWORD key_;
161 #endif
162 };
163 
164 // WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS!  See ~Thread().
165 
166 class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
167  public:
168   static const int kForever = -1;
169 
170   // Create a new Thread and optionally assign it to the passed
171   // SocketServer. Subclasses that override Clear should pass false for
172   // init_queue and call DoInit() from their constructor to prevent races
173   // with the ThreadManager using the object while the vtable is still
174   // being created.
175   explicit Thread(SocketServer* ss);
176   explicit Thread(std::unique_ptr<SocketServer> ss);
177 
178   // Constructors meant for subclasses; they should call DoInit themselves and
179   // pass false for `do_init`, so that DoInit is called only on the fully
180   // instantiated class, which avoids a vptr data race.
181   Thread(SocketServer* ss, bool do_init);
182   Thread(std::unique_ptr<SocketServer> ss, bool do_init);
183 
184   // NOTE: ALL SUBCLASSES OF Thread MUST CALL Stop() IN THEIR DESTRUCTORS (or
185   // guarantee Stop() is explicitly called before the subclass is destroyed).
186   // This is required to avoid a data race between the destructor modifying the
187   // vtable, and the Thread::PreRun calling the virtual method Run().
188 
189   // NOTE: SUBCLASSES OF Thread THAT OVERRIDE Clear MUST CALL
190   // DoDestroy() IN THEIR DESTRUCTORS! This is required to avoid a data race
191   // between the destructor modifying the vtable, and the ThreadManager
192   // calling Clear on the object from a different thread.
193   ~Thread() override;
194 
195   Thread(const Thread&) = delete;
196   Thread& operator=(const Thread&) = delete;
197 
198   static std::unique_ptr<Thread> CreateWithSocketServer();
199   static std::unique_ptr<Thread> Create();
200   static Thread* Current();
201 
202   // Used to catch performance regressions. Use this to disallow BlockingCall
203   // for a given scope.  If a synchronous call is made while this is in
204   // effect, an assert will be triggered.
205   // Note that this is a single threaded class.
206   class ScopedDisallowBlockingCalls {
207    public:
208     ScopedDisallowBlockingCalls();
209     ScopedDisallowBlockingCalls(const ScopedDisallowBlockingCalls&) = delete;
210     ScopedDisallowBlockingCalls& operator=(const ScopedDisallowBlockingCalls&) =
211         delete;
212     ~ScopedDisallowBlockingCalls();
213 
214    private:
215     Thread* const thread_;
216     const bool previous_state_;
217   };
218 
219 #if RTC_DCHECK_IS_ON
220   class ScopedCountBlockingCalls {
221    public:
222     ScopedCountBlockingCalls(std::function<void(uint32_t, uint32_t)> callback);
223     ScopedCountBlockingCalls(const ScopedDisallowBlockingCalls&) = delete;
224     ScopedCountBlockingCalls& operator=(const ScopedDisallowBlockingCalls&) =
225         delete;
226     ~ScopedCountBlockingCalls();
227 
228     uint32_t GetBlockingCallCount() const;
229     uint32_t GetCouldBeBlockingCallCount() const;
230     uint32_t GetTotalBlockedCallCount() const;
231 
set_minimum_call_count_for_callback(uint32_t minimum)232     void set_minimum_call_count_for_callback(uint32_t minimum) {
233       min_blocking_calls_for_callback_ = minimum;
234     }
235 
236    private:
237     Thread* const thread_;
238     const uint32_t base_blocking_call_count_;
239     const uint32_t base_could_be_blocking_call_count_;
240     // The minimum number of blocking calls required in order to issue the
241     // result_callback_. This is used by RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN to
242     // tame log spam.
243     // By default we always issue the callback, regardless of callback count.
244     uint32_t min_blocking_calls_for_callback_ = 0;
245     std::function<void(uint32_t, uint32_t)> result_callback_;
246   };
247 
248   uint32_t GetBlockingCallCount() const;
249   uint32_t GetCouldBeBlockingCallCount() const;
250 #endif
251 
252   SocketServer* socketserver();
253 
254   // Note: The behavior of Thread has changed.  When a thread is stopped,
255   // futher Posts and Sends will fail.  However, any pending Sends and *ready*
256   // Posts (as opposed to unexpired delayed Posts) will be delivered before
257   // Get (or Peek) returns false.  By guaranteeing delivery of those messages,
258   // we eliminate the race condition when an MessageHandler and Thread
259   // may be destroyed independently of each other.
260   virtual void Quit();
261   virtual bool IsQuitting();
262   virtual void Restart();
263   // Not all message queues actually process messages (such as SignalThread).
264   // In those cases, it's important to know, before posting, that it won't be
265   // Processed.  Normally, this would be true until IsQuitting() is true.
266   virtual bool IsProcessingMessagesForTesting();
267 
268   // Amount of time until the next message can be retrieved
269   virtual int GetDelay();
270 
empty()271   bool empty() const { return size() == 0u; }
size()272   size_t size() const {
273     webrtc::MutexLock lock(&mutex_);
274     return messages_.size() + delayed_messages_.size();
275   }
276 
277   bool IsCurrent() const;
278 
279   // Sleeps the calling thread for the specified number of milliseconds, during
280   // which time no processing is performed. Returns false if sleeping was
281   // interrupted by a signal (POSIX only).
282   static bool SleepMs(int millis);
283 
284   // Sets the thread's name, for debugging. Must be called before Start().
285   // If `obj` is non-null, its value is appended to `name`.
name()286   const std::string& name() const { return name_; }
287   bool SetName(absl::string_view name, const void* obj);
288 
289   // Sets the expected processing time in ms. The thread will write
290   // log messages when Dispatch() takes more time than this.
291   // Default is 50 ms.
292   void SetDispatchWarningMs(int deadline);
293 
294   // Starts the execution of the thread.
295   bool Start();
296 
297   // Tells the thread to stop and waits until it is joined.
298   // Never call Stop on the current thread.  Instead use the inherited Quit
299   // function which will exit the base Thread without terminating the
300   // underlying OS thread.
301   virtual void Stop();
302 
303   // By default, Thread::Run() calls ProcessMessages(kForever).  To do other
304   // work, override Run().  To receive and dispatch messages, call
305   // ProcessMessages occasionally.
306   virtual void Run();
307 
308   // Convenience method to invoke a functor on another thread.
309   // Blocks the current thread until execution is complete.
310   // Ex: thread.BlockingCall([&] { result = MyFunctionReturningBool(); });
311   // NOTE: This function can only be called when synchronous calls are allowed.
312   // See ScopedDisallowBlockingCalls for details.
313   // NOTE: Blocking calls are DISCOURAGED, consider if what you're doing can
314   // be achieved with PostTask() and callbacks instead.
315   virtual void BlockingCall(FunctionView<void()> functor);
316 
317   template <typename Functor,
318             typename ReturnT = std::invoke_result_t<Functor>,
319             typename = typename std::enable_if_t<!std::is_void_v<ReturnT>>>
BlockingCall(Functor && functor)320   ReturnT BlockingCall(Functor&& functor) {
321     ReturnT result;
322     BlockingCall([&] { result = std::forward<Functor>(functor)(); });
323     return result;
324   }
325 
326   // Allows BlockingCall to specified `thread`. Thread never will be
327   // dereferenced and will be used only for reference-based comparison, so
328   // instance can be safely deleted. If NDEBUG is defined and RTC_DCHECK_IS_ON
329   // is undefined do nothing.
330   void AllowInvokesToThread(Thread* thread);
331 
332   // If NDEBUG is defined and RTC_DCHECK_IS_ON is undefined do nothing.
333   void DisallowAllInvokes();
334   // Returns true if `target` was allowed by AllowInvokesToThread() or if no
335   // calls were made to AllowInvokesToThread and DisallowAllInvokes. Otherwise
336   // returns false.
337   // If NDEBUG is defined and RTC_DCHECK_IS_ON is undefined always returns
338   // true.
339   bool IsInvokeToThreadAllowed(rtc::Thread* target);
340 
341   // From TaskQueueBase
342   void Delete() override;
343   void PostTask(absl::AnyInvocable<void() &&> task) override;
344   void PostDelayedTask(absl::AnyInvocable<void() &&> task,
345                        webrtc::TimeDelta delay) override;
346   void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
347                                     webrtc::TimeDelta delay) override;
348 
349   // ProcessMessages will process I/O and dispatch messages until:
350   //  1) cms milliseconds have elapsed (returns true)
351   //  2) Stop() is called (returns false)
352   bool ProcessMessages(int cms);
353 
354   // Returns true if this is a thread that we created using the standard
355   // constructor, false if it was created by a call to
356   // ThreadManager::WrapCurrentThread().  The main thread of an application
357   // is generally not owned, since the OS representation of the thread
358   // obviously exists before we can get to it.
359   // You cannot call Start on non-owned threads.
360   bool IsOwned();
361 
362   // Expose private method IsRunning() for tests.
363   //
364   // DANGER: this is a terrible public API.  Most callers that might want to
365   // call this likely do not have enough control/knowledge of the Thread in
366   // question to guarantee that the returned value remains true for the duration
367   // of whatever code is conditionally executing because of the return value!
RunningForTest()368   bool RunningForTest() { return IsRunning(); }
369 
370   // These functions are public to avoid injecting test hooks. Don't call them
371   // outside of tests.
372   // This method should be called when thread is created using non standard
373   // method, like derived implementation of rtc::Thread and it can not be
374   // started by calling Start(). This will set started flag to true and
375   // owned to false. This must be called from the current thread.
376   bool WrapCurrent();
377   void UnwrapCurrent();
378 
379   // Sets the per-thread allow-blocking-calls flag to false; this is
380   // irrevocable. Must be called on this thread.
DisallowBlockingCalls()381   void DisallowBlockingCalls() { SetAllowBlockingCalls(false); }
382 
383  protected:
384   class CurrentThreadSetter : CurrentTaskQueueSetter {
385    public:
CurrentThreadSetter(Thread * thread)386     explicit CurrentThreadSetter(Thread* thread)
387         : CurrentTaskQueueSetter(thread),
388           manager_(rtc::ThreadManager::Instance()),
389           previous_(manager_->CurrentThread()) {
390       manager_->ChangeCurrentThreadForTest(thread);
391     }
~CurrentThreadSetter()392     ~CurrentThreadSetter() { manager_->ChangeCurrentThreadForTest(previous_); }
393 
394    private:
395     rtc::ThreadManager* const manager_;
396     rtc::Thread* const previous_;
397   };
398 
399   // DelayedMessage goes into a priority queue, sorted by trigger time. Messages
400   // with the same trigger time are processed in num_ (FIFO) order.
401   struct DelayedMessage {
402     bool operator<(const DelayedMessage& dmsg) const {
403       return (dmsg.run_time_ms < run_time_ms) ||
404              ((dmsg.run_time_ms == run_time_ms) &&
405               (dmsg.message_number < message_number));
406     }
407 
408     int64_t delay_ms;  // for debugging
409     int64_t run_time_ms;
410     // Monotonicaly incrementing number used for ordering of messages
411     // targeted to execute at the same time.
412     uint32_t message_number;
413     // std::priority_queue doesn't allow to extract elements, but functor
414     // is move-only and thus need to be changed when pulled out of the
415     // priority queue. That is ok because `functor` doesn't affect operator<
416     mutable absl::AnyInvocable<void() &&> functor;
417   };
418 
419   // Perform initialization, subclasses must call this from their constructor
420   // if false was passed as init_queue to the Thread constructor.
421   void DoInit();
422 
423   // Perform cleanup; subclasses must call this from the destructor,
424   // and are not expected to actually hold the lock.
425   void DoDestroy() RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
426 
427   void WakeUpSocketServer();
428 
429   // Same as WrapCurrent except that it never fails as it does not try to
430   // acquire the synchronization access of the thread. The caller should never
431   // call Stop() or Join() on this thread.
432   void SafeWrapCurrent();
433 
434   // Blocks the calling thread until this thread has terminated.
435   void Join();
436 
437   static void AssertBlockingIsAllowedOnCurrentThread();
438 
439   friend class ScopedDisallowBlockingCalls;
440 
441  private:
442   static const int kSlowDispatchLoggingThreshold = 50;  // 50 ms
443 
444   // Get() will process I/O until:
445   //  1) A task is available (returns it)
446   //  2) cmsWait seconds have elapsed (returns empty task)
447   //  3) Stop() is called (returns empty task)
448   absl::AnyInvocable<void() &&> Get(int cmsWait);
449   void Dispatch(absl::AnyInvocable<void() &&> task);
450 
451   // Sets the per-thread allow-blocking-calls flag and returns the previous
452   // value. Must be called on this thread.
453   bool SetAllowBlockingCalls(bool allow);
454 
455 #if defined(WEBRTC_WIN)
456   static DWORD WINAPI PreRun(LPVOID context);
457 #else
458   static void* PreRun(void* pv);
459 #endif
460 
461   // ThreadManager calls this instead WrapCurrent() because
462   // ThreadManager::Instance() cannot be used while ThreadManager is
463   // being created.
464   // The method tries to get synchronization rights of the thread on Windows if
465   // `need_synchronize_access` is true.
466   bool WrapCurrentWithThreadManager(ThreadManager* thread_manager,
467                                     bool need_synchronize_access);
468 
469   // Return true if the thread is currently running.
470   bool IsRunning();
471 
472   // Called by the ThreadManager when being set as the current thread.
473   void EnsureIsCurrentTaskQueue();
474 
475   // Called by the ThreadManager when being unset as the current thread.
476   void ClearCurrentTaskQueue();
477 
478   std::queue<absl::AnyInvocable<void() &&>> messages_ RTC_GUARDED_BY(mutex_);
479   std::priority_queue<DelayedMessage> delayed_messages_ RTC_GUARDED_BY(mutex_);
480   uint32_t delayed_next_num_ RTC_GUARDED_BY(mutex_);
481 #if RTC_DCHECK_IS_ON
482   uint32_t blocking_call_count_ RTC_GUARDED_BY(this) = 0;
483   uint32_t could_be_blocking_call_count_ RTC_GUARDED_BY(this) = 0;
484   std::vector<Thread*> allowed_threads_ RTC_GUARDED_BY(this);
485   bool invoke_policy_enabled_ RTC_GUARDED_BY(this) = false;
486 #endif
487   mutable webrtc::Mutex mutex_;
488   bool fInitialized_;
489   bool fDestroyed_;
490 
491   std::atomic<int> stop_;
492 
493   // The SocketServer might not be owned by Thread.
494   SocketServer* const ss_;
495   // Used if SocketServer ownership lies with `this`.
496   std::unique_ptr<SocketServer> own_ss_;
497 
498   std::string name_;
499 
500   // TODO(tommi): Add thread checks for proper use of control methods.
501   // Ideally we should be able to just use PlatformThread.
502 
503 #if defined(WEBRTC_POSIX)
504   pthread_t thread_ = 0;
505 #endif
506 
507 #if defined(WEBRTC_WIN)
508   HANDLE thread_ = nullptr;
509   DWORD thread_id_ = 0;
510 #endif
511 
512   // Indicates whether or not ownership of the worker thread lies with
513   // this instance or not. (i.e. owned_ == !wrapped).
514   // Must only be modified when the worker thread is not running.
515   bool owned_ = true;
516 
517   // Only touched from the worker thread itself.
518   bool blocking_calls_allowed_ = true;
519 
520   std::unique_ptr<TaskQueueBase::CurrentTaskQueueSetter>
521       task_queue_registration_;
522 
523   friend class ThreadManager;
524 
525   int dispatch_warning_ms_ RTC_GUARDED_BY(this) = kSlowDispatchLoggingThreshold;
526 };
527 
528 // AutoThread automatically installs itself at construction
529 // uninstalls at destruction, if a Thread object is
530 // _not already_ associated with the current OS thread.
531 //
532 // NOTE: *** This class should only be used by tests ***
533 //
534 class AutoThread : public Thread {
535  public:
536   AutoThread();
537   ~AutoThread() override;
538 
539   AutoThread(const AutoThread&) = delete;
540   AutoThread& operator=(const AutoThread&) = delete;
541 };
542 
543 // AutoSocketServerThread automatically installs itself at
544 // construction and uninstalls at destruction. If a Thread object is
545 // already associated with the current OS thread, it is temporarily
546 // disassociated and restored by the destructor.
547 
548 class AutoSocketServerThread : public Thread {
549  public:
550   explicit AutoSocketServerThread(SocketServer* ss);
551   ~AutoSocketServerThread() override;
552 
553   AutoSocketServerThread(const AutoSocketServerThread&) = delete;
554   AutoSocketServerThread& operator=(const AutoSocketServerThread&) = delete;
555 
556  private:
557   rtc::Thread* old_thread_;
558 };
559 }  // namespace rtc
560 
561 #endif  // RTC_BASE_THREAD_H_
562