• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "chrome/browser/sync/engine/syncer.h"
6 
7 #include "base/message_loop.h"
8 #include "base/time.h"
9 #include "chrome/browser/sync/engine/apply_updates_command.h"
10 #include "chrome/browser/sync/engine/build_and_process_conflict_sets_command.h"
11 #include "chrome/browser/sync/engine/build_commit_command.h"
12 #include "chrome/browser/sync/engine/cleanup_disabled_types_command.h"
13 #include "chrome/browser/sync/engine/clear_data_command.h"
14 #include "chrome/browser/sync/engine/conflict_resolver.h"
15 #include "chrome/browser/sync/engine/download_updates_command.h"
16 #include "chrome/browser/sync/engine/get_commit_ids_command.h"
17 #include "chrome/browser/sync/engine/net/server_connection_manager.h"
18 #include "chrome/browser/sync/engine/post_commit_message_command.h"
19 #include "chrome/browser/sync/engine/process_commit_response_command.h"
20 #include "chrome/browser/sync/engine/process_updates_command.h"
21 #include "chrome/browser/sync/engine/resolve_conflicts_command.h"
22 #include "chrome/browser/sync/engine/store_timestamps_command.h"
23 #include "chrome/browser/sync/engine/syncer_end_command.h"
24 #include "chrome/browser/sync/engine/syncer_types.h"
25 #include "chrome/browser/sync/engine/syncer_util.h"
26 #include "chrome/browser/sync/engine/syncproto.h"
27 #include "chrome/browser/sync/engine/verify_updates_command.h"
28 #include "chrome/browser/sync/syncable/directory_manager.h"
29 #include "chrome/browser/sync/syncable/syncable-inl.h"
30 #include "chrome/browser/sync/syncable/syncable.h"
31 
32 using base::TimeDelta;
33 using sync_pb::ClientCommand;
34 using syncable::Blob;
35 using syncable::IS_UNAPPLIED_UPDATE;
36 using syncable::SERVER_CTIME;
37 using syncable::SERVER_IS_DEL;
38 using syncable::SERVER_IS_DIR;
39 using syncable::SERVER_MTIME;
40 using syncable::SERVER_NON_UNIQUE_NAME;
41 using syncable::SERVER_PARENT_ID;
42 using syncable::SERVER_POSITION_IN_PARENT;
43 using syncable::SERVER_SPECIFICS;
44 using syncable::SERVER_VERSION;
45 using syncable::SYNCER;
46 using syncable::ScopedDirLookup;
47 using syncable::WriteTransaction;
48 
49 namespace browser_sync {
50 
51 using sessions::ScopedSessionContextConflictResolver;
52 using sessions::StatusController;
53 using sessions::SyncSession;
54 using sessions::ConflictProgress;
55 
Syncer()56 Syncer::Syncer()
57     : early_exit_requested_(false),
58       pre_conflict_resolution_closure_(NULL) {
59 }
60 
~Syncer()61 Syncer::~Syncer() {}
62 
ExitRequested()63 bool Syncer::ExitRequested() {
64   base::AutoLock lock(early_exit_requested_lock_);
65   return early_exit_requested_;
66 }
67 
RequestEarlyExit()68 void Syncer::RequestEarlyExit() {
69   base::AutoLock lock(early_exit_requested_lock_);
70   early_exit_requested_ = true;
71 }
72 
73 // TODO(tim): Deprecated.
SyncShare(sessions::SyncSession * session)74 void Syncer::SyncShare(sessions::SyncSession* session) {
75   ScopedDirLookup dir(session->context()->directory_manager(),
76                       session->context()->account_name());
77   // The directory must be good here.
78   CHECK(dir.good());
79 
80   const sessions::SyncSourceInfo& source(session->source());
81   if (sync_pb::GetUpdatesCallerInfo::CLEAR_PRIVATE_DATA ==
82       source.updates_source) {
83     SyncShare(session, CLEAR_PRIVATE_DATA, SYNCER_END);
84     return;
85   } else {
86     SyncShare(session, SYNCER_BEGIN, SYNCER_END);
87   }
88 }
89 
SyncShare(sessions::SyncSession * session,const SyncerStep first_step,const SyncerStep last_step)90 void Syncer::SyncShare(sessions::SyncSession* session,
91                        const SyncerStep first_step,
92                        const SyncerStep last_step) {
93   ScopedDirLookup dir(session->context()->directory_manager(),
94                       session->context()->account_name());
95   // The directory must be good here.
96   CHECK(dir.good());
97 
98   ScopedSessionContextConflictResolver scoped(session->context(),
99                                               &resolver_);
100   SyncerStep current_step = first_step;
101 
102   SyncerStep next_step = current_step;
103   while (!ExitRequested()) {
104     switch (current_step) {
105       case SYNCER_BEGIN:
106         VLOG(1) << "Syncer Begin";
107         // This isn't perfect, as we can end up bundling extensions activity
108         // intended for the next session into the current one.  We could do a
109         // test-and-reset as with the source, but note that also falls short if
110         // the commit request fails (e.g. due to lost connection), as we will
111         // fall all the way back to the syncer thread main loop in that case,
112         // creating a new session when a connection is established, losing the
113         // records set here on the original attempt.  This should provide us
114         // with the right data "most of the time", and we're only using this
115         // for analysis purposes, so Law of Large Numbers FTW.
116         session->context()->extensions_monitor()->GetAndClearRecords(
117             session->mutable_extensions_activity());
118         next_step = CLEANUP_DISABLED_TYPES;
119         break;
120       case CLEANUP_DISABLED_TYPES: {
121         VLOG(1) << "Cleaning up disabled types";
122         CleanupDisabledTypesCommand cleanup;
123         cleanup.Execute(session);
124         next_step = DOWNLOAD_UPDATES;
125         break;
126       }
127       case DOWNLOAD_UPDATES: {
128         VLOG(1) << "Downloading Updates";
129         DownloadUpdatesCommand download_updates;
130         download_updates.Execute(session);
131         next_step = PROCESS_CLIENT_COMMAND;
132         break;
133       }
134       case PROCESS_CLIENT_COMMAND: {
135         VLOG(1) << "Processing Client Command";
136         ProcessClientCommand(session);
137         next_step = VERIFY_UPDATES;
138         break;
139       }
140       case VERIFY_UPDATES: {
141         VLOG(1) << "Verifying Updates";
142         VerifyUpdatesCommand verify_updates;
143         verify_updates.Execute(session);
144         next_step = PROCESS_UPDATES;
145         break;
146       }
147       case PROCESS_UPDATES: {
148         VLOG(1) << "Processing Updates";
149         ProcessUpdatesCommand process_updates;
150         process_updates.Execute(session);
151         next_step = STORE_TIMESTAMPS;
152         break;
153       }
154       case STORE_TIMESTAMPS: {
155         VLOG(1) << "Storing timestamps";
156         StoreTimestampsCommand store_timestamps;
157         store_timestamps.Execute(session);
158         // We should download all of the updates before attempting to process
159         // them.
160         if (session->status_controller()->ServerSaysNothingMoreToDownload() ||
161             !session->status_controller()->download_updates_succeeded()) {
162           next_step = APPLY_UPDATES;
163         } else {
164           next_step = DOWNLOAD_UPDATES;
165         }
166         break;
167       }
168       case APPLY_UPDATES: {
169         VLOG(1) << "Applying Updates";
170         ApplyUpdatesCommand apply_updates;
171         apply_updates.Execute(session);
172         next_step = BUILD_COMMIT_REQUEST;
173         break;
174       }
175       // These two steps are combined since they are executed within the same
176       // write transaction.
177       case BUILD_COMMIT_REQUEST: {
178         session->status_controller()->set_syncing(true);
179 
180         VLOG(1) << "Processing Commit Request";
181         ScopedDirLookup dir(session->context()->directory_manager(),
182                             session->context()->account_name());
183         if (!dir.good()) {
184           LOG(ERROR) << "Scoped dir lookup failed!";
185           return;
186         }
187         WriteTransaction trans(dir, SYNCER, __FILE__, __LINE__);
188         sessions::ScopedSetSessionWriteTransaction set_trans(session, &trans);
189 
190         VLOG(1) << "Getting the Commit IDs";
191         GetCommitIdsCommand get_commit_ids_command(
192             session->context()->max_commit_batch_size());
193         get_commit_ids_command.Execute(session);
194 
195         if (!session->status_controller()->commit_ids().empty()) {
196           VLOG(1) << "Building a commit message";
197           BuildCommitCommand build_commit_command;
198           build_commit_command.Execute(session);
199 
200           next_step = POST_COMMIT_MESSAGE;
201         } else {
202           next_step = BUILD_AND_PROCESS_CONFLICT_SETS;
203         }
204 
205         break;
206       }
207       case POST_COMMIT_MESSAGE: {
208         VLOG(1) << "Posting a commit request";
209         PostCommitMessageCommand post_commit_command;
210         post_commit_command.Execute(session);
211         next_step = PROCESS_COMMIT_RESPONSE;
212         break;
213       }
214       case PROCESS_COMMIT_RESPONSE: {
215         VLOG(1) << "Processing the commit response";
216         session->status_controller()->reset_num_conflicting_commits();
217         ProcessCommitResponseCommand process_response_command;
218         process_response_command.Execute(session);
219         next_step = BUILD_AND_PROCESS_CONFLICT_SETS;
220         break;
221       }
222       case BUILD_AND_PROCESS_CONFLICT_SETS: {
223         VLOG(1) << "Building and Processing Conflict Sets";
224         BuildAndProcessConflictSetsCommand build_process_conflict_sets;
225         build_process_conflict_sets.Execute(session);
226         if (session->status_controller()->conflict_sets_built())
227           next_step = SYNCER_END;
228         else
229           next_step = RESOLVE_CONFLICTS;
230         break;
231       }
232       case RESOLVE_CONFLICTS: {
233         VLOG(1) << "Resolving Conflicts";
234 
235         // Trigger the pre_conflict_resolution_closure_, which is a testing
236         // hook for the unit tests, if it is non-NULL.
237         if (pre_conflict_resolution_closure_) {
238           pre_conflict_resolution_closure_->Run();
239         }
240 
241         StatusController* status = session->status_controller();
242         status->reset_conflicts_resolved();
243         ResolveConflictsCommand resolve_conflicts_command;
244         resolve_conflicts_command.Execute(session);
245         if (status->HasConflictingUpdates())
246           next_step = APPLY_UPDATES_TO_RESOLVE_CONFLICTS;
247         else
248           next_step = SYNCER_END;
249         break;
250       }
251       case APPLY_UPDATES_TO_RESOLVE_CONFLICTS: {
252         StatusController* status = session->status_controller();
253         VLOG(1) << "Applying updates to resolve conflicts";
254         ApplyUpdatesCommand apply_updates;
255         int before_conflicting_updates = status->TotalNumConflictingItems();
256         apply_updates.Execute(session);
257         int after_conflicting_updates = status->TotalNumConflictingItems();
258         status->update_conflicts_resolved(before_conflicting_updates >
259                                           after_conflicting_updates);
260         if (status->conflicts_resolved())
261           next_step = RESOLVE_CONFLICTS;
262         else
263           next_step = SYNCER_END;
264         break;
265       }
266       case CLEAR_PRIVATE_DATA: {
267         VLOG(1) << "Clear Private Data";
268         ClearDataCommand clear_data_command;
269         clear_data_command.Execute(session);
270         next_step = SYNCER_END;
271         break;
272       }
273       case SYNCER_END: {
274         break;
275       }
276       default:
277         LOG(ERROR) << "Unknown command: " << current_step;
278     }
279     if (last_step == current_step)
280       break;
281     current_step = next_step;
282   }
283 
284   VLOG(1) << "Syncer End";
285   SyncerEndCommand syncer_end_command;
286   syncer_end_command.Execute(session);
287   return;
288 }
289 
ProcessClientCommand(sessions::SyncSession * session)290 void Syncer::ProcessClientCommand(sessions::SyncSession* session) {
291   const ClientToServerResponse& response =
292       session->status_controller()->updates_response();
293   if (!response.has_client_command())
294     return;
295   const ClientCommand& command = response.client_command();
296 
297   // The server limits the number of items a client can commit in one batch.
298   if (command.has_max_commit_batch_size()) {
299     session->context()->set_max_commit_batch_size(
300         command.max_commit_batch_size());
301   }
302   if (command.has_set_sync_long_poll_interval()) {
303     session->delegate()->OnReceivedLongPollIntervalUpdate(
304         TimeDelta::FromSeconds(command.set_sync_long_poll_interval()));
305   }
306   if (command.has_set_sync_poll_interval()) {
307     session->delegate()->OnReceivedShortPollIntervalUpdate(
308         TimeDelta::FromSeconds(command.set_sync_poll_interval()));
309   }
310 }
311 
CopyServerFields(syncable::Entry * src,syncable::MutableEntry * dest)312 void CopyServerFields(syncable::Entry* src, syncable::MutableEntry* dest) {
313   dest->Put(SERVER_NON_UNIQUE_NAME, src->Get(SERVER_NON_UNIQUE_NAME));
314   dest->Put(SERVER_PARENT_ID, src->Get(SERVER_PARENT_ID));
315   dest->Put(SERVER_MTIME, src->Get(SERVER_MTIME));
316   dest->Put(SERVER_CTIME, src->Get(SERVER_CTIME));
317   dest->Put(SERVER_VERSION, src->Get(SERVER_VERSION));
318   dest->Put(SERVER_IS_DIR, src->Get(SERVER_IS_DIR));
319   dest->Put(SERVER_IS_DEL, src->Get(SERVER_IS_DEL));
320   dest->Put(IS_UNAPPLIED_UPDATE, src->Get(IS_UNAPPLIED_UPDATE));
321   dest->Put(SERVER_SPECIFICS, src->Get(SERVER_SPECIFICS));
322   dest->Put(SERVER_POSITION_IN_PARENT, src->Get(SERVER_POSITION_IN_PARENT));
323 }
324 
ClearServerData(syncable::MutableEntry * entry)325 void ClearServerData(syncable::MutableEntry* entry) {
326   entry->Put(SERVER_NON_UNIQUE_NAME, "");
327   entry->Put(SERVER_PARENT_ID, syncable::kNullId);
328   entry->Put(SERVER_MTIME, 0);
329   entry->Put(SERVER_CTIME, 0);
330   entry->Put(SERVER_VERSION, 0);
331   entry->Put(SERVER_IS_DIR, false);
332   entry->Put(SERVER_IS_DEL, false);
333   entry->Put(IS_UNAPPLIED_UPDATE, false);
334   entry->Put(SERVER_SPECIFICS, sync_pb::EntitySpecifics::default_instance());
335   entry->Put(SERVER_POSITION_IN_PARENT, 0);
336 }
337 
338 }  // namespace browser_sync
339