1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "sync/engine/non_blocking_type_processor_core.h"
6
7 #include "base/bind.h"
8 #include "base/format_macros.h"
9 #include "base/logging.h"
10 #include "base/strings/stringprintf.h"
11 #include "sync/engine/commit_contribution.h"
12 #include "sync/engine/non_blocking_type_commit_contribution.h"
13 #include "sync/engine/non_blocking_type_processor_interface.h"
14 #include "sync/engine/sync_thread_sync_entity.h"
15 #include "sync/syncable/syncable_util.h"
16 #include "sync/util/time.h"
17
18 namespace syncer {
19
NonBlockingTypeProcessorCore(ModelType type,const DataTypeState & initial_state,scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface)20 NonBlockingTypeProcessorCore::NonBlockingTypeProcessorCore(
21 ModelType type,
22 const DataTypeState& initial_state,
23 scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface)
24 : type_(type),
25 data_type_state_(initial_state),
26 processor_interface_(processor_interface.Pass()),
27 entities_deleter_(&entities_),
28 weak_ptr_factory_(this) {
29 }
30
~NonBlockingTypeProcessorCore()31 NonBlockingTypeProcessorCore::~NonBlockingTypeProcessorCore() {
32 }
33
GetModelType() const34 ModelType NonBlockingTypeProcessorCore::GetModelType() const {
35 DCHECK(CalledOnValidThread());
36 return type_;
37 }
38
39 // UpdateHandler implementation.
GetDownloadProgress(sync_pb::DataTypeProgressMarker * progress_marker) const40 void NonBlockingTypeProcessorCore::GetDownloadProgress(
41 sync_pb::DataTypeProgressMarker* progress_marker) const {
42 DCHECK(CalledOnValidThread());
43 progress_marker->CopyFrom(data_type_state_.progress_marker);
44 }
45
GetDataTypeContext(sync_pb::DataTypeContext * context) const46 void NonBlockingTypeProcessorCore::GetDataTypeContext(
47 sync_pb::DataTypeContext* context) const {
48 DCHECK(CalledOnValidThread());
49 context->CopyFrom(data_type_state_.type_context);
50 }
51
ProcessGetUpdatesResponse(const sync_pb::DataTypeProgressMarker & progress_marker,const sync_pb::DataTypeContext & mutated_context,const SyncEntityList & applicable_updates,sessions::StatusController * status)52 SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse(
53 const sync_pb::DataTypeProgressMarker& progress_marker,
54 const sync_pb::DataTypeContext& mutated_context,
55 const SyncEntityList& applicable_updates,
56 sessions::StatusController* status) {
57 DCHECK(CalledOnValidThread());
58
59 // TODO(rlarocque): Handle data type context conflicts.
60 data_type_state_.type_context = mutated_context;
61 data_type_state_.progress_marker = progress_marker;
62
63 UpdateResponseDataList response_datas;
64
65 for (SyncEntityList::const_iterator update_it = applicable_updates.begin();
66 update_it != applicable_updates.end();
67 ++update_it) {
68 const sync_pb::SyncEntity* update_entity = *update_it;
69 if (!update_entity->server_defined_unique_tag().empty()) {
70 // We can't commit an item unless we know its parent ID. This is where
71 // we learn that ID and remember it forever.
72 DCHECK_EQ(ModelTypeToRootTag(type_),
73 update_entity->server_defined_unique_tag());
74 if (!data_type_state_.type_root_id.empty()) {
75 DCHECK_EQ(data_type_state_.type_root_id, update_entity->id_string());
76 }
77 data_type_state_.type_root_id = update_entity->id_string();
78 } else {
79 // Normal updates are handled here.
80 const std::string& client_tag_hash =
81 update_entity->client_defined_unique_tag();
82 DCHECK(!client_tag_hash.empty());
83 EntityMap::const_iterator map_it = entities_.find(client_tag_hash);
84 if (map_it == entities_.end()) {
85 SyncThreadSyncEntity* entity =
86 SyncThreadSyncEntity::FromServerUpdate(update_entity->id_string(),
87 client_tag_hash,
88 update_entity->version());
89 entities_.insert(std::make_pair(client_tag_hash, entity));
90 } else {
91 SyncThreadSyncEntity* entity = map_it->second;
92 entity->ReceiveUpdate(update_entity->version());
93 }
94
95 // Prepare the message for the model thread.
96 UpdateResponseData response_data;
97 response_data.id = update_entity->id_string();
98 response_data.client_tag_hash = client_tag_hash;
99 response_data.response_version = update_entity->version();
100 response_data.ctime = ProtoTimeToTime(update_entity->ctime());
101 response_data.mtime = ProtoTimeToTime(update_entity->mtime());
102 response_data.non_unique_name = update_entity->name();
103 response_data.deleted = update_entity->deleted();
104 response_data.specifics = update_entity->specifics();
105
106 response_datas.push_back(response_data);
107 }
108 }
109
110 // Forward these updates to the model thread so it can do the rest.
111 processor_interface_->ReceiveUpdateResponse(data_type_state_, response_datas);
112
113 return SYNCER_OK;
114 }
115
ApplyUpdates(sessions::StatusController * status)116 void NonBlockingTypeProcessorCore::ApplyUpdates(
117 sessions::StatusController* status) {
118 DCHECK(CalledOnValidThread());
119 // This function is called only when we've finished a download cycle, ie. we
120 // got a response with changes_remaining == 0. If this is our first download
121 // cycle, we should update our state so the NonBlockingTypeProcessor knows
122 // that it's safe to commit items now.
123 if (!data_type_state_.initial_sync_done) {
124 data_type_state_.initial_sync_done = true;
125
126 UpdateResponseDataList empty_update_list;
127 processor_interface_->ReceiveUpdateResponse(data_type_state_,
128 empty_update_list);
129 }
130 }
131
PassiveApplyUpdates(sessions::StatusController * status)132 void NonBlockingTypeProcessorCore::PassiveApplyUpdates(
133 sessions::StatusController* status) {
134 NOTREACHED()
135 << "Non-blocking types should never apply updates on sync thread. "
136 << "ModelType is: " << ModelTypeToString(type_);
137 }
138
EnqueueForCommit(const CommitRequestDataList & list)139 void NonBlockingTypeProcessorCore::EnqueueForCommit(
140 const CommitRequestDataList& list) {
141 DCHECK(CalledOnValidThread());
142
143 DCHECK(CanCommitItems())
144 << "Asked to commit items before type was initialized. "
145 << "ModelType is: " << ModelTypeToString(type_);
146
147 for (CommitRequestDataList::const_iterator it = list.begin();
148 it != list.end();
149 ++it) {
150 StorePendingCommit(*it);
151 }
152 }
153
154 // CommitContributor implementation.
155 scoped_ptr<CommitContribution>
GetContribution(size_t max_entries)156 NonBlockingTypeProcessorCore::GetContribution(size_t max_entries) {
157 DCHECK(CalledOnValidThread());
158
159 size_t space_remaining = max_entries;
160 std::vector<int64> sequence_numbers;
161 google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> commit_entities;
162
163 if (!CanCommitItems())
164 return scoped_ptr<CommitContribution>();
165
166 // TODO(rlarocque): Avoid iterating here.
167 for (EntityMap::const_iterator it = entities_.begin();
168 it != entities_.end() && space_remaining > 0;
169 ++it) {
170 SyncThreadSyncEntity* entity = it->second;
171 if (entity->IsCommitPending()) {
172 sync_pb::SyncEntity* commit_entity = commit_entities.Add();
173 int64 sequence_number = -1;
174
175 entity->PrepareCommitProto(commit_entity, &sequence_number);
176 HelpInitializeCommitEntity(commit_entity);
177 sequence_numbers.push_back(sequence_number);
178
179 space_remaining--;
180 }
181 }
182
183 if (commit_entities.size() == 0)
184 return scoped_ptr<CommitContribution>();
185
186 return scoped_ptr<CommitContribution>(new NonBlockingTypeCommitContribution(
187 data_type_state_.type_context, commit_entities, sequence_numbers, this));
188 }
189
StorePendingCommit(const CommitRequestData & request)190 void NonBlockingTypeProcessorCore::StorePendingCommit(
191 const CommitRequestData& request) {
192 if (!request.deleted) {
193 DCHECK_EQ(type_, GetModelTypeFromSpecifics(request.specifics));
194 }
195
196 EntityMap::iterator map_it = entities_.find(request.client_tag_hash);
197 if (map_it == entities_.end()) {
198 SyncThreadSyncEntity* entity =
199 SyncThreadSyncEntity::FromCommitRequest(request.id,
200 request.client_tag_hash,
201 request.sequence_number,
202 request.base_version,
203 request.ctime,
204 request.mtime,
205 request.non_unique_name,
206 request.deleted,
207 request.specifics);
208 entities_.insert(std::make_pair(request.client_tag_hash, entity));
209 } else {
210 SyncThreadSyncEntity* entity = map_it->second;
211 entity->RequestCommit(request.id,
212 request.client_tag_hash,
213 request.sequence_number,
214 request.base_version,
215 request.ctime,
216 request.mtime,
217 request.non_unique_name,
218 request.deleted,
219 request.specifics);
220 }
221
222 // TODO: Nudge SyncScheduler.
223 }
224
OnCommitResponse(const CommitResponseDataList & response_list)225 void NonBlockingTypeProcessorCore::OnCommitResponse(
226 const CommitResponseDataList& response_list) {
227 for (CommitResponseDataList::const_iterator response_it =
228 response_list.begin();
229 response_it != response_list.end();
230 ++response_it) {
231 const std::string client_tag_hash = response_it->client_tag_hash;
232 EntityMap::iterator map_it = entities_.find(client_tag_hash);
233
234 // There's no way we could have committed an entry we know nothing about.
235 if (map_it == entities_.end()) {
236 NOTREACHED() << "Received commit response for item unknown to us."
237 << " Model type: " << ModelTypeToString(type_)
238 << " ID: " << response_it->id;
239 continue;
240 }
241
242 SyncThreadSyncEntity* entity = map_it->second;
243 entity->ReceiveCommitResponse(response_it->id,
244 response_it->response_version,
245 response_it->sequence_number);
246 }
247
248 // Send the responses back to the model thread. It needs to know which
249 // items have been successfully committed so it can save that information in
250 // permanent storage.
251 processor_interface_->ReceiveCommitResponse(data_type_state_, response_list);
252 }
253
254 base::WeakPtr<NonBlockingTypeProcessorCore>
AsWeakPtr()255 NonBlockingTypeProcessorCore::AsWeakPtr() {
256 return weak_ptr_factory_.GetWeakPtr();
257 }
258
CanCommitItems() const259 bool NonBlockingTypeProcessorCore::CanCommitItems() const {
260 // We can't commit anything until we know the type's parent node.
261 // We'll get it in the first update response.
262 return !data_type_state_.type_root_id.empty() &&
263 data_type_state_.initial_sync_done;
264 }
265
HelpInitializeCommitEntity(sync_pb::SyncEntity * sync_entity)266 void NonBlockingTypeProcessorCore::HelpInitializeCommitEntity(
267 sync_pb::SyncEntity* sync_entity) {
268 // Initial commits need our help to generate a client ID.
269 if (!sync_entity->has_id_string()) {
270 DCHECK_EQ(kUncommittedVersion, sync_entity->version());
271 const int64 id = data_type_state_.next_client_id++;
272 sync_entity->set_id_string(
273 base::StringPrintf("%s-%" PRId64, ModelTypeToString(type_), id));
274 }
275
276 // Always include enough specifics to identify the type. Do this even in
277 // deletion requests, where the specifics are otherwise invalid.
278 if (!sync_entity->has_specifics()) {
279 AddDefaultFieldValue(type_, sync_entity->mutable_specifics());
280 }
281
282 // We're always responsible for the parent ID.
283 sync_entity->set_parent_id_string(data_type_state_.type_root_id);
284 }
285
286 } // namespace syncer
287