1 // Copyright 2013 The Flutter Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef FLUTTER_SHELL_COMMON_PIPELINE_H_ 6 #define FLUTTER_SHELL_COMMON_PIPELINE_H_ 7 8 #include "flutter/fml/macros.h" 9 #include "flutter/fml/memory/ref_counted.h" 10 #include "flutter/fml/synchronization/semaphore.h" 11 #include "flutter/fml/trace_event.h" 12 13 #include <deque> 14 #include <memory> 15 #include <mutex> 16 17 namespace flutter { 18 19 enum class PipelineConsumeResult { 20 NoneAvailable, 21 Done, 22 MoreAvailable, 23 }; 24 25 size_t GetNextPipelineTraceID(); 26 27 /// A thread-safe queue of resources for a single consumer and a single 28 /// producer. 29 template <class R> 30 class Pipeline : public fml::RefCountedThreadSafe<Pipeline<R>> { 31 public: 32 using Resource = R; 33 using ResourcePtr = std::unique_ptr<Resource>; 34 35 /// Denotes a spot in the pipeline reserved for the producer to finish 36 /// preparing a completed pipeline resource. 37 class ProducerContinuation { 38 public: ProducerContinuation()39 ProducerContinuation() : trace_id_(0) {} 40 ProducerContinuation(ProducerContinuation && other)41 ProducerContinuation(ProducerContinuation&& other) 42 : continuation_(other.continuation_), trace_id_(other.trace_id_) { 43 other.continuation_ = nullptr; 44 other.trace_id_ = 0; 45 } 46 47 ProducerContinuation& operator=(ProducerContinuation&& other) { 48 std::swap(continuation_, other.continuation_); 49 std::swap(trace_id_, other.trace_id_); 50 return *this; 51 } 52 ~ProducerContinuation()53 ~ProducerContinuation() { 54 if (continuation_) { 55 continuation_(nullptr, trace_id_); 56 TRACE_EVENT_ASYNC_END0("flutter", "PipelineProduce", trace_id_); 57 // The continuation is being dropped on the floor. End the flow. 58 TRACE_FLOW_END("flutter", "PipelineItem", trace_id_); 59 TRACE_EVENT_ASYNC_END0("flutter", "PipelineItem", trace_id_); 60 } 61 } 62 Complete(ResourcePtr resource)63 void Complete(ResourcePtr resource) { 64 if (continuation_) { 65 continuation_(std::move(resource), trace_id_); 66 continuation_ = nullptr; 67 TRACE_EVENT_ASYNC_END0("flutter", "PipelineProduce", trace_id_); 68 TRACE_FLOW_STEP("flutter", "PipelineItem", trace_id_); 69 } 70 } 71 72 operator bool() const { return continuation_ != nullptr; } 73 74 private: 75 friend class Pipeline; 76 using Continuation = std::function<void(ResourcePtr, size_t)>; 77 78 Continuation continuation_; 79 size_t trace_id_; 80 ProducerContinuation(Continuation continuation,size_t trace_id)81 ProducerContinuation(Continuation continuation, size_t trace_id) 82 : continuation_(continuation), trace_id_(trace_id) { 83 TRACE_FLOW_BEGIN("flutter", "PipelineItem", trace_id_); 84 TRACE_EVENT_ASYNC_BEGIN0("flutter", "PipelineItem", trace_id_); 85 TRACE_EVENT_ASYNC_BEGIN0("flutter", "PipelineProduce", trace_id_); 86 } 87 88 FML_DISALLOW_COPY_AND_ASSIGN(ProducerContinuation); 89 }; 90 Pipeline(uint32_t depth)91 explicit Pipeline(uint32_t depth) 92 : depth_(depth), empty_(depth), available_(0) {} 93 94 ~Pipeline() = default; 95 IsValid()96 bool IsValid() const { return empty_.IsValid() && available_.IsValid(); } 97 Produce()98 ProducerContinuation Produce() { 99 if (!empty_.TryWait()) { 100 return {}; 101 } 102 103 return ProducerContinuation{ 104 std::bind(&Pipeline::ProducerCommit, this, std::placeholders::_1, 105 std::placeholders::_2), // continuation 106 GetNextPipelineTraceID()}; // trace id 107 } 108 109 // Pushes task to the front of the pipeline. 110 // 111 // If we exceed the depth completing this continuation, we drop the 112 // last frame to preserve the depth of the pipeline. 113 // 114 // Note: Use |Pipeline::Produce| where possible. This should only be 115 // used to en-queue high-priority resources. ProduceToFront()116 ProducerContinuation ProduceToFront() { 117 return ProducerContinuation{ 118 std::bind(&Pipeline::ProducerCommitFront, this, std::placeholders::_1, 119 std::placeholders::_2), // continuation 120 GetNextPipelineTraceID()}; // trace id 121 } 122 123 using Consumer = std::function<void(ResourcePtr)>; 124 125 FML_WARN_UNUSED_RESULT Consume(Consumer consumer)126 PipelineConsumeResult Consume(Consumer consumer) { 127 if (consumer == nullptr) { 128 return PipelineConsumeResult::NoneAvailable; 129 } 130 131 if (!available_.TryWait()) { 132 return PipelineConsumeResult::NoneAvailable; 133 } 134 135 ResourcePtr resource; 136 size_t trace_id = 0; 137 size_t items_count = 0; 138 139 { 140 std::scoped_lock lock(queue_mutex_); 141 std::tie(resource, trace_id) = std::move(queue_.front()); 142 queue_.pop_front(); 143 items_count = queue_.size(); 144 } 145 146 { 147 TRACE_EVENT0("flutter", "PipelineConsume"); 148 consumer(std::move(resource)); 149 } 150 151 empty_.Signal(); 152 153 TRACE_FLOW_END("flutter", "PipelineItem", trace_id); 154 TRACE_EVENT_ASYNC_END0("flutter", "PipelineItem", trace_id); 155 156 return items_count > 0 ? PipelineConsumeResult::MoreAvailable 157 : PipelineConsumeResult::Done; 158 } 159 160 private: 161 uint32_t depth_; 162 fml::Semaphore empty_; 163 fml::Semaphore available_; 164 std::mutex queue_mutex_; 165 std::deque<std::pair<ResourcePtr, size_t>> queue_; 166 ProducerCommit(ResourcePtr resource,size_t trace_id)167 void ProducerCommit(ResourcePtr resource, size_t trace_id) { 168 { 169 std::scoped_lock lock(queue_mutex_); 170 queue_.emplace_back(std::move(resource), trace_id); 171 } 172 173 // Ensure the queue mutex is not held as that would be a pessimization. 174 available_.Signal(); 175 } 176 ProducerCommitFront(ResourcePtr resource,size_t trace_id)177 void ProducerCommitFront(ResourcePtr resource, size_t trace_id) { 178 { 179 std::scoped_lock lock(queue_mutex_); 180 queue_.emplace_front(std::move(resource), trace_id); 181 while (queue_.size() > depth_) { 182 queue_.pop_back(); 183 } 184 } 185 186 // Ensure the queue mutex is not held as that would be a pessimization. 187 available_.Signal(); 188 } 189 190 FML_DISALLOW_COPY_AND_ASSIGN(Pipeline); 191 }; 192 193 } // namespace flutter 194 195 #endif // FLUTTER_SHELL_COMMON_PIPELINE_H_ 196