1 // Copyright 2013 The Flutter Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #define FML_USED_ON_EMBEDDER
6
7 #include "flutter/fml/message_loop_task_queues.h"
8 #include "flutter/fml/make_copyable.h"
9 #include "flutter/fml/message_loop_impl.h"
10
11 namespace fml {
12
13 std::mutex MessageLoopTaskQueues::creation_mutex_;
14
15 const size_t TaskQueueId::kUnmerged = ULONG_MAX;
16
17 fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::instance_;
18
TaskQueueEntry()19 TaskQueueEntry::TaskQueueEntry()
20 : owner_of(_kUnmerged), subsumed_by(_kUnmerged) {
21 wakeable = NULL;
22 task_observers = TaskObservers();
23 delayed_tasks = DelayedTaskQueue();
24 }
25
GetInstance()26 fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::GetInstance() {
27 std::scoped_lock creation(creation_mutex_);
28 if (!instance_) {
29 instance_ = fml::MakeRefCounted<MessageLoopTaskQueues>();
30 }
31 return instance_;
32 }
33
CreateTaskQueue()34 TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() {
35 std::lock_guard guard(queue_mutex_);
36 TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_);
37 ++task_queue_id_counter_;
38 queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>();
39
40 return loop_id;
41 }
42
MessageLoopTaskQueues()43 MessageLoopTaskQueues::MessageLoopTaskQueues()
44 : task_queue_id_counter_(0), order_(0) {}
45
46 MessageLoopTaskQueues::~MessageLoopTaskQueues() = default;
47
Dispose(TaskQueueId queue_id)48 void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) {
49 std::lock_guard guard(queue_mutex_);
50 const auto& queue_entry = queue_entries_.at(queue_id);
51 FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
52 TaskQueueId subsumed = queue_entry->owner_of;
53 queue_entries_.erase(queue_id);
54 if (subsumed != _kUnmerged) {
55 queue_entries_.erase(subsumed);
56 }
57 }
58
DisposeTasks(TaskQueueId queue_id)59 void MessageLoopTaskQueues::DisposeTasks(TaskQueueId queue_id) {
60 DelayedTaskQueue dispose_tasks;
61 DelayedTaskQueue dispose_merged_tasks;
62 {
63 std::lock_guard guard(queue_mutex_);
64 const auto& queue_entry = queue_entries_.at(queue_id);
65 FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
66 TaskQueueId subsumed = queue_entry->owner_of;
67 dispose_tasks = std::move(queue_entry->delayed_tasks);
68 if (subsumed != _kUnmerged) {
69 dispose_merged_tasks = std::move(queue_entries_.at(subsumed)->delayed_tasks);
70 }
71 }
72 }
73
RegisterTask(TaskQueueId queue_id,fml::closure task,fml::TimePoint target_time)74 void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id,
75 fml::closure task,
76 fml::TimePoint target_time) {
77 std::lock_guard guard(queue_mutex_);
78 size_t order = order_++;
79 const auto& queue_entry = queue_entries_.at(queue_id);
80 queue_entry->delayed_tasks.push({order, std::move(task), target_time});
81 TaskQueueId loop_to_wake = queue_id;
82 if (queue_entry->subsumed_by != _kUnmerged) {
83 loop_to_wake = queue_entry->subsumed_by;
84 }
85 WakeUpUnlocked(loop_to_wake,
86 queue_entry->delayed_tasks.top().GetTargetTime());
87 }
88
HasPendingTasks(TaskQueueId queue_id) const89 bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) const {
90 std::lock_guard guard(queue_mutex_);
91 return HasPendingTasksUnlocked(queue_id);
92 }
93
GetTasksToRunNow(TaskQueueId queue_id,FlushType type,std::vector<fml::closure> & invocations)94 void MessageLoopTaskQueues::GetTasksToRunNow(
95 TaskQueueId queue_id,
96 FlushType type,
97 std::vector<fml::closure>& invocations) {
98 std::lock_guard guard(queue_mutex_);
99
100 if (!HasPendingTasksUnlocked(queue_id)) {
101 return;
102 }
103
104 const auto now = fml::TimePoint::Now();
105
106 while (HasPendingTasksUnlocked(queue_id)) {
107 TaskQueueId top_queue = _kUnmerged;
108 const auto& top = PeekNextTaskUnlocked(queue_id, top_queue);
109 if (top.GetTargetTime() > now) {
110 break;
111 }
112 invocations.emplace_back(std::move(top.GetTask()));
113 queue_entries_.at(top_queue)->delayed_tasks.pop();
114 if (type == FlushType::kSingle) {
115 break;
116 }
117 }
118
119 if (!HasPendingTasksUnlocked(queue_id)) {
120 WakeUpUnlocked(queue_id, fml::TimePoint::Max());
121 } else {
122 WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
123 }
124 }
125
WakeUpUnlocked(TaskQueueId queue_id,fml::TimePoint time) const126 void MessageLoopTaskQueues::WakeUpUnlocked(TaskQueueId queue_id,
127 fml::TimePoint time) const {
128 if (queue_entries_.at(queue_id)->wakeable) {
129 queue_entries_.at(queue_id)->wakeable->WakeUp(time);
130 }
131 }
132
GetNumPendingTasks(TaskQueueId queue_id) const133 size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
134 std::lock_guard guard(queue_mutex_);
135
136 const auto& queue_entry = queue_entries_.at(queue_id);
137 if (queue_entry->subsumed_by != _kUnmerged) {
138 return 0;
139 }
140 size_t total_tasks = 0;
141 total_tasks += queue_entry->delayed_tasks.size();
142
143 TaskQueueId subsumed = queue_entry->owner_of;
144 if (subsumed != _kUnmerged) {
145 const auto& subsumed_entry = queue_entries_.at(subsumed);
146 total_tasks += subsumed_entry->delayed_tasks.size();
147 }
148 return total_tasks;
149 }
150
AddTaskObserver(TaskQueueId queue_id,intptr_t key,fml::closure callback)151 void MessageLoopTaskQueues::AddTaskObserver(TaskQueueId queue_id,
152 intptr_t key,
153 fml::closure callback) {
154 std::lock_guard guard(queue_mutex_);
155 FML_DCHECK(callback != nullptr) << "Observer callback must be non-null.";
156 queue_entries_.at(queue_id)->task_observers[key] = std::move(callback);
157 }
158
RemoveTaskObserver(TaskQueueId queue_id,intptr_t key)159 void MessageLoopTaskQueues::RemoveTaskObserver(TaskQueueId queue_id,
160 intptr_t key) {
161 std::lock_guard guard(queue_mutex_);
162 queue_entries_.at(queue_id)->task_observers.erase(key);
163 }
164
GetObserversToNotify(TaskQueueId queue_id) const165 std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(
166 TaskQueueId queue_id) const {
167 std::lock_guard guard(queue_mutex_);
168 std::vector<fml::closure> observers;
169
170 if (queue_entries_.at(queue_id)->subsumed_by != _kUnmerged) {
171 return observers;
172 }
173
174 for (const auto& observer : queue_entries_.at(queue_id)->task_observers) {
175 observers.push_back(observer.second);
176 }
177
178 TaskQueueId subsumed = queue_entries_.at(queue_id)->owner_of;
179 if (subsumed != _kUnmerged) {
180 for (const auto& observer : queue_entries_.at(subsumed)->task_observers) {
181 observers.push_back(observer.second);
182 }
183 }
184
185 return observers;
186 }
187
SetWakeable(TaskQueueId queue_id,fml::Wakeable * wakeable)188 void MessageLoopTaskQueues::SetWakeable(TaskQueueId queue_id,
189 fml::Wakeable* wakeable) {
190 std::lock_guard guard(queue_mutex_);
191 FML_CHECK(!queue_entries_.at(queue_id)->wakeable)
192 << "Wakeable can only be set once.";
193 queue_entries_.at(queue_id)->wakeable = wakeable;
194 }
195
Merge(TaskQueueId owner,TaskQueueId subsumed)196 bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
197 if (owner == subsumed) {
198 return true;
199 }
200
201 std::lock_guard guard(queue_mutex_);
202 auto& owner_entry = queue_entries_.at(owner);
203 auto& subsumed_entry = queue_entries_.at(subsumed);
204
205 if (owner_entry->owner_of == subsumed) {
206 return true;
207 }
208
209 std::vector<TaskQueueId> owner_subsumed_keys = {
210 owner_entry->owner_of, owner_entry->subsumed_by, subsumed_entry->owner_of,
211 subsumed_entry->subsumed_by};
212
213 for (auto key : owner_subsumed_keys) {
214 if (key != _kUnmerged) {
215 return false;
216 }
217 }
218
219 owner_entry->owner_of = subsumed;
220 subsumed_entry->subsumed_by = owner;
221
222 if (HasPendingTasksUnlocked(owner)) {
223 WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
224 }
225
226 return true;
227 }
228
Unmerge(TaskQueueId owner)229 bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
230 std::lock_guard guard(queue_mutex_);
231 const auto& owner_entry = queue_entries_.at(owner);
232
233 const TaskQueueId subsumed = owner_entry->owner_of;
234 if (subsumed == _kUnmerged) {
235 return false;
236 }
237
238 queue_entries_.at(subsumed)->subsumed_by = _kUnmerged;
239 owner_entry->owner_of = _kUnmerged;
240
241 if (HasPendingTasksUnlocked(owner)) {
242 WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
243 }
244
245 if (HasPendingTasksUnlocked(subsumed)) {
246 WakeUpUnlocked(subsumed, GetNextWakeTimeUnlocked(subsumed));
247 }
248
249 return true;
250 }
251
Owns(TaskQueueId owner,TaskQueueId subsumed) const252 bool MessageLoopTaskQueues::Owns(TaskQueueId owner,
253 TaskQueueId subsumed) const {
254 std::lock_guard guard(queue_mutex_);
255 return subsumed == queue_entries_.at(owner)->owner_of || owner == subsumed;
256 }
257
258 // Subsumed queues will never have pending tasks.
259 // Owning queues will consider both their and their subsumed tasks.
HasPendingTasksUnlocked(TaskQueueId queue_id) const260 bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
261 TaskQueueId queue_id) const {
262 const auto& entry = queue_entries_.at(queue_id);
263 bool is_subsumed = entry->subsumed_by != _kUnmerged;
264 if (is_subsumed) {
265 return false;
266 }
267
268 if (!entry->delayed_tasks.empty()) {
269 return true;
270 }
271
272 const TaskQueueId subsumed = entry->owner_of;
273 if (subsumed == _kUnmerged) {
274 // this is not an owner and queue is empty.
275 return false;
276 } else {
277 return !queue_entries_.at(subsumed)->delayed_tasks.empty();
278 }
279 }
280
GetNextWakeTimeUnlocked(TaskQueueId queue_id) const281 fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked(
282 TaskQueueId queue_id) const {
283 TaskQueueId tmp = _kUnmerged;
284 return PeekNextTaskUnlocked(queue_id, tmp).GetTargetTime();
285 }
286
PeekNextTaskUnlocked(TaskQueueId owner,TaskQueueId & top_queue_id) const287 const DelayedTask& MessageLoopTaskQueues::PeekNextTaskUnlocked(
288 TaskQueueId owner,
289 TaskQueueId& top_queue_id) const {
290 FML_DCHECK(HasPendingTasksUnlocked(owner));
291 const auto& entry = queue_entries_.at(owner);
292 const TaskQueueId subsumed = entry->owner_of;
293 if (subsumed == _kUnmerged) {
294 top_queue_id = owner;
295 return entry->delayed_tasks.top();
296 }
297
298 const auto& owner_tasks = entry->delayed_tasks;
299 const auto& subsumed_tasks = queue_entries_.at(subsumed)->delayed_tasks;
300
301 // we are owning another task queue
302 const bool subsumed_has_task = !subsumed_tasks.empty();
303 const bool owner_has_task = !owner_tasks.empty();
304 if (owner_has_task && subsumed_has_task) {
305 const auto owner_task = owner_tasks.top();
306 const auto subsumed_task = subsumed_tasks.top();
307 if (owner_task > subsumed_task) {
308 top_queue_id = subsumed;
309 } else {
310 top_queue_id = owner;
311 }
312 } else if (owner_has_task) {
313 top_queue_id = owner;
314 } else {
315 top_queue_id = subsumed;
316 }
317 return queue_entries_.at(top_queue_id)->delayed_tasks.top();
318 }
319
320 } // namespace fml
321