1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_ 6 #define BASE_OBSERVER_LIST_THREADSAFE_H_ 7 8 #include <unordered_map> 9 10 #include "base/base_export.h" 11 #include "base/bind.h" 12 #include "base/lazy_instance.h" 13 #include "base/location.h" 14 #include "base/logging.h" 15 #include "base/macros.h" 16 #include "base/memory/ref_counted.h" 17 #include "base/observer_list.h" 18 #include "base/sequenced_task_runner.h" 19 #include "base/stl_util.h" 20 #include "base/synchronization/lock.h" 21 #include "base/threading/sequenced_task_runner_handle.h" 22 #include "base/threading/thread_local.h" 23 #include "build/build_config.h" 24 25 // TODO(fdoray): Removing these includes causes IWYU failures in other headers, 26 // remove them in a follow- up CL. 27 #include "base/memory/ptr_util.h" 28 #include "base/single_thread_task_runner.h" 29 #include "base/threading/thread_task_runner_handle.h" 30 31 /////////////////////////////////////////////////////////////////////////////// 32 // 33 // OVERVIEW: 34 // 35 // A thread-safe container for a list of observers. This is similar to the 36 // observer_list (see observer_list.h), but it is more robust for multi- 37 // threaded situations. 38 // 39 // The following use cases are supported: 40 // * Observers can register for notifications from any sequence. They are 41 // always notified on the sequence from which they were registered. 42 // * Any sequence may trigger a notification via Notify(). 43 // * Observers can remove themselves from the observer list inside of a 44 // callback. 45 // * If one sequence is notifying observers concurrently with an observer 46 // removing itself from the observer list, the notifications will be 47 // silently dropped. 48 // 49 // The drawback of the threadsafe observer list is that notifications are not 50 // as real-time as the non-threadsafe version of this class. Notifications 51 // will always be done via PostTask() to another sequence, whereas with the 52 // non-thread-safe observer_list, notifications happen synchronously. 53 // 54 /////////////////////////////////////////////////////////////////////////////// 55 56 namespace base { 57 namespace internal { 58 59 class BASE_EXPORT ObserverListThreadSafeBase 60 : public RefCountedThreadSafe<ObserverListThreadSafeBase> { 61 public: 62 ObserverListThreadSafeBase() = default; 63 64 protected: 65 template <typename ObserverType, typename Method> 66 struct Dispatcher; 67 68 template <typename ObserverType, typename ReceiverType, typename... Params> 69 struct Dispatcher<ObserverType, void (ReceiverType::*)(Params...)> { 70 static void Run(void (ReceiverType::*m)(Params...), 71 Params... params, 72 ObserverType* obj) { 73 (obj->*m)(std::forward<Params>(params)...); 74 } 75 }; 76 77 struct NotificationDataBase { 78 NotificationDataBase(void* observer_list_in, const Location& from_here_in) 79 : observer_list(observer_list_in), from_here(from_here_in) {} 80 81 void* observer_list; 82 Location from_here; 83 }; 84 85 virtual ~ObserverListThreadSafeBase() = default; 86 87 static LazyInstance<ThreadLocalPointer<const NotificationDataBase>>::Leaky 88 tls_current_notification_; 89 90 private: 91 friend class RefCountedThreadSafe<ObserverListThreadSafeBase>; 92 93 DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafeBase); 94 }; 95 96 } // namespace internal 97 98 template <class ObserverType> 99 class ObserverListThreadSafe : public internal::ObserverListThreadSafeBase { 100 public: 101 ObserverListThreadSafe() = default; 102 explicit ObserverListThreadSafe(ObserverListPolicy policy) 103 : policy_(policy) {} 104 105 // Adds |observer| to the list. |observer| must not already be in the list. 106 void AddObserver(ObserverType* observer) { 107 // TODO(fdoray): Change this to a DCHECK once all call sites have a 108 // SequencedTaskRunnerHandle. 109 if (!SequencedTaskRunnerHandle::IsSet()) 110 return; 111 112 AutoLock auto_lock(lock_); 113 114 // Add |observer| to the list of observers. 115 DCHECK(!ContainsKey(observers_, observer)); 116 const scoped_refptr<SequencedTaskRunner> task_runner = 117 SequencedTaskRunnerHandle::Get(); 118 observers_[observer] = task_runner; 119 120 // If this is called while a notification is being dispatched on this thread 121 // and |policy_| is ALL, |observer| must be notified (if a notification is 122 // being dispatched on another thread in parallel, the notification may or 123 // may not make it to |observer| depending on the outcome of the race to 124 // |lock_|). 125 if (policy_ == ObserverListPolicy::ALL) { 126 const NotificationDataBase* current_notification = 127 tls_current_notification_.Get().Get(); 128 if (current_notification && current_notification->observer_list == this) { 129 task_runner->PostTask( 130 current_notification->from_here, 131 BindOnce( 132 &ObserverListThreadSafe<ObserverType>::NotifyWrapper, this, 133 observer, 134 *static_cast<const NotificationData*>(current_notification))); 135 } 136 } 137 } 138 139 // Remove an observer from the list if it is in the list. 140 // 141 // If a notification was sent to the observer but hasn't started to run yet, 142 // it will be aborted. If a notification has started to run, removing the 143 // observer won't stop it. 144 void RemoveObserver(ObserverType* observer) { 145 AutoLock auto_lock(lock_); 146 observers_.erase(observer); 147 } 148 149 // Verifies that the list is currently empty (i.e. there are no observers). 150 void AssertEmpty() const { 151 #if DCHECK_IS_ON() 152 AutoLock auto_lock(lock_); 153 DCHECK(observers_.empty()); 154 #endif 155 } 156 157 // Asynchronously invokes a callback on all observers, on their registration 158 // sequence. You cannot assume that at the completion of the Notify call that 159 // all Observers have been Notified. The notification may still be pending 160 // delivery. 161 template <typename Method, typename... Params> 162 void Notify(const Location& from_here, Method m, Params&&... params) { 163 Callback<void(ObserverType*)> method = 164 Bind(&Dispatcher<ObserverType, Method>::Run, m, 165 std::forward<Params>(params)...); 166 167 AutoLock lock(lock_); 168 for (const auto& observer : observers_) { 169 observer.second->PostTask( 170 from_here, 171 BindOnce(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this, 172 observer.first, NotificationData(this, from_here, method))); 173 } 174 } 175 176 private: 177 friend class RefCountedThreadSafe<ObserverListThreadSafeBase>; 178 179 struct NotificationData : public NotificationDataBase { 180 NotificationData(ObserverListThreadSafe* observer_list_in, 181 const Location& from_here_in, 182 const Callback<void(ObserverType*)>& method_in) 183 : NotificationDataBase(observer_list_in, from_here_in), 184 method(method_in) {} 185 186 Callback<void(ObserverType*)> method; 187 }; 188 189 ~ObserverListThreadSafe() override = default; 190 191 void NotifyWrapper(ObserverType* observer, 192 const NotificationData& notification) { 193 { 194 AutoLock auto_lock(lock_); 195 196 // Check whether the observer still needs a notification. 197 auto it = observers_.find(observer); 198 if (it == observers_.end()) 199 return; 200 DCHECK(it->second->RunsTasksInCurrentSequence()); 201 } 202 203 // Keep track of the notification being dispatched on the current thread. 204 // This will be used if the callback below calls AddObserver(). 205 // 206 // Note: |tls_current_notification_| may not be nullptr if this runs in a 207 // nested loop started by a notification callback. In that case, it is 208 // important to save the previous value to restore it later. 209 auto& tls_current_notification = tls_current_notification_.Get(); 210 const NotificationDataBase* const previous_notification = 211 tls_current_notification.Get(); 212 tls_current_notification.Set(¬ification); 213 214 // Invoke the callback. 215 notification.method.Run(observer); 216 217 // Reset the notification being dispatched on the current thread to its 218 // previous value. 219 tls_current_notification.Set(previous_notification); 220 } 221 222 const ObserverListPolicy policy_ = ObserverListPolicy::ALL; 223 224 // Synchronizes access to |observers_|. 225 mutable Lock lock_; 226 227 // Keys are observers. Values are the SequencedTaskRunners on which they must 228 // be notified. 229 std::unordered_map<ObserverType*, scoped_refptr<SequencedTaskRunner>> 230 observers_; 231 232 DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe); 233 }; 234 235 } // namespace base 236 237 #endif // BASE_OBSERVER_LIST_THREADSAFE_H_ 238