1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/chromeos/drive/sync_client.h"
6
7 #include <vector>
8
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop_proxy.h"
11 #include "chrome/browser/chromeos/drive/drive.pb.h"
12 #include "chrome/browser/chromeos/drive/file_cache.h"
13 #include "chrome/browser/chromeos/drive/file_system/download_operation.h"
14 #include "chrome/browser/chromeos/drive/file_system/update_operation.h"
15 #include "chrome/browser/chromeos/drive/file_system_util.h"
16 #include "chrome/browser/chromeos/drive/sync/entry_update_performer.h"
17 #include "content/public/browser/browser_thread.h"
18 #include "google_apis/drive/task_util.h"
19
20 using content::BrowserThread;
21
22 namespace drive {
23 namespace internal {
24
25 namespace {
26
27 // The delay constant is used to delay processing a sync task. We should not
28 // process SyncTasks immediately for the following reasons:
29 //
30 // 1) For fetching, the user may accidentally click on "Make available
31 // offline" checkbox on a file, and immediately cancel it in a second.
32 // It's a waste to fetch the file in this case.
33 //
34 // 2) For uploading, file writing via HTML5 file system API is performed in
35 // two steps: 1) truncate a file to 0 bytes, 2) write contents. We
36 // shouldn't start uploading right after the step 1). Besides, the user
37 // may edit the same file repeatedly in a short period of time.
38 //
39 // TODO(satorux): We should find a way to handle the upload case more nicely,
40 // and shorten the delay. crbug.com/134774
41 const int kDelaySeconds = 5;
42
43 // The delay constant is used to delay retrying a sync task on server errors.
44 const int kLongDelaySeconds = 600;
45
46 // Iterates entries and appends IDs to |to_fetch| if the file is pinned but not
47 // fetched (not present locally), to |to_upload| if the file is dirty but not
48 // uploaded, or to |to_remove| if the entry is in the trash.
CollectBacklog(ResourceMetadata * metadata,std::vector<std::string> * to_fetch,std::vector<std::string> * to_upload,std::vector<std::string> * to_update)49 void CollectBacklog(ResourceMetadata* metadata,
50 std::vector<std::string>* to_fetch,
51 std::vector<std::string>* to_upload,
52 std::vector<std::string>* to_update) {
53 DCHECK(to_fetch);
54 DCHECK(to_upload);
55 DCHECK(to_update);
56
57 scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator();
58 for (; !it->IsAtEnd(); it->Advance()) {
59 const std::string& local_id = it->GetID();
60 const ResourceEntry& entry = it->GetValue();
61 if (entry.parent_local_id() == util::kDriveTrashDirLocalId) {
62 to_update->push_back(local_id);
63 continue;
64 }
65
66 switch (entry.metadata_edit_state()) {
67 case ResourceEntry::CLEAN:
68 break;
69 case ResourceEntry::SYNCING:
70 case ResourceEntry::DIRTY:
71 to_update->push_back(local_id);
72 break;
73 }
74
75 FileCacheEntry cache_entry;
76 if (it->GetCacheEntry(&cache_entry)) {
77 if (cache_entry.is_pinned() && !cache_entry.is_present())
78 to_fetch->push_back(local_id);
79
80 if (cache_entry.is_dirty())
81 to_upload->push_back(local_id);
82 }
83 }
84 DCHECK(!it->HasError());
85 }
86
87 // Iterates cache entries and collects IDs of ones with obsolete cache files.
CheckExistingPinnedFiles(ResourceMetadata * metadata,FileCache * cache,std::vector<std::string> * local_ids)88 void CheckExistingPinnedFiles(ResourceMetadata* metadata,
89 FileCache* cache,
90 std::vector<std::string>* local_ids) {
91 scoped_ptr<FileCache::Iterator> it = cache->GetIterator();
92 for (; !it->IsAtEnd(); it->Advance()) {
93 const FileCacheEntry& cache_entry = it->GetValue();
94 const std::string& local_id = it->GetID();
95 if (!cache_entry.is_pinned() || !cache_entry.is_present())
96 continue;
97
98 ResourceEntry entry;
99 FileError error = metadata->GetResourceEntryById(local_id, &entry);
100 if (error != FILE_ERROR_OK) {
101 LOG(WARNING) << "Entry not found: " << local_id;
102 continue;
103 }
104
105 // If MD5s don't match, it indicates the local cache file is stale, unless
106 // the file is dirty (the MD5 is "local"). We should never re-fetch the
107 // file when we have a locally modified version.
108 if (entry.file_specific_info().md5() == cache_entry.md5() ||
109 cache_entry.is_dirty())
110 continue;
111
112 error = cache->Remove(local_id);
113 if (error != FILE_ERROR_OK) {
114 LOG(WARNING) << "Failed to remove cache entry: " << local_id;
115 continue;
116 }
117
118 error = cache->Pin(local_id);
119 if (error != FILE_ERROR_OK) {
120 LOG(WARNING) << "Failed to pin cache entry: " << local_id;
121 continue;
122 }
123
124 local_ids->push_back(local_id);
125 }
126 DCHECK(!it->HasError());
127 }
128
129 } // namespace
130
SyncTask()131 SyncClient::SyncTask::SyncTask() : state(PENDING), should_run_again(false) {}
~SyncTask()132 SyncClient::SyncTask::~SyncTask() {}
133
SyncClient(base::SequencedTaskRunner * blocking_task_runner,file_system::OperationObserver * observer,JobScheduler * scheduler,ResourceMetadata * metadata,FileCache * cache,const base::FilePath & temporary_file_directory)134 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner,
135 file_system::OperationObserver* observer,
136 JobScheduler* scheduler,
137 ResourceMetadata* metadata,
138 FileCache* cache,
139 const base::FilePath& temporary_file_directory)
140 : blocking_task_runner_(blocking_task_runner),
141 metadata_(metadata),
142 cache_(cache),
143 download_operation_(new file_system::DownloadOperation(
144 blocking_task_runner,
145 observer,
146 scheduler,
147 metadata,
148 cache,
149 temporary_file_directory)),
150 update_operation_(new file_system::UpdateOperation(blocking_task_runner,
151 observer,
152 scheduler,
153 metadata,
154 cache)),
155 entry_update_performer_(new EntryUpdatePerformer(blocking_task_runner,
156 observer,
157 scheduler,
158 metadata)),
159 delay_(base::TimeDelta::FromSeconds(kDelaySeconds)),
160 long_delay_(base::TimeDelta::FromSeconds(kLongDelaySeconds)),
161 weak_ptr_factory_(this) {
162 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
163 }
164
~SyncClient()165 SyncClient::~SyncClient() {
166 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
167 }
168
StartProcessingBacklog()169 void SyncClient::StartProcessingBacklog() {
170 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
171
172 std::vector<std::string>* to_fetch = new std::vector<std::string>;
173 std::vector<std::string>* to_upload = new std::vector<std::string>;
174 std::vector<std::string>* to_remove = new std::vector<std::string>;
175 blocking_task_runner_->PostTaskAndReply(
176 FROM_HERE,
177 base::Bind(&CollectBacklog, metadata_, to_fetch, to_upload, to_remove),
178 base::Bind(&SyncClient::OnGetLocalIdsOfBacklog,
179 weak_ptr_factory_.GetWeakPtr(),
180 base::Owned(to_fetch),
181 base::Owned(to_upload),
182 base::Owned(to_remove)));
183 }
184
StartCheckingExistingPinnedFiles()185 void SyncClient::StartCheckingExistingPinnedFiles() {
186 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
187
188 std::vector<std::string>* local_ids = new std::vector<std::string>;
189 blocking_task_runner_->PostTaskAndReply(
190 FROM_HERE,
191 base::Bind(&CheckExistingPinnedFiles,
192 metadata_,
193 cache_,
194 local_ids),
195 base::Bind(&SyncClient::AddFetchTasks,
196 weak_ptr_factory_.GetWeakPtr(),
197 base::Owned(local_ids)));
198 }
199
AddFetchTask(const std::string & local_id)200 void SyncClient::AddFetchTask(const std::string& local_id) {
201 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
202 AddFetchTaskInternal(local_id, delay_);
203 }
204
RemoveFetchTask(const std::string & local_id)205 void SyncClient::RemoveFetchTask(const std::string& local_id) {
206 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
207
208 SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id));
209 if (it == tasks_.end())
210 return;
211
212 SyncTask* task = &it->second;
213 switch (task->state) {
214 case PENDING:
215 tasks_.erase(it);
216 break;
217 case RUNNING:
218 // TODO(kinaba): Cancel tasks in JobScheduler as well. crbug.com/248856
219 break;
220 }
221 }
222
AddUploadTask(const ClientContext & context,const std::string & local_id)223 void SyncClient::AddUploadTask(const ClientContext& context,
224 const std::string& local_id) {
225 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
226 AddUploadTaskInternal(context, local_id,
227 file_system::UpdateOperation::RUN_CONTENT_CHECK,
228 delay_);
229 }
230
AddUpdateTask(const std::string & local_id)231 void SyncClient::AddUpdateTask(const std::string& local_id) {
232 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
233 AddUpdateTaskInternal(local_id, base::TimeDelta::FromSeconds(0));
234 }
235
AddFetchTaskInternal(const std::string & local_id,const base::TimeDelta & delay)236 void SyncClient::AddFetchTaskInternal(const std::string& local_id,
237 const base::TimeDelta& delay) {
238 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
239
240 SyncTask task;
241 task.task = base::Bind(
242 &file_system::DownloadOperation::EnsureFileDownloadedByLocalId,
243 base::Unretained(download_operation_.get()),
244 local_id,
245 ClientContext(BACKGROUND),
246 GetFileContentInitializedCallback(),
247 google_apis::GetContentCallback(),
248 base::Bind(&SyncClient::OnFetchFileComplete,
249 weak_ptr_factory_.GetWeakPtr(),
250 local_id));
251 AddTask(SyncTasks::key_type(FETCH, local_id), task, delay);
252 }
253
AddUploadTaskInternal(const ClientContext & context,const std::string & local_id,file_system::UpdateOperation::ContentCheckMode content_check_mode,const base::TimeDelta & delay)254 void SyncClient::AddUploadTaskInternal(
255 const ClientContext& context,
256 const std::string& local_id,
257 file_system::UpdateOperation::ContentCheckMode content_check_mode,
258 const base::TimeDelta& delay) {
259 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
260
261 SyncTask task;
262 task.task = base::Bind(
263 &file_system::UpdateOperation::UpdateFileByLocalId,
264 base::Unretained(update_operation_.get()),
265 local_id,
266 context,
267 content_check_mode,
268 base::Bind(&SyncClient::OnUploadFileComplete,
269 weak_ptr_factory_.GetWeakPtr(),
270 local_id));
271 AddTask(SyncTasks::key_type(UPLOAD, local_id), task, delay);
272 }
273
AddUpdateTaskInternal(const std::string & local_id,const base::TimeDelta & delay)274 void SyncClient::AddUpdateTaskInternal(const std::string& local_id,
275 const base::TimeDelta& delay) {
276 SyncTask task;
277 task.task = base::Bind(
278 &EntryUpdatePerformer::UpdateEntry,
279 base::Unretained(entry_update_performer_.get()),
280 local_id,
281 base::Bind(&SyncClient::OnUpdateComplete,
282 weak_ptr_factory_.GetWeakPtr(),
283 local_id));
284 AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay);
285 }
286
AddTask(const SyncTasks::key_type & key,const SyncTask & task,const base::TimeDelta & delay)287 void SyncClient::AddTask(const SyncTasks::key_type& key,
288 const SyncTask& task,
289 const base::TimeDelta& delay) {
290 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
291
292 SyncTasks::iterator it = tasks_.find(key);
293 if (it != tasks_.end()) {
294 switch (it->second.state) {
295 case PENDING:
296 // The same task will run, do nothing.
297 break;
298 case RUNNING:
299 // Something has changed since the task started. Schedule rerun.
300 it->second.should_run_again = true;
301 break;
302 }
303 return;
304 }
305
306 DCHECK_EQ(PENDING, task.state);
307 tasks_[key] = task;
308
309 base::MessageLoopProxy::current()->PostDelayedTask(
310 FROM_HERE,
311 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
312 delay);
313 }
314
StartTask(const SyncTasks::key_type & key)315 void SyncClient::StartTask(const SyncTasks::key_type& key) {
316 SyncTasks::iterator it = tasks_.find(key);
317 if (it == tasks_.end())
318 return;
319
320 SyncTask* task = &it->second;
321 switch (task->state) {
322 case PENDING:
323 task->state = RUNNING;
324 task->task.Run();
325 break;
326 case RUNNING: // Do nothing.
327 break;
328 }
329 }
330
OnGetLocalIdsOfBacklog(const std::vector<std::string> * to_fetch,const std::vector<std::string> * to_upload,const std::vector<std::string> * to_update)331 void SyncClient::OnGetLocalIdsOfBacklog(
332 const std::vector<std::string>* to_fetch,
333 const std::vector<std::string>* to_upload,
334 const std::vector<std::string>* to_update) {
335 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
336
337 // Give priority to upload tasks over fetch tasks, so that dirty files are
338 // uploaded as soon as possible.
339 for (size_t i = 0; i < to_upload->size(); ++i) {
340 const std::string& local_id = (*to_upload)[i];
341 DVLOG(1) << "Queuing to upload: " << local_id;
342 AddUploadTaskInternal(ClientContext(BACKGROUND), local_id,
343 file_system::UpdateOperation::NO_CONTENT_CHECK,
344 delay_);
345 }
346
347 for (size_t i = 0; i < to_fetch->size(); ++i) {
348 const std::string& local_id = (*to_fetch)[i];
349 DVLOG(1) << "Queuing to fetch: " << local_id;
350 AddFetchTaskInternal(local_id, delay_);
351 }
352
353 for (size_t i = 0; i < to_update->size(); ++i) {
354 const std::string& local_id = (*to_update)[i];
355 DVLOG(1) << "Queuing to update: " << local_id;
356 AddUpdateTask(local_id);
357 }
358 }
359
AddFetchTasks(const std::vector<std::string> * local_ids)360 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) {
361 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
362
363 for (size_t i = 0; i < local_ids->size(); ++i)
364 AddFetchTask((*local_ids)[i]);
365 }
366
OnTaskComplete(SyncType type,const std::string & local_id)367 bool SyncClient::OnTaskComplete(SyncType type, const std::string& local_id) {
368 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
369
370 const SyncTasks::key_type key(type, local_id);
371 SyncTasks::iterator it = tasks_.find(key);
372 DCHECK(it != tasks_.end());
373
374 if (it->second.should_run_again) {
375 DVLOG(1) << "Running again: type = " << type << ", id = " << local_id;
376 it->second.should_run_again = false;
377 it->second.task.Run();
378 return false;
379 }
380
381 tasks_.erase(it);
382 return true;
383 }
384
OnFetchFileComplete(const std::string & local_id,FileError error,const base::FilePath & local_path,scoped_ptr<ResourceEntry> entry)385 void SyncClient::OnFetchFileComplete(const std::string& local_id,
386 FileError error,
387 const base::FilePath& local_path,
388 scoped_ptr<ResourceEntry> entry) {
389 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
390
391 if (!OnTaskComplete(FETCH, local_id))
392 return;
393
394 if (error == FILE_ERROR_OK) {
395 DVLOG(1) << "Fetched " << local_id << ": " << local_path.value();
396 } else {
397 switch (error) {
398 case FILE_ERROR_ABORT:
399 // If user cancels download, unpin the file so that we do not sync the
400 // file again.
401 base::PostTaskAndReplyWithResult(
402 blocking_task_runner_,
403 FROM_HERE,
404 base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
405 base::Bind(&util::EmptyFileOperationCallback));
406 break;
407 case FILE_ERROR_NO_CONNECTION:
408 // Add the task again so that we'll retry once the connection is back.
409 AddFetchTaskInternal(local_id, delay_);
410 break;
411 case FILE_ERROR_SERVICE_UNAVAILABLE:
412 // Add the task again so that we'll retry once the service is back.
413 AddFetchTaskInternal(local_id, long_delay_);
414 break;
415 default:
416 LOG(WARNING) << "Failed to fetch " << local_id
417 << ": " << FileErrorToString(error);
418 }
419 }
420 }
421
OnUploadFileComplete(const std::string & local_id,FileError error)422 void SyncClient::OnUploadFileComplete(const std::string& local_id,
423 FileError error) {
424 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
425
426 if (!OnTaskComplete(UPLOAD, local_id))
427 return;
428
429 if (error == FILE_ERROR_OK) {
430 DVLOG(1) << "Uploaded " << local_id;
431 } else {
432 switch (error) {
433 case FILE_ERROR_NO_CONNECTION:
434 // Add the task again so that we'll retry once the connection is back.
435 AddUploadTaskInternal(ClientContext(BACKGROUND), local_id,
436 file_system::UpdateOperation::NO_CONTENT_CHECK,
437 delay_);
438 break;
439 case FILE_ERROR_SERVICE_UNAVAILABLE:
440 // Add the task again so that we'll retry once the service is back.
441 AddUploadTaskInternal(ClientContext(BACKGROUND), local_id,
442 file_system::UpdateOperation::NO_CONTENT_CHECK,
443 long_delay_);
444 break;
445 default:
446 LOG(WARNING) << "Failed to upload " << local_id << ": "
447 << FileErrorToString(error);
448 }
449 }
450 }
451
OnUpdateComplete(const std::string & local_id,FileError error)452 void SyncClient::OnUpdateComplete(const std::string& local_id,
453 FileError error) {
454 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
455
456 if (!OnTaskComplete(UPDATE, local_id))
457 return;
458
459 if (error == FILE_ERROR_OK) {
460 DVLOG(1) << "Updated " << local_id;
461 } else {
462 switch (error) {
463 case FILE_ERROR_NO_CONNECTION:
464 // Add the task again so that we'll retry once the connection is back.
465 AddUpdateTaskInternal(local_id, base::TimeDelta::FromSeconds(0));
466 break;
467 case FILE_ERROR_SERVICE_UNAVAILABLE:
468 // Add the task again so that we'll retry once the service is back.
469 AddUpdateTaskInternal(local_id, long_delay_);
470 break;
471 default:
472 LOG(WARNING) << "Failed to update " << local_id << ": "
473 << FileErrorToString(error);
474 }
475 }
476 }
477
478 } // namespace internal
479 } // namespace drive
480