1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "sync/sessions/model_type_registry.h"
6
7 #include "base/bind.h"
8 #include "base/message_loop/message_loop_proxy.h"
9 #include "base/observer_list.h"
10 #include "sync/engine/directory_commit_contributor.h"
11 #include "sync/engine/directory_update_handler.h"
12 #include "sync/engine/non_blocking_sync_common.h"
13 #include "sync/engine/non_blocking_type_processor.h"
14 #include "sync/engine/non_blocking_type_processor_core.h"
15 #include "sync/engine/non_blocking_type_processor_core_interface.h"
16 #include "sync/engine/non_blocking_type_processor_interface.h"
17 #include "sync/sessions/directory_type_debug_info_emitter.h"
18
19 namespace syncer {
20
21 namespace {
22
23 class NonBlockingTypeProcessorWrapper
24 : public NonBlockingTypeProcessorInterface {
25 public:
26 NonBlockingTypeProcessorWrapper(
27 base::WeakPtr<NonBlockingTypeProcessor> processor,
28 scoped_refptr<base::SequencedTaskRunner> processor_task_runner);
29 virtual ~NonBlockingTypeProcessorWrapper();
30
31 virtual void ReceiveCommitResponse(
32 const DataTypeState& type_state,
33 const CommitResponseDataList& response_list) OVERRIDE;
34 virtual void ReceiveUpdateResponse(
35 const DataTypeState& type_state,
36 const UpdateResponseDataList& response_list) OVERRIDE;
37
38 private:
39 base::WeakPtr<NonBlockingTypeProcessor> processor_;
40 scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
41 };
42
NonBlockingTypeProcessorWrapper(base::WeakPtr<NonBlockingTypeProcessor> processor,scoped_refptr<base::SequencedTaskRunner> processor_task_runner)43 NonBlockingTypeProcessorWrapper::NonBlockingTypeProcessorWrapper(
44 base::WeakPtr<NonBlockingTypeProcessor> processor,
45 scoped_refptr<base::SequencedTaskRunner> processor_task_runner)
46 : processor_(processor), processor_task_runner_(processor_task_runner) {
47 }
48
~NonBlockingTypeProcessorWrapper()49 NonBlockingTypeProcessorWrapper::~NonBlockingTypeProcessorWrapper() {
50 }
51
ReceiveCommitResponse(const DataTypeState & type_state,const CommitResponseDataList & response_list)52 void NonBlockingTypeProcessorWrapper::ReceiveCommitResponse(
53 const DataTypeState& type_state,
54 const CommitResponseDataList& response_list) {
55 processor_task_runner_->PostTask(
56 FROM_HERE,
57 base::Bind(&NonBlockingTypeProcessor::OnCommitCompletion,
58 processor_,
59 type_state,
60 response_list));
61 }
62
ReceiveUpdateResponse(const DataTypeState & type_state,const UpdateResponseDataList & response_list)63 void NonBlockingTypeProcessorWrapper::ReceiveUpdateResponse(
64 const DataTypeState& type_state,
65 const UpdateResponseDataList& response_list) {
66 processor_task_runner_->PostTask(
67 FROM_HERE,
68 base::Bind(&NonBlockingTypeProcessor::OnUpdateReceived,
69 processor_,
70 type_state,
71 response_list));
72 }
73
74 class NonBlockingTypeProcessorCoreWrapper
75 : public NonBlockingTypeProcessorCoreInterface {
76 public:
77 NonBlockingTypeProcessorCoreWrapper(
78 base::WeakPtr<NonBlockingTypeProcessorCore> core,
79 scoped_refptr<base::SequencedTaskRunner> sync_thread);
80 virtual ~NonBlockingTypeProcessorCoreWrapper();
81
82 virtual void RequestCommits(const CommitRequestDataList& list) OVERRIDE;
83
84 private:
85 base::WeakPtr<NonBlockingTypeProcessorCore> core_;
86 scoped_refptr<base::SequencedTaskRunner> sync_thread_;
87 };
88
NonBlockingTypeProcessorCoreWrapper(base::WeakPtr<NonBlockingTypeProcessorCore> core,scoped_refptr<base::SequencedTaskRunner> sync_thread)89 NonBlockingTypeProcessorCoreWrapper::NonBlockingTypeProcessorCoreWrapper(
90 base::WeakPtr<NonBlockingTypeProcessorCore> core,
91 scoped_refptr<base::SequencedTaskRunner> sync_thread)
92 : core_(core), sync_thread_(sync_thread) {
93 }
94
~NonBlockingTypeProcessorCoreWrapper()95 NonBlockingTypeProcessorCoreWrapper::~NonBlockingTypeProcessorCoreWrapper() {
96 }
97
RequestCommits(const CommitRequestDataList & list)98 void NonBlockingTypeProcessorCoreWrapper::RequestCommits(
99 const CommitRequestDataList& list) {
100 sync_thread_->PostTask(
101 FROM_HERE,
102 base::Bind(&NonBlockingTypeProcessorCore::EnqueueForCommit, core_, list));
103 }
104
105 } // namespace
106
ModelTypeRegistry()107 ModelTypeRegistry::ModelTypeRegistry() : directory_(NULL) {}
108
ModelTypeRegistry(const std::vector<scoped_refptr<ModelSafeWorker>> & workers,syncable::Directory * directory)109 ModelTypeRegistry::ModelTypeRegistry(
110 const std::vector<scoped_refptr<ModelSafeWorker> >& workers,
111 syncable::Directory* directory)
112 : directory_(directory) {
113 for (size_t i = 0u; i < workers.size(); ++i) {
114 workers_map_.insert(
115 std::make_pair(workers[i]->GetModelSafeGroup(), workers[i]));
116 }
117 }
118
~ModelTypeRegistry()119 ModelTypeRegistry::~ModelTypeRegistry() {}
120
SetEnabledDirectoryTypes(const ModelSafeRoutingInfo & routing_info)121 void ModelTypeRegistry::SetEnabledDirectoryTypes(
122 const ModelSafeRoutingInfo& routing_info) {
123 // Remove all existing directory processors and delete them. The
124 // DebugInfoEmitters are not deleted here, since we want to preserve their
125 // counters.
126 for (ModelTypeSet::Iterator it = enabled_directory_types_.First();
127 it.Good(); it.Inc()) {
128 size_t result1 = update_handler_map_.erase(it.Get());
129 size_t result2 = commit_contributor_map_.erase(it.Get());
130 DCHECK_EQ(1U, result1);
131 DCHECK_EQ(1U, result2);
132 }
133
134 // Clear the old instances of directory update handlers and commit
135 // contributors, deleting their contents in the processs.
136 directory_update_handlers_.clear();
137 directory_commit_contributors_.clear();
138
139 // Create new ones and add them to the appropriate containers.
140 for (ModelSafeRoutingInfo::const_iterator routing_iter = routing_info.begin();
141 routing_iter != routing_info.end(); ++routing_iter) {
142 ModelType type = routing_iter->first;
143 ModelSafeGroup group = routing_iter->second;
144 std::map<ModelSafeGroup, scoped_refptr<ModelSafeWorker> >::iterator
145 worker_it = workers_map_.find(group);
146 DCHECK(worker_it != workers_map_.end());
147 scoped_refptr<ModelSafeWorker> worker = worker_it->second;
148
149 // DebugInfoEmitters are never deleted. Use existing one if we have it.
150 DirectoryTypeDebugInfoEmitter* emitter = NULL;
151 DirectoryTypeDebugInfoEmitterMap::iterator it =
152 directory_type_debug_info_emitter_map_.find(type);
153 if (it != directory_type_debug_info_emitter_map_.end()) {
154 emitter = it->second;
155 } else {
156 emitter = new DirectoryTypeDebugInfoEmitter(directory_, type,
157 &type_debug_info_observers_);
158 directory_type_debug_info_emitter_map_.insert(
159 std::make_pair(type, emitter));
160 directory_type_debug_info_emitters_.push_back(emitter);
161 }
162
163 DirectoryCommitContributor* committer =
164 new DirectoryCommitContributor(directory_, type, emitter);
165 DirectoryUpdateHandler* updater =
166 new DirectoryUpdateHandler(directory_, type, worker, emitter);
167
168 // These containers take ownership of their contents.
169 directory_commit_contributors_.push_back(committer);
170 directory_update_handlers_.push_back(updater);
171
172 bool inserted1 =
173 update_handler_map_.insert(std::make_pair(type, updater)).second;
174 DCHECK(inserted1) << "Attempt to override existing type handler in map";
175
176 bool inserted2 =
177 commit_contributor_map_.insert(std::make_pair(type, committer)).second;
178 DCHECK(inserted2) << "Attempt to override existing type handler in map";
179 }
180
181 enabled_directory_types_ = GetRoutingInfoTypes(routing_info);
182 DCHECK(Intersection(GetEnabledDirectoryTypes(),
183 GetEnabledNonBlockingTypes()).Empty());
184 }
185
InitializeNonBlockingType(ModelType type,const DataTypeState & data_type_state,scoped_refptr<base::SequencedTaskRunner> type_task_runner,base::WeakPtr<NonBlockingTypeProcessor> processor)186 void ModelTypeRegistry::InitializeNonBlockingType(
187 ModelType type,
188 const DataTypeState& data_type_state,
189 scoped_refptr<base::SequencedTaskRunner> type_task_runner,
190 base::WeakPtr<NonBlockingTypeProcessor> processor) {
191 DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
192
193 // Initialize CoreProcessor -> Processor communication channel.
194 scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface(
195 new NonBlockingTypeProcessorWrapper(processor, type_task_runner));
196 scoped_ptr<NonBlockingTypeProcessorCore> core(
197 new NonBlockingTypeProcessorCore(
198 type, data_type_state, processor_interface.Pass()));
199
200 // Initialize Processor -> CoreProcessor communication channel.
201 scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface(
202 new NonBlockingTypeProcessorCoreWrapper(
203 core->AsWeakPtr(),
204 scoped_refptr<base::SequencedTaskRunner>(
205 base::MessageLoopProxy::current())));
206 type_task_runner->PostTask(FROM_HERE,
207 base::Bind(&NonBlockingTypeProcessor::OnConnect,
208 processor,
209 base::Passed(&core_interface)));
210
211 DCHECK(update_handler_map_.find(type) == update_handler_map_.end());
212 DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end());
213
214 update_handler_map_.insert(std::make_pair(type, core.get()));
215 commit_contributor_map_.insert(std::make_pair(type, core.get()));
216
217 // The container takes ownership.
218 non_blocking_type_processor_cores_.push_back(core.release());
219
220 DCHECK(Intersection(GetEnabledDirectoryTypes(),
221 GetEnabledNonBlockingTypes()).Empty());
222 }
223
RemoveNonBlockingType(ModelType type)224 void ModelTypeRegistry::RemoveNonBlockingType(ModelType type) {
225 DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type);
226 DCHECK(update_handler_map_.find(type) != update_handler_map_.end());
227 DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end());
228
229 size_t updaters_erased = update_handler_map_.erase(type);
230 size_t committers_erased = commit_contributor_map_.erase(type);
231
232 DCHECK_EQ(1U, updaters_erased);
233 DCHECK_EQ(1U, committers_erased);
234
235 // Remove from the ScopedVector, deleting the core in the process.
236 for (ScopedVector<NonBlockingTypeProcessorCore>::iterator it =
237 non_blocking_type_processor_cores_.begin();
238 it != non_blocking_type_processor_cores_.end(); ++it) {
239 if ((*it)->GetModelType() == type) {
240 non_blocking_type_processor_cores_.erase(it);
241 break;
242 }
243 }
244 }
245
GetEnabledTypes() const246 ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const {
247 return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes());
248 }
249
update_handler_map()250 UpdateHandlerMap* ModelTypeRegistry::update_handler_map() {
251 return &update_handler_map_;
252 }
253
commit_contributor_map()254 CommitContributorMap* ModelTypeRegistry::commit_contributor_map() {
255 return &commit_contributor_map_;
256 }
257
258 DirectoryTypeDebugInfoEmitterMap*
directory_type_debug_info_emitter_map()259 ModelTypeRegistry::directory_type_debug_info_emitter_map() {
260 return &directory_type_debug_info_emitter_map_;
261 }
262
RegisterDirectoryTypeDebugInfoObserver(syncer::TypeDebugInfoObserver * observer)263 void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver(
264 syncer::TypeDebugInfoObserver* observer) {
265 if (!type_debug_info_observers_.HasObserver(observer))
266 type_debug_info_observers_.AddObserver(observer);
267 }
268
UnregisterDirectoryTypeDebugInfoObserver(syncer::TypeDebugInfoObserver * observer)269 void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver(
270 syncer::TypeDebugInfoObserver* observer) {
271 type_debug_info_observers_.RemoveObserver(observer);
272 }
273
HasDirectoryTypeDebugInfoObserver(syncer::TypeDebugInfoObserver * observer)274 bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver(
275 syncer::TypeDebugInfoObserver* observer) {
276 return type_debug_info_observers_.HasObserver(observer);
277 }
278
RequestEmitDebugInfo()279 void ModelTypeRegistry::RequestEmitDebugInfo() {
280 for (DirectoryTypeDebugInfoEmitterMap::iterator it =
281 directory_type_debug_info_emitter_map_.begin();
282 it != directory_type_debug_info_emitter_map_.end(); ++it) {
283 it->second->EmitCommitCountersUpdate();
284 it->second->EmitUpdateCountersUpdate();
285 it->second->EmitStatusCountersUpdate();
286 }
287 }
288
GetEnabledDirectoryTypes() const289 ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const {
290 return enabled_directory_types_;
291 }
292
GetEnabledNonBlockingTypes() const293 ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
294 ModelTypeSet enabled_off_thread_types;
295 for (ScopedVector<NonBlockingTypeProcessorCore>::const_iterator it =
296 non_blocking_type_processor_cores_.begin();
297 it != non_blocking_type_processor_cores_.end(); ++it) {
298 enabled_off_thread_types.Put((*it)->GetModelType());
299 }
300 return enabled_off_thread_types;
301 }
302
303 } // namespace syncer
304