• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_
6 #define BASE_OBSERVER_LIST_THREADSAFE_H_
7 
8 #include <unordered_map>
9 
10 #include "base/base_export.h"
11 #include "base/bind.h"
12 #include "base/lazy_instance.h"
13 #include "base/location.h"
14 #include "base/logging.h"
15 #include "base/macros.h"
16 #include "base/memory/ref_counted.h"
17 #include "base/observer_list.h"
18 #include "base/sequenced_task_runner.h"
19 #include "base/stl_util.h"
20 #include "base/synchronization/lock.h"
21 #include "base/threading/sequenced_task_runner_handle.h"
22 #include "base/threading/thread_local.h"
23 #include "build/build_config.h"
24 
25 // TODO(fdoray): Removing these includes causes IWYU failures in other headers,
26 // remove them in a follow-up CL.
27 #include "base/memory/ptr_util.h"
28 #include "base/single_thread_task_runner.h"
29 #include "base/threading/thread_task_runner_handle.h"
30 
31 ///////////////////////////////////////////////////////////////////////////////
32 //
33 // OVERVIEW:
34 //
35 //   A thread-safe container for a list of observers. This is similar to the
36 //   observer_list (see observer_list.h), but it is more robust for multi-
37 //   threaded situations.
38 //
39 //   The following use cases are supported:
40 //    * Observers can register for notifications from any sequence. They are
41 //      always notified on the sequence from which they were registered.
42 //    * Any sequence may trigger a notification via Notify().
43 //    * Observers can remove themselves from the observer list inside of a
44 //      callback.
45 //    * If one sequence is notifying observers concurrently with an observer
46 //      removing itself from the observer list, the notifications will be
47 //      silently dropped.
48 //
49 //   The drawback of the threadsafe observer list is that notifications are not
50 //   as real-time as the non-threadsafe version of this class. Notifications
51 //   will always be done via PostTask() to another sequence, whereas with the
52 //   non-thread-safe observer_list, notifications happen synchronously.
53 //
54 ///////////////////////////////////////////////////////////////////////////////
55 
56 namespace base {
57 namespace internal {
58 
59 class BASE_EXPORT ObserverListThreadSafeBase
60     : public RefCountedThreadSafe<ObserverListThreadSafeBase> {
61  public:
62   ObserverListThreadSafeBase() = default;
63 
64  protected:
65   template <typename ObserverType, typename Method>
66   struct Dispatcher;
67 
68   template <typename ObserverType, typename ReceiverType, typename... Params>
69   struct Dispatcher<ObserverType, void (ReceiverType::*)(Params...)> {
70     static void Run(void (ReceiverType::*m)(Params...),
71                     Params... params,
72                     ObserverType* obj) {
73       (obj->*m)(std::forward<Params>(params)...);
74     }
75   };
76 
77   struct NotificationDataBase {
78     NotificationDataBase(void* observer_list_in, const Location& from_here_in)
79         : observer_list(observer_list_in), from_here(from_here_in) {}
80 
81     void* observer_list;
82     Location from_here;
83   };
84 
85   virtual ~ObserverListThreadSafeBase() = default;
86 
87   static LazyInstance<ThreadLocalPointer<const NotificationDataBase>>::Leaky
88       tls_current_notification_;
89 
90  private:
91   friend class RefCountedThreadSafe<ObserverListThreadSafeBase>;
92 
93   DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafeBase);
94 };
95 
96 }  // namespace internal
97 
98 template <class ObserverType>
99 class ObserverListThreadSafe : public internal::ObserverListThreadSafeBase {
100  public:
101   ObserverListThreadSafe() = default;
102   explicit ObserverListThreadSafe(ObserverListPolicy policy)
103       : policy_(policy) {}
104 
105   // Adds |observer| to the list. |observer| must not already be in the list.
106   void AddObserver(ObserverType* observer) {
107     // TODO(fdoray): Change this to a DCHECK once all call sites have a
108     // SequencedTaskRunnerHandle.
109     if (!SequencedTaskRunnerHandle::IsSet())
110       return;
111 
112     AutoLock auto_lock(lock_);
113 
114     // Add |observer| to the list of observers.
115     DCHECK(!ContainsKey(observers_, observer));
116     const scoped_refptr<SequencedTaskRunner> task_runner =
117         SequencedTaskRunnerHandle::Get();
118     observers_[observer] = task_runner;
119 
120     // If this is called while a notification is being dispatched on this thread
121     // and |policy_| is ALL, |observer| must be notified (if a notification is
122     // being dispatched on another thread in parallel, the notification may or
123     // may not make it to |observer| depending on the outcome of the race to
124     // |lock_|).
125     if (policy_ == ObserverListPolicy::ALL) {
126       const NotificationDataBase* current_notification =
127           tls_current_notification_.Get().Get();
128       if (current_notification && current_notification->observer_list == this) {
129         task_runner->PostTask(
130             current_notification->from_here,
131             BindOnce(
132                 &ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
133                 observer,
134                 *static_cast<const NotificationData*>(current_notification)));
135       }
136     }
137   }
138 
139   // Remove an observer from the list if it is in the list.
140   //
141   // If a notification was sent to the observer but hasn't started to run yet,
142   // it will be aborted. If a notification has started to run, removing the
143   // observer won't stop it.
144   void RemoveObserver(ObserverType* observer) {
145     AutoLock auto_lock(lock_);
146     observers_.erase(observer);
147   }
148 
149   // Verifies that the list is currently empty (i.e. there are no observers).
150   void AssertEmpty() const {
151 #if DCHECK_IS_ON()
152     AutoLock auto_lock(lock_);
153     DCHECK(observers_.empty());
154 #endif
155   }
156 
157   // Asynchronously invokes a callback on all observers, on their registration
158   // sequence. You cannot assume that at the completion of the Notify call that
159   // all Observers have been Notified. The notification may still be pending
160   // delivery.
161   template <typename Method, typename... Params>
162   void Notify(const Location& from_here, Method m, Params&&... params) {
163     Callback<void(ObserverType*)> method =
164         Bind(&Dispatcher<ObserverType, Method>::Run, m,
165              std::forward<Params>(params)...);
166 
167     AutoLock lock(lock_);
168     for (const auto& observer : observers_) {
169       observer.second->PostTask(
170           from_here,
171           BindOnce(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
172                    observer.first, NotificationData(this, from_here, method)));
173     }
174   }
175 
176  private:
177   friend class RefCountedThreadSafe<ObserverListThreadSafeBase>;
178 
179   struct NotificationData : public NotificationDataBase {
180     NotificationData(ObserverListThreadSafe* observer_list_in,
181                      const Location& from_here_in,
182                      const Callback<void(ObserverType*)>& method_in)
183         : NotificationDataBase(observer_list_in, from_here_in),
184           method(method_in) {}
185 
186     Callback<void(ObserverType*)> method;
187   };
188 
189   ~ObserverListThreadSafe() override = default;
190 
191   void NotifyWrapper(ObserverType* observer,
192                      const NotificationData& notification) {
193     {
194       AutoLock auto_lock(lock_);
195 
196       // Check whether the observer still needs a notification.
197       auto it = observers_.find(observer);
198       if (it == observers_.end())
199         return;
200       DCHECK(it->second->RunsTasksInCurrentSequence());
201     }
202 
203     // Keep track of the notification being dispatched on the current thread.
204     // This will be used if the callback below calls AddObserver().
205     //
206     // Note: |tls_current_notification_| may not be nullptr if this runs in a
207     // nested loop started by a notification callback. In that case, it is
208     // important to save the previous value to restore it later.
209     auto& tls_current_notification = tls_current_notification_.Get();
210     const NotificationDataBase* const previous_notification =
211         tls_current_notification.Get();
212     tls_current_notification.Set(&notification);
213 
214     // Invoke the callback.
215     notification.method.Run(observer);
216 
217     // Reset the notification being dispatched on the current thread to its
218     // previous value.
219     tls_current_notification.Set(previous_notification);
220   }
221 
222   const ObserverListPolicy policy_ = ObserverListPolicy::ALL;
223 
224   // Synchronizes access to |observers_|.
225   mutable Lock lock_;
226 
227   // Keys are observers. Values are the SequencedTaskRunners on which they must
228   // be notified.
229   std::unordered_map<ObserverType*, scoped_refptr<SequencedTaskRunner>>
230       observers_;
231 
232   DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe);
233 };
234 
235 }  // namespace base
236 
237 #endif  // BASE_OBSERVER_LIST_THREADSAFE_H_
238