• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "chrome/browser/chromeos/drive/job_scheduler.h"
6 
7 #include "base/message_loop/message_loop.h"
8 #include "base/prefs/pref_service.h"
9 #include "base/rand_util.h"
10 #include "base/strings/string_number_conversions.h"
11 #include "base/strings/stringprintf.h"
12 #include "chrome/browser/drive/event_logger.h"
13 #include "chrome/common/pref_names.h"
14 #include "content/public/browser/browser_thread.h"
15 #include "google_apis/drive/drive_api_parser.h"
16 
17 using content::BrowserThread;
18 
19 namespace drive {
20 
21 namespace {
22 
23 // All jobs are retried at maximum of kMaxRetryCount when they fail due to
24 // throttling or server error.  The delay before retrying a job is shared among
25 // jobs. It doubles in length on each failure, upto 2^kMaxThrottleCount seconds.
26 //
27 // According to the API documentation, kMaxRetryCount should be the same as
28 // kMaxThrottleCount (https://developers.google.com/drive/handle-errors).
29 // But currently multiplied by 2 to ensure upload related jobs retried for a
30 // sufficient number of times. crbug.com/269918
31 const int kMaxThrottleCount = 4;
32 const int kMaxRetryCount = 2 * kMaxThrottleCount;
33 
34 // GetDefaultValue returns a value constructed by the default constructor.
35 template<typename T> struct DefaultValueCreator {
GetDefaultValuedrive::__anon60830c740111::DefaultValueCreator36   static T GetDefaultValue() { return T(); }
37 };
38 template<typename T> struct DefaultValueCreator<const T&> {
GetDefaultValuedrive::__anon60830c740111::DefaultValueCreator39   static T GetDefaultValue() { return T(); }
40 };
41 
42 // Helper of CreateErrorRunCallback implementation.
43 // Provides:
44 // - ResultType; the type of the Callback which should be returned by
45 //     CreateErrorRunCallback.
46 // - Run(): a static function which takes the original |callback| and |error|,
47 //     and runs the |callback|.Run() with the error code and default values
48 //     for remaining arguments.
49 template<typename CallbackType> struct CreateErrorRunCallbackHelper;
50 
51 // CreateErrorRunCallback with two arguments.
52 template<typename P1>
53 struct CreateErrorRunCallbackHelper<void(google_apis::GDataErrorCode, P1)> {
Rundrive::__anon60830c740111::CreateErrorRunCallbackHelper54   static void Run(
55       const base::Callback<void(google_apis::GDataErrorCode, P1)>& callback,
56       google_apis::GDataErrorCode error) {
57     callback.Run(error, DefaultValueCreator<P1>::GetDefaultValue());
58   }
59 };
60 
61 // Returns a callback with the tail parameter bound to its default value.
62 // In other words, returned_callback.Run(error) runs callback.Run(error, T()).
63 template<typename CallbackType>
64 base::Callback<void(google_apis::GDataErrorCode)>
CreateErrorRunCallback(const base::Callback<CallbackType> & callback)65 CreateErrorRunCallback(const base::Callback<CallbackType>& callback) {
66   return base::Bind(&CreateErrorRunCallbackHelper<CallbackType>::Run, callback);
67 }
68 
69 // Parameter struct for RunUploadNewFile.
70 struct UploadNewFileParams {
71   std::string parent_resource_id;
72   base::FilePath local_file_path;
73   std::string title;
74   std::string content_type;
75   DriveUploader::UploadNewFileOptions options;
76   UploadCompletionCallback callback;
77   google_apis::ProgressCallback progress_callback;
78 };
79 
80 // Helper function to work around the arity limitation of base::Bind.
RunUploadNewFile(DriveUploaderInterface * uploader,const UploadNewFileParams & params)81 google_apis::CancelCallback RunUploadNewFile(
82     DriveUploaderInterface* uploader,
83     const UploadNewFileParams& params) {
84   return uploader->UploadNewFile(params.parent_resource_id,
85                                  params.local_file_path,
86                                  params.title,
87                                  params.content_type,
88                                  params.options,
89                                  params.callback,
90                                  params.progress_callback);
91 }
92 
93 // Parameter struct for RunUploadExistingFile.
94 struct UploadExistingFileParams {
95   std::string resource_id;
96   base::FilePath local_file_path;
97   std::string content_type;
98   DriveUploader::UploadExistingFileOptions options;
99   std::string etag;
100   UploadCompletionCallback callback;
101   google_apis::ProgressCallback progress_callback;
102 };
103 
104 // Helper function to work around the arity limitation of base::Bind.
RunUploadExistingFile(DriveUploaderInterface * uploader,const UploadExistingFileParams & params)105 google_apis::CancelCallback RunUploadExistingFile(
106     DriveUploaderInterface* uploader,
107     const UploadExistingFileParams& params) {
108   return uploader->UploadExistingFile(params.resource_id,
109                                       params.local_file_path,
110                                       params.content_type,
111                                       params.options,
112                                       params.callback,
113                                       params.progress_callback);
114 }
115 
116 // Parameter struct for RunResumeUploadFile.
117 struct ResumeUploadFileParams {
118   GURL upload_location;
119   base::FilePath local_file_path;
120   std::string content_type;
121   UploadCompletionCallback callback;
122   google_apis::ProgressCallback progress_callback;
123 };
124 
125 // Helper function to adjust the return type.
RunResumeUploadFile(DriveUploaderInterface * uploader,const ResumeUploadFileParams & params)126 google_apis::CancelCallback RunResumeUploadFile(
127     DriveUploaderInterface* uploader,
128     const ResumeUploadFileParams& params) {
129   return uploader->ResumeUploadFile(params.upload_location,
130                                     params.local_file_path,
131                                     params.content_type,
132                                     params.callback,
133                                     params.progress_callback);
134 }
135 
136 }  // namespace
137 
138 // Metadata jobs are cheap, so we run them concurrently. File jobs run serially.
139 const int JobScheduler::kMaxJobCount[] = {
140   5,  // METADATA_QUEUE
141   1,  // FILE_QUEUE
142 };
143 
JobEntry(JobType type)144 JobScheduler::JobEntry::JobEntry(JobType type)
145     : job_info(type),
146       context(ClientContext(USER_INITIATED)),
147       retry_count(0) {
148   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
149 }
150 
~JobEntry()151 JobScheduler::JobEntry::~JobEntry() {
152   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
153 }
154 
155 struct JobScheduler::ResumeUploadParams {
156   base::FilePath drive_file_path;
157   base::FilePath local_file_path;
158   std::string content_type;
159 };
160 
JobScheduler(PrefService * pref_service,EventLogger * logger,DriveServiceInterface * drive_service,base::SequencedTaskRunner * blocking_task_runner)161 JobScheduler::JobScheduler(
162     PrefService* pref_service,
163     EventLogger* logger,
164     DriveServiceInterface* drive_service,
165     base::SequencedTaskRunner* blocking_task_runner)
166     : throttle_count_(0),
167       wait_until_(base::Time::Now()),
168       disable_throttling_(false),
169       logger_(logger),
170       drive_service_(drive_service),
171       uploader_(new DriveUploader(drive_service, blocking_task_runner)),
172       pref_service_(pref_service),
173       weak_ptr_factory_(this) {
174   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
175 
176   for (int i = 0; i < NUM_QUEUES; ++i)
177     queue_[i].reset(new JobQueue(kMaxJobCount[i], NUM_CONTEXT_TYPES));
178 
179   net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
180 }
181 
~JobScheduler()182 JobScheduler::~JobScheduler() {
183   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
184 
185   size_t num_queued_jobs = 0;
186   for (int i = 0; i < NUM_QUEUES; ++i)
187     num_queued_jobs += queue_[i]->GetNumberOfJobs();
188   DCHECK_EQ(num_queued_jobs, job_map_.size());
189 
190   net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
191 }
192 
GetJobInfoList()193 std::vector<JobInfo> JobScheduler::GetJobInfoList() {
194   std::vector<JobInfo> job_info_list;
195   for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
196     job_info_list.push_back(iter.GetCurrentValue()->job_info);
197   return job_info_list;
198 }
199 
AddObserver(JobListObserver * observer)200 void JobScheduler::AddObserver(JobListObserver* observer) {
201   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
202   observer_list_.AddObserver(observer);
203 }
204 
RemoveObserver(JobListObserver * observer)205 void JobScheduler::RemoveObserver(JobListObserver* observer) {
206   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
207   observer_list_.RemoveObserver(observer);
208 }
209 
CancelJob(JobID job_id)210 void JobScheduler::CancelJob(JobID job_id) {
211   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
212 
213   JobEntry* job = job_map_.Lookup(job_id);
214   if (job) {
215     if (job->job_info.state == STATE_RUNNING) {
216       // If the job is running an HTTP request, cancel it via |cancel_callback|
217       // returned from the request, and wait for termination in the normal
218       // callback handler, OnJobDone.
219       if (!job->cancel_callback.is_null())
220         job->cancel_callback.Run();
221     } else {
222       AbortNotRunningJob(job, google_apis::GDATA_CANCELLED);
223     }
224   }
225 }
226 
CancelAllJobs()227 void JobScheduler::CancelAllJobs() {
228   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
229 
230   // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports
231   // removable during iteration.
232   for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
233     CancelJob(iter.GetCurrentKey());
234 }
235 
GetAboutResource(const google_apis::AboutResourceCallback & callback)236 void JobScheduler::GetAboutResource(
237     const google_apis::AboutResourceCallback& callback) {
238   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
239   DCHECK(!callback.is_null());
240 
241   JobEntry* new_job = CreateNewJob(TYPE_GET_ABOUT_RESOURCE);
242   new_job->task = base::Bind(
243       &DriveServiceInterface::GetAboutResource,
244       base::Unretained(drive_service_),
245       base::Bind(&JobScheduler::OnGetAboutResourceJobDone,
246                  weak_ptr_factory_.GetWeakPtr(),
247                  new_job->job_info.job_id,
248                  callback));
249   new_job->abort_callback = CreateErrorRunCallback(callback);
250   StartJob(new_job);
251 }
252 
GetAppList(const google_apis::AppListCallback & callback)253 void JobScheduler::GetAppList(const google_apis::AppListCallback& callback) {
254   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
255   DCHECK(!callback.is_null());
256 
257   JobEntry* new_job = CreateNewJob(TYPE_GET_APP_LIST);
258   new_job->task = base::Bind(
259       &DriveServiceInterface::GetAppList,
260       base::Unretained(drive_service_),
261       base::Bind(&JobScheduler::OnGetAppListJobDone,
262                  weak_ptr_factory_.GetWeakPtr(),
263                  new_job->job_info.job_id,
264                  callback));
265   new_job->abort_callback = CreateErrorRunCallback(callback);
266   StartJob(new_job);
267 }
268 
GetAllFileList(const google_apis::FileListCallback & callback)269 void JobScheduler::GetAllFileList(
270     const google_apis::FileListCallback& callback) {
271   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
272   DCHECK(!callback.is_null());
273 
274   JobEntry* new_job = CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST);
275   new_job->task = base::Bind(
276       &DriveServiceInterface::GetAllFileList,
277       base::Unretained(drive_service_),
278       base::Bind(&JobScheduler::OnGetFileListJobDone,
279                  weak_ptr_factory_.GetWeakPtr(),
280                  new_job->job_info.job_id,
281                  callback));
282   new_job->abort_callback = CreateErrorRunCallback(callback);
283   StartJob(new_job);
284 }
285 
GetFileListInDirectory(const std::string & directory_resource_id,const google_apis::FileListCallback & callback)286 void JobScheduler::GetFileListInDirectory(
287     const std::string& directory_resource_id,
288     const google_apis::FileListCallback& callback) {
289   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
290   DCHECK(!callback.is_null());
291 
292   JobEntry* new_job = CreateNewJob(
293       TYPE_GET_RESOURCE_LIST_IN_DIRECTORY);
294   new_job->task = base::Bind(
295       &DriveServiceInterface::GetFileListInDirectory,
296       base::Unretained(drive_service_),
297       directory_resource_id,
298       base::Bind(&JobScheduler::OnGetFileListJobDone,
299                  weak_ptr_factory_.GetWeakPtr(),
300                  new_job->job_info.job_id,
301                  callback));
302   new_job->abort_callback = CreateErrorRunCallback(callback);
303   StartJob(new_job);
304 }
305 
Search(const std::string & search_query,const google_apis::FileListCallback & callback)306 void JobScheduler::Search(const std::string& search_query,
307                           const google_apis::FileListCallback& callback) {
308   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
309   DCHECK(!callback.is_null());
310 
311   JobEntry* new_job = CreateNewJob(TYPE_SEARCH);
312   new_job->task = base::Bind(
313       &DriveServiceInterface::Search,
314       base::Unretained(drive_service_),
315       search_query,
316       base::Bind(&JobScheduler::OnGetFileListJobDone,
317                  weak_ptr_factory_.GetWeakPtr(),
318                  new_job->job_info.job_id,
319                  callback));
320   new_job->abort_callback = CreateErrorRunCallback(callback);
321   StartJob(new_job);
322 }
323 
GetChangeList(int64 start_changestamp,const google_apis::ChangeListCallback & callback)324 void JobScheduler::GetChangeList(
325     int64 start_changestamp,
326     const google_apis::ChangeListCallback& callback) {
327   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
328   DCHECK(!callback.is_null());
329 
330   JobEntry* new_job = CreateNewJob(TYPE_GET_CHANGE_LIST);
331   new_job->task = base::Bind(
332       &DriveServiceInterface::GetChangeList,
333       base::Unretained(drive_service_),
334       start_changestamp,
335       base::Bind(&JobScheduler::OnGetChangeListJobDone,
336                  weak_ptr_factory_.GetWeakPtr(),
337                  new_job->job_info.job_id,
338                  callback));
339   new_job->abort_callback = CreateErrorRunCallback(callback);
340   StartJob(new_job);
341 }
342 
GetRemainingChangeList(const GURL & next_link,const google_apis::ChangeListCallback & callback)343 void JobScheduler::GetRemainingChangeList(
344     const GURL& next_link,
345     const google_apis::ChangeListCallback& callback) {
346   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
347   DCHECK(!callback.is_null());
348 
349   JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_CHANGE_LIST);
350   new_job->task = base::Bind(
351       &DriveServiceInterface::GetRemainingChangeList,
352       base::Unretained(drive_service_),
353       next_link,
354       base::Bind(&JobScheduler::OnGetChangeListJobDone,
355                  weak_ptr_factory_.GetWeakPtr(),
356                  new_job->job_info.job_id,
357                  callback));
358   new_job->abort_callback = CreateErrorRunCallback(callback);
359   StartJob(new_job);
360 }
361 
GetRemainingFileList(const GURL & next_link,const google_apis::FileListCallback & callback)362 void JobScheduler::GetRemainingFileList(
363     const GURL& next_link,
364     const google_apis::FileListCallback& callback) {
365   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
366   DCHECK(!callback.is_null());
367 
368   JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_FILE_LIST);
369   new_job->task = base::Bind(
370       &DriveServiceInterface::GetRemainingFileList,
371       base::Unretained(drive_service_),
372       next_link,
373       base::Bind(&JobScheduler::OnGetFileListJobDone,
374                  weak_ptr_factory_.GetWeakPtr(),
375                  new_job->job_info.job_id,
376                  callback));
377   new_job->abort_callback = CreateErrorRunCallback(callback);
378   StartJob(new_job);
379 }
380 
GetFileResource(const std::string & resource_id,const ClientContext & context,const google_apis::FileResourceCallback & callback)381 void JobScheduler::GetFileResource(
382     const std::string& resource_id,
383     const ClientContext& context,
384     const google_apis::FileResourceCallback& callback) {
385   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
386   DCHECK(!callback.is_null());
387 
388   JobEntry* new_job = CreateNewJob(TYPE_GET_RESOURCE_ENTRY);
389   new_job->context = context;
390   new_job->task = base::Bind(
391       &DriveServiceInterface::GetFileResource,
392       base::Unretained(drive_service_),
393       resource_id,
394       base::Bind(&JobScheduler::OnGetFileResourceJobDone,
395                  weak_ptr_factory_.GetWeakPtr(),
396                  new_job->job_info.job_id,
397                  callback));
398   new_job->abort_callback = CreateErrorRunCallback(callback);
399   StartJob(new_job);
400 }
401 
GetShareUrl(const std::string & resource_id,const GURL & embed_origin,const ClientContext & context,const google_apis::GetShareUrlCallback & callback)402 void JobScheduler::GetShareUrl(
403     const std::string& resource_id,
404     const GURL& embed_origin,
405     const ClientContext& context,
406     const google_apis::GetShareUrlCallback& callback) {
407   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
408   DCHECK(!callback.is_null());
409 
410   JobEntry* new_job = CreateNewJob(TYPE_GET_SHARE_URL);
411   new_job->context = context;
412   new_job->task = base::Bind(
413       &DriveServiceInterface::GetShareUrl,
414       base::Unretained(drive_service_),
415       resource_id,
416       embed_origin,
417       base::Bind(&JobScheduler::OnGetShareUrlJobDone,
418                  weak_ptr_factory_.GetWeakPtr(),
419                  new_job->job_info.job_id,
420                  callback));
421   new_job->abort_callback = CreateErrorRunCallback(callback);
422   StartJob(new_job);
423 }
424 
TrashResource(const std::string & resource_id,const ClientContext & context,const google_apis::EntryActionCallback & callback)425 void JobScheduler::TrashResource(
426     const std::string& resource_id,
427     const ClientContext& context,
428     const google_apis::EntryActionCallback& callback) {
429   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
430   DCHECK(!callback.is_null());
431 
432   JobEntry* new_job = CreateNewJob(TYPE_TRASH_RESOURCE);
433   new_job->context = context;
434   new_job->task = base::Bind(
435       &DriveServiceInterface::TrashResource,
436       base::Unretained(drive_service_),
437       resource_id,
438       base::Bind(&JobScheduler::OnEntryActionJobDone,
439                  weak_ptr_factory_.GetWeakPtr(),
440                  new_job->job_info.job_id,
441                  callback));
442   new_job->abort_callback = callback;
443   StartJob(new_job);
444 }
445 
CopyResource(const std::string & resource_id,const std::string & parent_resource_id,const std::string & new_title,const base::Time & last_modified,const google_apis::FileResourceCallback & callback)446 void JobScheduler::CopyResource(
447     const std::string& resource_id,
448     const std::string& parent_resource_id,
449     const std::string& new_title,
450     const base::Time& last_modified,
451     const google_apis::FileResourceCallback& callback) {
452   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
453   DCHECK(!callback.is_null());
454 
455   JobEntry* new_job = CreateNewJob(TYPE_COPY_RESOURCE);
456   new_job->task = base::Bind(
457       &DriveServiceInterface::CopyResource,
458       base::Unretained(drive_service_),
459       resource_id,
460       parent_resource_id,
461       new_title,
462       last_modified,
463       base::Bind(&JobScheduler::OnGetFileResourceJobDone,
464                  weak_ptr_factory_.GetWeakPtr(),
465                  new_job->job_info.job_id,
466                  callback));
467   new_job->abort_callback = CreateErrorRunCallback(callback);
468   StartJob(new_job);
469 }
470 
UpdateResource(const std::string & resource_id,const std::string & parent_resource_id,const std::string & new_title,const base::Time & last_modified,const base::Time & last_viewed_by_me,const ClientContext & context,const google_apis::FileResourceCallback & callback)471 void JobScheduler::UpdateResource(
472     const std::string& resource_id,
473     const std::string& parent_resource_id,
474     const std::string& new_title,
475     const base::Time& last_modified,
476     const base::Time& last_viewed_by_me,
477     const ClientContext& context,
478     const google_apis::FileResourceCallback& callback) {
479   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
480   DCHECK(!callback.is_null());
481 
482   JobEntry* new_job = CreateNewJob(TYPE_UPDATE_RESOURCE);
483   new_job->context = context;
484   new_job->task = base::Bind(
485       &DriveServiceInterface::UpdateResource,
486       base::Unretained(drive_service_),
487       resource_id,
488       parent_resource_id,
489       new_title,
490       last_modified,
491       last_viewed_by_me,
492       base::Bind(&JobScheduler::OnGetFileResourceJobDone,
493                  weak_ptr_factory_.GetWeakPtr(),
494                  new_job->job_info.job_id,
495                  callback));
496   new_job->abort_callback = CreateErrorRunCallback(callback);
497   StartJob(new_job);
498 }
499 
RenameResource(const std::string & resource_id,const std::string & new_title,const google_apis::EntryActionCallback & callback)500 void JobScheduler::RenameResource(
501     const std::string& resource_id,
502     const std::string& new_title,
503     const google_apis::EntryActionCallback& callback) {
504   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
505   DCHECK(!callback.is_null());
506 
507   JobEntry* new_job = CreateNewJob(TYPE_RENAME_RESOURCE);
508   new_job->task = base::Bind(
509       &DriveServiceInterface::RenameResource,
510       base::Unretained(drive_service_),
511       resource_id,
512       new_title,
513       base::Bind(&JobScheduler::OnEntryActionJobDone,
514                  weak_ptr_factory_.GetWeakPtr(),
515                  new_job->job_info.job_id,
516                  callback));
517   new_job->abort_callback = callback;
518   StartJob(new_job);
519 }
520 
AddResourceToDirectory(const std::string & parent_resource_id,const std::string & resource_id,const google_apis::EntryActionCallback & callback)521 void JobScheduler::AddResourceToDirectory(
522     const std::string& parent_resource_id,
523     const std::string& resource_id,
524     const google_apis::EntryActionCallback& callback) {
525   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
526   DCHECK(!callback.is_null());
527 
528   JobEntry* new_job = CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY);
529   new_job->task = base::Bind(
530       &DriveServiceInterface::AddResourceToDirectory,
531       base::Unretained(drive_service_),
532       parent_resource_id,
533       resource_id,
534       base::Bind(&JobScheduler::OnEntryActionJobDone,
535                  weak_ptr_factory_.GetWeakPtr(),
536                  new_job->job_info.job_id,
537                  callback));
538   new_job->abort_callback = callback;
539   StartJob(new_job);
540 }
541 
RemoveResourceFromDirectory(const std::string & parent_resource_id,const std::string & resource_id,const ClientContext & context,const google_apis::EntryActionCallback & callback)542 void JobScheduler::RemoveResourceFromDirectory(
543     const std::string& parent_resource_id,
544     const std::string& resource_id,
545     const ClientContext& context,
546     const google_apis::EntryActionCallback& callback) {
547   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
548 
549   JobEntry* new_job = CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY);
550   new_job->context = context;
551   new_job->task = base::Bind(
552       &DriveServiceInterface::RemoveResourceFromDirectory,
553       base::Unretained(drive_service_),
554       parent_resource_id,
555       resource_id,
556       base::Bind(&JobScheduler::OnEntryActionJobDone,
557                  weak_ptr_factory_.GetWeakPtr(),
558                  new_job->job_info.job_id,
559                  callback));
560   new_job->abort_callback = callback;
561   StartJob(new_job);
562 }
563 
AddNewDirectory(const std::string & parent_resource_id,const std::string & directory_title,const DriveServiceInterface::AddNewDirectoryOptions & options,const ClientContext & context,const google_apis::FileResourceCallback & callback)564 void JobScheduler::AddNewDirectory(
565     const std::string& parent_resource_id,
566     const std::string& directory_title,
567     const DriveServiceInterface::AddNewDirectoryOptions& options,
568     const ClientContext& context,
569     const google_apis::FileResourceCallback& callback) {
570   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
571 
572   JobEntry* new_job = CreateNewJob(TYPE_ADD_NEW_DIRECTORY);
573   new_job->context = context;
574   new_job->task = base::Bind(
575       &DriveServiceInterface::AddNewDirectory,
576       base::Unretained(drive_service_),
577       parent_resource_id,
578       directory_title,
579       options,
580       base::Bind(&JobScheduler::OnGetFileResourceJobDone,
581                  weak_ptr_factory_.GetWeakPtr(),
582                  new_job->job_info.job_id,
583                  callback));
584   new_job->abort_callback = CreateErrorRunCallback(callback);
585   StartJob(new_job);
586 }
587 
DownloadFile(const base::FilePath & virtual_path,int64 expected_file_size,const base::FilePath & local_cache_path,const std::string & resource_id,const ClientContext & context,const google_apis::DownloadActionCallback & download_action_callback,const google_apis::GetContentCallback & get_content_callback)588 JobID JobScheduler::DownloadFile(
589     const base::FilePath& virtual_path,
590     int64 expected_file_size,
591     const base::FilePath& local_cache_path,
592     const std::string& resource_id,
593     const ClientContext& context,
594     const google_apis::DownloadActionCallback& download_action_callback,
595     const google_apis::GetContentCallback& get_content_callback) {
596   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
597 
598   JobEntry* new_job = CreateNewJob(TYPE_DOWNLOAD_FILE);
599   new_job->job_info.file_path = virtual_path;
600   new_job->job_info.num_total_bytes = expected_file_size;
601   new_job->context = context;
602   new_job->task = base::Bind(
603       &DriveServiceInterface::DownloadFile,
604       base::Unretained(drive_service_),
605       local_cache_path,
606       resource_id,
607       base::Bind(&JobScheduler::OnDownloadActionJobDone,
608                  weak_ptr_factory_.GetWeakPtr(),
609                  new_job->job_info.job_id,
610                  download_action_callback),
611       get_content_callback,
612       base::Bind(&JobScheduler::UpdateProgress,
613                  weak_ptr_factory_.GetWeakPtr(),
614                  new_job->job_info.job_id));
615   new_job->abort_callback = CreateErrorRunCallback(download_action_callback);
616   StartJob(new_job);
617   return new_job->job_info.job_id;
618 }
619 
UploadNewFile(const std::string & parent_resource_id,const base::FilePath & drive_file_path,const base::FilePath & local_file_path,const std::string & title,const std::string & content_type,const DriveUploader::UploadNewFileOptions & options,const ClientContext & context,const google_apis::FileResourceCallback & callback)620 void JobScheduler::UploadNewFile(
621     const std::string& parent_resource_id,
622     const base::FilePath& drive_file_path,
623     const base::FilePath& local_file_path,
624     const std::string& title,
625     const std::string& content_type,
626     const DriveUploader::UploadNewFileOptions& options,
627     const ClientContext& context,
628     const google_apis::FileResourceCallback& callback) {
629   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
630 
631   JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_NEW_FILE);
632   new_job->job_info.file_path = drive_file_path;
633   new_job->context = context;
634 
635   UploadNewFileParams params;
636   params.parent_resource_id = parent_resource_id;
637   params.local_file_path = local_file_path;
638   params.title = title;
639   params.content_type = content_type;
640   params.options = options;
641 
642   ResumeUploadParams resume_params;
643   resume_params.local_file_path = params.local_file_path;
644   resume_params.content_type = params.content_type;
645 
646   params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
647                                weak_ptr_factory_.GetWeakPtr(),
648                                new_job->job_info.job_id,
649                                resume_params,
650                                callback);
651   params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
652                                         weak_ptr_factory_.GetWeakPtr(),
653                                         new_job->job_info.job_id);
654   new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
655   new_job->abort_callback = CreateErrorRunCallback(callback);
656   StartJob(new_job);
657 }
658 
UploadExistingFile(const std::string & resource_id,const base::FilePath & drive_file_path,const base::FilePath & local_file_path,const std::string & content_type,const DriveUploader::UploadExistingFileOptions & options,const ClientContext & context,const google_apis::FileResourceCallback & callback)659 void JobScheduler::UploadExistingFile(
660     const std::string& resource_id,
661     const base::FilePath& drive_file_path,
662     const base::FilePath& local_file_path,
663     const std::string& content_type,
664     const DriveUploader::UploadExistingFileOptions& options,
665     const ClientContext& context,
666     const google_apis::FileResourceCallback& callback) {
667   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
668 
669   JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_EXISTING_FILE);
670   new_job->job_info.file_path = drive_file_path;
671   new_job->context = context;
672 
673   UploadExistingFileParams params;
674   params.resource_id = resource_id;
675   params.local_file_path = local_file_path;
676   params.content_type = content_type;
677   params.options = options;
678 
679   ResumeUploadParams resume_params;
680   resume_params.local_file_path = params.local_file_path;
681   resume_params.content_type = params.content_type;
682 
683   params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
684                                weak_ptr_factory_.GetWeakPtr(),
685                                new_job->job_info.job_id,
686                                resume_params,
687                                callback);
688   params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
689                                         weak_ptr_factory_.GetWeakPtr(),
690                                         new_job->job_info.job_id);
691   new_job->task = base::Bind(&RunUploadExistingFile, uploader_.get(), params);
692   new_job->abort_callback = CreateErrorRunCallback(callback);
693   StartJob(new_job);
694 }
695 
AddPermission(const std::string & resource_id,const std::string & email,google_apis::drive::PermissionRole role,const google_apis::EntryActionCallback & callback)696 void JobScheduler::AddPermission(
697     const std::string& resource_id,
698     const std::string& email,
699     google_apis::drive::PermissionRole role,
700     const google_apis::EntryActionCallback& callback) {
701   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
702   DCHECK(!callback.is_null());
703 
704   JobEntry* new_job = CreateNewJob(TYPE_ADD_PERMISSION);
705   new_job->task = base::Bind(&DriveServiceInterface::AddPermission,
706                              base::Unretained(drive_service_),
707                              resource_id,
708                              email,
709                              role,
710                              base::Bind(&JobScheduler::OnEntryActionJobDone,
711                                         weak_ptr_factory_.GetWeakPtr(),
712                                         new_job->job_info.job_id,
713                                         callback));
714   new_job->abort_callback = callback;
715   StartJob(new_job);
716 }
717 
CreateNewJob(JobType type)718 JobScheduler::JobEntry* JobScheduler::CreateNewJob(JobType type) {
719   JobEntry* job = new JobEntry(type);
720   job->job_info.job_id = job_map_.Add(job);  // Takes the ownership of |job|.
721   return job;
722 }
723 
StartJob(JobEntry * job)724 void JobScheduler::StartJob(JobEntry* job) {
725   DCHECK(!job->task.is_null());
726 
727   QueueJob(job->job_info.job_id);
728   NotifyJobAdded(job->job_info);
729   DoJobLoop(GetJobQueueType(job->job_info.job_type));
730 }
731 
QueueJob(JobID job_id)732 void JobScheduler::QueueJob(JobID job_id) {
733   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
734 
735   JobEntry* job_entry = job_map_.Lookup(job_id);
736   DCHECK(job_entry);
737   const JobInfo& job_info = job_entry->job_info;
738 
739   QueueType queue_type = GetJobQueueType(job_info.job_type);
740   queue_[queue_type]->Push(job_id, job_entry->context.type);
741 
742   const std::string retry_prefix = job_entry->retry_count > 0 ?
743       base::StringPrintf(" (retry %d)", job_entry->retry_count) : "";
744   logger_->Log(logging::LOG_INFO,
745                "Job queued%s: %s - %s",
746                retry_prefix.c_str(),
747                job_info.ToString().c_str(),
748                GetQueueInfo(queue_type).c_str());
749 }
750 
DoJobLoop(QueueType queue_type)751 void JobScheduler::DoJobLoop(QueueType queue_type) {
752   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
753 
754   const int accepted_priority = GetCurrentAcceptedPriority(queue_type);
755 
756   // Abort all USER_INITAITED jobs when not accepted.
757   if (accepted_priority < USER_INITIATED) {
758     std::vector<JobID> jobs;
759     queue_[queue_type]->GetQueuedJobs(USER_INITIATED, &jobs);
760     for (size_t i = 0; i < jobs.size(); ++i) {
761       JobEntry* job = job_map_.Lookup(jobs[i]);
762       DCHECK(job);
763       AbortNotRunningJob(job, google_apis::GDATA_NO_CONNECTION);
764     }
765   }
766 
767   // Wait when throttled.
768   const base::Time now = base::Time::Now();
769   if (now < wait_until_) {
770     base::MessageLoopProxy::current()->PostDelayedTask(
771         FROM_HERE,
772         base::Bind(&JobScheduler::DoJobLoop,
773                    weak_ptr_factory_.GetWeakPtr(),
774                    queue_type),
775         wait_until_ - now);
776     return;
777   }
778 
779   // Run the job with the highest priority in the queue.
780   JobID job_id = -1;
781   if (!queue_[queue_type]->PopForRun(accepted_priority, &job_id))
782     return;
783 
784   JobEntry* entry = job_map_.Lookup(job_id);
785   DCHECK(entry);
786 
787   JobInfo* job_info = &entry->job_info;
788   job_info->state = STATE_RUNNING;
789   job_info->start_time = now;
790   NotifyJobUpdated(*job_info);
791 
792   entry->cancel_callback = entry->task.Run();
793 
794   UpdateWait();
795 
796   logger_->Log(logging::LOG_INFO,
797                "Job started: %s - %s",
798                job_info->ToString().c_str(),
799                GetQueueInfo(queue_type).c_str());
800 }
801 
GetCurrentAcceptedPriority(QueueType queue_type)802 int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) {
803   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
804 
805   const int kNoJobShouldRun = -1;
806 
807   // Should stop if Drive was disabled while running the fetch loop.
808   if (pref_service_->GetBoolean(prefs::kDisableDrive))
809     return kNoJobShouldRun;
810 
811   // Should stop if the network is not online.
812   if (net::NetworkChangeNotifier::IsOffline())
813     return kNoJobShouldRun;
814 
815   // For the file queue, if it is on cellular network, only user initiated
816   // operations are allowed to start.
817   if (queue_type == FILE_QUEUE &&
818       pref_service_->GetBoolean(prefs::kDisableDriveOverCellular) &&
819       net::NetworkChangeNotifier::IsConnectionCellular(
820           net::NetworkChangeNotifier::GetConnectionType()))
821     return USER_INITIATED;
822 
823   // Otherwise, every operations including background tasks are allowed.
824   return BACKGROUND;
825 }
826 
UpdateWait()827 void JobScheduler::UpdateWait() {
828   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
829 
830   if (disable_throttling_ || throttle_count_ == 0)
831     return;
832 
833   // Exponential backoff: https://developers.google.com/drive/handle-errors.
834   base::TimeDelta delay =
835       base::TimeDelta::FromSeconds(1 << (throttle_count_ - 1)) +
836       base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000));
837   VLOG(1) << "Throttling for " << delay.InMillisecondsF();
838 
839   wait_until_ = std::max(wait_until_, base::Time::Now() + delay);
840 }
841 
OnJobDone(JobID job_id,google_apis::GDataErrorCode error)842 bool JobScheduler::OnJobDone(JobID job_id, google_apis::GDataErrorCode error) {
843   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
844 
845   JobEntry* job_entry = job_map_.Lookup(job_id);
846   DCHECK(job_entry);
847   JobInfo* job_info = &job_entry->job_info;
848   QueueType queue_type = GetJobQueueType(job_info->job_type);
849   queue_[queue_type]->MarkFinished(job_id);
850 
851   const base::TimeDelta elapsed = base::Time::Now() - job_info->start_time;
852   bool success = (GDataToFileError(error) == FILE_ERROR_OK);
853   logger_->Log(success ? logging::LOG_INFO : logging::LOG_WARNING,
854                "Job done: %s => %s (elapsed time: %sms) - %s",
855                job_info->ToString().c_str(),
856                GDataErrorCodeToString(error).c_str(),
857                base::Int64ToString(elapsed.InMilliseconds()).c_str(),
858                GetQueueInfo(queue_type).c_str());
859 
860   // Retry, depending on the error.
861   const bool is_server_error =
862       error == google_apis::HTTP_SERVICE_UNAVAILABLE ||
863       error == google_apis::HTTP_INTERNAL_SERVER_ERROR;
864   if (is_server_error) {
865     if (throttle_count_ < kMaxThrottleCount)
866       ++throttle_count_;
867     UpdateWait();
868   } else {
869     throttle_count_ = 0;
870   }
871 
872   const bool should_retry =
873       is_server_error && job_entry->retry_count < kMaxRetryCount;
874   if (should_retry) {
875     job_entry->cancel_callback.Reset();
876     job_info->state = STATE_RETRY;
877     NotifyJobUpdated(*job_info);
878 
879     ++job_entry->retry_count;
880 
881     // Requeue the job.
882     QueueJob(job_id);
883   } else {
884     NotifyJobDone(*job_info, error);
885     // The job has finished, no retry will happen in the scheduler. Now we can
886     // remove the job info from the map.
887     job_map_.Remove(job_id);
888   }
889 
890   // Post a task to continue the job loop.  This allows us to finish handling
891   // the current job before starting the next one.
892   base::MessageLoopProxy::current()->PostTask(FROM_HERE,
893       base::Bind(&JobScheduler::DoJobLoop,
894                  weak_ptr_factory_.GetWeakPtr(),
895                  queue_type));
896   return !should_retry;
897 }
898 
OnGetFileListJobDone(JobID job_id,const google_apis::FileListCallback & callback,google_apis::GDataErrorCode error,scoped_ptr<google_apis::FileList> file_list)899 void JobScheduler::OnGetFileListJobDone(
900     JobID job_id,
901     const google_apis::FileListCallback& callback,
902     google_apis::GDataErrorCode error,
903     scoped_ptr<google_apis::FileList> file_list) {
904   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
905   DCHECK(!callback.is_null());
906 
907   if (OnJobDone(job_id, error))
908     callback.Run(error, file_list.Pass());
909 }
910 
OnGetChangeListJobDone(JobID job_id,const google_apis::ChangeListCallback & callback,google_apis::GDataErrorCode error,scoped_ptr<google_apis::ChangeList> change_list)911 void JobScheduler::OnGetChangeListJobDone(
912     JobID job_id,
913     const google_apis::ChangeListCallback& callback,
914     google_apis::GDataErrorCode error,
915     scoped_ptr<google_apis::ChangeList> change_list) {
916   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
917   DCHECK(!callback.is_null());
918 
919   if (OnJobDone(job_id, error))
920     callback.Run(error, change_list.Pass());
921 }
922 
OnGetFileResourceJobDone(JobID job_id,const google_apis::FileResourceCallback & callback,google_apis::GDataErrorCode error,scoped_ptr<google_apis::FileResource> entry)923 void JobScheduler::OnGetFileResourceJobDone(
924     JobID job_id,
925     const google_apis::FileResourceCallback& callback,
926     google_apis::GDataErrorCode error,
927     scoped_ptr<google_apis::FileResource> entry) {
928   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
929   DCHECK(!callback.is_null());
930 
931   if (OnJobDone(job_id, error))
932     callback.Run(error, entry.Pass());
933 }
934 
OnGetAboutResourceJobDone(JobID job_id,const google_apis::AboutResourceCallback & callback,google_apis::GDataErrorCode error,scoped_ptr<google_apis::AboutResource> about_resource)935 void JobScheduler::OnGetAboutResourceJobDone(
936     JobID job_id,
937     const google_apis::AboutResourceCallback& callback,
938     google_apis::GDataErrorCode error,
939     scoped_ptr<google_apis::AboutResource> about_resource) {
940   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
941   DCHECK(!callback.is_null());
942 
943   if (OnJobDone(job_id, error))
944     callback.Run(error, about_resource.Pass());
945 }
946 
OnGetShareUrlJobDone(JobID job_id,const google_apis::GetShareUrlCallback & callback,google_apis::GDataErrorCode error,const GURL & share_url)947 void JobScheduler::OnGetShareUrlJobDone(
948     JobID job_id,
949     const google_apis::GetShareUrlCallback& callback,
950     google_apis::GDataErrorCode error,
951     const GURL& share_url) {
952   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
953   DCHECK(!callback.is_null());
954 
955   if (OnJobDone(job_id, error))
956     callback.Run(error, share_url);
957 }
958 
OnGetAppListJobDone(JobID job_id,const google_apis::AppListCallback & callback,google_apis::GDataErrorCode error,scoped_ptr<google_apis::AppList> app_list)959 void JobScheduler::OnGetAppListJobDone(
960     JobID job_id,
961     const google_apis::AppListCallback& callback,
962     google_apis::GDataErrorCode error,
963     scoped_ptr<google_apis::AppList> app_list) {
964   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
965   DCHECK(!callback.is_null());
966 
967   if (OnJobDone(job_id, error))
968     callback.Run(error, app_list.Pass());
969 }
970 
OnEntryActionJobDone(JobID job_id,const google_apis::EntryActionCallback & callback,google_apis::GDataErrorCode error)971 void JobScheduler::OnEntryActionJobDone(
972     JobID job_id,
973     const google_apis::EntryActionCallback& callback,
974     google_apis::GDataErrorCode error) {
975   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
976   DCHECK(!callback.is_null());
977 
978   if (OnJobDone(job_id, error))
979     callback.Run(error);
980 }
981 
OnDownloadActionJobDone(JobID job_id,const google_apis::DownloadActionCallback & callback,google_apis::GDataErrorCode error,const base::FilePath & temp_file)982 void JobScheduler::OnDownloadActionJobDone(
983     JobID job_id,
984     const google_apis::DownloadActionCallback& callback,
985     google_apis::GDataErrorCode error,
986     const base::FilePath& temp_file) {
987   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
988   DCHECK(!callback.is_null());
989 
990   if (OnJobDone(job_id, error))
991     callback.Run(error, temp_file);
992 }
993 
OnUploadCompletionJobDone(JobID job_id,const ResumeUploadParams & resume_params,const google_apis::FileResourceCallback & callback,google_apis::GDataErrorCode error,const GURL & upload_location,scoped_ptr<google_apis::FileResource> entry)994 void JobScheduler::OnUploadCompletionJobDone(
995     JobID job_id,
996     const ResumeUploadParams& resume_params,
997     const google_apis::FileResourceCallback& callback,
998     google_apis::GDataErrorCode error,
999     const GURL& upload_location,
1000     scoped_ptr<google_apis::FileResource> entry) {
1001   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1002   DCHECK(!callback.is_null());
1003 
1004   if (!upload_location.is_empty()) {
1005     // If upload_location is available, update the task to resume the
1006     // upload process from the terminated point.
1007     // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE
1008     // so OnJobDone called below will be in charge to re-queue the job.
1009     JobEntry* job_entry = job_map_.Lookup(job_id);
1010     DCHECK(job_entry);
1011 
1012     ResumeUploadFileParams params;
1013     params.upload_location = upload_location;
1014     params.local_file_path = resume_params.local_file_path;
1015     params.content_type = resume_params.content_type;
1016     params.callback = base::Bind(&JobScheduler::OnResumeUploadFileDone,
1017                                  weak_ptr_factory_.GetWeakPtr(),
1018                                  job_id,
1019                                  job_entry->task,
1020                                  callback);
1021     params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
1022                                           weak_ptr_factory_.GetWeakPtr(),
1023                                           job_id);
1024     job_entry->task = base::Bind(&RunResumeUploadFile, uploader_.get(), params);
1025   }
1026 
1027   if (OnJobDone(job_id, error))
1028     callback.Run(error, entry.Pass());
1029 }
1030 
OnResumeUploadFileDone(JobID job_id,const base::Callback<google_apis::CancelCallback ()> & original_task,const google_apis::FileResourceCallback & callback,google_apis::GDataErrorCode error,const GURL & upload_location,scoped_ptr<google_apis::FileResource> entry)1031 void JobScheduler::OnResumeUploadFileDone(
1032     JobID job_id,
1033     const base::Callback<google_apis::CancelCallback()>& original_task,
1034     const google_apis::FileResourceCallback& callback,
1035     google_apis::GDataErrorCode error,
1036     const GURL& upload_location,
1037     scoped_ptr<google_apis::FileResource> entry) {
1038   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1039   DCHECK(!original_task.is_null());
1040   DCHECK(!callback.is_null());
1041 
1042   if (upload_location.is_empty()) {
1043     // If upload_location is not available, we should discard it and stop trying
1044     // to resume. Restore the original task.
1045     JobEntry* job_entry = job_map_.Lookup(job_id);
1046     DCHECK(job_entry);
1047     job_entry->task = original_task;
1048   }
1049 
1050   if (OnJobDone(job_id, error))
1051     callback.Run(error, entry.Pass());
1052 }
1053 
UpdateProgress(JobID job_id,int64 progress,int64 total)1054 void JobScheduler::UpdateProgress(JobID job_id, int64 progress, int64 total) {
1055   JobEntry* job_entry = job_map_.Lookup(job_id);
1056   DCHECK(job_entry);
1057 
1058   job_entry->job_info.num_completed_bytes = progress;
1059   if (total != -1)
1060     job_entry->job_info.num_total_bytes = total;
1061   NotifyJobUpdated(job_entry->job_info);
1062 }
1063 
OnConnectionTypeChanged(net::NetworkChangeNotifier::ConnectionType type)1064 void JobScheduler::OnConnectionTypeChanged(
1065     net::NetworkChangeNotifier::ConnectionType type) {
1066   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1067 
1068   // Resume the job loop.
1069   // Note that we don't need to check the network connection status as it will
1070   // be checked in GetCurrentAcceptedPriority().
1071   for (int i = METADATA_QUEUE; i < NUM_QUEUES; ++i)
1072     DoJobLoop(static_cast<QueueType>(i));
1073 }
1074 
GetJobQueueType(JobType type)1075 JobScheduler::QueueType JobScheduler::GetJobQueueType(JobType type) {
1076   switch (type) {
1077     case TYPE_GET_ABOUT_RESOURCE:
1078     case TYPE_GET_APP_LIST:
1079     case TYPE_GET_ALL_RESOURCE_LIST:
1080     case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY:
1081     case TYPE_SEARCH:
1082     case TYPE_GET_CHANGE_LIST:
1083     case TYPE_GET_REMAINING_CHANGE_LIST:
1084     case TYPE_GET_REMAINING_FILE_LIST:
1085     case TYPE_GET_RESOURCE_ENTRY:
1086     case TYPE_GET_SHARE_URL:
1087     case TYPE_TRASH_RESOURCE:
1088     case TYPE_COPY_RESOURCE:
1089     case TYPE_UPDATE_RESOURCE:
1090     case TYPE_RENAME_RESOURCE:
1091     case TYPE_ADD_RESOURCE_TO_DIRECTORY:
1092     case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY:
1093     case TYPE_ADD_NEW_DIRECTORY:
1094     case TYPE_CREATE_FILE:
1095     case TYPE_ADD_PERMISSION:
1096       return METADATA_QUEUE;
1097 
1098     case TYPE_DOWNLOAD_FILE:
1099     case TYPE_UPLOAD_NEW_FILE:
1100     case TYPE_UPLOAD_EXISTING_FILE:
1101       return FILE_QUEUE;
1102   }
1103   NOTREACHED();
1104   return FILE_QUEUE;
1105 }
1106 
AbortNotRunningJob(JobEntry * job,google_apis::GDataErrorCode error)1107 void JobScheduler::AbortNotRunningJob(JobEntry* job,
1108                                       google_apis::GDataErrorCode error) {
1109   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1110 
1111   const base::TimeDelta elapsed = base::Time::Now() - job->job_info.start_time;
1112   const QueueType queue_type = GetJobQueueType(job->job_info.job_type);
1113   logger_->Log(logging::LOG_INFO,
1114                "Job aborted: %s => %s (elapsed time: %sms) - %s",
1115                job->job_info.ToString().c_str(),
1116                GDataErrorCodeToString(error).c_str(),
1117                base::Int64ToString(elapsed.InMilliseconds()).c_str(),
1118                GetQueueInfo(queue_type).c_str());
1119 
1120   base::Callback<void(google_apis::GDataErrorCode)> callback =
1121       job->abort_callback;
1122   queue_[GetJobQueueType(job->job_info.job_type)]->Remove(job->job_info.job_id);
1123   NotifyJobDone(job->job_info, error);
1124   job_map_.Remove(job->job_info.job_id);
1125   base::MessageLoopProxy::current()->PostTask(FROM_HERE,
1126                                               base::Bind(callback, error));
1127 }
1128 
NotifyJobAdded(const JobInfo & job_info)1129 void JobScheduler::NotifyJobAdded(const JobInfo& job_info) {
1130   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1131   FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobAdded(job_info));
1132 }
1133 
NotifyJobDone(const JobInfo & job_info,google_apis::GDataErrorCode error)1134 void JobScheduler::NotifyJobDone(const JobInfo& job_info,
1135                                  google_apis::GDataErrorCode error) {
1136   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1137   FOR_EACH_OBSERVER(JobListObserver, observer_list_,
1138                     OnJobDone(job_info, GDataToFileError(error)));
1139 }
1140 
NotifyJobUpdated(const JobInfo & job_info)1141 void JobScheduler::NotifyJobUpdated(const JobInfo& job_info) {
1142   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1143   FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobUpdated(job_info));
1144 }
1145 
GetQueueInfo(QueueType type) const1146 std::string JobScheduler::GetQueueInfo(QueueType type) const {
1147   return QueueTypeToString(type) + " " + queue_[type]->ToString();
1148 }
1149 
1150 // static
QueueTypeToString(QueueType type)1151 std::string JobScheduler::QueueTypeToString(QueueType type) {
1152   switch (type) {
1153     case METADATA_QUEUE:
1154       return "METADATA_QUEUE";
1155     case FILE_QUEUE:
1156       return "FILE_QUEUE";
1157     case NUM_QUEUES:
1158       break;  // This value is just a sentinel. Should never be used.
1159   }
1160   NOTREACHED();
1161   return "";
1162 }
1163 
1164 }  // namespace drive
1165