• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Flutter Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #define FML_USED_ON_EMBEDDER
6 
7 #include "flutter/fml/message_loop_task_queues.h"
8 #include "flutter/fml/make_copyable.h"
9 #include "flutter/fml/message_loop_impl.h"
10 
11 namespace fml {
12 
// Serializes the lazy creation of the shared instance in GetInstance().
std::mutex MessageLoopTaskQueues::creation_mutex_;

// Raw sentinel value for a task queue id that refers to no queue.
// NOTE(review): presumably this is the value behind _kUnmerged, which the
// rest of this file compares owner_of / subsumed_by against — confirm in the
// header.
// NOTE(review): ULONG_MAX is defined by <climits>, which this file does not
// include directly; presumably it arrives transitively — confirm.
const size_t TaskQueueId::kUnmerged = ULONG_MAX;

// The shared instance returned by GetInstance(); it is created on the first
// call to that function.
fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::instance_;
18 
TaskQueueEntry()19 TaskQueueEntry::TaskQueueEntry()
20     : owner_of(_kUnmerged), subsumed_by(_kUnmerged) {
21   wakeable = NULL;
22   task_observers = TaskObservers();
23   delayed_tasks = DelayedTaskQueue();
24 }
25 
GetInstance()26 fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::GetInstance() {
27   std::scoped_lock creation(creation_mutex_);
28   if (!instance_) {
29     instance_ = fml::MakeRefCounted<MessageLoopTaskQueues>();
30   }
31   return instance_;
32 }
33 
CreateTaskQueue()34 TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() {
35   std::lock_guard guard(queue_mutex_);
36   TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_);
37   ++task_queue_id_counter_;
38   queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>();
39 
40   return loop_id;
41 }
42 
MessageLoopTaskQueues()43 MessageLoopTaskQueues::MessageLoopTaskQueues()
44     : task_queue_id_counter_(0), order_(0) {}
45 
46 MessageLoopTaskQueues::~MessageLoopTaskQueues() = default;
47 
Dispose(TaskQueueId queue_id)48 void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) {
49   std::lock_guard guard(queue_mutex_);
50   const auto& queue_entry = queue_entries_.at(queue_id);
51   FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
52   TaskQueueId subsumed = queue_entry->owner_of;
53   queue_entries_.erase(queue_id);
54   if (subsumed != _kUnmerged) {
55     queue_entries_.erase(subsumed);
56   }
57 }
58 
DisposeTasks(TaskQueueId queue_id)59 void MessageLoopTaskQueues::DisposeTasks(TaskQueueId queue_id) {
60   DelayedTaskQueue dispose_tasks;
61   DelayedTaskQueue dispose_merged_tasks;
62   {
63     std::lock_guard guard(queue_mutex_);
64     const auto& queue_entry = queue_entries_.at(queue_id);
65     FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
66     TaskQueueId subsumed = queue_entry->owner_of;
67     dispose_tasks = std::move(queue_entry->delayed_tasks);
68     if (subsumed != _kUnmerged) {
69       dispose_merged_tasks = std::move(queue_entries_.at(subsumed)->delayed_tasks);
70     }
71   }
72 }
73 
RegisterTask(TaskQueueId queue_id,fml::closure task,fml::TimePoint target_time)74 void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id,
75                                          fml::closure task,
76                                          fml::TimePoint target_time) {
77   std::lock_guard guard(queue_mutex_);
78   size_t order = order_++;
79   const auto& queue_entry = queue_entries_.at(queue_id);
80   queue_entry->delayed_tasks.push({order, std::move(task), target_time});
81   TaskQueueId loop_to_wake = queue_id;
82   if (queue_entry->subsumed_by != _kUnmerged) {
83     loop_to_wake = queue_entry->subsumed_by;
84   }
85   WakeUpUnlocked(loop_to_wake,
86                  queue_entry->delayed_tasks.top().GetTargetTime());
87 }
88 
HasPendingTasks(TaskQueueId queue_id) const89 bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) const {
90   std::lock_guard guard(queue_mutex_);
91   return HasPendingTasksUnlocked(queue_id);
92 }
93 
GetTasksToRunNow(TaskQueueId queue_id,FlushType type,std::vector<fml::closure> & invocations)94 void MessageLoopTaskQueues::GetTasksToRunNow(
95     TaskQueueId queue_id,
96     FlushType type,
97     std::vector<fml::closure>& invocations) {
98   std::lock_guard guard(queue_mutex_);
99 
100   if (!HasPendingTasksUnlocked(queue_id)) {
101     return;
102   }
103 
104   const auto now = fml::TimePoint::Now();
105 
106   while (HasPendingTasksUnlocked(queue_id)) {
107     TaskQueueId top_queue = _kUnmerged;
108     const auto& top = PeekNextTaskUnlocked(queue_id, top_queue);
109     if (top.GetTargetTime() > now) {
110       break;
111     }
112     invocations.emplace_back(std::move(top.GetTask()));
113     queue_entries_.at(top_queue)->delayed_tasks.pop();
114     if (type == FlushType::kSingle) {
115       break;
116     }
117   }
118 
119   if (!HasPendingTasksUnlocked(queue_id)) {
120     WakeUpUnlocked(queue_id, fml::TimePoint::Max());
121   } else {
122     WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
123   }
124 }
125 
WakeUpUnlocked(TaskQueueId queue_id,fml::TimePoint time) const126 void MessageLoopTaskQueues::WakeUpUnlocked(TaskQueueId queue_id,
127                                            fml::TimePoint time) const {
128   if (queue_entries_.at(queue_id)->wakeable) {
129     queue_entries_.at(queue_id)->wakeable->WakeUp(time);
130   }
131 }
132 
GetNumPendingTasks(TaskQueueId queue_id) const133 size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
134   std::lock_guard guard(queue_mutex_);
135 
136   const auto& queue_entry = queue_entries_.at(queue_id);
137   if (queue_entry->subsumed_by != _kUnmerged) {
138     return 0;
139   }
140   size_t total_tasks = 0;
141   total_tasks += queue_entry->delayed_tasks.size();
142 
143   TaskQueueId subsumed = queue_entry->owner_of;
144   if (subsumed != _kUnmerged) {
145     const auto& subsumed_entry = queue_entries_.at(subsumed);
146     total_tasks += subsumed_entry->delayed_tasks.size();
147   }
148   return total_tasks;
149 }
150 
AddTaskObserver(TaskQueueId queue_id,intptr_t key,fml::closure callback)151 void MessageLoopTaskQueues::AddTaskObserver(TaskQueueId queue_id,
152                                             intptr_t key,
153                                             fml::closure callback) {
154   std::lock_guard guard(queue_mutex_);
155   FML_DCHECK(callback != nullptr) << "Observer callback must be non-null.";
156   queue_entries_.at(queue_id)->task_observers[key] = std::move(callback);
157 }
158 
RemoveTaskObserver(TaskQueueId queue_id,intptr_t key)159 void MessageLoopTaskQueues::RemoveTaskObserver(TaskQueueId queue_id,
160                                                intptr_t key) {
161   std::lock_guard guard(queue_mutex_);
162   queue_entries_.at(queue_id)->task_observers.erase(key);
163 }
164 
GetObserversToNotify(TaskQueueId queue_id) const165 std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(
166     TaskQueueId queue_id) const {
167   std::lock_guard guard(queue_mutex_);
168   std::vector<fml::closure> observers;
169 
170   if (queue_entries_.at(queue_id)->subsumed_by != _kUnmerged) {
171     return observers;
172   }
173 
174   for (const auto& observer : queue_entries_.at(queue_id)->task_observers) {
175     observers.push_back(observer.second);
176   }
177 
178   TaskQueueId subsumed = queue_entries_.at(queue_id)->owner_of;
179   if (subsumed != _kUnmerged) {
180     for (const auto& observer : queue_entries_.at(subsumed)->task_observers) {
181       observers.push_back(observer.second);
182     }
183   }
184 
185   return observers;
186 }
187 
SetWakeable(TaskQueueId queue_id,fml::Wakeable * wakeable)188 void MessageLoopTaskQueues::SetWakeable(TaskQueueId queue_id,
189                                         fml::Wakeable* wakeable) {
190   std::lock_guard guard(queue_mutex_);
191   FML_CHECK(!queue_entries_.at(queue_id)->wakeable)
192       << "Wakeable can only be set once.";
193   queue_entries_.at(queue_id)->wakeable = wakeable;
194 }
195 
Merge(TaskQueueId owner,TaskQueueId subsumed)196 bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
197   if (owner == subsumed) {
198     return true;
199   }
200 
201   std::lock_guard guard(queue_mutex_);
202   auto& owner_entry = queue_entries_.at(owner);
203   auto& subsumed_entry = queue_entries_.at(subsumed);
204 
205   if (owner_entry->owner_of == subsumed) {
206     return true;
207   }
208 
209   std::vector<TaskQueueId> owner_subsumed_keys = {
210       owner_entry->owner_of, owner_entry->subsumed_by, subsumed_entry->owner_of,
211       subsumed_entry->subsumed_by};
212 
213   for (auto key : owner_subsumed_keys) {
214     if (key != _kUnmerged) {
215       return false;
216     }
217   }
218 
219   owner_entry->owner_of = subsumed;
220   subsumed_entry->subsumed_by = owner;
221 
222   if (HasPendingTasksUnlocked(owner)) {
223     WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
224   }
225 
226   return true;
227 }
228 
Unmerge(TaskQueueId owner)229 bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
230   std::lock_guard guard(queue_mutex_);
231   const auto& owner_entry = queue_entries_.at(owner);
232 
233   const TaskQueueId subsumed = owner_entry->owner_of;
234   if (subsumed == _kUnmerged) {
235     return false;
236   }
237 
238   queue_entries_.at(subsumed)->subsumed_by = _kUnmerged;
239   owner_entry->owner_of = _kUnmerged;
240 
241   if (HasPendingTasksUnlocked(owner)) {
242     WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
243   }
244 
245   if (HasPendingTasksUnlocked(subsumed)) {
246     WakeUpUnlocked(subsumed, GetNextWakeTimeUnlocked(subsumed));
247   }
248 
249   return true;
250 }
251 
Owns(TaskQueueId owner,TaskQueueId subsumed) const252 bool MessageLoopTaskQueues::Owns(TaskQueueId owner,
253                                  TaskQueueId subsumed) const {
254   std::lock_guard guard(queue_mutex_);
255   return subsumed == queue_entries_.at(owner)->owner_of || owner == subsumed;
256 }
257 
258 // Subsumed queues will never have pending tasks.
259 // Owning queues will consider both their and their subsumed tasks.
HasPendingTasksUnlocked(TaskQueueId queue_id) const260 bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
261     TaskQueueId queue_id) const {
262   const auto& entry = queue_entries_.at(queue_id);
263   bool is_subsumed = entry->subsumed_by != _kUnmerged;
264   if (is_subsumed) {
265     return false;
266   }
267 
268   if (!entry->delayed_tasks.empty()) {
269     return true;
270   }
271 
272   const TaskQueueId subsumed = entry->owner_of;
273   if (subsumed == _kUnmerged) {
274     // this is not an owner and queue is empty.
275     return false;
276   } else {
277     return !queue_entries_.at(subsumed)->delayed_tasks.empty();
278   }
279 }
280 
GetNextWakeTimeUnlocked(TaskQueueId queue_id) const281 fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked(
282     TaskQueueId queue_id) const {
283   TaskQueueId tmp = _kUnmerged;
284   return PeekNextTaskUnlocked(queue_id, tmp).GetTargetTime();
285 }
286 
PeekNextTaskUnlocked(TaskQueueId owner,TaskQueueId & top_queue_id) const287 const DelayedTask& MessageLoopTaskQueues::PeekNextTaskUnlocked(
288     TaskQueueId owner,
289     TaskQueueId& top_queue_id) const {
290   FML_DCHECK(HasPendingTasksUnlocked(owner));
291   const auto& entry = queue_entries_.at(owner);
292   const TaskQueueId subsumed = entry->owner_of;
293   if (subsumed == _kUnmerged) {
294     top_queue_id = owner;
295     return entry->delayed_tasks.top();
296   }
297 
298   const auto& owner_tasks = entry->delayed_tasks;
299   const auto& subsumed_tasks = queue_entries_.at(subsumed)->delayed_tasks;
300 
301   // we are owning another task queue
302   const bool subsumed_has_task = !subsumed_tasks.empty();
303   const bool owner_has_task = !owner_tasks.empty();
304   if (owner_has_task && subsumed_has_task) {
305     const auto owner_task = owner_tasks.top();
306     const auto subsumed_task = subsumed_tasks.top();
307     if (owner_task > subsumed_task) {
308       top_queue_id = subsumed;
309     } else {
310       top_queue_id = owner;
311     }
312   } else if (owner_has_task) {
313     top_queue_id = owner;
314   } else {
315     top_queue_id = subsumed;
316   }
317   return queue_entries_.at(top_queue_id)->delayed_tasks.top();
318 }
319 
320 }  // namespace fml
321