• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "sync/engine/non_blocking_type_processor.h"
6 
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/message_loop/message_loop_proxy.h"
10 #include "sync/engine/model_thread_sync_entity.h"
11 #include "sync/engine/non_blocking_type_processor_core_interface.h"
12 #include "sync/internal_api/public/sync_core_proxy.h"
13 #include "sync/syncable/syncable_util.h"
14 
15 namespace syncer {
16 
NonBlockingTypeProcessor(ModelType type)17 NonBlockingTypeProcessor::NonBlockingTypeProcessor(ModelType type)
18     : type_(type),
19       is_preferred_(false),
20       is_connected_(false),
21       entities_deleter_(&entities_),
22       weak_ptr_factory_for_ui_(this),
23       weak_ptr_factory_for_sync_(this) {
24 }
25 
~NonBlockingTypeProcessor()26 NonBlockingTypeProcessor::~NonBlockingTypeProcessor() {
27 }
28 
IsPreferred() const29 bool NonBlockingTypeProcessor::IsPreferred() const {
30   DCHECK(CalledOnValidThread());
31   return is_preferred_;
32 }
33 
IsConnected() const34 bool NonBlockingTypeProcessor::IsConnected() const {
35   DCHECK(CalledOnValidThread());
36   return is_connected_;
37 }
38 
GetModelType() const39 ModelType NonBlockingTypeProcessor::GetModelType() const {
40   DCHECK(CalledOnValidThread());
41   return type_;
42 }
43 
Enable(scoped_ptr<SyncCoreProxy> sync_core_proxy)44 void NonBlockingTypeProcessor::Enable(
45     scoped_ptr<SyncCoreProxy> sync_core_proxy) {
46   DCHECK(CalledOnValidThread());
47   DVLOG(1) << "Asked to enable " << ModelTypeToString(type_);
48 
49   is_preferred_ = true;
50 
51   // TODO(rlarocque): At some point, this should be loaded from storage.
52   data_type_state_.progress_marker.set_data_type_id(
53       GetSpecificsFieldNumberFromModelType(type_));
54 
55   sync_core_proxy_ = sync_core_proxy.Pass();
56   sync_core_proxy_->ConnectTypeToCore(GetModelType(),
57                                       data_type_state_,
58                                       weak_ptr_factory_for_sync_.GetWeakPtr());
59 }
60 
Disable()61 void NonBlockingTypeProcessor::Disable() {
62   DCHECK(CalledOnValidThread());
63   is_preferred_ = false;
64   Disconnect();
65 }
66 
Disconnect()67 void NonBlockingTypeProcessor::Disconnect() {
68   DCHECK(CalledOnValidThread());
69   DVLOG(1) << "Asked to disconnect " << ModelTypeToString(type_);
70   is_connected_ = false;
71 
72   if (sync_core_proxy_) {
73     sync_core_proxy_->Disconnect(GetModelType());
74     sync_core_proxy_.reset();
75   }
76 
77   weak_ptr_factory_for_sync_.InvalidateWeakPtrs();
78   core_interface_.reset();
79 }
80 
81 base::WeakPtr<NonBlockingTypeProcessor>
AsWeakPtrForUI()82 NonBlockingTypeProcessor::AsWeakPtrForUI() {
83   DCHECK(CalledOnValidThread());
84   return weak_ptr_factory_for_ui_.GetWeakPtr();
85 }
86 
OnConnect(scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface)87 void NonBlockingTypeProcessor::OnConnect(
88     scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface) {
89   DCHECK(CalledOnValidThread());
90   DVLOG(1) << "Successfully connected " << ModelTypeToString(type_);
91 
92   is_connected_ = true;
93   core_interface_ = core_interface.Pass();
94 
95   FlushPendingCommitRequests();
96 }
97 
Put(const std::string & client_tag,const sync_pb::EntitySpecifics & specifics)98 void NonBlockingTypeProcessor::Put(const std::string& client_tag,
99                                    const sync_pb::EntitySpecifics& specifics) {
100   DCHECK_EQ(type_, GetModelTypeFromSpecifics(specifics));
101 
102   const std::string client_tag_hash(
103       syncable::GenerateSyncableHash(type_, client_tag));
104 
105   EntityMap::iterator it = entities_.find(client_tag_hash);
106   if (it == entities_.end()) {
107     scoped_ptr<ModelThreadSyncEntity> entity(
108         ModelThreadSyncEntity::NewLocalItem(
109             client_tag, specifics, base::Time::Now()));
110     entities_.insert(std::make_pair(client_tag_hash, entity.release()));
111   } else {
112     ModelThreadSyncEntity* entity = it->second;
113     entity->MakeLocalChange(specifics);
114   }
115 
116   FlushPendingCommitRequests();
117 }
118 
Delete(const std::string & client_tag)119 void NonBlockingTypeProcessor::Delete(const std::string& client_tag) {
120   const std::string client_tag_hash(
121       syncable::GenerateSyncableHash(type_, client_tag));
122 
123   EntityMap::iterator it = entities_.find(client_tag_hash);
124   if (it == entities_.end()) {
125     // That's unusual, but not necessarily a bad thing.
126     // Missing is as good as deleted as far as the model is concerned.
127     DLOG(WARNING) << "Attempted to delete missing item."
128                   << " client tag: " << client_tag;
129   } else {
130     ModelThreadSyncEntity* entity = it->second;
131     entity->Delete();
132   }
133 
134   FlushPendingCommitRequests();
135 }
136 
FlushPendingCommitRequests()137 void NonBlockingTypeProcessor::FlushPendingCommitRequests() {
138   CommitRequestDataList commit_requests;
139 
140   // Don't bother sending anything if there's no one to send to.
141   if (!IsConnected())
142     return;
143 
144   // Don't send anything if the type is not ready to handle commits.
145   if (!data_type_state_.initial_sync_done)
146     return;
147 
148   // TODO(rlarocque): Do something smarter than iterate here.
149   for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
150        ++it) {
151     if (it->second->RequiresCommitRequest()) {
152       CommitRequestData request;
153       it->second->InitializeCommitRequestData(&request);
154       commit_requests.push_back(request);
155       it->second->SetCommitRequestInProgress();
156     }
157   }
158 
159   if (!commit_requests.empty())
160     core_interface_->RequestCommits(commit_requests);
161 }
162 
OnCommitCompletion(const DataTypeState & type_state,const CommitResponseDataList & response_list)163 void NonBlockingTypeProcessor::OnCommitCompletion(
164     const DataTypeState& type_state,
165     const CommitResponseDataList& response_list) {
166   data_type_state_ = type_state;
167 
168   for (CommitResponseDataList::const_iterator list_it = response_list.begin();
169        list_it != response_list.end();
170        ++list_it) {
171     const CommitResponseData& response_data = *list_it;
172     const std::string& client_tag_hash = response_data.client_tag_hash;
173 
174     EntityMap::iterator it = entities_.find(client_tag_hash);
175     if (it == entities_.end()) {
176       NOTREACHED() << "Received commit response for missing item."
177                    << " type: " << type_ << " client_tag: " << client_tag_hash;
178       return;
179     } else {
180       it->second->ReceiveCommitResponse(response_data.id,
181                                         response_data.sequence_number,
182                                         response_data.response_version);
183     }
184   }
185 }
186 
OnUpdateReceived(const DataTypeState & data_type_state,const UpdateResponseDataList & response_list)187 void NonBlockingTypeProcessor::OnUpdateReceived(
188     const DataTypeState& data_type_state,
189     const UpdateResponseDataList& response_list) {
190   bool initial_sync_just_finished =
191       !data_type_state_.initial_sync_done && data_type_state.initial_sync_done;
192 
193   data_type_state_ = data_type_state;
194 
195   for (UpdateResponseDataList::const_iterator list_it = response_list.begin();
196        list_it != response_list.end();
197        ++list_it) {
198     const UpdateResponseData& response_data = *list_it;
199     const std::string& client_tag_hash = response_data.client_tag_hash;
200 
201     EntityMap::iterator it = entities_.find(client_tag_hash);
202     if (it == entities_.end()) {
203       scoped_ptr<ModelThreadSyncEntity> entity =
204           ModelThreadSyncEntity::FromServerUpdate(
205               response_data.id,
206               response_data.client_tag_hash,
207               response_data.non_unique_name,
208               response_data.response_version,
209               response_data.specifics,
210               response_data.deleted,
211               response_data.ctime,
212               response_data.mtime);
213       entities_.insert(std::make_pair(client_tag_hash, entity.release()));
214     } else {
215       ModelThreadSyncEntity* entity = it->second;
216       entity->ApplyUpdateFromServer(response_data.response_version,
217                                     response_data.deleted,
218                                     response_data.specifics,
219                                     response_data.mtime);
220       // TODO: Do something special when conflicts are detected.
221     }
222   }
223 
224   if (initial_sync_just_finished)
225     FlushPendingCommitRequests();
226 
227   // TODO: Inform the model of the new or updated data.
228 }
229 
230 }  // namespace syncer
231