• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H
17 
18 #include <grpc/support/port_platform.h>
19 #include <stddef.h>
20 
21 #include <algorithm>
22 #include <new>
23 #include <string>
24 #include <utility>
25 
26 #include "absl/log/check.h"
27 #include "absl/log/log.h"
28 #include "absl/strings/str_cat.h"
29 #include "absl/strings/str_format.h"
30 #include "absl/types/optional.h"
31 #include "src/core/lib/promise/context.h"
32 #include "src/core/lib/promise/detail/promise_factory.h"
33 #include "src/core/lib/promise/poll.h"
34 #include "src/core/lib/resource_quota/arena.h"
35 #include "src/core/util/construct_destruct.h"
36 #include "src/core/util/debug_location.h"
37 
38 namespace grpc_core {
39 
40 // Tracks a list of maps of T -> optional<T> via promises.
41 // When Run, runs each transformation in order, and resolves to the ulimate
42 // result.
43 // If a map resolves to nullopt, the chain is terminated and the result is
44 // nullopt.
45 // Maps can also have synchronous cleanup functions, which are guaranteed to be
46 // called at the termination of each run through the chain.
47 template <typename T>
48 class InterceptorList {
49  private:
50   // A map of T -> T via promises.
51   class Map {
52    public:
Map(DebugLocation from)53     explicit Map(DebugLocation from) : from_(from) {}
54     // Construct a promise to transform x into some other value at memory.
55     virtual void MakePromise(T x, void* memory) = 0;
56     // Destroy a promise constructed at memory.
57     virtual void Destroy(void* memory) = 0;
58     // Poll a promise constructed at memory.
59     // Resolves to an optional<T> -- if nullopt it means terminate the chain and
60     // resolve.
61     virtual Poll<absl::optional<T>> PollOnce(void* memory) = 0;
62     virtual ~Map() = default;
63 
64     // Update the next pointer stored with this map.
65     // This is only valid to call once, and only before the map is used.
SetNext(Map * next)66     void SetNext(Map* next) {
67       DCHECK_EQ(next_, nullptr);
68       next_ = next;
69     }
70 
71     // Access the creation location for this map (for debug tracing).
from()72     DebugLocation from() const { return from_; }
73 
74     // Access the next map in the chain (or nullptr if this is the last map).
next()75     Map* next() const { return next_; }
76 
77    private:
78     GPR_NO_UNIQUE_ADDRESS const DebugLocation from_;
79     Map* next_ = nullptr;
80   };
81 
82  public:
83   // The result of Run: a promise that will execute the entire chain.
84   class RunPromise {
85    public:
RunPromise(size_t memory_required,Map ** factory,absl::optional<T> value)86     RunPromise(size_t memory_required, Map** factory, absl::optional<T> value) {
87       if (!value.has_value() || *factory == nullptr) {
88         GRPC_TRACE_VLOG(promise_primitives, 2)
89             << "InterceptorList::RunPromise[" << this << "]: create immediate";
90         is_immediately_resolved_ = true;
91         Construct(&result_, std::move(value));
92       } else {
93         is_immediately_resolved_ = false;
94         Construct(&async_resolution_, memory_required);
95         (*factory)->MakePromise(std::move(*value),
96                                 async_resolution_.space.get());
97         async_resolution_.current_factory = *factory;
98         async_resolution_.first_factory = factory;
99         GRPC_TRACE_VLOG(promise_primitives, 2)
100             << "InterceptorList::RunPromise[" << this
101             << "]: create async; mem=" << async_resolution_.space.get();
102       }
103     }
104 
~RunPromise()105     ~RunPromise() {
106       GRPC_TRACE_VLOG(promise_primitives, 2)
107           << "InterceptorList::RunPromise[" << this << "]: destroy";
108       if (is_immediately_resolved_) {
109         Destruct(&result_);
110       } else {
111         if (async_resolution_.current_factory != nullptr) {
112           async_resolution_.current_factory->Destroy(
113               async_resolution_.space.get());
114         }
115         Destruct(&async_resolution_);
116       }
117     }
118 
119     RunPromise(const RunPromise&) = delete;
120     RunPromise& operator=(const RunPromise&) = delete;
121 
RunPromise(RunPromise && other)122     RunPromise(RunPromise&& other) noexcept
123         : is_immediately_resolved_(other.is_immediately_resolved_) {
124       GRPC_TRACE_VLOG(promise_primitives, 2)
125           << "InterceptorList::RunPromise[" << this << "]: move from "
126           << &other;
127       if (is_immediately_resolved_) {
128         Construct(&result_, std::move(other.result_));
129       } else {
130         Construct(&async_resolution_, std::move(other.async_resolution_));
131       }
132     }
133 
134     RunPromise& operator=(RunPromise&& other) noexcept = delete;
135 
operator()136     Poll<absl::optional<T>> operator()() {
137       GRPC_TRACE_VLOG(promise_primitives, 2)
138           << "InterceptorList::RunPromise[" << this << "]: " << DebugString();
139       if (is_immediately_resolved_) return std::move(result_);
140       while (true) {
141         if (*async_resolution_.first_factory == nullptr) {
142           // Cancelled whilst polling
143           return absl::nullopt;
144         }
145         auto r = async_resolution_.current_factory->PollOnce(
146             async_resolution_.space.get());
147         if (auto* p = r.value_if_ready()) {
148           async_resolution_.current_factory->Destroy(
149               async_resolution_.space.get());
150           async_resolution_.current_factory =
151               async_resolution_.current_factory->next();
152           if (!p->has_value()) async_resolution_.current_factory = nullptr;
153           GRPC_TRACE_VLOG(promise_primitives, 2)
154               << "InterceptorList::RunPromise[" << this
155               << "]: " << DebugString();
156           if (async_resolution_.current_factory == nullptr) {
157             return std::move(*p);
158           }
159           async_resolution_.current_factory->MakePromise(
160               std::move(**p), async_resolution_.space.get());
161           continue;
162         }
163         return Pending{};
164       }
165     }
166 
167    private:
DebugString()168     std::string DebugString() const {
169       if (is_immediately_resolved_) {
170         return absl::StrFormat("Result:has_value:%d", result_.has_value());
171       } else {
172         return absl::StrCat(
173             "Running:",
174             async_resolution_.current_factory == nullptr
175                 ? "END"
176                 : ([p = async_resolution_.current_factory->from()]() {
177                     return absl::StrCat(p.file(), ":", p.line());
178                   })()
179                       .c_str());
180       }
181     }
182     struct AsyncResolution {
AsyncResolutionAsyncResolution183       explicit AsyncResolution(size_t max_size)
184           : space(GetContext<Arena>()->MakePooledArray<char>(max_size)) {}
185       AsyncResolution(const AsyncResolution&) = delete;
186       AsyncResolution& operator=(const AsyncResolution&) = delete;
AsyncResolutionAsyncResolution187       AsyncResolution(AsyncResolution&& other) noexcept
188           : current_factory(std::exchange(other.current_factory, nullptr)),
189             first_factory(std::exchange(other.first_factory, nullptr)),
190             space(std::move(other.space)) {}
191       Map* current_factory;
192       Map** first_factory;
193       Arena::PoolPtr<char[]> space;
194     };
195     union {
196       AsyncResolution async_resolution_;
197       absl::optional<T> result_;
198     };
199     // If true, the result_ union is valid, otherwise async_resolution_ is.
200     // Indicates whether the promise resolved immediately at construction or if
201     // additional steps were needed.
202     bool is_immediately_resolved_;
203   };
204 
205   InterceptorList() = default;
206   InterceptorList(const InterceptorList&) = delete;
207   InterceptorList& operator=(const InterceptorList&) = delete;
~InterceptorList()208   ~InterceptorList() { DeleteFactories(); }
209 
Run(absl::optional<T> initial_value)210   RunPromise Run(absl::optional<T> initial_value) {
211     return RunPromise(promise_memory_required_, &first_map_,
212                       std::move(initial_value));
213   }
214 
215   // Append a new map to the end of the chain.
216   template <typename Fn>
AppendMap(Fn fn,DebugLocation from)217   void AppendMap(Fn fn, DebugLocation from) {
218     Append(MakeMapToAdd(std::move(fn), [] {}, from));
219   }
220 
221   // Prepend a new map to the beginning of the chain.
222   template <typename Fn>
PrependMap(Fn fn,DebugLocation from)223   void PrependMap(Fn fn, DebugLocation from) {
224     Prepend(MakeMapToAdd(std::move(fn), [] {}, from));
225   }
226 
227   // Append a new map to the end of the chain, with a cleanup function to be
228   // called at the end of run promise execution.
229   template <typename Fn, typename CleanupFn>
AppendMapWithCleanup(Fn fn,CleanupFn cleanup_fn,DebugLocation from)230   void AppendMapWithCleanup(Fn fn, CleanupFn cleanup_fn, DebugLocation from) {
231     Append(MakeMapToAdd(std::move(fn), std::move(cleanup_fn), from));
232   }
233 
234   // Prepend a new map to the beginning of the chain, with a cleanup function to
235   // be called at the end of run promise execution.
236   template <typename Fn, typename CleanupFn>
PrependMapWithCleanup(Fn fn,CleanupFn cleanup_fn,DebugLocation from)237   void PrependMapWithCleanup(Fn fn, CleanupFn cleanup_fn, DebugLocation from) {
238     Prepend(MakeMapToAdd(std::move(fn), std::move(cleanup_fn), from));
239   }
240 
241  protected:
242   // Clear the interceptor list
ResetInterceptorList()243   void ResetInterceptorList() {
244     DeleteFactories();
245     first_map_ = nullptr;
246     last_map_ = nullptr;
247     promise_memory_required_ = 0;
248   }
249 
250  private:
251   template <typename Fn, typename CleanupFn>
252   class MapImpl final : public Map {
253    public:
254     using PromiseFactory = promise_detail::RepeatedPromiseFactory<T, Fn>;
255     using Promise = typename PromiseFactory::Promise;
256 
MapImpl(Fn fn,CleanupFn cleanup_fn,DebugLocation from)257     explicit MapImpl(Fn fn, CleanupFn cleanup_fn, DebugLocation from)
258         : Map(from), fn_(std::move(fn)), cleanup_fn_(std::move(cleanup_fn)) {}
~MapImpl()259     ~MapImpl() override { cleanup_fn_(); }
MakePromise(T x,void * memory)260     void MakePromise(T x, void* memory) override {
261       new (memory) Promise(fn_.Make(std::move(x)));
262     }
Destroy(void * memory)263     void Destroy(void* memory) override {
264       static_cast<Promise*>(memory)->~Promise();
265     }
PollOnce(void * memory)266     Poll<absl::optional<T>> PollOnce(void* memory) override {
267       return poll_cast<absl::optional<T>>((*static_cast<Promise*>(memory))());
268     }
269 
270    private:
271     GPR_NO_UNIQUE_ADDRESS PromiseFactory fn_;
272     GPR_NO_UNIQUE_ADDRESS CleanupFn cleanup_fn_;
273   };
274 
275   template <typename Fn, typename CleanupFn>
MakeMapToAdd(Fn fn,CleanupFn cleanup_fn,DebugLocation from)276   Map* MakeMapToAdd(Fn fn, CleanupFn cleanup_fn, DebugLocation from) {
277     using FactoryType = MapImpl<Fn, CleanupFn>;
278     promise_memory_required_ = std::max(promise_memory_required_,
279                                         sizeof(typename FactoryType::Promise));
280     return GetContext<Arena>()->New<FactoryType>(std::move(fn),
281                                                  std::move(cleanup_fn), from);
282   }
283 
Append(Map * f)284   void Append(Map* f) {
285     if (first_map_ == nullptr) {
286       first_map_ = f;
287       last_map_ = f;
288     } else {
289       last_map_->SetNext(f);
290       last_map_ = f;
291     }
292   }
293 
Prepend(Map * f)294   void Prepend(Map* f) {
295     if (first_map_ == nullptr) {
296       first_map_ = f;
297       last_map_ = f;
298     } else {
299       f->SetNext(first_map_);
300       first_map_ = f;
301     }
302   }
303 
DeleteFactories()304   void DeleteFactories() {
305     for (auto* f = first_map_; f != nullptr;) {
306       auto* next = f->next();
307       f->~Map();
308       f = next;
309     }
310   }
311 
312   // The first map in the chain.
313   Map* first_map_ = nullptr;
314   // The last map in the chain.
315   Map* last_map_ = nullptr;
316   // The amount of memory required to store the largest promise in the chain.
317   size_t promise_memory_required_ = 0;
318 };
319 
320 }  // namespace grpc_core
321 
322 #endif  // GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H
323