• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
6 #define SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
7 
8 #include <deque>
9 #include <set>
10 
11 #include "base/bind.h"
12 #include "base/callback.h"
13 #include "base/gtest_prod_util.h"
14 #include "base/macros.h"
15 #include "base/memory/weak_ptr.h"
16 #include "base/message_loop/message_loop.h"
17 #include "base/threading/non_thread_safe.h"
18 #include "base/time/time.h"
19 #include "base/timer/timer.h"
20 #include "net/base/backoff_entry.h"
21 
22 namespace syncer {
23 
24 // A queue that dispatches tasks, ignores duplicates, and provides backoff
25 // semantics.
26 //
27 // |T| is the task type.
28 //
29 // For each task added to the queue, the HandleTaskCallback will eventually be
30 // invoked.  For each invocation, the user of TaskQueue must call exactly one of
31 // |MarkAsSucceeded|, |MarkAsFailed|, or |Cancel|.
32 //
33 // To retry a failed task, call MarkAsFailed(task) then AddToQueue(task).
34 //
35 // Example usage:
36 //
37 // void Handle(const Foo& foo);
38 // ...
39 // TaskQueue<Foo> queue(base::Bind(&Handle),
40 //                      base::TimeDelta::FromSeconds(1),
41 //                      base::TimeDelta::FromMinutes(1));
42 // ...
43 // {
44 //   Foo foo;
45 //   // Add foo to the queue.  At some point, Handle will be invoked in this
46 //   // message loop.
47 //   queue.AddToQueue(foo);
48 // }
49 // ...
50 // void Handle(const Foo& foo) {
51 //   DoSomethingWith(foo);
52 //   // We must call one of the three methods to tell the queue how we're
53 //   // dealing with foo.  Of course, we are free to call in the the context of
54 //   // this HandleTaskCallback or outside the context if we so choose.
55 //   if (SuccessfullyHandled(foo)) {
56 //     queue.MarkAsSucceeded(foo);
57 //   } else if (Failed(foo)) {
58 //     queue.MarkAsFailed(foo);
59 //     if (ShouldRetry(foo)) {
60 //       queue.AddToQueue(foo);
61 //     }
62 //   } else {
63 //     Cancel(foo);
64 //   }
65 // }
66 //
67 template <typename T>
68 class TaskQueue : base::NonThreadSafe {
69  public:
70   // A callback provided by users of the TaskQueue to handle tasks.
71   //
72   // This callback is invoked by the queue with a task to be handled.  The
73   // callee is expected to (eventually) call |MarkAsSucceeded|, |MarkAsFailed|,
74   // or |Cancel| to signify completion of the task.
75   typedef base::Callback<void(const T&)> HandleTaskCallback;
76 
77   // Construct a TaskQueue.
78   //
79   // |callback| the callback to be invoked for handling tasks.
80   //
81   // |initial_backoff_delay| the initial amount of time the queue will wait
82   // before dispatching tasks after a failed task (see |MarkAsFailed|).  May be
83   // zero.  Subsequent failures will increase the delay up to
84   // |max_backoff_delay|.
85   //
86   // |max_backoff_delay| the maximum amount of time the queue will wait before
87   // dispatching tasks.  May be zero.  Must be greater than or equal to
88   // |initial_backoff_delay|.
89   TaskQueue(const HandleTaskCallback& callback,
90             const base::TimeDelta& initial_backoff_delay,
91             const base::TimeDelta& max_backoff_delay);
92 
93   // Add |task| to the end of the queue.
94   //
95   // If |task| is already present (as determined by operator==) it is not added.
96   void AddToQueue(const T& task);
97 
98   // Mark |task| as completing successfully.
99   //
100   // Marking a task as completing successfully will reduce or eliminate any
101   // backoff delay in effect.
102   //
103   // May only be called after the HandleTaskCallback has been invoked with
104   // |task|.
105   void MarkAsSucceeded(const T& task);
106 
107   // Mark |task| as failed.
108   //
109   // Marking a task as failed will cause a backoff, i.e. a delay in dispatching
110   // of subsequent tasks.  Repeated failures will increase the delay.
111   //
112   // May only be called after the HandleTaskCallback has been invoked with
113   // |task|.
114   void MarkAsFailed(const T& task);
115 
116   // Cancel |task|.
117   //
118   // |task| is removed from the queue and will not be retried.  Does not affect
119   // the backoff delay.
120   //
121   // May only be called after the HandleTaskCallback has been invoked with
122   // |task|.
123   void Cancel(const T& task);
124 
125   // Reset any backoff delay and resume dispatching of tasks.
126   //
127   // Useful for when you know the cause of previous failures has been resolved
128   // and you want don't want to wait for the accumulated backoff delay to
129   // elapse.
130   void ResetBackoff();
131 
132   // Use |timer| for scheduled events.
133   //
134   // Used in tests.  See also MockTimer.
135   void SetTimerForTest(scoped_ptr<base::Timer> timer);
136 
137  private:
138   void FinishTask(const T& task);
139   void ScheduleDispatch();
140   void Dispatch();
141   // Return true if we should dispatch tasks.
142   bool ShouldDispatch();
143 
144   const HandleTaskCallback process_callback_;
145   net::BackoffEntry::Policy backoff_policy_;
146   scoped_ptr<net::BackoffEntry> backoff_entry_;
147   // The number of tasks currently being handled.
148   int num_in_progress_;
149   std::deque<T> queue_;
150   // The set of tasks in queue_ or currently being handled.
151   std::set<T> tasks_;
152   base::Closure dispatch_closure_;
153   scoped_ptr<base::Timer> backoff_timer_;
154   base::TimeDelta delay_;
155 
156   // Must be last data member.
157   base::WeakPtrFactory<TaskQueue> weak_ptr_factory_;
158 
159   DISALLOW_COPY_AND_ASSIGN(TaskQueue);
160 };
161 
162 // The maximum number of tasks that may be concurrently executed.  Think
163 // carefully before changing this value.  The desired behavior of backoff may
164 // not be obvious when there is more than one concurrent task
165 const int kMaxConcurrentTasks = 1;
166 
167 template <typename T>
TaskQueue(const HandleTaskCallback & callback,const base::TimeDelta & initial_backoff_delay,const base::TimeDelta & max_backoff_delay)168 TaskQueue<T>::TaskQueue(const HandleTaskCallback& callback,
169                         const base::TimeDelta& initial_backoff_delay,
170                         const base::TimeDelta& max_backoff_delay)
171     : process_callback_(callback),
172       backoff_policy_({}),
173       num_in_progress_(0),
174       weak_ptr_factory_(this) {
175   DCHECK_LE(initial_backoff_delay.InMicroseconds(),
176             max_backoff_delay.InMicroseconds());
177   backoff_policy_.initial_delay_ms = initial_backoff_delay.InMilliseconds();
178   backoff_policy_.multiply_factor = 2.0;
179   backoff_policy_.jitter_factor = 0.1;
180   backoff_policy_.maximum_backoff_ms = max_backoff_delay.InMilliseconds();
181   backoff_policy_.entry_lifetime_ms = -1;
182   backoff_policy_.always_use_initial_delay = false;
183   backoff_entry_.reset(new net::BackoffEntry(&backoff_policy_));
184   dispatch_closure_ =
185       base::Bind(&TaskQueue::Dispatch, weak_ptr_factory_.GetWeakPtr());
186   backoff_timer_.reset(new base::Timer(false, false));
187 }
188 
189 template <typename T>
AddToQueue(const T & task)190 void TaskQueue<T>::AddToQueue(const T& task) {
191   DCHECK(CalledOnValidThread());
192   // Ignore duplicates.
193   if (tasks_.find(task) == tasks_.end()) {
194     queue_.push_back(task);
195     tasks_.insert(task);
196   }
197   ScheduleDispatch();
198 }
199 
200 template <typename T>
MarkAsSucceeded(const T & task)201 void TaskQueue<T>::MarkAsSucceeded(const T& task) {
202   DCHECK(CalledOnValidThread());
203   FinishTask(task);
204   // The task succeeded.  Stop any pending timer, reset (clear) the backoff, and
205   // reschedule a dispatch.
206   backoff_timer_->Stop();
207   backoff_entry_->Reset();
208   ScheduleDispatch();
209 }
210 
211 template <typename T>
MarkAsFailed(const T & task)212 void TaskQueue<T>::MarkAsFailed(const T& task) {
213   DCHECK(CalledOnValidThread());
214   FinishTask(task);
215   backoff_entry_->InformOfRequest(false);
216   ScheduleDispatch();
217 }
218 
219 template <typename T>
Cancel(const T & task)220 void TaskQueue<T>::Cancel(const T& task) {
221   DCHECK(CalledOnValidThread());
222   FinishTask(task);
223   ScheduleDispatch();
224 }
225 
226 template <typename T>
ResetBackoff()227 void TaskQueue<T>::ResetBackoff() {
228   backoff_timer_->Stop();
229   backoff_entry_->Reset();
230   ScheduleDispatch();
231 }
232 
233 template <typename T>
SetTimerForTest(scoped_ptr<base::Timer> timer)234 void TaskQueue<T>::SetTimerForTest(scoped_ptr<base::Timer> timer) {
235   DCHECK(CalledOnValidThread());
236   DCHECK(timer.get());
237   backoff_timer_ = timer.Pass();
238 }
239 
240 template <typename T>
FinishTask(const T & task)241 void TaskQueue<T>::FinishTask(const T& task) {
242   DCHECK(CalledOnValidThread());
243   DCHECK_GE(num_in_progress_, 1);
244   --num_in_progress_;
245   const size_t num_erased = tasks_.erase(task);
246   DCHECK_EQ(1U, num_erased);
247 }
248 
249 template <typename T>
ScheduleDispatch()250 void TaskQueue<T>::ScheduleDispatch() {
251   DCHECK(CalledOnValidThread());
252   if (backoff_timer_->IsRunning() || !ShouldDispatch()) {
253     return;
254   }
255 
256   backoff_timer_->Start(
257       FROM_HERE, backoff_entry_->GetTimeUntilRelease(), dispatch_closure_);
258 }
259 
260 template <typename T>
Dispatch()261 void TaskQueue<T>::Dispatch() {
262   DCHECK(CalledOnValidThread());
263   if (!ShouldDispatch()) {
264     return;
265   }
266 
267   DCHECK(!queue_.empty());
268   const T& task = queue_.front();
269   ++num_in_progress_;
270   DCHECK_LE(num_in_progress_, kMaxConcurrentTasks);
271   base::MessageLoop::current()->PostTask(FROM_HERE,
272                                          base::Bind(process_callback_, task));
273   queue_.pop_front();
274 }
275 
276 template <typename T>
ShouldDispatch()277 bool TaskQueue<T>::ShouldDispatch() {
278   return num_in_progress_ < kMaxConcurrentTasks && !queue_.empty();
279 }
280 
281 }  // namespace syncer
282 
283 #endif  //  SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
284