• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "components/invalidation/non_blocking_invalidator.h"
6 
7 #include <cstddef>
8 
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/memory/scoped_ptr.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/thread_task_runner_handle.h"
14 #include "base/threading/thread.h"
15 #include "components/invalidation/gcm_network_channel_delegate.h"
16 #include "components/invalidation/invalidation_notifier.h"
17 #include "components/invalidation/sync_system_resources.h"
18 #include "jingle/notifier/listener/push_client.h"
19 #include "sync/internal_api/public/util/weak_handle.h"
20 #include "sync/notifier/invalidation_handler.h"
21 #include "sync/notifier/object_id_invalidation_map.h"
22 
23 namespace syncer {
24 
25 struct NonBlockingInvalidator::InitializeOptions {
InitializeOptionssyncer::NonBlockingInvalidator::InitializeOptions26   InitializeOptions(
27       NetworkChannelCreator network_channel_creator,
28       const std::string& invalidator_client_id,
29       const UnackedInvalidationsMap& saved_invalidations,
30       const std::string& invalidation_bootstrap_data,
31       const WeakHandle<InvalidationStateTracker>&
32           invalidation_state_tracker,
33       const std::string& client_info,
34       scoped_refptr<net::URLRequestContextGetter> request_context_getter)
35       : network_channel_creator(network_channel_creator),
36         invalidator_client_id(invalidator_client_id),
37         saved_invalidations(saved_invalidations),
38         invalidation_bootstrap_data(invalidation_bootstrap_data),
39         invalidation_state_tracker(invalidation_state_tracker),
40         client_info(client_info),
41         request_context_getter(request_context_getter) {
42   }
43 
44   NetworkChannelCreator network_channel_creator;
45   std::string invalidator_client_id;
46   UnackedInvalidationsMap saved_invalidations;
47   std::string invalidation_bootstrap_data;
48   WeakHandle<InvalidationStateTracker> invalidation_state_tracker;
49   std::string client_info;
50   scoped_refptr<net::URLRequestContextGetter> request_context_getter;
51 };
52 
53 namespace {
54 // This class provides a wrapper for a logging class in order to receive
55 // callbacks across threads, without having to worry about owner threads.
56 class CallbackProxy {
57  public:
58   explicit CallbackProxy(
59       base::Callback<void(const base::DictionaryValue&)> callback);
60   ~CallbackProxy();
61 
62   void Run(const base::DictionaryValue& value);
63 
64  private:
65   static void DoRun(base::Callback<void(const base::DictionaryValue&)> callback,
66                     scoped_ptr<base::DictionaryValue> value);
67 
68   base::Callback<void(const base::DictionaryValue&)> callback_;
69   scoped_refptr<base::SingleThreadTaskRunner> running_thread_;
70 
71   DISALLOW_COPY_AND_ASSIGN(CallbackProxy);
72 };
73 
CallbackProxy(base::Callback<void (const base::DictionaryValue &)> callback)74 CallbackProxy::CallbackProxy(
75     base::Callback<void(const base::DictionaryValue&)> callback)
76     : callback_(callback),
77       running_thread_(base::ThreadTaskRunnerHandle::Get()) {}
78 
~CallbackProxy()79 CallbackProxy::~CallbackProxy() {}
80 
DoRun(base::Callback<void (const base::DictionaryValue &)> callback,scoped_ptr<base::DictionaryValue> value)81 void CallbackProxy::DoRun(
82     base::Callback<void(const base::DictionaryValue&)> callback,
83     scoped_ptr<base::DictionaryValue> value) {
84   callback.Run(*value);
85 }
86 
Run(const base::DictionaryValue & value)87 void CallbackProxy::Run(const base::DictionaryValue& value) {
88   scoped_ptr<base::DictionaryValue> copied(value.DeepCopy());
89   running_thread_->PostTask(
90       FROM_HERE,
91       base::Bind(&CallbackProxy::DoRun, callback_, base::Passed(&copied)));
92 }
93 }
94 
95 class NonBlockingInvalidator::Core
96     : public base::RefCountedThreadSafe<NonBlockingInvalidator::Core>,
97       // InvalidationHandler to observe the InvalidationNotifier we create.
98       public InvalidationHandler {
99  public:
100   // Called on parent thread.  |delegate_observer| should be initialized.
101   explicit Core(
102       const WeakHandle<NonBlockingInvalidator>& delegate_observer);
103 
104   // Helpers called on I/O thread.
105   void Initialize(
106       const NonBlockingInvalidator::InitializeOptions& initialize_options);
107   void Teardown();
108   void UpdateRegisteredIds(const ObjectIdSet& ids);
109   void UpdateCredentials(const std::string& email, const std::string& token);
110   void RequestDetailedStatus(
111       base::Callback<void(const base::DictionaryValue&)> callback) const;
112 
113   // InvalidationHandler implementation (all called on I/O thread by
114   // InvalidationNotifier).
115   virtual void OnInvalidatorStateChange(InvalidatorState reason) OVERRIDE;
116   virtual void OnIncomingInvalidation(
117       const ObjectIdInvalidationMap& invalidation_map) OVERRIDE;
118   virtual std::string GetOwnerName() const OVERRIDE;
119 
120  private:
121   friend class
122       base::RefCountedThreadSafe<NonBlockingInvalidator::Core>;
123   // Called on parent or I/O thread.
124   virtual ~Core();
125 
126   // The variables below should be used only on the I/O thread.
127   const WeakHandle<NonBlockingInvalidator> delegate_observer_;
128   scoped_ptr<InvalidationNotifier> invalidation_notifier_;
129   scoped_refptr<base::SingleThreadTaskRunner> network_task_runner_;
130 
131   DISALLOW_COPY_AND_ASSIGN(Core);
132 };
133 
Core(const WeakHandle<NonBlockingInvalidator> & delegate_observer)134 NonBlockingInvalidator::Core::Core(
135     const WeakHandle<NonBlockingInvalidator>& delegate_observer)
136     : delegate_observer_(delegate_observer) {
137   DCHECK(delegate_observer_.IsInitialized());
138 }
139 
~Core()140 NonBlockingInvalidator::Core::~Core() {
141 }
142 
Initialize(const NonBlockingInvalidator::InitializeOptions & initialize_options)143 void NonBlockingInvalidator::Core::Initialize(
144     const NonBlockingInvalidator::InitializeOptions& initialize_options) {
145   DCHECK(initialize_options.request_context_getter.get());
146   network_task_runner_ =
147       initialize_options.request_context_getter->GetNetworkTaskRunner();
148   DCHECK(network_task_runner_->BelongsToCurrentThread());
149   scoped_ptr<SyncNetworkChannel> network_channel =
150       initialize_options.network_channel_creator.Run();
151   invalidation_notifier_.reset(
152       new InvalidationNotifier(
153           network_channel.Pass(),
154           initialize_options.invalidator_client_id,
155           initialize_options.saved_invalidations,
156           initialize_options.invalidation_bootstrap_data,
157           initialize_options.invalidation_state_tracker,
158           initialize_options.client_info));
159   invalidation_notifier_->RegisterHandler(this);
160 }
161 
Teardown()162 void NonBlockingInvalidator::Core::Teardown() {
163   DCHECK(network_task_runner_->BelongsToCurrentThread());
164   invalidation_notifier_->UnregisterHandler(this);
165   invalidation_notifier_.reset();
166   network_task_runner_ = NULL;
167 }
168 
UpdateRegisteredIds(const ObjectIdSet & ids)169 void NonBlockingInvalidator::Core::UpdateRegisteredIds(const ObjectIdSet& ids) {
170   DCHECK(network_task_runner_->BelongsToCurrentThread());
171   invalidation_notifier_->UpdateRegisteredIds(this, ids);
172 }
173 
UpdateCredentials(const std::string & email,const std::string & token)174 void NonBlockingInvalidator::Core::UpdateCredentials(const std::string& email,
175                                                      const std::string& token) {
176   DCHECK(network_task_runner_->BelongsToCurrentThread());
177   invalidation_notifier_->UpdateCredentials(email, token);
178 }
179 
RequestDetailedStatus(base::Callback<void (const base::DictionaryValue &)> callback) const180 void NonBlockingInvalidator::Core::RequestDetailedStatus(
181     base::Callback<void(const base::DictionaryValue&)> callback) const {
182   DCHECK(network_task_runner_->BelongsToCurrentThread());
183   invalidation_notifier_->RequestDetailedStatus(callback);
184 }
185 
OnInvalidatorStateChange(InvalidatorState reason)186 void NonBlockingInvalidator::Core::OnInvalidatorStateChange(
187     InvalidatorState reason) {
188   DCHECK(network_task_runner_->BelongsToCurrentThread());
189   delegate_observer_.Call(FROM_HERE,
190                           &NonBlockingInvalidator::OnInvalidatorStateChange,
191                           reason);
192 }
193 
OnIncomingInvalidation(const ObjectIdInvalidationMap & invalidation_map)194 void NonBlockingInvalidator::Core::OnIncomingInvalidation(
195     const ObjectIdInvalidationMap& invalidation_map) {
196   DCHECK(network_task_runner_->BelongsToCurrentThread());
197   delegate_observer_.Call(FROM_HERE,
198                           &NonBlockingInvalidator::OnIncomingInvalidation,
199                           invalidation_map);
200 }
201 
GetOwnerName() const202 std::string NonBlockingInvalidator::Core::GetOwnerName() const {
203   return "Sync";
204 }
205 
NonBlockingInvalidator(NetworkChannelCreator network_channel_creator,const std::string & invalidator_client_id,const UnackedInvalidationsMap & saved_invalidations,const std::string & invalidation_bootstrap_data,InvalidationStateTracker * invalidation_state_tracker,const std::string & client_info,const scoped_refptr<net::URLRequestContextGetter> & request_context_getter)206 NonBlockingInvalidator::NonBlockingInvalidator(
207     NetworkChannelCreator network_channel_creator,
208     const std::string& invalidator_client_id,
209     const UnackedInvalidationsMap& saved_invalidations,
210     const std::string& invalidation_bootstrap_data,
211     InvalidationStateTracker* invalidation_state_tracker,
212     const std::string& client_info,
213     const scoped_refptr<net::URLRequestContextGetter>& request_context_getter)
214     : invalidation_state_tracker_(invalidation_state_tracker),
215       parent_task_runner_(base::ThreadTaskRunnerHandle::Get()),
216       network_task_runner_(request_context_getter->GetNetworkTaskRunner()),
217       weak_ptr_factory_(this) {
218   core_ = new Core(MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()));
219 
220   InitializeOptions initialize_options(
221       network_channel_creator,
222       invalidator_client_id,
223       saved_invalidations,
224       invalidation_bootstrap_data,
225       MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
226       client_info,
227       request_context_getter);
228 
229   if (!network_task_runner_->PostTask(
230           FROM_HERE,
231           base::Bind(
232               &NonBlockingInvalidator::Core::Initialize,
233               core_.get(),
234               initialize_options))) {
235     NOTREACHED();
236   }
237 }
238 
~NonBlockingInvalidator()239 NonBlockingInvalidator::~NonBlockingInvalidator() {
240   DCHECK(parent_task_runner_->BelongsToCurrentThread());
241   if (!network_task_runner_->PostTask(
242           FROM_HERE,
243           base::Bind(&NonBlockingInvalidator::Core::Teardown,
244                      core_.get()))) {
245     DVLOG(1) << "Network thread stopped before invalidator is destroyed.";
246   }
247 }
248 
RegisterHandler(InvalidationHandler * handler)249 void NonBlockingInvalidator::RegisterHandler(InvalidationHandler* handler) {
250   DCHECK(parent_task_runner_->BelongsToCurrentThread());
251   registrar_.RegisterHandler(handler);
252 }
253 
UpdateRegisteredIds(InvalidationHandler * handler,const ObjectIdSet & ids)254 void NonBlockingInvalidator::UpdateRegisteredIds(InvalidationHandler* handler,
255                                                  const ObjectIdSet& ids) {
256   DCHECK(parent_task_runner_->BelongsToCurrentThread());
257   registrar_.UpdateRegisteredIds(handler, ids);
258   if (!network_task_runner_->PostTask(
259           FROM_HERE,
260           base::Bind(
261               &NonBlockingInvalidator::Core::UpdateRegisteredIds,
262               core_.get(),
263               registrar_.GetAllRegisteredIds()))) {
264     NOTREACHED();
265   }
266 }
267 
UnregisterHandler(InvalidationHandler * handler)268 void NonBlockingInvalidator::UnregisterHandler(InvalidationHandler* handler) {
269   DCHECK(parent_task_runner_->BelongsToCurrentThread());
270   registrar_.UnregisterHandler(handler);
271 }
272 
GetInvalidatorState() const273 InvalidatorState NonBlockingInvalidator::GetInvalidatorState() const {
274   DCHECK(parent_task_runner_->BelongsToCurrentThread());
275   return registrar_.GetInvalidatorState();
276 }
277 
UpdateCredentials(const std::string & email,const std::string & token)278 void NonBlockingInvalidator::UpdateCredentials(const std::string& email,
279                                                const std::string& token) {
280   DCHECK(parent_task_runner_->BelongsToCurrentThread());
281   if (!network_task_runner_->PostTask(
282           FROM_HERE,
283           base::Bind(&NonBlockingInvalidator::Core::UpdateCredentials,
284                      core_.get(), email, token))) {
285     NOTREACHED();
286   }
287 }
288 
RequestDetailedStatus(base::Callback<void (const base::DictionaryValue &)> callback) const289 void NonBlockingInvalidator::RequestDetailedStatus(
290     base::Callback<void(const base::DictionaryValue&)> callback) const {
291   DCHECK(parent_task_runner_->BelongsToCurrentThread());
292   base::Callback<void(const base::DictionaryValue&)> proxy_callback =
293       base::Bind(&CallbackProxy::Run, base::Owned(new CallbackProxy(callback)));
294   if (!network_task_runner_->PostTask(
295           FROM_HERE,
296           base::Bind(&NonBlockingInvalidator::Core::RequestDetailedStatus,
297                      core_.get(),
298                      proxy_callback))) {
299     NOTREACHED();
300   }
301 }
302 
303 NetworkChannelCreator
MakePushClientChannelCreator(const notifier::NotifierOptions & notifier_options)304     NonBlockingInvalidator::MakePushClientChannelCreator(
305         const notifier::NotifierOptions& notifier_options) {
306   return base::Bind(SyncNetworkChannel::CreatePushClientChannel,
307       notifier_options);
308 }
309 
MakeGCMNetworkChannelCreator(scoped_refptr<net::URLRequestContextGetter> request_context_getter,scoped_ptr<GCMNetworkChannelDelegate> delegate)310 NetworkChannelCreator NonBlockingInvalidator::MakeGCMNetworkChannelCreator(
311     scoped_refptr<net::URLRequestContextGetter> request_context_getter,
312     scoped_ptr<GCMNetworkChannelDelegate> delegate) {
313   return base::Bind(&SyncNetworkChannel::CreateGCMNetworkChannel,
314                     request_context_getter,
315                     base::Passed(&delegate));
316 }
317 
ClearAndSetNewClientId(const std::string & data)318 void NonBlockingInvalidator::ClearAndSetNewClientId(const std::string& data) {
319   DCHECK(parent_task_runner_->BelongsToCurrentThread());
320   invalidation_state_tracker_->ClearAndSetNewClientId(data);
321 }
322 
GetInvalidatorClientId() const323 std::string NonBlockingInvalidator::GetInvalidatorClientId() const {
324   DCHECK(parent_task_runner_->BelongsToCurrentThread());
325   return invalidation_state_tracker_->GetInvalidatorClientId();
326 }
327 
SetBootstrapData(const std::string & data)328 void NonBlockingInvalidator::SetBootstrapData(const std::string& data) {
329   DCHECK(parent_task_runner_->BelongsToCurrentThread());
330   invalidation_state_tracker_->SetBootstrapData(data);
331 }
332 
GetBootstrapData() const333 std::string NonBlockingInvalidator::GetBootstrapData() const {
334   DCHECK(parent_task_runner_->BelongsToCurrentThread());
335   return invalidation_state_tracker_->GetBootstrapData();
336 }
337 
SetSavedInvalidations(const UnackedInvalidationsMap & states)338 void NonBlockingInvalidator::SetSavedInvalidations(
339       const UnackedInvalidationsMap& states) {
340   DCHECK(parent_task_runner_->BelongsToCurrentThread());
341   invalidation_state_tracker_->SetSavedInvalidations(states);
342 }
343 
GetSavedInvalidations() const344 UnackedInvalidationsMap NonBlockingInvalidator::GetSavedInvalidations() const {
345   DCHECK(parent_task_runner_->BelongsToCurrentThread());
346   return invalidation_state_tracker_->GetSavedInvalidations();
347 }
348 
Clear()349 void NonBlockingInvalidator::Clear() {
350   DCHECK(parent_task_runner_->BelongsToCurrentThread());
351   invalidation_state_tracker_->Clear();
352 }
353 
OnInvalidatorStateChange(InvalidatorState state)354 void NonBlockingInvalidator::OnInvalidatorStateChange(InvalidatorState state) {
355   DCHECK(parent_task_runner_->BelongsToCurrentThread());
356   registrar_.UpdateInvalidatorState(state);
357 }
358 
OnIncomingInvalidation(const ObjectIdInvalidationMap & invalidation_map)359 void NonBlockingInvalidator::OnIncomingInvalidation(
360         const ObjectIdInvalidationMap& invalidation_map) {
361   DCHECK(parent_task_runner_->BelongsToCurrentThread());
362   registrar_.DispatchInvalidationsToHandlers(invalidation_map);
363 }
364 
GetOwnerName() const365 std::string NonBlockingInvalidator::GetOwnerName() const { return "Sync"; }
366 
367 }  // namespace syncer
368