1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/sync/notifier/chrome_invalidation_client.h"
6
7 #include <string>
8 #include <vector>
9
10 #include "base/compiler_specific.h"
11 #include "base/logging.h"
12 #include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
13 #include "chrome/browser/sync/notifier/invalidation_util.h"
14 #include "chrome/browser/sync/notifier/registration_manager.h"
15 #include "chrome/browser/sync/syncable/model_type.h"
16 #include "google/cacheinvalidation/invalidation-client-impl.h"
17
18 namespace sync_notifier {
19
~Listener()20 ChromeInvalidationClient::Listener::~Listener() {}
21
ChromeInvalidationClient()22 ChromeInvalidationClient::ChromeInvalidationClient()
23 : chrome_system_resources_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
24 scoped_callback_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
25 handle_outbound_packet_callback_(
26 scoped_callback_factory_.NewCallback(
27 &ChromeInvalidationClient::HandleOutboundPacket)),
28 listener_(NULL),
29 state_writer_(NULL) {
30 DCHECK(non_thread_safe_.CalledOnValidThread());
31 }
32
~ChromeInvalidationClient()33 ChromeInvalidationClient::~ChromeInvalidationClient() {
34 DCHECK(non_thread_safe_.CalledOnValidThread());
35 Stop();
36 DCHECK(!listener_);
37 DCHECK(!state_writer_);
38 }
39
Start(const std::string & client_id,const std::string & client_info,const std::string & state,Listener * listener,StateWriter * state_writer,base::WeakPtr<talk_base::Task> base_task)40 void ChromeInvalidationClient::Start(
41 const std::string& client_id, const std::string& client_info,
42 const std::string& state, Listener* listener,
43 StateWriter* state_writer, base::WeakPtr<talk_base::Task> base_task) {
44 DCHECK(non_thread_safe_.CalledOnValidThread());
45 Stop();
46
47 chrome_system_resources_.StartScheduler();
48
49 DCHECK(!listener_);
50 DCHECK(listener);
51 listener_ = listener;
52 DCHECK(!state_writer_);
53 DCHECK(state_writer);
54 state_writer_ = state_writer;
55
56 invalidation::ClientType client_type;
57 client_type.set_type(invalidation::ClientType::CHROME_SYNC);
58 // TODO(akalin): Use InvalidationClient::Create() once it supports
59 // taking a ClientConfig.
60 invalidation::ClientConfig client_config;
61 // Bump up limits so that we reduce the number of registration
62 // replies we get.
63 client_config.max_registrations_per_message = 20;
64 client_config.max_ops_per_message = 40;
65 invalidation_client_.reset(
66 new invalidation::InvalidationClientImpl(
67 &chrome_system_resources_, client_type, client_id, client_info,
68 client_config, this));
69 invalidation_client_->Start(state);
70 invalidation::NetworkEndpoint* network_endpoint =
71 invalidation_client_->network_endpoint();
72 CHECK(network_endpoint);
73 network_endpoint->RegisterOutboundListener(
74 handle_outbound_packet_callback_.get());
75 ChangeBaseTask(base_task);
76 registration_manager_.reset(
77 new RegistrationManager(invalidation_client_.get()));
78 registration_manager_->SetRegisteredTypes(registered_types_);
79 }
80
ChangeBaseTask(base::WeakPtr<talk_base::Task> base_task)81 void ChromeInvalidationClient::ChangeBaseTask(
82 base::WeakPtr<talk_base::Task> base_task) {
83 DCHECK(invalidation_client_.get());
84 DCHECK(base_task.get());
85 cache_invalidation_packet_handler_.reset(
86 new CacheInvalidationPacketHandler(base_task,
87 invalidation_client_.get()));
88 }
89
Stop()90 void ChromeInvalidationClient::Stop() {
91 DCHECK(non_thread_safe_.CalledOnValidThread());
92 if (!invalidation_client_.get()) {
93 DCHECK(!cache_invalidation_packet_handler_.get());
94 return;
95 }
96
97 chrome_system_resources_.StopScheduler();
98
99 registration_manager_.reset();
100 cache_invalidation_packet_handler_.reset();
101 invalidation_client_.reset();
102 state_writer_ = NULL;
103 listener_ = NULL;
104 }
105
RegisterTypes(const syncable::ModelTypeSet & types)106 void ChromeInvalidationClient::RegisterTypes(
107 const syncable::ModelTypeSet& types) {
108 DCHECK(non_thread_safe_.CalledOnValidThread());
109 registered_types_ = types;
110 if (registration_manager_.get()) {
111 registration_manager_->SetRegisteredTypes(registered_types_);
112 }
113 // TODO(akalin): Clear invalidation versions for unregistered types.
114 }
115
Invalidate(const invalidation::Invalidation & invalidation,invalidation::Closure * callback)116 void ChromeInvalidationClient::Invalidate(
117 const invalidation::Invalidation& invalidation,
118 invalidation::Closure* callback) {
119 DCHECK(non_thread_safe_.CalledOnValidThread());
120 DCHECK(invalidation::IsCallbackRepeatable(callback));
121 VLOG(1) << "Invalidate: " << InvalidationToString(invalidation);
122 syncable::ModelType model_type;
123 if (!ObjectIdToRealModelType(invalidation.object_id(), &model_type)) {
124 LOG(WARNING) << "Could not get invalidation model type; "
125 << "invalidating everything";
126 EmitInvalidation(registered_types_, std::string());
127 RunAndDeleteClosure(callback);
128 return;
129 }
130 // The invalidation API spec allows for the possibility of redundant
131 // invalidations, so keep track of the max versions and drop
132 // invalidations with old versions.
133 //
134 // TODO(akalin): Now that we keep track of registered types, we
135 // should drop invalidations for unregistered types. We may also
136 // have to filter it at a higher level, as invalidations for
137 // newly-unregistered types may already be in flight.
138 //
139 // TODO(akalin): Persist |max_invalidation_versions_| somehow.
140 if (invalidation.version() != UNKNOWN_OBJECT_VERSION) {
141 std::map<syncable::ModelType, int64>::const_iterator it =
142 max_invalidation_versions_.find(model_type);
143 if ((it != max_invalidation_versions_.end()) &&
144 (invalidation.version() <= it->second)) {
145 // Drop redundant invalidations.
146 RunAndDeleteClosure(callback);
147 return;
148 }
149 max_invalidation_versions_[model_type] = invalidation.version();
150 }
151
152 std::string payload;
153 // payload() CHECK()'s has_payload(), so we must check it ourselves first.
154 if (invalidation.has_payload())
155 payload = invalidation.payload();
156
157 syncable::ModelTypeSet types;
158 types.insert(model_type);
159 EmitInvalidation(types, payload);
160 // TODO(akalin): We should really |callback| only after we get the
161 // updates from the sync server. (see http://crbug.com/78462).
162 RunAndDeleteClosure(callback);
163 }
164
165 // This should behave as if we got an invalidation with version
166 // UNKNOWN_OBJECT_VERSION for all known data types.
InvalidateAll(invalidation::Closure * callback)167 void ChromeInvalidationClient::InvalidateAll(
168 invalidation::Closure* callback) {
169 DCHECK(non_thread_safe_.CalledOnValidThread());
170 DCHECK(invalidation::IsCallbackRepeatable(callback));
171 VLOG(1) << "InvalidateAll";
172 EmitInvalidation(registered_types_, std::string());
173 // TODO(akalin): We should really |callback| only after we get the
174 // updates from the sync server. (see http://crbug.com/76482).
175 RunAndDeleteClosure(callback);
176 }
177
EmitInvalidation(const syncable::ModelTypeSet & types,const std::string & payload)178 void ChromeInvalidationClient::EmitInvalidation(
179 const syncable::ModelTypeSet& types, const std::string& payload) {
180 DCHECK(non_thread_safe_.CalledOnValidThread());
181 // TODO(akalin): Move all uses of ModelTypeBitSet for invalidations
182 // to ModelTypeSet.
183 syncable::ModelTypePayloadMap type_payloads =
184 syncable::ModelTypePayloadMapFromBitSet(
185 syncable::ModelTypeBitSetFromSet(types), payload);
186 listener_->OnInvalidate(type_payloads);
187 }
188
RegistrationStateChanged(const invalidation::ObjectId & object_id,invalidation::RegistrationState new_state,const invalidation::UnknownHint & unknown_hint)189 void ChromeInvalidationClient::RegistrationStateChanged(
190 const invalidation::ObjectId& object_id,
191 invalidation::RegistrationState new_state,
192 const invalidation::UnknownHint& unknown_hint) {
193 DCHECK(non_thread_safe_.CalledOnValidThread());
194 VLOG(1) << "RegistrationStateChanged: "
195 << ObjectIdToString(object_id) << " " << new_state;
196 if (new_state == invalidation::RegistrationState_UNKNOWN) {
197 VLOG(1) << "is_transient=" << unknown_hint.is_transient()
198 << ", message=" << unknown_hint.message();
199 }
200
201 syncable::ModelType model_type;
202 if (!ObjectIdToRealModelType(object_id, &model_type)) {
203 LOG(WARNING) << "Could not get object id model type; ignoring";
204 return;
205 }
206
207 if (new_state != invalidation::RegistrationState_REGISTERED) {
208 // We don't care about |unknown_hint|; we let
209 // |registration_manager_| handle the registration backoff policy.
210 registration_manager_->MarkRegistrationLost(model_type);
211 }
212 }
213
AllRegistrationsLost(invalidation::Closure * callback)214 void ChromeInvalidationClient::AllRegistrationsLost(
215 invalidation::Closure* callback) {
216 DCHECK(non_thread_safe_.CalledOnValidThread());
217 DCHECK(invalidation::IsCallbackRepeatable(callback));
218 VLOG(1) << "AllRegistrationsLost";
219 registration_manager_->MarkAllRegistrationsLost();
220 RunAndDeleteClosure(callback);
221 }
222
SessionStatusChanged(bool has_session)223 void ChromeInvalidationClient::SessionStatusChanged(bool has_session) {
224 VLOG(1) << "SessionStatusChanged: " << has_session;
225 listener_->OnSessionStatusChanged(has_session);
226 }
227
WriteState(const std::string & state)228 void ChromeInvalidationClient::WriteState(const std::string& state) {
229 DCHECK(non_thread_safe_.CalledOnValidThread());
230 CHECK(state_writer_);
231 state_writer_->WriteState(state);
232 }
233
HandleOutboundPacket(invalidation::NetworkEndpoint * const & network_endpoint)234 void ChromeInvalidationClient::HandleOutboundPacket(
235 invalidation::NetworkEndpoint* const& network_endpoint) {
236 DCHECK(non_thread_safe_.CalledOnValidThread());
237 CHECK(cache_invalidation_packet_handler_.get());
238 cache_invalidation_packet_handler_->
239 HandleOutboundPacket(network_endpoint);
240 }
241
242 } // namespace sync_notifier
243