1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "sync/sessions/model_type_registry.h"
6
7 #include "base/bind.h"
8 #include "base/observer_list.h"
9 #include "base/thread_task_runner_handle.h"
10 #include "sync/engine/directory_commit_contributor.h"
11 #include "sync/engine/directory_update_handler.h"
12 #include "sync/engine/model_type_sync_proxy.h"
13 #include "sync/engine/model_type_sync_proxy_impl.h"
14 #include "sync/engine/model_type_sync_worker.h"
15 #include "sync/engine/model_type_sync_worker_impl.h"
16 #include "sync/internal_api/public/non_blocking_sync_common.h"
17 #include "sync/sessions/directory_type_debug_info_emitter.h"
18 #include "sync/util/cryptographer.h"
19
20 namespace syncer {
21
22 namespace {
23
24 class ModelTypeSyncProxyWrapper : public ModelTypeSyncProxy {
25 public:
26 ModelTypeSyncProxyWrapper(
27 const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
28 const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner);
29 virtual ~ModelTypeSyncProxyWrapper();
30
31 virtual void OnCommitCompleted(
32 const DataTypeState& type_state,
33 const CommitResponseDataList& response_list) OVERRIDE;
34 virtual void OnUpdateReceived(
35 const DataTypeState& type_state,
36 const UpdateResponseDataList& response_list,
37 const UpdateResponseDataList& pending_updates) OVERRIDE;
38
39 private:
40 base::WeakPtr<ModelTypeSyncProxyImpl> processor_;
41 scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
42 };
43
ModelTypeSyncProxyWrapper(const base::WeakPtr<ModelTypeSyncProxyImpl> & proxy,const scoped_refptr<base::SequencedTaskRunner> & processor_task_runner)44 ModelTypeSyncProxyWrapper::ModelTypeSyncProxyWrapper(
45 const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
46 const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner)
47 : processor_(proxy), processor_task_runner_(processor_task_runner) {
48 }
49
~ModelTypeSyncProxyWrapper()50 ModelTypeSyncProxyWrapper::~ModelTypeSyncProxyWrapper() {
51 }
52
OnCommitCompleted(const DataTypeState & type_state,const CommitResponseDataList & response_list)53 void ModelTypeSyncProxyWrapper::OnCommitCompleted(
54 const DataTypeState& type_state,
55 const CommitResponseDataList& response_list) {
56 processor_task_runner_->PostTask(
57 FROM_HERE,
58 base::Bind(&ModelTypeSyncProxyImpl::OnCommitCompleted,
59 processor_,
60 type_state,
61 response_list));
62 }
63
OnUpdateReceived(const DataTypeState & type_state,const UpdateResponseDataList & response_list,const UpdateResponseDataList & pending_updates)64 void ModelTypeSyncProxyWrapper::OnUpdateReceived(
65 const DataTypeState& type_state,
66 const UpdateResponseDataList& response_list,
67 const UpdateResponseDataList& pending_updates) {
68 processor_task_runner_->PostTask(
69 FROM_HERE,
70 base::Bind(&ModelTypeSyncProxyImpl::OnUpdateReceived,
71 processor_,
72 type_state,
73 response_list,
74 pending_updates));
75 }
76
77 class ModelTypeSyncWorkerWrapper : public ModelTypeSyncWorker {
78 public:
79 ModelTypeSyncWorkerWrapper(
80 const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
81 const scoped_refptr<base::SequencedTaskRunner>& sync_thread);
82 virtual ~ModelTypeSyncWorkerWrapper();
83
84 virtual void EnqueueForCommit(const CommitRequestDataList& list) OVERRIDE;
85
86 private:
87 base::WeakPtr<ModelTypeSyncWorkerImpl> worker_;
88 scoped_refptr<base::SequencedTaskRunner> sync_thread_;
89 };
90
ModelTypeSyncWorkerWrapper(const base::WeakPtr<ModelTypeSyncWorkerImpl> & worker,const scoped_refptr<base::SequencedTaskRunner> & sync_thread)91 ModelTypeSyncWorkerWrapper::ModelTypeSyncWorkerWrapper(
92 const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
93 const scoped_refptr<base::SequencedTaskRunner>& sync_thread)
94 : worker_(worker), sync_thread_(sync_thread) {
95 }
96
~ModelTypeSyncWorkerWrapper()97 ModelTypeSyncWorkerWrapper::~ModelTypeSyncWorkerWrapper() {
98 }
99
EnqueueForCommit(const CommitRequestDataList & list)100 void ModelTypeSyncWorkerWrapper::EnqueueForCommit(
101 const CommitRequestDataList& list) {
102 sync_thread_->PostTask(
103 FROM_HERE,
104 base::Bind(&ModelTypeSyncWorkerImpl::EnqueueForCommit, worker_, list));
105 }
106
107 } // namespace
108
ModelTypeRegistry(const std::vector<scoped_refptr<ModelSafeWorker>> & workers,syncable::Directory * directory,NudgeHandler * nudge_handler)109 ModelTypeRegistry::ModelTypeRegistry(
110 const std::vector<scoped_refptr<ModelSafeWorker> >& workers,
111 syncable::Directory* directory,
112 NudgeHandler* nudge_handler)
113 : directory_(directory),
114 nudge_handler_(nudge_handler),
115 weak_ptr_factory_(this) {
116 for (size_t i = 0u; i < workers.size(); ++i) {
117 workers_map_.insert(
118 std::make_pair(workers[i]->GetModelSafeGroup(), workers[i]));
119 }
120 }
121
~ModelTypeRegistry()122 ModelTypeRegistry::~ModelTypeRegistry() {
123 }
124
SetEnabledDirectoryTypes(const ModelSafeRoutingInfo & routing_info)125 void ModelTypeRegistry::SetEnabledDirectoryTypes(
126 const ModelSafeRoutingInfo& routing_info) {
127 // Remove all existing directory processors and delete them. The
128 // DebugInfoEmitters are not deleted here, since we want to preserve their
129 // counters.
130 for (ModelTypeSet::Iterator it = enabled_directory_types_.First();
131 it.Good(); it.Inc()) {
132 size_t result1 = update_handler_map_.erase(it.Get());
133 size_t result2 = commit_contributor_map_.erase(it.Get());
134 DCHECK_EQ(1U, result1);
135 DCHECK_EQ(1U, result2);
136 }
137
138 // Clear the old instances of directory update handlers and commit
139 // contributors, deleting their contents in the processs.
140 directory_update_handlers_.clear();
141 directory_commit_contributors_.clear();
142
143 // Create new ones and add them to the appropriate containers.
144 for (ModelSafeRoutingInfo::const_iterator routing_iter = routing_info.begin();
145 routing_iter != routing_info.end(); ++routing_iter) {
146 ModelType type = routing_iter->first;
147 ModelSafeGroup group = routing_iter->second;
148 std::map<ModelSafeGroup, scoped_refptr<ModelSafeWorker> >::iterator
149 worker_it = workers_map_.find(group);
150 DCHECK(worker_it != workers_map_.end());
151 scoped_refptr<ModelSafeWorker> worker = worker_it->second;
152
153 // DebugInfoEmitters are never deleted. Use existing one if we have it.
154 DirectoryTypeDebugInfoEmitter* emitter = NULL;
155 DirectoryTypeDebugInfoEmitterMap::iterator it =
156 directory_type_debug_info_emitter_map_.find(type);
157 if (it != directory_type_debug_info_emitter_map_.end()) {
158 emitter = it->second;
159 } else {
160 emitter = new DirectoryTypeDebugInfoEmitter(directory_, type,
161 &type_debug_info_observers_);
162 directory_type_debug_info_emitter_map_.insert(
163 std::make_pair(type, emitter));
164 directory_type_debug_info_emitters_.push_back(emitter);
165 }
166
167 DirectoryCommitContributor* committer =
168 new DirectoryCommitContributor(directory_, type, emitter);
169 DirectoryUpdateHandler* updater =
170 new DirectoryUpdateHandler(directory_, type, worker, emitter);
171
172 // These containers take ownership of their contents.
173 directory_commit_contributors_.push_back(committer);
174 directory_update_handlers_.push_back(updater);
175
176 bool inserted1 =
177 update_handler_map_.insert(std::make_pair(type, updater)).second;
178 DCHECK(inserted1) << "Attempt to override existing type handler in map";
179
180 bool inserted2 =
181 commit_contributor_map_.insert(std::make_pair(type, committer)).second;
182 DCHECK(inserted2) << "Attempt to override existing type handler in map";
183 }
184
185 enabled_directory_types_ = GetRoutingInfoTypes(routing_info);
186 DCHECK(Intersection(GetEnabledDirectoryTypes(),
187 GetEnabledNonBlockingTypes()).Empty());
188 }
189
ConnectSyncTypeToWorker(ModelType type,const DataTypeState & data_type_state,const UpdateResponseDataList & saved_pending_updates,const scoped_refptr<base::SequencedTaskRunner> & type_task_runner,const base::WeakPtr<ModelTypeSyncProxyImpl> & proxy_impl)190 void ModelTypeRegistry::ConnectSyncTypeToWorker(
191 ModelType type,
192 const DataTypeState& data_type_state,
193 const UpdateResponseDataList& saved_pending_updates,
194 const scoped_refptr<base::SequencedTaskRunner>& type_task_runner,
195 const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy_impl) {
196 DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
197
198 // Initialize Worker -> Proxy communication channel.
199 scoped_ptr<ModelTypeSyncProxy> proxy(
200 new ModelTypeSyncProxyWrapper(proxy_impl, type_task_runner));
201 scoped_ptr<Cryptographer> cryptographer_copy;
202 if (encrypted_types_.Has(type))
203 cryptographer_copy.reset(new Cryptographer(*cryptographer_));
204
205 scoped_ptr<ModelTypeSyncWorkerImpl> worker(
206 new ModelTypeSyncWorkerImpl(type,
207 data_type_state,
208 saved_pending_updates,
209 cryptographer_copy.Pass(),
210 nudge_handler_,
211 proxy.Pass()));
212
213 // Initialize Proxy -> Worker communication channel.
214 scoped_ptr<ModelTypeSyncWorker> wrapped_worker(
215 new ModelTypeSyncWorkerWrapper(worker->AsWeakPtr(),
216 scoped_refptr<base::SequencedTaskRunner>(
217 base::ThreadTaskRunnerHandle::Get())));
218 type_task_runner->PostTask(FROM_HERE,
219 base::Bind(&ModelTypeSyncProxyImpl::OnConnect,
220 proxy_impl,
221 base::Passed(&wrapped_worker)));
222
223 DCHECK(update_handler_map_.find(type) == update_handler_map_.end());
224 DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end());
225
226 update_handler_map_.insert(std::make_pair(type, worker.get()));
227 commit_contributor_map_.insert(std::make_pair(type, worker.get()));
228
229 // The container takes ownership.
230 model_type_sync_workers_.push_back(worker.release());
231
232 DCHECK(Intersection(GetEnabledDirectoryTypes(),
233 GetEnabledNonBlockingTypes()).Empty());
234 }
235
DisconnectSyncWorker(ModelType type)236 void ModelTypeRegistry::DisconnectSyncWorker(ModelType type) {
237 DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type);
238 DCHECK(update_handler_map_.find(type) != update_handler_map_.end());
239 DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end());
240
241 size_t updaters_erased = update_handler_map_.erase(type);
242 size_t committers_erased = commit_contributor_map_.erase(type);
243
244 DCHECK_EQ(1U, updaters_erased);
245 DCHECK_EQ(1U, committers_erased);
246
247 // Remove from the ScopedVector, deleting the worker in the process.
248 for (ScopedVector<ModelTypeSyncWorkerImpl>::iterator it =
249 model_type_sync_workers_.begin();
250 it != model_type_sync_workers_.end();
251 ++it) {
252 if ((*it)->GetModelType() == type) {
253 model_type_sync_workers_.erase(it);
254 break;
255 }
256 }
257 }
258
GetEnabledTypes() const259 ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const {
260 return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes());
261 }
262
update_handler_map()263 UpdateHandlerMap* ModelTypeRegistry::update_handler_map() {
264 return &update_handler_map_;
265 }
266
commit_contributor_map()267 CommitContributorMap* ModelTypeRegistry::commit_contributor_map() {
268 return &commit_contributor_map_;
269 }
270
271 DirectoryTypeDebugInfoEmitterMap*
directory_type_debug_info_emitter_map()272 ModelTypeRegistry::directory_type_debug_info_emitter_map() {
273 return &directory_type_debug_info_emitter_map_;
274 }
275
RegisterDirectoryTypeDebugInfoObserver(syncer::TypeDebugInfoObserver * observer)276 void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver(
277 syncer::TypeDebugInfoObserver* observer) {
278 if (!type_debug_info_observers_.HasObserver(observer))
279 type_debug_info_observers_.AddObserver(observer);
280 }
281
UnregisterDirectoryTypeDebugInfoObserver(syncer::TypeDebugInfoObserver * observer)282 void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver(
283 syncer::TypeDebugInfoObserver* observer) {
284 type_debug_info_observers_.RemoveObserver(observer);
285 }
286
HasDirectoryTypeDebugInfoObserver(syncer::TypeDebugInfoObserver * observer)287 bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver(
288 syncer::TypeDebugInfoObserver* observer) {
289 return type_debug_info_observers_.HasObserver(observer);
290 }
291
RequestEmitDebugInfo()292 void ModelTypeRegistry::RequestEmitDebugInfo() {
293 for (DirectoryTypeDebugInfoEmitterMap::iterator it =
294 directory_type_debug_info_emitter_map_.begin();
295 it != directory_type_debug_info_emitter_map_.end(); ++it) {
296 it->second->EmitCommitCountersUpdate();
297 it->second->EmitUpdateCountersUpdate();
298 it->second->EmitStatusCountersUpdate();
299 }
300 }
301
AsWeakPtr()302 base::WeakPtr<SyncContext> ModelTypeRegistry::AsWeakPtr() {
303 return weak_ptr_factory_.GetWeakPtr();
304 }
305
OnPassphraseRequired(PassphraseRequiredReason reason,const sync_pb::EncryptedData & pending_keys)306 void ModelTypeRegistry::OnPassphraseRequired(
307 PassphraseRequiredReason reason,
308 const sync_pb::EncryptedData& pending_keys) {
309 }
310
OnPassphraseAccepted()311 void ModelTypeRegistry::OnPassphraseAccepted() {
312 }
313
OnBootstrapTokenUpdated(const std::string & bootstrap_token,BootstrapTokenType type)314 void ModelTypeRegistry::OnBootstrapTokenUpdated(
315 const std::string& bootstrap_token,
316 BootstrapTokenType type) {
317 }
318
OnEncryptedTypesChanged(ModelTypeSet encrypted_types,bool encrypt_everything)319 void ModelTypeRegistry::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
320 bool encrypt_everything) {
321 encrypted_types_ = encrypted_types;
322 OnEncryptionStateChanged();
323 }
324
OnEncryptionComplete()325 void ModelTypeRegistry::OnEncryptionComplete() {
326 }
327
OnCryptographerStateChanged(Cryptographer * cryptographer)328 void ModelTypeRegistry::OnCryptographerStateChanged(
329 Cryptographer* cryptographer) {
330 cryptographer_.reset(new Cryptographer(*cryptographer));
331 OnEncryptionStateChanged();
332 }
333
OnPassphraseTypeChanged(PassphraseType type,base::Time passphrase_time)334 void ModelTypeRegistry::OnPassphraseTypeChanged(PassphraseType type,
335 base::Time passphrase_time) {
336 }
337
GetEnabledDirectoryTypes() const338 ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const {
339 return enabled_directory_types_;
340 }
341
OnEncryptionStateChanged()342 void ModelTypeRegistry::OnEncryptionStateChanged() {
343 for (ScopedVector<ModelTypeSyncWorkerImpl>::iterator it =
344 model_type_sync_workers_.begin();
345 it != model_type_sync_workers_.end();
346 ++it) {
347 if (encrypted_types_.Has((*it)->GetModelType())) {
348 (*it)->UpdateCryptographer(
349 make_scoped_ptr(new Cryptographer(*cryptographer_)));
350 }
351 }
352 }
353
GetEnabledNonBlockingTypes() const354 ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
355 ModelTypeSet enabled_off_thread_types;
356 for (ScopedVector<ModelTypeSyncWorkerImpl>::const_iterator it =
357 model_type_sync_workers_.begin();
358 it != model_type_sync_workers_.end();
359 ++it) {
360 enabled_off_thread_types.Put((*it)->GetModelType());
361 }
362 return enabled_off_thread_types;
363 }
364
365 } // namespace syncer
366