1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/chromeos/drive/job_scheduler.h"
6
7 #include "base/message_loop/message_loop.h"
8 #include "base/prefs/pref_service.h"
9 #include "base/rand_util.h"
10 #include "base/strings/string_number_conversions.h"
11 #include "base/strings/stringprintf.h"
12 #include "chrome/browser/drive/event_logger.h"
13 #include "chrome/common/pref_names.h"
14 #include "content/public/browser/browser_thread.h"
15 #include "google_apis/drive/drive_api_parser.h"
16
17 using content::BrowserThread;
18
19 namespace drive {
20
21 namespace {
22
// Jobs that fail because of throttling or a server error are retried at most
// kMaxRetryCount times. The delay before a retry is shared among all jobs and
// doubles on each failure, up to 2^kMaxThrottleCount seconds.
//
// According to the API documentation
// (https://developers.google.com/drive/handle-errors), kMaxRetryCount should
// equal kMaxThrottleCount. It is currently twice as large so that
// upload-related jobs are retried a sufficient number of times.
// See crbug.com/269918.
const int kMaxThrottleCount = 4;
const int kMaxRetryCount = kMaxThrottleCount * 2;
33
// DefaultValueCreator<T>::GetDefaultValue() returns a T built with T().
template <typename ValueType>
struct DefaultValueCreator {
  static ValueType GetDefaultValue() { return ValueType(); }
};

// Specialization for "const T&": a reference cannot be default-constructed,
// so a T built with T() is returned by value instead.
template <typename ValueType>
struct DefaultValueCreator<const ValueType&> {
  static ValueType GetDefaultValue() { return ValueType(); }
};
41
42 // Helper of CreateErrorRunCallback implementation.
43 // Provides:
44 // - ResultType; the type of the Callback which should be returned by
45 // CreateErrorRunCallback.
46 // - Run(): a static function which takes the original |callback| and |error|,
47 // and runs the |callback|.Run() with the error code and default values
48 // for remaining arguments.
49 template<typename CallbackType> struct CreateErrorRunCallbackHelper;
50
51 // CreateErrorRunCallback with two arguments.
52 template<typename P1>
53 struct CreateErrorRunCallbackHelper<void(google_apis::GDataErrorCode, P1)> {
Rundrive::__anon60830c740111::CreateErrorRunCallbackHelper54 static void Run(
55 const base::Callback<void(google_apis::GDataErrorCode, P1)>& callback,
56 google_apis::GDataErrorCode error) {
57 callback.Run(error, DefaultValueCreator<P1>::GetDefaultValue());
58 }
59 };
60
61 // Returns a callback with the tail parameter bound to its default value.
62 // In other words, returned_callback.Run(error) runs callback.Run(error, T()).
63 template<typename CallbackType>
64 base::Callback<void(google_apis::GDataErrorCode)>
CreateErrorRunCallback(const base::Callback<CallbackType> & callback)65 CreateErrorRunCallback(const base::Callback<CallbackType>& callback) {
66 return base::Bind(&CreateErrorRunCallbackHelper<CallbackType>::Run, callback);
67 }
68
69 // Parameter struct for RunUploadNewFile.
70 struct UploadNewFileParams {
71 std::string parent_resource_id;
72 base::FilePath local_file_path;
73 std::string title;
74 std::string content_type;
75 DriveUploader::UploadNewFileOptions options;
76 UploadCompletionCallback callback;
77 google_apis::ProgressCallback progress_callback;
78 };
79
80 // Helper function to work around the arity limitation of base::Bind.
RunUploadNewFile(DriveUploaderInterface * uploader,const UploadNewFileParams & params)81 google_apis::CancelCallback RunUploadNewFile(
82 DriveUploaderInterface* uploader,
83 const UploadNewFileParams& params) {
84 return uploader->UploadNewFile(params.parent_resource_id,
85 params.local_file_path,
86 params.title,
87 params.content_type,
88 params.options,
89 params.callback,
90 params.progress_callback);
91 }
92
93 // Parameter struct for RunUploadExistingFile.
94 struct UploadExistingFileParams {
95 std::string resource_id;
96 base::FilePath local_file_path;
97 std::string content_type;
98 DriveUploader::UploadExistingFileOptions options;
99 std::string etag;
100 UploadCompletionCallback callback;
101 google_apis::ProgressCallback progress_callback;
102 };
103
104 // Helper function to work around the arity limitation of base::Bind.
RunUploadExistingFile(DriveUploaderInterface * uploader,const UploadExistingFileParams & params)105 google_apis::CancelCallback RunUploadExistingFile(
106 DriveUploaderInterface* uploader,
107 const UploadExistingFileParams& params) {
108 return uploader->UploadExistingFile(params.resource_id,
109 params.local_file_path,
110 params.content_type,
111 params.options,
112 params.callback,
113 params.progress_callback);
114 }
115
116 // Parameter struct for RunResumeUploadFile.
117 struct ResumeUploadFileParams {
118 GURL upload_location;
119 base::FilePath local_file_path;
120 std::string content_type;
121 UploadCompletionCallback callback;
122 google_apis::ProgressCallback progress_callback;
123 };
124
125 // Helper function to adjust the return type.
RunResumeUploadFile(DriveUploaderInterface * uploader,const ResumeUploadFileParams & params)126 google_apis::CancelCallback RunResumeUploadFile(
127 DriveUploaderInterface* uploader,
128 const ResumeUploadFileParams& params) {
129 return uploader->ResumeUploadFile(params.upload_location,
130 params.local_file_path,
131 params.content_type,
132 params.callback,
133 params.progress_callback);
134 }
135
136 } // namespace
137
138 // Metadata jobs are cheap, so we run them concurrently. File jobs run serially.
139 const int JobScheduler::kMaxJobCount[] = {
140 5, // METADATA_QUEUE
141 1, // FILE_QUEUE
142 };
143
JobEntry(JobType type)144 JobScheduler::JobEntry::JobEntry(JobType type)
145 : job_info(type),
146 context(ClientContext(USER_INITIATED)),
147 retry_count(0) {
148 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
149 }
150
~JobEntry()151 JobScheduler::JobEntry::~JobEntry() {
152 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
153 }
154
155 struct JobScheduler::ResumeUploadParams {
156 base::FilePath drive_file_path;
157 base::FilePath local_file_path;
158 std::string content_type;
159 };
160
JobScheduler(PrefService * pref_service,EventLogger * logger,DriveServiceInterface * drive_service,base::SequencedTaskRunner * blocking_task_runner)161 JobScheduler::JobScheduler(
162 PrefService* pref_service,
163 EventLogger* logger,
164 DriveServiceInterface* drive_service,
165 base::SequencedTaskRunner* blocking_task_runner)
166 : throttle_count_(0),
167 wait_until_(base::Time::Now()),
168 disable_throttling_(false),
169 logger_(logger),
170 drive_service_(drive_service),
171 uploader_(new DriveUploader(drive_service, blocking_task_runner)),
172 pref_service_(pref_service),
173 weak_ptr_factory_(this) {
174 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
175
176 for (int i = 0; i < NUM_QUEUES; ++i)
177 queue_[i].reset(new JobQueue(kMaxJobCount[i], NUM_CONTEXT_TYPES));
178
179 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
180 }
181
~JobScheduler()182 JobScheduler::~JobScheduler() {
183 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
184
185 size_t num_queued_jobs = 0;
186 for (int i = 0; i < NUM_QUEUES; ++i)
187 num_queued_jobs += queue_[i]->GetNumberOfJobs();
188 DCHECK_EQ(num_queued_jobs, job_map_.size());
189
190 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
191 }
192
GetJobInfoList()193 std::vector<JobInfo> JobScheduler::GetJobInfoList() {
194 std::vector<JobInfo> job_info_list;
195 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
196 job_info_list.push_back(iter.GetCurrentValue()->job_info);
197 return job_info_list;
198 }
199
AddObserver(JobListObserver * observer)200 void JobScheduler::AddObserver(JobListObserver* observer) {
201 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
202 observer_list_.AddObserver(observer);
203 }
204
RemoveObserver(JobListObserver * observer)205 void JobScheduler::RemoveObserver(JobListObserver* observer) {
206 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
207 observer_list_.RemoveObserver(observer);
208 }
209
CancelJob(JobID job_id)210 void JobScheduler::CancelJob(JobID job_id) {
211 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
212
213 JobEntry* job = job_map_.Lookup(job_id);
214 if (job) {
215 if (job->job_info.state == STATE_RUNNING) {
216 // If the job is running an HTTP request, cancel it via |cancel_callback|
217 // returned from the request, and wait for termination in the normal
218 // callback handler, OnJobDone.
219 if (!job->cancel_callback.is_null())
220 job->cancel_callback.Run();
221 } else {
222 AbortNotRunningJob(job, google_apis::GDATA_CANCELLED);
223 }
224 }
225 }
226
CancelAllJobs()227 void JobScheduler::CancelAllJobs() {
228 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
229
230 // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports
231 // removable during iteration.
232 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
233 CancelJob(iter.GetCurrentKey());
234 }
235
GetAboutResource(const google_apis::AboutResourceCallback & callback)236 void JobScheduler::GetAboutResource(
237 const google_apis::AboutResourceCallback& callback) {
238 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
239 DCHECK(!callback.is_null());
240
241 JobEntry* new_job = CreateNewJob(TYPE_GET_ABOUT_RESOURCE);
242 new_job->task = base::Bind(
243 &DriveServiceInterface::GetAboutResource,
244 base::Unretained(drive_service_),
245 base::Bind(&JobScheduler::OnGetAboutResourceJobDone,
246 weak_ptr_factory_.GetWeakPtr(),
247 new_job->job_info.job_id,
248 callback));
249 new_job->abort_callback = CreateErrorRunCallback(callback);
250 StartJob(new_job);
251 }
252
GetAppList(const google_apis::AppListCallback & callback)253 void JobScheduler::GetAppList(const google_apis::AppListCallback& callback) {
254 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
255 DCHECK(!callback.is_null());
256
257 JobEntry* new_job = CreateNewJob(TYPE_GET_APP_LIST);
258 new_job->task = base::Bind(
259 &DriveServiceInterface::GetAppList,
260 base::Unretained(drive_service_),
261 base::Bind(&JobScheduler::OnGetAppListJobDone,
262 weak_ptr_factory_.GetWeakPtr(),
263 new_job->job_info.job_id,
264 callback));
265 new_job->abort_callback = CreateErrorRunCallback(callback);
266 StartJob(new_job);
267 }
268
GetAllFileList(const google_apis::FileListCallback & callback)269 void JobScheduler::GetAllFileList(
270 const google_apis::FileListCallback& callback) {
271 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
272 DCHECK(!callback.is_null());
273
274 JobEntry* new_job = CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST);
275 new_job->task = base::Bind(
276 &DriveServiceInterface::GetAllFileList,
277 base::Unretained(drive_service_),
278 base::Bind(&JobScheduler::OnGetFileListJobDone,
279 weak_ptr_factory_.GetWeakPtr(),
280 new_job->job_info.job_id,
281 callback));
282 new_job->abort_callback = CreateErrorRunCallback(callback);
283 StartJob(new_job);
284 }
285
GetFileListInDirectory(const std::string & directory_resource_id,const google_apis::FileListCallback & callback)286 void JobScheduler::GetFileListInDirectory(
287 const std::string& directory_resource_id,
288 const google_apis::FileListCallback& callback) {
289 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
290 DCHECK(!callback.is_null());
291
292 JobEntry* new_job = CreateNewJob(
293 TYPE_GET_RESOURCE_LIST_IN_DIRECTORY);
294 new_job->task = base::Bind(
295 &DriveServiceInterface::GetFileListInDirectory,
296 base::Unretained(drive_service_),
297 directory_resource_id,
298 base::Bind(&JobScheduler::OnGetFileListJobDone,
299 weak_ptr_factory_.GetWeakPtr(),
300 new_job->job_info.job_id,
301 callback));
302 new_job->abort_callback = CreateErrorRunCallback(callback);
303 StartJob(new_job);
304 }
305
Search(const std::string & search_query,const google_apis::FileListCallback & callback)306 void JobScheduler::Search(const std::string& search_query,
307 const google_apis::FileListCallback& callback) {
308 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
309 DCHECK(!callback.is_null());
310
311 JobEntry* new_job = CreateNewJob(TYPE_SEARCH);
312 new_job->task = base::Bind(
313 &DriveServiceInterface::Search,
314 base::Unretained(drive_service_),
315 search_query,
316 base::Bind(&JobScheduler::OnGetFileListJobDone,
317 weak_ptr_factory_.GetWeakPtr(),
318 new_job->job_info.job_id,
319 callback));
320 new_job->abort_callback = CreateErrorRunCallback(callback);
321 StartJob(new_job);
322 }
323
GetChangeList(int64 start_changestamp,const google_apis::ChangeListCallback & callback)324 void JobScheduler::GetChangeList(
325 int64 start_changestamp,
326 const google_apis::ChangeListCallback& callback) {
327 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
328 DCHECK(!callback.is_null());
329
330 JobEntry* new_job = CreateNewJob(TYPE_GET_CHANGE_LIST);
331 new_job->task = base::Bind(
332 &DriveServiceInterface::GetChangeList,
333 base::Unretained(drive_service_),
334 start_changestamp,
335 base::Bind(&JobScheduler::OnGetChangeListJobDone,
336 weak_ptr_factory_.GetWeakPtr(),
337 new_job->job_info.job_id,
338 callback));
339 new_job->abort_callback = CreateErrorRunCallback(callback);
340 StartJob(new_job);
341 }
342
GetRemainingChangeList(const GURL & next_link,const google_apis::ChangeListCallback & callback)343 void JobScheduler::GetRemainingChangeList(
344 const GURL& next_link,
345 const google_apis::ChangeListCallback& callback) {
346 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
347 DCHECK(!callback.is_null());
348
349 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_CHANGE_LIST);
350 new_job->task = base::Bind(
351 &DriveServiceInterface::GetRemainingChangeList,
352 base::Unretained(drive_service_),
353 next_link,
354 base::Bind(&JobScheduler::OnGetChangeListJobDone,
355 weak_ptr_factory_.GetWeakPtr(),
356 new_job->job_info.job_id,
357 callback));
358 new_job->abort_callback = CreateErrorRunCallback(callback);
359 StartJob(new_job);
360 }
361
GetRemainingFileList(const GURL & next_link,const google_apis::FileListCallback & callback)362 void JobScheduler::GetRemainingFileList(
363 const GURL& next_link,
364 const google_apis::FileListCallback& callback) {
365 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
366 DCHECK(!callback.is_null());
367
368 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_FILE_LIST);
369 new_job->task = base::Bind(
370 &DriveServiceInterface::GetRemainingFileList,
371 base::Unretained(drive_service_),
372 next_link,
373 base::Bind(&JobScheduler::OnGetFileListJobDone,
374 weak_ptr_factory_.GetWeakPtr(),
375 new_job->job_info.job_id,
376 callback));
377 new_job->abort_callback = CreateErrorRunCallback(callback);
378 StartJob(new_job);
379 }
380
GetFileResource(const std::string & resource_id,const ClientContext & context,const google_apis::FileResourceCallback & callback)381 void JobScheduler::GetFileResource(
382 const std::string& resource_id,
383 const ClientContext& context,
384 const google_apis::FileResourceCallback& callback) {
385 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
386 DCHECK(!callback.is_null());
387
388 JobEntry* new_job = CreateNewJob(TYPE_GET_RESOURCE_ENTRY);
389 new_job->context = context;
390 new_job->task = base::Bind(
391 &DriveServiceInterface::GetFileResource,
392 base::Unretained(drive_service_),
393 resource_id,
394 base::Bind(&JobScheduler::OnGetFileResourceJobDone,
395 weak_ptr_factory_.GetWeakPtr(),
396 new_job->job_info.job_id,
397 callback));
398 new_job->abort_callback = CreateErrorRunCallback(callback);
399 StartJob(new_job);
400 }
401
GetShareUrl(const std::string & resource_id,const GURL & embed_origin,const ClientContext & context,const google_apis::GetShareUrlCallback & callback)402 void JobScheduler::GetShareUrl(
403 const std::string& resource_id,
404 const GURL& embed_origin,
405 const ClientContext& context,
406 const google_apis::GetShareUrlCallback& callback) {
407 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
408 DCHECK(!callback.is_null());
409
410 JobEntry* new_job = CreateNewJob(TYPE_GET_SHARE_URL);
411 new_job->context = context;
412 new_job->task = base::Bind(
413 &DriveServiceInterface::GetShareUrl,
414 base::Unretained(drive_service_),
415 resource_id,
416 embed_origin,
417 base::Bind(&JobScheduler::OnGetShareUrlJobDone,
418 weak_ptr_factory_.GetWeakPtr(),
419 new_job->job_info.job_id,
420 callback));
421 new_job->abort_callback = CreateErrorRunCallback(callback);
422 StartJob(new_job);
423 }
424
TrashResource(const std::string & resource_id,const ClientContext & context,const google_apis::EntryActionCallback & callback)425 void JobScheduler::TrashResource(
426 const std::string& resource_id,
427 const ClientContext& context,
428 const google_apis::EntryActionCallback& callback) {
429 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
430 DCHECK(!callback.is_null());
431
432 JobEntry* new_job = CreateNewJob(TYPE_TRASH_RESOURCE);
433 new_job->context = context;
434 new_job->task = base::Bind(
435 &DriveServiceInterface::TrashResource,
436 base::Unretained(drive_service_),
437 resource_id,
438 base::Bind(&JobScheduler::OnEntryActionJobDone,
439 weak_ptr_factory_.GetWeakPtr(),
440 new_job->job_info.job_id,
441 callback));
442 new_job->abort_callback = callback;
443 StartJob(new_job);
444 }
445
CopyResource(const std::string & resource_id,const std::string & parent_resource_id,const std::string & new_title,const base::Time & last_modified,const google_apis::FileResourceCallback & callback)446 void JobScheduler::CopyResource(
447 const std::string& resource_id,
448 const std::string& parent_resource_id,
449 const std::string& new_title,
450 const base::Time& last_modified,
451 const google_apis::FileResourceCallback& callback) {
452 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
453 DCHECK(!callback.is_null());
454
455 JobEntry* new_job = CreateNewJob(TYPE_COPY_RESOURCE);
456 new_job->task = base::Bind(
457 &DriveServiceInterface::CopyResource,
458 base::Unretained(drive_service_),
459 resource_id,
460 parent_resource_id,
461 new_title,
462 last_modified,
463 base::Bind(&JobScheduler::OnGetFileResourceJobDone,
464 weak_ptr_factory_.GetWeakPtr(),
465 new_job->job_info.job_id,
466 callback));
467 new_job->abort_callback = CreateErrorRunCallback(callback);
468 StartJob(new_job);
469 }
470
UpdateResource(const std::string & resource_id,const std::string & parent_resource_id,const std::string & new_title,const base::Time & last_modified,const base::Time & last_viewed_by_me,const ClientContext & context,const google_apis::FileResourceCallback & callback)471 void JobScheduler::UpdateResource(
472 const std::string& resource_id,
473 const std::string& parent_resource_id,
474 const std::string& new_title,
475 const base::Time& last_modified,
476 const base::Time& last_viewed_by_me,
477 const ClientContext& context,
478 const google_apis::FileResourceCallback& callback) {
479 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
480 DCHECK(!callback.is_null());
481
482 JobEntry* new_job = CreateNewJob(TYPE_UPDATE_RESOURCE);
483 new_job->context = context;
484 new_job->task = base::Bind(
485 &DriveServiceInterface::UpdateResource,
486 base::Unretained(drive_service_),
487 resource_id,
488 parent_resource_id,
489 new_title,
490 last_modified,
491 last_viewed_by_me,
492 base::Bind(&JobScheduler::OnGetFileResourceJobDone,
493 weak_ptr_factory_.GetWeakPtr(),
494 new_job->job_info.job_id,
495 callback));
496 new_job->abort_callback = CreateErrorRunCallback(callback);
497 StartJob(new_job);
498 }
499
RenameResource(const std::string & resource_id,const std::string & new_title,const google_apis::EntryActionCallback & callback)500 void JobScheduler::RenameResource(
501 const std::string& resource_id,
502 const std::string& new_title,
503 const google_apis::EntryActionCallback& callback) {
504 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
505 DCHECK(!callback.is_null());
506
507 JobEntry* new_job = CreateNewJob(TYPE_RENAME_RESOURCE);
508 new_job->task = base::Bind(
509 &DriveServiceInterface::RenameResource,
510 base::Unretained(drive_service_),
511 resource_id,
512 new_title,
513 base::Bind(&JobScheduler::OnEntryActionJobDone,
514 weak_ptr_factory_.GetWeakPtr(),
515 new_job->job_info.job_id,
516 callback));
517 new_job->abort_callback = callback;
518 StartJob(new_job);
519 }
520
AddResourceToDirectory(const std::string & parent_resource_id,const std::string & resource_id,const google_apis::EntryActionCallback & callback)521 void JobScheduler::AddResourceToDirectory(
522 const std::string& parent_resource_id,
523 const std::string& resource_id,
524 const google_apis::EntryActionCallback& callback) {
525 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
526 DCHECK(!callback.is_null());
527
528 JobEntry* new_job = CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY);
529 new_job->task = base::Bind(
530 &DriveServiceInterface::AddResourceToDirectory,
531 base::Unretained(drive_service_),
532 parent_resource_id,
533 resource_id,
534 base::Bind(&JobScheduler::OnEntryActionJobDone,
535 weak_ptr_factory_.GetWeakPtr(),
536 new_job->job_info.job_id,
537 callback));
538 new_job->abort_callback = callback;
539 StartJob(new_job);
540 }
541
RemoveResourceFromDirectory(const std::string & parent_resource_id,const std::string & resource_id,const ClientContext & context,const google_apis::EntryActionCallback & callback)542 void JobScheduler::RemoveResourceFromDirectory(
543 const std::string& parent_resource_id,
544 const std::string& resource_id,
545 const ClientContext& context,
546 const google_apis::EntryActionCallback& callback) {
547 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
548
549 JobEntry* new_job = CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY);
550 new_job->context = context;
551 new_job->task = base::Bind(
552 &DriveServiceInterface::RemoveResourceFromDirectory,
553 base::Unretained(drive_service_),
554 parent_resource_id,
555 resource_id,
556 base::Bind(&JobScheduler::OnEntryActionJobDone,
557 weak_ptr_factory_.GetWeakPtr(),
558 new_job->job_info.job_id,
559 callback));
560 new_job->abort_callback = callback;
561 StartJob(new_job);
562 }
563
AddNewDirectory(const std::string & parent_resource_id,const std::string & directory_title,const DriveServiceInterface::AddNewDirectoryOptions & options,const ClientContext & context,const google_apis::FileResourceCallback & callback)564 void JobScheduler::AddNewDirectory(
565 const std::string& parent_resource_id,
566 const std::string& directory_title,
567 const DriveServiceInterface::AddNewDirectoryOptions& options,
568 const ClientContext& context,
569 const google_apis::FileResourceCallback& callback) {
570 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
571
572 JobEntry* new_job = CreateNewJob(TYPE_ADD_NEW_DIRECTORY);
573 new_job->context = context;
574 new_job->task = base::Bind(
575 &DriveServiceInterface::AddNewDirectory,
576 base::Unretained(drive_service_),
577 parent_resource_id,
578 directory_title,
579 options,
580 base::Bind(&JobScheduler::OnGetFileResourceJobDone,
581 weak_ptr_factory_.GetWeakPtr(),
582 new_job->job_info.job_id,
583 callback));
584 new_job->abort_callback = CreateErrorRunCallback(callback);
585 StartJob(new_job);
586 }
587
DownloadFile(const base::FilePath & virtual_path,int64 expected_file_size,const base::FilePath & local_cache_path,const std::string & resource_id,const ClientContext & context,const google_apis::DownloadActionCallback & download_action_callback,const google_apis::GetContentCallback & get_content_callback)588 JobID JobScheduler::DownloadFile(
589 const base::FilePath& virtual_path,
590 int64 expected_file_size,
591 const base::FilePath& local_cache_path,
592 const std::string& resource_id,
593 const ClientContext& context,
594 const google_apis::DownloadActionCallback& download_action_callback,
595 const google_apis::GetContentCallback& get_content_callback) {
596 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
597
598 JobEntry* new_job = CreateNewJob(TYPE_DOWNLOAD_FILE);
599 new_job->job_info.file_path = virtual_path;
600 new_job->job_info.num_total_bytes = expected_file_size;
601 new_job->context = context;
602 new_job->task = base::Bind(
603 &DriveServiceInterface::DownloadFile,
604 base::Unretained(drive_service_),
605 local_cache_path,
606 resource_id,
607 base::Bind(&JobScheduler::OnDownloadActionJobDone,
608 weak_ptr_factory_.GetWeakPtr(),
609 new_job->job_info.job_id,
610 download_action_callback),
611 get_content_callback,
612 base::Bind(&JobScheduler::UpdateProgress,
613 weak_ptr_factory_.GetWeakPtr(),
614 new_job->job_info.job_id));
615 new_job->abort_callback = CreateErrorRunCallback(download_action_callback);
616 StartJob(new_job);
617 return new_job->job_info.job_id;
618 }
619
UploadNewFile(const std::string & parent_resource_id,const base::FilePath & drive_file_path,const base::FilePath & local_file_path,const std::string & title,const std::string & content_type,const DriveUploader::UploadNewFileOptions & options,const ClientContext & context,const google_apis::FileResourceCallback & callback)620 void JobScheduler::UploadNewFile(
621 const std::string& parent_resource_id,
622 const base::FilePath& drive_file_path,
623 const base::FilePath& local_file_path,
624 const std::string& title,
625 const std::string& content_type,
626 const DriveUploader::UploadNewFileOptions& options,
627 const ClientContext& context,
628 const google_apis::FileResourceCallback& callback) {
629 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
630
631 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_NEW_FILE);
632 new_job->job_info.file_path = drive_file_path;
633 new_job->context = context;
634
635 UploadNewFileParams params;
636 params.parent_resource_id = parent_resource_id;
637 params.local_file_path = local_file_path;
638 params.title = title;
639 params.content_type = content_type;
640 params.options = options;
641
642 ResumeUploadParams resume_params;
643 resume_params.local_file_path = params.local_file_path;
644 resume_params.content_type = params.content_type;
645
646 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
647 weak_ptr_factory_.GetWeakPtr(),
648 new_job->job_info.job_id,
649 resume_params,
650 callback);
651 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
652 weak_ptr_factory_.GetWeakPtr(),
653 new_job->job_info.job_id);
654 new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
655 new_job->abort_callback = CreateErrorRunCallback(callback);
656 StartJob(new_job);
657 }
658
UploadExistingFile(const std::string & resource_id,const base::FilePath & drive_file_path,const base::FilePath & local_file_path,const std::string & content_type,const DriveUploader::UploadExistingFileOptions & options,const ClientContext & context,const google_apis::FileResourceCallback & callback)659 void JobScheduler::UploadExistingFile(
660 const std::string& resource_id,
661 const base::FilePath& drive_file_path,
662 const base::FilePath& local_file_path,
663 const std::string& content_type,
664 const DriveUploader::UploadExistingFileOptions& options,
665 const ClientContext& context,
666 const google_apis::FileResourceCallback& callback) {
667 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
668
669 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_EXISTING_FILE);
670 new_job->job_info.file_path = drive_file_path;
671 new_job->context = context;
672
673 UploadExistingFileParams params;
674 params.resource_id = resource_id;
675 params.local_file_path = local_file_path;
676 params.content_type = content_type;
677 params.options = options;
678
679 ResumeUploadParams resume_params;
680 resume_params.local_file_path = params.local_file_path;
681 resume_params.content_type = params.content_type;
682
683 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
684 weak_ptr_factory_.GetWeakPtr(),
685 new_job->job_info.job_id,
686 resume_params,
687 callback);
688 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
689 weak_ptr_factory_.GetWeakPtr(),
690 new_job->job_info.job_id);
691 new_job->task = base::Bind(&RunUploadExistingFile, uploader_.get(), params);
692 new_job->abort_callback = CreateErrorRunCallback(callback);
693 StartJob(new_job);
694 }
695
AddPermission(const std::string & resource_id,const std::string & email,google_apis::drive::PermissionRole role,const google_apis::EntryActionCallback & callback)696 void JobScheduler::AddPermission(
697 const std::string& resource_id,
698 const std::string& email,
699 google_apis::drive::PermissionRole role,
700 const google_apis::EntryActionCallback& callback) {
701 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
702 DCHECK(!callback.is_null());
703
704 JobEntry* new_job = CreateNewJob(TYPE_ADD_PERMISSION);
705 new_job->task = base::Bind(&DriveServiceInterface::AddPermission,
706 base::Unretained(drive_service_),
707 resource_id,
708 email,
709 role,
710 base::Bind(&JobScheduler::OnEntryActionJobDone,
711 weak_ptr_factory_.GetWeakPtr(),
712 new_job->job_info.job_id,
713 callback));
714 new_job->abort_callback = callback;
715 StartJob(new_job);
716 }
717
CreateNewJob(JobType type)718 JobScheduler::JobEntry* JobScheduler::CreateNewJob(JobType type) {
719 JobEntry* job = new JobEntry(type);
720 job->job_info.job_id = job_map_.Add(job); // Takes the ownership of |job|.
721 return job;
722 }
723
StartJob(JobEntry * job)724 void JobScheduler::StartJob(JobEntry* job) {
725 DCHECK(!job->task.is_null());
726
727 QueueJob(job->job_info.job_id);
728 NotifyJobAdded(job->job_info);
729 DoJobLoop(GetJobQueueType(job->job_info.job_type));
730 }
731
QueueJob(JobID job_id)732 void JobScheduler::QueueJob(JobID job_id) {
733 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
734
735 JobEntry* job_entry = job_map_.Lookup(job_id);
736 DCHECK(job_entry);
737 const JobInfo& job_info = job_entry->job_info;
738
739 QueueType queue_type = GetJobQueueType(job_info.job_type);
740 queue_[queue_type]->Push(job_id, job_entry->context.type);
741
742 const std::string retry_prefix = job_entry->retry_count > 0 ?
743 base::StringPrintf(" (retry %d)", job_entry->retry_count) : "";
744 logger_->Log(logging::LOG_INFO,
745 "Job queued%s: %s - %s",
746 retry_prefix.c_str(),
747 job_info.ToString().c_str(),
748 GetQueueInfo(queue_type).c_str());
749 }
750
DoJobLoop(QueueType queue_type)751 void JobScheduler::DoJobLoop(QueueType queue_type) {
752 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
753
754 const int accepted_priority = GetCurrentAcceptedPriority(queue_type);
755
756 // Abort all USER_INITAITED jobs when not accepted.
757 if (accepted_priority < USER_INITIATED) {
758 std::vector<JobID> jobs;
759 queue_[queue_type]->GetQueuedJobs(USER_INITIATED, &jobs);
760 for (size_t i = 0; i < jobs.size(); ++i) {
761 JobEntry* job = job_map_.Lookup(jobs[i]);
762 DCHECK(job);
763 AbortNotRunningJob(job, google_apis::GDATA_NO_CONNECTION);
764 }
765 }
766
767 // Wait when throttled.
768 const base::Time now = base::Time::Now();
769 if (now < wait_until_) {
770 base::MessageLoopProxy::current()->PostDelayedTask(
771 FROM_HERE,
772 base::Bind(&JobScheduler::DoJobLoop,
773 weak_ptr_factory_.GetWeakPtr(),
774 queue_type),
775 wait_until_ - now);
776 return;
777 }
778
779 // Run the job with the highest priority in the queue.
780 JobID job_id = -1;
781 if (!queue_[queue_type]->PopForRun(accepted_priority, &job_id))
782 return;
783
784 JobEntry* entry = job_map_.Lookup(job_id);
785 DCHECK(entry);
786
787 JobInfo* job_info = &entry->job_info;
788 job_info->state = STATE_RUNNING;
789 job_info->start_time = now;
790 NotifyJobUpdated(*job_info);
791
792 entry->cancel_callback = entry->task.Run();
793
794 UpdateWait();
795
796 logger_->Log(logging::LOG_INFO,
797 "Job started: %s - %s",
798 job_info->ToString().c_str(),
799 GetQueueInfo(queue_type).c_str());
800 }
801
GetCurrentAcceptedPriority(QueueType queue_type)802 int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) {
803 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
804
805 const int kNoJobShouldRun = -1;
806
807 // Should stop if Drive was disabled while running the fetch loop.
808 if (pref_service_->GetBoolean(prefs::kDisableDrive))
809 return kNoJobShouldRun;
810
811 // Should stop if the network is not online.
812 if (net::NetworkChangeNotifier::IsOffline())
813 return kNoJobShouldRun;
814
815 // For the file queue, if it is on cellular network, only user initiated
816 // operations are allowed to start.
817 if (queue_type == FILE_QUEUE &&
818 pref_service_->GetBoolean(prefs::kDisableDriveOverCellular) &&
819 net::NetworkChangeNotifier::IsConnectionCellular(
820 net::NetworkChangeNotifier::GetConnectionType()))
821 return USER_INITIATED;
822
823 // Otherwise, every operations including background tasks are allowed.
824 return BACKGROUND;
825 }
826
UpdateWait()827 void JobScheduler::UpdateWait() {
828 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
829
830 if (disable_throttling_ || throttle_count_ == 0)
831 return;
832
833 // Exponential backoff: https://developers.google.com/drive/handle-errors.
834 base::TimeDelta delay =
835 base::TimeDelta::FromSeconds(1 << (throttle_count_ - 1)) +
836 base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000));
837 VLOG(1) << "Throttling for " << delay.InMillisecondsF();
838
839 wait_until_ = std::max(wait_until_, base::Time::Now() + delay);
840 }
841
OnJobDone(JobID job_id,google_apis::GDataErrorCode error)842 bool JobScheduler::OnJobDone(JobID job_id, google_apis::GDataErrorCode error) {
843 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
844
845 JobEntry* job_entry = job_map_.Lookup(job_id);
846 DCHECK(job_entry);
847 JobInfo* job_info = &job_entry->job_info;
848 QueueType queue_type = GetJobQueueType(job_info->job_type);
849 queue_[queue_type]->MarkFinished(job_id);
850
851 const base::TimeDelta elapsed = base::Time::Now() - job_info->start_time;
852 bool success = (GDataToFileError(error) == FILE_ERROR_OK);
853 logger_->Log(success ? logging::LOG_INFO : logging::LOG_WARNING,
854 "Job done: %s => %s (elapsed time: %sms) - %s",
855 job_info->ToString().c_str(),
856 GDataErrorCodeToString(error).c_str(),
857 base::Int64ToString(elapsed.InMilliseconds()).c_str(),
858 GetQueueInfo(queue_type).c_str());
859
860 // Retry, depending on the error.
861 const bool is_server_error =
862 error == google_apis::HTTP_SERVICE_UNAVAILABLE ||
863 error == google_apis::HTTP_INTERNAL_SERVER_ERROR;
864 if (is_server_error) {
865 if (throttle_count_ < kMaxThrottleCount)
866 ++throttle_count_;
867 UpdateWait();
868 } else {
869 throttle_count_ = 0;
870 }
871
872 const bool should_retry =
873 is_server_error && job_entry->retry_count < kMaxRetryCount;
874 if (should_retry) {
875 job_entry->cancel_callback.Reset();
876 job_info->state = STATE_RETRY;
877 NotifyJobUpdated(*job_info);
878
879 ++job_entry->retry_count;
880
881 // Requeue the job.
882 QueueJob(job_id);
883 } else {
884 NotifyJobDone(*job_info, error);
885 // The job has finished, no retry will happen in the scheduler. Now we can
886 // remove the job info from the map.
887 job_map_.Remove(job_id);
888 }
889
890 // Post a task to continue the job loop. This allows us to finish handling
891 // the current job before starting the next one.
892 base::MessageLoopProxy::current()->PostTask(FROM_HERE,
893 base::Bind(&JobScheduler::DoJobLoop,
894 weak_ptr_factory_.GetWeakPtr(),
895 queue_type));
896 return !should_retry;
897 }
898
OnGetFileListJobDone(JobID job_id,const google_apis::FileListCallback & callback,google_apis::GDataErrorCode error,scoped_ptr<google_apis::FileList> file_list)899 void JobScheduler::OnGetFileListJobDone(
900 JobID job_id,
901 const google_apis::FileListCallback& callback,
902 google_apis::GDataErrorCode error,
903 scoped_ptr<google_apis::FileList> file_list) {
904 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
905 DCHECK(!callback.is_null());
906
907 if (OnJobDone(job_id, error))
908 callback.Run(error, file_list.Pass());
909 }
910
OnGetChangeListJobDone(JobID job_id,const google_apis::ChangeListCallback & callback,google_apis::GDataErrorCode error,scoped_ptr<google_apis::ChangeList> change_list)911 void JobScheduler::OnGetChangeListJobDone(
912 JobID job_id,
913 const google_apis::ChangeListCallback& callback,
914 google_apis::GDataErrorCode error,
915 scoped_ptr<google_apis::ChangeList> change_list) {
916 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
917 DCHECK(!callback.is_null());
918
919 if (OnJobDone(job_id, error))
920 callback.Run(error, change_list.Pass());
921 }
922
OnGetFileResourceJobDone(JobID job_id,const google_apis::FileResourceCallback & callback,google_apis::GDataErrorCode error,scoped_ptr<google_apis::FileResource> entry)923 void JobScheduler::OnGetFileResourceJobDone(
924 JobID job_id,
925 const google_apis::FileResourceCallback& callback,
926 google_apis::GDataErrorCode error,
927 scoped_ptr<google_apis::FileResource> entry) {
928 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
929 DCHECK(!callback.is_null());
930
931 if (OnJobDone(job_id, error))
932 callback.Run(error, entry.Pass());
933 }
934
OnGetAboutResourceJobDone(JobID job_id,const google_apis::AboutResourceCallback & callback,google_apis::GDataErrorCode error,scoped_ptr<google_apis::AboutResource> about_resource)935 void JobScheduler::OnGetAboutResourceJobDone(
936 JobID job_id,
937 const google_apis::AboutResourceCallback& callback,
938 google_apis::GDataErrorCode error,
939 scoped_ptr<google_apis::AboutResource> about_resource) {
940 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
941 DCHECK(!callback.is_null());
942
943 if (OnJobDone(job_id, error))
944 callback.Run(error, about_resource.Pass());
945 }
946
OnGetShareUrlJobDone(JobID job_id,const google_apis::GetShareUrlCallback & callback,google_apis::GDataErrorCode error,const GURL & share_url)947 void JobScheduler::OnGetShareUrlJobDone(
948 JobID job_id,
949 const google_apis::GetShareUrlCallback& callback,
950 google_apis::GDataErrorCode error,
951 const GURL& share_url) {
952 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
953 DCHECK(!callback.is_null());
954
955 if (OnJobDone(job_id, error))
956 callback.Run(error, share_url);
957 }
958
OnGetAppListJobDone(JobID job_id,const google_apis::AppListCallback & callback,google_apis::GDataErrorCode error,scoped_ptr<google_apis::AppList> app_list)959 void JobScheduler::OnGetAppListJobDone(
960 JobID job_id,
961 const google_apis::AppListCallback& callback,
962 google_apis::GDataErrorCode error,
963 scoped_ptr<google_apis::AppList> app_list) {
964 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
965 DCHECK(!callback.is_null());
966
967 if (OnJobDone(job_id, error))
968 callback.Run(error, app_list.Pass());
969 }
970
OnEntryActionJobDone(JobID job_id,const google_apis::EntryActionCallback & callback,google_apis::GDataErrorCode error)971 void JobScheduler::OnEntryActionJobDone(
972 JobID job_id,
973 const google_apis::EntryActionCallback& callback,
974 google_apis::GDataErrorCode error) {
975 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
976 DCHECK(!callback.is_null());
977
978 if (OnJobDone(job_id, error))
979 callback.Run(error);
980 }
981
OnDownloadActionJobDone(JobID job_id,const google_apis::DownloadActionCallback & callback,google_apis::GDataErrorCode error,const base::FilePath & temp_file)982 void JobScheduler::OnDownloadActionJobDone(
983 JobID job_id,
984 const google_apis::DownloadActionCallback& callback,
985 google_apis::GDataErrorCode error,
986 const base::FilePath& temp_file) {
987 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
988 DCHECK(!callback.is_null());
989
990 if (OnJobDone(job_id, error))
991 callback.Run(error, temp_file);
992 }
993
OnUploadCompletionJobDone(JobID job_id,const ResumeUploadParams & resume_params,const google_apis::FileResourceCallback & callback,google_apis::GDataErrorCode error,const GURL & upload_location,scoped_ptr<google_apis::FileResource> entry)994 void JobScheduler::OnUploadCompletionJobDone(
995 JobID job_id,
996 const ResumeUploadParams& resume_params,
997 const google_apis::FileResourceCallback& callback,
998 google_apis::GDataErrorCode error,
999 const GURL& upload_location,
1000 scoped_ptr<google_apis::FileResource> entry) {
1001 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1002 DCHECK(!callback.is_null());
1003
1004 if (!upload_location.is_empty()) {
1005 // If upload_location is available, update the task to resume the
1006 // upload process from the terminated point.
1007 // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE
1008 // so OnJobDone called below will be in charge to re-queue the job.
1009 JobEntry* job_entry = job_map_.Lookup(job_id);
1010 DCHECK(job_entry);
1011
1012 ResumeUploadFileParams params;
1013 params.upload_location = upload_location;
1014 params.local_file_path = resume_params.local_file_path;
1015 params.content_type = resume_params.content_type;
1016 params.callback = base::Bind(&JobScheduler::OnResumeUploadFileDone,
1017 weak_ptr_factory_.GetWeakPtr(),
1018 job_id,
1019 job_entry->task,
1020 callback);
1021 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
1022 weak_ptr_factory_.GetWeakPtr(),
1023 job_id);
1024 job_entry->task = base::Bind(&RunResumeUploadFile, uploader_.get(), params);
1025 }
1026
1027 if (OnJobDone(job_id, error))
1028 callback.Run(error, entry.Pass());
1029 }
1030
OnResumeUploadFileDone(JobID job_id,const base::Callback<google_apis::CancelCallback ()> & original_task,const google_apis::FileResourceCallback & callback,google_apis::GDataErrorCode error,const GURL & upload_location,scoped_ptr<google_apis::FileResource> entry)1031 void JobScheduler::OnResumeUploadFileDone(
1032 JobID job_id,
1033 const base::Callback<google_apis::CancelCallback()>& original_task,
1034 const google_apis::FileResourceCallback& callback,
1035 google_apis::GDataErrorCode error,
1036 const GURL& upload_location,
1037 scoped_ptr<google_apis::FileResource> entry) {
1038 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1039 DCHECK(!original_task.is_null());
1040 DCHECK(!callback.is_null());
1041
1042 if (upload_location.is_empty()) {
1043 // If upload_location is not available, we should discard it and stop trying
1044 // to resume. Restore the original task.
1045 JobEntry* job_entry = job_map_.Lookup(job_id);
1046 DCHECK(job_entry);
1047 job_entry->task = original_task;
1048 }
1049
1050 if (OnJobDone(job_id, error))
1051 callback.Run(error, entry.Pass());
1052 }
1053
UpdateProgress(JobID job_id,int64 progress,int64 total)1054 void JobScheduler::UpdateProgress(JobID job_id, int64 progress, int64 total) {
1055 JobEntry* job_entry = job_map_.Lookup(job_id);
1056 DCHECK(job_entry);
1057
1058 job_entry->job_info.num_completed_bytes = progress;
1059 if (total != -1)
1060 job_entry->job_info.num_total_bytes = total;
1061 NotifyJobUpdated(job_entry->job_info);
1062 }
1063
OnConnectionTypeChanged(net::NetworkChangeNotifier::ConnectionType type)1064 void JobScheduler::OnConnectionTypeChanged(
1065 net::NetworkChangeNotifier::ConnectionType type) {
1066 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1067
1068 // Resume the job loop.
1069 // Note that we don't need to check the network connection status as it will
1070 // be checked in GetCurrentAcceptedPriority().
1071 for (int i = METADATA_QUEUE; i < NUM_QUEUES; ++i)
1072 DoJobLoop(static_cast<QueueType>(i));
1073 }
1074
GetJobQueueType(JobType type)1075 JobScheduler::QueueType JobScheduler::GetJobQueueType(JobType type) {
1076 switch (type) {
1077 case TYPE_GET_ABOUT_RESOURCE:
1078 case TYPE_GET_APP_LIST:
1079 case TYPE_GET_ALL_RESOURCE_LIST:
1080 case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY:
1081 case TYPE_SEARCH:
1082 case TYPE_GET_CHANGE_LIST:
1083 case TYPE_GET_REMAINING_CHANGE_LIST:
1084 case TYPE_GET_REMAINING_FILE_LIST:
1085 case TYPE_GET_RESOURCE_ENTRY:
1086 case TYPE_GET_SHARE_URL:
1087 case TYPE_TRASH_RESOURCE:
1088 case TYPE_COPY_RESOURCE:
1089 case TYPE_UPDATE_RESOURCE:
1090 case TYPE_RENAME_RESOURCE:
1091 case TYPE_ADD_RESOURCE_TO_DIRECTORY:
1092 case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY:
1093 case TYPE_ADD_NEW_DIRECTORY:
1094 case TYPE_CREATE_FILE:
1095 case TYPE_ADD_PERMISSION:
1096 return METADATA_QUEUE;
1097
1098 case TYPE_DOWNLOAD_FILE:
1099 case TYPE_UPLOAD_NEW_FILE:
1100 case TYPE_UPLOAD_EXISTING_FILE:
1101 return FILE_QUEUE;
1102 }
1103 NOTREACHED();
1104 return FILE_QUEUE;
1105 }
1106
AbortNotRunningJob(JobEntry * job,google_apis::GDataErrorCode error)1107 void JobScheduler::AbortNotRunningJob(JobEntry* job,
1108 google_apis::GDataErrorCode error) {
1109 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1110
1111 const base::TimeDelta elapsed = base::Time::Now() - job->job_info.start_time;
1112 const QueueType queue_type = GetJobQueueType(job->job_info.job_type);
1113 logger_->Log(logging::LOG_INFO,
1114 "Job aborted: %s => %s (elapsed time: %sms) - %s",
1115 job->job_info.ToString().c_str(),
1116 GDataErrorCodeToString(error).c_str(),
1117 base::Int64ToString(elapsed.InMilliseconds()).c_str(),
1118 GetQueueInfo(queue_type).c_str());
1119
1120 base::Callback<void(google_apis::GDataErrorCode)> callback =
1121 job->abort_callback;
1122 queue_[GetJobQueueType(job->job_info.job_type)]->Remove(job->job_info.job_id);
1123 NotifyJobDone(job->job_info, error);
1124 job_map_.Remove(job->job_info.job_id);
1125 base::MessageLoopProxy::current()->PostTask(FROM_HERE,
1126 base::Bind(callback, error));
1127 }
1128
NotifyJobAdded(const JobInfo & job_info)1129 void JobScheduler::NotifyJobAdded(const JobInfo& job_info) {
1130 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1131 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobAdded(job_info));
1132 }
1133
NotifyJobDone(const JobInfo & job_info,google_apis::GDataErrorCode error)1134 void JobScheduler::NotifyJobDone(const JobInfo& job_info,
1135 google_apis::GDataErrorCode error) {
1136 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1137 FOR_EACH_OBSERVER(JobListObserver, observer_list_,
1138 OnJobDone(job_info, GDataToFileError(error)));
1139 }
1140
NotifyJobUpdated(const JobInfo & job_info)1141 void JobScheduler::NotifyJobUpdated(const JobInfo& job_info) {
1142 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1143 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobUpdated(job_info));
1144 }
1145
GetQueueInfo(QueueType type) const1146 std::string JobScheduler::GetQueueInfo(QueueType type) const {
1147 return QueueTypeToString(type) + " " + queue_[type]->ToString();
1148 }
1149
1150 // static
QueueTypeToString(QueueType type)1151 std::string JobScheduler::QueueTypeToString(QueueType type) {
1152 switch (type) {
1153 case METADATA_QUEUE:
1154 return "METADATA_QUEUE";
1155 case FILE_QUEUE:
1156 return "FILE_QUEUE";
1157 case NUM_QUEUES:
1158 break; // This value is just a sentinel. Should never be used.
1159 }
1160 NOTREACHED();
1161 return "";
1162 }
1163
1164 } // namespace drive
1165