/*
 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */

#ifndef RTC_BASE_THREAD_H_
#define RTC_BASE_THREAD_H_

#include <stdint.h>

#include <atomic>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <queue>
#include <set>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#if defined(WEBRTC_POSIX)
#include <pthread.h>
#endif
#include "absl/base/attributes.h"
#include "absl/functional/any_invocable.h"
#include "absl/strings/string_view.h"
#include "api/function_view.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "rtc_base/checks.h"
#include "rtc_base/deprecated/recursive_critical_section.h"
#include "rtc_base/platform_thread_types.h"
#include "rtc_base/socket_server.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/system/rtc_export.h"
#include "rtc_base/thread_annotations.h"

#if defined(WEBRTC_WIN)
#include "rtc_base/win32.h"
#endif

#if RTC_DCHECK_IS_ON
// Counts how many `Thread::BlockingCall` are made from within a scope and logs
// the number of blocking calls at the end of the scope.
51 #define RTC_LOG_THREAD_BLOCK_COUNT() \ 52 rtc::Thread::ScopedCountBlockingCalls blocked_call_count_printer( \ 53 [func = __func__](uint32_t actual_block, uint32_t could_block) { \ 54 auto total = actual_block + could_block; \ 55 if (total) { \ 56 RTC_LOG(LS_WARNING) << "Blocking " << func << ": total=" << total \ 57 << " (actual=" << actual_block \ 58 << ", could=" << could_block << ")"; \ 59 } \ 60 }) 61 62 // Adds an RTC_DCHECK_LE that checks that the number of blocking calls are 63 // less than or equal to a specific value. Use to avoid regressing in the 64 // number of blocking thread calls. 65 // Note: Use of this macro, requires RTC_LOG_THREAD_BLOCK_COUNT() to be called 66 // first. 67 #define RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(x) \ 68 do { \ 69 blocked_call_count_printer.set_minimum_call_count_for_callback(x + 1); \ 70 RTC_DCHECK_LE(blocked_call_count_printer.GetTotalBlockedCallCount(), x); \ 71 } while (0) 72 #else 73 #define RTC_LOG_THREAD_BLOCK_COUNT() 74 #define RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(x) 75 #endif 76 77 namespace rtc { 78 79 class Thread; 80 81 class RTC_EXPORT ThreadManager { 82 public: 83 static const int kForever = -1; 84 85 // Singleton, constructor and destructor are private. 86 static ThreadManager* Instance(); 87 88 static void Add(Thread* message_queue); 89 static void Remove(Thread* message_queue); 90 91 // For testing purposes, for use with a simulated clock. 92 // Ensures that all message queues have processed delayed messages 93 // up until the current point in time. 94 static void ProcessAllMessageQueuesForTesting(); 95 96 Thread* CurrentThread(); 97 void SetCurrentThread(Thread* thread); 98 // Allows changing the current thread, this is intended for tests where we 99 // want to simulate multiple threads running on a single physical thread. 100 void ChangeCurrentThreadForTest(Thread* thread); 101 102 // Returns a thread object with its thread_ ivar set 103 // to whatever the OS uses to represent the thread. 
104 // If there already *is* a Thread object corresponding to this thread, 105 // this method will return that. Otherwise it creates a new Thread 106 // object whose wrapped() method will return true, and whose 107 // handle will, on Win32, be opened with only synchronization privileges - 108 // if you need more privilegs, rather than changing this method, please 109 // write additional code to adjust the privileges, or call a different 110 // factory method of your own devising, because this one gets used in 111 // unexpected contexts (like inside browser plugins) and it would be a 112 // shame to break it. It is also conceivable on Win32 that we won't even 113 // be able to get synchronization privileges, in which case the result 114 // will have a null handle. 115 Thread* WrapCurrentThread(); 116 void UnwrapCurrentThread(); 117 118 #if RTC_DCHECK_IS_ON 119 // Registers that a Send operation is to be performed between `source` and 120 // `target`, while checking that this does not cause a send cycle that could 121 // potentially cause a deadlock. 122 void RegisterSendAndCheckForCycles(Thread* source, Thread* target); 123 #endif 124 125 private: 126 ThreadManager(); 127 ~ThreadManager(); 128 129 ThreadManager(const ThreadManager&) = delete; 130 ThreadManager& operator=(const ThreadManager&) = delete; 131 132 void SetCurrentThreadInternal(Thread* thread); 133 void AddInternal(Thread* message_queue); 134 void RemoveInternal(Thread* message_queue); 135 void ProcessAllMessageQueuesInternal(); 136 #if RTC_DCHECK_IS_ON 137 void RemoveFromSendGraph(Thread* thread) RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); 138 #endif 139 140 // This list contains all live Threads. 141 std::vector<Thread*> message_queues_ RTC_GUARDED_BY(crit_); 142 143 // Methods that don't modify the list of message queues may be called in a 144 // re-entrant fashion. "processing_" keeps track of the depth of re-entrant 145 // calls. 
146 RecursiveCriticalSection crit_; 147 size_t processing_ RTC_GUARDED_BY(crit_) = 0; 148 #if RTC_DCHECK_IS_ON 149 // Represents all thread seand actions by storing all send targets per thread. 150 // This is used by RegisterSendAndCheckForCycles. This graph has no cycles 151 // since we will trigger a CHECK failure if a cycle is introduced. 152 std::map<Thread*, std::set<Thread*>> send_graph_ RTC_GUARDED_BY(crit_); 153 #endif 154 155 #if defined(WEBRTC_POSIX) 156 pthread_key_t key_; 157 #endif 158 159 #if defined(WEBRTC_WIN) 160 const DWORD key_; 161 #endif 162 }; 163 164 // WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS! See ~Thread(). 165 166 class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { 167 public: 168 static const int kForever = -1; 169 170 // Create a new Thread and optionally assign it to the passed 171 // SocketServer. Subclasses that override Clear should pass false for 172 // init_queue and call DoInit() from their constructor to prevent races 173 // with the ThreadManager using the object while the vtable is still 174 // being created. 175 explicit Thread(SocketServer* ss); 176 explicit Thread(std::unique_ptr<SocketServer> ss); 177 178 // Constructors meant for subclasses; they should call DoInit themselves and 179 // pass false for `do_init`, so that DoInit is called only on the fully 180 // instantiated class, which avoids a vptr data race. 181 Thread(SocketServer* ss, bool do_init); 182 Thread(std::unique_ptr<SocketServer> ss, bool do_init); 183 184 // NOTE: ALL SUBCLASSES OF Thread MUST CALL Stop() IN THEIR DESTRUCTORS (or 185 // guarantee Stop() is explicitly called before the subclass is destroyed). 186 // This is required to avoid a data race between the destructor modifying the 187 // vtable, and the Thread::PreRun calling the virtual method Run(). 188 189 // NOTE: SUBCLASSES OF Thread THAT OVERRIDE Clear MUST CALL 190 // DoDestroy() IN THEIR DESTRUCTORS! 
This is required to avoid a data race 191 // between the destructor modifying the vtable, and the ThreadManager 192 // calling Clear on the object from a different thread. 193 ~Thread() override; 194 195 Thread(const Thread&) = delete; 196 Thread& operator=(const Thread&) = delete; 197 198 static std::unique_ptr<Thread> CreateWithSocketServer(); 199 static std::unique_ptr<Thread> Create(); 200 static Thread* Current(); 201 202 // Used to catch performance regressions. Use this to disallow BlockingCall 203 // for a given scope. If a synchronous call is made while this is in 204 // effect, an assert will be triggered. 205 // Note that this is a single threaded class. 206 class ScopedDisallowBlockingCalls { 207 public: 208 ScopedDisallowBlockingCalls(); 209 ScopedDisallowBlockingCalls(const ScopedDisallowBlockingCalls&) = delete; 210 ScopedDisallowBlockingCalls& operator=(const ScopedDisallowBlockingCalls&) = 211 delete; 212 ~ScopedDisallowBlockingCalls(); 213 214 private: 215 Thread* const thread_; 216 const bool previous_state_; 217 }; 218 219 #if RTC_DCHECK_IS_ON 220 class ScopedCountBlockingCalls { 221 public: 222 ScopedCountBlockingCalls(std::function<void(uint32_t, uint32_t)> callback); 223 ScopedCountBlockingCalls(const ScopedDisallowBlockingCalls&) = delete; 224 ScopedCountBlockingCalls& operator=(const ScopedDisallowBlockingCalls&) = 225 delete; 226 ~ScopedCountBlockingCalls(); 227 228 uint32_t GetBlockingCallCount() const; 229 uint32_t GetCouldBeBlockingCallCount() const; 230 uint32_t GetTotalBlockedCallCount() const; 231 set_minimum_call_count_for_callback(uint32_t minimum)232 void set_minimum_call_count_for_callback(uint32_t minimum) { 233 min_blocking_calls_for_callback_ = minimum; 234 } 235 236 private: 237 Thread* const thread_; 238 const uint32_t base_blocking_call_count_; 239 const uint32_t base_could_be_blocking_call_count_; 240 // The minimum number of blocking calls required in order to issue the 241 // result_callback_. 
This is used by RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN to 242 // tame log spam. 243 // By default we always issue the callback, regardless of callback count. 244 uint32_t min_blocking_calls_for_callback_ = 0; 245 std::function<void(uint32_t, uint32_t)> result_callback_; 246 }; 247 248 uint32_t GetBlockingCallCount() const; 249 uint32_t GetCouldBeBlockingCallCount() const; 250 #endif 251 252 SocketServer* socketserver(); 253 254 // Note: The behavior of Thread has changed. When a thread is stopped, 255 // futher Posts and Sends will fail. However, any pending Sends and *ready* 256 // Posts (as opposed to unexpired delayed Posts) will be delivered before 257 // Get (or Peek) returns false. By guaranteeing delivery of those messages, 258 // we eliminate the race condition when an MessageHandler and Thread 259 // may be destroyed independently of each other. 260 virtual void Quit(); 261 virtual bool IsQuitting(); 262 virtual void Restart(); 263 // Not all message queues actually process messages (such as SignalThread). 264 // In those cases, it's important to know, before posting, that it won't be 265 // Processed. Normally, this would be true until IsQuitting() is true. 266 virtual bool IsProcessingMessagesForTesting(); 267 268 // Amount of time until the next message can be retrieved 269 virtual int GetDelay(); 270 empty()271 bool empty() const { return size() == 0u; } size()272 size_t size() const { 273 webrtc::MutexLock lock(&mutex_); 274 return messages_.size() + delayed_messages_.size(); 275 } 276 277 bool IsCurrent() const; 278 279 // Sleeps the calling thread for the specified number of milliseconds, during 280 // which time no processing is performed. Returns false if sleeping was 281 // interrupted by a signal (POSIX only). 282 static bool SleepMs(int millis); 283 284 // Sets the thread's name, for debugging. Must be called before Start(). 285 // If `obj` is non-null, its value is appended to `name`. 
name()286 const std::string& name() const { return name_; } 287 bool SetName(absl::string_view name, const void* obj); 288 289 // Sets the expected processing time in ms. The thread will write 290 // log messages when Dispatch() takes more time than this. 291 // Default is 50 ms. 292 void SetDispatchWarningMs(int deadline); 293 294 // Starts the execution of the thread. 295 bool Start(); 296 297 // Tells the thread to stop and waits until it is joined. 298 // Never call Stop on the current thread. Instead use the inherited Quit 299 // function which will exit the base Thread without terminating the 300 // underlying OS thread. 301 virtual void Stop(); 302 303 // By default, Thread::Run() calls ProcessMessages(kForever). To do other 304 // work, override Run(). To receive and dispatch messages, call 305 // ProcessMessages occasionally. 306 virtual void Run(); 307 308 // Convenience method to invoke a functor on another thread. 309 // Blocks the current thread until execution is complete. 310 // Ex: thread.BlockingCall([&] { result = MyFunctionReturningBool(); }); 311 // NOTE: This function can only be called when synchronous calls are allowed. 312 // See ScopedDisallowBlockingCalls for details. 313 // NOTE: Blocking calls are DISCOURAGED, consider if what you're doing can 314 // be achieved with PostTask() and callbacks instead. 315 virtual void BlockingCall(FunctionView<void()> functor); 316 317 template <typename Functor, 318 typename ReturnT = std::invoke_result_t<Functor>, 319 typename = typename std::enable_if_t<!std::is_void_v<ReturnT>>> BlockingCall(Functor && functor)320 ReturnT BlockingCall(Functor&& functor) { 321 ReturnT result; 322 BlockingCall([&] { result = std::forward<Functor>(functor)(); }); 323 return result; 324 } 325 326 // Allows BlockingCall to specified `thread`. Thread never will be 327 // dereferenced and will be used only for reference-based comparison, so 328 // instance can be safely deleted. 
If NDEBUG is defined and RTC_DCHECK_IS_ON 329 // is undefined do nothing. 330 void AllowInvokesToThread(Thread* thread); 331 332 // If NDEBUG is defined and RTC_DCHECK_IS_ON is undefined do nothing. 333 void DisallowAllInvokes(); 334 // Returns true if `target` was allowed by AllowInvokesToThread() or if no 335 // calls were made to AllowInvokesToThread and DisallowAllInvokes. Otherwise 336 // returns false. 337 // If NDEBUG is defined and RTC_DCHECK_IS_ON is undefined always returns 338 // true. 339 bool IsInvokeToThreadAllowed(rtc::Thread* target); 340 341 // From TaskQueueBase 342 void Delete() override; 343 void PostTask(absl::AnyInvocable<void() &&> task) override; 344 void PostDelayedTask(absl::AnyInvocable<void() &&> task, 345 webrtc::TimeDelta delay) override; 346 void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task, 347 webrtc::TimeDelta delay) override; 348 349 // ProcessMessages will process I/O and dispatch messages until: 350 // 1) cms milliseconds have elapsed (returns true) 351 // 2) Stop() is called (returns false) 352 bool ProcessMessages(int cms); 353 354 // Returns true if this is a thread that we created using the standard 355 // constructor, false if it was created by a call to 356 // ThreadManager::WrapCurrentThread(). The main thread of an application 357 // is generally not owned, since the OS representation of the thread 358 // obviously exists before we can get to it. 359 // You cannot call Start on non-owned threads. 360 bool IsOwned(); 361 362 // Expose private method IsRunning() for tests. 363 // 364 // DANGER: this is a terrible public API. Most callers that might want to 365 // call this likely do not have enough control/knowledge of the Thread in 366 // question to guarantee that the returned value remains true for the duration 367 // of whatever code is conditionally executing because of the return value! 
RunningForTest()368 bool RunningForTest() { return IsRunning(); } 369 370 // These functions are public to avoid injecting test hooks. Don't call them 371 // outside of tests. 372 // This method should be called when thread is created using non standard 373 // method, like derived implementation of rtc::Thread and it can not be 374 // started by calling Start(). This will set started flag to true and 375 // owned to false. This must be called from the current thread. 376 bool WrapCurrent(); 377 void UnwrapCurrent(); 378 379 // Sets the per-thread allow-blocking-calls flag to false; this is 380 // irrevocable. Must be called on this thread. DisallowBlockingCalls()381 void DisallowBlockingCalls() { SetAllowBlockingCalls(false); } 382 383 protected: 384 class CurrentThreadSetter : CurrentTaskQueueSetter { 385 public: CurrentThreadSetter(Thread * thread)386 explicit CurrentThreadSetter(Thread* thread) 387 : CurrentTaskQueueSetter(thread), 388 manager_(rtc::ThreadManager::Instance()), 389 previous_(manager_->CurrentThread()) { 390 manager_->ChangeCurrentThreadForTest(thread); 391 } ~CurrentThreadSetter()392 ~CurrentThreadSetter() { manager_->ChangeCurrentThreadForTest(previous_); } 393 394 private: 395 rtc::ThreadManager* const manager_; 396 rtc::Thread* const previous_; 397 }; 398 399 // DelayedMessage goes into a priority queue, sorted by trigger time. Messages 400 // with the same trigger time are processed in num_ (FIFO) order. 401 struct DelayedMessage { 402 bool operator<(const DelayedMessage& dmsg) const { 403 return (dmsg.run_time_ms < run_time_ms) || 404 ((dmsg.run_time_ms == run_time_ms) && 405 (dmsg.message_number < message_number)); 406 } 407 408 int64_t delay_ms; // for debugging 409 int64_t run_time_ms; 410 // Monotonicaly incrementing number used for ordering of messages 411 // targeted to execute at the same time. 
412 uint32_t message_number; 413 // std::priority_queue doesn't allow to extract elements, but functor 414 // is move-only and thus need to be changed when pulled out of the 415 // priority queue. That is ok because `functor` doesn't affect operator< 416 mutable absl::AnyInvocable<void() &&> functor; 417 }; 418 419 // Perform initialization, subclasses must call this from their constructor 420 // if false was passed as init_queue to the Thread constructor. 421 void DoInit(); 422 423 // Perform cleanup; subclasses must call this from the destructor, 424 // and are not expected to actually hold the lock. 425 void DoDestroy() RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_); 426 427 void WakeUpSocketServer(); 428 429 // Same as WrapCurrent except that it never fails as it does not try to 430 // acquire the synchronization access of the thread. The caller should never 431 // call Stop() or Join() on this thread. 432 void SafeWrapCurrent(); 433 434 // Blocks the calling thread until this thread has terminated. 435 void Join(); 436 437 static void AssertBlockingIsAllowedOnCurrentThread(); 438 439 friend class ScopedDisallowBlockingCalls; 440 441 private: 442 static const int kSlowDispatchLoggingThreshold = 50; // 50 ms 443 444 // Get() will process I/O until: 445 // 1) A task is available (returns it) 446 // 2) cmsWait seconds have elapsed (returns empty task) 447 // 3) Stop() is called (returns empty task) 448 absl::AnyInvocable<void() &&> Get(int cmsWait); 449 void Dispatch(absl::AnyInvocable<void() &&> task); 450 451 // Sets the per-thread allow-blocking-calls flag and returns the previous 452 // value. Must be called on this thread. 453 bool SetAllowBlockingCalls(bool allow); 454 455 #if defined(WEBRTC_WIN) 456 static DWORD WINAPI PreRun(LPVOID context); 457 #else 458 static void* PreRun(void* pv); 459 #endif 460 461 // ThreadManager calls this instead WrapCurrent() because 462 // ThreadManager::Instance() cannot be used while ThreadManager is 463 // being created. 
464 // The method tries to get synchronization rights of the thread on Windows if 465 // `need_synchronize_access` is true. 466 bool WrapCurrentWithThreadManager(ThreadManager* thread_manager, 467 bool need_synchronize_access); 468 469 // Return true if the thread is currently running. 470 bool IsRunning(); 471 472 // Called by the ThreadManager when being set as the current thread. 473 void EnsureIsCurrentTaskQueue(); 474 475 // Called by the ThreadManager when being unset as the current thread. 476 void ClearCurrentTaskQueue(); 477 478 std::queue<absl::AnyInvocable<void() &&>> messages_ RTC_GUARDED_BY(mutex_); 479 std::priority_queue<DelayedMessage> delayed_messages_ RTC_GUARDED_BY(mutex_); 480 uint32_t delayed_next_num_ RTC_GUARDED_BY(mutex_); 481 #if RTC_DCHECK_IS_ON 482 uint32_t blocking_call_count_ RTC_GUARDED_BY(this) = 0; 483 uint32_t could_be_blocking_call_count_ RTC_GUARDED_BY(this) = 0; 484 std::vector<Thread*> allowed_threads_ RTC_GUARDED_BY(this); 485 bool invoke_policy_enabled_ RTC_GUARDED_BY(this) = false; 486 #endif 487 mutable webrtc::Mutex mutex_; 488 bool fInitialized_; 489 bool fDestroyed_; 490 491 std::atomic<int> stop_; 492 493 // The SocketServer might not be owned by Thread. 494 SocketServer* const ss_; 495 // Used if SocketServer ownership lies with `this`. 496 std::unique_ptr<SocketServer> own_ss_; 497 498 std::string name_; 499 500 // TODO(tommi): Add thread checks for proper use of control methods. 501 // Ideally we should be able to just use PlatformThread. 502 503 #if defined(WEBRTC_POSIX) 504 pthread_t thread_ = 0; 505 #endif 506 507 #if defined(WEBRTC_WIN) 508 HANDLE thread_ = nullptr; 509 DWORD thread_id_ = 0; 510 #endif 511 512 // Indicates whether or not ownership of the worker thread lies with 513 // this instance or not. (i.e. owned_ == !wrapped). 514 // Must only be modified when the worker thread is not running. 515 bool owned_ = true; 516 517 // Only touched from the worker thread itself. 
518 bool blocking_calls_allowed_ = true; 519 520 std::unique_ptr<TaskQueueBase::CurrentTaskQueueSetter> 521 task_queue_registration_; 522 523 friend class ThreadManager; 524 525 int dispatch_warning_ms_ RTC_GUARDED_BY(this) = kSlowDispatchLoggingThreshold; 526 }; 527 528 // AutoThread automatically installs itself at construction 529 // uninstalls at destruction, if a Thread object is 530 // _not already_ associated with the current OS thread. 531 // 532 // NOTE: *** This class should only be used by tests *** 533 // 534 class AutoThread : public Thread { 535 public: 536 AutoThread(); 537 ~AutoThread() override; 538 539 AutoThread(const AutoThread&) = delete; 540 AutoThread& operator=(const AutoThread&) = delete; 541 }; 542 543 // AutoSocketServerThread automatically installs itself at 544 // construction and uninstalls at destruction. If a Thread object is 545 // already associated with the current OS thread, it is temporarily 546 // disassociated and restored by the destructor. 547 548 class AutoSocketServerThread : public Thread { 549 public: 550 explicit AutoSocketServerThread(SocketServer* ss); 551 ~AutoSocketServerThread() override; 552 553 AutoSocketServerThread(const AutoSocketServerThread&) = delete; 554 AutoSocketServerThread& operator=(const AutoSocketServerThread&) = delete; 555 556 private: 557 rtc::Thread* old_thread_; 558 }; 559 } // namespace rtc 560 561 #endif // RTC_BASE_THREAD_H_ 562