1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_
18 #define INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_
19
20 #include <functional>
21 #include <memory>
22 #include <vector>
23
24 #include "perfetto/base/status.h"
25 #include "perfetto/ext/base/status_or.h"
26 #include "perfetto/ext/base/threading/future.h"
27 #include "perfetto/ext/base/threading/stream_combinators.h"
28
29 namespace perfetto {
30 namespace base {
31
32 // Creates a Stream<T> from P, a subclass of StreamPollable<T>.
33 //
34 // This function follows the same pattern of std::make_unique, std::make_shared
35 // etc.
36 template <typename P, typename... Args>
MakeStream(Args...args)37 Stream<typename P::PollT> MakeStream(Args... args) {
38 return Stream<typename P::PollT>(
39 std::unique_ptr<StreamPollable<typename P::PollT>>(
40 new P(std::forward<Args>(args)...)));
41 }
42
43 // An asynchronous iterator for values of type T.
44 //
45 // If Future<T> is an asynchronous version of T, Stream<T> is an asynchronous
46 // version of Iterator<T>. Long-running compute/IO operations which return
47 // multiple values can be represented with a Stream<T>.
48 //
49 // Note: Streams *must* be polled on the same thread on which they were
50 // created. The |SpawnResultStreams| can be used to move of the results of
51 // Streams between threads in a safe manner.
52 //
53 // Refer to the class documentation for Future<T> as most of the features and
54 // implementation of Future<T> also apply to Stream<T>.
55 template <typename T>
56 class Stream {
57 public:
58 using PollableItem = T;
59
60 // Creates a Stream from a |StreamPollable<T>|. Prefer using |MakeStream|
61 // instead of this function.
Stream(std::unique_ptr<StreamPollable<T>> pollable)62 explicit Stream(std::unique_ptr<StreamPollable<T>> pollable)
63 : pollable_(std::move(pollable)) {}
64
65 // Converts a Stream<T> to Stream<U>. This works by applying |map_fn| to each
66 // element in T and then polling the returned Future<U> to completion.
67 template <typename Function /* = Future<U>(T) */>
MapFuture(Function map_fn)68 Stream<FutureReturn<Function, T>> MapFuture(Function map_fn) && {
69 return MakeStream<MapFutureStreamImpl<Function, T>>(std::move(*this),
70 std::move(map_fn));
71 }
72
73 // Creates a stream which fully polls |this| and then polls |concat| to
74 // completion.
Concat(Stream<T> concat)75 Stream<T> Concat(Stream<T> concat) && {
76 return MakeStream<ConcatStreamImpl<T>>(std::move(*this), std::move(concat));
77 }
78
79 // Converts a Stream<T> to Future<U> by collecting elements using |collector|.
80 // See documentation on |Collector| for how to implement one.
81 template <typename U>
Collect(std::unique_ptr<Collector<T,U>> collector)82 Future<U> Collect(std::unique_ptr<Collector<T, U>> collector) && {
83 return MakeFuture<CollectImpl<T, U>>(std::move(*this),
84 std::move(collector));
85 }
86
87 // Checks if the computation backing this Stream<T> has finished.
88 //
89 // Returns a StreamPollResult<T> which is a essentially a
90 // variant<PendingPollResult, DonePollResult T>. If PendingPollResult is
91 // returned, |ctx| will be used to register interest in the various fds which
92 // are "blocking" this future from finishing. If DonePollResult is returned,
93 // Poll *must not* be called again.
PollNext(PollContext * ctx)94 StreamPollResult<T> PollNext(PollContext* ctx) {
95 return pollable_->PollNext(ctx);
96 }
97
98 private:
99 std::unique_ptr<StreamPollable<T>> pollable_;
100 };
101
// Alias to shorten type definitions for Stream<Status>, which is common in
// the codebase.
using StatusStream = Stream<Status>;
105
// Alias to shorten type definitions for Stream<StatusOr<T>>, which is common
// in the codebase.
template <typename T>
using StatusOrStream = Stream<StatusOr<T>>;
110
111 // Creates a Stream<T> which returns the next value inside |vector| every time
112 // Stream<T>::Poll is called.
113 template <typename T>
StreamFrom(std::vector<T> vector)114 Stream<T> StreamFrom(std::vector<T> vector) {
115 return MakeStream<ImmediateStreamImpl<T>>(std::move(vector));
116 }
117
118 // Creates a Stream<T> which immediately returns DonePollResult when polled.
119 template <typename T>
EmptyStream()120 Stream<T> EmptyStream() {
121 return StreamFrom(std::vector<T>());
122 }
123
124 // Creates a Stream<T> which returns |first| and each of |rest| in sequence when
125 // polled.
126 template <typename T, typename... Ts>
StreamOf(T first,Ts...rest)127 Stream<T> StreamOf(T first, Ts... rest) {
128 std::vector<T> values;
129 AddAllToVector(values, std::forward<T>(first), std::forward<Ts>(rest)...);
130 return StreamFrom(std::move(values));
131 }
132
133 // Creates a Stream<T> which returns the value of |future| before completing.
134 template <typename T>
StreamFromFuture(Future<T> future)135 Stream<T> StreamFromFuture(Future<T> future) {
136 return StreamOf(std::move(future)).MapFuture([](Future<T> value) {
137 return value;
138 });
139 }
140
141 // Creates a stream which returns no elements but calls |fn| in the destructor
142 // of the returned stream.
143 //
144 // This function can be used to do resource management for a stream by making
145 // the passed |fn| own the resources used by any "upstream" sources and then
146 // Concat-ing this stream with the upstream.
147 template <typename T, typename Function>
OnDestroyStream(Function fn)148 Stream<T> OnDestroyStream(Function fn) {
149 return MakeStream<OnDestroyStreamImpl<T, Function>>(std::move(fn));
150 }
151
152 // Creates a Stream<T> returning values generated by each stream in |streams| as
153 // soon as they are produced without preserving ordering.
154 //
155 // The returned Stream<T> keeps the amount of Poll calls to the inner |streams|,
156 // to a minimum only calling Poll for the Streams which are marked are ready
157 // in the PollContext.
158 template <typename T>
FlattenStreams(std::vector<Stream<T>> streams)159 Stream<T> FlattenStreams(std::vector<Stream<T>> streams) {
160 return MakeStream<FlattenImpl<T>>(std::move(streams));
161 }
162
163 // Collector for Stream<Status>::Collect() which immediately resolves the
164 // returned Future when an error status is detected. Resolves with
165 // OkStatus once the entire stream finishes after returning all OkStatus().
AllOkCollector()166 inline std::unique_ptr<Collector<Status, Status>> AllOkCollector() {
167 return std::make_unique<AllOkCollectorImpl>();
168 }
169
170 // Collector for Stream<T>::Collect() which ensures the stream returns *exactly*
171 // one T before completing. Crashes if either a) no values are produced by
172 // the Stream, b) more than one value is produced by the Stream.
173 template <typename T>
ToFutureCheckedCollector()174 inline std::unique_ptr<Collector<T, T>> ToFutureCheckedCollector() {
175 return std::make_unique<FutureCheckedCollectorImpl<T>>();
176 }
177
178 // Collector for Stream<StatusOr<T>>::Collect() which returns a vector
179 // containing all the successful results from the stream. If any element is an
180 // error, short-circuits the stream with the error.
181 template <typename T>
182 inline std::unique_ptr<Collector<StatusOr<T>, StatusOr<std::vector<T>>>>
StatusOrVectorCollector()183 StatusOrVectorCollector() {
184 return std::make_unique<StatusOrVectorCollectorImpl<T>>();
185 }
186
187 } // namespace base
188 } // namespace perfetto
189
190 #endif // INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_
191