• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "sync/engine/non_blocking_type_processor_core.h"
6 
7 #include "base/bind.h"
8 #include "base/format_macros.h"
9 #include "base/logging.h"
10 #include "base/strings/stringprintf.h"
11 #include "sync/engine/commit_contribution.h"
12 #include "sync/engine/non_blocking_type_commit_contribution.h"
13 #include "sync/engine/non_blocking_type_processor_interface.h"
14 #include "sync/engine/sync_thread_sync_entity.h"
15 #include "sync/syncable/syncable_util.h"
16 #include "sync/util/time.h"
17 
18 namespace syncer {
19 
NonBlockingTypeProcessorCore(ModelType type,const DataTypeState & initial_state,scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface)20 NonBlockingTypeProcessorCore::NonBlockingTypeProcessorCore(
21     ModelType type,
22     const DataTypeState& initial_state,
23     scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface)
24     : type_(type),
25       data_type_state_(initial_state),
26       processor_interface_(processor_interface.Pass()),
27       entities_deleter_(&entities_),
28       weak_ptr_factory_(this) {
29 }
30 
~NonBlockingTypeProcessorCore()31 NonBlockingTypeProcessorCore::~NonBlockingTypeProcessorCore() {
32 }
33 
GetModelType() const34 ModelType NonBlockingTypeProcessorCore::GetModelType() const {
35   DCHECK(CalledOnValidThread());
36   return type_;
37 }
38 
39 // UpdateHandler implementation.
GetDownloadProgress(sync_pb::DataTypeProgressMarker * progress_marker) const40 void NonBlockingTypeProcessorCore::GetDownloadProgress(
41     sync_pb::DataTypeProgressMarker* progress_marker) const {
42   DCHECK(CalledOnValidThread());
43   progress_marker->CopyFrom(data_type_state_.progress_marker);
44 }
45 
GetDataTypeContext(sync_pb::DataTypeContext * context) const46 void NonBlockingTypeProcessorCore::GetDataTypeContext(
47     sync_pb::DataTypeContext* context) const {
48   DCHECK(CalledOnValidThread());
49   context->CopyFrom(data_type_state_.type_context);
50 }
51 
ProcessGetUpdatesResponse(const sync_pb::DataTypeProgressMarker & progress_marker,const sync_pb::DataTypeContext & mutated_context,const SyncEntityList & applicable_updates,sessions::StatusController * status)52 SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse(
53     const sync_pb::DataTypeProgressMarker& progress_marker,
54     const sync_pb::DataTypeContext& mutated_context,
55     const SyncEntityList& applicable_updates,
56     sessions::StatusController* status) {
57   DCHECK(CalledOnValidThread());
58 
59   // TODO(rlarocque): Handle data type context conflicts.
60   data_type_state_.type_context = mutated_context;
61   data_type_state_.progress_marker = progress_marker;
62 
63   UpdateResponseDataList response_datas;
64 
65   for (SyncEntityList::const_iterator update_it = applicable_updates.begin();
66        update_it != applicable_updates.end();
67        ++update_it) {
68     const sync_pb::SyncEntity* update_entity = *update_it;
69     if (!update_entity->server_defined_unique_tag().empty()) {
70       // We can't commit an item unless we know its parent ID.  This is where
71       // we learn that ID and remember it forever.
72       DCHECK_EQ(ModelTypeToRootTag(type_),
73                 update_entity->server_defined_unique_tag());
74       if (!data_type_state_.type_root_id.empty()) {
75         DCHECK_EQ(data_type_state_.type_root_id, update_entity->id_string());
76       }
77       data_type_state_.type_root_id = update_entity->id_string();
78     } else {
79       // Normal updates are handled here.
80       const std::string& client_tag_hash =
81           update_entity->client_defined_unique_tag();
82       DCHECK(!client_tag_hash.empty());
83       EntityMap::const_iterator map_it = entities_.find(client_tag_hash);
84       if (map_it == entities_.end()) {
85         SyncThreadSyncEntity* entity =
86             SyncThreadSyncEntity::FromServerUpdate(update_entity->id_string(),
87                                                    client_tag_hash,
88                                                    update_entity->version());
89         entities_.insert(std::make_pair(client_tag_hash, entity));
90       } else {
91         SyncThreadSyncEntity* entity = map_it->second;
92         entity->ReceiveUpdate(update_entity->version());
93       }
94 
95       // Prepare the message for the model thread.
96       UpdateResponseData response_data;
97       response_data.id = update_entity->id_string();
98       response_data.client_tag_hash = client_tag_hash;
99       response_data.response_version = update_entity->version();
100       response_data.ctime = ProtoTimeToTime(update_entity->ctime());
101       response_data.mtime = ProtoTimeToTime(update_entity->mtime());
102       response_data.non_unique_name = update_entity->name();
103       response_data.deleted = update_entity->deleted();
104       response_data.specifics = update_entity->specifics();
105 
106       response_datas.push_back(response_data);
107     }
108   }
109 
110   // Forward these updates to the model thread so it can do the rest.
111   processor_interface_->ReceiveUpdateResponse(data_type_state_, response_datas);
112 
113   return SYNCER_OK;
114 }
115 
ApplyUpdates(sessions::StatusController * status)116 void NonBlockingTypeProcessorCore::ApplyUpdates(
117     sessions::StatusController* status) {
118   DCHECK(CalledOnValidThread());
119   // This function is called only when we've finished a download cycle, ie. we
120   // got a response with changes_remaining == 0.  If this is our first download
121   // cycle, we should update our state so the NonBlockingTypeProcessor knows
122   // that it's safe to commit items now.
123   if (!data_type_state_.initial_sync_done) {
124     data_type_state_.initial_sync_done = true;
125 
126     UpdateResponseDataList empty_update_list;
127     processor_interface_->ReceiveUpdateResponse(data_type_state_,
128                                                 empty_update_list);
129   }
130 }
131 
PassiveApplyUpdates(sessions::StatusController * status)132 void NonBlockingTypeProcessorCore::PassiveApplyUpdates(
133     sessions::StatusController* status) {
134   NOTREACHED()
135       << "Non-blocking types should never apply updates on sync thread.  "
136       << "ModelType is: " << ModelTypeToString(type_);
137 }
138 
EnqueueForCommit(const CommitRequestDataList & list)139 void NonBlockingTypeProcessorCore::EnqueueForCommit(
140     const CommitRequestDataList& list) {
141   DCHECK(CalledOnValidThread());
142 
143   DCHECK(CanCommitItems())
144       << "Asked to commit items before type was initialized.  "
145       << "ModelType is: " << ModelTypeToString(type_);
146 
147   for (CommitRequestDataList::const_iterator it = list.begin();
148        it != list.end();
149        ++it) {
150     StorePendingCommit(*it);
151   }
152 }
153 
154 // CommitContributor implementation.
155 scoped_ptr<CommitContribution>
GetContribution(size_t max_entries)156 NonBlockingTypeProcessorCore::GetContribution(size_t max_entries) {
157   DCHECK(CalledOnValidThread());
158 
159   size_t space_remaining = max_entries;
160   std::vector<int64> sequence_numbers;
161   google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> commit_entities;
162 
163   if (!CanCommitItems())
164     return scoped_ptr<CommitContribution>();
165 
166   // TODO(rlarocque): Avoid iterating here.
167   for (EntityMap::const_iterator it = entities_.begin();
168        it != entities_.end() && space_remaining > 0;
169        ++it) {
170     SyncThreadSyncEntity* entity = it->second;
171     if (entity->IsCommitPending()) {
172       sync_pb::SyncEntity* commit_entity = commit_entities.Add();
173       int64 sequence_number = -1;
174 
175       entity->PrepareCommitProto(commit_entity, &sequence_number);
176       HelpInitializeCommitEntity(commit_entity);
177       sequence_numbers.push_back(sequence_number);
178 
179       space_remaining--;
180     }
181   }
182 
183   if (commit_entities.size() == 0)
184     return scoped_ptr<CommitContribution>();
185 
186   return scoped_ptr<CommitContribution>(new NonBlockingTypeCommitContribution(
187       data_type_state_.type_context, commit_entities, sequence_numbers, this));
188 }
189 
StorePendingCommit(const CommitRequestData & request)190 void NonBlockingTypeProcessorCore::StorePendingCommit(
191     const CommitRequestData& request) {
192   if (!request.deleted) {
193     DCHECK_EQ(type_, GetModelTypeFromSpecifics(request.specifics));
194   }
195 
196   EntityMap::iterator map_it = entities_.find(request.client_tag_hash);
197   if (map_it == entities_.end()) {
198     SyncThreadSyncEntity* entity =
199         SyncThreadSyncEntity::FromCommitRequest(request.id,
200                                                 request.client_tag_hash,
201                                                 request.sequence_number,
202                                                 request.base_version,
203                                                 request.ctime,
204                                                 request.mtime,
205                                                 request.non_unique_name,
206                                                 request.deleted,
207                                                 request.specifics);
208     entities_.insert(std::make_pair(request.client_tag_hash, entity));
209   } else {
210     SyncThreadSyncEntity* entity = map_it->second;
211     entity->RequestCommit(request.id,
212                           request.client_tag_hash,
213                           request.sequence_number,
214                           request.base_version,
215                           request.ctime,
216                           request.mtime,
217                           request.non_unique_name,
218                           request.deleted,
219                           request.specifics);
220   }
221 
222   // TODO: Nudge SyncScheduler.
223 }
224 
OnCommitResponse(const CommitResponseDataList & response_list)225 void NonBlockingTypeProcessorCore::OnCommitResponse(
226     const CommitResponseDataList& response_list) {
227   for (CommitResponseDataList::const_iterator response_it =
228            response_list.begin();
229        response_it != response_list.end();
230        ++response_it) {
231     const std::string client_tag_hash = response_it->client_tag_hash;
232     EntityMap::iterator map_it = entities_.find(client_tag_hash);
233 
234     // There's no way we could have committed an entry we know nothing about.
235     if (map_it == entities_.end()) {
236       NOTREACHED() << "Received commit response for item unknown to us."
237                    << " Model type: " << ModelTypeToString(type_)
238                    << " ID: " << response_it->id;
239       continue;
240     }
241 
242     SyncThreadSyncEntity* entity = map_it->second;
243     entity->ReceiveCommitResponse(response_it->id,
244                                   response_it->response_version,
245                                   response_it->sequence_number);
246   }
247 
248   // Send the responses back to the model thread.  It needs to know which
249   // items have been successfully committed so it can save that information in
250   // permanent storage.
251   processor_interface_->ReceiveCommitResponse(data_type_state_, response_list);
252 }
253 
254 base::WeakPtr<NonBlockingTypeProcessorCore>
AsWeakPtr()255 NonBlockingTypeProcessorCore::AsWeakPtr() {
256   return weak_ptr_factory_.GetWeakPtr();
257 }
258 
CanCommitItems() const259 bool NonBlockingTypeProcessorCore::CanCommitItems() const {
260   // We can't commit anything until we know the type's parent node.
261   // We'll get it in the first update response.
262   return !data_type_state_.type_root_id.empty() &&
263          data_type_state_.initial_sync_done;
264 }
265 
HelpInitializeCommitEntity(sync_pb::SyncEntity * sync_entity)266 void NonBlockingTypeProcessorCore::HelpInitializeCommitEntity(
267     sync_pb::SyncEntity* sync_entity) {
268   // Initial commits need our help to generate a client ID.
269   if (!sync_entity->has_id_string()) {
270     DCHECK_EQ(kUncommittedVersion, sync_entity->version());
271     const int64 id = data_type_state_.next_client_id++;
272     sync_entity->set_id_string(
273         base::StringPrintf("%s-%" PRId64, ModelTypeToString(type_), id));
274   }
275 
276   // Always include enough specifics to identify the type.  Do this even in
277   // deletion requests, where the specifics are otherwise invalid.
278   if (!sync_entity->has_specifics()) {
279     AddDefaultFieldValue(type_, sync_entity->mutable_specifics());
280   }
281 
282   // We're always responsible for the parent ID.
283   sync_entity->set_parent_id_string(data_type_state_.type_root_id);
284 }
285 
286 }  // namespace syncer
287