• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_SPAWN_H_
18 #define INCLUDE_PERFETTO_EXT_BASE_THREADING_SPAWN_H_
19 
20 #include <atomic>
21 #include <cstdint>
22 #include <functional>
23 #include <memory>
24 #include <mutex>
25 #include <optional>
26 #include <utility>
27 #include <vector>
28 
29 #include "perfetto/base/compiler.h"
30 #include "perfetto/base/flat_set.h"
31 #include "perfetto/base/platform_handle.h"
32 #include "perfetto/base/task_runner.h"
33 #include "perfetto/ext/base/event_fd.h"
34 #include "perfetto/ext/base/flat_hash_map.h"
35 #include "perfetto/ext/base/thread_checker.h"
36 #include "perfetto/ext/base/threading/channel.h"
37 #include "perfetto/ext/base/threading/future.h"
38 #include "perfetto/ext/base/threading/poll.h"
39 #include "perfetto/ext/base/threading/stream.h"
40 #include "perfetto/ext/base/threading/stream_combinators.h"
41 #include "perfetto/ext/base/threading/util.h"
42 #include "perfetto/ext/base/uuid.h"
43 #include "perfetto/ext/base/weak_ptr.h"
44 
45 namespace perfetto {
46 namespace base {
47 
48 class PolledFuture;
49 
50 // A RAII object which tracks the polling of a Future.
51 //
52 // When this object is dropped, the backing Future will be cancelled as
53 // soon as possible. In practice, the cancellation happens on the TaskRunner
54 // thread so there can be some delay.
55 class SpawnHandle {
56  public:
57   SpawnHandle(TaskRunner* task_runner, std::function<Future<FVoid>()> fn);
58 
59   SpawnHandle(SpawnHandle&&) = default;
60   SpawnHandle& operator=(SpawnHandle&&) = default;
61 
62   ~SpawnHandle();
63 
64  private:
65   SpawnHandle(const SpawnHandle&) = delete;
66   SpawnHandle& operator=(const SpawnHandle&) = delete;
67 
68   TaskRunner* task_runner_ = nullptr;
69   std::shared_ptr<std::unique_ptr<PolledFuture>> polled_future_;
70 };
71 
72 // "Spawns" a Future<FVoid> on the given TaskRunner and returns an RAII
73 // SpawnHandle which can be used to cancel the spawn.
74 //
75 // Spawning a Future means to poll it to completion. In Perfetto, this is done
76 // by using a TaskRunner object to track FD readiness and polling the Future
77 // when progress can be made.
78 //
79 // The returned SpawnHandle should be stashed as it is responsible for the
80 // lifetime of the pollling. If the SpawnHandle is dropped, the Future is
81 // cancelled and dropped ASAP (this happens on the TaskRunner thread so there
82 // can be some delay).
SpawnFuture(TaskRunner * task_runner,std::function<Future<FVoid> ()> fn)83 PERFETTO_WARN_UNUSED_RESULT inline SpawnHandle SpawnFuture(
84     TaskRunner* task_runner,
85     std::function<Future<FVoid>()> fn) {
86   return SpawnHandle(task_runner, std::move(fn));
87 }
88 
89 // Variant of |SpawnFuture| for a Stream<T> allowing returning items of T.
90 //
91 // The Stream<T> returned by this function can be consumed on any thread, not
92 // just the thread which ran this function.
93 //
94 // Dropping the returned stream does not affect the polling of the underlying
95 // stream (i.e. the stream returned by |fn|); the polled values will simply be
96 // dropped.
97 //
98 // Dropping the returned SpawnHandle causes the underlying stream to be
99 // cancelled and dropped ASAP (this happens on the TaskRunner thread so there
100 // can be some delay). The returned channel will return all the values that were
101 // produced by the underlying stream before the cancellation.
102 template <typename T>
SpawnResultStream(TaskRunner * runner,std::function<Stream<T> ()> fn)103 PERFETTO_WARN_UNUSED_RESULT std::pair<SpawnHandle, Stream<T>> SpawnResultStream(
104     TaskRunner* runner,
105     std::function<Stream<T>()> fn) {
106   class AllVoidCollector : public Collector<FVoid, FVoid> {
107    public:
108     std::optional<FVoid> OnNext(FVoid) override { return std::nullopt; }
109     FVoid OnDone() override { return FVoid(); }
110   };
111   auto channel = std::make_shared<Channel<T>>(4);
112   auto control = std::make_shared<Channel<FVoid>>(1);
113   SpawnHandle handle(runner, [channel, control, fn = std::move(fn)]() {
114     return fn()
115         .MapFuture([channel, control](T value) mutable {
116           if (control->ReadNonBlocking().is_closed) {
117             return base::Future<base::FVoid>(base::FVoid());
118           }
119           return WriteChannelFuture(channel.get(), std::move(value));
120         })
121         .Concat(OnDestroyStream<FVoid>([c = channel]() { c->Close(); }))
122         .template Collect<base::FVoid>(std::make_unique<AllVoidCollector>());
123   });
124   Stream<T> stream = ReadChannelStream(channel.get())
125                          .Concat(OnDestroyStream<T>([channel, control]() {
126                            // Close the control stream and drain an element from
127                            // the channel to unblock it in case it was blocked.
128                            // NOTE: the ordering here is important as we could
129                            // deadlock if it was the other way around!
130                            control->Close();
131                            base::ignore_result(channel->ReadNonBlocking());
132                          }));
133   return std::make_pair(std::move(handle), std::move(stream));
134 }
135 
136 // Variant of |SpawnResultStream| but for Future<T>.
137 template <typename T>
138 PERFETTO_WARN_UNUSED_RESULT inline std::pair<SpawnHandle, Future<T>>
SpawnResultFuture(TaskRunner * task_runner,std::function<Future<T> ()> fn)139 SpawnResultFuture(TaskRunner* task_runner, std::function<Future<T>()> fn) {
140   auto [handle, stream] = SpawnResultStream<T>(
141       task_runner, [fn = std::move(fn)]() { return StreamFromFuture(fn()); });
142   return std::make_pair(std::move(handle), std::move(stream).Collect(
143                                                ToFutureCheckedCollector<T>()));
144 }
145 
146 }  // namespace base
147 }  // namespace perfetto
148 
149 #endif  // INCLUDE_PERFETTO_EXT_BASE_THREADING_SPAWN_H_
150