1 // Copyright 2022 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H 16 #define GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H 17 18 #include <grpc/support/port_platform.h> 19 #include <stddef.h> 20 21 #include <algorithm> 22 #include <new> 23 #include <string> 24 #include <utility> 25 26 #include "absl/log/check.h" 27 #include "absl/log/log.h" 28 #include "absl/strings/str_cat.h" 29 #include "absl/strings/str_format.h" 30 #include "absl/types/optional.h" 31 #include "src/core/lib/promise/context.h" 32 #include "src/core/lib/promise/detail/promise_factory.h" 33 #include "src/core/lib/promise/poll.h" 34 #include "src/core/lib/resource_quota/arena.h" 35 #include "src/core/util/construct_destruct.h" 36 #include "src/core/util/debug_location.h" 37 38 namespace grpc_core { 39 40 // Tracks a list of maps of T -> optional<T> via promises. 41 // When Run, runs each transformation in order, and resolves to the ulimate 42 // result. 43 // If a map resolves to nullopt, the chain is terminated and the result is 44 // nullopt. 45 // Maps can also have synchronous cleanup functions, which are guaranteed to be 46 // called at the termination of each run through the chain. 47 template <typename T> 48 class InterceptorList { 49 private: 50 // A map of T -> T via promises. 51 class Map { 52 public: Map(DebugLocation from)53 explicit Map(DebugLocation from) : from_(from) {} 54 // Construct a promise to transform x into some other value at memory. 55 virtual void MakePromise(T x, void* memory) = 0; 56 // Destroy a promise constructed at memory. 57 virtual void Destroy(void* memory) = 0; 58 // Poll a promise constructed at memory. 59 // Resolves to an optional<T> -- if nullopt it means terminate the chain and 60 // resolve. 61 virtual Poll<absl::optional<T>> PollOnce(void* memory) = 0; 62 virtual ~Map() = default; 63 64 // Update the next pointer stored with this map. 65 // This is only valid to call once, and only before the map is used. SetNext(Map * next)66 void SetNext(Map* next) { 67 DCHECK_EQ(next_, nullptr); 68 next_ = next; 69 } 70 71 // Access the creation location for this map (for debug tracing). from()72 DebugLocation from() const { return from_; } 73 74 // Access the next map in the chain (or nullptr if this is the last map). next()75 Map* next() const { return next_; } 76 77 private: 78 GPR_NO_UNIQUE_ADDRESS const DebugLocation from_; 79 Map* next_ = nullptr; 80 }; 81 82 public: 83 // The result of Run: a promise that will execute the entire chain. 84 class RunPromise { 85 public: RunPromise(size_t memory_required,Map ** factory,absl::optional<T> value)86 RunPromise(size_t memory_required, Map** factory, absl::optional<T> value) { 87 if (!value.has_value() || *factory == nullptr) { 88 GRPC_TRACE_VLOG(promise_primitives, 2) 89 << "InterceptorList::RunPromise[" << this << "]: create immediate"; 90 is_immediately_resolved_ = true; 91 Construct(&result_, std::move(value)); 92 } else { 93 is_immediately_resolved_ = false; 94 Construct(&async_resolution_, memory_required); 95 (*factory)->MakePromise(std::move(*value), 96 async_resolution_.space.get()); 97 async_resolution_.current_factory = *factory; 98 async_resolution_.first_factory = factory; 99 GRPC_TRACE_VLOG(promise_primitives, 2) 100 << "InterceptorList::RunPromise[" << this 101 << "]: create async; mem=" << async_resolution_.space.get(); 102 } 103 } 104 ~RunPromise()105 ~RunPromise() { 106 GRPC_TRACE_VLOG(promise_primitives, 2) 107 << "InterceptorList::RunPromise[" << this << "]: destroy"; 108 if (is_immediately_resolved_) { 109 Destruct(&result_); 110 } else { 111 if (async_resolution_.current_factory != nullptr) { 112 async_resolution_.current_factory->Destroy( 113 async_resolution_.space.get()); 114 } 115 Destruct(&async_resolution_); 116 } 117 } 118 119 RunPromise(const RunPromise&) = delete; 120 RunPromise& operator=(const RunPromise&) = delete; 121 RunPromise(RunPromise && other)122 RunPromise(RunPromise&& other) noexcept 123 : is_immediately_resolved_(other.is_immediately_resolved_) { 124 GRPC_TRACE_VLOG(promise_primitives, 2) 125 << "InterceptorList::RunPromise[" << this << "]: move from " 126 << &other; 127 if (is_immediately_resolved_) { 128 Construct(&result_, std::move(other.result_)); 129 } else { 130 Construct(&async_resolution_, std::move(other.async_resolution_)); 131 } 132 } 133 134 RunPromise& operator=(RunPromise&& other) noexcept = delete; 135 operator()136 Poll<absl::optional<T>> operator()() { 137 GRPC_TRACE_VLOG(promise_primitives, 2) 138 << "InterceptorList::RunPromise[" << this << "]: " << DebugString(); 139 if (is_immediately_resolved_) return std::move(result_); 140 while (true) { 141 if (*async_resolution_.first_factory == nullptr) { 142 // Cancelled whilst polling 143 return absl::nullopt; 144 } 145 auto r = async_resolution_.current_factory->PollOnce( 146 async_resolution_.space.get()); 147 if (auto* p = r.value_if_ready()) { 148 async_resolution_.current_factory->Destroy( 149 async_resolution_.space.get()); 150 async_resolution_.current_factory = 151 async_resolution_.current_factory->next(); 152 if (!p->has_value()) async_resolution_.current_factory = nullptr; 153 GRPC_TRACE_VLOG(promise_primitives, 2) 154 << "InterceptorList::RunPromise[" << this 155 << "]: " << DebugString(); 156 if (async_resolution_.current_factory == nullptr) { 157 return std::move(*p); 158 } 159 async_resolution_.current_factory->MakePromise( 160 std::move(**p), async_resolution_.space.get()); 161 continue; 162 } 163 return Pending{}; 164 } 165 } 166 167 private: DebugString()168 std::string DebugString() const { 169 if (is_immediately_resolved_) { 170 return absl::StrFormat("Result:has_value:%d", result_.has_value()); 171 } else { 172 return absl::StrCat( 173 "Running:", 174 async_resolution_.current_factory == nullptr 175 ? "END" 176 : ([p = async_resolution_.current_factory->from()]() { 177 return absl::StrCat(p.file(), ":", p.line()); 178 })() 179 .c_str()); 180 } 181 } 182 struct AsyncResolution { AsyncResolutionAsyncResolution183 explicit AsyncResolution(size_t max_size) 184 : space(GetContext<Arena>()->MakePooledArray<char>(max_size)) {} 185 AsyncResolution(const AsyncResolution&) = delete; 186 AsyncResolution& operator=(const AsyncResolution&) = delete; AsyncResolutionAsyncResolution187 AsyncResolution(AsyncResolution&& other) noexcept 188 : current_factory(std::exchange(other.current_factory, nullptr)), 189 first_factory(std::exchange(other.first_factory, nullptr)), 190 space(std::move(other.space)) {} 191 Map* current_factory; 192 Map** first_factory; 193 Arena::PoolPtr<char[]> space; 194 }; 195 union { 196 AsyncResolution async_resolution_; 197 absl::optional<T> result_; 198 }; 199 // If true, the result_ union is valid, otherwise async_resolution_ is. 200 // Indicates whether the promise resolved immediately at construction or if 201 // additional steps were needed. 202 bool is_immediately_resolved_; 203 }; 204 205 InterceptorList() = default; 206 InterceptorList(const InterceptorList&) = delete; 207 InterceptorList& operator=(const InterceptorList&) = delete; ~InterceptorList()208 ~InterceptorList() { DeleteFactories(); } 209 Run(absl::optional<T> initial_value)210 RunPromise Run(absl::optional<T> initial_value) { 211 return RunPromise(promise_memory_required_, &first_map_, 212 std::move(initial_value)); 213 } 214 215 // Append a new map to the end of the chain. 216 template <typename Fn> AppendMap(Fn fn,DebugLocation from)217 void AppendMap(Fn fn, DebugLocation from) { 218 Append(MakeMapToAdd(std::move(fn), [] {}, from)); 219 } 220 221 // Prepend a new map to the beginning of the chain. 222 template <typename Fn> PrependMap(Fn fn,DebugLocation from)223 void PrependMap(Fn fn, DebugLocation from) { 224 Prepend(MakeMapToAdd(std::move(fn), [] {}, from)); 225 } 226 227 // Append a new map to the end of the chain, with a cleanup function to be 228 // called at the end of run promise execution. 229 template <typename Fn, typename CleanupFn> AppendMapWithCleanup(Fn fn,CleanupFn cleanup_fn,DebugLocation from)230 void AppendMapWithCleanup(Fn fn, CleanupFn cleanup_fn, DebugLocation from) { 231 Append(MakeMapToAdd(std::move(fn), std::move(cleanup_fn), from)); 232 } 233 234 // Prepend a new map to the beginning of the chain, with a cleanup function to 235 // be called at the end of run promise execution. 236 template <typename Fn, typename CleanupFn> PrependMapWithCleanup(Fn fn,CleanupFn cleanup_fn,DebugLocation from)237 void PrependMapWithCleanup(Fn fn, CleanupFn cleanup_fn, DebugLocation from) { 238 Prepend(MakeMapToAdd(std::move(fn), std::move(cleanup_fn), from)); 239 } 240 241 protected: 242 // Clear the interceptor list ResetInterceptorList()243 void ResetInterceptorList() { 244 DeleteFactories(); 245 first_map_ = nullptr; 246 last_map_ = nullptr; 247 promise_memory_required_ = 0; 248 } 249 250 private: 251 template <typename Fn, typename CleanupFn> 252 class MapImpl final : public Map { 253 public: 254 using PromiseFactory = promise_detail::RepeatedPromiseFactory<T, Fn>; 255 using Promise = typename PromiseFactory::Promise; 256 MapImpl(Fn fn,CleanupFn cleanup_fn,DebugLocation from)257 explicit MapImpl(Fn fn, CleanupFn cleanup_fn, DebugLocation from) 258 : Map(from), fn_(std::move(fn)), cleanup_fn_(std::move(cleanup_fn)) {} ~MapImpl()259 ~MapImpl() override { cleanup_fn_(); } MakePromise(T x,void * memory)260 void MakePromise(T x, void* memory) override { 261 new (memory) Promise(fn_.Make(std::move(x))); 262 } Destroy(void * memory)263 void Destroy(void* memory) override { 264 static_cast<Promise*>(memory)->~Promise(); 265 } PollOnce(void * memory)266 Poll<absl::optional<T>> PollOnce(void* memory) override { 267 return poll_cast<absl::optional<T>>((*static_cast<Promise*>(memory))()); 268 } 269 270 private: 271 GPR_NO_UNIQUE_ADDRESS PromiseFactory fn_; 272 GPR_NO_UNIQUE_ADDRESS CleanupFn cleanup_fn_; 273 }; 274 275 template <typename Fn, typename CleanupFn> MakeMapToAdd(Fn fn,CleanupFn cleanup_fn,DebugLocation from)276 Map* MakeMapToAdd(Fn fn, CleanupFn cleanup_fn, DebugLocation from) { 277 using FactoryType = MapImpl<Fn, CleanupFn>; 278 promise_memory_required_ = std::max(promise_memory_required_, 279 sizeof(typename FactoryType::Promise)); 280 return GetContext<Arena>()->New<FactoryType>(std::move(fn), 281 std::move(cleanup_fn), from); 282 } 283 Append(Map * f)284 void Append(Map* f) { 285 if (first_map_ == nullptr) { 286 first_map_ = f; 287 last_map_ = f; 288 } else { 289 last_map_->SetNext(f); 290 last_map_ = f; 291 } 292 } 293 Prepend(Map * f)294 void Prepend(Map* f) { 295 if (first_map_ == nullptr) { 296 first_map_ = f; 297 last_map_ = f; 298 } else { 299 f->SetNext(first_map_); 300 first_map_ = f; 301 } 302 } 303 DeleteFactories()304 void DeleteFactories() { 305 for (auto* f = first_map_; f != nullptr;) { 306 auto* next = f->next(); 307 f->~Map(); 308 f = next; 309 } 310 } 311 312 // The first map in the chain. 313 Map* first_map_ = nullptr; 314 // The last map in the chain. 315 Map* last_map_ = nullptr; 316 // The amount of memory required to store the largest promise in the chain. 317 size_t promise_memory_required_ = 0; 318 }; 319 320 } // namespace grpc_core 321 322 #endif // GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H 323