• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "chrome/browser/chromeos/drive/sync_client.h"
6 
7 #include <vector>
8 
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop_proxy.h"
11 #include "chrome/browser/chromeos/drive/drive.pb.h"
12 #include "chrome/browser/chromeos/drive/file_cache.h"
13 #include "chrome/browser/chromeos/drive/file_system/download_operation.h"
14 #include "chrome/browser/chromeos/drive/file_system/update_operation.h"
15 #include "chrome/browser/chromeos/drive/file_system_util.h"
16 #include "chrome/browser/chromeos/drive/sync/entry_update_performer.h"
17 #include "content/public/browser/browser_thread.h"
18 #include "google_apis/drive/task_util.h"
19 
20 using content::BrowserThread;
21 
22 namespace drive {
23 namespace internal {
24 
25 namespace {
26 
27 // The delay constant is used to delay processing a sync task. We should not
28 // process SyncTasks immediately for the following reasons:
29 //
30 // 1) For fetching, the user may accidentally click on "Make available
31 //    offline" checkbox on a file, and immediately cancel it in a second.
32 //    It's a waste to fetch the file in this case.
33 //
34 // 2) For uploading, file writing via HTML5 file system API is performed in
35 //    two steps: 1) truncate a file to 0 bytes, 2) write contents. We
36 //    shouldn't start uploading right after the step 1). Besides, the user
37 //    may edit the same file repeatedly in a short period of time.
38 //
39 // TODO(satorux): We should find a way to handle the upload case more nicely,
40 // and shorten the delay. crbug.com/134774
41 const int kDelaySeconds = 5;
42 
43 // The delay constant is used to delay retrying a sync task on server errors.
44 const int kLongDelaySeconds = 600;
45 
46 // Iterates entries and appends IDs to |to_fetch| if the file is pinned but not
47 // fetched (not present locally), to |to_upload| if the file is dirty but not
48 // uploaded, or to |to_remove| if the entry is in the trash.
CollectBacklog(ResourceMetadata * metadata,std::vector<std::string> * to_fetch,std::vector<std::string> * to_upload,std::vector<std::string> * to_update)49 void CollectBacklog(ResourceMetadata* metadata,
50                     std::vector<std::string>* to_fetch,
51                     std::vector<std::string>* to_upload,
52                     std::vector<std::string>* to_update) {
53   DCHECK(to_fetch);
54   DCHECK(to_upload);
55   DCHECK(to_update);
56 
57   scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator();
58   for (; !it->IsAtEnd(); it->Advance()) {
59     const std::string& local_id = it->GetID();
60     const ResourceEntry& entry = it->GetValue();
61     if (entry.parent_local_id() == util::kDriveTrashDirLocalId) {
62       to_update->push_back(local_id);
63       continue;
64     }
65 
66     switch (entry.metadata_edit_state()) {
67       case ResourceEntry::CLEAN:
68         break;
69       case ResourceEntry::SYNCING:
70       case ResourceEntry::DIRTY:
71         to_update->push_back(local_id);
72         break;
73     }
74 
75     FileCacheEntry cache_entry;
76     if (it->GetCacheEntry(&cache_entry)) {
77       if (cache_entry.is_pinned() && !cache_entry.is_present())
78         to_fetch->push_back(local_id);
79 
80       if (cache_entry.is_dirty())
81         to_upload->push_back(local_id);
82     }
83   }
84   DCHECK(!it->HasError());
85 }
86 
87 // Iterates cache entries and collects IDs of ones with obsolete cache files.
CheckExistingPinnedFiles(ResourceMetadata * metadata,FileCache * cache,std::vector<std::string> * local_ids)88 void CheckExistingPinnedFiles(ResourceMetadata* metadata,
89                               FileCache* cache,
90                               std::vector<std::string>* local_ids) {
91   scoped_ptr<FileCache::Iterator> it = cache->GetIterator();
92   for (; !it->IsAtEnd(); it->Advance()) {
93     const FileCacheEntry& cache_entry = it->GetValue();
94     const std::string& local_id = it->GetID();
95     if (!cache_entry.is_pinned() || !cache_entry.is_present())
96       continue;
97 
98     ResourceEntry entry;
99     FileError error = metadata->GetResourceEntryById(local_id, &entry);
100     if (error != FILE_ERROR_OK) {
101       LOG(WARNING) << "Entry not found: " << local_id;
102       continue;
103     }
104 
105     // If MD5s don't match, it indicates the local cache file is stale, unless
106     // the file is dirty (the MD5 is "local"). We should never re-fetch the
107     // file when we have a locally modified version.
108     if (entry.file_specific_info().md5() == cache_entry.md5() ||
109         cache_entry.is_dirty())
110       continue;
111 
112     error = cache->Remove(local_id);
113     if (error != FILE_ERROR_OK) {
114       LOG(WARNING) << "Failed to remove cache entry: " << local_id;
115       continue;
116     }
117 
118     error = cache->Pin(local_id);
119     if (error != FILE_ERROR_OK) {
120       LOG(WARNING) << "Failed to pin cache entry: " << local_id;
121       continue;
122     }
123 
124     local_ids->push_back(local_id);
125   }
126   DCHECK(!it->HasError());
127 }
128 
129 }  // namespace
130 
SyncTask()131 SyncClient::SyncTask::SyncTask() : state(PENDING), should_run_again(false) {}
~SyncTask()132 SyncClient::SyncTask::~SyncTask() {}
133 
SyncClient(base::SequencedTaskRunner * blocking_task_runner,file_system::OperationObserver * observer,JobScheduler * scheduler,ResourceMetadata * metadata,FileCache * cache,const base::FilePath & temporary_file_directory)134 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner,
135                        file_system::OperationObserver* observer,
136                        JobScheduler* scheduler,
137                        ResourceMetadata* metadata,
138                        FileCache* cache,
139                        const base::FilePath& temporary_file_directory)
140     : blocking_task_runner_(blocking_task_runner),
141       metadata_(metadata),
142       cache_(cache),
143       download_operation_(new file_system::DownloadOperation(
144           blocking_task_runner,
145           observer,
146           scheduler,
147           metadata,
148           cache,
149           temporary_file_directory)),
150       update_operation_(new file_system::UpdateOperation(blocking_task_runner,
151                                                          observer,
152                                                          scheduler,
153                                                          metadata,
154                                                          cache)),
155       entry_update_performer_(new EntryUpdatePerformer(blocking_task_runner,
156                                                        observer,
157                                                        scheduler,
158                                                        metadata)),
159       delay_(base::TimeDelta::FromSeconds(kDelaySeconds)),
160       long_delay_(base::TimeDelta::FromSeconds(kLongDelaySeconds)),
161       weak_ptr_factory_(this) {
162   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
163 }
164 
~SyncClient()165 SyncClient::~SyncClient() {
166   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
167 }
168 
StartProcessingBacklog()169 void SyncClient::StartProcessingBacklog() {
170   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
171 
172   std::vector<std::string>* to_fetch = new std::vector<std::string>;
173   std::vector<std::string>* to_upload = new std::vector<std::string>;
174   std::vector<std::string>* to_remove = new std::vector<std::string>;
175   blocking_task_runner_->PostTaskAndReply(
176       FROM_HERE,
177       base::Bind(&CollectBacklog, metadata_, to_fetch, to_upload, to_remove),
178       base::Bind(&SyncClient::OnGetLocalIdsOfBacklog,
179                  weak_ptr_factory_.GetWeakPtr(),
180                  base::Owned(to_fetch),
181                  base::Owned(to_upload),
182                  base::Owned(to_remove)));
183 }
184 
StartCheckingExistingPinnedFiles()185 void SyncClient::StartCheckingExistingPinnedFiles() {
186   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
187 
188   std::vector<std::string>* local_ids = new std::vector<std::string>;
189   blocking_task_runner_->PostTaskAndReply(
190       FROM_HERE,
191       base::Bind(&CheckExistingPinnedFiles,
192                  metadata_,
193                  cache_,
194                  local_ids),
195       base::Bind(&SyncClient::AddFetchTasks,
196                  weak_ptr_factory_.GetWeakPtr(),
197                  base::Owned(local_ids)));
198 }
199 
AddFetchTask(const std::string & local_id)200 void SyncClient::AddFetchTask(const std::string& local_id) {
201   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
202   AddFetchTaskInternal(local_id, delay_);
203 }
204 
RemoveFetchTask(const std::string & local_id)205 void SyncClient::RemoveFetchTask(const std::string& local_id) {
206   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
207 
208   SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id));
209   if (it == tasks_.end())
210     return;
211 
212   SyncTask* task = &it->second;
213   switch (task->state) {
214     case PENDING:
215       tasks_.erase(it);
216       break;
217     case RUNNING:
218       // TODO(kinaba): Cancel tasks in JobScheduler as well. crbug.com/248856
219       break;
220   }
221 }
222 
AddUploadTask(const ClientContext & context,const std::string & local_id)223 void SyncClient::AddUploadTask(const ClientContext& context,
224                                const std::string& local_id) {
225   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
226   AddUploadTaskInternal(context, local_id,
227                         file_system::UpdateOperation::RUN_CONTENT_CHECK,
228                         delay_);
229 }
230 
AddUpdateTask(const std::string & local_id)231 void SyncClient::AddUpdateTask(const std::string& local_id) {
232   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
233   AddUpdateTaskInternal(local_id, base::TimeDelta::FromSeconds(0));
234 }
235 
AddFetchTaskInternal(const std::string & local_id,const base::TimeDelta & delay)236 void SyncClient::AddFetchTaskInternal(const std::string& local_id,
237                                       const base::TimeDelta& delay) {
238   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
239 
240   SyncTask task;
241   task.task = base::Bind(
242       &file_system::DownloadOperation::EnsureFileDownloadedByLocalId,
243       base::Unretained(download_operation_.get()),
244       local_id,
245       ClientContext(BACKGROUND),
246       GetFileContentInitializedCallback(),
247       google_apis::GetContentCallback(),
248       base::Bind(&SyncClient::OnFetchFileComplete,
249                  weak_ptr_factory_.GetWeakPtr(),
250                  local_id));
251   AddTask(SyncTasks::key_type(FETCH, local_id), task, delay);
252 }
253 
AddUploadTaskInternal(const ClientContext & context,const std::string & local_id,file_system::UpdateOperation::ContentCheckMode content_check_mode,const base::TimeDelta & delay)254 void SyncClient::AddUploadTaskInternal(
255     const ClientContext& context,
256     const std::string& local_id,
257     file_system::UpdateOperation::ContentCheckMode content_check_mode,
258     const base::TimeDelta& delay) {
259   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
260 
261   SyncTask task;
262   task.task = base::Bind(
263       &file_system::UpdateOperation::UpdateFileByLocalId,
264       base::Unretained(update_operation_.get()),
265       local_id,
266       context,
267       content_check_mode,
268       base::Bind(&SyncClient::OnUploadFileComplete,
269                  weak_ptr_factory_.GetWeakPtr(),
270                  local_id));
271   AddTask(SyncTasks::key_type(UPLOAD, local_id), task, delay);
272 }
273 
AddUpdateTaskInternal(const std::string & local_id,const base::TimeDelta & delay)274 void SyncClient::AddUpdateTaskInternal(const std::string& local_id,
275                                        const base::TimeDelta& delay) {
276   SyncTask task;
277   task.task = base::Bind(
278       &EntryUpdatePerformer::UpdateEntry,
279       base::Unretained(entry_update_performer_.get()),
280       local_id,
281       base::Bind(&SyncClient::OnUpdateComplete,
282                  weak_ptr_factory_.GetWeakPtr(),
283                  local_id));
284   AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay);
285 }
286 
AddTask(const SyncTasks::key_type & key,const SyncTask & task,const base::TimeDelta & delay)287 void SyncClient::AddTask(const SyncTasks::key_type& key,
288                          const SyncTask& task,
289                          const base::TimeDelta& delay) {
290   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
291 
292   SyncTasks::iterator it = tasks_.find(key);
293   if (it != tasks_.end()) {
294     switch (it->second.state) {
295       case PENDING:
296         // The same task will run, do nothing.
297         break;
298       case RUNNING:
299         // Something has changed since the task started. Schedule rerun.
300         it->second.should_run_again = true;
301         break;
302     }
303     return;
304   }
305 
306   DCHECK_EQ(PENDING, task.state);
307   tasks_[key] = task;
308 
309   base::MessageLoopProxy::current()->PostDelayedTask(
310       FROM_HERE,
311       base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
312       delay);
313 }
314 
StartTask(const SyncTasks::key_type & key)315 void SyncClient::StartTask(const SyncTasks::key_type& key) {
316   SyncTasks::iterator it = tasks_.find(key);
317   if (it == tasks_.end())
318     return;
319 
320   SyncTask* task = &it->second;
321   switch (task->state) {
322     case PENDING:
323       task->state = RUNNING;
324       task->task.Run();
325       break;
326     case RUNNING:  // Do nothing.
327       break;
328   }
329 }
330 
OnGetLocalIdsOfBacklog(const std::vector<std::string> * to_fetch,const std::vector<std::string> * to_upload,const std::vector<std::string> * to_update)331 void SyncClient::OnGetLocalIdsOfBacklog(
332     const std::vector<std::string>* to_fetch,
333     const std::vector<std::string>* to_upload,
334     const std::vector<std::string>* to_update) {
335   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
336 
337   // Give priority to upload tasks over fetch tasks, so that dirty files are
338   // uploaded as soon as possible.
339   for (size_t i = 0; i < to_upload->size(); ++i) {
340     const std::string& local_id = (*to_upload)[i];
341     DVLOG(1) << "Queuing to upload: " << local_id;
342     AddUploadTaskInternal(ClientContext(BACKGROUND), local_id,
343                           file_system::UpdateOperation::NO_CONTENT_CHECK,
344                           delay_);
345   }
346 
347   for (size_t i = 0; i < to_fetch->size(); ++i) {
348     const std::string& local_id = (*to_fetch)[i];
349     DVLOG(1) << "Queuing to fetch: " << local_id;
350     AddFetchTaskInternal(local_id, delay_);
351   }
352 
353   for (size_t i = 0; i < to_update->size(); ++i) {
354     const std::string& local_id = (*to_update)[i];
355     DVLOG(1) << "Queuing to update: " << local_id;
356     AddUpdateTask(local_id);
357   }
358 }
359 
AddFetchTasks(const std::vector<std::string> * local_ids)360 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) {
361   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
362 
363   for (size_t i = 0; i < local_ids->size(); ++i)
364     AddFetchTask((*local_ids)[i]);
365 }
366 
OnTaskComplete(SyncType type,const std::string & local_id)367 bool SyncClient::OnTaskComplete(SyncType type, const std::string& local_id) {
368   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
369 
370   const SyncTasks::key_type key(type, local_id);
371   SyncTasks::iterator it = tasks_.find(key);
372   DCHECK(it != tasks_.end());
373 
374   if (it->second.should_run_again) {
375     DVLOG(1) << "Running again: type = " << type << ", id = " << local_id;
376     it->second.should_run_again = false;
377     it->second.task.Run();
378     return false;
379   }
380 
381   tasks_.erase(it);
382   return true;
383 }
384 
OnFetchFileComplete(const std::string & local_id,FileError error,const base::FilePath & local_path,scoped_ptr<ResourceEntry> entry)385 void SyncClient::OnFetchFileComplete(const std::string& local_id,
386                                      FileError error,
387                                      const base::FilePath& local_path,
388                                      scoped_ptr<ResourceEntry> entry) {
389   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
390 
391   if (!OnTaskComplete(FETCH, local_id))
392     return;
393 
394   if (error == FILE_ERROR_OK) {
395     DVLOG(1) << "Fetched " << local_id << ": " << local_path.value();
396   } else {
397     switch (error) {
398       case FILE_ERROR_ABORT:
399         // If user cancels download, unpin the file so that we do not sync the
400         // file again.
401         base::PostTaskAndReplyWithResult(
402             blocking_task_runner_,
403             FROM_HERE,
404             base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
405             base::Bind(&util::EmptyFileOperationCallback));
406         break;
407       case FILE_ERROR_NO_CONNECTION:
408         // Add the task again so that we'll retry once the connection is back.
409         AddFetchTaskInternal(local_id, delay_);
410         break;
411       case FILE_ERROR_SERVICE_UNAVAILABLE:
412         // Add the task again so that we'll retry once the service is back.
413         AddFetchTaskInternal(local_id, long_delay_);
414         break;
415       default:
416         LOG(WARNING) << "Failed to fetch " << local_id
417                      << ": " << FileErrorToString(error);
418     }
419   }
420 }
421 
OnUploadFileComplete(const std::string & local_id,FileError error)422 void SyncClient::OnUploadFileComplete(const std::string& local_id,
423                                       FileError error) {
424   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
425 
426   if (!OnTaskComplete(UPLOAD, local_id))
427     return;
428 
429   if (error == FILE_ERROR_OK) {
430     DVLOG(1) << "Uploaded " << local_id;
431   } else {
432     switch (error) {
433       case FILE_ERROR_NO_CONNECTION:
434         // Add the task again so that we'll retry once the connection is back.
435         AddUploadTaskInternal(ClientContext(BACKGROUND), local_id,
436                               file_system::UpdateOperation::NO_CONTENT_CHECK,
437                               delay_);
438         break;
439       case FILE_ERROR_SERVICE_UNAVAILABLE:
440         // Add the task again so that we'll retry once the service is back.
441         AddUploadTaskInternal(ClientContext(BACKGROUND), local_id,
442                               file_system::UpdateOperation::NO_CONTENT_CHECK,
443                               long_delay_);
444         break;
445       default:
446         LOG(WARNING) << "Failed to upload " << local_id << ": "
447                      << FileErrorToString(error);
448     }
449   }
450 }
451 
OnUpdateComplete(const std::string & local_id,FileError error)452 void SyncClient::OnUpdateComplete(const std::string& local_id,
453                                   FileError error) {
454   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
455 
456   if (!OnTaskComplete(UPDATE, local_id))
457     return;
458 
459   if (error == FILE_ERROR_OK) {
460     DVLOG(1) << "Updated " << local_id;
461   } else {
462     switch (error) {
463       case FILE_ERROR_NO_CONNECTION:
464         // Add the task again so that we'll retry once the connection is back.
465         AddUpdateTaskInternal(local_id, base::TimeDelta::FromSeconds(0));
466         break;
467       case FILE_ERROR_SERVICE_UNAVAILABLE:
468         // Add the task again so that we'll retry once the service is back.
469         AddUpdateTaskInternal(local_id, long_delay_);
470         break;
471       default:
472         LOG(WARNING) << "Failed to update " << local_id << ": "
473                      << FileErrorToString(error);
474     }
475   }
476 }
477 
478 }  // namespace internal
479 }  // namespace drive
480