1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_SPAWN_H_
18 #define INCLUDE_PERFETTO_EXT_BASE_THREADING_SPAWN_H_
19
20 #include <atomic>
21 #include <cstdint>
22 #include <functional>
23 #include <memory>
24 #include <mutex>
25 #include <optional>
26 #include <utility>
27 #include <vector>
28
29 #include "perfetto/base/compiler.h"
30 #include "perfetto/base/flat_set.h"
31 #include "perfetto/base/platform_handle.h"
32 #include "perfetto/base/task_runner.h"
33 #include "perfetto/ext/base/event_fd.h"
34 #include "perfetto/ext/base/flat_hash_map.h"
35 #include "perfetto/ext/base/thread_checker.h"
36 #include "perfetto/ext/base/threading/channel.h"
37 #include "perfetto/ext/base/threading/future.h"
38 #include "perfetto/ext/base/threading/poll.h"
39 #include "perfetto/ext/base/threading/stream.h"
40 #include "perfetto/ext/base/threading/stream_combinators.h"
41 #include "perfetto/ext/base/threading/util.h"
42 #include "perfetto/ext/base/uuid.h"
43 #include "perfetto/ext/base/weak_ptr.h"
44
45 namespace perfetto {
46 namespace base {
47
48 class PolledFuture;
49
50 // A RAII object which tracks the polling of a Future.
51 //
52 // When this object is dropped, the backing Future will be cancelled as
53 // soon as possible. In practice, the cancellation happens on the TaskRunner
54 // thread so there can be some delay.
55 class SpawnHandle {
56 public:
57 SpawnHandle(TaskRunner* task_runner, std::function<Future<FVoid>()> fn);
58 ~SpawnHandle();
59
60 private:
61 SpawnHandle(const SpawnHandle&) = delete;
62 SpawnHandle& operator=(const SpawnHandle&) = delete;
63
64 TaskRunner* task_runner_ = nullptr;
65 std::shared_ptr<std::unique_ptr<PolledFuture>> polled_future_;
66 };
67
68 // Specialization of SpawnHandle used by Futures/Streams which return T.
69 //
70 // Values of T are returned through a Channel<T> which allows reading these
71 // values on a different thread to where the polling happens.
72 template <typename T>
73 class ResultSpawnHandle {
74 public:
ResultSpawnHandle(TaskRunner * task_runner,std::shared_ptr<Channel<T>> channel,std::function<Future<FVoid> ()> fn)75 ResultSpawnHandle(TaskRunner* task_runner,
76 std::shared_ptr<Channel<T>> channel,
77 std::function<Future<FVoid>()> fn)
78 : handle_(task_runner, std::move(fn)), channel_(std::move(channel)) {}
79
channel()80 Channel<T>* channel() const { return channel_.get(); }
81
82 private:
83 SpawnHandle handle_;
84 std::shared_ptr<Channel<T>> channel_;
85 };
86
87 // "Spawns" a Future<FVoid> on the given TaskRunner and returns an RAII
88 // SpawnHandle which can be used to cancel the spawn.
89 //
90 // Spawning a Future means to poll it to completion. In Perfetto, this is done
91 // by using a TaskRunner object to track FD readiness and polling the Future
92 // when progress can be made.
93 //
94 // The returned SpawnHandle should be stashed as it is responsible for the
95 // lifetime of the pollling. If the SpawnHandle is dropped, the Future is
96 // cancelled and dropped ASAP (this happens on the TaskRunner thread so there
97 // can be some delay).
SpawnFuture(TaskRunner * task_runner,std::function<Future<FVoid> ()> fn)98 PERFETTO_WARN_UNUSED_RESULT inline SpawnHandle SpawnFuture(
99 TaskRunner* task_runner,
100 std::function<Future<FVoid>()> fn) {
101 return SpawnHandle(task_runner, std::move(fn));
102 }
103
104 // Variant of |SpawnFuture| for a Stream<T> allowing returning items of T.
105 //
106 // See ResultSpawnHandle for how elements from the stream can be consumed.
107 template <typename T>
SpawnResultStream(TaskRunner * task_runner,std::function<Stream<T> ()> fn)108 PERFETTO_WARN_UNUSED_RESULT inline ResultSpawnHandle<T> SpawnResultStream(
109 TaskRunner* task_runner,
110 std::function<Stream<T>()> fn) {
111 class AllVoidCollector : public Collector<FVoid, FVoid> {
112 public:
113 std::optional<FVoid> OnNext(FVoid) override { return std::nullopt; }
114 FVoid OnDone() override { return FVoid(); }
115 };
116 auto channel = std::make_shared<Channel<T>>(4);
117 return ResultSpawnHandle<T>(
118 task_runner, channel, [c = channel, fn = std::move(fn)]() {
119 return fn()
120 .MapFuture([c](T value) {
121 return WriteChannelFuture(c.get(), std::move(value));
122 })
123 .Concat(OnDestroyStream<FVoid>([c]() { c->Close(); }))
124 .Collect(std::unique_ptr<Collector<FVoid, FVoid>>(
125 new AllVoidCollector()));
126 });
127 }
128
129 // Variant of |SpawnFuture| for a Future<T> allowing returning items of T.
130 //
131 // See ResultSpawnHandle for how elements from the future can be consumed.
132 template <typename T>
SpawnResultFuture(TaskRunner * task_runner,std::function<Future<T> ()> fn)133 PERFETTO_WARN_UNUSED_RESULT inline ResultSpawnHandle<T> SpawnResultFuture(
134 TaskRunner* task_runner,
135 std::function<Future<T>()> fn) {
136 return SpawnResultStream<T>(task_runner, [fn = std::move(fn)]() {
137 return StreamFromFuture(std::move(fn()));
138 });
139 }
140
141 } // namespace base
142 } // namespace perfetto
143
144 #endif // INCLUDE_PERFETTO_EXT_BASE_THREADING_SPAWN_H_
145