• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "chrome/browser/sync/notifier/chrome_invalidation_client.h"
6 
7 #include <string>
8 #include <vector>
9 
10 #include "base/compiler_specific.h"
11 #include "base/logging.h"
12 #include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
13 #include "chrome/browser/sync/notifier/invalidation_util.h"
14 #include "chrome/browser/sync/notifier/registration_manager.h"
15 #include "chrome/browser/sync/syncable/model_type.h"
16 #include "google/cacheinvalidation/invalidation-client-impl.h"
17 
18 namespace sync_notifier {
19 
~Listener()20 ChromeInvalidationClient::Listener::~Listener() {}
21 
ChromeInvalidationClient()22 ChromeInvalidationClient::ChromeInvalidationClient()
23     : chrome_system_resources_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
24       scoped_callback_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
25       handle_outbound_packet_callback_(
26           scoped_callback_factory_.NewCallback(
27               &ChromeInvalidationClient::HandleOutboundPacket)),
28       listener_(NULL),
29       state_writer_(NULL) {
30   DCHECK(non_thread_safe_.CalledOnValidThread());
31 }
32 
~ChromeInvalidationClient()33 ChromeInvalidationClient::~ChromeInvalidationClient() {
34   DCHECK(non_thread_safe_.CalledOnValidThread());
35   Stop();
36   DCHECK(!listener_);
37   DCHECK(!state_writer_);
38 }
39 
Start(const std::string & client_id,const std::string & client_info,const std::string & state,Listener * listener,StateWriter * state_writer,base::WeakPtr<talk_base::Task> base_task)40 void ChromeInvalidationClient::Start(
41     const std::string& client_id, const std::string& client_info,
42     const std::string& state, Listener* listener,
43     StateWriter* state_writer, base::WeakPtr<talk_base::Task> base_task) {
44   DCHECK(non_thread_safe_.CalledOnValidThread());
45   Stop();
46 
47   chrome_system_resources_.StartScheduler();
48 
49   DCHECK(!listener_);
50   DCHECK(listener);
51   listener_ = listener;
52   DCHECK(!state_writer_);
53   DCHECK(state_writer);
54   state_writer_ = state_writer;
55 
56   invalidation::ClientType client_type;
57   client_type.set_type(invalidation::ClientType::CHROME_SYNC);
58   // TODO(akalin): Use InvalidationClient::Create() once it supports
59   // taking a ClientConfig.
60   invalidation::ClientConfig client_config;
61   // Bump up limits so that we reduce the number of registration
62   // replies we get.
63   client_config.max_registrations_per_message = 20;
64   client_config.max_ops_per_message = 40;
65   invalidation_client_.reset(
66       new invalidation::InvalidationClientImpl(
67           &chrome_system_resources_, client_type, client_id, client_info,
68           client_config, this));
69   invalidation_client_->Start(state);
70   invalidation::NetworkEndpoint* network_endpoint =
71       invalidation_client_->network_endpoint();
72   CHECK(network_endpoint);
73   network_endpoint->RegisterOutboundListener(
74       handle_outbound_packet_callback_.get());
75   ChangeBaseTask(base_task);
76   registration_manager_.reset(
77       new RegistrationManager(invalidation_client_.get()));
78   registration_manager_->SetRegisteredTypes(registered_types_);
79 }
80 
ChangeBaseTask(base::WeakPtr<talk_base::Task> base_task)81 void ChromeInvalidationClient::ChangeBaseTask(
82     base::WeakPtr<talk_base::Task> base_task) {
83   DCHECK(invalidation_client_.get());
84   DCHECK(base_task.get());
85   cache_invalidation_packet_handler_.reset(
86       new CacheInvalidationPacketHandler(base_task,
87                                          invalidation_client_.get()));
88 }
89 
Stop()90 void ChromeInvalidationClient::Stop() {
91   DCHECK(non_thread_safe_.CalledOnValidThread());
92   if (!invalidation_client_.get()) {
93     DCHECK(!cache_invalidation_packet_handler_.get());
94     return;
95   }
96 
97   chrome_system_resources_.StopScheduler();
98 
99   registration_manager_.reset();
100   cache_invalidation_packet_handler_.reset();
101   invalidation_client_.reset();
102   state_writer_ = NULL;
103   listener_ = NULL;
104 }
105 
RegisterTypes(const syncable::ModelTypeSet & types)106 void ChromeInvalidationClient::RegisterTypes(
107     const syncable::ModelTypeSet& types) {
108   DCHECK(non_thread_safe_.CalledOnValidThread());
109   registered_types_ = types;
110   if (registration_manager_.get()) {
111     registration_manager_->SetRegisteredTypes(registered_types_);
112   }
113   // TODO(akalin): Clear invalidation versions for unregistered types.
114 }
115 
Invalidate(const invalidation::Invalidation & invalidation,invalidation::Closure * callback)116 void ChromeInvalidationClient::Invalidate(
117     const invalidation::Invalidation& invalidation,
118     invalidation::Closure* callback) {
119   DCHECK(non_thread_safe_.CalledOnValidThread());
120   DCHECK(invalidation::IsCallbackRepeatable(callback));
121   VLOG(1) << "Invalidate: " << InvalidationToString(invalidation);
122   syncable::ModelType model_type;
123   if (!ObjectIdToRealModelType(invalidation.object_id(), &model_type)) {
124     LOG(WARNING) << "Could not get invalidation model type; "
125                  << "invalidating everything";
126     EmitInvalidation(registered_types_, std::string());
127     RunAndDeleteClosure(callback);
128     return;
129   }
130   // The invalidation API spec allows for the possibility of redundant
131   // invalidations, so keep track of the max versions and drop
132   // invalidations with old versions.
133   //
134   // TODO(akalin): Now that we keep track of registered types, we
135   // should drop invalidations for unregistered types.  We may also
136   // have to filter it at a higher level, as invalidations for
137   // newly-unregistered types may already be in flight.
138   //
139   // TODO(akalin): Persist |max_invalidation_versions_| somehow.
140   if (invalidation.version() != UNKNOWN_OBJECT_VERSION) {
141     std::map<syncable::ModelType, int64>::const_iterator it =
142         max_invalidation_versions_.find(model_type);
143     if ((it != max_invalidation_versions_.end()) &&
144         (invalidation.version() <= it->second)) {
145       // Drop redundant invalidations.
146       RunAndDeleteClosure(callback);
147       return;
148     }
149     max_invalidation_versions_[model_type] = invalidation.version();
150   }
151 
152   std::string payload;
153   // payload() CHECK()'s has_payload(), so we must check it ourselves first.
154   if (invalidation.has_payload())
155     payload = invalidation.payload();
156 
157   syncable::ModelTypeSet types;
158   types.insert(model_type);
159   EmitInvalidation(types, payload);
160   // TODO(akalin): We should really |callback| only after we get the
161   // updates from the sync server. (see http://crbug.com/78462).
162   RunAndDeleteClosure(callback);
163 }
164 
165 // This should behave as if we got an invalidation with version
166 // UNKNOWN_OBJECT_VERSION for all known data types.
InvalidateAll(invalidation::Closure * callback)167 void ChromeInvalidationClient::InvalidateAll(
168     invalidation::Closure* callback) {
169   DCHECK(non_thread_safe_.CalledOnValidThread());
170   DCHECK(invalidation::IsCallbackRepeatable(callback));
171   VLOG(1) << "InvalidateAll";
172   EmitInvalidation(registered_types_, std::string());
173   // TODO(akalin): We should really |callback| only after we get the
174   // updates from the sync server. (see http://crbug.com/76482).
175   RunAndDeleteClosure(callback);
176 }
177 
EmitInvalidation(const syncable::ModelTypeSet & types,const std::string & payload)178 void ChromeInvalidationClient::EmitInvalidation(
179     const syncable::ModelTypeSet& types, const std::string& payload) {
180   DCHECK(non_thread_safe_.CalledOnValidThread());
181   // TODO(akalin): Move all uses of ModelTypeBitSet for invalidations
182   // to ModelTypeSet.
183   syncable::ModelTypePayloadMap type_payloads =
184       syncable::ModelTypePayloadMapFromBitSet(
185           syncable::ModelTypeBitSetFromSet(types), payload);
186   listener_->OnInvalidate(type_payloads);
187 }
188 
RegistrationStateChanged(const invalidation::ObjectId & object_id,invalidation::RegistrationState new_state,const invalidation::UnknownHint & unknown_hint)189 void ChromeInvalidationClient::RegistrationStateChanged(
190     const invalidation::ObjectId& object_id,
191     invalidation::RegistrationState new_state,
192     const invalidation::UnknownHint& unknown_hint) {
193   DCHECK(non_thread_safe_.CalledOnValidThread());
194   VLOG(1) << "RegistrationStateChanged: "
195           << ObjectIdToString(object_id) << " " << new_state;
196   if (new_state == invalidation::RegistrationState_UNKNOWN) {
197     VLOG(1) << "is_transient=" << unknown_hint.is_transient()
198             << ", message=" << unknown_hint.message();
199   }
200 
201   syncable::ModelType model_type;
202   if (!ObjectIdToRealModelType(object_id, &model_type)) {
203     LOG(WARNING) << "Could not get object id model type; ignoring";
204     return;
205   }
206 
207   if (new_state != invalidation::RegistrationState_REGISTERED) {
208     // We don't care about |unknown_hint|; we let
209     // |registration_manager_| handle the registration backoff policy.
210     registration_manager_->MarkRegistrationLost(model_type);
211   }
212 }
213 
AllRegistrationsLost(invalidation::Closure * callback)214 void ChromeInvalidationClient::AllRegistrationsLost(
215     invalidation::Closure* callback) {
216   DCHECK(non_thread_safe_.CalledOnValidThread());
217   DCHECK(invalidation::IsCallbackRepeatable(callback));
218   VLOG(1) << "AllRegistrationsLost";
219   registration_manager_->MarkAllRegistrationsLost();
220   RunAndDeleteClosure(callback);
221 }
222 
SessionStatusChanged(bool has_session)223 void ChromeInvalidationClient::SessionStatusChanged(bool has_session) {
224   VLOG(1) << "SessionStatusChanged: " << has_session;
225   listener_->OnSessionStatusChanged(has_session);
226 }
227 
WriteState(const std::string & state)228 void ChromeInvalidationClient::WriteState(const std::string& state) {
229   DCHECK(non_thread_safe_.CalledOnValidThread());
230   CHECK(state_writer_);
231   state_writer_->WriteState(state);
232 }
233 
HandleOutboundPacket(invalidation::NetworkEndpoint * const & network_endpoint)234 void ChromeInvalidationClient::HandleOutboundPacket(
235     invalidation::NetworkEndpoint* const& network_endpoint) {
236   DCHECK(non_thread_safe_.CalledOnValidThread());
237   CHECK(cache_invalidation_packet_handler_.get());
238   cache_invalidation_packet_handler_->
239       HandleOutboundPacket(network_endpoint);
240 }
241 
242 }  // namespace sync_notifier
243