/*
 * Copyright (C) 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 #ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_
18 #define INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_
19 
#include <functional>
#include <memory>
#include <utility>
#include <vector>

#include "perfetto/base/status.h"
#include "perfetto/ext/base/status_or.h"
#include "perfetto/ext/base/threading/future.h"
#include "perfetto/ext/base/threading/stream_combinators.h"
28 
29 namespace perfetto {
30 namespace base {
31 
32 // Creates a Stream<T> from P, a subclass of StreamPollable<T>.
33 //
34 // This function follows the same pattern of std::make_unique, std::make_shared
35 // etc.
36 template <typename P, typename... Args>
MakeStream(Args...args)37 Stream<typename P::PollT> MakeStream(Args... args) {
38   return Stream<typename P::PollT>(
39       std::unique_ptr<StreamPollable<typename P::PollT>>(
40           new P(std::forward<Args>(args)...)));
41 }
42 
43 // An asynchronous iterator for values of type T.
44 //
45 // If Future<T> is an asynchronous version of T, Stream<T> is an asynchronous
46 // version of Iterator<T>. Long-running compute/IO operations which return
47 // multiple values can be represented with a Stream<T>.
48 //
49 // Refer to the class documentation for Future<T> as most of the features and
50 // implementation of Future<T> also apply to Stream<T>.
51 template <typename T>
52 class Stream {
53  public:
54   using PollableItem = T;
55 
56   // Creates a Stream from a |StreamPollable<T>|. Prefer using |MakeStream|
57   // instead of this function.
Stream(std::unique_ptr<StreamPollable<T>> pollable)58   explicit Stream(std::unique_ptr<StreamPollable<T>> pollable)
59       : pollable_(std::move(pollable)) {}
60 
61   // Converts a Stream<T> to Stream<U>. This works by applying |map_fn| to each
62   // element in T and then polling the returned Future<U> to completion.
63   template <typename Function /* = Future<U>(T) */>
MapFuture(Function map_fn)64   Stream<FutureReturn<Function, T>> MapFuture(Function map_fn) && {
65     return MakeStream<MapFutureStreamImpl<Function, T>>(std::move(*this),
66                                                         std::move(map_fn));
67   }
68 
69   // Creates a stream which fully polls |this| and then polls |concat| to
70   // completion.
Concat(Stream<T> concat)71   Stream<T> Concat(Stream<T> concat) && {
72     return MakeStream<ConcatStreamImpl<T>>(std::move(*this), std::move(concat));
73   }
74 
75   // Converts a Stream<T> to Future<U> by collecting elements using |collector|.
76   // See documentation on |Collector| for how to implement one.
77   template <typename U>
Collect(std::unique_ptr<Collector<T,U>> collector)78   Future<U> Collect(std::unique_ptr<Collector<T, U>> collector) && {
79     return MakeFuture<CollectImpl<T, U>>(std::move(*this),
80                                          std::move(collector));
81   }
82 
83   // Checks if the computation backing this Stream<T> has finished.
84   //
85   // Returns a StreamPollResult<T> which is a essentially a
86   // variant<PendingPollResult, DonePollResult T>. If PendingPollResult is
87   // returned, |ctx| will be used to register interest in the various fds which
88   // are "blocking" this future from finishing. If DonePollResult is returned,
89   // Poll *must not* be called again.
PollNext(PollContext * ctx)90   StreamPollResult<T> PollNext(PollContext* ctx) {
91     return pollable_->PollNext(ctx);
92   }
93 
94  private:
95   std::unique_ptr<StreamPollable<T>> pollable_;
96 };
97 
98 // Alias to shorten type defintions for Stream<Status> which is common in
99 // the codebase.
100 using StatusStream = Stream<Status>;
101 
102 // Alias to shorten type defintions for Stream<StatusOr<T>> which is common
103 // in the codebase.
104 template <typename T>
105 using StatusOrStream = Stream<StatusOr<T>>;
106 
107 // Creates a Stream<T> which returns the next value inside |vector| every time
108 // Stream<T>::Poll is called.
109 template <typename T>
StreamFrom(std::vector<T> vector)110 Stream<T> StreamFrom(std::vector<T> vector) {
111   return MakeStream<ImmediateStreamImpl<T>>(std::move(vector));
112 }
113 
114 // Creates a Stream<T> which immediately returns DonePollResult when polled.
115 template <typename T>
EmptyStream()116 Stream<T> EmptyStream() {
117   return StreamFrom(std::vector<T>());
118 }
119 
120 // Creates a Stream<T> which returns |first| and each of |rest| in sequence when
121 // polled.
122 template <typename T, typename... Ts>
StreamOf(T first,Ts...rest)123 Stream<T> StreamOf(T first, Ts... rest) {
124   std::vector<T> values;
125   AddAllToVector(values, std::forward<T>(first), std::forward<Ts>(rest)...);
126   return StreamFrom(std::move(values));
127 }
128 
129 // Creates a Stream<T> which returns the value of |future| before completing.
130 template <typename T>
StreamFromFuture(Future<T> future)131 Stream<T> StreamFromFuture(Future<T> future) {
132   return StreamOf(std::move(future)).MapFuture([](Future<T> value) { return value; });
133 }
134 
135 // Creates a stream which returns no elements but calls |fn| in the destructor
136 // of the returned stream.
137 //
138 // This function can be used to do resource management for a stream by making
139 // the passed |fn| own the resources used by any "upstream" sources and then
140 // Concat-ing this stream with the upstream.
141 template <typename T, typename Function>
OnDestroyStream(Function fn)142 Stream<T> OnDestroyStream(Function fn) {
143   return MakeStream<OnDestroyStreamImpl<T, Function>>(std::move(fn));
144 }
145 
146 // Creates a Stream<T> returning values generated by each stream in |streams| as
147 // soon as they are produced without preserving ordering.
148 //
149 // The returned Stream<T> keeps the amount of Poll calls to the inner |streams|,
150 // to a minimum only calling Poll for the Streams which are marked are ready
151 // in the PollContext.
152 template <typename T>
FlattenStreams(std::vector<Stream<T>> streams)153 Stream<T> FlattenStreams(std::vector<Stream<T>> streams) {
154   return MakeStream<FlattenImpl<T>>(std::move(streams));
155 }
156 
157 // Collector for Stream<Status>::Collect() which immediately resolves the
158 // returned Future when an error status is detected. Resolves with
159 // OkStatus once the entire stream finishes after returning all OkStatus().
AllOkCollector()160 inline std::unique_ptr<Collector<Status, Status>> AllOkCollector() {
161   return std::make_unique<AllOkCollectorImpl>();
162 }
163 
164 // Collector for Stream<T>::Collect() which ensures the stream returns *exactly*
165 // one T before completing. Crashes if either a) no values are produced by
166 // the Stream, b) more than one value is produced by the Stream.
167 template <typename T>
ToFutureCheckedCollector()168 inline std::unique_ptr<Collector<T, T>> ToFutureCheckedCollector() {
169   return std::make_unique<FutureCheckedCollectorImpl<T>>();
170 }
171 
172 // Collector for Stream<StatusOr<T>>::Collect() which returns a vector
173 // containing all the successful results from the stream. If any element is an
174 // error, short-circuits the stream with the error.
175 template <typename T>
176 inline std::unique_ptr<
177     Collector<StatusOr<T>, StatusOr<std::vector<T>>>>
StatusOrVectorCollector()178 StatusOrVectorCollector() {
179   return std::make_unique<StatusOrVectorCollectorImpl<T>>();
180 }
181 
182 }  // namespace base
183 }  // namespace perfetto
184 
185 #endif  // INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_
186