• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_UTIL_H_
18 #define INCLUDE_PERFETTO_EXT_BASE_THREADING_UTIL_H_
19 
20 #include <functional>
21 #include <memory>
22 #include <optional>
23 #include <type_traits>
24 #include <utility>
25 #include <vector>
26 
27 #include "perfetto/base/status.h"
28 #include "perfetto/base/task_runner.h"
29 #include "perfetto/ext/base/threading/channel.h"
30 #include "perfetto/ext/base/threading/future.h"
31 #include "perfetto/ext/base/threading/poll.h"
32 #include "perfetto/ext/base/threading/stream.h"
33 #include "perfetto/ext/base/threading/thread_pool.h"
34 #include "perfetto/ext/base/unix_task_runner.h"
35 
36 namespace perfetto {
37 namespace base {
38 
39 // Blocks the calling thread until |fd| is considered "readable". In Linux,
40 // this corresponds to |POLLOUT| or |POLLHUP| being returned if |fd| is polled.
41 // If |timeout_ms| is specified, waits that many ms before stopping.
42 //
43 // Returns true if the function returned because the fd was readable and false
44 // otherwise.
45 inline bool BlockUntilReadableFd(
46     base::PlatformHandle fd,
47     std::optional<uint32_t> timeout_ms = std::nullopt) {
48   bool is_readable = false;
49   base::UnixTaskRunner runner;
50   runner.AddFileDescriptorWatch(fd, [&runner, &is_readable]() {
51     is_readable = true;
52     runner.Quit();
53   });
54   if (timeout_ms) {
55     runner.PostDelayedTask([&runner]() { runner.Quit(); }, *timeout_ms);
56   }
57   runner.Run();
58   return is_readable;
59 }
60 
61 // Creates a Stream<T> which returns all the data from |channel| and completes
62 // when |channel| is closed.
63 //
64 // Note: the caller retains ownership of the passed channel and must ensure that
65 // the channel outlives the lifetime of the returned stream.
66 template <typename T>
ReadChannelStream(Channel<T> * channel)67 Stream<T> ReadChannelStream(Channel<T>* channel) {
68   class ReadImpl : public StreamPollable<T> {
69    public:
70     explicit ReadImpl(Channel<T>* reader) : reader_(reader) {}
71 
72     StreamPollResult<T> PollNext(PollContext* ctx) override {
73       auto result = reader_->ReadNonBlocking();
74       if (!result.item.has_value()) {
75         if (result.is_closed) {
76           return DonePollResult();
77         }
78         ctx->RegisterInterested(reader_->read_fd());
79         return PendingPollResult();
80       }
81       return std::move(*result.item);
82     }
83 
84    private:
85     Channel<T>* reader_ = nullptr;
86   };
87   return MakeStream<ReadImpl>(channel);
88 }
89 
90 // Creates a Future<FVoid> which handles writing |item| into |channel|. The
91 // Future is completed when the item is succesfully written.
92 //
93 // Note: the caller retains ownership of the passed channel and must ensure that
94 // the channel outlives the lifetime of the returned future.
95 template <typename T>
WriteChannelFuture(Channel<T> * channel,T item)96 Future<FVoid> WriteChannelFuture(Channel<T>* channel, T item) {
97   class WriteImpl : public FuturePollable<FVoid> {
98    public:
99     WriteImpl(Channel<T>* writer, T to_write)
100         : writer_(writer), to_write_(std::move(to_write)) {}
101 
102     FuturePollResult<FVoid> Poll(PollContext* ctx) override {
103       auto res = writer_->WriteNonBlocking(std::move(to_write_));
104       PERFETTO_CHECK(!res.is_closed);
105       if (!res.success) {
106         ctx->RegisterInterested(writer_->write_fd());
107         return PendingPollResult();
108       }
109       return FVoid();
110     }
111 
112    private:
113     Channel<T>* writer_ = nullptr;
114     T to_write_;
115   };
116   return MakeFuture<WriteImpl>(channel, std::move(item));
117 }
118 
119 // Creates a Stream<T> which yields the result of executing |fn| on |pool|
120 // repeatedly. The returned stream only completes when |fn| returns
121 // std::nullopt.
122 //
123 // Callers can optionally specify a |on_destroy| function which is executed when
124 // the returned stream is destroyed. This is useful for informing the work
125 // spawned on the thread pool that the result is no longer necessary.
126 //
127 // The intended usage of this function is to schedule CPU intensive work on a
128 // background thread pool and receive regular "updates" on the progress by:
129 // a) breaking the work into chunks
130 // b) returning some indication of progress/partial results through |T|.
131 template <typename T>
132 Stream<T> RunOnThreadPool(
133     ThreadPool* pool,
134     std::function<std::optional<T>()> fn,
135     std::function<void()> on_destroy = [] {}) {
136   class RunOnPoolImpl : public StreamPollable<T> {
137    public:
RunOnPoolImpl(ThreadPool * pool,std::function<std::optional<T> ()> fn,std::function<void ()> on_destroy)138     explicit RunOnPoolImpl(ThreadPool* pool,
139                            std::function<std::optional<T>()> fn,
140                            std::function<void()> on_destroy)
141         : pool_(pool),
142           fn_(std::make_shared<std::function<std::optional<T>()>>(
143               std::move(fn))),
144           on_destroy_(std::move(on_destroy)),
145           channel_(new Channel<T>(1)),
146           channel_stream_(ReadChannelStream(channel_.get())) {
147       RunFn();
148     }
149 
~RunOnPoolImpl()150     ~RunOnPoolImpl() override { on_destroy_(); }
151 
PollNext(PollContext * ctx)152     StreamPollResult<T> PollNext(PollContext* ctx) override {
153       ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, channel_stream_.PollNext(ctx));
154       if (res.IsDone()) {
155         return DonePollResult();
156       }
157       RunFn();
158       return res;
159     }
160 
161    private:
RunFn()162     void RunFn() {
163       pool_->PostTask([channel = channel_, fn = fn_]() mutable {
164         auto opt_value = (*fn)();
165         if (!opt_value) {
166           // Clear out the function to ensure that any captured state is freed
167           // before we inform the caller.
168           fn.reset();
169           channel->Close();
170           return;
171         }
172         auto write_res =
173             channel->WriteNonBlocking(std::move(opt_value.value()));
174         PERFETTO_CHECK(write_res.success);
175         PERFETTO_CHECK(!write_res.is_closed);
176       });
177     }
178 
179     ThreadPool* pool_ = nullptr;
180     std::shared_ptr<std::function<std::optional<T>()>> fn_;
181     std::function<void()> on_destroy_;
182     std::shared_ptr<Channel<T>> channel_;
183     base::Stream<T> channel_stream_;
184   };
185   return MakeStream<RunOnPoolImpl>(pool, std::move(fn), std::move(on_destroy));
186 }
187 
188 // Creates a Future<T> which yields the result of executing |fn| on |pool|. The
189 // returned completes with the return value of |fn|.
190 //
191 // The intended usage of this function is to schedule CPU intensive work on a
192 // background thread pool and have the result returned when available.
193 template <typename T>
RunOnceOnThreadPool(ThreadPool * pool,std::function<T ()> fn)194 Future<T> RunOnceOnThreadPool(ThreadPool* pool, std::function<T()> fn) {
195   return RunOnThreadPool<T>(
196              pool,
197              [done = false, fn = std::move(fn)]() mutable -> std::optional<T> {
198                if (done) {
199                  return std::nullopt;
200                }
201                done = true;
202                return fn();
203              })
204       .Collect(base::ToFutureCheckedCollector<T>());
205 }
206 
207 }  // namespace base
208 }  // namespace perfetto
209 
210 #endif  // INCLUDE_PERFETTO_EXT_BASE_THREADING_UTIL_H_
211