1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h"
6
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/memory/scoped_ptr.h"
10 #include "base/sequenced_task_runner.h"
11 #include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
12 #include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h"
13 #include "chrome/browser/sync_file_system/sync_file_metadata.h"
14
15 using fileapi::FileSystemURL;
16
17 namespace sync_file_system {
18 namespace drive_backend {
19
20 namespace {
21
22 class SyncTaskAdapter : public ExclusiveTask {
23 public:
SyncTaskAdapter(const SyncTaskManager::Task & task)24 explicit SyncTaskAdapter(const SyncTaskManager::Task& task) : task_(task) {}
~SyncTaskAdapter()25 virtual ~SyncTaskAdapter() {}
26
RunExclusive(const SyncStatusCallback & callback)27 virtual void RunExclusive(const SyncStatusCallback& callback) OVERRIDE {
28 task_.Run(callback);
29 }
30
31 private:
32 SyncTaskManager::Task task_;
33
34 DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter);
35 };
36
37 } // namespace
38
PendingTask()39 SyncTaskManager::PendingTask::PendingTask() {}
40
PendingTask(const base::Closure & task,Priority pri,int seq)41 SyncTaskManager::PendingTask::PendingTask(
42 const base::Closure& task, Priority pri, int seq)
43 : task(task), priority(pri), seq(seq) {}
44
~PendingTask()45 SyncTaskManager::PendingTask::~PendingTask() {}
46
operator ()(const PendingTask & left,const PendingTask & right) const47 bool SyncTaskManager::PendingTaskComparator::operator()(
48 const PendingTask& left,
49 const PendingTask& right) const {
50 if (left.priority != right.priority)
51 return left.priority < right.priority;
52 return left.seq > right.seq;
53 }
54
SyncTaskManager(base::WeakPtr<Client> client,size_t maximum_background_task,base::SequencedTaskRunner * task_runner)55 SyncTaskManager::SyncTaskManager(
56 base::WeakPtr<Client> client,
57 size_t maximum_background_task,
58 base::SequencedTaskRunner* task_runner)
59 : client_(client),
60 maximum_background_task_(maximum_background_task),
61 pending_task_seq_(0),
62 task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID),
63 task_runner_(task_runner) {
64 }
65
~SyncTaskManager()66 SyncTaskManager::~SyncTaskManager() {
67 client_.reset();
68 token_.reset();
69 }
70
Initialize(SyncStatusCode status)71 void SyncTaskManager::Initialize(SyncStatusCode status) {
72 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
73 DCHECK(!token_);
74 NotifyTaskDone(SyncTaskToken::CreateForForegroundTask(AsWeakPtr()),
75 status);
76 }
77
ScheduleTask(const tracked_objects::Location & from_here,const Task & task,Priority priority,const SyncStatusCallback & callback)78 void SyncTaskManager::ScheduleTask(
79 const tracked_objects::Location& from_here,
80 const Task& task,
81 Priority priority,
82 const SyncStatusCallback& callback) {
83 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
84
85 ScheduleSyncTask(from_here,
86 scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
87 priority,
88 callback);
89 }
90
ScheduleSyncTask(const tracked_objects::Location & from_here,scoped_ptr<SyncTask> task,Priority priority,const SyncStatusCallback & callback)91 void SyncTaskManager::ScheduleSyncTask(
92 const tracked_objects::Location& from_here,
93 scoped_ptr<SyncTask> task,
94 Priority priority,
95 const SyncStatusCallback& callback) {
96 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
97
98 scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
99 if (!token) {
100 PushPendingTask(
101 base::Bind(&SyncTaskManager::ScheduleSyncTask, AsWeakPtr(), from_here,
102 base::Passed(&task), priority, callback),
103 priority);
104 return;
105 }
106 RunTask(token.Pass(), task.Pass());
107 }
108
ScheduleTaskIfIdle(const tracked_objects::Location & from_here,const Task & task,const SyncStatusCallback & callback)109 bool SyncTaskManager::ScheduleTaskIfIdle(
110 const tracked_objects::Location& from_here,
111 const Task& task,
112 const SyncStatusCallback& callback) {
113 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
114
115 return ScheduleSyncTaskIfIdle(
116 from_here,
117 scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
118 callback);
119 }
120
ScheduleSyncTaskIfIdle(const tracked_objects::Location & from_here,scoped_ptr<SyncTask> task,const SyncStatusCallback & callback)121 bool SyncTaskManager::ScheduleSyncTaskIfIdle(
122 const tracked_objects::Location& from_here,
123 scoped_ptr<SyncTask> task,
124 const SyncStatusCallback& callback) {
125 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
126
127 scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
128 if (!token)
129 return false;
130 RunTask(token.Pass(), task.Pass());
131 return true;
132 }
133
134 // static
NotifyTaskDone(scoped_ptr<SyncTaskToken> token,SyncStatusCode status)135 void SyncTaskManager::NotifyTaskDone(scoped_ptr<SyncTaskToken> token,
136 SyncStatusCode status) {
137 DCHECK(token);
138
139 SyncTaskManager* manager = token->manager();
140 if (token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
141 DCHECK(!manager);
142 SyncStatusCallback callback = token->callback();
143 token->clear_callback();
144 callback.Run(status);
145 return;
146 }
147
148 if (manager)
149 manager->NotifyTaskDoneBody(token.Pass(), status);
150 }
151
152 // static
UpdateBlockingFactor(scoped_ptr<SyncTaskToken> current_task_token,scoped_ptr<BlockingFactor> blocking_factor,const Continuation & continuation)153 void SyncTaskManager::UpdateBlockingFactor(
154 scoped_ptr<SyncTaskToken> current_task_token,
155 scoped_ptr<BlockingFactor> blocking_factor,
156 const Continuation& continuation) {
157 DCHECK(current_task_token);
158
159 SyncTaskManager* manager = current_task_token->manager();
160 if (current_task_token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
161 DCHECK(!manager);
162 continuation.Run(current_task_token.Pass());
163 return;
164 }
165
166 if (!manager)
167 return;
168
169 scoped_ptr<SyncTaskToken> foreground_task_token;
170 scoped_ptr<SyncTaskToken> background_task_token;
171 scoped_ptr<TaskLogger::TaskLog> task_log = current_task_token->PassTaskLog();
172 if (current_task_token->token_id() == SyncTaskToken::kForegroundTaskTokenID)
173 foreground_task_token = current_task_token.Pass();
174 else
175 background_task_token = current_task_token.Pass();
176
177 manager->UpdateBlockingFactorBody(foreground_task_token.Pass(),
178 background_task_token.Pass(),
179 task_log.Pass(),
180 blocking_factor.Pass(),
181 continuation);
182 }
183
IsRunningTask(int64 token_id) const184 bool SyncTaskManager::IsRunningTask(int64 token_id) const {
185 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
186
187 // If the client is gone, all task should be aborted.
188 if (!client_)
189 return false;
190
191 if (token_id == SyncTaskToken::kForegroundTaskTokenID)
192 return true;
193
194 return ContainsKey(running_background_tasks_, token_id);
195 }
196
DetachFromSequence()197 void SyncTaskManager::DetachFromSequence() {
198 sequence_checker_.DetachFromSequence();
199 }
200
NotifyTaskDoneBody(scoped_ptr<SyncTaskToken> token,SyncStatusCode status)201 void SyncTaskManager::NotifyTaskDoneBody(scoped_ptr<SyncTaskToken> token,
202 SyncStatusCode status) {
203 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
204 DCHECK(token);
205
206 DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status
207 << " (" << SyncStatusCodeToString(status) << ")"
208 << " " << token_->location().ToString();
209
210 if (token->blocking_factor()) {
211 dependency_manager_.Erase(token->blocking_factor());
212 token->clear_blocking_factor();
213 }
214
215 if (client_) {
216 if (token->has_task_log()) {
217 token->FinalizeTaskLog(SyncStatusCodeToString(status));
218 client_->RecordTaskLog(token->PassTaskLog());
219 }
220 }
221
222 scoped_ptr<SyncTask> task;
223 SyncStatusCallback callback = token->callback();
224 token->clear_callback();
225 if (token->token_id() == SyncTaskToken::kForegroundTaskTokenID) {
226 token_ = token.Pass();
227 task = running_foreground_task_.Pass();
228 } else {
229 task = running_background_tasks_.take_and_erase(token->token_id());
230 }
231
232 // Acquire the token to prevent a new task to jump into the queue.
233 token = token_.Pass();
234
235 bool task_used_network = false;
236 if (task)
237 task_used_network = task->used_network();
238
239 if (client_)
240 client_->NotifyLastOperationStatus(status, task_used_network);
241
242 if (!callback.is_null())
243 callback.Run(status);
244
245 // Post MaybeStartNextForegroundTask rather than calling it directly to avoid
246 // making the call-chaing longer.
247 task_runner_->PostTask(
248 FROM_HERE,
249 base::Bind(&SyncTaskManager::MaybeStartNextForegroundTask,
250 AsWeakPtr(), base::Passed(&token)));
251 }
252
UpdateBlockingFactorBody(scoped_ptr<SyncTaskToken> foreground_task_token,scoped_ptr<SyncTaskToken> background_task_token,scoped_ptr<TaskLogger::TaskLog> task_log,scoped_ptr<BlockingFactor> blocking_factor,const Continuation & continuation)253 void SyncTaskManager::UpdateBlockingFactorBody(
254 scoped_ptr<SyncTaskToken> foreground_task_token,
255 scoped_ptr<SyncTaskToken> background_task_token,
256 scoped_ptr<TaskLogger::TaskLog> task_log,
257 scoped_ptr<BlockingFactor> blocking_factor,
258 const Continuation& continuation) {
259 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
260
261 // Run the task directly if the parallelization is disabled.
262 if (!maximum_background_task_) {
263 DCHECK(foreground_task_token);
264 DCHECK(!background_task_token);
265 foreground_task_token->SetTaskLog(task_log.Pass());
266 continuation.Run(foreground_task_token.Pass());
267 return;
268 }
269
270 // Clear existing |blocking_factor| from |dependency_manager_| before
271 // getting |foreground_task_token|, so that we can avoid dead lock.
272 if (background_task_token && background_task_token->blocking_factor()) {
273 dependency_manager_.Erase(background_task_token->blocking_factor());
274 background_task_token->clear_blocking_factor();
275 }
276
277 // Try to get |foreground_task_token|. If it's not available, wait for
278 // current foreground task to finish.
279 if (!foreground_task_token) {
280 DCHECK(background_task_token);
281 foreground_task_token = GetToken(background_task_token->location(),
282 SyncStatusCallback());
283 if (!foreground_task_token) {
284 PushPendingTask(
285 base::Bind(&SyncTaskManager::UpdateBlockingFactorBody,
286 AsWeakPtr(),
287 base::Passed(&foreground_task_token),
288 base::Passed(&background_task_token),
289 base::Passed(&task_log),
290 base::Passed(&blocking_factor),
291 continuation),
292 PRIORITY_HIGH);
293 MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken>());
294 return;
295 }
296 }
297
298 // Check if the task can run as a background task now.
299 // If there are too many task running or any other task blocks current
300 // task, wait for any other task to finish.
301 bool task_number_limit_exceeded =
302 !background_task_token &&
303 running_background_tasks_.size() >= maximum_background_task_;
304 if (task_number_limit_exceeded ||
305 !dependency_manager_.Insert(blocking_factor.get())) {
306 DCHECK(!running_background_tasks_.empty());
307 DCHECK(pending_backgrounding_task_.is_null());
308
309 // Wait for NotifyTaskDone to release a |blocking_factor|.
310 pending_backgrounding_task_ =
311 base::Bind(&SyncTaskManager::UpdateBlockingFactorBody,
312 AsWeakPtr(),
313 base::Passed(&foreground_task_token),
314 base::Passed(&background_task_token),
315 base::Passed(&task_log),
316 base::Passed(&blocking_factor),
317 continuation);
318 return;
319 }
320
321 if (background_task_token) {
322 background_task_token->set_blocking_factor(blocking_factor.Pass());
323 } else {
324 tracked_objects::Location from_here = foreground_task_token->location();
325 SyncStatusCallback callback = foreground_task_token->callback();
326 foreground_task_token->clear_callback();
327
328 background_task_token =
329 SyncTaskToken::CreateForBackgroundTask(
330 AsWeakPtr(),
331 task_token_seq_++,
332 blocking_factor.Pass());
333 background_task_token->UpdateTask(from_here, callback);
334 running_background_tasks_.set(background_task_token->token_id(),
335 running_foreground_task_.Pass());
336 }
337
338 token_ = foreground_task_token.Pass();
339 MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken>());
340 background_task_token->SetTaskLog(task_log.Pass());
341 continuation.Run(background_task_token.Pass());
342 }
343
GetToken(const tracked_objects::Location & from_here,const SyncStatusCallback & callback)344 scoped_ptr<SyncTaskToken> SyncTaskManager::GetToken(
345 const tracked_objects::Location& from_here,
346 const SyncStatusCallback& callback) {
347 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
348
349 if (!token_)
350 return scoped_ptr<SyncTaskToken>();
351 token_->UpdateTask(from_here, callback);
352 return token_.Pass();
353 }
354
PushPendingTask(const base::Closure & closure,Priority priority)355 void SyncTaskManager::PushPendingTask(
356 const base::Closure& closure, Priority priority) {
357 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
358
359 pending_tasks_.push(PendingTask(closure, priority, pending_task_seq_++));
360 }
361
RunTask(scoped_ptr<SyncTaskToken> token,scoped_ptr<SyncTask> task)362 void SyncTaskManager::RunTask(scoped_ptr<SyncTaskToken> token,
363 scoped_ptr<SyncTask> task) {
364 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
365 DCHECK(!running_foreground_task_);
366
367 running_foreground_task_ = task.Pass();
368 running_foreground_task_->RunPreflight(token.Pass());
369 }
370
MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken> token)371 void SyncTaskManager::MaybeStartNextForegroundTask(
372 scoped_ptr<SyncTaskToken> token) {
373 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
374
375 if (token) {
376 DCHECK(!token_);
377 token_ = token.Pass();
378 }
379
380 if (!pending_backgrounding_task_.is_null()) {
381 base::Closure closure = pending_backgrounding_task_;
382 pending_backgrounding_task_.Reset();
383 closure.Run();
384 return;
385 }
386
387 if (!token_)
388 return;
389
390 if (!pending_tasks_.empty()) {
391 base::Closure closure = pending_tasks_.top().task;
392 pending_tasks_.pop();
393 closure.Run();
394 return;
395 }
396
397 if (client_)
398 client_->MaybeScheduleNextTask();
399 }
400
401 } // namespace drive_backend
402 } // namespace sync_file_system
403