• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_COMBINATORS_H_
18 #define INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_COMBINATORS_H_
19 
20 #include <memory>
21 #include <optional>
22 #include <utility>
23 #include <vector>
24 
25 #include "perfetto/base/status.h"
26 #include "perfetto/ext/base/status_or.h"
27 #include "perfetto/ext/base/threading/future_combinators.h"
28 #include "perfetto/ext/base/threading/poll.h"
29 
30 namespace perfetto {
31 namespace base {
32 
33 template <typename T>
34 class Stream;
35 
// Helper functions for appending every element of a parameter pack to the
// back of a vector, preserving argument order.
//
// Base case: no elements left to append, so |vec| is left untouched.
template <typename T, typename... Elements>
void AddAllToVector(std::vector<T>&) {}

// Recursive case: moves |first| onto the back of |vec| and then recurses on
// |rest|.
//
// Every argument is taken by value, so each one is an object owned by this
// call and is moved (not forwarded) into place; std::forward is reserved for
// forwarding references, which these parameters are not. Moving also keeps
// move-only element types working.
template <typename T, typename... Elements>
void AddAllToVector(std::vector<T>& vec, T first, Elements... rest) {
  vec.emplace_back(std::move(first));
  AddAllToVector(vec, std::move(rest)...);
}
45 
// Given a Function which, when invoked with a T, returns a Stream<U>, names
// the U. It is read from the |PollableItem| member type of the invocation
// result.
template <typename Function, typename T>
using StreamReturn = typename std::invoke_result_t<Function, T>::PollableItem;
50 
51 // Implementation of StreamPollable for creating a Stream<T> from a
52 // std::vector<T>.
53 template <typename T>
54 class ImmediateStreamImpl : public StreamPollable<T> {
55  public:
ImmediateStreamImpl(std::vector<T> values)56   explicit ImmediateStreamImpl(std::vector<T> values)
57       : values_(std::move(values)) {}
58 
PollNext(PollContext *)59   StreamPollResult<T> PollNext(PollContext*) override {
60     if (index_ >= values_.size()) {
61       return DonePollResult();
62     }
63     return StreamPollResult<T>(std::move(values_[index_++]));
64   }
65 
66  private:
67   std::vector<T> values_;
68   uint32_t index_ = 0;
69 };
70 
71 // Implementation of a StreamPollable for creating a Stream<U> from a Stream<T>
72 // and a functor with prototype Future<U>(T).
73 template <typename Function, typename T>
74 class MapFutureStreamImpl : public StreamPollable<FutureReturn<Function, T>> {
75  public:
76   using U = FutureReturn<Function, T>;
77 
MapFutureStreamImpl(Stream<T> stream,Function map_fn)78   MapFutureStreamImpl(Stream<T> stream, Function map_fn)
79       : stream_(std::move(stream)), map_fn_(std::move(map_fn)) {}
80 
PollNext(PollContext * context)81   StreamPollResult<U> PollNext(PollContext* context) override {
82     if (!future_) {
83       ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, stream_.PollNext(context));
84       if (res.IsDone()) {
85         return DonePollResult();
86       }
87       future_ = map_fn_(std::move(res.item()));
88     }
89     ASSIGN_OR_RETURN_IF_PENDING_FUTURE(res, future_->Poll(context));
90     future_ = std::nullopt;
91     return res;
92   }
93 
94  private:
95   Stream<T> stream_;
96   Function map_fn_;
97   std::optional<Future<U>> future_;
98 };
99 
100 // Implementation of a StreamPollable for creating a concatenating two streams
101 // together.
102 template <typename T>
103 class ConcatStreamImpl : public StreamPollable<T> {
104  public:
ConcatStreamImpl(Stream<T> first,Stream<T> second)105   explicit ConcatStreamImpl(Stream<T> first, Stream<T> second)
106       : first_(std::move(first)), second_(std::move(second)) {}
107 
PollNext(PollContext * context)108   StreamPollResult<T> PollNext(PollContext* context) override {
109     if (first_) {
110       ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, first_->PollNext(context));
111       if (!res.IsDone()) {
112         return res.item();
113       }
114       first_ = std::nullopt;
115     }
116     return second_.PollNext(context);
117   }
118 
119  private:
120   std::optional<Stream<T>> first_;
121   Stream<T> second_;
122 };
123 
124 // Implementation of a StreamPollable for creating a Stream<T> from a
125 // std::vector<Stream<T>>. Values are returned from the inner streams as soon as
126 // they are available.
127 template <typename T>
128 class FlattenImpl : public StreamPollable<T> {
129  public:
FlattenImpl(std::vector<Stream<T>> streams)130   explicit FlattenImpl(std::vector<Stream<T>> streams)
131       : registered_handles_(static_cast<uint32_t>(streams.size())) {
132     for (auto& stream : streams) {
133       streams_.emplace_back(std::move(stream));
134     }
135   }
136 
PollNext(PollContext * upstream)137   StreamPollResult<T> PollNext(PollContext* upstream) override {
138     for (uint32_t i = 0; i < streams_.size(); ++i) {
139       auto& stream = streams_[i];
140       if (!stream) {
141         continue;
142       }
143       std::optional<PollContext> ctx = PollContextForStream(upstream, i);
144       if (!ctx) {
145         continue;
146       }
147       StreamPollResult<T> res = stream->PollNext(&*ctx);
148       if (res.IsPending()) {
149         PERFETTO_CHECK(!registered_handles_[i].empty());
150         continue;
151       }
152       if (!res.IsDone()) {
153         return res;
154       }
155       // StreamPollable has returned EOF. Clear it and the registered handles
156       // out.
157       stream = std::nullopt;
158       ++eof_streams_;
159       registered_handles_[i].clear();
160     }
161 
162     // Every child stream being EOF means we have reached EOF as well.
163     if (eof_streams_ == streams_.size()) {
164       return DonePollResult();
165     }
166     // Every remaining stream must be pending so we can make no further
167     // progress. Register all the child handles with the context and return.
168     for (const FlatSet<PlatformHandle>& handles : registered_handles_) {
169       upstream->RegisterAllInterested(handles);
170     }
171     return PendingPollResult();
172   }
173 
174  private:
PollContextForStream(PollContext * upstream,uint32_t stream_idx)175   std::optional<PollContext> PollContextForStream(PollContext* upstream,
176                                                   uint32_t stream_idx) {
177     FlatSet<PlatformHandle>& state = registered_handles_[stream_idx];
178     if (state.empty()) {
179       return PollContext(&state, &upstream->ready_handles());
180     }
181     for (PlatformHandle handle : upstream->ready_handles()) {
182       if (state.count(handle)) {
183         state.clear();
184         return PollContext(&state, &upstream->ready_handles());
185       }
186     }
187     return std::nullopt;
188   }
189 
190   std::vector<std::optional<Stream<T>>> streams_;
191   std::vector<FlatSet<PlatformHandle>> registered_handles_;
192   uint32_t eof_streams_ = 0;
193 };
194 
195 // Implementation of a Stream<T> which immediately completes and calls a
196 // function in the destructor.
197 template <typename T, typename Function>
198 class OnDestroyStreamImpl : public StreamPollable<T> {
199  public:
OnDestroyStreamImpl(Function fn)200   explicit OnDestroyStreamImpl(Function fn) : fn_(std::move(fn)) {}
~OnDestroyStreamImpl()201   ~OnDestroyStreamImpl() override { fn_(); }
202 
PollNext(PollContext *)203   StreamPollResult<T> PollNext(PollContext*) override {
204     return DonePollResult();
205   }
206 
207  private:
208   Function fn_;
209 };
210 
// Strategy interface for turning a Stream<T> into a Future<U>.
//
// A Collector is fed the items of the stream one at a time and decides when
// the future is complete. Crucially, it may short-circuit, i.e. complete the
// future before the stream has finished.
//
// This is flexible enough to express classic collection (a Stream<T> becoming
// a Future<vector<T>>) as well as more advanced behaviours: completing a
// Future<Status> early as soon as an error shows up, racing several Future<T>
// against each other and returning the first value produced, and so on.
template <typename T, typename U>
class Collector {
 public:
  virtual ~Collector() = default;

  // Handles the next item of the Stream<T>. Returns the value U which
  // completes the wrapping Future<U> if it can be completed at this point, or
  // std::nullopt if more items are needed.
  virtual std::optional<U> OnNext(T value) = 0;

  // Handles the end of the stream by returning the |U| which completes the
  // future. Only called when OnNext returned std::nullopt for every item of
  // the stream.
  virtual U OnDone() = 0;
};
237 
238 // Implementation of a StreamPollable which converts a Stream<T> to a Future<U>
239 // using an implementation of Collector<T, U>.
240 template <typename T, typename U>
241 class CollectImpl : public FuturePollable<U> {
242  public:
CollectImpl(Stream<T> stream,std::unique_ptr<Collector<T,U>> collector)243   explicit CollectImpl(Stream<T> stream,
244                        std::unique_ptr<Collector<T, U>> collector)
245       : stream_(std::move(stream)), collector_(std::move(collector)) {}
246 
Poll(PollContext * context)247   FuturePollResult<U> Poll(PollContext* context) override {
248     for (;;) {
249       ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, stream_.PollNext(context));
250       if (res.IsDone()) {
251         return collector_->OnDone();
252       }
253       std::optional<U> collected = collector_->OnNext(std::move(res.item()));
254       if (collected.has_value()) {
255         return std::move(collected.value());
256       }
257     }
258   }
259 
260  private:
261   Stream<T> stream_;
262   std::unique_ptr<Collector<T, U>> collector_;
263 };
264 
265 // Implementation for |AllOkCollector|.
266 class AllOkCollectorImpl : public Collector<Status, Status> {
267  public:
268   ~AllOkCollectorImpl() override;
269 
OnNext(Status status)270   std::optional<Status> OnNext(Status status) override {
271     return status.ok() ? std::nullopt : std::make_optional(std::move(status));
272   }
OnDone()273   Status OnDone() override { return OkStatus(); }
274 };
275 
276 // Implementation for |ToFutureCheckedCollector|.
277 template <typename T>
278 class FutureCheckedCollectorImpl : public Collector<T, T> {
279  public:
OnNext(T value)280   std::optional<T> OnNext(T value) override {
281     PERFETTO_CHECK(!prev_value_);
282     prev_value_ = value;
283     return std::nullopt;
284   }
OnDone()285   T OnDone() override { return *prev_value_; }
286 
287  private:
288   std::optional<T> prev_value_;
289 };
290 
291 // Implementation for |StatusOrVectorCollector|.
292 template <typename T>
293 class StatusOrVectorCollectorImpl
294     : public Collector<base::StatusOr<T>, base::StatusOr<std::vector<T>>> {
295  public:
OnNext(base::StatusOr<T> val_or)296   std::optional<base::StatusOr<std::vector<T>>> OnNext(
297       base::StatusOr<T> val_or) override {
298     if (!val_or.ok()) {
299       return std::make_optional(val_or.status());
300     }
301     values_.emplace_back(std::move(val_or.value()));
302     return std::nullopt;
303   }
OnDone()304   base::StatusOr<std::vector<T>> OnDone() override {
305     return std::move(values_);
306   }
307 
308  private:
309   std::vector<T> values_;
310 };
311 
312 }  // namespace base
313 }  // namespace perfetto
314 
315 #endif  // INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_COMBINATORS_H_
316