• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "sync/sessions/model_type_registry.h"
6 
7 #include "base/bind.h"
8 #include "base/message_loop/message_loop_proxy.h"
9 #include "base/observer_list.h"
10 #include "sync/engine/directory_commit_contributor.h"
11 #include "sync/engine/directory_update_handler.h"
12 #include "sync/engine/non_blocking_sync_common.h"
13 #include "sync/engine/non_blocking_type_processor.h"
14 #include "sync/engine/non_blocking_type_processor_core.h"
15 #include "sync/engine/non_blocking_type_processor_core_interface.h"
16 #include "sync/engine/non_blocking_type_processor_interface.h"
17 #include "sync/sessions/directory_type_debug_info_emitter.h"
18 
19 namespace syncer {
20 
21 namespace {
22 
23 class NonBlockingTypeProcessorWrapper
24     : public NonBlockingTypeProcessorInterface {
25  public:
26   NonBlockingTypeProcessorWrapper(
27       base::WeakPtr<NonBlockingTypeProcessor> processor,
28       scoped_refptr<base::SequencedTaskRunner> processor_task_runner);
29   virtual ~NonBlockingTypeProcessorWrapper();
30 
31   virtual void ReceiveCommitResponse(
32       const DataTypeState& type_state,
33       const CommitResponseDataList& response_list) OVERRIDE;
34   virtual void ReceiveUpdateResponse(
35       const DataTypeState& type_state,
36       const UpdateResponseDataList& response_list) OVERRIDE;
37 
38  private:
39   base::WeakPtr<NonBlockingTypeProcessor> processor_;
40   scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
41 };
42 
NonBlockingTypeProcessorWrapper(base::WeakPtr<NonBlockingTypeProcessor> processor,scoped_refptr<base::SequencedTaskRunner> processor_task_runner)43 NonBlockingTypeProcessorWrapper::NonBlockingTypeProcessorWrapper(
44     base::WeakPtr<NonBlockingTypeProcessor> processor,
45     scoped_refptr<base::SequencedTaskRunner> processor_task_runner)
46     : processor_(processor), processor_task_runner_(processor_task_runner) {
47 }
48 
~NonBlockingTypeProcessorWrapper()49 NonBlockingTypeProcessorWrapper::~NonBlockingTypeProcessorWrapper() {
50 }
51 
ReceiveCommitResponse(const DataTypeState & type_state,const CommitResponseDataList & response_list)52 void NonBlockingTypeProcessorWrapper::ReceiveCommitResponse(
53     const DataTypeState& type_state,
54     const CommitResponseDataList& response_list) {
55   processor_task_runner_->PostTask(
56       FROM_HERE,
57       base::Bind(&NonBlockingTypeProcessor::OnCommitCompletion,
58                  processor_,
59                  type_state,
60                  response_list));
61 }
62 
ReceiveUpdateResponse(const DataTypeState & type_state,const UpdateResponseDataList & response_list)63 void NonBlockingTypeProcessorWrapper::ReceiveUpdateResponse(
64     const DataTypeState& type_state,
65     const UpdateResponseDataList& response_list) {
66   processor_task_runner_->PostTask(
67       FROM_HERE,
68       base::Bind(&NonBlockingTypeProcessor::OnUpdateReceived,
69                  processor_,
70                  type_state,
71                  response_list));
72 }
73 
74 class NonBlockingTypeProcessorCoreWrapper
75     : public NonBlockingTypeProcessorCoreInterface {
76  public:
77   NonBlockingTypeProcessorCoreWrapper(
78       base::WeakPtr<NonBlockingTypeProcessorCore> core,
79       scoped_refptr<base::SequencedTaskRunner> sync_thread);
80   virtual ~NonBlockingTypeProcessorCoreWrapper();
81 
82   virtual void RequestCommits(const CommitRequestDataList& list) OVERRIDE;
83 
84  private:
85   base::WeakPtr<NonBlockingTypeProcessorCore> core_;
86   scoped_refptr<base::SequencedTaskRunner> sync_thread_;
87 };
88 
NonBlockingTypeProcessorCoreWrapper(base::WeakPtr<NonBlockingTypeProcessorCore> core,scoped_refptr<base::SequencedTaskRunner> sync_thread)89 NonBlockingTypeProcessorCoreWrapper::NonBlockingTypeProcessorCoreWrapper(
90     base::WeakPtr<NonBlockingTypeProcessorCore> core,
91     scoped_refptr<base::SequencedTaskRunner> sync_thread)
92     : core_(core), sync_thread_(sync_thread) {
93 }
94 
~NonBlockingTypeProcessorCoreWrapper()95 NonBlockingTypeProcessorCoreWrapper::~NonBlockingTypeProcessorCoreWrapper() {
96 }
97 
RequestCommits(const CommitRequestDataList & list)98 void NonBlockingTypeProcessorCoreWrapper::RequestCommits(
99     const CommitRequestDataList& list) {
100   sync_thread_->PostTask(
101       FROM_HERE,
102       base::Bind(&NonBlockingTypeProcessorCore::EnqueueForCommit, core_, list));
103 }
104 
105 }  // namespace
106 
ModelTypeRegistry()107 ModelTypeRegistry::ModelTypeRegistry() : directory_(NULL) {}
108 
ModelTypeRegistry(const std::vector<scoped_refptr<ModelSafeWorker>> & workers,syncable::Directory * directory)109 ModelTypeRegistry::ModelTypeRegistry(
110     const std::vector<scoped_refptr<ModelSafeWorker> >& workers,
111     syncable::Directory* directory)
112     : directory_(directory) {
113   for (size_t i = 0u; i < workers.size(); ++i) {
114     workers_map_.insert(
115         std::make_pair(workers[i]->GetModelSafeGroup(), workers[i]));
116   }
117 }
118 
~ModelTypeRegistry()119 ModelTypeRegistry::~ModelTypeRegistry() {}
120 
SetEnabledDirectoryTypes(const ModelSafeRoutingInfo & routing_info)121 void ModelTypeRegistry::SetEnabledDirectoryTypes(
122     const ModelSafeRoutingInfo& routing_info) {
123   // Remove all existing directory processors and delete them.  The
124   // DebugInfoEmitters are not deleted here, since we want to preserve their
125   // counters.
126   for (ModelTypeSet::Iterator it = enabled_directory_types_.First();
127        it.Good(); it.Inc()) {
128     size_t result1 = update_handler_map_.erase(it.Get());
129     size_t result2 = commit_contributor_map_.erase(it.Get());
130     DCHECK_EQ(1U, result1);
131     DCHECK_EQ(1U, result2);
132   }
133 
134   // Clear the old instances of directory update handlers and commit
135   // contributors, deleting their contents in the processs.
136   directory_update_handlers_.clear();
137   directory_commit_contributors_.clear();
138 
139   // Create new ones and add them to the appropriate containers.
140   for (ModelSafeRoutingInfo::const_iterator routing_iter = routing_info.begin();
141        routing_iter != routing_info.end(); ++routing_iter) {
142     ModelType type = routing_iter->first;
143     ModelSafeGroup group = routing_iter->second;
144     std::map<ModelSafeGroup, scoped_refptr<ModelSafeWorker> >::iterator
145         worker_it = workers_map_.find(group);
146     DCHECK(worker_it != workers_map_.end());
147     scoped_refptr<ModelSafeWorker> worker = worker_it->second;
148 
149     // DebugInfoEmitters are never deleted.  Use existing one if we have it.
150     DirectoryTypeDebugInfoEmitter* emitter = NULL;
151     DirectoryTypeDebugInfoEmitterMap::iterator it =
152         directory_type_debug_info_emitter_map_.find(type);
153     if (it != directory_type_debug_info_emitter_map_.end()) {
154       emitter = it->second;
155     } else {
156       emitter = new DirectoryTypeDebugInfoEmitter(directory_, type,
157                                                   &type_debug_info_observers_);
158       directory_type_debug_info_emitter_map_.insert(
159           std::make_pair(type, emitter));
160       directory_type_debug_info_emitters_.push_back(emitter);
161     }
162 
163     DirectoryCommitContributor* committer =
164         new DirectoryCommitContributor(directory_, type, emitter);
165     DirectoryUpdateHandler* updater =
166         new DirectoryUpdateHandler(directory_, type, worker, emitter);
167 
168     // These containers take ownership of their contents.
169     directory_commit_contributors_.push_back(committer);
170     directory_update_handlers_.push_back(updater);
171 
172     bool inserted1 =
173         update_handler_map_.insert(std::make_pair(type, updater)).second;
174     DCHECK(inserted1) << "Attempt to override existing type handler in map";
175 
176     bool inserted2 =
177         commit_contributor_map_.insert(std::make_pair(type, committer)).second;
178     DCHECK(inserted2) << "Attempt to override existing type handler in map";
179   }
180 
181   enabled_directory_types_ = GetRoutingInfoTypes(routing_info);
182   DCHECK(Intersection(GetEnabledDirectoryTypes(),
183                       GetEnabledNonBlockingTypes()).Empty());
184 }
185 
InitializeNonBlockingType(ModelType type,const DataTypeState & data_type_state,scoped_refptr<base::SequencedTaskRunner> type_task_runner,base::WeakPtr<NonBlockingTypeProcessor> processor)186 void ModelTypeRegistry::InitializeNonBlockingType(
187     ModelType type,
188     const DataTypeState& data_type_state,
189     scoped_refptr<base::SequencedTaskRunner> type_task_runner,
190     base::WeakPtr<NonBlockingTypeProcessor> processor) {
191   DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
192 
193   // Initialize CoreProcessor -> Processor communication channel.
194   scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface(
195       new NonBlockingTypeProcessorWrapper(processor, type_task_runner));
196   scoped_ptr<NonBlockingTypeProcessorCore> core(
197       new NonBlockingTypeProcessorCore(
198           type, data_type_state, processor_interface.Pass()));
199 
200   // Initialize Processor -> CoreProcessor communication channel.
201   scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface(
202       new NonBlockingTypeProcessorCoreWrapper(
203           core->AsWeakPtr(),
204           scoped_refptr<base::SequencedTaskRunner>(
205               base::MessageLoopProxy::current())));
206   type_task_runner->PostTask(FROM_HERE,
207                              base::Bind(&NonBlockingTypeProcessor::OnConnect,
208                                         processor,
209                                         base::Passed(&core_interface)));
210 
211   DCHECK(update_handler_map_.find(type) == update_handler_map_.end());
212   DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end());
213 
214   update_handler_map_.insert(std::make_pair(type, core.get()));
215   commit_contributor_map_.insert(std::make_pair(type, core.get()));
216 
217   // The container takes ownership.
218   non_blocking_type_processor_cores_.push_back(core.release());
219 
220   DCHECK(Intersection(GetEnabledDirectoryTypes(),
221                       GetEnabledNonBlockingTypes()).Empty());
222 }
223 
RemoveNonBlockingType(ModelType type)224 void ModelTypeRegistry::RemoveNonBlockingType(ModelType type) {
225   DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type);
226   DCHECK(update_handler_map_.find(type) != update_handler_map_.end());
227   DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end());
228 
229   size_t updaters_erased = update_handler_map_.erase(type);
230   size_t committers_erased = commit_contributor_map_.erase(type);
231 
232   DCHECK_EQ(1U, updaters_erased);
233   DCHECK_EQ(1U, committers_erased);
234 
235   // Remove from the ScopedVector, deleting the core in the process.
236   for (ScopedVector<NonBlockingTypeProcessorCore>::iterator it =
237        non_blocking_type_processor_cores_.begin();
238        it != non_blocking_type_processor_cores_.end(); ++it) {
239     if ((*it)->GetModelType() == type) {
240       non_blocking_type_processor_cores_.erase(it);
241       break;
242     }
243   }
244 }
245 
GetEnabledTypes() const246 ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const {
247   return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes());
248 }
249 
update_handler_map()250 UpdateHandlerMap* ModelTypeRegistry::update_handler_map() {
251   return &update_handler_map_;
252 }
253 
commit_contributor_map()254 CommitContributorMap* ModelTypeRegistry::commit_contributor_map() {
255   return &commit_contributor_map_;
256 }
257 
258 DirectoryTypeDebugInfoEmitterMap*
directory_type_debug_info_emitter_map()259 ModelTypeRegistry::directory_type_debug_info_emitter_map() {
260   return &directory_type_debug_info_emitter_map_;
261 }
262 
RegisterDirectoryTypeDebugInfoObserver(syncer::TypeDebugInfoObserver * observer)263 void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver(
264     syncer::TypeDebugInfoObserver* observer) {
265   if (!type_debug_info_observers_.HasObserver(observer))
266     type_debug_info_observers_.AddObserver(observer);
267 }
268 
UnregisterDirectoryTypeDebugInfoObserver(syncer::TypeDebugInfoObserver * observer)269 void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver(
270     syncer::TypeDebugInfoObserver* observer) {
271   type_debug_info_observers_.RemoveObserver(observer);
272 }
273 
HasDirectoryTypeDebugInfoObserver(syncer::TypeDebugInfoObserver * observer)274 bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver(
275     syncer::TypeDebugInfoObserver* observer) {
276   return type_debug_info_observers_.HasObserver(observer);
277 }
278 
RequestEmitDebugInfo()279 void ModelTypeRegistry::RequestEmitDebugInfo() {
280   for (DirectoryTypeDebugInfoEmitterMap::iterator it =
281        directory_type_debug_info_emitter_map_.begin();
282        it != directory_type_debug_info_emitter_map_.end(); ++it) {
283     it->second->EmitCommitCountersUpdate();
284     it->second->EmitUpdateCountersUpdate();
285     it->second->EmitStatusCountersUpdate();
286   }
287 }
288 
GetEnabledDirectoryTypes() const289 ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const {
290   return enabled_directory_types_;
291 }
292 
GetEnabledNonBlockingTypes() const293 ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
294   ModelTypeSet enabled_off_thread_types;
295   for (ScopedVector<NonBlockingTypeProcessorCore>::const_iterator it =
296            non_blocking_type_processor_cores_.begin();
297        it != non_blocking_type_processor_cores_.end(); ++it) {
298     enabled_off_thread_types.Put((*it)->GetModelType());
299   }
300   return enabled_off_thread_types;
301 }
302 
303 }  // namespace syncer
304