• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_
18 #define INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_
19 
20 #include <functional>
21 #include <memory>
22 #include <vector>
23 
24 #include "perfetto/base/status.h"
25 #include "perfetto/ext/base/status_or.h"
26 #include "perfetto/ext/base/threading/future.h"
27 #include "perfetto/ext/base/threading/stream_combinators.h"
28 
29 namespace perfetto {
30 namespace base {
31 
32 // Creates a Stream<T> from P, a subclass of StreamPollable<T>.
33 //
34 // This function follows the same pattern of std::make_unique, std::make_shared
35 // etc.
36 template <typename P, typename... Args>
MakeStream(Args...args)37 Stream<typename P::PollT> MakeStream(Args... args) {
38   return Stream<typename P::PollT>(
39       std::unique_ptr<StreamPollable<typename P::PollT>>(
40           new P(std::forward<Args>(args)...)));
41 }
42 
43 // An asynchronous iterator for values of type T.
44 //
45 // If Future<T> is an asynchronous version of T, Stream<T> is an asynchronous
46 // version of Iterator<T>. Long-running compute/IO operations which return
47 // multiple values can be represented with a Stream<T>.
48 //
49 // Note: Streams *must* be polled on the same thread on which they were
50 // created. The |SpawnResultStreams| can be used to move of the results of
51 // Streams between threads in a safe manner.
52 //
53 // Refer to the class documentation for Future<T> as most of the features and
54 // implementation of Future<T> also apply to Stream<T>.
55 template <typename T>
56 class Stream {
57  public:
58   using PollableItem = T;
59 
60   // Creates a Stream from a |StreamPollable<T>|. Prefer using |MakeStream|
61   // instead of this function.
Stream(std::unique_ptr<StreamPollable<T>> pollable)62   explicit Stream(std::unique_ptr<StreamPollable<T>> pollable)
63       : pollable_(std::move(pollable)) {}
64 
65   // Converts a Stream<T> to Stream<U>. This works by applying |map_fn| to each
66   // element in T and then polling the returned Future<U> to completion.
67   template <typename Function /* = Future<U>(T) */>
MapFuture(Function map_fn)68   Stream<FutureReturn<Function, T>> MapFuture(Function map_fn) && {
69     return MakeStream<MapFutureStreamImpl<Function, T>>(std::move(*this),
70                                                         std::move(map_fn));
71   }
72 
73   // Creates a stream which fully polls |this| and then polls |concat| to
74   // completion.
Concat(Stream<T> concat)75   Stream<T> Concat(Stream<T> concat) && {
76     return MakeStream<ConcatStreamImpl<T>>(std::move(*this), std::move(concat));
77   }
78 
79   // Converts a Stream<T> to Future<U> by collecting elements using |collector|.
80   // See documentation on |Collector| for how to implement one.
81   template <typename U>
Collect(std::unique_ptr<Collector<T,U>> collector)82   Future<U> Collect(std::unique_ptr<Collector<T, U>> collector) && {
83     return MakeFuture<CollectImpl<T, U>>(std::move(*this),
84                                          std::move(collector));
85   }
86 
87   // Checks if the computation backing this Stream<T> has finished.
88   //
89   // Returns a StreamPollResult<T> which is a essentially a
90   // variant<PendingPollResult, DonePollResult T>. If PendingPollResult is
91   // returned, |ctx| will be used to register interest in the various fds which
92   // are "blocking" this future from finishing. If DonePollResult is returned,
93   // Poll *must not* be called again.
PollNext(PollContext * ctx)94   StreamPollResult<T> PollNext(PollContext* ctx) {
95     return pollable_->PollNext(ctx);
96   }
97 
98  private:
99   std::unique_ptr<StreamPollable<T>> pollable_;
100 };
101 
102 // Alias to shorten type defintions for Stream<Status> which is common in
103 // the codebase.
104 using StatusStream = Stream<Status>;
105 
106 // Alias to shorten type defintions for Stream<StatusOr<T>> which is common
107 // in the codebase.
108 template <typename T>
109 using StatusOrStream = Stream<StatusOr<T>>;
110 
111 // Creates a Stream<T> which returns the next value inside |vector| every time
112 // Stream<T>::Poll is called.
113 template <typename T>
StreamFrom(std::vector<T> vector)114 Stream<T> StreamFrom(std::vector<T> vector) {
115   return MakeStream<ImmediateStreamImpl<T>>(std::move(vector));
116 }
117 
118 // Creates a Stream<T> which immediately returns DonePollResult when polled.
119 template <typename T>
EmptyStream()120 Stream<T> EmptyStream() {
121   return StreamFrom(std::vector<T>());
122 }
123 
124 // Creates a Stream<T> which returns |first| and each of |rest| in sequence when
125 // polled.
126 template <typename T, typename... Ts>
StreamOf(T first,Ts...rest)127 Stream<T> StreamOf(T first, Ts... rest) {
128   std::vector<T> values;
129   AddAllToVector(values, std::forward<T>(first), std::forward<Ts>(rest)...);
130   return StreamFrom(std::move(values));
131 }
132 
133 // Creates a Stream<T> which returns the value of |future| before completing.
134 template <typename T>
StreamFromFuture(Future<T> future)135 Stream<T> StreamFromFuture(Future<T> future) {
136   return StreamOf(std::move(future)).MapFuture([](Future<T> value) {
137     return value;
138   });
139 }
140 
141 // Creates a stream which returns no elements but calls |fn| in the destructor
142 // of the returned stream.
143 //
144 // This function can be used to do resource management for a stream by making
145 // the passed |fn| own the resources used by any "upstream" sources and then
146 // Concat-ing this stream with the upstream.
147 template <typename T, typename Function>
OnDestroyStream(Function fn)148 Stream<T> OnDestroyStream(Function fn) {
149   return MakeStream<OnDestroyStreamImpl<T, Function>>(std::move(fn));
150 }
151 
152 // Creates a Stream<T> returning values generated by each stream in |streams| as
153 // soon as they are produced without preserving ordering.
154 //
155 // The returned Stream<T> keeps the amount of Poll calls to the inner |streams|,
156 // to a minimum only calling Poll for the Streams which are marked are ready
157 // in the PollContext.
158 template <typename T>
FlattenStreams(std::vector<Stream<T>> streams)159 Stream<T> FlattenStreams(std::vector<Stream<T>> streams) {
160   return MakeStream<FlattenImpl<T>>(std::move(streams));
161 }
162 
163 // Collector for Stream<Status>::Collect() which immediately resolves the
164 // returned Future when an error status is detected. Resolves with
165 // OkStatus once the entire stream finishes after returning all OkStatus().
AllOkCollector()166 inline std::unique_ptr<Collector<Status, Status>> AllOkCollector() {
167   return std::make_unique<AllOkCollectorImpl>();
168 }
169 
170 // Collector for Stream<T>::Collect() which ensures the stream returns *exactly*
171 // one T before completing. Crashes if either a) no values are produced by
172 // the Stream, b) more than one value is produced by the Stream.
173 template <typename T>
ToFutureCheckedCollector()174 inline std::unique_ptr<Collector<T, T>> ToFutureCheckedCollector() {
175   return std::make_unique<FutureCheckedCollectorImpl<T>>();
176 }
177 
178 // Collector for Stream<StatusOr<T>>::Collect() which returns a vector
179 // containing all the successful results from the stream. If any element is an
180 // error, short-circuits the stream with the error.
181 template <typename T>
182 inline std::unique_ptr<Collector<StatusOr<T>, StatusOr<std::vector<T>>>>
StatusOrVectorCollector()183 StatusOrVectorCollector() {
184   return std::make_unique<StatusOrVectorCollectorImpl<T>>();
185 }
186 
187 }  // namespace base
188 }  // namespace perfetto
189 
190 #endif  // INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_
191