• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "chrome/browser/chromeos/drive/sync_client.h"
6 
7 #include <vector>
8 
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop_proxy.h"
11 #include "chrome/browser/chromeos/drive/drive.pb.h"
12 #include "chrome/browser/chromeos/drive/file_cache.h"
13 #include "chrome/browser/chromeos/drive/file_system/download_operation.h"
14 #include "chrome/browser/chromeos/drive/file_system/operation_observer.h"
15 #include "chrome/browser/chromeos/drive/file_system_util.h"
16 #include "chrome/browser/chromeos/drive/job_scheduler.h"
17 #include "chrome/browser/chromeos/drive/sync/entry_update_performer.h"
18 #include "content/public/browser/browser_thread.h"
19 #include "google_apis/drive/task_util.h"
20 
21 using content::BrowserThread;
22 
23 namespace drive {
24 namespace internal {
25 
26 namespace {
27 
28 // The delay constant is used to delay processing a sync task. We should not
29 // process SyncTasks immediately for the following reasons:
30 //
31 // 1) For fetching, the user may accidentally click on "Make available
32 //    offline" checkbox on a file, and immediately cancel it in a second.
33 //    It's a waste to fetch the file in this case.
34 //
35 // 2) For uploading, file writing via HTML5 file system API is performed in
36 //    two steps: 1) truncate a file to 0 bytes, 2) write contents. We
37 //    shouldn't start uploading right after the step 1). Besides, the user
38 //    may edit the same file repeatedly in a short period of time.
39 //
40 // TODO(satorux): We should find a way to handle the upload case more nicely,
41 // and shorten the delay. crbug.com/134774
42 const int kDelaySeconds = 1;
43 
44 // The delay constant is used to delay retrying a sync task on server errors.
45 const int kLongDelaySeconds = 600;
46 
47 // Iterates entries and appends IDs to |to_fetch| if the file is pinned but not
48 // fetched (not present locally), to |to_update| if the file needs update.
CollectBacklog(ResourceMetadata * metadata,std::vector<std::string> * to_fetch,std::vector<std::string> * to_update)49 void CollectBacklog(ResourceMetadata* metadata,
50                     std::vector<std::string>* to_fetch,
51                     std::vector<std::string>* to_update) {
52   DCHECK(to_fetch);
53   DCHECK(to_update);
54 
55   scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator();
56   for (; !it->IsAtEnd(); it->Advance()) {
57     const std::string& local_id = it->GetID();
58     const ResourceEntry& entry = it->GetValue();
59     if (entry.parent_local_id() == util::kDriveTrashDirLocalId) {
60       to_update->push_back(local_id);
61       continue;
62     }
63 
64     bool should_update = false;
65     switch (entry.metadata_edit_state()) {
66       case ResourceEntry::CLEAN:
67         break;
68       case ResourceEntry::SYNCING:
69       case ResourceEntry::DIRTY:
70         should_update = true;
71         break;
72     }
73 
74     if (entry.file_specific_info().cache_state().is_pinned() &&
75         !entry.file_specific_info().cache_state().is_present())
76       to_fetch->push_back(local_id);
77 
78     if (entry.file_specific_info().cache_state().is_dirty())
79       should_update = true;
80 
81     if (should_update)
82       to_update->push_back(local_id);
83   }
84   DCHECK(!it->HasError());
85 }
86 
87 // Iterates cache entries and collects IDs of ones with obsolete cache files.
CheckExistingPinnedFiles(ResourceMetadata * metadata,FileCache * cache,std::vector<std::string> * local_ids)88 void CheckExistingPinnedFiles(ResourceMetadata* metadata,
89                               FileCache* cache,
90                               std::vector<std::string>* local_ids) {
91   scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator();
92   for (; !it->IsAtEnd(); it->Advance()) {
93     const ResourceEntry& entry = it->GetValue();
94     const FileCacheEntry& cache_state =
95         entry.file_specific_info().cache_state();
96     const std::string& local_id = it->GetID();
97     if (!cache_state.is_pinned() || !cache_state.is_present())
98       continue;
99 
100     // If MD5s don't match, it indicates the local cache file is stale, unless
101     // the file is dirty (the MD5 is "local"). We should never re-fetch the
102     // file when we have a locally modified version.
103     if (entry.file_specific_info().md5() == cache_state.md5() ||
104         cache_state.is_dirty())
105       continue;
106 
107     FileError error = cache->Remove(local_id);
108     if (error != FILE_ERROR_OK) {
109       LOG(WARNING) << "Failed to remove cache entry: " << local_id;
110       continue;
111     }
112 
113     error = cache->Pin(local_id);
114     if (error != FILE_ERROR_OK) {
115       LOG(WARNING) << "Failed to pin cache entry: " << local_id;
116       continue;
117     }
118 
119     local_ids->push_back(local_id);
120   }
121   DCHECK(!it->HasError());
122 }
123 
124 // Runs the task and returns a dummy cancel closure.
RunTaskAndReturnDummyCancelClosure(const base::Closure & task)125 base::Closure RunTaskAndReturnDummyCancelClosure(const base::Closure& task) {
126   task.Run();
127   return base::Closure();
128 }
129 
130 }  // namespace
131 
SyncTask()132 SyncClient::SyncTask::SyncTask() : state(PENDING), should_run_again(false) {}
~SyncTask()133 SyncClient::SyncTask::~SyncTask() {}
134 
SyncClient(base::SequencedTaskRunner * blocking_task_runner,file_system::OperationObserver * observer,JobScheduler * scheduler,ResourceMetadata * metadata,FileCache * cache,LoaderController * loader_controller,const base::FilePath & temporary_file_directory)135 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner,
136                        file_system::OperationObserver* observer,
137                        JobScheduler* scheduler,
138                        ResourceMetadata* metadata,
139                        FileCache* cache,
140                        LoaderController* loader_controller,
141                        const base::FilePath& temporary_file_directory)
142     : blocking_task_runner_(blocking_task_runner),
143       operation_observer_(observer),
144       metadata_(metadata),
145       cache_(cache),
146       download_operation_(new file_system::DownloadOperation(
147           blocking_task_runner,
148           observer,
149           scheduler,
150           metadata,
151           cache,
152           temporary_file_directory)),
153       entry_update_performer_(new EntryUpdatePerformer(blocking_task_runner,
154                                                        observer,
155                                                        scheduler,
156                                                        metadata,
157                                                        cache,
158                                                        loader_controller)),
159       delay_(base::TimeDelta::FromSeconds(kDelaySeconds)),
160       long_delay_(base::TimeDelta::FromSeconds(kLongDelaySeconds)),
161       weak_ptr_factory_(this) {
162   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
163 }
164 
~SyncClient()165 SyncClient::~SyncClient() {
166   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
167 }
168 
StartProcessingBacklog()169 void SyncClient::StartProcessingBacklog() {
170   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
171 
172   std::vector<std::string>* to_fetch = new std::vector<std::string>;
173   std::vector<std::string>* to_update = new std::vector<std::string>;
174   blocking_task_runner_->PostTaskAndReply(
175       FROM_HERE,
176       base::Bind(&CollectBacklog, metadata_, to_fetch, to_update),
177       base::Bind(&SyncClient::OnGetLocalIdsOfBacklog,
178                  weak_ptr_factory_.GetWeakPtr(),
179                  base::Owned(to_fetch),
180                  base::Owned(to_update)));
181 }
182 
StartCheckingExistingPinnedFiles()183 void SyncClient::StartCheckingExistingPinnedFiles() {
184   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
185 
186   std::vector<std::string>* local_ids = new std::vector<std::string>;
187   blocking_task_runner_->PostTaskAndReply(
188       FROM_HERE,
189       base::Bind(&CheckExistingPinnedFiles,
190                  metadata_,
191                  cache_,
192                  local_ids),
193       base::Bind(&SyncClient::AddFetchTasks,
194                  weak_ptr_factory_.GetWeakPtr(),
195                  base::Owned(local_ids)));
196 }
197 
AddFetchTask(const std::string & local_id)198 void SyncClient::AddFetchTask(const std::string& local_id) {
199   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
200   AddFetchTaskInternal(local_id, delay_);
201 }
202 
RemoveFetchTask(const std::string & local_id)203 void SyncClient::RemoveFetchTask(const std::string& local_id) {
204   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
205 
206   SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id));
207   if (it == tasks_.end())
208     return;
209 
210   SyncTask* task = &it->second;
211   switch (task->state) {
212     case PENDING:
213       tasks_.erase(it);
214       break;
215     case RUNNING:
216       if (!task->cancel_closure.is_null())
217         task->cancel_closure.Run();
218       break;
219   }
220 }
221 
AddUpdateTask(const ClientContext & context,const std::string & local_id)222 void SyncClient::AddUpdateTask(const ClientContext& context,
223                                const std::string& local_id) {
224   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
225   AddUpdateTaskInternal(context, local_id, delay_);
226 }
227 
AddFetchTaskInternal(const std::string & local_id,const base::TimeDelta & delay)228 void SyncClient::AddFetchTaskInternal(const std::string& local_id,
229                                       const base::TimeDelta& delay) {
230   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
231 
232   SyncTask task;
233   task.task = base::Bind(
234       &file_system::DownloadOperation::EnsureFileDownloadedByLocalId,
235       base::Unretained(download_operation_.get()),
236       local_id,
237       ClientContext(BACKGROUND),
238       GetFileContentInitializedCallback(),
239       google_apis::GetContentCallback(),
240       base::Bind(&SyncClient::OnFetchFileComplete,
241                  weak_ptr_factory_.GetWeakPtr(),
242                  local_id));
243   AddTask(SyncTasks::key_type(FETCH, local_id), task, delay);
244 }
245 
AddUpdateTaskInternal(const ClientContext & context,const std::string & local_id,const base::TimeDelta & delay)246 void SyncClient::AddUpdateTaskInternal(const ClientContext& context,
247                                        const std::string& local_id,
248                                        const base::TimeDelta& delay) {
249   SyncTask task;
250   task.task = base::Bind(
251       &RunTaskAndReturnDummyCancelClosure,
252       base::Bind(&EntryUpdatePerformer::UpdateEntry,
253                  base::Unretained(entry_update_performer_.get()),
254                  local_id,
255                  context,
256                  base::Bind(&SyncClient::OnUpdateComplete,
257                             weak_ptr_factory_.GetWeakPtr(),
258                             local_id)));
259   AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay);
260 }
261 
AddTask(const SyncTasks::key_type & key,const SyncTask & task,const base::TimeDelta & delay)262 void SyncClient::AddTask(const SyncTasks::key_type& key,
263                          const SyncTask& task,
264                          const base::TimeDelta& delay) {
265   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
266 
267   SyncTasks::iterator it = tasks_.find(key);
268   if (it != tasks_.end()) {
269     switch (it->second.state) {
270       case PENDING:
271         // The same task will run, do nothing.
272         break;
273       case RUNNING:
274         // Something has changed since the task started. Schedule rerun.
275         it->second.should_run_again = true;
276         break;
277     }
278     return;
279   }
280 
281   DCHECK_EQ(PENDING, task.state);
282   tasks_[key] = task;
283 
284   base::MessageLoopProxy::current()->PostDelayedTask(
285       FROM_HERE,
286       base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
287       delay);
288 }
289 
StartTask(const SyncTasks::key_type & key)290 void SyncClient::StartTask(const SyncTasks::key_type& key) {
291   SyncTasks::iterator it = tasks_.find(key);
292   if (it == tasks_.end())
293     return;
294 
295   SyncTask* task = &it->second;
296   switch (task->state) {
297     case PENDING:
298       task->state = RUNNING;
299       task->cancel_closure = task->task.Run();
300       break;
301     case RUNNING:  // Do nothing.
302       break;
303   }
304 }
305 
OnGetLocalIdsOfBacklog(const std::vector<std::string> * to_fetch,const std::vector<std::string> * to_update)306 void SyncClient::OnGetLocalIdsOfBacklog(
307     const std::vector<std::string>* to_fetch,
308     const std::vector<std::string>* to_update) {
309   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
310 
311   // Give priority to upload tasks over fetch tasks, so that dirty files are
312   // uploaded as soon as possible.
313   for (size_t i = 0; i < to_update->size(); ++i) {
314     const std::string& local_id = (*to_update)[i];
315     DVLOG(1) << "Queuing to update: " << local_id;
316     AddUpdateTask(ClientContext(BACKGROUND), local_id);
317   }
318 
319   for (size_t i = 0; i < to_fetch->size(); ++i) {
320     const std::string& local_id = (*to_fetch)[i];
321     DVLOG(1) << "Queuing to fetch: " << local_id;
322     AddFetchTaskInternal(local_id, delay_);
323   }
324 }
325 
AddFetchTasks(const std::vector<std::string> * local_ids)326 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) {
327   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
328 
329   for (size_t i = 0; i < local_ids->size(); ++i)
330     AddFetchTask((*local_ids)[i]);
331 }
332 
OnTaskComplete(SyncType type,const std::string & local_id)333 bool SyncClient::OnTaskComplete(SyncType type, const std::string& local_id) {
334   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
335 
336   const SyncTasks::key_type key(type, local_id);
337   SyncTasks::iterator it = tasks_.find(key);
338   DCHECK(it != tasks_.end());
339 
340   if (it->second.should_run_again) {
341     DVLOG(1) << "Running again: type = " << type << ", id = " << local_id;
342     it->second.should_run_again = false;
343     it->second.task.Run();
344     return false;
345   }
346 
347   tasks_.erase(it);
348   return true;
349 }
350 
OnFetchFileComplete(const std::string & local_id,FileError error,const base::FilePath & local_path,scoped_ptr<ResourceEntry> entry)351 void SyncClient::OnFetchFileComplete(const std::string& local_id,
352                                      FileError error,
353                                      const base::FilePath& local_path,
354                                      scoped_ptr<ResourceEntry> entry) {
355   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
356 
357   if (!OnTaskComplete(FETCH, local_id))
358     return;
359 
360   if (error == FILE_ERROR_OK) {
361     DVLOG(1) << "Fetched " << local_id << ": " << local_path.value();
362   } else {
363     switch (error) {
364       case FILE_ERROR_ABORT:
365         // If user cancels download, unpin the file so that we do not sync the
366         // file again.
367         base::PostTaskAndReplyWithResult(
368             blocking_task_runner_,
369             FROM_HERE,
370             base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
371             base::Bind(&util::EmptyFileOperationCallback));
372         break;
373       case FILE_ERROR_NO_CONNECTION:
374         // Add the task again so that we'll retry once the connection is back.
375         AddFetchTaskInternal(local_id, delay_);
376         break;
377       case FILE_ERROR_SERVICE_UNAVAILABLE:
378         // Add the task again so that we'll retry once the service is back.
379         AddFetchTaskInternal(local_id, long_delay_);
380         operation_observer_->OnDriveSyncError(
381             file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE,
382             local_id);
383         break;
384       default:
385         operation_observer_->OnDriveSyncError(
386             file_system::DRIVE_SYNC_ERROR_MISC,
387             local_id);
388         LOG(WARNING) << "Failed to fetch " << local_id
389                      << ": " << FileErrorToString(error);
390     }
391   }
392 }
393 
OnUpdateComplete(const std::string & local_id,FileError error)394 void SyncClient::OnUpdateComplete(const std::string& local_id,
395                                   FileError error) {
396   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
397 
398   if (!OnTaskComplete(UPDATE, local_id))
399     return;
400 
401   if (error == FILE_ERROR_OK) {
402     DVLOG(1) << "Updated " << local_id;
403 
404     // Add update tasks for child entries which may be waiting for the parent to
405     // be updated.
406     ResourceEntryVector* entries = new ResourceEntryVector;
407     base::PostTaskAndReplyWithResult(
408         blocking_task_runner_.get(),
409         FROM_HERE,
410         base::Bind(&ResourceMetadata::ReadDirectoryById,
411                    base::Unretained(metadata_), local_id, entries),
412         base::Bind(&SyncClient::AddChildUpdateTasks,
413                    weak_ptr_factory_.GetWeakPtr(), base::Owned(entries)));
414   } else {
415     switch (error) {
416       case FILE_ERROR_ABORT:
417         // Ignore it because this is caused by user's cancel operations.
418         break;
419       case FILE_ERROR_NO_CONNECTION:
420         // Add the task again so that we'll retry once the connection is back.
421         AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id,
422                               base::TimeDelta::FromSeconds(0));
423         break;
424       case FILE_ERROR_SERVICE_UNAVAILABLE:
425         // Add the task again so that we'll retry once the service is back.
426         AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id, long_delay_);
427         operation_observer_->OnDriveSyncError(
428             file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE,
429             local_id);
430         break;
431       default:
432         operation_observer_->OnDriveSyncError(
433             file_system::DRIVE_SYNC_ERROR_MISC,
434             local_id);
435         LOG(WARNING) << "Failed to update " << local_id << ": "
436                      << FileErrorToString(error);
437     }
438   }
439 }
440 
AddChildUpdateTasks(const ResourceEntryVector * entries,FileError error)441 void SyncClient::AddChildUpdateTasks(const ResourceEntryVector* entries,
442                                      FileError error) {
443   if (error != FILE_ERROR_OK)
444     return;
445 
446   for (size_t i = 0; i < entries->size(); ++i) {
447     const ResourceEntry& entry = (*entries)[i];
448     if (entry.metadata_edit_state() != ResourceEntry::CLEAN) {
449       AddUpdateTaskInternal(ClientContext(BACKGROUND), entry.local_id(),
450                             base::TimeDelta::FromSeconds(0));
451     }
452   }
453 }
454 
455 }  // namespace internal
456 }  // namespace drive
457