• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_
6 #define BASE_OBSERVER_LIST_THREADSAFE_H_
7 
8 #include <algorithm>
9 #include <map>
10 #include <memory>
11 #include <tuple>
12 
13 #include "base/bind.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/macros.h"
17 #include "base/memory/ptr_util.h"
18 #include "base/memory/ref_counted.h"
19 #include "base/observer_list.h"
20 #include "base/single_thread_task_runner.h"
21 #include "base/threading/platform_thread.h"
22 #include "base/threading/thread_task_runner_handle.h"
23 
24 ///////////////////////////////////////////////////////////////////////////////
25 //
26 // OVERVIEW:
27 //
28 //   A thread-safe container for a list of observers.
29 //   This is similar to the observer_list (see observer_list.h), but it
30 //   is more robust for multi-threaded situations.
31 //
32 //   The following use cases are supported:
33 //    * Observers can register for notifications from any thread.
34 //      Callbacks to the observer will occur on the same thread where
35 //      the observer initially called AddObserver() from.
36 //    * Any thread may trigger a notification via Notify().
37 //    * Observers can remove themselves from the observer list inside
38 //      of a callback.
39 //    * If one thread is notifying observers concurrently with an observer
40 //      removing itself from the observer list, the notifications will
41 //      be silently dropped.
42 //
43 //   The drawback of the threadsafe observer list is that notifications
44 //   are not as real-time as the non-threadsafe version of this class.
45 //   Notifications will always be done via PostTask() to another thread,
46 //   whereas with the non-thread-safe observer_list, notifications happen
47 //   synchronously and immediately.
48 //
49 //   IMPLEMENTATION NOTES
50 //   The ObserverListThreadSafe maintains a regular ObserverList for each
51 //   thread that has registered an observer.  When notifying the observers,
52 //   we simply call PostTask to each registered thread, and then each thread
53 //   will notify its regular ObserverList.
54 //
55 ///////////////////////////////////////////////////////////////////////////////
56 
57 namespace base {
58 namespace internal {
59 
// Dispatcher turns a pointer-to-member-function of type |Method| into a plain
// static function, Run(), that invokes the method on an observer.
//
// The observer pointer is deliberately the LAST parameter of Run(): the method
// pointer and its arguments can be bound up front, leaving a callable that
// takes only the ObserverType* to notify.
//
// The primary template is left undefined; only the member-function-pointer
// shapes specialized below are usable.
template <typename ObserverType, typename Method>
struct Dispatcher;

// Non-const observer methods: void (ReceiverType::*)(Params...).
template <typename ObserverType, typename ReceiverType, typename... Params>
struct Dispatcher<ObserverType, void(ReceiverType::*)(Params...)> {
  // Calls |m| on |obj|, forwarding |params| so that by-value parameters are
  // moved into the call and reference parameters keep their reference-ness.
  static void Run(void(ReceiverType::* m)(Params...),
                  Params... params, ObserverType* obj) {
    (obj->*m)(std::forward<Params>(params)...);
  }
};

// Const-qualified observer methods: void (ReceiverType::*)(Params...) const.
// Without this specialization such a method would select the undefined
// primary template and fail to compile.
template <typename ObserverType, typename ReceiverType, typename... Params>
struct Dispatcher<ObserverType, void(ReceiverType::*)(Params...) const> {
  // Same contract as the non-const overload above.
  static void Run(void(ReceiverType::* m)(Params...) const,
                  Params... params, ObserverType* obj) {
    (obj->*m)(std::forward<Params>(params)...);
  }
};
70 
71 }  // namespace internal
72 
73 template <class ObserverType>
74 class ObserverListThreadSafe
75     : public RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>> {
76  public:
77   using NotificationType =
78       typename ObserverList<ObserverType>::NotificationType;
79 
80   ObserverListThreadSafe()
81       : type_(ObserverListBase<ObserverType>::NOTIFY_ALL) {}
82   explicit ObserverListThreadSafe(NotificationType type) : type_(type) {}
83 
84   // Add an observer to the list.  An observer should not be added to
85   // the same list more than once.
86   void AddObserver(ObserverType* obs) {
87     // If there is no ThreadTaskRunnerHandle, it is impossible to notify on it,
88     // so do not add the observer.
89     if (!ThreadTaskRunnerHandle::IsSet())
90       return;
91 
92     ObserverList<ObserverType>* list = nullptr;
93     PlatformThreadId thread_id = PlatformThread::CurrentId();
94     {
95       AutoLock lock(list_lock_);
96       if (observer_lists_.find(thread_id) == observer_lists_.end()) {
97         observer_lists_[thread_id] =
98             base::MakeUnique<ObserverListContext>(type_);
99       }
100       list = &(observer_lists_[thread_id]->list);
101     }
102     list->AddObserver(obs);
103   }
104 
105   // Remove an observer from the list if it is in the list.
106   // If there are pending notifications in-transit to the observer, they will
107   // be aborted.
108   // If the observer to be removed is in the list, RemoveObserver MUST
109   // be called from the same thread which called AddObserver.
110   void RemoveObserver(ObserverType* obs) {
111     PlatformThreadId thread_id = PlatformThread::CurrentId();
112     {
113       AutoLock lock(list_lock_);
114       auto it = observer_lists_.find(thread_id);
115       if (it == observer_lists_.end()) {
116         // This will happen if we try to remove an observer on a thread
117         // we never added an observer for.
118         return;
119       }
120       ObserverList<ObserverType>& list = it->second->list;
121 
122       list.RemoveObserver(obs);
123 
124       // If that was the last observer in the list, remove the ObserverList
125       // entirely.
126       if (list.size() == 0)
127         observer_lists_.erase(it);
128     }
129   }
130 
131   // Verifies that the list is currently empty (i.e. there are no observers).
132   void AssertEmpty() const {
133     AutoLock lock(list_lock_);
134     DCHECK(observer_lists_.empty());
135   }
136 
137   // Notify methods.
138   // Make a thread-safe callback to each Observer in the list.
139   // Note, these calls are effectively asynchronous.  You cannot assume
140   // that at the completion of the Notify call that all Observers have
141   // been Notified.  The notification may still be pending delivery.
142   template <typename Method, typename... Params>
143   void Notify(const tracked_objects::Location& from_here,
144               Method m, Params&&... params) {
145     Callback<void(ObserverType*)> method =
146         Bind(&internal::Dispatcher<ObserverType, Method>::Run,
147              m, std::forward<Params>(params)...);
148 
149     AutoLock lock(list_lock_);
150     for (const auto& entry : observer_lists_) {
151       ObserverListContext* context = entry.second.get();
152       context->task_runner->PostTask(
153           from_here,
154           Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper,
155                this, context, method));
156     }
157   }
158 
159  private:
160   friend class RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>>;
161 
162   struct ObserverListContext {
163     explicit ObserverListContext(NotificationType type)
164         : task_runner(ThreadTaskRunnerHandle::Get()), list(type) {}
165 
166     scoped_refptr<SingleThreadTaskRunner> task_runner;
167     ObserverList<ObserverType> list;
168 
169    private:
170     DISALLOW_COPY_AND_ASSIGN(ObserverListContext);
171   };
172 
173   ~ObserverListThreadSafe() {
174   }
175 
176   // Wrapper which is called to fire the notifications for each thread's
177   // ObserverList.  This function MUST be called on the thread which owns
178   // the unsafe ObserverList.
179   void NotifyWrapper(ObserverListContext* context,
180                      const Callback<void(ObserverType*)>& method) {
181     // Check that this list still needs notifications.
182     {
183       AutoLock lock(list_lock_);
184       auto it = observer_lists_.find(PlatformThread::CurrentId());
185 
186       // The ObserverList could have been removed already.  In fact, it could
187       // have been removed and then re-added!  If the master list's loop
188       // does not match this one, then we do not need to finish this
189       // notification.
190       if (it == observer_lists_.end() || it->second.get() != context)
191         return;
192     }
193 
194     for (auto& observer : context->list) {
195       method.Run(&observer);
196     }
197 
198     // If there are no more observers on the list, we can now delete it.
199     if (context->list.size() == 0) {
200       {
201         AutoLock lock(list_lock_);
202         // Remove |list| if it's not already removed.
203         // This can happen if multiple observers got removed in a notification.
204         // See http://crbug.com/55725.
205         auto it = observer_lists_.find(PlatformThread::CurrentId());
206         if (it != observer_lists_.end() && it->second.get() == context)
207           observer_lists_.erase(it);
208       }
209     }
210   }
211 
212   mutable Lock list_lock_;  // Protects the observer_lists_.
213 
214   // Key by PlatformThreadId because in tests, clients can attempt to remove
215   // observers without a SingleThreadTaskRunner. If this were keyed by
216   // SingleThreadTaskRunner, that operation would be silently ignored, leaving
217   // garbage in the ObserverList.
218   std::map<PlatformThreadId, std::unique_ptr<ObserverListContext>>
219       observer_lists_;
220 
221   const NotificationType type_;
222 
223   DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe);
224 };
225 
226 }  // namespace base
227 
228 #endif  // BASE_OBSERVER_LIST_THREADSAFE_H_
229