1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/message_loop/incoming_task_queue.h"
6
7 #include <limits>
8 #include <utility>
9
10 #include "base/bind.h"
11 #include "base/callback_helpers.h"
12 #include "base/location.h"
13 #include "base/metrics/histogram_macros.h"
14 #include "base/synchronization/waitable_event.h"
15 #include "base/time/time.h"
16 #include "build/build_config.h"
17
18 namespace base {
19 namespace internal {
20
21 namespace {
22
23 #if DCHECK_IS_ON()
24 // Delays larger than this are often bogus, and a warning should be emitted in
25 // debug builds to warn developers. http://crbug.com/450045
26 constexpr TimeDelta kTaskDelayWarningThreshold = TimeDelta::FromDays(14);
27 #endif
28
CalculateDelayedRuntime(TimeDelta delay)29 TimeTicks CalculateDelayedRuntime(TimeDelta delay) {
30 TimeTicks delayed_run_time;
31 if (delay > TimeDelta())
32 delayed_run_time = TimeTicks::Now() + delay;
33 else
34 DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative";
35 return delayed_run_time;
36 }
37
38 } // namespace
39
IncomingTaskQueue(std::unique_ptr<Observer> task_queue_observer)40 IncomingTaskQueue::IncomingTaskQueue(
41 std::unique_ptr<Observer> task_queue_observer)
42 : task_queue_observer_(std::move(task_queue_observer)),
43 triage_tasks_(this) {
44 // The constructing sequence is not necessarily the running sequence, e.g. in
45 // the case of a MessageLoop created unbound.
46 DETACH_FROM_SEQUENCE(sequence_checker_);
47 }
48
49 IncomingTaskQueue::~IncomingTaskQueue() = default;
50
AddToIncomingQueue(const Location & from_here,OnceClosure task,TimeDelta delay,Nestable nestable)51 bool IncomingTaskQueue::AddToIncomingQueue(const Location& from_here,
52 OnceClosure task,
53 TimeDelta delay,
54 Nestable nestable) {
55 // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
56 // for details.
57 CHECK(task);
58 DLOG_IF(WARNING, delay > kTaskDelayWarningThreshold)
59 << "Requesting super-long task delay period of " << delay.InSeconds()
60 << " seconds from here: " << from_here.ToString();
61
62 PendingTask pending_task(from_here, std::move(task),
63 CalculateDelayedRuntime(delay), nestable);
64 #if defined(OS_WIN)
65 // We consider the task needs a high resolution timer if the delay is
66 // more than 0 and less than 32ms. This caps the relative error to
67 // less than 50% : a 33ms wait can wake at 48ms since the default
68 // resolution on Windows is between 10 and 15ms.
69 if (delay > TimeDelta() &&
70 delay.InMilliseconds() < (2 * Time::kMinLowResolutionThresholdMs)) {
71 pending_task.is_high_res = true;
72 }
73 #endif
74
75 if (!delay.is_zero())
76 UMA_HISTOGRAM_LONG_TIMES("MessageLoop.DelayedTaskQueue.PostedDelay", delay);
77
78 return PostPendingTask(&pending_task);
79 }
80
Shutdown()81 void IncomingTaskQueue::Shutdown() {
82 AutoLock auto_lock(incoming_queue_lock_);
83 accept_new_tasks_ = false;
84 }
85
ReportMetricsOnIdle() const86 void IncomingTaskQueue::ReportMetricsOnIdle() const {
87 UMA_HISTOGRAM_COUNTS_1M(
88 "MessageLoop.DelayedTaskQueue.PendingTasksCountOnIdle",
89 delayed_tasks_.Size());
90 }
91
TriageQueue(IncomingTaskQueue * outer)92 IncomingTaskQueue::TriageQueue::TriageQueue(IncomingTaskQueue* outer)
93 : outer_(outer) {}
94
95 IncomingTaskQueue::TriageQueue::~TriageQueue() = default;
96
Peek()97 const PendingTask& IncomingTaskQueue::TriageQueue::Peek() {
98 DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
99 ReloadFromIncomingQueueIfEmpty();
100 DCHECK(!queue_.empty());
101 return queue_.front();
102 }
103
Pop()104 PendingTask IncomingTaskQueue::TriageQueue::Pop() {
105 DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
106 ReloadFromIncomingQueueIfEmpty();
107 DCHECK(!queue_.empty());
108 PendingTask pending_task = std::move(queue_.front());
109 queue_.pop();
110 return pending_task;
111 }
112
HasTasks()113 bool IncomingTaskQueue::TriageQueue::HasTasks() {
114 DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
115 ReloadFromIncomingQueueIfEmpty();
116 return !queue_.empty();
117 }
118
Clear()119 void IncomingTaskQueue::TriageQueue::Clear() {
120 DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
121
122 // Clear() should be invoked before WillDestroyCurrentMessageLoop().
123 DCHECK(outer_->accept_new_tasks_);
124
125 // Delete all currently pending tasks but not tasks potentially posted from
126 // their destructors. See ~MessageLoop() for the full logic mitigating against
127 // infite loops when clearing pending tasks. The ScopedClosureRunner below
128 // will be bound to a task posted at the end of the queue. After it is posted,
129 // tasks will be deleted one by one, when the bound ScopedClosureRunner is
130 // deleted and sets |deleted_all_originally_pending|, we know we've deleted
131 // all originally pending tasks.
132 bool deleted_all_originally_pending = false;
133 ScopedClosureRunner capture_deleted_all_originally_pending(BindOnce(
134 [](bool* deleted_all_originally_pending) {
135 *deleted_all_originally_pending = true;
136 },
137 Unretained(&deleted_all_originally_pending)));
138 outer_->AddToIncomingQueue(
139 FROM_HERE,
140 BindOnce([](ScopedClosureRunner) {},
141 std::move(capture_deleted_all_originally_pending)),
142 TimeDelta(), Nestable::kNestable);
143
144 while (!deleted_all_originally_pending) {
145 PendingTask pending_task = Pop();
146
147 if (!pending_task.delayed_run_time.is_null()) {
148 outer_->delayed_tasks().Push(std::move(pending_task));
149 }
150 }
151 }
152
ReloadFromIncomingQueueIfEmpty()153 void IncomingTaskQueue::TriageQueue::ReloadFromIncomingQueueIfEmpty() {
154 DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
155 if (queue_.empty()) {
156 outer_->ReloadWorkQueue(&queue_);
157 }
158 }
159
DelayedQueue()160 IncomingTaskQueue::DelayedQueue::DelayedQueue() {
161 DETACH_FROM_SEQUENCE(sequence_checker_);
162 }
163
164 IncomingTaskQueue::DelayedQueue::~DelayedQueue() = default;
165
Push(PendingTask pending_task)166 void IncomingTaskQueue::DelayedQueue::Push(PendingTask pending_task) {
167 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
168
169 if (pending_task.is_high_res)
170 ++pending_high_res_tasks_;
171
172 queue_.push(std::move(pending_task));
173 }
174
Peek()175 const PendingTask& IncomingTaskQueue::DelayedQueue::Peek() {
176 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
177 DCHECK(!queue_.empty());
178 return queue_.top();
179 }
180
Pop()181 PendingTask IncomingTaskQueue::DelayedQueue::Pop() {
182 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
183 DCHECK(!queue_.empty());
184 PendingTask delayed_task = std::move(const_cast<PendingTask&>(queue_.top()));
185 queue_.pop();
186
187 if (delayed_task.is_high_res)
188 --pending_high_res_tasks_;
189 DCHECK_GE(pending_high_res_tasks_, 0);
190
191 return delayed_task;
192 }
193
HasTasks()194 bool IncomingTaskQueue::DelayedQueue::HasTasks() {
195 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
196 // TODO(robliao): The other queues don't check for IsCancelled(). Should they?
197 while (!queue_.empty() && Peek().task.IsCancelled())
198 Pop();
199
200 return !queue_.empty();
201 }
202
Clear()203 void IncomingTaskQueue::DelayedQueue::Clear() {
204 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
205 while (!queue_.empty())
206 Pop();
207 }
208
Size() const209 size_t IncomingTaskQueue::DelayedQueue::Size() const {
210 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
211 return queue_.size();
212 }
213
DeferredQueue()214 IncomingTaskQueue::DeferredQueue::DeferredQueue() {
215 DETACH_FROM_SEQUENCE(sequence_checker_);
216 }
217
218 IncomingTaskQueue::DeferredQueue::~DeferredQueue() = default;
219
Push(PendingTask pending_task)220 void IncomingTaskQueue::DeferredQueue::Push(PendingTask pending_task) {
221 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
222 queue_.push(std::move(pending_task));
223 }
224
Peek()225 const PendingTask& IncomingTaskQueue::DeferredQueue::Peek() {
226 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
227 DCHECK(!queue_.empty());
228 return queue_.front();
229 }
230
Pop()231 PendingTask IncomingTaskQueue::DeferredQueue::Pop() {
232 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
233 DCHECK(!queue_.empty());
234 PendingTask deferred_task = std::move(queue_.front());
235 queue_.pop();
236 return deferred_task;
237 }
238
HasTasks()239 bool IncomingTaskQueue::DeferredQueue::HasTasks() {
240 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
241 return !queue_.empty();
242 }
243
Clear()244 void IncomingTaskQueue::DeferredQueue::Clear() {
245 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
246 while (!queue_.empty())
247 Pop();
248 }
249
PostPendingTask(PendingTask * pending_task)250 bool IncomingTaskQueue::PostPendingTask(PendingTask* pending_task) {
251 // Warning: Don't try to short-circuit, and handle this thread's tasks more
252 // directly, as it could starve handling of foreign threads. Put every task
253 // into this queue.
254 bool accept_new_tasks;
255 bool was_empty = false;
256 {
257 AutoLock auto_lock(incoming_queue_lock_);
258 accept_new_tasks = accept_new_tasks_;
259 if (accept_new_tasks) {
260 was_empty =
261 PostPendingTaskLockRequired(pending_task) && triage_queue_empty_;
262 }
263 }
264
265 if (!accept_new_tasks) {
266 // Clear the pending task outside of |incoming_queue_lock_| to prevent any
267 // chance of self-deadlock if destroying a task also posts a task to this
268 // queue.
269 pending_task->task.Reset();
270 return false;
271 }
272
273 // Let |task_queue_observer_| know of the queued task. This is done outside
274 // |incoming_queue_lock_| to avoid conflating locks (DidQueueTask() can also
275 // use a lock).
276 task_queue_observer_->DidQueueTask(was_empty);
277
278 return true;
279 }
280
PostPendingTaskLockRequired(PendingTask * pending_task)281 bool IncomingTaskQueue::PostPendingTaskLockRequired(PendingTask* pending_task) {
282 incoming_queue_lock_.AssertAcquired();
283
284 // Initialize the sequence number. The sequence number is used for delayed
285 // tasks (to facilitate FIFO sorting when two tasks have the same
286 // delayed_run_time value) and for identifying the task in about:tracing.
287 pending_task->sequence_num = next_sequence_num_++;
288
289 task_queue_observer_->WillQueueTask(pending_task);
290
291 bool was_empty = incoming_queue_.empty();
292 incoming_queue_.push(std::move(*pending_task));
293 return was_empty;
294 }
295
ReloadWorkQueue(TaskQueue * work_queue)296 void IncomingTaskQueue::ReloadWorkQueue(TaskQueue* work_queue) {
297 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
298
299 // Make sure no tasks are lost.
300 DCHECK(work_queue->empty());
301
302 // Acquire all we can from the inter-thread queue with one lock acquisition.
303 AutoLock lock(incoming_queue_lock_);
304 incoming_queue_.swap(*work_queue);
305 triage_queue_empty_ = work_queue->empty();
306 }
307
308 } // namespace internal
309 } // namespace base
310