1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_SPAWN_H_
18 #define INCLUDE_PERFETTO_EXT_BASE_THREADING_SPAWN_H_
19
20 #include <atomic>
21 #include <cstdint>
22 #include <functional>
23 #include <memory>
24 #include <mutex>
25 #include <optional>
26 #include <utility>
27 #include <vector>
28
29 #include "perfetto/base/compiler.h"
30 #include "perfetto/base/flat_set.h"
31 #include "perfetto/base/platform_handle.h"
32 #include "perfetto/base/task_runner.h"
33 #include "perfetto/ext/base/event_fd.h"
34 #include "perfetto/ext/base/flat_hash_map.h"
35 #include "perfetto/ext/base/thread_checker.h"
36 #include "perfetto/ext/base/threading/channel.h"
37 #include "perfetto/ext/base/threading/future.h"
38 #include "perfetto/ext/base/threading/poll.h"
39 #include "perfetto/ext/base/threading/stream.h"
40 #include "perfetto/ext/base/threading/stream_combinators.h"
41 #include "perfetto/ext/base/threading/util.h"
42 #include "perfetto/ext/base/uuid.h"
43 #include "perfetto/ext/base/weak_ptr.h"
44
45 namespace perfetto {
46 namespace base {
47
48 class PolledFuture;
49
50 // A RAII object which tracks the polling of a Future.
51 //
52 // When this object is dropped, the backing Future will be cancelled as
53 // soon as possible. In practice, the cancellation happens on the TaskRunner
54 // thread so there can be some delay.
55 class SpawnHandle {
56 public:
57 SpawnHandle(TaskRunner* task_runner, std::function<Future<FVoid>()> fn);
58
59 SpawnHandle(SpawnHandle&&) = default;
60 SpawnHandle& operator=(SpawnHandle&&) = default;
61
62 ~SpawnHandle();
63
64 private:
65 SpawnHandle(const SpawnHandle&) = delete;
66 SpawnHandle& operator=(const SpawnHandle&) = delete;
67
68 TaskRunner* task_runner_ = nullptr;
69 std::shared_ptr<std::unique_ptr<PolledFuture>> polled_future_;
70 };
71
72 // "Spawns" a Future<FVoid> on the given TaskRunner and returns an RAII
73 // SpawnHandle which can be used to cancel the spawn.
74 //
75 // Spawning a Future means to poll it to completion. In Perfetto, this is done
76 // by using a TaskRunner object to track FD readiness and polling the Future
77 // when progress can be made.
78 //
79 // The returned SpawnHandle should be stashed as it is responsible for the
80 // lifetime of the pollling. If the SpawnHandle is dropped, the Future is
81 // cancelled and dropped ASAP (this happens on the TaskRunner thread so there
82 // can be some delay).
SpawnFuture(TaskRunner * task_runner,std::function<Future<FVoid> ()> fn)83 PERFETTO_WARN_UNUSED_RESULT inline SpawnHandle SpawnFuture(
84 TaskRunner* task_runner,
85 std::function<Future<FVoid>()> fn) {
86 return SpawnHandle(task_runner, std::move(fn));
87 }
88
89 // Variant of |SpawnFuture| for a Stream<T> allowing returning items of T.
90 //
91 // The Stream<T> returned by this function can be consumed on any thread, not
92 // just the thread which ran this function.
93 //
94 // Dropping the returned stream does not affect the polling of the underlying
95 // stream (i.e. the stream returned by |fn|); the polled values will simply be
96 // dropped.
97 //
98 // Dropping the returned SpawnHandle causes the underlying stream to be
99 // cancelled and dropped ASAP (this happens on the TaskRunner thread so there
100 // can be some delay). The returned channel will return all the values that were
101 // produced by the underlying stream before the cancellation.
102 template <typename T>
SpawnResultStream(TaskRunner * runner,std::function<Stream<T> ()> fn)103 PERFETTO_WARN_UNUSED_RESULT std::pair<SpawnHandle, Stream<T>> SpawnResultStream(
104 TaskRunner* runner,
105 std::function<Stream<T>()> fn) {
106 class AllVoidCollector : public Collector<FVoid, FVoid> {
107 public:
108 std::optional<FVoid> OnNext(FVoid) override { return std::nullopt; }
109 FVoid OnDone() override { return FVoid(); }
110 };
111 auto channel = std::make_shared<Channel<T>>(4);
112 auto control = std::make_shared<Channel<FVoid>>(1);
113 SpawnHandle handle(runner, [channel, control, fn = std::move(fn)]() {
114 return fn()
115 .MapFuture([channel, control](T value) mutable {
116 if (control->ReadNonBlocking().is_closed) {
117 return base::Future<base::FVoid>(base::FVoid());
118 }
119 return WriteChannelFuture(channel.get(), std::move(value));
120 })
121 .Concat(OnDestroyStream<FVoid>([c = channel]() { c->Close(); }))
122 .template Collect<base::FVoid>(std::make_unique<AllVoidCollector>());
123 });
124 Stream<T> stream = ReadChannelStream(channel.get())
125 .Concat(OnDestroyStream<T>([channel, control]() {
126 // Close the control stream and drain an element from
127 // the channel to unblock it in case it was blocked.
128 // NOTE: the ordering here is important as we could
129 // deadlock if it was the other way around!
130 control->Close();
131 base::ignore_result(channel->ReadNonBlocking());
132 }));
133 return std::make_pair(std::move(handle), std::move(stream));
134 }
135
136 // Variant of |SpawnResultStream| but for Future<T>.
137 template <typename T>
138 PERFETTO_WARN_UNUSED_RESULT inline std::pair<SpawnHandle, Future<T>>
SpawnResultFuture(TaskRunner * task_runner,std::function<Future<T> ()> fn)139 SpawnResultFuture(TaskRunner* task_runner, std::function<Future<T>()> fn) {
140 auto [handle, stream] = SpawnResultStream<T>(
141 task_runner, [fn = std::move(fn)]() { return StreamFromFuture(fn()); });
142 return std::make_pair(std::move(handle), std::move(stream).Collect(
143 ToFutureCheckedCollector<T>()));
144 }
145
146 } // namespace base
147 } // namespace perfetto
148
149 #endif // INCLUDE_PERFETTO_EXT_BASE_THREADING_SPAWN_H_
150