• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h"
6 
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/memory/scoped_ptr.h"
10 #include "base/sequenced_task_runner.h"
11 #include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
12 #include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h"
13 #include "chrome/browser/sync_file_system/sync_file_metadata.h"
14 
15 using fileapi::FileSystemURL;
16 
17 namespace sync_file_system {
18 namespace drive_backend {
19 
20 namespace {
21 
22 class SyncTaskAdapter : public ExclusiveTask {
23  public:
SyncTaskAdapter(const SyncTaskManager::Task & task)24   explicit SyncTaskAdapter(const SyncTaskManager::Task& task) : task_(task) {}
~SyncTaskAdapter()25   virtual ~SyncTaskAdapter() {}
26 
RunExclusive(const SyncStatusCallback & callback)27   virtual void RunExclusive(const SyncStatusCallback& callback) OVERRIDE {
28     task_.Run(callback);
29   }
30 
31  private:
32   SyncTaskManager::Task task_;
33 
34   DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter);
35 };
36 
37 }  // namespace
38 
PendingTask()39 SyncTaskManager::PendingTask::PendingTask() {}
40 
PendingTask(const base::Closure & task,Priority pri,int seq)41 SyncTaskManager::PendingTask::PendingTask(
42     const base::Closure& task, Priority pri, int seq)
43     : task(task), priority(pri), seq(seq) {}
44 
~PendingTask()45 SyncTaskManager::PendingTask::~PendingTask() {}
46 
operator ()(const PendingTask & left,const PendingTask & right) const47 bool SyncTaskManager::PendingTaskComparator::operator()(
48     const PendingTask& left,
49     const PendingTask& right) const {
50   if (left.priority != right.priority)
51     return left.priority < right.priority;
52   return left.seq > right.seq;
53 }
54 
SyncTaskManager(base::WeakPtr<Client> client,size_t maximum_background_task,base::SequencedTaskRunner * task_runner)55 SyncTaskManager::SyncTaskManager(
56     base::WeakPtr<Client> client,
57     size_t maximum_background_task,
58     base::SequencedTaskRunner* task_runner)
59     : client_(client),
60       maximum_background_task_(maximum_background_task),
61       pending_task_seq_(0),
62       task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID),
63       task_runner_(task_runner) {
64 }
65 
~SyncTaskManager()66 SyncTaskManager::~SyncTaskManager() {
67   client_.reset();
68   token_.reset();
69 }
70 
Initialize(SyncStatusCode status)71 void SyncTaskManager::Initialize(SyncStatusCode status) {
72   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
73   DCHECK(!token_);
74   NotifyTaskDone(SyncTaskToken::CreateForForegroundTask(AsWeakPtr()),
75                  status);
76 }
77 
ScheduleTask(const tracked_objects::Location & from_here,const Task & task,Priority priority,const SyncStatusCallback & callback)78 void SyncTaskManager::ScheduleTask(
79     const tracked_objects::Location& from_here,
80     const Task& task,
81     Priority priority,
82     const SyncStatusCallback& callback) {
83   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
84 
85   ScheduleSyncTask(from_here,
86                    scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
87                    priority,
88                    callback);
89 }
90 
ScheduleSyncTask(const tracked_objects::Location & from_here,scoped_ptr<SyncTask> task,Priority priority,const SyncStatusCallback & callback)91 void SyncTaskManager::ScheduleSyncTask(
92     const tracked_objects::Location& from_here,
93     scoped_ptr<SyncTask> task,
94     Priority priority,
95     const SyncStatusCallback& callback) {
96   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
97 
98   scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
99   if (!token) {
100     PushPendingTask(
101         base::Bind(&SyncTaskManager::ScheduleSyncTask, AsWeakPtr(), from_here,
102                    base::Passed(&task), priority, callback),
103         priority);
104     return;
105   }
106   RunTask(token.Pass(), task.Pass());
107 }
108 
ScheduleTaskIfIdle(const tracked_objects::Location & from_here,const Task & task,const SyncStatusCallback & callback)109 bool SyncTaskManager::ScheduleTaskIfIdle(
110         const tracked_objects::Location& from_here,
111         const Task& task,
112         const SyncStatusCallback& callback) {
113   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
114 
115   return ScheduleSyncTaskIfIdle(
116       from_here,
117       scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
118       callback);
119 }
120 
ScheduleSyncTaskIfIdle(const tracked_objects::Location & from_here,scoped_ptr<SyncTask> task,const SyncStatusCallback & callback)121 bool SyncTaskManager::ScheduleSyncTaskIfIdle(
122     const tracked_objects::Location& from_here,
123     scoped_ptr<SyncTask> task,
124     const SyncStatusCallback& callback) {
125   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
126 
127   scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
128   if (!token)
129     return false;
130   RunTask(token.Pass(), task.Pass());
131   return true;
132 }
133 
134 // static
NotifyTaskDone(scoped_ptr<SyncTaskToken> token,SyncStatusCode status)135 void SyncTaskManager::NotifyTaskDone(scoped_ptr<SyncTaskToken> token,
136                                      SyncStatusCode status) {
137   DCHECK(token);
138 
139   SyncTaskManager* manager = token->manager();
140   if (token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
141     DCHECK(!manager);
142     SyncStatusCallback callback = token->callback();
143     token->clear_callback();
144     callback.Run(status);
145     return;
146   }
147 
148   if (manager)
149     manager->NotifyTaskDoneBody(token.Pass(), status);
150 }
151 
152 // static
UpdateBlockingFactor(scoped_ptr<SyncTaskToken> current_task_token,scoped_ptr<BlockingFactor> blocking_factor,const Continuation & continuation)153 void SyncTaskManager::UpdateBlockingFactor(
154     scoped_ptr<SyncTaskToken> current_task_token,
155     scoped_ptr<BlockingFactor> blocking_factor,
156     const Continuation& continuation) {
157   DCHECK(current_task_token);
158 
159   SyncTaskManager* manager = current_task_token->manager();
160   if (current_task_token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
161     DCHECK(!manager);
162     continuation.Run(current_task_token.Pass());
163     return;
164   }
165 
166   if (!manager)
167     return;
168 
169   scoped_ptr<SyncTaskToken> foreground_task_token;
170   scoped_ptr<SyncTaskToken> background_task_token;
171   scoped_ptr<TaskLogger::TaskLog> task_log = current_task_token->PassTaskLog();
172   if (current_task_token->token_id() == SyncTaskToken::kForegroundTaskTokenID)
173     foreground_task_token = current_task_token.Pass();
174   else
175     background_task_token = current_task_token.Pass();
176 
177   manager->UpdateBlockingFactorBody(foreground_task_token.Pass(),
178                                     background_task_token.Pass(),
179                                     task_log.Pass(),
180                                     blocking_factor.Pass(),
181                                     continuation);
182 }
183 
IsRunningTask(int64 token_id) const184 bool SyncTaskManager::IsRunningTask(int64 token_id) const {
185   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
186 
187   // If the client is gone, all task should be aborted.
188   if (!client_)
189     return false;
190 
191   if (token_id == SyncTaskToken::kForegroundTaskTokenID)
192     return true;
193 
194   return ContainsKey(running_background_tasks_, token_id);
195 }
196 
DetachFromSequence()197 void SyncTaskManager::DetachFromSequence() {
198   sequence_checker_.DetachFromSequence();
199 }
200 
NotifyTaskDoneBody(scoped_ptr<SyncTaskToken> token,SyncStatusCode status)201 void SyncTaskManager::NotifyTaskDoneBody(scoped_ptr<SyncTaskToken> token,
202                                          SyncStatusCode status) {
203   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
204   DCHECK(token);
205 
206   DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status
207            << " (" << SyncStatusCodeToString(status) << ")"
208            << " " << token_->location().ToString();
209 
210   if (token->blocking_factor()) {
211     dependency_manager_.Erase(token->blocking_factor());
212     token->clear_blocking_factor();
213   }
214 
215   if (client_) {
216     if (token->has_task_log()) {
217       token->FinalizeTaskLog(SyncStatusCodeToString(status));
218       client_->RecordTaskLog(token->PassTaskLog());
219     }
220   }
221 
222   scoped_ptr<SyncTask> task;
223   SyncStatusCallback callback = token->callback();
224   token->clear_callback();
225   if (token->token_id() == SyncTaskToken::kForegroundTaskTokenID) {
226     token_ = token.Pass();
227     task = running_foreground_task_.Pass();
228   } else {
229     task = running_background_tasks_.take_and_erase(token->token_id());
230   }
231 
232   // Acquire the token to prevent a new task to jump into the queue.
233   token = token_.Pass();
234 
235   bool task_used_network = false;
236   if (task)
237     task_used_network = task->used_network();
238 
239   if (client_)
240     client_->NotifyLastOperationStatus(status, task_used_network);
241 
242   if (!callback.is_null())
243     callback.Run(status);
244 
245   // Post MaybeStartNextForegroundTask rather than calling it directly to avoid
246   // making the call-chaing longer.
247   task_runner_->PostTask(
248       FROM_HERE,
249       base::Bind(&SyncTaskManager::MaybeStartNextForegroundTask,
250                  AsWeakPtr(), base::Passed(&token)));
251 }
252 
UpdateBlockingFactorBody(scoped_ptr<SyncTaskToken> foreground_task_token,scoped_ptr<SyncTaskToken> background_task_token,scoped_ptr<TaskLogger::TaskLog> task_log,scoped_ptr<BlockingFactor> blocking_factor,const Continuation & continuation)253 void SyncTaskManager::UpdateBlockingFactorBody(
254     scoped_ptr<SyncTaskToken> foreground_task_token,
255     scoped_ptr<SyncTaskToken> background_task_token,
256     scoped_ptr<TaskLogger::TaskLog> task_log,
257     scoped_ptr<BlockingFactor> blocking_factor,
258     const Continuation& continuation) {
259   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
260 
261   // Run the task directly if the parallelization is disabled.
262   if (!maximum_background_task_) {
263     DCHECK(foreground_task_token);
264     DCHECK(!background_task_token);
265     foreground_task_token->SetTaskLog(task_log.Pass());
266     continuation.Run(foreground_task_token.Pass());
267     return;
268   }
269 
270   // Clear existing |blocking_factor| from |dependency_manager_| before
271   // getting |foreground_task_token|, so that we can avoid dead lock.
272   if (background_task_token && background_task_token->blocking_factor()) {
273     dependency_manager_.Erase(background_task_token->blocking_factor());
274     background_task_token->clear_blocking_factor();
275   }
276 
277   // Try to get |foreground_task_token|.  If it's not available, wait for
278   // current foreground task to finish.
279   if (!foreground_task_token) {
280     DCHECK(background_task_token);
281     foreground_task_token = GetToken(background_task_token->location(),
282                                      SyncStatusCallback());
283     if (!foreground_task_token) {
284       PushPendingTask(
285           base::Bind(&SyncTaskManager::UpdateBlockingFactorBody,
286                      AsWeakPtr(),
287                      base::Passed(&foreground_task_token),
288                      base::Passed(&background_task_token),
289                      base::Passed(&task_log),
290                      base::Passed(&blocking_factor),
291                      continuation),
292           PRIORITY_HIGH);
293       MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken>());
294       return;
295     }
296   }
297 
298   // Check if the task can run as a background task now.
299   // If there are too many task running or any other task blocks current
300   // task, wait for any other task to finish.
301   bool task_number_limit_exceeded =
302       !background_task_token &&
303       running_background_tasks_.size() >= maximum_background_task_;
304   if (task_number_limit_exceeded ||
305       !dependency_manager_.Insert(blocking_factor.get())) {
306     DCHECK(!running_background_tasks_.empty());
307     DCHECK(pending_backgrounding_task_.is_null());
308 
309     // Wait for NotifyTaskDone to release a |blocking_factor|.
310     pending_backgrounding_task_ =
311         base::Bind(&SyncTaskManager::UpdateBlockingFactorBody,
312                    AsWeakPtr(),
313                    base::Passed(&foreground_task_token),
314                    base::Passed(&background_task_token),
315                    base::Passed(&task_log),
316                    base::Passed(&blocking_factor),
317                    continuation);
318     return;
319   }
320 
321   if (background_task_token) {
322     background_task_token->set_blocking_factor(blocking_factor.Pass());
323   } else {
324     tracked_objects::Location from_here = foreground_task_token->location();
325     SyncStatusCallback callback = foreground_task_token->callback();
326     foreground_task_token->clear_callback();
327 
328     background_task_token =
329         SyncTaskToken::CreateForBackgroundTask(
330             AsWeakPtr(),
331             task_token_seq_++,
332             blocking_factor.Pass());
333     background_task_token->UpdateTask(from_here, callback);
334     running_background_tasks_.set(background_task_token->token_id(),
335                                   running_foreground_task_.Pass());
336   }
337 
338   token_ = foreground_task_token.Pass();
339   MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken>());
340   background_task_token->SetTaskLog(task_log.Pass());
341   continuation.Run(background_task_token.Pass());
342 }
343 
GetToken(const tracked_objects::Location & from_here,const SyncStatusCallback & callback)344 scoped_ptr<SyncTaskToken> SyncTaskManager::GetToken(
345     const tracked_objects::Location& from_here,
346     const SyncStatusCallback& callback) {
347   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
348 
349   if (!token_)
350     return scoped_ptr<SyncTaskToken>();
351   token_->UpdateTask(from_here, callback);
352   return token_.Pass();
353 }
354 
PushPendingTask(const base::Closure & closure,Priority priority)355 void SyncTaskManager::PushPendingTask(
356     const base::Closure& closure, Priority priority) {
357   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
358 
359   pending_tasks_.push(PendingTask(closure, priority, pending_task_seq_++));
360 }
361 
RunTask(scoped_ptr<SyncTaskToken> token,scoped_ptr<SyncTask> task)362 void SyncTaskManager::RunTask(scoped_ptr<SyncTaskToken> token,
363                               scoped_ptr<SyncTask> task) {
364   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
365   DCHECK(!running_foreground_task_);
366 
367   running_foreground_task_ = task.Pass();
368   running_foreground_task_->RunPreflight(token.Pass());
369 }
370 
MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken> token)371 void SyncTaskManager::MaybeStartNextForegroundTask(
372     scoped_ptr<SyncTaskToken> token) {
373   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
374 
375   if (token) {
376     DCHECK(!token_);
377     token_ = token.Pass();
378   }
379 
380   if (!pending_backgrounding_task_.is_null()) {
381     base::Closure closure = pending_backgrounding_task_;
382     pending_backgrounding_task_.Reset();
383     closure.Run();
384     return;
385   }
386 
387   if (!token_)
388     return;
389 
390   if (!pending_tasks_.empty()) {
391     base::Closure closure = pending_tasks_.top().task;
392     pending_tasks_.pop();
393     closure.Run();
394     return;
395   }
396 
397   if (client_)
398     client_->MaybeScheduleNextTask();
399 }
400 
401 }  // namespace drive_backend
402 }  // namespace sync_file_system
403