1 // Copyright 2012 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_ 6 #define BASE_OBSERVER_LIST_THREADSAFE_H_ 7 8 #include <unordered_map> 9 #include <utility> 10 11 #include "base/auto_reset.h" 12 #include "base/base_export.h" 13 #include "base/check.h" 14 #include "base/check_op.h" 15 #include "base/containers/contains.h" 16 #include "base/dcheck_is_on.h" 17 #include "base/debug/stack_trace.h" 18 #include "base/functional/bind.h" 19 #include "base/location.h" 20 #include "base/memory/raw_ptr.h" 21 #include "base/memory/ref_counted.h" 22 #include "base/observer_list.h" 23 #include "base/strings/strcat.h" 24 #include "base/synchronization/lock.h" 25 #include "base/task/sequenced_task_runner.h" 26 #include "base/task/single_thread_task_runner.h" 27 #include "build/build_config.h" 28 29 /////////////////////////////////////////////////////////////////////////////// 30 // 31 // OVERVIEW: 32 // 33 // A thread-safe container for a list of observers. This is similar to the 34 // observer_list (see observer_list.h), but it is more robust for multi- 35 // threaded situations. 36 // 37 // The following use cases are supported: 38 // * Observers can register for notifications from any sequence. They are 39 // always notified on the sequence from which they were registered. 40 // * Any sequence may trigger a notification via Notify(). 41 // * Observers can remove themselves from the observer list inside of a 42 // callback. 43 // * If one sequence is notifying observers concurrently with an observer 44 // removing itself from the observer list, the notifications will be 45 // silently dropped. However if the observer is currently inside a 46 // notification callback, the callback will finish running. 47 // 48 // By default, observers can be removed from any sequence. However this can be 49 // error-prone since an observer may be running a callback when it's removed, 50 // in which case it isn't safe to delete until the callback is finished. 51 // Consider using the RemoveObserverPolicy::kAddingSequenceOnly template 52 // parameter, which will CHECK that observers are only removed from the 53 // sequence where they were added (which is also the sequence that runs 54 // callbacks). 55 // 56 // The drawback of the threadsafe observer list is that notifications are not 57 // as real-time as the non-threadsafe version of this class. Notifications 58 // will always be done via PostTask() to another sequence, whereas with the 59 // non-thread-safe ObserverList, notifications happen synchronously. 60 // 61 // Note: this class previously supported synchronous notifications for 62 // same-sequence observers, but it was error-prone and removed in 63 // crbug.com/1193750, think twice before re-considering this paradigm. 64 // 65 /////////////////////////////////////////////////////////////////////////////// 66 67 namespace base { 68 namespace internal { 69 70 class BASE_EXPORT ObserverListThreadSafeBase 71 : public RefCountedThreadSafe<ObserverListThreadSafeBase> { 72 public: 73 struct NotificationDataBase { NotificationDataBaseNotificationDataBase74 NotificationDataBase(void* observer_list_in, const Location& from_here_in) 75 : observer_list(observer_list_in), from_here(from_here_in) {} 76 77 raw_ptr<void> observer_list; 78 Location from_here; 79 }; 80 81 ObserverListThreadSafeBase() = default; 82 ObserverListThreadSafeBase(const ObserverListThreadSafeBase&) = delete; 83 ObserverListThreadSafeBase& operator=(const ObserverListThreadSafeBase&) = 84 delete; 85 86 protected: 87 template <typename ObserverType, typename Method> 88 struct Dispatcher; 89 90 template <typename ObserverType, typename ReceiverType, typename... Params> 91 struct Dispatcher<ObserverType, void (ReceiverType::*)(Params...)> { 92 static void Run(void (ReceiverType::*m)(Params...), 93 Params... params, 94 ObserverType* obj) { 95 (obj->*m)(std::forward<Params>(params)...); 96 } 97 }; 98 99 static const NotificationDataBase*& GetCurrentNotification(); 100 101 virtual ~ObserverListThreadSafeBase() = default; 102 103 private: 104 friend class RefCountedThreadSafe<ObserverListThreadSafeBase>; 105 }; 106 107 } // namespace internal 108 109 enum class RemoveObserverPolicy { 110 // Observers can be removed from any sequence. 111 kAnySequence, 112 // Observers can only be removed from the sequence that added them. 113 kAddingSequenceOnly, 114 }; 115 116 template <class ObserverType, 117 RemoveObserverPolicy RemovePolicy = 118 RemoveObserverPolicy::kAnySequence> 119 class ObserverListThreadSafe : public internal::ObserverListThreadSafeBase { 120 using Self = ObserverListThreadSafe<ObserverType, RemovePolicy>; 121 122 public: 123 enum class AddObserverResult { 124 kBecameNonEmpty, 125 kWasAlreadyNonEmpty, 126 }; 127 enum class RemoveObserverResult { 128 kWasOrBecameEmpty, 129 kRemainsNonEmpty, 130 }; 131 132 ObserverListThreadSafe() = default; 133 explicit ObserverListThreadSafe(ObserverListPolicy policy) 134 : policy_(policy) {} 135 ObserverListThreadSafe(const ObserverListThreadSafe&) = delete; 136 ObserverListThreadSafe& operator=(const ObserverListThreadSafe&) = delete; 137 138 // Adds |observer| to the list. |observer| must not already be in the list. 139 AddObserverResult AddObserver(ObserverType* observer) { 140 DCHECK(SequencedTaskRunner::HasCurrentDefault()) 141 << "An observer can only be registered when " 142 "SequencedTaskRunner::HasCurrentDefault. If this is in a unit test, " 143 "you're likely merely missing a " 144 "base::test::(SingleThread)TaskEnvironment in your fixture. " 145 "Otherwise, try running this code on a named thread (main/UI/IO) or " 146 "from a task posted to a base::SequencedTaskRunner or " 147 "base::SingleThreadTaskRunner."; 148 149 AutoLock auto_lock(lock_); 150 151 bool was_empty = observers_.empty(); 152 153 // Add |observer| to the list of observers. 154 DCHECK(!Contains(observers_, observer)); 155 const scoped_refptr<SequencedTaskRunner> task_runner = 156 SequencedTaskRunner::GetCurrentDefault(); 157 // Each observer gets a unique identifier. These unique identifiers are used 158 // to avoid execution of pending posted-tasks over removed or released 159 // observers. 160 const size_t observer_id = ++observer_id_counter_; 161 #if DCHECK_IS_ON() 162 ObserverTaskRunnerInfo task_info = {task_runner, base::debug::StackTrace(), 163 observer_id}; 164 #else 165 ObserverTaskRunnerInfo task_info = {task_runner, observer_id}; 166 #endif 167 observers_[observer] = std::move(task_info); 168 169 // If this is called while a notification is being dispatched on this thread 170 // and |policy_| is ALL, |observer| must be notified (if a notification is 171 // being dispatched on another thread in parallel, the notification may or 172 // may not make it to |observer| depending on the outcome of the race to 173 // |lock_|). 174 if (policy_ == ObserverListPolicy::ALL) { 175 if (const NotificationDataBase* const current_notification = 176 GetCurrentNotification(); 177 current_notification && current_notification->observer_list == this) { 178 const NotificationData* notification_data = 179 static_cast<const NotificationData*>(current_notification); 180 task_runner->PostTask( 181 current_notification->from_here, 182 BindOnce(&Self::NotifyWrapper, this, 183 // While `observer` may be dangling, we pass it and 184 // check it wasn't deallocated in NotifyWrapper() which can 185 // check `observers_` to verify presence (the owner of the 186 // observer is responsible for removing it from that list 187 // before deallocation). 188 UnsafeDangling(observer), 189 NotificationData(this, observer_id, 190 current_notification->from_here, 191 notification_data->method))); 192 } 193 } 194 195 return was_empty ? AddObserverResult::kBecameNonEmpty 196 : AddObserverResult::kWasAlreadyNonEmpty; 197 } 198 199 // Remove an observer from the list if it is in the list. 200 // 201 // If a notification was sent to the observer but hasn't started to run yet, 202 // it will be aborted. If a notification has started to run, removing the 203 // observer won't stop it. 204 RemoveObserverResult RemoveObserver(ObserverType* observer) { 205 AutoLock auto_lock(lock_); 206 if constexpr (RemovePolicy == RemoveObserverPolicy::kAddingSequenceOnly) { 207 const auto it = observers_.find(observer); 208 CHECK(it == observers_.end() || 209 it->second.task_runner->RunsTasksInCurrentSequence()); 210 } 211 observers_.erase(observer); 212 return observers_.empty() ? RemoveObserverResult::kWasOrBecameEmpty 213 : RemoveObserverResult::kRemainsNonEmpty; 214 } 215 216 // Verifies that the list is currently empty (i.e. there are no observers). 217 void AssertEmpty() const { 218 #if DCHECK_IS_ON() 219 AutoLock auto_lock(lock_); 220 bool observers_is_empty = observers_.empty(); 221 DUMP_WILL_BE_CHECK(observers_is_empty) 222 << "\n" 223 << GetObserversCreationStackStringLocked(); 224 #endif 225 } 226 227 // Asynchronously invokes a callback on all observers, on their registration 228 // sequence. You cannot assume that at the completion of the Notify call that 229 // all Observers have been Notified. The notification may still be pending 230 // delivery. 231 template <typename Method, typename... Params> 232 void Notify(const Location& from_here, Method m, Params&&... params) { 233 RepeatingCallback<void(ObserverType*)> method = 234 BindRepeating(&Dispatcher<ObserverType, Method>::Run, m, 235 std::forward<Params>(params)...); 236 237 AutoLock lock(lock_); 238 for (const auto& observer : observers_) { 239 observer.second.task_runner->PostTask( 240 from_here, 241 BindOnce(&Self::NotifyWrapper, this, 242 // While `observer.first` may be dangling, we pass it and 243 // check it wasn't deallocated in NotifyWrapper() which can 244 // check `observers_` to verify presence (the owner of the 245 // observer is responsible for removing it from that list 246 // before deallocation). 247 UnsafeDangling(observer.first), 248 NotificationData(this, observer.second.observer_id, 249 from_here, method))); 250 } 251 } 252 253 private: 254 friend class RefCountedThreadSafe<ObserverListThreadSafeBase>; 255 256 struct NotificationData : public NotificationDataBase { 257 NotificationData(ObserverListThreadSafe* observer_list_in, 258 size_t observer_id_in, 259 const Location& from_here_in, 260 const RepeatingCallback<void(ObserverType*)>& method_in) 261 : NotificationDataBase(observer_list_in, from_here_in), 262 method(method_in), 263 observer_id(observer_id_in) {} 264 265 RepeatingCallback<void(ObserverType*)> method; 266 size_t observer_id; 267 }; 268 269 ~ObserverListThreadSafe() override = default; 270 271 void NotifyWrapper(MayBeDangling<ObserverType> observer, 272 const NotificationData& notification) { 273 { 274 AutoLock auto_lock(lock_); 275 276 // Check whether the observer still needs a notification. 277 DCHECK_EQ(notification.observer_list, this); 278 auto it = observers_.find(observer); 279 if (it == observers_.end() || 280 it->second.observer_id != notification.observer_id) { 281 return; 282 } 283 DCHECK(it->second.task_runner->RunsTasksInCurrentSequence()); 284 } 285 286 // Keep track of the notification being dispatched on the current thread. 287 // This will be used if the callback below calls AddObserver(). 288 // 289 // Note: GetCurrentNotification() may not return null if this runs in a 290 // nested loop started by a notification callback. In that case, it is 291 // important to save the previous value to restore it later. 292 const AutoReset<const NotificationDataBase*> resetter_( 293 &GetCurrentNotification(), ¬ification); 294 295 // Invoke the callback. 296 notification.method.Run(observer); 297 } 298 299 std::string GetObserversCreationStackStringLocked() const 300 EXCLUSIVE_LOCKS_REQUIRED(lock_) { 301 std::string result; 302 #if DCHECK_IS_ON() 303 for (const auto& observer : observers_) { 304 StrAppend(&result, 305 {observer.second.add_observer_stack_.ToString(), "\n"}); 306 } 307 #endif 308 return result; 309 } 310 311 const ObserverListPolicy policy_ = ObserverListPolicy::ALL; 312 313 mutable Lock lock_; 314 315 size_t observer_id_counter_ GUARDED_BY(lock_) = 0; 316 317 struct ObserverTaskRunnerInfo { 318 scoped_refptr<SequencedTaskRunner> task_runner; 319 #if DCHECK_IS_ON() 320 base::debug::StackTrace add_observer_stack_; 321 #endif 322 size_t observer_id = 0; 323 }; 324 325 // Keys are observers. Values are the SequencedTaskRunners on which they must 326 // be notified. 327 std::unordered_map<ObserverType*, ObserverTaskRunnerInfo> observers_ 328 GUARDED_BY(lock_); 329 }; 330 331 } // namespace base 332 333 #endif // BASE_OBSERVER_LIST_THREADSAFE_H_ 334