// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4 
5 #include "sync/sessions/model_type_registry.h"
6 
7 #include "base/bind.h"
8 #include "base/observer_list.h"
9 #include "base/thread_task_runner_handle.h"
10 #include "sync/engine/directory_commit_contributor.h"
11 #include "sync/engine/directory_update_handler.h"
12 #include "sync/engine/model_type_sync_proxy.h"
13 #include "sync/engine/model_type_sync_proxy_impl.h"
14 #include "sync/engine/model_type_sync_worker.h"
15 #include "sync/engine/model_type_sync_worker_impl.h"
16 #include "sync/internal_api/public/non_blocking_sync_common.h"
17 #include "sync/sessions/directory_type_debug_info_emitter.h"
18 #include "sync/util/cryptographer.h"
19 
20 namespace syncer {
21 
22 namespace {
23 
24 class ModelTypeSyncProxyWrapper : public ModelTypeSyncProxy {
25  public:
26   ModelTypeSyncProxyWrapper(
27       const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
28       const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner);
29   virtual ~ModelTypeSyncProxyWrapper();
30 
31   virtual void OnCommitCompleted(
32       const DataTypeState& type_state,
33       const CommitResponseDataList& response_list) OVERRIDE;
34   virtual void OnUpdateReceived(
35       const DataTypeState& type_state,
36       const UpdateResponseDataList& response_list,
37       const UpdateResponseDataList& pending_updates) OVERRIDE;
38 
39  private:
40   base::WeakPtr<ModelTypeSyncProxyImpl> processor_;
41   scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
42 };
43 
ModelTypeSyncProxyWrapper(const base::WeakPtr<ModelTypeSyncProxyImpl> & proxy,const scoped_refptr<base::SequencedTaskRunner> & processor_task_runner)44 ModelTypeSyncProxyWrapper::ModelTypeSyncProxyWrapper(
45     const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
46     const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner)
47     : processor_(proxy), processor_task_runner_(processor_task_runner) {
48 }
49 
~ModelTypeSyncProxyWrapper()50 ModelTypeSyncProxyWrapper::~ModelTypeSyncProxyWrapper() {
51 }
52 
OnCommitCompleted(const DataTypeState & type_state,const CommitResponseDataList & response_list)53 void ModelTypeSyncProxyWrapper::OnCommitCompleted(
54     const DataTypeState& type_state,
55     const CommitResponseDataList& response_list) {
56   processor_task_runner_->PostTask(
57       FROM_HERE,
58       base::Bind(&ModelTypeSyncProxyImpl::OnCommitCompleted,
59                  processor_,
60                  type_state,
61                  response_list));
62 }
63 
OnUpdateReceived(const DataTypeState & type_state,const UpdateResponseDataList & response_list,const UpdateResponseDataList & pending_updates)64 void ModelTypeSyncProxyWrapper::OnUpdateReceived(
65     const DataTypeState& type_state,
66     const UpdateResponseDataList& response_list,
67     const UpdateResponseDataList& pending_updates) {
68   processor_task_runner_->PostTask(
69       FROM_HERE,
70       base::Bind(&ModelTypeSyncProxyImpl::OnUpdateReceived,
71                  processor_,
72                  type_state,
73                  response_list,
74                  pending_updates));
75 }
76 
77 class ModelTypeSyncWorkerWrapper : public ModelTypeSyncWorker {
78  public:
79   ModelTypeSyncWorkerWrapper(
80       const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
81       const scoped_refptr<base::SequencedTaskRunner>& sync_thread);
82   virtual ~ModelTypeSyncWorkerWrapper();
83 
84   virtual void EnqueueForCommit(const CommitRequestDataList& list) OVERRIDE;
85 
86  private:
87   base::WeakPtr<ModelTypeSyncWorkerImpl> worker_;
88   scoped_refptr<base::SequencedTaskRunner> sync_thread_;
89 };
90 
ModelTypeSyncWorkerWrapper(const base::WeakPtr<ModelTypeSyncWorkerImpl> & worker,const scoped_refptr<base::SequencedTaskRunner> & sync_thread)91 ModelTypeSyncWorkerWrapper::ModelTypeSyncWorkerWrapper(
92     const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
93     const scoped_refptr<base::SequencedTaskRunner>& sync_thread)
94     : worker_(worker), sync_thread_(sync_thread) {
95 }
96 
~ModelTypeSyncWorkerWrapper()97 ModelTypeSyncWorkerWrapper::~ModelTypeSyncWorkerWrapper() {
98 }
99 
EnqueueForCommit(const CommitRequestDataList & list)100 void ModelTypeSyncWorkerWrapper::EnqueueForCommit(
101     const CommitRequestDataList& list) {
102   sync_thread_->PostTask(
103       FROM_HERE,
104       base::Bind(&ModelTypeSyncWorkerImpl::EnqueueForCommit, worker_, list));
105 }
106 
107 }  // namespace
108 
ModelTypeRegistry(const std::vector<scoped_refptr<ModelSafeWorker>> & workers,syncable::Directory * directory,NudgeHandler * nudge_handler)109 ModelTypeRegistry::ModelTypeRegistry(
110     const std::vector<scoped_refptr<ModelSafeWorker> >& workers,
111     syncable::Directory* directory,
112     NudgeHandler* nudge_handler)
113     : directory_(directory),
114       nudge_handler_(nudge_handler),
115       weak_ptr_factory_(this) {
116   for (size_t i = 0u; i < workers.size(); ++i) {
117     workers_map_.insert(
118         std::make_pair(workers[i]->GetModelSafeGroup(), workers[i]));
119   }
120 }
121 
~ModelTypeRegistry()122 ModelTypeRegistry::~ModelTypeRegistry() {
123 }
124 
SetEnabledDirectoryTypes(const ModelSafeRoutingInfo & routing_info)125 void ModelTypeRegistry::SetEnabledDirectoryTypes(
126     const ModelSafeRoutingInfo& routing_info) {
127   // Remove all existing directory processors and delete them.  The
128   // DebugInfoEmitters are not deleted here, since we want to preserve their
129   // counters.
130   for (ModelTypeSet::Iterator it = enabled_directory_types_.First();
131        it.Good(); it.Inc()) {
132     size_t result1 = update_handler_map_.erase(it.Get());
133     size_t result2 = commit_contributor_map_.erase(it.Get());
134     DCHECK_EQ(1U, result1);
135     DCHECK_EQ(1U, result2);
136   }
137 
138   // Clear the old instances of directory update handlers and commit
139   // contributors, deleting their contents in the processs.
140   directory_update_handlers_.clear();
141   directory_commit_contributors_.clear();
142 
143   // Create new ones and add them to the appropriate containers.
144   for (ModelSafeRoutingInfo::const_iterator routing_iter = routing_info.begin();
145        routing_iter != routing_info.end(); ++routing_iter) {
146     ModelType type = routing_iter->first;
147     ModelSafeGroup group = routing_iter->second;
148     std::map<ModelSafeGroup, scoped_refptr<ModelSafeWorker> >::iterator
149         worker_it = workers_map_.find(group);
150     DCHECK(worker_it != workers_map_.end());
151     scoped_refptr<ModelSafeWorker> worker = worker_it->second;
152 
153     // DebugInfoEmitters are never deleted.  Use existing one if we have it.
154     DirectoryTypeDebugInfoEmitter* emitter = NULL;
155     DirectoryTypeDebugInfoEmitterMap::iterator it =
156         directory_type_debug_info_emitter_map_.find(type);
157     if (it != directory_type_debug_info_emitter_map_.end()) {
158       emitter = it->second;
159     } else {
160       emitter = new DirectoryTypeDebugInfoEmitter(directory_, type,
161                                                   &type_debug_info_observers_);
162       directory_type_debug_info_emitter_map_.insert(
163           std::make_pair(type, emitter));
164       directory_type_debug_info_emitters_.push_back(emitter);
165     }
166 
167     DirectoryCommitContributor* committer =
168         new DirectoryCommitContributor(directory_, type, emitter);
169     DirectoryUpdateHandler* updater =
170         new DirectoryUpdateHandler(directory_, type, worker, emitter);
171 
172     // These containers take ownership of their contents.
173     directory_commit_contributors_.push_back(committer);
174     directory_update_handlers_.push_back(updater);
175 
176     bool inserted1 =
177         update_handler_map_.insert(std::make_pair(type, updater)).second;
178     DCHECK(inserted1) << "Attempt to override existing type handler in map";
179 
180     bool inserted2 =
181         commit_contributor_map_.insert(std::make_pair(type, committer)).second;
182     DCHECK(inserted2) << "Attempt to override existing type handler in map";
183   }
184 
185   enabled_directory_types_ = GetRoutingInfoTypes(routing_info);
186   DCHECK(Intersection(GetEnabledDirectoryTypes(),
187                       GetEnabledNonBlockingTypes()).Empty());
188 }
189 
ConnectSyncTypeToWorker(ModelType type,const DataTypeState & data_type_state,const UpdateResponseDataList & saved_pending_updates,const scoped_refptr<base::SequencedTaskRunner> & type_task_runner,const base::WeakPtr<ModelTypeSyncProxyImpl> & proxy_impl)190 void ModelTypeRegistry::ConnectSyncTypeToWorker(
191     ModelType type,
192     const DataTypeState& data_type_state,
193     const UpdateResponseDataList& saved_pending_updates,
194     const scoped_refptr<base::SequencedTaskRunner>& type_task_runner,
195     const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy_impl) {
196   DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
197 
198   // Initialize Worker -> Proxy communication channel.
199   scoped_ptr<ModelTypeSyncProxy> proxy(
200       new ModelTypeSyncProxyWrapper(proxy_impl, type_task_runner));
201   scoped_ptr<Cryptographer> cryptographer_copy;
202   if (encrypted_types_.Has(type))
203     cryptographer_copy.reset(new Cryptographer(*cryptographer_));
204 
205   scoped_ptr<ModelTypeSyncWorkerImpl> worker(
206       new ModelTypeSyncWorkerImpl(type,
207                                   data_type_state,
208                                   saved_pending_updates,
209                                   cryptographer_copy.Pass(),
210                                   nudge_handler_,
211                                   proxy.Pass()));
212 
213   // Initialize Proxy -> Worker communication channel.
214   scoped_ptr<ModelTypeSyncWorker> wrapped_worker(
215       new ModelTypeSyncWorkerWrapper(worker->AsWeakPtr(),
216                                      scoped_refptr<base::SequencedTaskRunner>(
217                                          base::ThreadTaskRunnerHandle::Get())));
218   type_task_runner->PostTask(FROM_HERE,
219                              base::Bind(&ModelTypeSyncProxyImpl::OnConnect,
220                                         proxy_impl,
221                                         base::Passed(&wrapped_worker)));
222 
223   DCHECK(update_handler_map_.find(type) == update_handler_map_.end());
224   DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end());
225 
226   update_handler_map_.insert(std::make_pair(type, worker.get()));
227   commit_contributor_map_.insert(std::make_pair(type, worker.get()));
228 
229   // The container takes ownership.
230   model_type_sync_workers_.push_back(worker.release());
231 
232   DCHECK(Intersection(GetEnabledDirectoryTypes(),
233                       GetEnabledNonBlockingTypes()).Empty());
234 }
235 
DisconnectSyncWorker(ModelType type)236 void ModelTypeRegistry::DisconnectSyncWorker(ModelType type) {
237   DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type);
238   DCHECK(update_handler_map_.find(type) != update_handler_map_.end());
239   DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end());
240 
241   size_t updaters_erased = update_handler_map_.erase(type);
242   size_t committers_erased = commit_contributor_map_.erase(type);
243 
244   DCHECK_EQ(1U, updaters_erased);
245   DCHECK_EQ(1U, committers_erased);
246 
247   // Remove from the ScopedVector, deleting the worker in the process.
248   for (ScopedVector<ModelTypeSyncWorkerImpl>::iterator it =
249            model_type_sync_workers_.begin();
250        it != model_type_sync_workers_.end();
251        ++it) {
252     if ((*it)->GetModelType() == type) {
253       model_type_sync_workers_.erase(it);
254       break;
255     }
256   }
257 }
258 
GetEnabledTypes() const259 ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const {
260   return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes());
261 }
262 
update_handler_map()263 UpdateHandlerMap* ModelTypeRegistry::update_handler_map() {
264   return &update_handler_map_;
265 }
266 
commit_contributor_map()267 CommitContributorMap* ModelTypeRegistry::commit_contributor_map() {
268   return &commit_contributor_map_;
269 }
270 
271 DirectoryTypeDebugInfoEmitterMap*
directory_type_debug_info_emitter_map()272 ModelTypeRegistry::directory_type_debug_info_emitter_map() {
273   return &directory_type_debug_info_emitter_map_;
274 }
275 
RegisterDirectoryTypeDebugInfoObserver(syncer::TypeDebugInfoObserver * observer)276 void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver(
277     syncer::TypeDebugInfoObserver* observer) {
278   if (!type_debug_info_observers_.HasObserver(observer))
279     type_debug_info_observers_.AddObserver(observer);
280 }
281 
UnregisterDirectoryTypeDebugInfoObserver(syncer::TypeDebugInfoObserver * observer)282 void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver(
283     syncer::TypeDebugInfoObserver* observer) {
284   type_debug_info_observers_.RemoveObserver(observer);
285 }
286 
HasDirectoryTypeDebugInfoObserver(syncer::TypeDebugInfoObserver * observer)287 bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver(
288     syncer::TypeDebugInfoObserver* observer) {
289   return type_debug_info_observers_.HasObserver(observer);
290 }
291 
RequestEmitDebugInfo()292 void ModelTypeRegistry::RequestEmitDebugInfo() {
293   for (DirectoryTypeDebugInfoEmitterMap::iterator it =
294        directory_type_debug_info_emitter_map_.begin();
295        it != directory_type_debug_info_emitter_map_.end(); ++it) {
296     it->second->EmitCommitCountersUpdate();
297     it->second->EmitUpdateCountersUpdate();
298     it->second->EmitStatusCountersUpdate();
299   }
300 }
301 
AsWeakPtr()302 base::WeakPtr<SyncContext> ModelTypeRegistry::AsWeakPtr() {
303   return weak_ptr_factory_.GetWeakPtr();
304 }
305 
OnPassphraseRequired(PassphraseRequiredReason reason,const sync_pb::EncryptedData & pending_keys)306 void ModelTypeRegistry::OnPassphraseRequired(
307     PassphraseRequiredReason reason,
308     const sync_pb::EncryptedData& pending_keys) {
309 }
310 
OnPassphraseAccepted()311 void ModelTypeRegistry::OnPassphraseAccepted() {
312 }
313 
OnBootstrapTokenUpdated(const std::string & bootstrap_token,BootstrapTokenType type)314 void ModelTypeRegistry::OnBootstrapTokenUpdated(
315     const std::string& bootstrap_token,
316     BootstrapTokenType type) {
317 }
318 
OnEncryptedTypesChanged(ModelTypeSet encrypted_types,bool encrypt_everything)319 void ModelTypeRegistry::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
320                                                 bool encrypt_everything) {
321   encrypted_types_ = encrypted_types;
322   OnEncryptionStateChanged();
323 }
324 
OnEncryptionComplete()325 void ModelTypeRegistry::OnEncryptionComplete() {
326 }
327 
OnCryptographerStateChanged(Cryptographer * cryptographer)328 void ModelTypeRegistry::OnCryptographerStateChanged(
329     Cryptographer* cryptographer) {
330   cryptographer_.reset(new Cryptographer(*cryptographer));
331   OnEncryptionStateChanged();
332 }
333 
OnPassphraseTypeChanged(PassphraseType type,base::Time passphrase_time)334 void ModelTypeRegistry::OnPassphraseTypeChanged(PassphraseType type,
335                                                 base::Time passphrase_time) {
336 }
337 
GetEnabledDirectoryTypes() const338 ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const {
339   return enabled_directory_types_;
340 }
341 
OnEncryptionStateChanged()342 void ModelTypeRegistry::OnEncryptionStateChanged() {
343   for (ScopedVector<ModelTypeSyncWorkerImpl>::iterator it =
344            model_type_sync_workers_.begin();
345        it != model_type_sync_workers_.end();
346        ++it) {
347     if (encrypted_types_.Has((*it)->GetModelType())) {
348       (*it)->UpdateCryptographer(
349           make_scoped_ptr(new Cryptographer(*cryptographer_)));
350     }
351   }
352 }
353 
GetEnabledNonBlockingTypes() const354 ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
355   ModelTypeSet enabled_off_thread_types;
356   for (ScopedVector<ModelTypeSyncWorkerImpl>::const_iterator it =
357            model_type_sync_workers_.begin();
358        it != model_type_sync_workers_.end();
359        ++it) {
360     enabled_off_thread_types.Put((*it)->GetModelType());
361   }
362   return enabled_off_thread_types;
363 }
364 
365 }  // namespace syncer
366