• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Flutter Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "flutter/fml/concurrent_message_loop.h"
6 
7 #include <algorithm>
8 
9 #include "flutter/fml/thread.h"
10 #include "flutter/fml/trace_event.h"
11 
12 namespace fml {
13 
Create(size_t worker_count)14 std::shared_ptr<ConcurrentMessageLoop> ConcurrentMessageLoop::Create(
15     size_t worker_count) {
16   return std::shared_ptr<ConcurrentMessageLoop>{
17       new ConcurrentMessageLoop(worker_count)};
18 }
19 
ConcurrentMessageLoop(size_t worker_count)20 ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count)
21     : worker_count_(std::max<size_t>(worker_count, 1ul)) {
22   for (size_t i = 0; i < worker_count_; ++i) {
23     workers_.emplace_back([i, this]() {
24       fml::Thread::SetCurrentThreadName(
25           std::string{"io.flutter.worker." + std::to_string(i + 1)});
26       WorkerMain();
27     });
28   }
29 }
30 
~ConcurrentMessageLoop()31 ConcurrentMessageLoop::~ConcurrentMessageLoop() {
32   Terminate();
33   for (auto& worker : workers_) {
34     worker.join();
35   }
36 }
37 
GetWorkerCount() const38 size_t ConcurrentMessageLoop::GetWorkerCount() const {
39   return worker_count_;
40 }
41 
GetTaskRunner()42 std::shared_ptr<ConcurrentTaskRunner> ConcurrentMessageLoop::GetTaskRunner() {
43   return std::make_shared<ConcurrentTaskRunner>(weak_from_this());
44 }
45 
PostTask(fml::closure task)46 void ConcurrentMessageLoop::PostTask(fml::closure task) {
47   if (!task) {
48     return;
49   }
50 
51   std::unique_lock lock(tasks_mutex_);
52 
53   // Don't just drop tasks on the floor in case of shutdown.
54   if (shutdown_) {
55     FML_DLOG(WARNING)
56         << "Tried to post a task to shutdown concurrent message "
57            "loop. The task will be executed on the callers thread.";
58     lock.unlock();
59     task();
60     return;
61   }
62 
63   tasks_.push(task);
64 
65   // Unlock the mutex before notifying the condition variable because that mutex
66   // has to be acquired on the other thread anyway. Waiting in this scope till
67   // it is acquired there is a pessimization.
68   lock.unlock();
69 
70   tasks_condition_.notify_one();
71 }
72 
WorkerMain()73 void ConcurrentMessageLoop::WorkerMain() {
74   while (true) {
75     std::unique_lock lock(tasks_mutex_);
76     tasks_condition_.wait(lock,
77                           [&]() { return tasks_.size() > 0 || shutdown_; });
78 
79     if (tasks_.size() == 0) {
80       // This can only be caused by shutdown.
81       FML_DCHECK(shutdown_);
82       break;
83     }
84 
85     auto task = tasks_.front();
86     tasks_.pop();
87 
88     // Don't hold onto the mutex while the task is being executed as it could
89     // itself try to post another tasks to this message loop.
90     lock.unlock();
91 
92     TRACE_EVENT0("flutter", "ConcurrentWorkerWake");
93     // Execute the one tasks we woke up for.
94     task();
95   }
96 }
97 
Terminate()98 void ConcurrentMessageLoop::Terminate() {
99   std::scoped_lock lock(tasks_mutex_);
100   shutdown_ = true;
101   tasks_condition_.notify_all();
102 }
103 
ConcurrentTaskRunner(std::weak_ptr<ConcurrentMessageLoop> weak_loop)104 ConcurrentTaskRunner::ConcurrentTaskRunner(
105     std::weak_ptr<ConcurrentMessageLoop> weak_loop)
106     : weak_loop_(std::move(weak_loop)) {}
107 
108 ConcurrentTaskRunner::~ConcurrentTaskRunner() = default;
109 
PostTask(fml::closure task)110 void ConcurrentTaskRunner::PostTask(fml::closure task) {
111   if (!task) {
112     return;
113   }
114 
115   if (auto loop = weak_loop_.lock()) {
116     loop->PostTask(task);
117     return;
118   }
119 
120   FML_DLOG(WARNING)
121       << "Tried to post to a concurrent message loop that has already died. "
122          "Executing the task on the callers thread.";
123   task();
124 }
125 
126 }  // namespace fml
127