1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_UTIL_H_
18 #define INCLUDE_PERFETTO_EXT_BASE_THREADING_UTIL_H_
19 
20 #include <functional>
21 #include <memory>
22 #include <optional>
23 #include <type_traits>
24 #include <utility>
25 #include <vector>
26 
27 #include "perfetto/base/status.h"
28 #include "perfetto/base/task_runner.h"
29 #include "perfetto/ext/base/threading/channel.h"
30 #include "perfetto/ext/base/threading/future.h"
31 #include "perfetto/ext/base/threading/poll.h"
32 #include "perfetto/ext/base/threading/stream.h"
33 #include "perfetto/ext/base/threading/thread_pool.h"
34 #include "perfetto/ext/base/unix_task_runner.h"
35 
36 namespace perfetto {
37 namespace base {
38 
39 // Blocks the calling thread until |fd| is considered "readable". In Linux,
40 // this corresponds to |POLLOUT| or |POLLHUP| being returned if |fd| is polled.
BlockUntilReadableFd(base::PlatformHandle fd)41 inline void BlockUntilReadableFd(base::PlatformHandle fd) {
42   base::UnixTaskRunner runner;
43   runner.AddFileDescriptorWatch(fd, [&runner]() { runner.Quit(); });
44   runner.Run();
45 }
46 
47 // Creates a Stream<T> which returns all the data from |channel| and completes
48 // when |channel| is closed.
49 //
50 // Note: the caller retains ownership of the passed channel and must ensure that
51 // the channel outlives the lifetime of the returned stream.
52 template <typename T>
ReadChannelStream(Channel<T> * channel)53 Stream<T> ReadChannelStream(Channel<T>* channel) {
54   class ReadImpl : public StreamPollable<T> {
55    public:
56     explicit ReadImpl(Channel<T>* reader) : reader_(reader) {}
57 
58     StreamPollResult<T> PollNext(PollContext* ctx) override {
59       auto result = reader_->ReadNonBlocking();
60       if (!result.item.has_value()) {
61         if (result.is_closed) {
62           return DonePollResult();
63         }
64         ctx->RegisterInterested(reader_->read_fd());
65         return PendingPollResult();
66       }
67       return std::move(*result.item);
68     }
69 
70    private:
71     Channel<T>* reader_ = nullptr;
72   };
73   return MakeStream<ReadImpl>(channel);
74 }
75 
76 // Creates a Future<FVoid> which handles writing |item| into |channel|. The
77 // Future is completed when the item is succesfully written.
78 //
79 // Note: the caller retains ownership of the passed channel and must ensure that
80 // the channel outlives the lifetime of the returned future.
81 template <typename T>
WriteChannelFuture(Channel<T> * channel,T item)82 Future<FVoid> WriteChannelFuture(Channel<T>* channel, T item) {
83   class WriteImpl : public FuturePollable<FVoid> {
84    public:
85     WriteImpl(Channel<T>* writer, T to_write)
86         : writer_(writer), to_write_(std::move(to_write)) {}
87 
88     FuturePollResult<FVoid> Poll(PollContext* ctx) override {
89       auto res = writer_->WriteNonBlocking(std::move(to_write_));
90       PERFETTO_CHECK(!res.is_closed);
91       if (!res.success) {
92         ctx->RegisterInterested(writer_->write_fd());
93         return PendingPollResult();
94       }
95       return FVoid();
96     }
97 
98    private:
99     Channel<T>* writer_ = nullptr;
100     T to_write_;
101   };
102   return MakeFuture<WriteImpl>(channel, std::move(item));
103 }
104 
105 // Creates a Stream<T> which yields the result of executing |fn| on |pool|
106 // repeatedly. The returned stream only completes when |fn| returns
107 // std::nullopt.
108 //
109 // The intended usage of this function is to schedule CPU intensive work on a
110 // background thread pool and receive regular "updates" on the progress by:
111 // a) breaking the work into chunks
112 // b) returning some indication of progress/partial results through |T|.
113 template <typename T>
RunOnThreadPool(ThreadPool * pool,std::function<std::optional<T> ()> fn)114 Stream<T> RunOnThreadPool(ThreadPool* pool,
115                           std::function<std::optional<T>()> fn) {
116   class RunOnPoolImpl : public StreamPollable<T> {
117    public:
118     explicit RunOnPoolImpl(ThreadPool* pool,
119                            std::function<std::optional<T>()> fn)
120         : pool_(pool),
121           fn_(std::make_shared<std::function<std::optional<T>()>>(
122               std::move(fn))),
123           channel_(new Channel<T>(1)),
124           channel_stream_(ReadChannelStream(channel_.get())) {
125       RunFn();
126     }
127 
128     StreamPollResult<T> PollNext(PollContext* ctx) override {
129       ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, channel_stream_.PollNext(ctx));
130       if (res.IsDone()) {
131         return DonePollResult();
132       }
133       RunFn();
134       return res;
135     }
136 
137    private:
138     void RunFn() {
139       pool_->PostTask([channel = channel_, fn = fn_]() mutable {
140         auto opt_value = (*fn)();
141         if (!opt_value) {
142           // Clear out the function to ensure that any captured state is freed
143           // before we inform the caller.
144           fn.reset();
145           channel->Close();
146           return;
147         }
148         auto write_res =
149             channel->WriteNonBlocking(std::move(opt_value.value()));
150         PERFETTO_CHECK(write_res.success);
151         PERFETTO_CHECK(!write_res.is_closed);
152       });
153     }
154 
155     ThreadPool* pool_ = nullptr;
156     std::shared_ptr<std::function<std::optional<T>()>> fn_;
157     std::shared_ptr<Channel<T>> channel_;
158     base::Stream<T> channel_stream_;
159   };
160   return MakeStream<RunOnPoolImpl>(pool, std::move(fn));
161 }
162 
163 // Creates a Future<T> which yields the result of executing |fn| on |pool|. The
164 // returned completes with the return value of |fn|.
165 //
166 // The intended usage of this function is to schedule CPU intensive work on a
167 // background thread pool and have the result returned when available.
168 template <typename T>
RunOnceOnThreadPool(ThreadPool * pool,std::function<T ()> fn)169 Future<T> RunOnceOnThreadPool(ThreadPool* pool, std::function<T()> fn) {
170   return RunOnThreadPool<T>(
171              pool,
172              [done = false, fn = std::move(fn)]() mutable -> std::optional<T> {
173                if (done) {
174                  return std::nullopt;
175                }
176                done = true;
177                return fn();
178              })
179       .Collect(base::ToFutureCheckedCollector<T>());
180 }
181 
182 }  // namespace base
183 }  // namespace perfetto
184 
185 #endif  // INCLUDE_PERFETTO_EXT_BASE_THREADING_UTIL_H_
186