// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5 #include "sync/engine/non_blocking_type_processor.h"
6
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/message_loop/message_loop_proxy.h"
10 #include "sync/engine/model_thread_sync_entity.h"
11 #include "sync/engine/non_blocking_type_processor_core_interface.h"
12 #include "sync/internal_api/public/sync_core_proxy.h"
13 #include "sync/syncable/syncable_util.h"
14
15 namespace syncer {
16
NonBlockingTypeProcessor(ModelType type)17 NonBlockingTypeProcessor::NonBlockingTypeProcessor(ModelType type)
18 : type_(type),
19 is_preferred_(false),
20 is_connected_(false),
21 entities_deleter_(&entities_),
22 weak_ptr_factory_for_ui_(this),
23 weak_ptr_factory_for_sync_(this) {
24 }
25
~NonBlockingTypeProcessor()26 NonBlockingTypeProcessor::~NonBlockingTypeProcessor() {
27 }
28
IsPreferred() const29 bool NonBlockingTypeProcessor::IsPreferred() const {
30 DCHECK(CalledOnValidThread());
31 return is_preferred_;
32 }
33
IsConnected() const34 bool NonBlockingTypeProcessor::IsConnected() const {
35 DCHECK(CalledOnValidThread());
36 return is_connected_;
37 }
38
GetModelType() const39 ModelType NonBlockingTypeProcessor::GetModelType() const {
40 DCHECK(CalledOnValidThread());
41 return type_;
42 }
43
Enable(scoped_ptr<SyncCoreProxy> sync_core_proxy)44 void NonBlockingTypeProcessor::Enable(
45 scoped_ptr<SyncCoreProxy> sync_core_proxy) {
46 DCHECK(CalledOnValidThread());
47 DVLOG(1) << "Asked to enable " << ModelTypeToString(type_);
48
49 is_preferred_ = true;
50
51 // TODO(rlarocque): At some point, this should be loaded from storage.
52 data_type_state_.progress_marker.set_data_type_id(
53 GetSpecificsFieldNumberFromModelType(type_));
54
55 sync_core_proxy_ = sync_core_proxy.Pass();
56 sync_core_proxy_->ConnectTypeToCore(GetModelType(),
57 data_type_state_,
58 weak_ptr_factory_for_sync_.GetWeakPtr());
59 }
60
Disable()61 void NonBlockingTypeProcessor::Disable() {
62 DCHECK(CalledOnValidThread());
63 is_preferred_ = false;
64 Disconnect();
65 }
66
Disconnect()67 void NonBlockingTypeProcessor::Disconnect() {
68 DCHECK(CalledOnValidThread());
69 DVLOG(1) << "Asked to disconnect " << ModelTypeToString(type_);
70 is_connected_ = false;
71
72 if (sync_core_proxy_) {
73 sync_core_proxy_->Disconnect(GetModelType());
74 sync_core_proxy_.reset();
75 }
76
77 weak_ptr_factory_for_sync_.InvalidateWeakPtrs();
78 core_interface_.reset();
79 }
80
81 base::WeakPtr<NonBlockingTypeProcessor>
AsWeakPtrForUI()82 NonBlockingTypeProcessor::AsWeakPtrForUI() {
83 DCHECK(CalledOnValidThread());
84 return weak_ptr_factory_for_ui_.GetWeakPtr();
85 }
86
OnConnect(scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface)87 void NonBlockingTypeProcessor::OnConnect(
88 scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface) {
89 DCHECK(CalledOnValidThread());
90 DVLOG(1) << "Successfully connected " << ModelTypeToString(type_);
91
92 is_connected_ = true;
93 core_interface_ = core_interface.Pass();
94
95 FlushPendingCommitRequests();
96 }
97
Put(const std::string & client_tag,const sync_pb::EntitySpecifics & specifics)98 void NonBlockingTypeProcessor::Put(const std::string& client_tag,
99 const sync_pb::EntitySpecifics& specifics) {
100 DCHECK_EQ(type_, GetModelTypeFromSpecifics(specifics));
101
102 const std::string client_tag_hash(
103 syncable::GenerateSyncableHash(type_, client_tag));
104
105 EntityMap::iterator it = entities_.find(client_tag_hash);
106 if (it == entities_.end()) {
107 scoped_ptr<ModelThreadSyncEntity> entity(
108 ModelThreadSyncEntity::NewLocalItem(
109 client_tag, specifics, base::Time::Now()));
110 entities_.insert(std::make_pair(client_tag_hash, entity.release()));
111 } else {
112 ModelThreadSyncEntity* entity = it->second;
113 entity->MakeLocalChange(specifics);
114 }
115
116 FlushPendingCommitRequests();
117 }
118
Delete(const std::string & client_tag)119 void NonBlockingTypeProcessor::Delete(const std::string& client_tag) {
120 const std::string client_tag_hash(
121 syncable::GenerateSyncableHash(type_, client_tag));
122
123 EntityMap::iterator it = entities_.find(client_tag_hash);
124 if (it == entities_.end()) {
125 // That's unusual, but not necessarily a bad thing.
126 // Missing is as good as deleted as far as the model is concerned.
127 DLOG(WARNING) << "Attempted to delete missing item."
128 << " client tag: " << client_tag;
129 } else {
130 ModelThreadSyncEntity* entity = it->second;
131 entity->Delete();
132 }
133
134 FlushPendingCommitRequests();
135 }
136
FlushPendingCommitRequests()137 void NonBlockingTypeProcessor::FlushPendingCommitRequests() {
138 CommitRequestDataList commit_requests;
139
140 // Don't bother sending anything if there's no one to send to.
141 if (!IsConnected())
142 return;
143
144 // Don't send anything if the type is not ready to handle commits.
145 if (!data_type_state_.initial_sync_done)
146 return;
147
148 // TODO(rlarocque): Do something smarter than iterate here.
149 for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
150 ++it) {
151 if (it->second->RequiresCommitRequest()) {
152 CommitRequestData request;
153 it->second->InitializeCommitRequestData(&request);
154 commit_requests.push_back(request);
155 it->second->SetCommitRequestInProgress();
156 }
157 }
158
159 if (!commit_requests.empty())
160 core_interface_->RequestCommits(commit_requests);
161 }
162
OnCommitCompletion(const DataTypeState & type_state,const CommitResponseDataList & response_list)163 void NonBlockingTypeProcessor::OnCommitCompletion(
164 const DataTypeState& type_state,
165 const CommitResponseDataList& response_list) {
166 data_type_state_ = type_state;
167
168 for (CommitResponseDataList::const_iterator list_it = response_list.begin();
169 list_it != response_list.end();
170 ++list_it) {
171 const CommitResponseData& response_data = *list_it;
172 const std::string& client_tag_hash = response_data.client_tag_hash;
173
174 EntityMap::iterator it = entities_.find(client_tag_hash);
175 if (it == entities_.end()) {
176 NOTREACHED() << "Received commit response for missing item."
177 << " type: " << type_ << " client_tag: " << client_tag_hash;
178 return;
179 } else {
180 it->second->ReceiveCommitResponse(response_data.id,
181 response_data.sequence_number,
182 response_data.response_version);
183 }
184 }
185 }
186
OnUpdateReceived(const DataTypeState & data_type_state,const UpdateResponseDataList & response_list)187 void NonBlockingTypeProcessor::OnUpdateReceived(
188 const DataTypeState& data_type_state,
189 const UpdateResponseDataList& response_list) {
190 bool initial_sync_just_finished =
191 !data_type_state_.initial_sync_done && data_type_state.initial_sync_done;
192
193 data_type_state_ = data_type_state;
194
195 for (UpdateResponseDataList::const_iterator list_it = response_list.begin();
196 list_it != response_list.end();
197 ++list_it) {
198 const UpdateResponseData& response_data = *list_it;
199 const std::string& client_tag_hash = response_data.client_tag_hash;
200
201 EntityMap::iterator it = entities_.find(client_tag_hash);
202 if (it == entities_.end()) {
203 scoped_ptr<ModelThreadSyncEntity> entity =
204 ModelThreadSyncEntity::FromServerUpdate(
205 response_data.id,
206 response_data.client_tag_hash,
207 response_data.non_unique_name,
208 response_data.response_version,
209 response_data.specifics,
210 response_data.deleted,
211 response_data.ctime,
212 response_data.mtime);
213 entities_.insert(std::make_pair(client_tag_hash, entity.release()));
214 } else {
215 ModelThreadSyncEntity* entity = it->second;
216 entity->ApplyUpdateFromServer(response_data.response_version,
217 response_data.deleted,
218 response_data.specifics,
219 response_data.mtime);
220 // TODO: Do something special when conflicts are detected.
221 }
222 }
223
224 if (initial_sync_just_finished)
225 FlushPendingCommitRequests();
226
227 // TODO: Inform the model of the new or updated data.
228 }
229
230 } // namespace syncer
231