• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Flutter Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef FLUTTER_SHELL_COMMON_PIPELINE_H_
6 #define FLUTTER_SHELL_COMMON_PIPELINE_H_
7 
8 #include "flutter/fml/macros.h"
9 #include "flutter/fml/memory/ref_counted.h"
10 #include "flutter/fml/synchronization/semaphore.h"
11 #include "flutter/fml/trace_event.h"
12 
13 #include <deque>
14 #include <memory>
15 #include <mutex>
16 
17 namespace flutter {
18 
19 enum class PipelineConsumeResult {
20   NoneAvailable,
21   Done,
22   MoreAvailable,
23 };
24 
25 size_t GetNextPipelineTraceID();
26 
27 /// A thread-safe queue of resources for a single consumer and a single
28 /// producer.
29 template <class R>
30 class Pipeline : public fml::RefCountedThreadSafe<Pipeline<R>> {
31  public:
32   using Resource = R;
33   using ResourcePtr = std::unique_ptr<Resource>;
34 
35   /// Denotes a spot in the pipeline reserved for the producer to finish
36   /// preparing a completed pipeline resource.
37   class ProducerContinuation {
38    public:
ProducerContinuation()39     ProducerContinuation() : trace_id_(0) {}
40 
ProducerContinuation(ProducerContinuation && other)41     ProducerContinuation(ProducerContinuation&& other)
42         : continuation_(other.continuation_), trace_id_(other.trace_id_) {
43       other.continuation_ = nullptr;
44       other.trace_id_ = 0;
45     }
46 
47     ProducerContinuation& operator=(ProducerContinuation&& other) {
48       std::swap(continuation_, other.continuation_);
49       std::swap(trace_id_, other.trace_id_);
50       return *this;
51     }
52 
~ProducerContinuation()53     ~ProducerContinuation() {
54       if (continuation_) {
55         continuation_(nullptr, trace_id_);
56         TRACE_EVENT_ASYNC_END0("flutter", "PipelineProduce", trace_id_);
57         // The continuation is being dropped on the floor. End the flow.
58         TRACE_FLOW_END("flutter", "PipelineItem", trace_id_);
59         TRACE_EVENT_ASYNC_END0("flutter", "PipelineItem", trace_id_);
60       }
61     }
62 
Complete(ResourcePtr resource)63     void Complete(ResourcePtr resource) {
64       if (continuation_) {
65         continuation_(std::move(resource), trace_id_);
66         continuation_ = nullptr;
67         TRACE_EVENT_ASYNC_END0("flutter", "PipelineProduce", trace_id_);
68         TRACE_FLOW_STEP("flutter", "PipelineItem", trace_id_);
69       }
70     }
71 
72     operator bool() const { return continuation_ != nullptr; }
73 
74    private:
75     friend class Pipeline;
76     using Continuation = std::function<void(ResourcePtr, size_t)>;
77 
78     Continuation continuation_;
79     size_t trace_id_;
80 
ProducerContinuation(Continuation continuation,size_t trace_id)81     ProducerContinuation(Continuation continuation, size_t trace_id)
82         : continuation_(continuation), trace_id_(trace_id) {
83       TRACE_FLOW_BEGIN("flutter", "PipelineItem", trace_id_);
84       TRACE_EVENT_ASYNC_BEGIN0("flutter", "PipelineItem", trace_id_);
85       TRACE_EVENT_ASYNC_BEGIN0("flutter", "PipelineProduce", trace_id_);
86     }
87 
88     FML_DISALLOW_COPY_AND_ASSIGN(ProducerContinuation);
89   };
90 
Pipeline(uint32_t depth)91   explicit Pipeline(uint32_t depth)
92       : depth_(depth), empty_(depth), available_(0) {}
93 
94   ~Pipeline() = default;
95 
IsValid()96   bool IsValid() const { return empty_.IsValid() && available_.IsValid(); }
97 
Produce()98   ProducerContinuation Produce() {
99     if (!empty_.TryWait()) {
100       return {};
101     }
102 
103     return ProducerContinuation{
104         std::bind(&Pipeline::ProducerCommit, this, std::placeholders::_1,
105                   std::placeholders::_2),  // continuation
106         GetNextPipelineTraceID()};         // trace id
107   }
108 
109   // Pushes task to the front of the pipeline.
110   //
111   // If we exceed the depth completing this continuation, we drop the
112   // last frame to preserve the depth of the pipeline.
113   //
114   // Note: Use |Pipeline::Produce| where possible. This should only be
115   // used to en-queue high-priority resources.
ProduceToFront()116   ProducerContinuation ProduceToFront() {
117     return ProducerContinuation{
118         std::bind(&Pipeline::ProducerCommitFront, this, std::placeholders::_1,
119                   std::placeholders::_2),  // continuation
120         GetNextPipelineTraceID()};         // trace id
121   }
122 
123   using Consumer = std::function<void(ResourcePtr)>;
124 
125   FML_WARN_UNUSED_RESULT
Consume(Consumer consumer)126   PipelineConsumeResult Consume(Consumer consumer) {
127     if (consumer == nullptr) {
128       return PipelineConsumeResult::NoneAvailable;
129     }
130 
131     if (!available_.TryWait()) {
132       return PipelineConsumeResult::NoneAvailable;
133     }
134 
135     ResourcePtr resource;
136     size_t trace_id = 0;
137     size_t items_count = 0;
138 
139     {
140       std::scoped_lock lock(queue_mutex_);
141       std::tie(resource, trace_id) = std::move(queue_.front());
142       queue_.pop_front();
143       items_count = queue_.size();
144     }
145 
146     {
147       TRACE_EVENT0("flutter", "PipelineConsume");
148       consumer(std::move(resource));
149     }
150 
151     empty_.Signal();
152 
153     TRACE_FLOW_END("flutter", "PipelineItem", trace_id);
154     TRACE_EVENT_ASYNC_END0("flutter", "PipelineItem", trace_id);
155 
156     return items_count > 0 ? PipelineConsumeResult::MoreAvailable
157                            : PipelineConsumeResult::Done;
158   }
159 
160  private:
161   uint32_t depth_;
162   fml::Semaphore empty_;
163   fml::Semaphore available_;
164   std::mutex queue_mutex_;
165   std::deque<std::pair<ResourcePtr, size_t>> queue_;
166 
ProducerCommit(ResourcePtr resource,size_t trace_id)167   void ProducerCommit(ResourcePtr resource, size_t trace_id) {
168     {
169       std::scoped_lock lock(queue_mutex_);
170       queue_.emplace_back(std::move(resource), trace_id);
171     }
172 
173     // Ensure the queue mutex is not held as that would be a pessimization.
174     available_.Signal();
175   }
176 
ProducerCommitFront(ResourcePtr resource,size_t trace_id)177   void ProducerCommitFront(ResourcePtr resource, size_t trace_id) {
178     {
179       std::scoped_lock lock(queue_mutex_);
180       queue_.emplace_front(std::move(resource), trace_id);
181       while (queue_.size() > depth_) {
182         queue_.pop_back();
183       }
184     }
185 
186     // Ensure the queue mutex is not held as that would be a pessimization.
187     available_.Signal();
188   }
189 
190   FML_DISALLOW_COPY_AND_ASSIGN(Pipeline);
191 };
192 
193 }  // namespace flutter
194 
195 #endif  // FLUTTER_SHELL_COMMON_PIPELINE_H_
196