1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/chromeos/drive/job_scheduler.h"
6
7 #include "base/message_loop/message_loop.h"
8 #include "base/prefs/pref_service.h"
9 #include "base/rand_util.h"
10 #include "base/strings/string_number_conversions.h"
11 #include "base/strings/stringprintf.h"
12 #include "chrome/browser/drive/event_logger.h"
13 #include "chrome/common/pref_names.h"
14 #include "content/public/browser/browser_thread.h"
15 #include "google_apis/drive/drive_api_parser.h"
16
17 using content::BrowserThread;
18
19 namespace drive {
20
21 namespace {
22
23 // All jobs are retried at maximum of kMaxRetryCount when they fail due to
24 // throttling or server error. The delay before retrying a job is shared among
25 // jobs. It doubles in length on each failure, upto 2^kMaxThrottleCount seconds.
26 //
27 // According to the API documentation, kMaxRetryCount should be the same as
28 // kMaxThrottleCount (https://developers.google.com/drive/handle-errors).
29 // But currently multiplied by 2 to ensure upload related jobs retried for a
30 // sufficient number of times. crbug.com/269918
31 const int kMaxThrottleCount = 4;
32 const int kMaxRetryCount = 2 * kMaxThrottleCount;
33
34 // GetDefaultValue returns a value constructed by the default constructor.
35 template<typename T> struct DefaultValueCreator {
GetDefaultValuedrive::__anon7ec4d4070111::DefaultValueCreator36 static T GetDefaultValue() { return T(); }
37 };
38 template<typename T> struct DefaultValueCreator<const T&> {
GetDefaultValuedrive::__anon7ec4d4070111::DefaultValueCreator39 static T GetDefaultValue() { return T(); }
40 };
41
42 // Helper of CreateErrorRunCallback implementation.
43 // Provides:
44 // - ResultType; the type of the Callback which should be returned by
45 // CreateErrorRunCallback.
46 // - Run(): a static function which takes the original |callback| and |error|,
47 // and runs the |callback|.Run() with the error code and default values
48 // for remaining arguments.
49 template<typename CallbackType> struct CreateErrorRunCallbackHelper;
50
51 // CreateErrorRunCallback with two arguments.
52 template<typename P1>
53 struct CreateErrorRunCallbackHelper<void(google_apis::GDataErrorCode, P1)> {
Rundrive::__anon7ec4d4070111::CreateErrorRunCallbackHelper54 static void Run(
55 const base::Callback<void(google_apis::GDataErrorCode, P1)>& callback,
56 google_apis::GDataErrorCode error) {
57 callback.Run(error, DefaultValueCreator<P1>::GetDefaultValue());
58 }
59 };
60
61 // Returns a callback with the tail parameter bound to its default value.
62 // In other words, returned_callback.Run(error) runs callback.Run(error, T()).
63 template<typename CallbackType>
64 base::Callback<void(google_apis::GDataErrorCode)>
CreateErrorRunCallback(const base::Callback<CallbackType> & callback)65 CreateErrorRunCallback(const base::Callback<CallbackType>& callback) {
66 return base::Bind(&CreateErrorRunCallbackHelper<CallbackType>::Run, callback);
67 }
68
69 // Parameter struct for RunUploadNewFile.
70 struct UploadNewFileParams {
71 std::string parent_resource_id;
72 base::FilePath local_file_path;
73 std::string title;
74 std::string content_type;
75 DriveUploader::UploadNewFileOptions options;
76 UploadCompletionCallback callback;
77 google_apis::ProgressCallback progress_callback;
78 };
79
80 // Helper function to work around the arity limitation of base::Bind.
RunUploadNewFile(DriveUploaderInterface * uploader,const UploadNewFileParams & params)81 google_apis::CancelCallback RunUploadNewFile(
82 DriveUploaderInterface* uploader,
83 const UploadNewFileParams& params) {
84 return uploader->UploadNewFile(params.parent_resource_id,
85 params.local_file_path,
86 params.title,
87 params.content_type,
88 params.options,
89 params.callback,
90 params.progress_callback);
91 }
92
93 // Parameter struct for RunUploadExistingFile.
94 struct UploadExistingFileParams {
95 std::string resource_id;
96 base::FilePath local_file_path;
97 std::string content_type;
98 DriveUploader::UploadExistingFileOptions options;
99 std::string etag;
100 UploadCompletionCallback callback;
101 google_apis::ProgressCallback progress_callback;
102 };
103
104 // Helper function to work around the arity limitation of base::Bind.
RunUploadExistingFile(DriveUploaderInterface * uploader,const UploadExistingFileParams & params)105 google_apis::CancelCallback RunUploadExistingFile(
106 DriveUploaderInterface* uploader,
107 const UploadExistingFileParams& params) {
108 return uploader->UploadExistingFile(params.resource_id,
109 params.local_file_path,
110 params.content_type,
111 params.options,
112 params.callback,
113 params.progress_callback);
114 }
115
116 // Parameter struct for RunResumeUploadFile.
117 struct ResumeUploadFileParams {
118 GURL upload_location;
119 base::FilePath local_file_path;
120 std::string content_type;
121 UploadCompletionCallback callback;
122 google_apis::ProgressCallback progress_callback;
123 };
124
125 // Helper function to adjust the return type.
RunResumeUploadFile(DriveUploaderInterface * uploader,const ResumeUploadFileParams & params)126 google_apis::CancelCallback RunResumeUploadFile(
127 DriveUploaderInterface* uploader,
128 const ResumeUploadFileParams& params) {
129 return uploader->ResumeUploadFile(params.upload_location,
130 params.local_file_path,
131 params.content_type,
132 params.callback,
133 params.progress_callback);
134 }
135
136 } // namespace
137
138 // Metadata jobs are cheap, so we run them concurrently. File jobs run serially.
139 const int JobScheduler::kMaxJobCount[] = {
140 5, // METADATA_QUEUE
141 1, // FILE_QUEUE
142 };
143
JobEntry(JobType type)144 JobScheduler::JobEntry::JobEntry(JobType type)
145 : job_info(type),
146 context(ClientContext(USER_INITIATED)),
147 retry_count(0) {
148 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
149 }
150
~JobEntry()151 JobScheduler::JobEntry::~JobEntry() {
152 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
153 }
154
155 struct JobScheduler::ResumeUploadParams {
156 base::FilePath drive_file_path;
157 base::FilePath local_file_path;
158 std::string content_type;
159 };
160
JobScheduler(PrefService * pref_service,EventLogger * logger,DriveServiceInterface * drive_service,base::SequencedTaskRunner * blocking_task_runner)161 JobScheduler::JobScheduler(
162 PrefService* pref_service,
163 EventLogger* logger,
164 DriveServiceInterface* drive_service,
165 base::SequencedTaskRunner* blocking_task_runner)
166 : throttle_count_(0),
167 wait_until_(base::Time::Now()),
168 disable_throttling_(false),
169 logger_(logger),
170 drive_service_(drive_service),
171 uploader_(new DriveUploader(drive_service, blocking_task_runner)),
172 pref_service_(pref_service),
173 weak_ptr_factory_(this) {
174 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
175
176 for (int i = 0; i < NUM_QUEUES; ++i)
177 queue_[i].reset(new JobQueue(kMaxJobCount[i], NUM_CONTEXT_TYPES));
178
179 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
180 }
181
~JobScheduler()182 JobScheduler::~JobScheduler() {
183 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
184
185 size_t num_queued_jobs = 0;
186 for (int i = 0; i < NUM_QUEUES; ++i)
187 num_queued_jobs += queue_[i]->GetNumberOfJobs();
188 DCHECK_EQ(num_queued_jobs, job_map_.size());
189
190 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
191 }
192
GetJobInfoList()193 std::vector<JobInfo> JobScheduler::GetJobInfoList() {
194 std::vector<JobInfo> job_info_list;
195 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
196 job_info_list.push_back(iter.GetCurrentValue()->job_info);
197 return job_info_list;
198 }
199
AddObserver(JobListObserver * observer)200 void JobScheduler::AddObserver(JobListObserver* observer) {
201 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
202 observer_list_.AddObserver(observer);
203 }
204
RemoveObserver(JobListObserver * observer)205 void JobScheduler::RemoveObserver(JobListObserver* observer) {
206 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
207 observer_list_.RemoveObserver(observer);
208 }
209
CancelJob(JobID job_id)210 void JobScheduler::CancelJob(JobID job_id) {
211 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
212
213 JobEntry* job = job_map_.Lookup(job_id);
214 if (job) {
215 if (job->job_info.state == STATE_RUNNING) {
216 // If the job is running an HTTP request, cancel it via |cancel_callback|
217 // returned from the request, and wait for termination in the normal
218 // callback handler, OnJobDone.
219 if (!job->cancel_callback.is_null())
220 job->cancel_callback.Run();
221 } else {
222 AbortNotRunningJob(job, google_apis::GDATA_CANCELLED);
223 }
224 }
225 }
226
CancelAllJobs()227 void JobScheduler::CancelAllJobs() {
228 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
229
230 // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports
231 // removable during iteration.
232 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
233 CancelJob(iter.GetCurrentKey());
234 }
235
GetAboutResource(const google_apis::AboutResourceCallback & callback)236 void JobScheduler::GetAboutResource(
237 const google_apis::AboutResourceCallback& callback) {
238 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
239 DCHECK(!callback.is_null());
240
241 JobEntry* new_job = CreateNewJob(TYPE_GET_ABOUT_RESOURCE);
242 new_job->task = base::Bind(
243 &DriveServiceInterface::GetAboutResource,
244 base::Unretained(drive_service_),
245 base::Bind(&JobScheduler::OnGetAboutResourceJobDone,
246 weak_ptr_factory_.GetWeakPtr(),
247 new_job->job_info.job_id,
248 callback));
249 new_job->abort_callback = CreateErrorRunCallback(callback);
250 StartJob(new_job);
251 }
252
GetAppList(const google_apis::AppListCallback & callback)253 void JobScheduler::GetAppList(const google_apis::AppListCallback& callback) {
254 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
255 DCHECK(!callback.is_null());
256
257 JobEntry* new_job = CreateNewJob(TYPE_GET_APP_LIST);
258 new_job->task = base::Bind(
259 &DriveServiceInterface::GetAppList,
260 base::Unretained(drive_service_),
261 base::Bind(&JobScheduler::OnGetAppListJobDone,
262 weak_ptr_factory_.GetWeakPtr(),
263 new_job->job_info.job_id,
264 callback));
265 new_job->abort_callback = CreateErrorRunCallback(callback);
266 StartJob(new_job);
267 }
268
GetAllFileList(const google_apis::FileListCallback & callback)269 void JobScheduler::GetAllFileList(
270 const google_apis::FileListCallback& callback) {
271 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
272 DCHECK(!callback.is_null());
273
274 JobEntry* new_job = CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST);
275 new_job->task = base::Bind(
276 &DriveServiceInterface::GetAllFileList,
277 base::Unretained(drive_service_),
278 base::Bind(&JobScheduler::OnGetFileListJobDone,
279 weak_ptr_factory_.GetWeakPtr(),
280 new_job->job_info.job_id,
281 callback));
282 new_job->abort_callback = CreateErrorRunCallback(callback);
283 StartJob(new_job);
284 }
285
GetFileListInDirectory(const std::string & directory_resource_id,const google_apis::FileListCallback & callback)286 void JobScheduler::GetFileListInDirectory(
287 const std::string& directory_resource_id,
288 const google_apis::FileListCallback& callback) {
289 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
290 DCHECK(!callback.is_null());
291
292 JobEntry* new_job = CreateNewJob(
293 TYPE_GET_RESOURCE_LIST_IN_DIRECTORY);
294 new_job->task = base::Bind(
295 &DriveServiceInterface::GetFileListInDirectory,
296 base::Unretained(drive_service_),
297 directory_resource_id,
298 base::Bind(&JobScheduler::OnGetFileListJobDone,
299 weak_ptr_factory_.GetWeakPtr(),
300 new_job->job_info.job_id,
301 callback));
302 new_job->abort_callback = CreateErrorRunCallback(callback);
303 StartJob(new_job);
304 }
305
Search(const std::string & search_query,const google_apis::FileListCallback & callback)306 void JobScheduler::Search(const std::string& search_query,
307 const google_apis::FileListCallback& callback) {
308 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
309 DCHECK(!callback.is_null());
310
311 JobEntry* new_job = CreateNewJob(TYPE_SEARCH);
312 new_job->task = base::Bind(
313 &DriveServiceInterface::Search,
314 base::Unretained(drive_service_),
315 search_query,
316 base::Bind(&JobScheduler::OnGetFileListJobDone,
317 weak_ptr_factory_.GetWeakPtr(),
318 new_job->job_info.job_id,
319 callback));
320 new_job->abort_callback = CreateErrorRunCallback(callback);
321 StartJob(new_job);
322 }
323
GetChangeList(int64 start_changestamp,const google_apis::ChangeListCallback & callback)324 void JobScheduler::GetChangeList(
325 int64 start_changestamp,
326 const google_apis::ChangeListCallback& callback) {
327 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
328 DCHECK(!callback.is_null());
329
330 JobEntry* new_job = CreateNewJob(TYPE_GET_CHANGE_LIST);
331 new_job->task = base::Bind(
332 &DriveServiceInterface::GetChangeList,
333 base::Unretained(drive_service_),
334 start_changestamp,
335 base::Bind(&JobScheduler::OnGetChangeListJobDone,
336 weak_ptr_factory_.GetWeakPtr(),
337 new_job->job_info.job_id,
338 callback));
339 new_job->abort_callback = CreateErrorRunCallback(callback);
340 StartJob(new_job);
341 }
342
GetRemainingChangeList(const GURL & next_link,const google_apis::ChangeListCallback & callback)343 void JobScheduler::GetRemainingChangeList(
344 const GURL& next_link,
345 const google_apis::ChangeListCallback& callback) {
346 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
347 DCHECK(!callback.is_null());
348
349 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_CHANGE_LIST);
350 new_job->task = base::Bind(
351 &DriveServiceInterface::GetRemainingChangeList,
352 base::Unretained(drive_service_),
353 next_link,
354 base::Bind(&JobScheduler::OnGetChangeListJobDone,
355 weak_ptr_factory_.GetWeakPtr(),
356 new_job->job_info.job_id,
357 callback));
358 new_job->abort_callback = CreateErrorRunCallback(callback);
359 StartJob(new_job);
360 }
361
GetRemainingFileList(const GURL & next_link,const google_apis::FileListCallback & callback)362 void JobScheduler::GetRemainingFileList(
363 const GURL& next_link,
364 const google_apis::FileListCallback& callback) {
365 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
366 DCHECK(!callback.is_null());
367
368 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_FILE_LIST);
369 new_job->task = base::Bind(
370 &DriveServiceInterface::GetRemainingFileList,
371 base::Unretained(drive_service_),
372 next_link,
373 base::Bind(&JobScheduler::OnGetFileListJobDone,
374 weak_ptr_factory_.GetWeakPtr(),
375 new_job->job_info.job_id,
376 callback));
377 new_job->abort_callback = CreateErrorRunCallback(callback);
378 StartJob(new_job);
379 }
380
GetFileResource(const std::string & resource_id,const ClientContext & context,const google_apis::FileResourceCallback & callback)381 void JobScheduler::GetFileResource(
382 const std::string& resource_id,
383 const ClientContext& context,
384 const google_apis::FileResourceCallback& callback) {
385 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
386 DCHECK(!callback.is_null());
387
388 JobEntry* new_job = CreateNewJob(TYPE_GET_RESOURCE_ENTRY);
389 new_job->context = context;
390 new_job->task = base::Bind(
391 &DriveServiceInterface::GetFileResource,
392 base::Unretained(drive_service_),
393 resource_id,
394 base::Bind(&JobScheduler::OnGetFileResourceJobDone,
395 weak_ptr_factory_.GetWeakPtr(),
396 new_job->job_info.job_id,
397 callback));
398 new_job->abort_callback = CreateErrorRunCallback(callback);
399 StartJob(new_job);
400 }
401
GetShareUrl(const std::string & resource_id,const GURL & embed_origin,const ClientContext & context,const google_apis::GetShareUrlCallback & callback)402 void JobScheduler::GetShareUrl(
403 const std::string& resource_id,
404 const GURL& embed_origin,
405 const ClientContext& context,
406 const google_apis::GetShareUrlCallback& callback) {
407 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
408 DCHECK(!callback.is_null());
409
410 JobEntry* new_job = CreateNewJob(TYPE_GET_SHARE_URL);
411 new_job->context = context;
412 new_job->task = base::Bind(
413 &DriveServiceInterface::GetShareUrl,
414 base::Unretained(drive_service_),
415 resource_id,
416 embed_origin,
417 base::Bind(&JobScheduler::OnGetShareUrlJobDone,
418 weak_ptr_factory_.GetWeakPtr(),
419 new_job->job_info.job_id,
420 callback));
421 new_job->abort_callback = CreateErrorRunCallback(callback);
422 StartJob(new_job);
423 }
424
TrashResource(const std::string & resource_id,const ClientContext & context,const google_apis::EntryActionCallback & callback)425 void JobScheduler::TrashResource(
426 const std::string& resource_id,
427 const ClientContext& context,
428 const google_apis::EntryActionCallback& callback) {
429 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
430 DCHECK(!callback.is_null());
431
432 JobEntry* new_job = CreateNewJob(TYPE_TRASH_RESOURCE);
433 new_job->context = context;
434 new_job->task = base::Bind(
435 &DriveServiceInterface::TrashResource,
436 base::Unretained(drive_service_),
437 resource_id,
438 base::Bind(&JobScheduler::OnEntryActionJobDone,
439 weak_ptr_factory_.GetWeakPtr(),
440 new_job->job_info.job_id,
441 callback));
442 new_job->abort_callback = callback;
443 StartJob(new_job);
444 }
445
CopyResource(const std::string & resource_id,const std::string & parent_resource_id,const std::string & new_title,const base::Time & last_modified,const google_apis::FileResourceCallback & callback)446 void JobScheduler::CopyResource(
447 const std::string& resource_id,
448 const std::string& parent_resource_id,
449 const std::string& new_title,
450 const base::Time& last_modified,
451 const google_apis::FileResourceCallback& callback) {
452 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
453 DCHECK(!callback.is_null());
454
455 JobEntry* new_job = CreateNewJob(TYPE_COPY_RESOURCE);
456 new_job->task = base::Bind(
457 &DriveServiceInterface::CopyResource,
458 base::Unretained(drive_service_),
459 resource_id,
460 parent_resource_id,
461 new_title,
462 last_modified,
463 base::Bind(&JobScheduler::OnGetFileResourceJobDone,
464 weak_ptr_factory_.GetWeakPtr(),
465 new_job->job_info.job_id,
466 callback));
467 new_job->abort_callback = CreateErrorRunCallback(callback);
468 StartJob(new_job);
469 }
470
UpdateResource(const std::string & resource_id,const std::string & parent_resource_id,const std::string & new_title,const base::Time & last_modified,const base::Time & last_viewed_by_me,const ClientContext & context,const google_apis::FileResourceCallback & callback)471 void JobScheduler::UpdateResource(
472 const std::string& resource_id,
473 const std::string& parent_resource_id,
474 const std::string& new_title,
475 const base::Time& last_modified,
476 const base::Time& last_viewed_by_me,
477 const ClientContext& context,
478 const google_apis::FileResourceCallback& callback) {
479 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
480 DCHECK(!callback.is_null());
481
482 JobEntry* new_job = CreateNewJob(TYPE_UPDATE_RESOURCE);
483 new_job->context = context;
484 new_job->task = base::Bind(
485 &DriveServiceInterface::UpdateResource,
486 base::Unretained(drive_service_),
487 resource_id,
488 parent_resource_id,
489 new_title,
490 last_modified,
491 last_viewed_by_me,
492 base::Bind(&JobScheduler::OnGetFileResourceJobDone,
493 weak_ptr_factory_.GetWeakPtr(),
494 new_job->job_info.job_id,
495 callback));
496 new_job->abort_callback = CreateErrorRunCallback(callback);
497 StartJob(new_job);
498 }
499
AddResourceToDirectory(const std::string & parent_resource_id,const std::string & resource_id,const google_apis::EntryActionCallback & callback)500 void JobScheduler::AddResourceToDirectory(
501 const std::string& parent_resource_id,
502 const std::string& resource_id,
503 const google_apis::EntryActionCallback& callback) {
504 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
505 DCHECK(!callback.is_null());
506
507 JobEntry* new_job = CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY);
508 new_job->task = base::Bind(
509 &DriveServiceInterface::AddResourceToDirectory,
510 base::Unretained(drive_service_),
511 parent_resource_id,
512 resource_id,
513 base::Bind(&JobScheduler::OnEntryActionJobDone,
514 weak_ptr_factory_.GetWeakPtr(),
515 new_job->job_info.job_id,
516 callback));
517 new_job->abort_callback = callback;
518 StartJob(new_job);
519 }
520
RemoveResourceFromDirectory(const std::string & parent_resource_id,const std::string & resource_id,const ClientContext & context,const google_apis::EntryActionCallback & callback)521 void JobScheduler::RemoveResourceFromDirectory(
522 const std::string& parent_resource_id,
523 const std::string& resource_id,
524 const ClientContext& context,
525 const google_apis::EntryActionCallback& callback) {
526 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
527
528 JobEntry* new_job = CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY);
529 new_job->context = context;
530 new_job->task = base::Bind(
531 &DriveServiceInterface::RemoveResourceFromDirectory,
532 base::Unretained(drive_service_),
533 parent_resource_id,
534 resource_id,
535 base::Bind(&JobScheduler::OnEntryActionJobDone,
536 weak_ptr_factory_.GetWeakPtr(),
537 new_job->job_info.job_id,
538 callback));
539 new_job->abort_callback = callback;
540 StartJob(new_job);
541 }
542
AddNewDirectory(const std::string & parent_resource_id,const std::string & directory_title,const DriveServiceInterface::AddNewDirectoryOptions & options,const ClientContext & context,const google_apis::FileResourceCallback & callback)543 void JobScheduler::AddNewDirectory(
544 const std::string& parent_resource_id,
545 const std::string& directory_title,
546 const DriveServiceInterface::AddNewDirectoryOptions& options,
547 const ClientContext& context,
548 const google_apis::FileResourceCallback& callback) {
549 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
550
551 JobEntry* new_job = CreateNewJob(TYPE_ADD_NEW_DIRECTORY);
552 new_job->context = context;
553 new_job->task = base::Bind(
554 &DriveServiceInterface::AddNewDirectory,
555 base::Unretained(drive_service_),
556 parent_resource_id,
557 directory_title,
558 options,
559 base::Bind(&JobScheduler::OnGetFileResourceJobDone,
560 weak_ptr_factory_.GetWeakPtr(),
561 new_job->job_info.job_id,
562 callback));
563 new_job->abort_callback = CreateErrorRunCallback(callback);
564 StartJob(new_job);
565 }
566
DownloadFile(const base::FilePath & virtual_path,int64 expected_file_size,const base::FilePath & local_cache_path,const std::string & resource_id,const ClientContext & context,const google_apis::DownloadActionCallback & download_action_callback,const google_apis::GetContentCallback & get_content_callback)567 JobID JobScheduler::DownloadFile(
568 const base::FilePath& virtual_path,
569 int64 expected_file_size,
570 const base::FilePath& local_cache_path,
571 const std::string& resource_id,
572 const ClientContext& context,
573 const google_apis::DownloadActionCallback& download_action_callback,
574 const google_apis::GetContentCallback& get_content_callback) {
575 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
576
577 JobEntry* new_job = CreateNewJob(TYPE_DOWNLOAD_FILE);
578 new_job->job_info.file_path = virtual_path;
579 new_job->job_info.num_total_bytes = expected_file_size;
580 new_job->context = context;
581 new_job->task = base::Bind(
582 &DriveServiceInterface::DownloadFile,
583 base::Unretained(drive_service_),
584 local_cache_path,
585 resource_id,
586 base::Bind(&JobScheduler::OnDownloadActionJobDone,
587 weak_ptr_factory_.GetWeakPtr(),
588 new_job->job_info.job_id,
589 download_action_callback),
590 get_content_callback,
591 base::Bind(&JobScheduler::UpdateProgress,
592 weak_ptr_factory_.GetWeakPtr(),
593 new_job->job_info.job_id));
594 new_job->abort_callback = CreateErrorRunCallback(download_action_callback);
595 StartJob(new_job);
596 return new_job->job_info.job_id;
597 }
598
UploadNewFile(const std::string & parent_resource_id,const base::FilePath & drive_file_path,const base::FilePath & local_file_path,const std::string & title,const std::string & content_type,const DriveUploader::UploadNewFileOptions & options,const ClientContext & context,const google_apis::FileResourceCallback & callback)599 void JobScheduler::UploadNewFile(
600 const std::string& parent_resource_id,
601 const base::FilePath& drive_file_path,
602 const base::FilePath& local_file_path,
603 const std::string& title,
604 const std::string& content_type,
605 const DriveUploader::UploadNewFileOptions& options,
606 const ClientContext& context,
607 const google_apis::FileResourceCallback& callback) {
608 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
609
610 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_NEW_FILE);
611 new_job->job_info.file_path = drive_file_path;
612 new_job->context = context;
613
614 UploadNewFileParams params;
615 params.parent_resource_id = parent_resource_id;
616 params.local_file_path = local_file_path;
617 params.title = title;
618 params.content_type = content_type;
619 params.options = options;
620
621 ResumeUploadParams resume_params;
622 resume_params.local_file_path = params.local_file_path;
623 resume_params.content_type = params.content_type;
624
625 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
626 weak_ptr_factory_.GetWeakPtr(),
627 new_job->job_info.job_id,
628 resume_params,
629 callback);
630 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
631 weak_ptr_factory_.GetWeakPtr(),
632 new_job->job_info.job_id);
633 new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
634 new_job->abort_callback = CreateErrorRunCallback(callback);
635 StartJob(new_job);
636 }
637
UploadExistingFile(const std::string & resource_id,const base::FilePath & drive_file_path,const base::FilePath & local_file_path,const std::string & content_type,const DriveUploader::UploadExistingFileOptions & options,const ClientContext & context,const google_apis::FileResourceCallback & callback)638 void JobScheduler::UploadExistingFile(
639 const std::string& resource_id,
640 const base::FilePath& drive_file_path,
641 const base::FilePath& local_file_path,
642 const std::string& content_type,
643 const DriveUploader::UploadExistingFileOptions& options,
644 const ClientContext& context,
645 const google_apis::FileResourceCallback& callback) {
646 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
647
648 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_EXISTING_FILE);
649 new_job->job_info.file_path = drive_file_path;
650 new_job->context = context;
651
652 UploadExistingFileParams params;
653 params.resource_id = resource_id;
654 params.local_file_path = local_file_path;
655 params.content_type = content_type;
656 params.options = options;
657
658 ResumeUploadParams resume_params;
659 resume_params.local_file_path = params.local_file_path;
660 resume_params.content_type = params.content_type;
661
662 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
663 weak_ptr_factory_.GetWeakPtr(),
664 new_job->job_info.job_id,
665 resume_params,
666 callback);
667 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
668 weak_ptr_factory_.GetWeakPtr(),
669 new_job->job_info.job_id);
670 new_job->task = base::Bind(&RunUploadExistingFile, uploader_.get(), params);
671 new_job->abort_callback = CreateErrorRunCallback(callback);
672 StartJob(new_job);
673 }
674
AddPermission(const std::string & resource_id,const std::string & email,google_apis::drive::PermissionRole role,const google_apis::EntryActionCallback & callback)675 void JobScheduler::AddPermission(
676 const std::string& resource_id,
677 const std::string& email,
678 google_apis::drive::PermissionRole role,
679 const google_apis::EntryActionCallback& callback) {
680 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
681 DCHECK(!callback.is_null());
682
683 JobEntry* new_job = CreateNewJob(TYPE_ADD_PERMISSION);
684 new_job->task = base::Bind(&DriveServiceInterface::AddPermission,
685 base::Unretained(drive_service_),
686 resource_id,
687 email,
688 role,
689 base::Bind(&JobScheduler::OnEntryActionJobDone,
690 weak_ptr_factory_.GetWeakPtr(),
691 new_job->job_info.job_id,
692 callback));
693 new_job->abort_callback = callback;
694 StartJob(new_job);
695 }
696
CreateNewJob(JobType type)697 JobScheduler::JobEntry* JobScheduler::CreateNewJob(JobType type) {
698 JobEntry* job = new JobEntry(type);
699 job->job_info.job_id = job_map_.Add(job); // Takes the ownership of |job|.
700 return job;
701 }
702
StartJob(JobEntry * job)703 void JobScheduler::StartJob(JobEntry* job) {
704 DCHECK(!job->task.is_null());
705
706 QueueJob(job->job_info.job_id);
707 NotifyJobAdded(job->job_info);
708 DoJobLoop(GetJobQueueType(job->job_info.job_type));
709 }
710
QueueJob(JobID job_id)711 void JobScheduler::QueueJob(JobID job_id) {
712 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
713
714 JobEntry* job_entry = job_map_.Lookup(job_id);
715 DCHECK(job_entry);
716 const JobInfo& job_info = job_entry->job_info;
717
718 const QueueType queue_type = GetJobQueueType(job_info.job_type);
719 queue_[queue_type]->Push(job_id, job_entry->context.type);
720
721 const std::string retry_prefix = job_entry->retry_count > 0 ?
722 base::StringPrintf(" (retry %d)", job_entry->retry_count) : "";
723 logger_->Log(logging::LOG_INFO,
724 "Job queued%s: %s - %s",
725 retry_prefix.c_str(),
726 job_info.ToString().c_str(),
727 GetQueueInfo(queue_type).c_str());
728 }
729
DoJobLoop(QueueType queue_type)730 void JobScheduler::DoJobLoop(QueueType queue_type) {
731 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
732
733 const int accepted_priority = GetCurrentAcceptedPriority(queue_type);
734
735 // Abort all USER_INITAITED jobs when not accepted.
736 if (accepted_priority < USER_INITIATED) {
737 std::vector<JobID> jobs;
738 queue_[queue_type]->GetQueuedJobs(USER_INITIATED, &jobs);
739 for (size_t i = 0; i < jobs.size(); ++i) {
740 JobEntry* job = job_map_.Lookup(jobs[i]);
741 DCHECK(job);
742 AbortNotRunningJob(job, google_apis::GDATA_NO_CONNECTION);
743 }
744 }
745
746 // Wait when throttled.
747 const base::Time now = base::Time::Now();
748 if (now < wait_until_) {
749 base::MessageLoopProxy::current()->PostDelayedTask(
750 FROM_HERE,
751 base::Bind(&JobScheduler::DoJobLoop,
752 weak_ptr_factory_.GetWeakPtr(),
753 queue_type),
754 wait_until_ - now);
755 return;
756 }
757
758 // Run the job with the highest priority in the queue.
759 JobID job_id = -1;
760 if (!queue_[queue_type]->PopForRun(accepted_priority, &job_id))
761 return;
762
763 JobEntry* entry = job_map_.Lookup(job_id);
764 DCHECK(entry);
765
766 JobInfo* job_info = &entry->job_info;
767 job_info->state = STATE_RUNNING;
768 job_info->start_time = now;
769 NotifyJobUpdated(*job_info);
770
771 entry->cancel_callback = entry->task.Run();
772
773 UpdateWait();
774
775 logger_->Log(logging::LOG_INFO,
776 "Job started: %s - %s",
777 job_info->ToString().c_str(),
778 GetQueueInfo(queue_type).c_str());
779 }
780
GetCurrentAcceptedPriority(QueueType queue_type)781 int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) {
782 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
783
784 const int kNoJobShouldRun = -1;
785
786 // Should stop if Drive was disabled while running the fetch loop.
787 if (pref_service_->GetBoolean(prefs::kDisableDrive))
788 return kNoJobShouldRun;
789
790 // Should stop if the network is not online.
791 if (net::NetworkChangeNotifier::IsOffline())
792 return kNoJobShouldRun;
793
794 // For the file queue, if it is on cellular network, only user initiated
795 // operations are allowed to start.
796 if (queue_type == FILE_QUEUE &&
797 pref_service_->GetBoolean(prefs::kDisableDriveOverCellular) &&
798 net::NetworkChangeNotifier::IsConnectionCellular(
799 net::NetworkChangeNotifier::GetConnectionType()))
800 return USER_INITIATED;
801
802 // Otherwise, every operations including background tasks are allowed.
803 return BACKGROUND;
804 }
805
UpdateWait()806 void JobScheduler::UpdateWait() {
807 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
808
809 if (disable_throttling_ || throttle_count_ == 0)
810 return;
811
812 // Exponential backoff: https://developers.google.com/drive/handle-errors.
813 base::TimeDelta delay =
814 base::TimeDelta::FromSeconds(1 << (throttle_count_ - 1)) +
815 base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000));
816 VLOG(1) << "Throttling for " << delay.InMillisecondsF();
817
818 wait_until_ = std::max(wait_until_, base::Time::Now() + delay);
819 }
820
OnJobDone(JobID job_id,google_apis::GDataErrorCode error)821 bool JobScheduler::OnJobDone(JobID job_id, google_apis::GDataErrorCode error) {
822 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
823
824 JobEntry* job_entry = job_map_.Lookup(job_id);
825 DCHECK(job_entry);
826 JobInfo* job_info = &job_entry->job_info;
827 QueueType queue_type = GetJobQueueType(job_info->job_type);
828 queue_[queue_type]->MarkFinished(job_id);
829
830 const base::TimeDelta elapsed = base::Time::Now() - job_info->start_time;
831 bool success = (GDataToFileError(error) == FILE_ERROR_OK);
832 logger_->Log(success ? logging::LOG_INFO : logging::LOG_WARNING,
833 "Job done: %s => %s (elapsed time: %sms) - %s",
834 job_info->ToString().c_str(),
835 GDataErrorCodeToString(error).c_str(),
836 base::Int64ToString(elapsed.InMilliseconds()).c_str(),
837 GetQueueInfo(queue_type).c_str());
838
839 // Retry, depending on the error.
840 const bool is_server_error =
841 error == google_apis::HTTP_SERVICE_UNAVAILABLE ||
842 error == google_apis::HTTP_INTERNAL_SERVER_ERROR;
843 if (is_server_error) {
844 if (throttle_count_ < kMaxThrottleCount)
845 ++throttle_count_;
846 UpdateWait();
847 } else {
848 throttle_count_ = 0;
849 }
850
851 const bool should_retry =
852 is_server_error && job_entry->retry_count < kMaxRetryCount;
853 if (should_retry) {
854 job_entry->cancel_callback.Reset();
855 job_info->state = STATE_RETRY;
856 NotifyJobUpdated(*job_info);
857
858 ++job_entry->retry_count;
859
860 // Requeue the job.
861 QueueJob(job_id);
862 } else {
863 NotifyJobDone(*job_info, error);
864 // The job has finished, no retry will happen in the scheduler. Now we can
865 // remove the job info from the map.
866 job_map_.Remove(job_id);
867 }
868
869 // Post a task to continue the job loop. This allows us to finish handling
870 // the current job before starting the next one.
871 base::MessageLoopProxy::current()->PostTask(FROM_HERE,
872 base::Bind(&JobScheduler::DoJobLoop,
873 weak_ptr_factory_.GetWeakPtr(),
874 queue_type));
875 return !should_retry;
876 }
877
OnGetFileListJobDone(JobID job_id,const google_apis::FileListCallback & callback,google_apis::GDataErrorCode error,scoped_ptr<google_apis::FileList> file_list)878 void JobScheduler::OnGetFileListJobDone(
879 JobID job_id,
880 const google_apis::FileListCallback& callback,
881 google_apis::GDataErrorCode error,
882 scoped_ptr<google_apis::FileList> file_list) {
883 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
884 DCHECK(!callback.is_null());
885
886 if (OnJobDone(job_id, error))
887 callback.Run(error, file_list.Pass());
888 }
889
OnGetChangeListJobDone(JobID job_id,const google_apis::ChangeListCallback & callback,google_apis::GDataErrorCode error,scoped_ptr<google_apis::ChangeList> change_list)890 void JobScheduler::OnGetChangeListJobDone(
891 JobID job_id,
892 const google_apis::ChangeListCallback& callback,
893 google_apis::GDataErrorCode error,
894 scoped_ptr<google_apis::ChangeList> change_list) {
895 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
896 DCHECK(!callback.is_null());
897
898 if (OnJobDone(job_id, error))
899 callback.Run(error, change_list.Pass());
900 }
901
OnGetFileResourceJobDone(JobID job_id,const google_apis::FileResourceCallback & callback,google_apis::GDataErrorCode error,scoped_ptr<google_apis::FileResource> entry)902 void JobScheduler::OnGetFileResourceJobDone(
903 JobID job_id,
904 const google_apis::FileResourceCallback& callback,
905 google_apis::GDataErrorCode error,
906 scoped_ptr<google_apis::FileResource> entry) {
907 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
908 DCHECK(!callback.is_null());
909
910 if (OnJobDone(job_id, error))
911 callback.Run(error, entry.Pass());
912 }
913
OnGetAboutResourceJobDone(JobID job_id,const google_apis::AboutResourceCallback & callback,google_apis::GDataErrorCode error,scoped_ptr<google_apis::AboutResource> about_resource)914 void JobScheduler::OnGetAboutResourceJobDone(
915 JobID job_id,
916 const google_apis::AboutResourceCallback& callback,
917 google_apis::GDataErrorCode error,
918 scoped_ptr<google_apis::AboutResource> about_resource) {
919 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
920 DCHECK(!callback.is_null());
921
922 if (OnJobDone(job_id, error))
923 callback.Run(error, about_resource.Pass());
924 }
925
OnGetShareUrlJobDone(JobID job_id,const google_apis::GetShareUrlCallback & callback,google_apis::GDataErrorCode error,const GURL & share_url)926 void JobScheduler::OnGetShareUrlJobDone(
927 JobID job_id,
928 const google_apis::GetShareUrlCallback& callback,
929 google_apis::GDataErrorCode error,
930 const GURL& share_url) {
931 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
932 DCHECK(!callback.is_null());
933
934 if (OnJobDone(job_id, error))
935 callback.Run(error, share_url);
936 }
937
OnGetAppListJobDone(JobID job_id,const google_apis::AppListCallback & callback,google_apis::GDataErrorCode error,scoped_ptr<google_apis::AppList> app_list)938 void JobScheduler::OnGetAppListJobDone(
939 JobID job_id,
940 const google_apis::AppListCallback& callback,
941 google_apis::GDataErrorCode error,
942 scoped_ptr<google_apis::AppList> app_list) {
943 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
944 DCHECK(!callback.is_null());
945
946 if (OnJobDone(job_id, error))
947 callback.Run(error, app_list.Pass());
948 }
949
OnEntryActionJobDone(JobID job_id,const google_apis::EntryActionCallback & callback,google_apis::GDataErrorCode error)950 void JobScheduler::OnEntryActionJobDone(
951 JobID job_id,
952 const google_apis::EntryActionCallback& callback,
953 google_apis::GDataErrorCode error) {
954 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
955 DCHECK(!callback.is_null());
956
957 if (OnJobDone(job_id, error))
958 callback.Run(error);
959 }
960
OnDownloadActionJobDone(JobID job_id,const google_apis::DownloadActionCallback & callback,google_apis::GDataErrorCode error,const base::FilePath & temp_file)961 void JobScheduler::OnDownloadActionJobDone(
962 JobID job_id,
963 const google_apis::DownloadActionCallback& callback,
964 google_apis::GDataErrorCode error,
965 const base::FilePath& temp_file) {
966 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
967 DCHECK(!callback.is_null());
968
969 if (OnJobDone(job_id, error))
970 callback.Run(error, temp_file);
971 }
972
OnUploadCompletionJobDone(JobID job_id,const ResumeUploadParams & resume_params,const google_apis::FileResourceCallback & callback,google_apis::GDataErrorCode error,const GURL & upload_location,scoped_ptr<google_apis::FileResource> entry)973 void JobScheduler::OnUploadCompletionJobDone(
974 JobID job_id,
975 const ResumeUploadParams& resume_params,
976 const google_apis::FileResourceCallback& callback,
977 google_apis::GDataErrorCode error,
978 const GURL& upload_location,
979 scoped_ptr<google_apis::FileResource> entry) {
980 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
981 DCHECK(!callback.is_null());
982
983 if (!upload_location.is_empty()) {
984 // If upload_location is available, update the task to resume the
985 // upload process from the terminated point.
986 // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE
987 // so OnJobDone called below will be in charge to re-queue the job.
988 JobEntry* job_entry = job_map_.Lookup(job_id);
989 DCHECK(job_entry);
990
991 ResumeUploadFileParams params;
992 params.upload_location = upload_location;
993 params.local_file_path = resume_params.local_file_path;
994 params.content_type = resume_params.content_type;
995 params.callback = base::Bind(&JobScheduler::OnResumeUploadFileDone,
996 weak_ptr_factory_.GetWeakPtr(),
997 job_id,
998 job_entry->task,
999 callback);
1000 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
1001 weak_ptr_factory_.GetWeakPtr(),
1002 job_id);
1003 job_entry->task = base::Bind(&RunResumeUploadFile, uploader_.get(), params);
1004 }
1005
1006 if (OnJobDone(job_id, error))
1007 callback.Run(error, entry.Pass());
1008 }
1009
OnResumeUploadFileDone(JobID job_id,const base::Callback<google_apis::CancelCallback ()> & original_task,const google_apis::FileResourceCallback & callback,google_apis::GDataErrorCode error,const GURL & upload_location,scoped_ptr<google_apis::FileResource> entry)1010 void JobScheduler::OnResumeUploadFileDone(
1011 JobID job_id,
1012 const base::Callback<google_apis::CancelCallback()>& original_task,
1013 const google_apis::FileResourceCallback& callback,
1014 google_apis::GDataErrorCode error,
1015 const GURL& upload_location,
1016 scoped_ptr<google_apis::FileResource> entry) {
1017 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1018 DCHECK(!original_task.is_null());
1019 DCHECK(!callback.is_null());
1020
1021 if (upload_location.is_empty()) {
1022 // If upload_location is not available, we should discard it and stop trying
1023 // to resume. Restore the original task.
1024 JobEntry* job_entry = job_map_.Lookup(job_id);
1025 DCHECK(job_entry);
1026 job_entry->task = original_task;
1027 }
1028
1029 if (OnJobDone(job_id, error))
1030 callback.Run(error, entry.Pass());
1031 }
1032
UpdateProgress(JobID job_id,int64 progress,int64 total)1033 void JobScheduler::UpdateProgress(JobID job_id, int64 progress, int64 total) {
1034 JobEntry* job_entry = job_map_.Lookup(job_id);
1035 DCHECK(job_entry);
1036
1037 job_entry->job_info.num_completed_bytes = progress;
1038 if (total != -1)
1039 job_entry->job_info.num_total_bytes = total;
1040 NotifyJobUpdated(job_entry->job_info);
1041 }
1042
OnConnectionTypeChanged(net::NetworkChangeNotifier::ConnectionType type)1043 void JobScheduler::OnConnectionTypeChanged(
1044 net::NetworkChangeNotifier::ConnectionType type) {
1045 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1046
1047 // Resume the job loop.
1048 // Note that we don't need to check the network connection status as it will
1049 // be checked in GetCurrentAcceptedPriority().
1050 for (int i = METADATA_QUEUE; i < NUM_QUEUES; ++i)
1051 DoJobLoop(static_cast<QueueType>(i));
1052 }
1053
GetJobQueueType(JobType type)1054 JobScheduler::QueueType JobScheduler::GetJobQueueType(JobType type) {
1055 switch (type) {
1056 case TYPE_GET_ABOUT_RESOURCE:
1057 case TYPE_GET_APP_LIST:
1058 case TYPE_GET_ALL_RESOURCE_LIST:
1059 case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY:
1060 case TYPE_SEARCH:
1061 case TYPE_GET_CHANGE_LIST:
1062 case TYPE_GET_REMAINING_CHANGE_LIST:
1063 case TYPE_GET_REMAINING_FILE_LIST:
1064 case TYPE_GET_RESOURCE_ENTRY:
1065 case TYPE_GET_SHARE_URL:
1066 case TYPE_TRASH_RESOURCE:
1067 case TYPE_COPY_RESOURCE:
1068 case TYPE_UPDATE_RESOURCE:
1069 case TYPE_ADD_RESOURCE_TO_DIRECTORY:
1070 case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY:
1071 case TYPE_ADD_NEW_DIRECTORY:
1072 case TYPE_CREATE_FILE:
1073 case TYPE_ADD_PERMISSION:
1074 return METADATA_QUEUE;
1075
1076 case TYPE_DOWNLOAD_FILE:
1077 case TYPE_UPLOAD_NEW_FILE:
1078 case TYPE_UPLOAD_EXISTING_FILE:
1079 return FILE_QUEUE;
1080 }
1081 NOTREACHED();
1082 return FILE_QUEUE;
1083 }
1084
AbortNotRunningJob(JobEntry * job,google_apis::GDataErrorCode error)1085 void JobScheduler::AbortNotRunningJob(JobEntry* job,
1086 google_apis::GDataErrorCode error) {
1087 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1088
1089 const base::TimeDelta elapsed = base::Time::Now() - job->job_info.start_time;
1090 const QueueType queue_type = GetJobQueueType(job->job_info.job_type);
1091 logger_->Log(logging::LOG_INFO,
1092 "Job aborted: %s => %s (elapsed time: %sms) - %s",
1093 job->job_info.ToString().c_str(),
1094 GDataErrorCodeToString(error).c_str(),
1095 base::Int64ToString(elapsed.InMilliseconds()).c_str(),
1096 GetQueueInfo(queue_type).c_str());
1097
1098 base::Callback<void(google_apis::GDataErrorCode)> callback =
1099 job->abort_callback;
1100 queue_[GetJobQueueType(job->job_info.job_type)]->Remove(job->job_info.job_id);
1101 NotifyJobDone(job->job_info, error);
1102 job_map_.Remove(job->job_info.job_id);
1103 base::MessageLoopProxy::current()->PostTask(FROM_HERE,
1104 base::Bind(callback, error));
1105 }
1106
NotifyJobAdded(const JobInfo & job_info)1107 void JobScheduler::NotifyJobAdded(const JobInfo& job_info) {
1108 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1109 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobAdded(job_info));
1110 }
1111
NotifyJobDone(const JobInfo & job_info,google_apis::GDataErrorCode error)1112 void JobScheduler::NotifyJobDone(const JobInfo& job_info,
1113 google_apis::GDataErrorCode error) {
1114 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1115 FOR_EACH_OBSERVER(JobListObserver, observer_list_,
1116 OnJobDone(job_info, GDataToFileError(error)));
1117 }
1118
NotifyJobUpdated(const JobInfo & job_info)1119 void JobScheduler::NotifyJobUpdated(const JobInfo& job_info) {
1120 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1121 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobUpdated(job_info));
1122 }
1123
GetQueueInfo(QueueType type) const1124 std::string JobScheduler::GetQueueInfo(QueueType type) const {
1125 return QueueTypeToString(type) + " " + queue_[type]->ToString();
1126 }
1127
1128 // static
QueueTypeToString(QueueType type)1129 std::string JobScheduler::QueueTypeToString(QueueType type) {
1130 switch (type) {
1131 case METADATA_QUEUE:
1132 return "METADATA_QUEUE";
1133 case FILE_QUEUE:
1134 return "FILE_QUEUE";
1135 case NUM_QUEUES:
1136 break; // This value is just a sentinel. Should never be used.
1137 }
1138 NOTREACHED();
1139 return "";
1140 }
1141
1142 } // namespace drive
1143