1 // Copyright 2013 The Flutter Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "flutter/fml/concurrent_message_loop.h" 6 7 #include <algorithm> 8 9 #include "flutter/fml/thread.h" 10 #include "flutter/fml/trace_event.h" 11 12 namespace fml { 13 Create(size_t worker_count)14std::shared_ptr<ConcurrentMessageLoop> ConcurrentMessageLoop::Create( 15 size_t worker_count) { 16 return std::shared_ptr<ConcurrentMessageLoop>{ 17 new ConcurrentMessageLoop(worker_count)}; 18 } 19 ConcurrentMessageLoop(size_t worker_count)20ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count) 21 : worker_count_(std::max<size_t>(worker_count, 1ul)) { 22 for (size_t i = 0; i < worker_count_; ++i) { 23 workers_.emplace_back([i, this]() { 24 fml::Thread::SetCurrentThreadName( 25 std::string{"io.flutter.worker." + std::to_string(i + 1)}); 26 WorkerMain(); 27 }); 28 } 29 } 30 ~ConcurrentMessageLoop()31ConcurrentMessageLoop::~ConcurrentMessageLoop() { 32 Terminate(); 33 for (auto& worker : workers_) { 34 worker.join(); 35 } 36 } 37 GetWorkerCount() const38size_t ConcurrentMessageLoop::GetWorkerCount() const { 39 return worker_count_; 40 } 41 GetTaskRunner()42std::shared_ptr<ConcurrentTaskRunner> ConcurrentMessageLoop::GetTaskRunner() { 43 return std::make_shared<ConcurrentTaskRunner>(weak_from_this()); 44 } 45 PostTask(fml::closure task)46void ConcurrentMessageLoop::PostTask(fml::closure task) { 47 if (!task) { 48 return; 49 } 50 51 std::unique_lock lock(tasks_mutex_); 52 53 // Don't just drop tasks on the floor in case of shutdown. 54 if (shutdown_) { 55 FML_DLOG(WARNING) 56 << "Tried to post a task to shutdown concurrent message " 57 "loop. The task will be executed on the callers thread."; 58 lock.unlock(); 59 task(); 60 return; 61 } 62 63 tasks_.push(task); 64 65 // Unlock the mutex before notifying the condition variable because that mutex 66 // has to be acquired on the other thread anyway. Waiting in this scope till 67 // it is acquired there is a pessimization. 68 lock.unlock(); 69 70 tasks_condition_.notify_one(); 71 } 72 WorkerMain()73void ConcurrentMessageLoop::WorkerMain() { 74 while (true) { 75 std::unique_lock lock(tasks_mutex_); 76 tasks_condition_.wait(lock, 77 [&]() { return tasks_.size() > 0 || shutdown_; }); 78 79 if (tasks_.size() == 0) { 80 // This can only be caused by shutdown. 81 FML_DCHECK(shutdown_); 82 break; 83 } 84 85 auto task = tasks_.front(); 86 tasks_.pop(); 87 88 // Don't hold onto the mutex while the task is being executed as it could 89 // itself try to post another tasks to this message loop. 90 lock.unlock(); 91 92 TRACE_EVENT0("flutter", "ConcurrentWorkerWake"); 93 // Execute the one tasks we woke up for. 94 task(); 95 } 96 } 97 Terminate()98void ConcurrentMessageLoop::Terminate() { 99 std::scoped_lock lock(tasks_mutex_); 100 shutdown_ = true; 101 tasks_condition_.notify_all(); 102 } 103 ConcurrentTaskRunner(std::weak_ptr<ConcurrentMessageLoop> weak_loop)104ConcurrentTaskRunner::ConcurrentTaskRunner( 105 std::weak_ptr<ConcurrentMessageLoop> weak_loop) 106 : weak_loop_(std::move(weak_loop)) {} 107 108 ConcurrentTaskRunner::~ConcurrentTaskRunner() = default; 109 PostTask(fml::closure task)110void ConcurrentTaskRunner::PostTask(fml::closure task) { 111 if (!task) { 112 return; 113 } 114 115 if (auto loop = weak_loop_.lock()) { 116 loop->PostTask(task); 117 return; 118 } 119 120 FML_DLOG(WARNING) 121 << "Tried to post to a concurrent message loop that has already died. " 122 "Executing the task on the callers thread."; 123 task(); 124 } 125 126 } // namespace fml 127