1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_
18 #define INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_
19
20 #include <functional>
21 #include <memory>
22 #include <vector>
23
24 #include "perfetto/base/status.h"
25 #include "perfetto/ext/base/status_or.h"
26 #include "perfetto/ext/base/threading/future.h"
27 #include "perfetto/ext/base/threading/stream_combinators.h"
28
29 namespace perfetto {
30 namespace base {
31
32 // Creates a Stream<T> from P, a subclass of StreamPollable<T>.
33 //
34 // This function follows the same pattern of std::make_unique, std::make_shared
35 // etc.
36 template <typename P, typename... Args>
MakeStream(Args...args)37 Stream<typename P::PollT> MakeStream(Args... args) {
38 return Stream<typename P::PollT>(
39 std::unique_ptr<StreamPollable<typename P::PollT>>(
40 new P(std::forward<Args>(args)...)));
41 }
42
43 // An asynchronous iterator for values of type T.
44 //
45 // If Future<T> is an asynchronous version of T, Stream<T> is an asynchronous
46 // version of Iterator<T>. Long-running compute/IO operations which return
47 // multiple values can be represented with a Stream<T>.
48 //
49 // Refer to the class documentation for Future<T> as most of the features and
50 // implementation of Future<T> also apply to Stream<T>.
51 template <typename T>
52 class Stream {
53 public:
54 using PollableItem = T;
55
56 // Creates a Stream from a |StreamPollable<T>|. Prefer using |MakeStream|
57 // instead of this function.
Stream(std::unique_ptr<StreamPollable<T>> pollable)58 explicit Stream(std::unique_ptr<StreamPollable<T>> pollable)
59 : pollable_(std::move(pollable)) {}
60
61 // Converts a Stream<T> to Stream<U>. This works by applying |map_fn| to each
62 // element in T and then polling the returned Future<U> to completion.
63 template <typename Function /* = Future<U>(T) */>
MapFuture(Function map_fn)64 Stream<FutureReturn<Function, T>> MapFuture(Function map_fn) && {
65 return MakeStream<MapFutureStreamImpl<Function, T>>(std::move(*this),
66 std::move(map_fn));
67 }
68
69 // Creates a stream which fully polls |this| and then polls |concat| to
70 // completion.
Concat(Stream<T> concat)71 Stream<T> Concat(Stream<T> concat) && {
72 return MakeStream<ConcatStreamImpl<T>>(std::move(*this), std::move(concat));
73 }
74
75 // Converts a Stream<T> to Future<U> by collecting elements using |collector|.
76 // See documentation on |Collector| for how to implement one.
77 template <typename U>
Collect(std::unique_ptr<Collector<T,U>> collector)78 Future<U> Collect(std::unique_ptr<Collector<T, U>> collector) && {
79 return MakeFuture<CollectImpl<T, U>>(std::move(*this),
80 std::move(collector));
81 }
82
83 // Checks if the computation backing this Stream<T> has finished.
84 //
85 // Returns a StreamPollResult<T> which is a essentially a
86 // variant<PendingPollResult, DonePollResult T>. If PendingPollResult is
87 // returned, |ctx| will be used to register interest in the various fds which
88 // are "blocking" this future from finishing. If DonePollResult is returned,
89 // Poll *must not* be called again.
PollNext(PollContext * ctx)90 StreamPollResult<T> PollNext(PollContext* ctx) {
91 return pollable_->PollNext(ctx);
92 }
93
94 private:
95 std::unique_ptr<StreamPollable<T>> pollable_;
96 };
97
// Alias to shorten type definitions for Stream<Status>, which is common in
// the codebase.
using StatusStream = Stream<Status>;
101
// Alias to shorten type definitions for Stream<StatusOr<T>>, which is common
// in the codebase.
template <typename T>
using StatusOrStream = Stream<StatusOr<T>>;
106
107 // Creates a Stream<T> which returns the next value inside |vector| every time
108 // Stream<T>::Poll is called.
109 template <typename T>
StreamFrom(std::vector<T> vector)110 Stream<T> StreamFrom(std::vector<T> vector) {
111 return MakeStream<ImmediateStreamImpl<T>>(std::move(vector));
112 }
113
114 // Creates a Stream<T> which immediately returns DonePollResult when polled.
115 template <typename T>
EmptyStream()116 Stream<T> EmptyStream() {
117 return StreamFrom(std::vector<T>());
118 }
119
120 // Creates a Stream<T> which returns |first| and each of |rest| in sequence when
121 // polled.
122 template <typename T, typename... Ts>
StreamOf(T first,Ts...rest)123 Stream<T> StreamOf(T first, Ts... rest) {
124 std::vector<T> values;
125 AddAllToVector(values, std::forward<T>(first), std::forward<Ts>(rest)...);
126 return StreamFrom(std::move(values));
127 }
128
129 // Creates a Stream<T> which returns the value of |future| before completing.
130 template <typename T>
StreamFromFuture(Future<T> future)131 Stream<T> StreamFromFuture(Future<T> future) {
132 return StreamOf(std::move(future)).MapFuture([](Future<T> value) { return value; });
133 }
134
135 // Creates a stream which returns no elements but calls |fn| in the destructor
136 // of the returned stream.
137 //
138 // This function can be used to do resource management for a stream by making
139 // the passed |fn| own the resources used by any "upstream" sources and then
140 // Concat-ing this stream with the upstream.
141 template <typename T, typename Function>
OnDestroyStream(Function fn)142 Stream<T> OnDestroyStream(Function fn) {
143 return MakeStream<OnDestroyStreamImpl<T, Function>>(std::move(fn));
144 }
145
146 // Creates a Stream<T> returning values generated by each stream in |streams| as
147 // soon as they are produced without preserving ordering.
148 //
149 // The returned Stream<T> keeps the amount of Poll calls to the inner |streams|,
150 // to a minimum only calling Poll for the Streams which are marked are ready
151 // in the PollContext.
152 template <typename T>
FlattenStreams(std::vector<Stream<T>> streams)153 Stream<T> FlattenStreams(std::vector<Stream<T>> streams) {
154 return MakeStream<FlattenImpl<T>>(std::move(streams));
155 }
156
157 // Collector for Stream<Status>::Collect() which immediately resolves the
158 // returned Future when an error status is detected. Resolves with
159 // OkStatus once the entire stream finishes after returning all OkStatus().
AllOkCollector()160 inline std::unique_ptr<Collector<Status, Status>> AllOkCollector() {
161 return std::make_unique<AllOkCollectorImpl>();
162 }
163
164 // Collector for Stream<T>::Collect() which ensures the stream returns *exactly*
165 // one T before completing. Crashes if either a) no values are produced by
166 // the Stream, b) more than one value is produced by the Stream.
167 template <typename T>
ToFutureCheckedCollector()168 inline std::unique_ptr<Collector<T, T>> ToFutureCheckedCollector() {
169 return std::make_unique<FutureCheckedCollectorImpl<T>>();
170 }
171
172 // Collector for Stream<StatusOr<T>>::Collect() which returns a vector
173 // containing all the successful results from the stream. If any element is an
174 // error, short-circuits the stream with the error.
175 template <typename T>
176 inline std::unique_ptr<
177 Collector<StatusOr<T>, StatusOr<std::vector<T>>>>
StatusOrVectorCollector()178 StatusOrVectorCollector() {
179 return std::make_unique<StatusOrVectorCollectorImpl<T>>();
180 }
181
182 } // namespace base
183 } // namespace perfetto
184
185 #endif // INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_
186