• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_SPAWN_H_
18 #define INCLUDE_PERFETTO_EXT_BASE_THREADING_SPAWN_H_
19 
20 #include <atomic>
21 #include <cstdint>
22 #include <functional>
23 #include <memory>
24 #include <mutex>
25 #include <optional>
26 #include <utility>
27 #include <vector>
28 
29 #include "perfetto/base/compiler.h"
30 #include "perfetto/base/flat_set.h"
31 #include "perfetto/base/platform_handle.h"
32 #include "perfetto/base/task_runner.h"
33 #include "perfetto/ext/base/event_fd.h"
34 #include "perfetto/ext/base/flat_hash_map.h"
35 #include "perfetto/ext/base/thread_checker.h"
36 #include "perfetto/ext/base/threading/channel.h"
37 #include "perfetto/ext/base/threading/future.h"
38 #include "perfetto/ext/base/threading/poll.h"
39 #include "perfetto/ext/base/threading/stream.h"
40 #include "perfetto/ext/base/threading/stream_combinators.h"
41 #include "perfetto/ext/base/threading/util.h"
42 #include "perfetto/ext/base/uuid.h"
43 #include "perfetto/ext/base/weak_ptr.h"
44 
45 namespace perfetto {
46 namespace base {
47 
48 class PolledFuture;
49 
50 // A RAII object which tracks the polling of a Future.
51 //
52 // When this object is dropped, the backing Future will be cancelled as
53 // soon as possible. In practice, the cancellation happens on the TaskRunner
54 // thread so there can be some delay.
55 class SpawnHandle {
56  public:
57   SpawnHandle(TaskRunner* task_runner, std::function<Future<FVoid>()> fn);
58   ~SpawnHandle();
59 
60  private:
61   SpawnHandle(const SpawnHandle&) = delete;
62   SpawnHandle& operator=(const SpawnHandle&) = delete;
63 
64   TaskRunner* task_runner_ = nullptr;
65   std::shared_ptr<std::unique_ptr<PolledFuture>> polled_future_;
66 };
67 
68 // Specialization of SpawnHandle used by Futures/Streams which return T.
69 //
70 // Values of T are returned through a Channel<T> which allows reading these
71 // values on a different thread to where the polling happens.
72 template <typename T>
73 class ResultSpawnHandle {
74  public:
ResultSpawnHandle(TaskRunner * task_runner,std::shared_ptr<Channel<T>> channel,std::function<Future<FVoid> ()> fn)75   ResultSpawnHandle(TaskRunner* task_runner,
76                     std::shared_ptr<Channel<T>> channel,
77                     std::function<Future<FVoid>()> fn)
78       : handle_(task_runner, std::move(fn)), channel_(std::move(channel)) {}
79 
channel()80   Channel<T>* channel() const { return channel_.get(); }
81 
82  private:
83   SpawnHandle handle_;
84   std::shared_ptr<Channel<T>> channel_;
85 };
86 
87 // "Spawns" a Future<FVoid> on the given TaskRunner and returns an RAII
88 // SpawnHandle which can be used to cancel the spawn.
89 //
90 // Spawning a Future means to poll it to completion. In Perfetto, this is done
91 // by using a TaskRunner object to track FD readiness and polling the Future
92 // when progress can be made.
93 //
94 // The returned SpawnHandle should be stashed as it is responsible for the
95 // lifetime of the pollling. If the SpawnHandle is dropped, the Future is
96 // cancelled and dropped ASAP (this happens on the TaskRunner thread so there
97 // can be some delay).
SpawnFuture(TaskRunner * task_runner,std::function<Future<FVoid> ()> fn)98 PERFETTO_WARN_UNUSED_RESULT inline SpawnHandle SpawnFuture(
99     TaskRunner* task_runner,
100     std::function<Future<FVoid>()> fn) {
101   return SpawnHandle(task_runner, std::move(fn));
102 }
103 
104 // Variant of |SpawnFuture| for a Stream<T> allowing returning items of T.
105 //
106 // See ResultSpawnHandle for how elements from the stream can be consumed.
107 template <typename T>
SpawnResultStream(TaskRunner * task_runner,std::function<Stream<T> ()> fn)108 PERFETTO_WARN_UNUSED_RESULT inline ResultSpawnHandle<T> SpawnResultStream(
109     TaskRunner* task_runner,
110     std::function<Stream<T>()> fn) {
111   class AllVoidCollector : public Collector<FVoid, FVoid> {
112    public:
113     std::optional<FVoid> OnNext(FVoid) override { return std::nullopt; }
114     FVoid OnDone() override { return FVoid(); }
115   };
116   auto channel = std::make_shared<Channel<T>>(4);
117   return ResultSpawnHandle<T>(
118       task_runner, channel, [c = channel, fn = std::move(fn)]() {
119         return fn()
120             .MapFuture([c](T value) {
121               return WriteChannelFuture(c.get(), std::move(value));
122             })
123             .Concat(OnDestroyStream<FVoid>([c]() { c->Close(); }))
124             .Collect(std::unique_ptr<Collector<FVoid, FVoid>>(
125                 new AllVoidCollector()));
126       });
127 }
128 
129 // Variant of |SpawnFuture| for a Future<T> allowing returning items of T.
130 //
131 // See ResultSpawnHandle for how elements from the future can be consumed.
132 template <typename T>
SpawnResultFuture(TaskRunner * task_runner,std::function<Future<T> ()> fn)133 PERFETTO_WARN_UNUSED_RESULT inline ResultSpawnHandle<T> SpawnResultFuture(
134     TaskRunner* task_runner,
135     std::function<Future<T>()> fn) {
136   return SpawnResultStream<T>(task_runner, [fn = std::move(fn)]() {
137     return StreamFromFuture(std::move(fn()));
138   });
139 }
140 
141 }  // namespace base
142 }  // namespace perfetto
143 
144 #endif  // INCLUDE_PERFETTO_EXT_BASE_THREADING_SPAWN_H_
145