• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "chrome/browser/chromeos/drive/job_scheduler.h"
6 
7 #include "base/message_loop/message_loop.h"
8 #include "base/prefs/pref_service.h"
9 #include "base/rand_util.h"
10 #include "base/strings/string_number_conversions.h"
11 #include "base/strings/stringprintf.h"
12 #include "chrome/browser/chromeos/drive/file_system_util.h"
13 #include "chrome/browser/chromeos/drive/logging.h"
14 #include "chrome/common/pref_names.h"
15 #include "content/public/browser/browser_thread.h"
16 #include "google_apis/drive/drive_api_parser.h"
17 
18 using content::BrowserThread;
19 
20 namespace drive {
21 
22 namespace {
23 
24 // All jobs are retried at maximum of kMaxRetryCount when they fail due to
25 // throttling or server error.  The delay before retrying a job is shared among
26 // jobs. It doubles in length on each failure, upto 2^kMaxThrottleCount seconds.
27 //
28 // According to the API documentation, kMaxRetryCount should be the same as
29 // kMaxThrottleCount (https://developers.google.com/drive/handle-errors).
30 // But currently multiplied by 2 to ensure upload related jobs retried for a
31 // sufficient number of times. crbug.com/269918
32 const int kMaxThrottleCount = 4;
33 const int kMaxRetryCount = 2 * kMaxThrottleCount;
34 
35 // GetDefaultValue returns a value constructed by the default constructor.
36 template<typename T> struct DefaultValueCreator {
GetDefaultValuedrive::__anon20bd5afa0111::DefaultValueCreator37   static T GetDefaultValue() { return T(); }
38 };
39 template<typename T> struct DefaultValueCreator<const T&> {
GetDefaultValuedrive::__anon20bd5afa0111::DefaultValueCreator40   static T GetDefaultValue() { return T(); }
41 };
42 
43 // Helper of CreateErrorRunCallback implementation.
44 // Provides:
45 // - ResultType; the type of the Callback which should be returned by
46 //     CreateErrorRunCallback.
47 // - Run(): a static function which takes the original |callback| and |error|,
48 //     and runs the |callback|.Run() with the error code and default values
49 //     for remaining arguments.
50 template<typename CallbackType> struct CreateErrorRunCallbackHelper;
51 
52 // CreateErrorRunCallback with two arguments.
53 template<typename P1>
54 struct CreateErrorRunCallbackHelper<void(google_apis::GDataErrorCode, P1)> {
Rundrive::__anon20bd5afa0111::CreateErrorRunCallbackHelper55   static void Run(
56       const base::Callback<void(google_apis::GDataErrorCode, P1)>& callback,
57       google_apis::GDataErrorCode error) {
58     callback.Run(error, DefaultValueCreator<P1>::GetDefaultValue());
59   }
60 };
61 
62 // Returns a callback with the tail parameter bound to its default value.
63 // In other words, returned_callback.Run(error) runs callback.Run(error, T()).
64 template<typename CallbackType>
65 base::Callback<void(google_apis::GDataErrorCode)>
CreateErrorRunCallback(const base::Callback<CallbackType> & callback)66 CreateErrorRunCallback(const base::Callback<CallbackType>& callback) {
67   return base::Bind(&CreateErrorRunCallbackHelper<CallbackType>::Run, callback);
68 }
69 
70 // Parameter struct for RunUploadNewFile.
71 struct UploadNewFileParams {
72   std::string parent_resource_id;
73   base::FilePath local_file_path;
74   std::string title;
75   std::string content_type;
76   UploadCompletionCallback callback;
77   google_apis::ProgressCallback progress_callback;
78 };
79 
80 // Helper function to work around the arity limitation of base::Bind.
RunUploadNewFile(DriveUploaderInterface * uploader,const UploadNewFileParams & params)81 google_apis::CancelCallback RunUploadNewFile(
82     DriveUploaderInterface* uploader,
83     const UploadNewFileParams& params) {
84   return uploader->UploadNewFile(params.parent_resource_id,
85                                  params.local_file_path,
86                                  params.title,
87                                  params.content_type,
88                                  params.callback,
89                                  params.progress_callback);
90 }
91 
92 // Parameter struct for RunUploadExistingFile.
93 struct UploadExistingFileParams {
94   std::string resource_id;
95   base::FilePath local_file_path;
96   std::string content_type;
97   std::string etag;
98   UploadCompletionCallback callback;
99   google_apis::ProgressCallback progress_callback;
100 };
101 
102 // Helper function to work around the arity limitation of base::Bind.
RunUploadExistingFile(DriveUploaderInterface * uploader,const UploadExistingFileParams & params)103 google_apis::CancelCallback RunUploadExistingFile(
104     DriveUploaderInterface* uploader,
105     const UploadExistingFileParams& params) {
106   return uploader->UploadExistingFile(params.resource_id,
107                                       params.local_file_path,
108                                       params.content_type,
109                                       params.etag,
110                                       params.callback,
111                                       params.progress_callback);
112 }
113 
114 // Parameter struct for RunResumeUploadFile.
115 struct ResumeUploadFileParams {
116   GURL upload_location;
117   base::FilePath local_file_path;
118   std::string content_type;
119   UploadCompletionCallback callback;
120   google_apis::ProgressCallback progress_callback;
121 };
122 
123 // Helper function to adjust the return type.
RunResumeUploadFile(DriveUploaderInterface * uploader,const ResumeUploadFileParams & params)124 google_apis::CancelCallback RunResumeUploadFile(
125     DriveUploaderInterface* uploader,
126     const ResumeUploadFileParams& params) {
127   return uploader->ResumeUploadFile(params.upload_location,
128                                     params.local_file_path,
129                                     params.content_type,
130                                     params.callback,
131                                     params.progress_callback);
132 }
133 
134 }  // namespace
135 
136 // Metadata jobs are cheap, so we run them concurrently. File jobs run serially.
137 const int JobScheduler::kMaxJobCount[] = {
138   5,  // METADATA_QUEUE
139   1,  // FILE_QUEUE
140 };
141 
JobEntry(JobType type)142 JobScheduler::JobEntry::JobEntry(JobType type)
143     : job_info(type),
144       context(ClientContext(USER_INITIATED)),
145       retry_count(0) {
146   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
147 }
148 
~JobEntry()149 JobScheduler::JobEntry::~JobEntry() {
150   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
151 }
152 
153 struct JobScheduler::ResumeUploadParams {
154   base::FilePath drive_file_path;
155   base::FilePath local_file_path;
156   std::string content_type;
157 };
158 
JobScheduler(PrefService * pref_service,DriveServiceInterface * drive_service,base::SequencedTaskRunner * blocking_task_runner)159 JobScheduler::JobScheduler(
160     PrefService* pref_service,
161     DriveServiceInterface* drive_service,
162     base::SequencedTaskRunner* blocking_task_runner)
163     : throttle_count_(0),
164       wait_until_(base::Time::Now()),
165       disable_throttling_(false),
166       drive_service_(drive_service),
167       uploader_(new DriveUploader(drive_service, blocking_task_runner)),
168       pref_service_(pref_service),
169       weak_ptr_factory_(this) {
170   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
171 
172   for (int i = 0; i < NUM_QUEUES; ++i)
173     queue_[i].reset(new JobQueue(kMaxJobCount[i], NUM_CONTEXT_TYPES));
174 
175   net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
176 }
177 
~JobScheduler()178 JobScheduler::~JobScheduler() {
179   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
180 
181   size_t num_queued_jobs = 0;
182   for (int i = 0; i < NUM_QUEUES; ++i)
183     num_queued_jobs += queue_[i]->GetNumberOfJobs();
184   DCHECK_EQ(num_queued_jobs, job_map_.size());
185 
186   net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
187 }
188 
GetJobInfoList()189 std::vector<JobInfo> JobScheduler::GetJobInfoList() {
190   std::vector<JobInfo> job_info_list;
191   for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
192     job_info_list.push_back(iter.GetCurrentValue()->job_info);
193   return job_info_list;
194 }
195 
AddObserver(JobListObserver * observer)196 void JobScheduler::AddObserver(JobListObserver* observer) {
197   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
198   observer_list_.AddObserver(observer);
199 }
200 
RemoveObserver(JobListObserver * observer)201 void JobScheduler::RemoveObserver(JobListObserver* observer) {
202   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
203   observer_list_.RemoveObserver(observer);
204 }
205 
CancelJob(JobID job_id)206 void JobScheduler::CancelJob(JobID job_id) {
207   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
208 
209   JobEntry* job = job_map_.Lookup(job_id);
210   if (job) {
211     if (job->job_info.state == STATE_RUNNING) {
212       // If the job is running an HTTP request, cancel it via |cancel_callback|
213       // returned from the request, and wait for termination in the normal
214       // callback handler, OnJobDone.
215       if (!job->cancel_callback.is_null())
216         job->cancel_callback.Run();
217     } else {
218       AbortNotRunningJob(job, google_apis::GDATA_CANCELLED);
219     }
220   }
221 }
222 
CancelAllJobs()223 void JobScheduler::CancelAllJobs() {
224   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
225 
226   // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports
227   // removable during iteration.
228   for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
229     CancelJob(iter.GetCurrentKey());
230 }
231 
GetAboutResource(const google_apis::AboutResourceCallback & callback)232 void JobScheduler::GetAboutResource(
233     const google_apis::AboutResourceCallback& callback) {
234   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
235   DCHECK(!callback.is_null());
236 
237   JobEntry* new_job = CreateNewJob(TYPE_GET_ABOUT_RESOURCE);
238   new_job->task = base::Bind(
239       &DriveServiceInterface::GetAboutResource,
240       base::Unretained(drive_service_),
241       base::Bind(&JobScheduler::OnGetAboutResourceJobDone,
242                  weak_ptr_factory_.GetWeakPtr(),
243                  new_job->job_info.job_id,
244                  callback));
245   new_job->abort_callback = CreateErrorRunCallback(callback);
246   StartJob(new_job);
247 }
248 
GetAppList(const google_apis::AppListCallback & callback)249 void JobScheduler::GetAppList(const google_apis::AppListCallback& callback) {
250   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
251   DCHECK(!callback.is_null());
252 
253   JobEntry* new_job = CreateNewJob(TYPE_GET_APP_LIST);
254   new_job->task = base::Bind(
255       &DriveServiceInterface::GetAppList,
256       base::Unretained(drive_service_),
257       base::Bind(&JobScheduler::OnGetAppListJobDone,
258                  weak_ptr_factory_.GetWeakPtr(),
259                  new_job->job_info.job_id,
260                  callback));
261   new_job->abort_callback = CreateErrorRunCallback(callback);
262   StartJob(new_job);
263 }
264 
GetAllResourceList(const google_apis::GetResourceListCallback & callback)265 void JobScheduler::GetAllResourceList(
266     const google_apis::GetResourceListCallback& callback) {
267   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
268   DCHECK(!callback.is_null());
269 
270   JobEntry* new_job = CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST);
271   new_job->task = base::Bind(
272       &DriveServiceInterface::GetAllResourceList,
273       base::Unretained(drive_service_),
274       base::Bind(&JobScheduler::OnGetResourceListJobDone,
275                  weak_ptr_factory_.GetWeakPtr(),
276                  new_job->job_info.job_id,
277                  callback));
278   new_job->abort_callback = CreateErrorRunCallback(callback);
279   StartJob(new_job);
280 }
281 
GetResourceListInDirectory(const std::string & directory_resource_id,const google_apis::GetResourceListCallback & callback)282 void JobScheduler::GetResourceListInDirectory(
283     const std::string& directory_resource_id,
284     const google_apis::GetResourceListCallback& callback) {
285   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
286   DCHECK(!callback.is_null());
287 
288   JobEntry* new_job = CreateNewJob(
289       TYPE_GET_RESOURCE_LIST_IN_DIRECTORY);
290   new_job->task = base::Bind(
291       &DriveServiceInterface::GetResourceListInDirectory,
292       base::Unretained(drive_service_),
293       directory_resource_id,
294       base::Bind(&JobScheduler::OnGetResourceListJobDone,
295                  weak_ptr_factory_.GetWeakPtr(),
296                  new_job->job_info.job_id,
297                  callback));
298   new_job->abort_callback = CreateErrorRunCallback(callback);
299   StartJob(new_job);
300 }
301 
Search(const std::string & search_query,const google_apis::GetResourceListCallback & callback)302 void JobScheduler::Search(
303     const std::string& search_query,
304     const google_apis::GetResourceListCallback& callback) {
305   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
306   DCHECK(!callback.is_null());
307 
308   JobEntry* new_job = CreateNewJob(TYPE_SEARCH);
309   new_job->task = base::Bind(
310       &DriveServiceInterface::Search,
311       base::Unretained(drive_service_),
312       search_query,
313       base::Bind(&JobScheduler::OnGetResourceListJobDone,
314                  weak_ptr_factory_.GetWeakPtr(),
315                  new_job->job_info.job_id,
316                  callback));
317   new_job->abort_callback = CreateErrorRunCallback(callback);
318   StartJob(new_job);
319 }
320 
GetChangeList(int64 start_changestamp,const google_apis::GetResourceListCallback & callback)321 void JobScheduler::GetChangeList(
322     int64 start_changestamp,
323     const google_apis::GetResourceListCallback& callback) {
324   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
325   DCHECK(!callback.is_null());
326 
327   JobEntry* new_job = CreateNewJob(TYPE_GET_CHANGE_LIST);
328   new_job->task = base::Bind(
329       &DriveServiceInterface::GetChangeList,
330       base::Unretained(drive_service_),
331       start_changestamp,
332       base::Bind(&JobScheduler::OnGetResourceListJobDone,
333                  weak_ptr_factory_.GetWeakPtr(),
334                  new_job->job_info.job_id,
335                  callback));
336   new_job->abort_callback = CreateErrorRunCallback(callback);
337   StartJob(new_job);
338 }
339 
GetRemainingChangeList(const GURL & next_link,const google_apis::GetResourceListCallback & callback)340 void JobScheduler::GetRemainingChangeList(
341     const GURL& next_link,
342     const google_apis::GetResourceListCallback& callback) {
343   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
344   DCHECK(!callback.is_null());
345 
346   JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_CHANGE_LIST);
347   new_job->task = base::Bind(
348       &DriveServiceInterface::GetRemainingChangeList,
349       base::Unretained(drive_service_),
350       next_link,
351       base::Bind(&JobScheduler::OnGetResourceListJobDone,
352                  weak_ptr_factory_.GetWeakPtr(),
353                  new_job->job_info.job_id,
354                  callback));
355   new_job->abort_callback = CreateErrorRunCallback(callback);
356   StartJob(new_job);
357 }
358 
GetRemainingFileList(const GURL & next_link,const google_apis::GetResourceListCallback & callback)359 void JobScheduler::GetRemainingFileList(
360     const GURL& next_link,
361     const google_apis::GetResourceListCallback& callback) {
362   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
363   DCHECK(!callback.is_null());
364 
365   JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_FILE_LIST);
366   new_job->task = base::Bind(
367       &DriveServiceInterface::GetRemainingFileList,
368       base::Unretained(drive_service_),
369       next_link,
370       base::Bind(&JobScheduler::OnGetResourceListJobDone,
371                  weak_ptr_factory_.GetWeakPtr(),
372                  new_job->job_info.job_id,
373                  callback));
374   new_job->abort_callback = CreateErrorRunCallback(callback);
375   StartJob(new_job);
376 }
377 
GetResourceEntry(const std::string & resource_id,const ClientContext & context,const google_apis::GetResourceEntryCallback & callback)378 void JobScheduler::GetResourceEntry(
379     const std::string& resource_id,
380     const ClientContext& context,
381     const google_apis::GetResourceEntryCallback& callback) {
382   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
383   DCHECK(!callback.is_null());
384 
385   JobEntry* new_job = CreateNewJob(TYPE_GET_RESOURCE_ENTRY);
386   new_job->context = context;
387   new_job->task = base::Bind(
388       &DriveServiceInterface::GetResourceEntry,
389       base::Unretained(drive_service_),
390       resource_id,
391       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
392                  weak_ptr_factory_.GetWeakPtr(),
393                  new_job->job_info.job_id,
394                  callback));
395   new_job->abort_callback = CreateErrorRunCallback(callback);
396   StartJob(new_job);
397 }
398 
GetShareUrl(const std::string & resource_id,const GURL & embed_origin,const ClientContext & context,const google_apis::GetShareUrlCallback & callback)399 void JobScheduler::GetShareUrl(
400     const std::string& resource_id,
401     const GURL& embed_origin,
402     const ClientContext& context,
403     const google_apis::GetShareUrlCallback& callback) {
404   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
405   DCHECK(!callback.is_null());
406 
407   JobEntry* new_job = CreateNewJob(TYPE_GET_SHARE_URL);
408   new_job->context = context;
409   new_job->task = base::Bind(
410       &DriveServiceInterface::GetShareUrl,
411       base::Unretained(drive_service_),
412       resource_id,
413       embed_origin,
414       base::Bind(&JobScheduler::OnGetShareUrlJobDone,
415                  weak_ptr_factory_.GetWeakPtr(),
416                  new_job->job_info.job_id,
417                  callback));
418   new_job->abort_callback = CreateErrorRunCallback(callback);
419   StartJob(new_job);
420 }
421 
TrashResource(const std::string & resource_id,const ClientContext & context,const google_apis::EntryActionCallback & callback)422 void JobScheduler::TrashResource(
423     const std::string& resource_id,
424     const ClientContext& context,
425     const google_apis::EntryActionCallback& callback) {
426   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
427   DCHECK(!callback.is_null());
428 
429   JobEntry* new_job = CreateNewJob(TYPE_TRASH_RESOURCE);
430   new_job->context = context;
431   new_job->task = base::Bind(
432       &DriveServiceInterface::TrashResource,
433       base::Unretained(drive_service_),
434       resource_id,
435       base::Bind(&JobScheduler::OnEntryActionJobDone,
436                  weak_ptr_factory_.GetWeakPtr(),
437                  new_job->job_info.job_id,
438                  callback));
439   new_job->abort_callback = callback;
440   StartJob(new_job);
441 }
442 
CopyResource(const std::string & resource_id,const std::string & parent_resource_id,const std::string & new_title,const base::Time & last_modified,const google_apis::GetResourceEntryCallback & callback)443 void JobScheduler::CopyResource(
444     const std::string& resource_id,
445     const std::string& parent_resource_id,
446     const std::string& new_title,
447     const base::Time& last_modified,
448     const google_apis::GetResourceEntryCallback& callback) {
449   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
450   DCHECK(!callback.is_null());
451 
452   JobEntry* new_job = CreateNewJob(TYPE_COPY_RESOURCE);
453   new_job->task = base::Bind(
454       &DriveServiceInterface::CopyResource,
455       base::Unretained(drive_service_),
456       resource_id,
457       parent_resource_id,
458       new_title,
459       last_modified,
460       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
461                  weak_ptr_factory_.GetWeakPtr(),
462                  new_job->job_info.job_id,
463                  callback));
464   new_job->abort_callback = CreateErrorRunCallback(callback);
465   StartJob(new_job);
466 }
467 
UpdateResource(const std::string & resource_id,const std::string & parent_resource_id,const std::string & new_title,const base::Time & last_modified,const base::Time & last_viewed_by_me,const ClientContext & context,const google_apis::GetResourceEntryCallback & callback)468 void JobScheduler::UpdateResource(
469     const std::string& resource_id,
470     const std::string& parent_resource_id,
471     const std::string& new_title,
472     const base::Time& last_modified,
473     const base::Time& last_viewed_by_me,
474     const ClientContext& context,
475     const google_apis::GetResourceEntryCallback& callback) {
476   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
477   DCHECK(!callback.is_null());
478 
479   JobEntry* new_job = CreateNewJob(TYPE_UPDATE_RESOURCE);
480   new_job->context = context;
481   new_job->task = base::Bind(
482       &DriveServiceInterface::UpdateResource,
483       base::Unretained(drive_service_),
484       resource_id,
485       parent_resource_id,
486       new_title,
487       last_modified,
488       last_viewed_by_me,
489       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
490                  weak_ptr_factory_.GetWeakPtr(),
491                  new_job->job_info.job_id,
492                  callback));
493   new_job->abort_callback = CreateErrorRunCallback(callback);
494   StartJob(new_job);
495 }
496 
RenameResource(const std::string & resource_id,const std::string & new_title,const google_apis::EntryActionCallback & callback)497 void JobScheduler::RenameResource(
498     const std::string& resource_id,
499     const std::string& new_title,
500     const google_apis::EntryActionCallback& callback) {
501   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
502   DCHECK(!callback.is_null());
503 
504   JobEntry* new_job = CreateNewJob(TYPE_RENAME_RESOURCE);
505   new_job->task = base::Bind(
506       &DriveServiceInterface::RenameResource,
507       base::Unretained(drive_service_),
508       resource_id,
509       new_title,
510       base::Bind(&JobScheduler::OnEntryActionJobDone,
511                  weak_ptr_factory_.GetWeakPtr(),
512                  new_job->job_info.job_id,
513                  callback));
514   new_job->abort_callback = callback;
515   StartJob(new_job);
516 }
517 
AddResourceToDirectory(const std::string & parent_resource_id,const std::string & resource_id,const google_apis::EntryActionCallback & callback)518 void JobScheduler::AddResourceToDirectory(
519     const std::string& parent_resource_id,
520     const std::string& resource_id,
521     const google_apis::EntryActionCallback& callback) {
522   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
523   DCHECK(!callback.is_null());
524 
525   JobEntry* new_job = CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY);
526   new_job->task = base::Bind(
527       &DriveServiceInterface::AddResourceToDirectory,
528       base::Unretained(drive_service_),
529       parent_resource_id,
530       resource_id,
531       base::Bind(&JobScheduler::OnEntryActionJobDone,
532                  weak_ptr_factory_.GetWeakPtr(),
533                  new_job->job_info.job_id,
534                  callback));
535   new_job->abort_callback = callback;
536   StartJob(new_job);
537 }
538 
RemoveResourceFromDirectory(const std::string & parent_resource_id,const std::string & resource_id,const ClientContext & context,const google_apis::EntryActionCallback & callback)539 void JobScheduler::RemoveResourceFromDirectory(
540     const std::string& parent_resource_id,
541     const std::string& resource_id,
542     const ClientContext& context,
543     const google_apis::EntryActionCallback& callback) {
544   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
545 
546   JobEntry* new_job = CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY);
547   new_job->context = context;
548   new_job->task = base::Bind(
549       &DriveServiceInterface::RemoveResourceFromDirectory,
550       base::Unretained(drive_service_),
551       parent_resource_id,
552       resource_id,
553       base::Bind(&JobScheduler::OnEntryActionJobDone,
554                  weak_ptr_factory_.GetWeakPtr(),
555                  new_job->job_info.job_id,
556                  callback));
557   new_job->abort_callback = callback;
558   StartJob(new_job);
559 }
560 
AddNewDirectory(const std::string & parent_resource_id,const std::string & directory_title,const google_apis::GetResourceEntryCallback & callback)561 void JobScheduler::AddNewDirectory(
562     const std::string& parent_resource_id,
563     const std::string& directory_title,
564     const google_apis::GetResourceEntryCallback& callback) {
565   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
566 
567   JobEntry* new_job = CreateNewJob(TYPE_ADD_NEW_DIRECTORY);
568   new_job->task = base::Bind(
569       &DriveServiceInterface::AddNewDirectory,
570       base::Unretained(drive_service_),
571       parent_resource_id,
572       directory_title,
573       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
574                  weak_ptr_factory_.GetWeakPtr(),
575                  new_job->job_info.job_id,
576                  callback));
577   new_job->abort_callback = CreateErrorRunCallback(callback);
578   StartJob(new_job);
579 }
580 
DownloadFile(const base::FilePath & virtual_path,int64 expected_file_size,const base::FilePath & local_cache_path,const std::string & resource_id,const ClientContext & context,const google_apis::DownloadActionCallback & download_action_callback,const google_apis::GetContentCallback & get_content_callback)581 JobID JobScheduler::DownloadFile(
582     const base::FilePath& virtual_path,
583     int64 expected_file_size,
584     const base::FilePath& local_cache_path,
585     const std::string& resource_id,
586     const ClientContext& context,
587     const google_apis::DownloadActionCallback& download_action_callback,
588     const google_apis::GetContentCallback& get_content_callback) {
589   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
590 
591   JobEntry* new_job = CreateNewJob(TYPE_DOWNLOAD_FILE);
592   new_job->job_info.file_path = virtual_path;
593   new_job->job_info.num_total_bytes = expected_file_size;
594   new_job->context = context;
595   new_job->task = base::Bind(
596       &DriveServiceInterface::DownloadFile,
597       base::Unretained(drive_service_),
598       local_cache_path,
599       resource_id,
600       base::Bind(&JobScheduler::OnDownloadActionJobDone,
601                  weak_ptr_factory_.GetWeakPtr(),
602                  new_job->job_info.job_id,
603                  download_action_callback),
604       get_content_callback,
605       base::Bind(&JobScheduler::UpdateProgress,
606                  weak_ptr_factory_.GetWeakPtr(),
607                  new_job->job_info.job_id));
608   new_job->abort_callback = CreateErrorRunCallback(download_action_callback);
609   StartJob(new_job);
610   return new_job->job_info.job_id;
611 }
612 
UploadNewFile(const std::string & parent_resource_id,const base::FilePath & drive_file_path,const base::FilePath & local_file_path,const std::string & title,const std::string & content_type,const ClientContext & context,const google_apis::GetResourceEntryCallback & callback)613 void JobScheduler::UploadNewFile(
614     const std::string& parent_resource_id,
615     const base::FilePath& drive_file_path,
616     const base::FilePath& local_file_path,
617     const std::string& title,
618     const std::string& content_type,
619     const ClientContext& context,
620     const google_apis::GetResourceEntryCallback& callback) {
621   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
622 
623   JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_NEW_FILE);
624   new_job->job_info.file_path = drive_file_path;
625   new_job->context = context;
626 
627   UploadNewFileParams params;
628   params.parent_resource_id = parent_resource_id;
629   params.local_file_path = local_file_path;
630   params.title = title;
631   params.content_type = content_type;
632 
633   ResumeUploadParams resume_params;
634   resume_params.local_file_path = params.local_file_path;
635   resume_params.content_type = params.content_type;
636 
637   params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
638                                weak_ptr_factory_.GetWeakPtr(),
639                                new_job->job_info.job_id,
640                                resume_params,
641                                callback);
642   params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
643                                         weak_ptr_factory_.GetWeakPtr(),
644                                         new_job->job_info.job_id);
645   new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
646   new_job->abort_callback = CreateErrorRunCallback(callback);
647   StartJob(new_job);
648 }
649 
UploadExistingFile(const std::string & resource_id,const base::FilePath & drive_file_path,const base::FilePath & local_file_path,const std::string & content_type,const std::string & etag,const ClientContext & context,const google_apis::GetResourceEntryCallback & callback)650 void JobScheduler::UploadExistingFile(
651     const std::string& resource_id,
652     const base::FilePath& drive_file_path,
653     const base::FilePath& local_file_path,
654     const std::string& content_type,
655     const std::string& etag,
656     const ClientContext& context,
657     const google_apis::GetResourceEntryCallback& callback) {
658   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
659 
660   JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_EXISTING_FILE);
661   new_job->job_info.file_path = drive_file_path;
662   new_job->context = context;
663 
664   UploadExistingFileParams params;
665   params.resource_id = resource_id;
666   params.local_file_path = local_file_path;
667   params.content_type = content_type;
668   params.etag = etag;
669 
670   ResumeUploadParams resume_params;
671   resume_params.local_file_path = params.local_file_path;
672   resume_params.content_type = params.content_type;
673 
674   params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
675                                weak_ptr_factory_.GetWeakPtr(),
676                                new_job->job_info.job_id,
677                                resume_params,
678                                callback);
679   params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
680                                         weak_ptr_factory_.GetWeakPtr(),
681                                         new_job->job_info.job_id);
682   new_job->task = base::Bind(&RunUploadExistingFile, uploader_.get(), params);
683   new_job->abort_callback = CreateErrorRunCallback(callback);
684   StartJob(new_job);
685 }
686 
CreateFile(const std::string & parent_resource_id,const base::FilePath & drive_file_path,const std::string & title,const std::string & content_type,const ClientContext & context,const google_apis::GetResourceEntryCallback & callback)687 void JobScheduler::CreateFile(
688     const std::string& parent_resource_id,
689     const base::FilePath& drive_file_path,
690     const std::string& title,
691     const std::string& content_type,
692     const ClientContext& context,
693     const google_apis::GetResourceEntryCallback& callback) {
694   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
695 
696   const base::FilePath kDevNull(FILE_PATH_LITERAL("/dev/null"));
697 
698   JobEntry* new_job = CreateNewJob(TYPE_CREATE_FILE);
699   new_job->job_info.file_path = drive_file_path;
700   new_job->context = context;
701 
702   UploadNewFileParams params;
703   params.parent_resource_id = parent_resource_id;
704   params.local_file_path = kDevNull;  // Upload an empty file.
705   params.title = title;
706   params.content_type = content_type;
707 
708   ResumeUploadParams resume_params;
709   resume_params.local_file_path = params.local_file_path;
710   resume_params.content_type = params.content_type;
711 
712   params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
713                                weak_ptr_factory_.GetWeakPtr(),
714                                new_job->job_info.job_id,
715                                resume_params,
716                                callback);
717   params.progress_callback = google_apis::ProgressCallback();
718 
719   new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
720   new_job->abort_callback = CreateErrorRunCallback(callback);
721   StartJob(new_job);
722 }
723 
GetResourceListInDirectoryByWapi(const std::string & directory_resource_id,const google_apis::GetResourceListCallback & callback)724 void JobScheduler::GetResourceListInDirectoryByWapi(
725     const std::string& directory_resource_id,
726     const google_apis::GetResourceListCallback& callback) {
727   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
728   DCHECK(!callback.is_null());
729 
730   JobEntry* new_job = CreateNewJob(
731       TYPE_GET_RESOURCE_LIST_IN_DIRECTORY_BY_WAPI);
732   new_job->task = base::Bind(
733       &DriveServiceInterface::GetResourceListInDirectoryByWapi,
734       base::Unretained(drive_service_),
735       directory_resource_id,
736       base::Bind(&JobScheduler::OnGetResourceListJobDone,
737                  weak_ptr_factory_.GetWeakPtr(),
738                  new_job->job_info.job_id,
739                  callback));
740   new_job->abort_callback = CreateErrorRunCallback(callback);
741   StartJob(new_job);
742 }
743 
GetRemainingResourceList(const GURL & next_link,const google_apis::GetResourceListCallback & callback)744 void JobScheduler::GetRemainingResourceList(
745     const GURL& next_link,
746     const google_apis::GetResourceListCallback& callback) {
747   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
748   DCHECK(!callback.is_null());
749 
750   JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_RESOURCE_LIST);
751   new_job->task = base::Bind(
752       &DriveServiceInterface::GetRemainingResourceList,
753       base::Unretained(drive_service_),
754       next_link,
755       base::Bind(&JobScheduler::OnGetResourceListJobDone,
756                  weak_ptr_factory_.GetWeakPtr(),
757                  new_job->job_info.job_id,
758                  callback));
759   new_job->abort_callback = CreateErrorRunCallback(callback);
760   StartJob(new_job);
761 }
762 
CreateNewJob(JobType type)763 JobScheduler::JobEntry* JobScheduler::CreateNewJob(JobType type) {
764   JobEntry* job = new JobEntry(type);
765   job->job_info.job_id = job_map_.Add(job);  // Takes the ownership of |job|.
766   return job;
767 }
768 
StartJob(JobEntry * job)769 void JobScheduler::StartJob(JobEntry* job) {
770   DCHECK(!job->task.is_null());
771 
772   QueueJob(job->job_info.job_id);
773   NotifyJobAdded(job->job_info);
774   DoJobLoop(GetJobQueueType(job->job_info.job_type));
775 }
776 
QueueJob(JobID job_id)777 void JobScheduler::QueueJob(JobID job_id) {
778   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
779 
780   JobEntry* job_entry = job_map_.Lookup(job_id);
781   DCHECK(job_entry);
782   const JobInfo& job_info = job_entry->job_info;
783 
784   QueueType queue_type = GetJobQueueType(job_info.job_type);
785   queue_[queue_type]->Push(job_id, job_entry->context.type);
786 
787   const std::string retry_prefix = job_entry->retry_count > 0 ?
788       base::StringPrintf(" (retry %d)", job_entry->retry_count) : "";
789   util::Log(logging::LOG_INFO,
790             "Job queued%s: %s - %s",
791             retry_prefix.c_str(),
792             job_info.ToString().c_str(),
793             GetQueueInfo(queue_type).c_str());
794 }
795 
DoJobLoop(QueueType queue_type)796 void JobScheduler::DoJobLoop(QueueType queue_type) {
797   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
798 
799   const int accepted_priority = GetCurrentAcceptedPriority(queue_type);
800 
801   // Abort all USER_INITAITED jobs when not accepted.
802   if (accepted_priority < USER_INITIATED) {
803     std::vector<JobID> jobs;
804     queue_[queue_type]->GetQueuedJobs(USER_INITIATED, &jobs);
805     for (size_t i = 0; i < jobs.size(); ++i) {
806       JobEntry* job = job_map_.Lookup(jobs[i]);
807       DCHECK(job);
808       AbortNotRunningJob(job, google_apis::GDATA_NO_CONNECTION);
809     }
810   }
811 
812   // Wait when throttled.
813   const base::Time now = base::Time::Now();
814   if (now < wait_until_) {
815     base::MessageLoopProxy::current()->PostDelayedTask(
816         FROM_HERE,
817         base::Bind(&JobScheduler::DoJobLoop,
818                    weak_ptr_factory_.GetWeakPtr(),
819                    queue_type),
820         wait_until_ - now);
821     return;
822   }
823 
824   // Run the job with the highest priority in the queue.
825   JobID job_id = -1;
826   if (!queue_[queue_type]->PopForRun(accepted_priority, &job_id))
827     return;
828 
829   JobEntry* entry = job_map_.Lookup(job_id);
830   DCHECK(entry);
831 
832   JobInfo* job_info = &entry->job_info;
833   job_info->state = STATE_RUNNING;
834   job_info->start_time = now;
835   NotifyJobUpdated(*job_info);
836 
837   entry->cancel_callback = entry->task.Run();
838 
839   UpdateWait();
840 
841   util::Log(logging::LOG_INFO,
842             "Job started: %s - %s",
843             job_info->ToString().c_str(),
844             GetQueueInfo(queue_type).c_str());
845 }
846 
GetCurrentAcceptedPriority(QueueType queue_type)847 int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) {
848   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
849 
850   const int kNoJobShouldRun = -1;
851 
852   // Should stop if Drive was disabled while running the fetch loop.
853   if (pref_service_->GetBoolean(prefs::kDisableDrive))
854     return kNoJobShouldRun;
855 
856   // Should stop if the network is not online.
857   if (net::NetworkChangeNotifier::IsOffline())
858     return kNoJobShouldRun;
859 
860   // For the file queue, if it is on cellular network, only user initiated
861   // operations are allowed to start.
862   if (queue_type == FILE_QUEUE &&
863       pref_service_->GetBoolean(prefs::kDisableDriveOverCellular) &&
864       net::NetworkChangeNotifier::IsConnectionCellular(
865           net::NetworkChangeNotifier::GetConnectionType()))
866     return USER_INITIATED;
867 
868   // Otherwise, every operations including background tasks are allowed.
869   return BACKGROUND;
870 }
871 
UpdateWait()872 void JobScheduler::UpdateWait() {
873   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
874 
875   if (disable_throttling_ || throttle_count_ == 0)
876     return;
877 
878   // Exponential backoff: https://developers.google.com/drive/handle-errors.
879   base::TimeDelta delay =
880       base::TimeDelta::FromSeconds(1 << (throttle_count_ - 1)) +
881       base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000));
882   VLOG(1) << "Throttling for " << delay.InMillisecondsF();
883 
884   wait_until_ = std::max(wait_until_, base::Time::Now() + delay);
885 }
886 
OnJobDone(JobID job_id,google_apis::GDataErrorCode error)887 bool JobScheduler::OnJobDone(JobID job_id, google_apis::GDataErrorCode error) {
888   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
889 
890   JobEntry* job_entry = job_map_.Lookup(job_id);
891   DCHECK(job_entry);
892   JobInfo* job_info = &job_entry->job_info;
893   QueueType queue_type = GetJobQueueType(job_info->job_type);
894   queue_[queue_type]->MarkFinished(job_id);
895 
896   const base::TimeDelta elapsed = base::Time::Now() - job_info->start_time;
897   bool success = (GDataToFileError(error) == FILE_ERROR_OK);
898   util::Log(success ? logging::LOG_INFO : logging::LOG_WARNING,
899             "Job done: %s => %s (elapsed time: %sms) - %s",
900             job_info->ToString().c_str(),
901             GDataErrorCodeToString(error).c_str(),
902             base::Int64ToString(elapsed.InMilliseconds()).c_str(),
903             GetQueueInfo(queue_type).c_str());
904 
905   // Retry, depending on the error.
906   const bool is_server_error =
907       error == google_apis::HTTP_SERVICE_UNAVAILABLE ||
908       error == google_apis::HTTP_INTERNAL_SERVER_ERROR;
909   if (is_server_error) {
910     if (throttle_count_ < kMaxThrottleCount)
911       ++throttle_count_;
912     UpdateWait();
913   } else {
914     throttle_count_ = 0;
915   }
916 
917   const bool should_retry =
918       is_server_error && job_entry->retry_count < kMaxRetryCount;
919   if (should_retry) {
920     job_entry->cancel_callback.Reset();
921     job_info->state = STATE_RETRY;
922     NotifyJobUpdated(*job_info);
923 
924     ++job_entry->retry_count;
925 
926     // Requeue the job.
927     QueueJob(job_id);
928   } else {
929     NotifyJobDone(*job_info, error);
930     // The job has finished, no retry will happen in the scheduler. Now we can
931     // remove the job info from the map.
932     job_map_.Remove(job_id);
933   }
934 
935   // Post a task to continue the job loop.  This allows us to finish handling
936   // the current job before starting the next one.
937   base::MessageLoopProxy::current()->PostTask(FROM_HERE,
938       base::Bind(&JobScheduler::DoJobLoop,
939                  weak_ptr_factory_.GetWeakPtr(),
940                  queue_type));
941   return !should_retry;
942 }
943 
OnGetResourceListJobDone(JobID job_id,const google_apis::GetResourceListCallback & callback,google_apis::GDataErrorCode error,scoped_ptr<google_apis::ResourceList> resource_list)944 void JobScheduler::OnGetResourceListJobDone(
945     JobID job_id,
946     const google_apis::GetResourceListCallback& callback,
947     google_apis::GDataErrorCode error,
948     scoped_ptr<google_apis::ResourceList> resource_list) {
949   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
950   DCHECK(!callback.is_null());
951 
952   if (OnJobDone(job_id, error))
953     callback.Run(error, resource_list.Pass());
954 }
955 
OnGetResourceEntryJobDone(JobID job_id,const google_apis::GetResourceEntryCallback & callback,google_apis::GDataErrorCode error,scoped_ptr<google_apis::ResourceEntry> entry)956 void JobScheduler::OnGetResourceEntryJobDone(
957     JobID job_id,
958     const google_apis::GetResourceEntryCallback& callback,
959     google_apis::GDataErrorCode error,
960     scoped_ptr<google_apis::ResourceEntry> entry) {
961   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
962   DCHECK(!callback.is_null());
963 
964   if (OnJobDone(job_id, error))
965     callback.Run(error, entry.Pass());
966 }
967 
OnGetAboutResourceJobDone(JobID job_id,const google_apis::AboutResourceCallback & callback,google_apis::GDataErrorCode error,scoped_ptr<google_apis::AboutResource> about_resource)968 void JobScheduler::OnGetAboutResourceJobDone(
969     JobID job_id,
970     const google_apis::AboutResourceCallback& callback,
971     google_apis::GDataErrorCode error,
972     scoped_ptr<google_apis::AboutResource> about_resource) {
973   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
974   DCHECK(!callback.is_null());
975 
976   if (OnJobDone(job_id, error))
977     callback.Run(error, about_resource.Pass());
978 }
979 
OnGetShareUrlJobDone(JobID job_id,const google_apis::GetShareUrlCallback & callback,google_apis::GDataErrorCode error,const GURL & share_url)980 void JobScheduler::OnGetShareUrlJobDone(
981     JobID job_id,
982     const google_apis::GetShareUrlCallback& callback,
983     google_apis::GDataErrorCode error,
984     const GURL& share_url) {
985   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
986   DCHECK(!callback.is_null());
987 
988   if (OnJobDone(job_id, error))
989     callback.Run(error, share_url);
990 }
991 
OnGetAppListJobDone(JobID job_id,const google_apis::AppListCallback & callback,google_apis::GDataErrorCode error,scoped_ptr<google_apis::AppList> app_list)992 void JobScheduler::OnGetAppListJobDone(
993     JobID job_id,
994     const google_apis::AppListCallback& callback,
995     google_apis::GDataErrorCode error,
996     scoped_ptr<google_apis::AppList> app_list) {
997   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
998   DCHECK(!callback.is_null());
999 
1000   if (OnJobDone(job_id, error))
1001     callback.Run(error, app_list.Pass());
1002 }
1003 
OnEntryActionJobDone(JobID job_id,const google_apis::EntryActionCallback & callback,google_apis::GDataErrorCode error)1004 void JobScheduler::OnEntryActionJobDone(
1005     JobID job_id,
1006     const google_apis::EntryActionCallback& callback,
1007     google_apis::GDataErrorCode error) {
1008   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1009   DCHECK(!callback.is_null());
1010 
1011   if (OnJobDone(job_id, error))
1012     callback.Run(error);
1013 }
1014 
OnDownloadActionJobDone(JobID job_id,const google_apis::DownloadActionCallback & callback,google_apis::GDataErrorCode error,const base::FilePath & temp_file)1015 void JobScheduler::OnDownloadActionJobDone(
1016     JobID job_id,
1017     const google_apis::DownloadActionCallback& callback,
1018     google_apis::GDataErrorCode error,
1019     const base::FilePath& temp_file) {
1020   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1021   DCHECK(!callback.is_null());
1022 
1023   if (OnJobDone(job_id, error))
1024     callback.Run(error, temp_file);
1025 }
1026 
OnUploadCompletionJobDone(JobID job_id,const ResumeUploadParams & resume_params,const google_apis::GetResourceEntryCallback & callback,google_apis::GDataErrorCode error,const GURL & upload_location,scoped_ptr<google_apis::ResourceEntry> resource_entry)1027 void JobScheduler::OnUploadCompletionJobDone(
1028     JobID job_id,
1029     const ResumeUploadParams& resume_params,
1030     const google_apis::GetResourceEntryCallback& callback,
1031     google_apis::GDataErrorCode error,
1032     const GURL& upload_location,
1033     scoped_ptr<google_apis::ResourceEntry> resource_entry) {
1034   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1035   DCHECK(!callback.is_null());
1036 
1037   if (!upload_location.is_empty()) {
1038     // If upload_location is available, update the task to resume the
1039     // upload process from the terminated point.
1040     // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE
1041     // so OnJobDone called below will be in charge to re-queue the job.
1042     JobEntry* job_entry = job_map_.Lookup(job_id);
1043     DCHECK(job_entry);
1044 
1045     ResumeUploadFileParams params;
1046     params.upload_location = upload_location;
1047     params.local_file_path = resume_params.local_file_path;
1048     params.content_type = resume_params.content_type;
1049     params.callback = base::Bind(&JobScheduler::OnResumeUploadFileDone,
1050                                  weak_ptr_factory_.GetWeakPtr(),
1051                                  job_id,
1052                                  job_entry->task,
1053                                  callback);
1054     params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
1055                                           weak_ptr_factory_.GetWeakPtr(),
1056                                           job_id);
1057     job_entry->task = base::Bind(&RunResumeUploadFile, uploader_.get(), params);
1058   }
1059 
1060   if (OnJobDone(job_id, error))
1061     callback.Run(error, resource_entry.Pass());
1062 }
1063 
OnResumeUploadFileDone(JobID job_id,const base::Callback<google_apis::CancelCallback ()> & original_task,const google_apis::GetResourceEntryCallback & callback,google_apis::GDataErrorCode error,const GURL & upload_location,scoped_ptr<google_apis::ResourceEntry> resource_entry)1064 void JobScheduler::OnResumeUploadFileDone(
1065     JobID job_id,
1066     const base::Callback<google_apis::CancelCallback()>& original_task,
1067     const google_apis::GetResourceEntryCallback& callback,
1068     google_apis::GDataErrorCode error,
1069     const GURL& upload_location,
1070     scoped_ptr<google_apis::ResourceEntry> resource_entry) {
1071   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1072   DCHECK(!original_task.is_null());
1073   DCHECK(!callback.is_null());
1074 
1075   if (upload_location.is_empty()) {
1076     // If upload_location is not available, we should discard it and stop trying
1077     // to resume. Restore the original task.
1078     JobEntry* job_entry = job_map_.Lookup(job_id);
1079     DCHECK(job_entry);
1080     job_entry->task = original_task;
1081   }
1082 
1083   if (OnJobDone(job_id, error))
1084     callback.Run(error, resource_entry.Pass());
1085 }
1086 
UpdateProgress(JobID job_id,int64 progress,int64 total)1087 void JobScheduler::UpdateProgress(JobID job_id, int64 progress, int64 total) {
1088   JobEntry* job_entry = job_map_.Lookup(job_id);
1089   DCHECK(job_entry);
1090 
1091   job_entry->job_info.num_completed_bytes = progress;
1092   if (total != -1)
1093     job_entry->job_info.num_total_bytes = total;
1094   NotifyJobUpdated(job_entry->job_info);
1095 }
1096 
OnConnectionTypeChanged(net::NetworkChangeNotifier::ConnectionType type)1097 void JobScheduler::OnConnectionTypeChanged(
1098     net::NetworkChangeNotifier::ConnectionType type) {
1099   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1100 
1101   // Resume the job loop.
1102   // Note that we don't need to check the network connection status as it will
1103   // be checked in GetCurrentAcceptedPriority().
1104   for (int i = METADATA_QUEUE; i < NUM_QUEUES; ++i)
1105     DoJobLoop(static_cast<QueueType>(i));
1106 }
1107 
GetJobQueueType(JobType type)1108 JobScheduler::QueueType JobScheduler::GetJobQueueType(JobType type) {
1109   switch (type) {
1110     case TYPE_GET_ABOUT_RESOURCE:
1111     case TYPE_GET_APP_LIST:
1112     case TYPE_GET_ALL_RESOURCE_LIST:
1113     case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY:
1114     case TYPE_SEARCH:
1115     case TYPE_GET_CHANGE_LIST:
1116     case TYPE_GET_REMAINING_CHANGE_LIST:
1117     case TYPE_GET_REMAINING_FILE_LIST:
1118     case TYPE_GET_RESOURCE_ENTRY:
1119     case TYPE_GET_SHARE_URL:
1120     case TYPE_TRASH_RESOURCE:
1121     case TYPE_COPY_RESOURCE:
1122     case TYPE_UPDATE_RESOURCE:
1123     case TYPE_RENAME_RESOURCE:
1124     case TYPE_ADD_RESOURCE_TO_DIRECTORY:
1125     case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY:
1126     case TYPE_ADD_NEW_DIRECTORY:
1127     case TYPE_CREATE_FILE:
1128     case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY_BY_WAPI:
1129     case TYPE_GET_REMAINING_RESOURCE_LIST:
1130       return METADATA_QUEUE;
1131 
1132     case TYPE_DOWNLOAD_FILE:
1133     case TYPE_UPLOAD_NEW_FILE:
1134     case TYPE_UPLOAD_EXISTING_FILE:
1135       return FILE_QUEUE;
1136   }
1137   NOTREACHED();
1138   return FILE_QUEUE;
1139 }
1140 
AbortNotRunningJob(JobEntry * job,google_apis::GDataErrorCode error)1141 void JobScheduler::AbortNotRunningJob(JobEntry* job,
1142                                       google_apis::GDataErrorCode error) {
1143   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1144 
1145   const base::TimeDelta elapsed = base::Time::Now() - job->job_info.start_time;
1146   const QueueType queue_type = GetJobQueueType(job->job_info.job_type);
1147   util::Log(logging::LOG_INFO,
1148             "Job aborted: %s => %s (elapsed time: %sms) - %s",
1149             job->job_info.ToString().c_str(),
1150             GDataErrorCodeToString(error).c_str(),
1151             base::Int64ToString(elapsed.InMilliseconds()).c_str(),
1152             GetQueueInfo(queue_type).c_str());
1153 
1154   base::Callback<void(google_apis::GDataErrorCode)> callback =
1155       job->abort_callback;
1156   queue_[GetJobQueueType(job->job_info.job_type)]->Remove(job->job_info.job_id);
1157   NotifyJobDone(job->job_info, error);
1158   job_map_.Remove(job->job_info.job_id);
1159   base::MessageLoopProxy::current()->PostTask(FROM_HERE,
1160                                               base::Bind(callback, error));
1161 }
1162 
NotifyJobAdded(const JobInfo & job_info)1163 void JobScheduler::NotifyJobAdded(const JobInfo& job_info) {
1164   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1165   FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobAdded(job_info));
1166 }
1167 
NotifyJobDone(const JobInfo & job_info,google_apis::GDataErrorCode error)1168 void JobScheduler::NotifyJobDone(const JobInfo& job_info,
1169                                  google_apis::GDataErrorCode error) {
1170   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1171   FOR_EACH_OBSERVER(JobListObserver, observer_list_,
1172                     OnJobDone(job_info, GDataToFileError(error)));
1173 }
1174 
NotifyJobUpdated(const JobInfo & job_info)1175 void JobScheduler::NotifyJobUpdated(const JobInfo& job_info) {
1176   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1177   FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobUpdated(job_info));
1178 }
1179 
GetQueueInfo(QueueType type) const1180 std::string JobScheduler::GetQueueInfo(QueueType type) const {
1181   return QueueTypeToString(type) + " " + queue_[type]->ToString();
1182 }
1183 
1184 // static
QueueTypeToString(QueueType type)1185 std::string JobScheduler::QueueTypeToString(QueueType type) {
1186   switch (type) {
1187     case METADATA_QUEUE:
1188       return "METADATA_QUEUE";
1189     case FILE_QUEUE:
1190       return "FILE_QUEUE";
1191     case NUM_QUEUES:
1192       break;  // This value is just a sentinel. Should never be used.
1193   }
1194   NOTREACHED();
1195   return "";
1196 }
1197 
1198 }  // namespace drive
1199