1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_COMBINATORS_H_
18 #define INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_COMBINATORS_H_
19
20 #include <memory>
21 #include <optional>
22 #include <utility>
23 #include <vector>
24
25 #include "perfetto/base/status.h"
26 #include "perfetto/ext/base/status_or.h"
27 #include "perfetto/ext/base/threading/future_combinators.h"
28 #include "perfetto/ext/base/threading/poll.h"
29
30 namespace perfetto {
31 namespace base {
32
33 template <typename T>
34 class Stream;
35
36 // Helper function for adding all the elements in parameter pack to a vector.
37 template <typename T, typename... Elements>
AddAllToVector(std::vector<T> &)38 void AddAllToVector(std::vector<T>&) {}
39
40 template <typename T, typename... Elements>
AddAllToVector(std::vector<T> & vec,T first,Elements...rest)41 void AddAllToVector(std::vector<T>& vec, T first, Elements... rest) {
42 vec.emplace_back(std::forward<T>(first));
43 AddAllToVector(vec, std::forward<Elements>(rest)...);
44 }
45
46 // For a Function which returns Stream<U>, returns the U.
47 template <typename Function, typename T>
48 using StreamReturn =
49 typename std::invoke_result<Function, T>::type::PollableItem;
50
51 // Implementation of StreamPollable for creating a Stream<T> from a
52 // std::vector<T>.
53 template <typename T>
54 class ImmediateStreamImpl : public StreamPollable<T> {
55 public:
ImmediateStreamImpl(std::vector<T> values)56 explicit ImmediateStreamImpl(std::vector<T> values)
57 : values_(std::move(values)) {}
58
PollNext(PollContext *)59 StreamPollResult<T> PollNext(PollContext*) override {
60 if (index_ >= values_.size()) {
61 return DonePollResult();
62 }
63 return StreamPollResult<T>(std::move(values_[index_++]));
64 }
65
66 private:
67 std::vector<T> values_;
68 uint32_t index_ = 0;
69 };
70
71 // Implementation of a StreamPollable for creating a Stream<U> from a Stream<T>
72 // and a functor with prototype Future<U>(T).
73 template <typename Function, typename T>
74 class MapFutureStreamImpl : public StreamPollable<FutureReturn<Function, T>> {
75 public:
76 using U = FutureReturn<Function, T>;
77
MapFutureStreamImpl(Stream<T> stream,Function map_fn)78 MapFutureStreamImpl(Stream<T> stream, Function map_fn)
79 : stream_(std::move(stream)), map_fn_(std::move(map_fn)) {}
80
PollNext(PollContext * context)81 StreamPollResult<U> PollNext(PollContext* context) override {
82 if (!future_) {
83 ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, stream_.PollNext(context));
84 if (res.IsDone()) {
85 return DonePollResult();
86 }
87 future_ = map_fn_(std::move(res.item()));
88 }
89 ASSIGN_OR_RETURN_IF_PENDING_FUTURE(res, future_->Poll(context));
90 future_ = std::nullopt;
91 return res;
92 }
93
94 private:
95 Stream<T> stream_;
96 Function map_fn_;
97 std::optional<Future<U>> future_;
98 };
99
100 // Implementation of a StreamPollable for creating a concatenating two streams
101 // together.
102 template <typename T>
103 class ConcatStreamImpl : public StreamPollable<T> {
104 public:
ConcatStreamImpl(Stream<T> first,Stream<T> second)105 explicit ConcatStreamImpl(Stream<T> first, Stream<T> second)
106 : first_(std::move(first)), second_(std::move(second)) {}
107
PollNext(PollContext * context)108 StreamPollResult<T> PollNext(PollContext* context) override {
109 if (first_) {
110 ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, first_->PollNext(context));
111 if (!res.IsDone()) {
112 return res.item();
113 }
114 first_ = std::nullopt;
115 }
116 return second_.PollNext(context);
117 }
118
119 private:
120 std::optional<Stream<T>> first_;
121 Stream<T> second_;
122 };
123
124 // Implementation of a StreamPollable for creating a Stream<T> from a
125 // std::vector<Stream<T>>. Values are returned from the inner streams as soon as
126 // they are available.
127 template <typename T>
128 class FlattenImpl : public StreamPollable<T> {
129 public:
FlattenImpl(std::vector<Stream<T>> streams)130 explicit FlattenImpl(std::vector<Stream<T>> streams)
131 : registered_handles_(static_cast<uint32_t>(streams.size())) {
132 for (auto& stream : streams) {
133 streams_.emplace_back(std::move(stream));
134 }
135 }
136
PollNext(PollContext * upstream)137 StreamPollResult<T> PollNext(PollContext* upstream) override {
138 for (uint32_t i = 0; i < streams_.size(); ++i) {
139 auto& stream = streams_[i];
140 if (!stream) {
141 continue;
142 }
143 std::optional<PollContext> ctx = PollContextForStream(upstream, i);
144 if (!ctx) {
145 continue;
146 }
147 StreamPollResult<T> res = stream->PollNext(&*ctx);
148 if (res.IsPending()) {
149 PERFETTO_CHECK(!registered_handles_[i].empty());
150 continue;
151 }
152 if (!res.IsDone()) {
153 return res;
154 }
155 // StreamPollable has returned EOF. Clear it and the registered handles
156 // out.
157 stream = std::nullopt;
158 ++eof_streams_;
159 registered_handles_[i].clear();
160 }
161
162 // Every child stream being EOF means we have reached EOF as well.
163 if (eof_streams_ == streams_.size()) {
164 return DonePollResult();
165 }
166 // Every remaining stream must be pending so we can make no further
167 // progress. Register all the child handles with the context and return.
168 for (const FlatSet<PlatformHandle>& handles : registered_handles_) {
169 upstream->RegisterAllInterested(handles);
170 }
171 return PendingPollResult();
172 }
173
174 private:
PollContextForStream(PollContext * upstream,uint32_t stream_idx)175 std::optional<PollContext> PollContextForStream(PollContext* upstream,
176 uint32_t stream_idx) {
177 FlatSet<PlatformHandle>& state = registered_handles_[stream_idx];
178 if (state.empty()) {
179 return PollContext(&state, &upstream->ready_handles());
180 }
181 for (PlatformHandle handle : upstream->ready_handles()) {
182 if (state.count(handle)) {
183 state.clear();
184 return PollContext(&state, &upstream->ready_handles());
185 }
186 }
187 return std::nullopt;
188 }
189
190 std::vector<std::optional<Stream<T>>> streams_;
191 std::vector<FlatSet<PlatformHandle>> registered_handles_;
192 uint32_t eof_streams_ = 0;
193 };
194
195 // Implementation of a Stream<T> which immediately completes and calls a
196 // function in the destructor.
197 template <typename T, typename Function>
198 class OnDestroyStreamImpl : public StreamPollable<T> {
199 public:
OnDestroyStreamImpl(Function fn)200 explicit OnDestroyStreamImpl(Function fn) : fn_(std::move(fn)) {}
~OnDestroyStreamImpl()201 ~OnDestroyStreamImpl() override { fn_(); }
202
PollNext(PollContext *)203 StreamPollResult<T> PollNext(PollContext*) override {
204 return DonePollResult();
205 }
206
207 private:
208 Function fn_;
209 };
210
211 // Interface for converting a Stream<T> into a Future<U>.
212 //
213 // The goal of this interface is to allow a Stream to be converted to a Future,
214 // allowing short-circuiting (i.e. allowing the Future to complete before
215 // the stream finishes).
216 //
217 // The flexibility of this interface allows both supporting the traditional
218 // notion of collecting i.e. converting a Stream<T> to a Future<vector<T>> but
219 // also more advanced functionality like completing a Future<Status> early
220 // when errors are detected, racing Future<T> against each other and returning
221 // the first value produced etc.
222 template <typename T, typename U>
223 class Collector {
224 public:
225 virtual ~Collector() = default;
226
227 // Receives the next item from a Stream<T>. If the wrapping Future<U> can be
228 // completed, returns the a value U which completes that future. Otherwise,
229 // returns std::nullopt.
230 virtual std::optional<U> OnNext(T value) = 0;
231
232 // Called when the stream has completed and returns the |U| which will be
233 // used to complete the future. This method will only be called if OnNext
234 // returned std::nullopt for every element in the stream.
235 virtual U OnDone() = 0;
236 };
237
238 // Implementation of a StreamPollable which converts a Stream<T> to a Future<U>
239 // using an implementation of Collector<T, U>.
240 template <typename T, typename U>
241 class CollectImpl : public FuturePollable<U> {
242 public:
CollectImpl(Stream<T> stream,std::unique_ptr<Collector<T,U>> collector)243 explicit CollectImpl(Stream<T> stream,
244 std::unique_ptr<Collector<T, U>> collector)
245 : stream_(std::move(stream)), collector_(std::move(collector)) {}
246
Poll(PollContext * context)247 FuturePollResult<U> Poll(PollContext* context) override {
248 for (;;) {
249 ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, stream_.PollNext(context));
250 if (res.IsDone()) {
251 return collector_->OnDone();
252 }
253 std::optional<U> collected = collector_->OnNext(std::move(res.item()));
254 if (collected.has_value()) {
255 return std::move(collected.value());
256 }
257 }
258 }
259
260 private:
261 Stream<T> stream_;
262 std::unique_ptr<Collector<T, U>> collector_;
263 };
264
265 // Implementation for |AllOkCollector|.
266 class AllOkCollectorImpl : public Collector<Status, Status> {
267 public:
268 ~AllOkCollectorImpl() override;
269
OnNext(Status status)270 std::optional<Status> OnNext(Status status) override {
271 return status.ok() ? std::nullopt : std::make_optional(std::move(status));
272 }
OnDone()273 Status OnDone() override { return OkStatus(); }
274 };
275
276 // Implementation for |ToFutureCheckedCollector|.
277 template <typename T>
278 class FutureCheckedCollectorImpl : public Collector<T, T> {
279 public:
OnNext(T value)280 std::optional<T> OnNext(T value) override {
281 PERFETTO_CHECK(!prev_value_);
282 prev_value_ = value;
283 return std::nullopt;
284 }
OnDone()285 T OnDone() override { return *prev_value_; }
286
287 private:
288 std::optional<T> prev_value_;
289 };
290
291 // Implementation for |StatusOrVectorCollector|.
292 template <typename T>
293 class StatusOrVectorCollectorImpl
294 : public Collector<base::StatusOr<T>, base::StatusOr<std::vector<T>>> {
295 public:
OnNext(base::StatusOr<T> val_or)296 std::optional<base::StatusOr<std::vector<T>>> OnNext(
297 base::StatusOr<T> val_or) override {
298 if (!val_or.ok()) {
299 return std::make_optional(val_or.status());
300 }
301 values_.emplace_back(std::move(val_or.value()));
302 return std::nullopt;
303 }
OnDone()304 base::StatusOr<std::vector<T>> OnDone() override {
305 return std::move(values_);
306 }
307
308 private:
309 std::vector<T> values_;
310 };
311
312 } // namespace base
313 } // namespace perfetto
314
315 #endif // INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_COMBINATORS_H_
316