1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/chromeos/drive/sync_client.h"
6
7 #include <vector>
8
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop_proxy.h"
11 #include "chrome/browser/chromeos/drive/drive.pb.h"
12 #include "chrome/browser/chromeos/drive/file_cache.h"
13 #include "chrome/browser/chromeos/drive/file_system/download_operation.h"
14 #include "chrome/browser/chromeos/drive/file_system/operation_observer.h"
15 #include "chrome/browser/chromeos/drive/file_system_util.h"
16 #include "chrome/browser/chromeos/drive/job_scheduler.h"
17 #include "chrome/browser/chromeos/drive/sync/entry_update_performer.h"
18 #include "content/public/browser/browser_thread.h"
19 #include "google_apis/drive/task_util.h"
20
21 using content::BrowserThread;
22
23 namespace drive {
24 namespace internal {
25
26 namespace {
27
28 // The delay constant is used to delay processing a sync task. We should not
29 // process SyncTasks immediately for the following reasons:
30 //
31 // 1) For fetching, the user may accidentally click on "Make available
32 // offline" checkbox on a file, and immediately cancel it in a second.
33 // It's a waste to fetch the file in this case.
34 //
35 // 2) For uploading, file writing via HTML5 file system API is performed in
36 // two steps: 1) truncate a file to 0 bytes, 2) write contents. We
37 // shouldn't start uploading right after the step 1). Besides, the user
38 // may edit the same file repeatedly in a short period of time.
39 //
40 // TODO(satorux): We should find a way to handle the upload case more nicely,
41 // and shorten the delay. crbug.com/134774
42 const int kDelaySeconds = 1;
43
44 // The delay constant is used to delay retrying a sync task on server errors.
45 const int kLongDelaySeconds = 600;
46
47 // Iterates entries and appends IDs to |to_fetch| if the file is pinned but not
48 // fetched (not present locally), to |to_update| if the file needs update.
CollectBacklog(ResourceMetadata * metadata,std::vector<std::string> * to_fetch,std::vector<std::string> * to_update)49 void CollectBacklog(ResourceMetadata* metadata,
50 std::vector<std::string>* to_fetch,
51 std::vector<std::string>* to_update) {
52 DCHECK(to_fetch);
53 DCHECK(to_update);
54
55 scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator();
56 for (; !it->IsAtEnd(); it->Advance()) {
57 const std::string& local_id = it->GetID();
58 const ResourceEntry& entry = it->GetValue();
59 if (entry.parent_local_id() == util::kDriveTrashDirLocalId) {
60 to_update->push_back(local_id);
61 continue;
62 }
63
64 bool should_update = false;
65 switch (entry.metadata_edit_state()) {
66 case ResourceEntry::CLEAN:
67 break;
68 case ResourceEntry::SYNCING:
69 case ResourceEntry::DIRTY:
70 should_update = true;
71 break;
72 }
73
74 if (entry.file_specific_info().cache_state().is_pinned() &&
75 !entry.file_specific_info().cache_state().is_present())
76 to_fetch->push_back(local_id);
77
78 if (entry.file_specific_info().cache_state().is_dirty())
79 should_update = true;
80
81 if (should_update)
82 to_update->push_back(local_id);
83 }
84 DCHECK(!it->HasError());
85 }
86
87 // Iterates cache entries and collects IDs of ones with obsolete cache files.
CheckExistingPinnedFiles(ResourceMetadata * metadata,FileCache * cache,std::vector<std::string> * local_ids)88 void CheckExistingPinnedFiles(ResourceMetadata* metadata,
89 FileCache* cache,
90 std::vector<std::string>* local_ids) {
91 scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator();
92 for (; !it->IsAtEnd(); it->Advance()) {
93 const ResourceEntry& entry = it->GetValue();
94 const FileCacheEntry& cache_state =
95 entry.file_specific_info().cache_state();
96 const std::string& local_id = it->GetID();
97 if (!cache_state.is_pinned() || !cache_state.is_present())
98 continue;
99
100 // If MD5s don't match, it indicates the local cache file is stale, unless
101 // the file is dirty (the MD5 is "local"). We should never re-fetch the
102 // file when we have a locally modified version.
103 if (entry.file_specific_info().md5() == cache_state.md5() ||
104 cache_state.is_dirty())
105 continue;
106
107 FileError error = cache->Remove(local_id);
108 if (error != FILE_ERROR_OK) {
109 LOG(WARNING) << "Failed to remove cache entry: " << local_id;
110 continue;
111 }
112
113 error = cache->Pin(local_id);
114 if (error != FILE_ERROR_OK) {
115 LOG(WARNING) << "Failed to pin cache entry: " << local_id;
116 continue;
117 }
118
119 local_ids->push_back(local_id);
120 }
121 DCHECK(!it->HasError());
122 }
123
124 // Runs the task and returns a dummy cancel closure.
RunTaskAndReturnDummyCancelClosure(const base::Closure & task)125 base::Closure RunTaskAndReturnDummyCancelClosure(const base::Closure& task) {
126 task.Run();
127 return base::Closure();
128 }
129
130 } // namespace
131
SyncTask()132 SyncClient::SyncTask::SyncTask() : state(PENDING), should_run_again(false) {}
~SyncTask()133 SyncClient::SyncTask::~SyncTask() {}
134
SyncClient(base::SequencedTaskRunner * blocking_task_runner,file_system::OperationObserver * observer,JobScheduler * scheduler,ResourceMetadata * metadata,FileCache * cache,LoaderController * loader_controller,const base::FilePath & temporary_file_directory)135 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner,
136 file_system::OperationObserver* observer,
137 JobScheduler* scheduler,
138 ResourceMetadata* metadata,
139 FileCache* cache,
140 LoaderController* loader_controller,
141 const base::FilePath& temporary_file_directory)
142 : blocking_task_runner_(blocking_task_runner),
143 operation_observer_(observer),
144 metadata_(metadata),
145 cache_(cache),
146 download_operation_(new file_system::DownloadOperation(
147 blocking_task_runner,
148 observer,
149 scheduler,
150 metadata,
151 cache,
152 temporary_file_directory)),
153 entry_update_performer_(new EntryUpdatePerformer(blocking_task_runner,
154 observer,
155 scheduler,
156 metadata,
157 cache,
158 loader_controller)),
159 delay_(base::TimeDelta::FromSeconds(kDelaySeconds)),
160 long_delay_(base::TimeDelta::FromSeconds(kLongDelaySeconds)),
161 weak_ptr_factory_(this) {
162 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
163 }
164
~SyncClient()165 SyncClient::~SyncClient() {
166 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
167 }
168
StartProcessingBacklog()169 void SyncClient::StartProcessingBacklog() {
170 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
171
172 std::vector<std::string>* to_fetch = new std::vector<std::string>;
173 std::vector<std::string>* to_update = new std::vector<std::string>;
174 blocking_task_runner_->PostTaskAndReply(
175 FROM_HERE,
176 base::Bind(&CollectBacklog, metadata_, to_fetch, to_update),
177 base::Bind(&SyncClient::OnGetLocalIdsOfBacklog,
178 weak_ptr_factory_.GetWeakPtr(),
179 base::Owned(to_fetch),
180 base::Owned(to_update)));
181 }
182
StartCheckingExistingPinnedFiles()183 void SyncClient::StartCheckingExistingPinnedFiles() {
184 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
185
186 std::vector<std::string>* local_ids = new std::vector<std::string>;
187 blocking_task_runner_->PostTaskAndReply(
188 FROM_HERE,
189 base::Bind(&CheckExistingPinnedFiles,
190 metadata_,
191 cache_,
192 local_ids),
193 base::Bind(&SyncClient::AddFetchTasks,
194 weak_ptr_factory_.GetWeakPtr(),
195 base::Owned(local_ids)));
196 }
197
AddFetchTask(const std::string & local_id)198 void SyncClient::AddFetchTask(const std::string& local_id) {
199 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
200 AddFetchTaskInternal(local_id, delay_);
201 }
202
RemoveFetchTask(const std::string & local_id)203 void SyncClient::RemoveFetchTask(const std::string& local_id) {
204 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
205
206 SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id));
207 if (it == tasks_.end())
208 return;
209
210 SyncTask* task = &it->second;
211 switch (task->state) {
212 case PENDING:
213 tasks_.erase(it);
214 break;
215 case RUNNING:
216 if (!task->cancel_closure.is_null())
217 task->cancel_closure.Run();
218 break;
219 }
220 }
221
AddUpdateTask(const ClientContext & context,const std::string & local_id)222 void SyncClient::AddUpdateTask(const ClientContext& context,
223 const std::string& local_id) {
224 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
225 AddUpdateTaskInternal(context, local_id, delay_);
226 }
227
AddFetchTaskInternal(const std::string & local_id,const base::TimeDelta & delay)228 void SyncClient::AddFetchTaskInternal(const std::string& local_id,
229 const base::TimeDelta& delay) {
230 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
231
232 SyncTask task;
233 task.task = base::Bind(
234 &file_system::DownloadOperation::EnsureFileDownloadedByLocalId,
235 base::Unretained(download_operation_.get()),
236 local_id,
237 ClientContext(BACKGROUND),
238 GetFileContentInitializedCallback(),
239 google_apis::GetContentCallback(),
240 base::Bind(&SyncClient::OnFetchFileComplete,
241 weak_ptr_factory_.GetWeakPtr(),
242 local_id));
243 AddTask(SyncTasks::key_type(FETCH, local_id), task, delay);
244 }
245
AddUpdateTaskInternal(const ClientContext & context,const std::string & local_id,const base::TimeDelta & delay)246 void SyncClient::AddUpdateTaskInternal(const ClientContext& context,
247 const std::string& local_id,
248 const base::TimeDelta& delay) {
249 SyncTask task;
250 task.task = base::Bind(
251 &RunTaskAndReturnDummyCancelClosure,
252 base::Bind(&EntryUpdatePerformer::UpdateEntry,
253 base::Unretained(entry_update_performer_.get()),
254 local_id,
255 context,
256 base::Bind(&SyncClient::OnUpdateComplete,
257 weak_ptr_factory_.GetWeakPtr(),
258 local_id)));
259 AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay);
260 }
261
AddTask(const SyncTasks::key_type & key,const SyncTask & task,const base::TimeDelta & delay)262 void SyncClient::AddTask(const SyncTasks::key_type& key,
263 const SyncTask& task,
264 const base::TimeDelta& delay) {
265 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
266
267 SyncTasks::iterator it = tasks_.find(key);
268 if (it != tasks_.end()) {
269 switch (it->second.state) {
270 case PENDING:
271 // The same task will run, do nothing.
272 break;
273 case RUNNING:
274 // Something has changed since the task started. Schedule rerun.
275 it->second.should_run_again = true;
276 break;
277 }
278 return;
279 }
280
281 DCHECK_EQ(PENDING, task.state);
282 tasks_[key] = task;
283
284 base::MessageLoopProxy::current()->PostDelayedTask(
285 FROM_HERE,
286 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
287 delay);
288 }
289
StartTask(const SyncTasks::key_type & key)290 void SyncClient::StartTask(const SyncTasks::key_type& key) {
291 SyncTasks::iterator it = tasks_.find(key);
292 if (it == tasks_.end())
293 return;
294
295 SyncTask* task = &it->second;
296 switch (task->state) {
297 case PENDING:
298 task->state = RUNNING;
299 task->cancel_closure = task->task.Run();
300 break;
301 case RUNNING: // Do nothing.
302 break;
303 }
304 }
305
OnGetLocalIdsOfBacklog(const std::vector<std::string> * to_fetch,const std::vector<std::string> * to_update)306 void SyncClient::OnGetLocalIdsOfBacklog(
307 const std::vector<std::string>* to_fetch,
308 const std::vector<std::string>* to_update) {
309 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
310
311 // Give priority to upload tasks over fetch tasks, so that dirty files are
312 // uploaded as soon as possible.
313 for (size_t i = 0; i < to_update->size(); ++i) {
314 const std::string& local_id = (*to_update)[i];
315 DVLOG(1) << "Queuing to update: " << local_id;
316 AddUpdateTask(ClientContext(BACKGROUND), local_id);
317 }
318
319 for (size_t i = 0; i < to_fetch->size(); ++i) {
320 const std::string& local_id = (*to_fetch)[i];
321 DVLOG(1) << "Queuing to fetch: " << local_id;
322 AddFetchTaskInternal(local_id, delay_);
323 }
324 }
325
AddFetchTasks(const std::vector<std::string> * local_ids)326 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) {
327 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
328
329 for (size_t i = 0; i < local_ids->size(); ++i)
330 AddFetchTask((*local_ids)[i]);
331 }
332
OnTaskComplete(SyncType type,const std::string & local_id)333 bool SyncClient::OnTaskComplete(SyncType type, const std::string& local_id) {
334 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
335
336 const SyncTasks::key_type key(type, local_id);
337 SyncTasks::iterator it = tasks_.find(key);
338 DCHECK(it != tasks_.end());
339
340 if (it->second.should_run_again) {
341 DVLOG(1) << "Running again: type = " << type << ", id = " << local_id;
342 it->second.should_run_again = false;
343 it->second.task.Run();
344 return false;
345 }
346
347 tasks_.erase(it);
348 return true;
349 }
350
OnFetchFileComplete(const std::string & local_id,FileError error,const base::FilePath & local_path,scoped_ptr<ResourceEntry> entry)351 void SyncClient::OnFetchFileComplete(const std::string& local_id,
352 FileError error,
353 const base::FilePath& local_path,
354 scoped_ptr<ResourceEntry> entry) {
355 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
356
357 if (!OnTaskComplete(FETCH, local_id))
358 return;
359
360 if (error == FILE_ERROR_OK) {
361 DVLOG(1) << "Fetched " << local_id << ": " << local_path.value();
362 } else {
363 switch (error) {
364 case FILE_ERROR_ABORT:
365 // If user cancels download, unpin the file so that we do not sync the
366 // file again.
367 base::PostTaskAndReplyWithResult(
368 blocking_task_runner_,
369 FROM_HERE,
370 base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
371 base::Bind(&util::EmptyFileOperationCallback));
372 break;
373 case FILE_ERROR_NO_CONNECTION:
374 // Add the task again so that we'll retry once the connection is back.
375 AddFetchTaskInternal(local_id, delay_);
376 break;
377 case FILE_ERROR_SERVICE_UNAVAILABLE:
378 // Add the task again so that we'll retry once the service is back.
379 AddFetchTaskInternal(local_id, long_delay_);
380 operation_observer_->OnDriveSyncError(
381 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE,
382 local_id);
383 break;
384 default:
385 operation_observer_->OnDriveSyncError(
386 file_system::DRIVE_SYNC_ERROR_MISC,
387 local_id);
388 LOG(WARNING) << "Failed to fetch " << local_id
389 << ": " << FileErrorToString(error);
390 }
391 }
392 }
393
OnUpdateComplete(const std::string & local_id,FileError error)394 void SyncClient::OnUpdateComplete(const std::string& local_id,
395 FileError error) {
396 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
397
398 if (!OnTaskComplete(UPDATE, local_id))
399 return;
400
401 if (error == FILE_ERROR_OK) {
402 DVLOG(1) << "Updated " << local_id;
403
404 // Add update tasks for child entries which may be waiting for the parent to
405 // be updated.
406 ResourceEntryVector* entries = new ResourceEntryVector;
407 base::PostTaskAndReplyWithResult(
408 blocking_task_runner_.get(),
409 FROM_HERE,
410 base::Bind(&ResourceMetadata::ReadDirectoryById,
411 base::Unretained(metadata_), local_id, entries),
412 base::Bind(&SyncClient::AddChildUpdateTasks,
413 weak_ptr_factory_.GetWeakPtr(), base::Owned(entries)));
414 } else {
415 switch (error) {
416 case FILE_ERROR_ABORT:
417 // Ignore it because this is caused by user's cancel operations.
418 break;
419 case FILE_ERROR_NO_CONNECTION:
420 // Add the task again so that we'll retry once the connection is back.
421 AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id,
422 base::TimeDelta::FromSeconds(0));
423 break;
424 case FILE_ERROR_SERVICE_UNAVAILABLE:
425 // Add the task again so that we'll retry once the service is back.
426 AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id, long_delay_);
427 operation_observer_->OnDriveSyncError(
428 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE,
429 local_id);
430 break;
431 default:
432 operation_observer_->OnDriveSyncError(
433 file_system::DRIVE_SYNC_ERROR_MISC,
434 local_id);
435 LOG(WARNING) << "Failed to update " << local_id << ": "
436 << FileErrorToString(error);
437 }
438 }
439 }
440
AddChildUpdateTasks(const ResourceEntryVector * entries,FileError error)441 void SyncClient::AddChildUpdateTasks(const ResourceEntryVector* entries,
442 FileError error) {
443 if (error != FILE_ERROR_OK)
444 return;
445
446 for (size_t i = 0; i < entries->size(); ++i) {
447 const ResourceEntry& entry = (*entries)[i];
448 if (entry.metadata_edit_state() != ResourceEntry::CLEAN) {
449 AddUpdateTaskInternal(ClientContext(BACKGROUND), entry.local_id(),
450 base::TimeDelta::FromSeconds(0));
451 }
452 }
453 }
454
455 } // namespace internal
456 } // namespace drive
457