1 // Copyright 2024 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_TRANSPORT_CALL_FILTERS_H 16 #define GRPC_SRC_CORE_LIB_TRANSPORT_CALL_FILTERS_H 17 18 #include <grpc/support/port_platform.h> 19 20 #include <cstdint> 21 #include <limits> 22 #include <memory> 23 #include <ostream> 24 #include <type_traits> 25 26 #include "absl/log/check.h" 27 #include "src/core/lib/promise/for_each.h" 28 #include "src/core/lib/promise/if.h" 29 #include "src/core/lib/promise/latch.h" 30 #include "src/core/lib/promise/map.h" 31 #include "src/core/lib/promise/promise.h" 32 #include "src/core/lib/promise/seq.h" 33 #include "src/core/lib/promise/status_flag.h" 34 #include "src/core/lib/promise/try_seq.h" 35 #include "src/core/lib/transport/call_final_info.h" 36 #include "src/core/lib/transport/call_state.h" 37 #include "src/core/lib/transport/message.h" 38 #include "src/core/lib/transport/metadata.h" 39 #include "src/core/util/dump_args.h" 40 #include "src/core/util/ref_counted.h" 41 #include "src/core/util/ref_counted_ptr.h" 42 43 // CallFilters tracks a list of filters that are attached to a call. 44 // At a high level, a filter (for the purposes of this module) is a class 45 // that has a Call member class, and a set of methods that are called 46 // for each major event in the lifetime of a call. 47 // 48 // The Call member class must have the following members: 49 // - OnClientInitialMetadata - $VALUE_TYPE = ClientMetadata 50 // - OnServerInitialMetadata - $VALUE_TYPE = ServerMetadata 51 // - OnServerToClientMessage - $VALUE_TYPE = Message 52 // - OnClientToServerMessage - $VALUE_TYPE = Message 53 // - OnClientToServerHalfClose - no value 54 // - OnServerTrailingMetadata - $VALUE_TYPE = ServerMetadata 55 // - OnFinalize - special, see below 56 // These members define an interception point for a particular event in 57 // the call lifecycle. 58 // 59 // The type of these members matters, and is selectable by the class 60 // author. For $INTERCEPTOR_NAME in the above list: 61 // - static const NoInterceptor $INTERCEPTOR_NAME: 62 // defines that this filter does not intercept this event. 63 // there is zero runtime cost added to handling that event by this filter. 64 // - void $INTERCEPTOR_NAME($VALUE_TYPE&): 65 // the filter intercepts this event, and can modify the value. 66 // it never fails. 67 // - absl::Status $INTERCEPTOR_NAME($VALUE_TYPE&): 68 // the filter intercepts this event, and can modify the value. 69 // it can fail, in which case the call will be aborted. 70 // - ServerMetadataHandle $INTERCEPTOR_NAME($VALUE_TYPE&) 71 // the filter intercepts this event, and can modify the value. 72 // the filter can return nullptr for success, or a metadata handle for 73 // failure (in which case the call will be aborted). 74 // useful for cases where the exact metadata returned needs to be customized. 75 // - void $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*): 76 // the filter intercepts this event, and can modify the value. 77 // it can access the channel via the second argument. 78 // it never fails. 79 // - absl::Status $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*): 80 // the filter intercepts this event, and can modify the value. 81 // it can access the channel via the second argument. 82 // it can fail, in which case the call will be aborted. 83 // - ServerMetadataHandle $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*) 84 // the filter intercepts this event, and can modify the value. 85 // it can access the channel via the second argument. 86 // the filter can return nullptr for success, or a metadata handle for 87 // failure (in which case the call will be aborted). 88 // useful for cases where the exact metadata returned needs to be customized. 89 // It's also acceptable to return a promise that resolves to the 90 // relevant return type listed above. 91 // 92 // Finally, OnFinalize is added to intecept call finalization. 93 // It must have one of the signatures: 94 // - static const NoInterceptor OnFinalize: 95 // the filter does not intercept call finalization. 96 // - void OnFinalize(const grpc_call_final_info*): 97 // the filter intercepts call finalization. 98 // - void OnFinalize(const grpc_call_final_info*, FilterType*): 99 // the filter intercepts call finalization. 100 // 101 // The constructor of the Call object can either take a pointer to the channel 102 // object, or not take any arguments. 103 // 104 // *THIS MODULE* holds no opinion on what members the channel part of the 105 // filter should or should not have, but does require that it have a stable 106 // pointer for the lifetime of a call (ownership is expected to happen 107 // elsewhere). 108 109 namespace grpc_core { 110 111 // Tag type to indicate no interception. 112 // This is used to indicate that a filter does not intercept a particular 113 // event. 114 // In C++14 we declare these as (for example): 115 // static const NoInterceptor OnClientInitialMetadata; 116 // and out-of-line provide the definition: 117 // const MyFilter::Call::NoInterceptor 118 // MyFilter::Call::OnClientInitialMetadata; 119 // In C++17 and later we can use inline variables instead: 120 // inline static const NoInterceptor OnClientInitialMetadata; 121 struct NoInterceptor {}; 122 123 namespace filters_detail { 124 125 // Flow control across pipe stages. 126 // This ends up being exceedingly subtle - essentially we need to ensure that 127 // across a series of pipes we have no more than one outstanding message at a 128 // time - but those pipes are for the most part independent. 129 // How we achieve this is that this NextMessage object holds both the message 130 // and a completion token - the last owning NextMessage instance will call 131 // the on_progress method on the referenced CallState - and at that point that 132 // CallState will allow the next message to be sent through it. 133 // Next, the ForEach promise combiner explicitly holds onto the wrapper object 134 // owning the result (this object) and extracts the message from it, but doesn't 135 // dispose that instance until the action promise for the ForEach iteration 136 // completes, ensuring most callers need do nothing special to have the 137 // flow control work correctly. 138 template <void (CallState::*on_progress)()> 139 class NextMessage { 140 public: ~NextMessage()141 ~NextMessage() { 142 if (message_ != end_of_stream() && message_ != error() && 143 message_ != taken()) { 144 delete message_; 145 } 146 if (call_state_ != nullptr) { 147 (call_state_->*on_progress)(); 148 } 149 } 150 151 NextMessage() = default; NextMessage(Failure)152 explicit NextMessage(Failure) : message_(error()), call_state_(nullptr) {} NextMessage(MessageHandle message,CallState * call_state)153 NextMessage(MessageHandle message, CallState* call_state) { 154 DCHECK_NE(call_state, nullptr); 155 DCHECK_NE(message.get(), nullptr); 156 DCHECK(message.get_deleter().has_freelist()); 157 message_ = message.release(); 158 call_state_ = call_state; 159 } 160 NextMessage(const NextMessage& other) = delete; 161 NextMessage& operator=(const NextMessage& other) = delete; NextMessage(NextMessage && other)162 NextMessage(NextMessage&& other) noexcept 163 : message_(std::exchange(other.message_, taken())), 164 call_state_(std::exchange(other.call_state_, nullptr)) {} 165 NextMessage& operator=(NextMessage&& other) noexcept { 166 if (message_ != end_of_stream() && message_ != error() && 167 message_ != taken()) { 168 delete message_; 169 } 170 if (call_state_ != nullptr) { 171 (call_state_->*on_progress)(); 172 } 173 message_ = std::exchange(other.message_, taken()); 174 call_state_ = std::exchange(other.call_state_, nullptr); 175 return *this; 176 } 177 ok()178 bool ok() const { 179 DCHECK_NE(message_, taken()); 180 return message_ != error(); 181 } has_value()182 bool has_value() const { 183 DCHECK_NE(message_, taken()); 184 DCHECK(ok()); 185 return message_ != end_of_stream(); 186 } status()187 StatusFlag status() const { return StatusFlag(ok()); } value()188 Message& value() { 189 DCHECK_NE(message_, taken()); 190 DCHECK(ok()); 191 DCHECK(has_value()); 192 return *message_; 193 } TakeValue()194 MessageHandle TakeValue() { 195 DCHECK_NE(message_, taken()); 196 DCHECK(ok()); 197 DCHECK(has_value()); 198 return MessageHandle(std::exchange(message_, taken()), 199 Arena::PooledDeleter()); 200 } progressed()201 bool progressed() const { return call_state_ == nullptr; } Progress()202 void Progress() { 203 DCHECK(!progressed()); 204 (call_state_->*on_progress)(); 205 call_state_ = nullptr; 206 } 207 208 private: end_of_stream()209 static Message* end_of_stream() { return nullptr; } error()210 static Message* error() { return reinterpret_cast<Message*>(1); } taken()211 static Message* taken() { return reinterpret_cast<Message*>(2); } 212 Message* message_ = end_of_stream(); 213 CallState* call_state_ = nullptr; 214 }; 215 216 template <typename T> 217 struct ArgumentMustBeNextMessage; 218 template <void (CallState::*on_progress)()> 219 struct ArgumentMustBeNextMessage<NextMessage<on_progress>> { 220 static constexpr bool value() { return true; } 221 }; 222 223 inline void* Offset(void* base, size_t amt) { 224 return static_cast<char*>(base) + amt; 225 } 226 227 // One call filter constructor 228 // Contains enough information to allocate and initialize the 229 // call data for one filter. 230 struct FilterConstructor { 231 // Pointer to corresponding channel data for this filter 232 void* channel_data; 233 // Offset of the call data for this filter within the call data memory 234 // allocation 235 size_t call_offset; 236 // Initialize the call data for this filter 237 void (*call_init)(void* call_data, void* channel_data); 238 }; 239 240 // One call filter destructor 241 struct FilterDestructor { 242 // Offset of the call data for this filter within the call data memory 243 // allocation 244 size_t call_offset; 245 // Destroy the call data for this filter 246 void (*call_destroy)(void* call_data); 247 }; 248 249 template <typename FilterType, typename = void> 250 struct CallConstructor { 251 static void Construct(void* call_data, FilterType*) { 252 new (call_data) typename FilterType::Call(); 253 } 254 }; 255 256 template <typename FilterType> 257 struct CallConstructor<FilterType, 258 absl::void_t<decltype(typename FilterType::Call( 259 static_cast<FilterType*>(nullptr)))>> { 260 static void Construct(void* call_data, FilterType* channel) { 261 new (call_data) typename FilterType::Call(channel); 262 } 263 }; 264 265 // Result of a filter operation 266 // Can be either ok (if ok is non-null) or an error. 267 // Only one pointer can be set. 268 template <typename T> 269 struct ResultOr { 270 ResultOr(T ok, ServerMetadataHandle error) 271 : ok(std::move(ok)), error(std::move(error)) { 272 CHECK((this->ok == nullptr) ^ (this->error == nullptr)); 273 } 274 T ok; 275 ServerMetadataHandle error; 276 }; 277 278 // One filter operation metadata 279 // Given a value of type T, produces a promise of type ResultOr<T>. 280 template <typename T> 281 struct Operator { 282 using Arg = T; 283 // Pointer to corresponding channel data for this filter 284 void* channel_data; 285 // Offset of the call data for this filter within the call data memory 286 size_t call_offset; 287 // Initialize the promise data for this filter, and poll once. 288 // Return the result of the poll. 289 // If the promise finishes, also destroy the promise data! 290 Poll<ResultOr<T>> (*promise_init)(void* promise_data, void* call_data, 291 void* channel_data, T value); 292 // Poll the promise data for this filter. 293 // If the promise finishes, also destroy the promise data! 294 // Note that if the promise always finishes on the first poll, then supplying 295 // this method is unnecessary (as it will never be called). 296 Poll<ResultOr<T>> (*poll)(void* promise_data); 297 // Destroy the promise data for this filter for an in-progress operation 298 // before the promise finishes. 299 // Note that if the promise always finishes on the first poll, then supplying 300 // this method is unnecessary (as it will never be called). 301 void (*early_destroy)(void* promise_data); 302 }; 303 304 struct HalfCloseOperator { 305 // Pointer to corresponding channel data for this filter 306 void* channel_data; 307 // Offset of the call data for this filter within the call data memory 308 size_t call_offset; 309 void (*half_close)(void* call_data, void* channel_data); 310 }; 311 312 struct ServerTrailingMetadataOperator { 313 // Pointer to corresponding channel data for this filter 314 void* channel_data; 315 // Offset of the call data for this filter within the call data memory 316 size_t call_offset; 317 ServerMetadataHandle (*server_trailing_metadata)( 318 void* call_data, void* channel_data, ServerMetadataHandle metadata); 319 }; 320 321 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void RunHalfClose( 322 absl::Span<const HalfCloseOperator> ops, void* call_data) { 323 for (const auto& op : ops) { 324 op.half_close(Offset(call_data, op.call_offset), op.channel_data); 325 } 326 } 327 328 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline ServerMetadataHandle 329 RunServerTrailingMetadata(absl::Span<const ServerTrailingMetadataOperator> ops, 330 void* call_data, ServerMetadataHandle md) { 331 for (auto& op : ops) { 332 md = op.server_trailing_metadata(Offset(call_data, op.call_offset), 333 op.channel_data, std::move(md)); 334 } 335 return md; 336 } 337 338 // One call finalizer 339 struct Finalizer { 340 void* channel_data; 341 size_t call_offset; 342 void (*final)(void* call_data, void* channel_data, 343 const grpc_call_final_info* final_info); 344 }; 345 346 // A layout of operations for a given filter stack 347 // This includes which operations, how much memory is required, what alignment. 348 template <typename T> 349 struct Layout { 350 size_t promise_size = 0; 351 size_t promise_alignment = 0; 352 std::vector<Operator<T>> ops; 353 354 void Add(size_t filter_promise_size, size_t filter_promise_alignment, 355 Operator<T> op) { 356 promise_size = std::max(promise_size, filter_promise_size); 357 promise_alignment = std::max(promise_alignment, filter_promise_alignment); 358 ops.push_back(op); 359 } 360 361 void Reverse() { absl::c_reverse(ops); } 362 }; 363 364 // AddOp and friends 365 // These are helpers to wrap a member function on a class into an operation 366 // and attach it to a layout. 367 // There's a generic wrapper function `AddOp` for each of fallible and 368 // infallible operations. 369 // There are then specializations of AddOpImpl for each kind of member function 370 // an operation could have. 371 // Each specialization has an `Add` member function for the kinds of operations 372 // it supports: some only support fallible, some only support infallible, some 373 // support both. 374 375 template <typename FilterType, typename T, typename FunctionImpl, 376 FunctionImpl impl, typename SfinaeVoid = void> 377 struct AddOpImpl; 378 379 template <typename FunctionImpl, FunctionImpl impl, typename FilterType, 380 typename T> 381 void AddOp(FilterType* channel_data, size_t call_offset, Layout<T>& to) { 382 AddOpImpl<FilterType, T, FunctionImpl, impl>::Add(channel_data, call_offset, 383 to); 384 } 385 386 template <typename FilterType> 387 void AddHalfClose(FilterType* channel_data, size_t call_offset, 388 void (FilterType::Call::*)(), 389 std::vector<HalfCloseOperator>& to) { 390 to.push_back( 391 HalfCloseOperator{channel_data, call_offset, [](void* call_data, void*) { 392 static_cast<typename FilterType::Call*>(call_data) 393 ->OnClientToServerHalfClose(); 394 }}); 395 } 396 397 template <typename FilterType> 398 void AddHalfClose(FilterType* channel_data, size_t call_offset, 399 void (FilterType::Call::*)(FilterType*), 400 std::vector<HalfCloseOperator>& to) { 401 to.push_back(HalfCloseOperator{ 402 channel_data, call_offset, [](void* call_data, void* channel_data) { 403 static_cast<typename FilterType::Call*>(call_data) 404 ->OnClientToServerHalfClose(static_cast<FilterType*>(channel_data)); 405 }}); 406 } 407 408 template <typename FilterType> 409 void AddHalfClose(FilterType*, size_t, const NoInterceptor*, 410 std::vector<HalfCloseOperator>&) {} 411 412 template <typename FilterType> 413 void AddServerTrailingMetadata( 414 FilterType* channel_data, size_t call_offset, 415 void (FilterType::Call::*)(ServerMetadata&), 416 std::vector<ServerTrailingMetadataOperator>& to) { 417 to.push_back(ServerTrailingMetadataOperator{ 418 channel_data, call_offset, 419 [](void* call_data, void*, ServerMetadataHandle metadata) { 420 static_cast<typename FilterType::Call*>(call_data) 421 ->OnServerTrailingMetadata(*metadata); 422 return metadata; 423 }}); 424 } 425 426 template <typename FilterType> 427 void AddServerTrailingMetadata( 428 FilterType* channel_data, size_t call_offset, 429 void (FilterType::Call::*)(ServerMetadata&, FilterType*), 430 std::vector<ServerTrailingMetadataOperator>& to) { 431 to.push_back(ServerTrailingMetadataOperator{ 432 channel_data, call_offset, 433 [](void* call_data, void* channel_data, ServerMetadataHandle metadata) { 434 static_cast<typename FilterType::Call*>(call_data) 435 ->OnServerTrailingMetadata(*metadata, 436 static_cast<FilterType*>(channel_data)); 437 return metadata; 438 }}); 439 } 440 441 template <typename FilterType> 442 void AddServerTrailingMetadata( 443 FilterType* channel_data, size_t call_offset, 444 absl::Status (FilterType::Call::*)(ServerMetadata&), 445 std::vector<ServerTrailingMetadataOperator>& to) { 446 to.push_back(ServerTrailingMetadataOperator{ 447 channel_data, call_offset, 448 [](void* call_data, void*, ServerMetadataHandle metadata) { 449 auto r = static_cast<typename FilterType::Call*>(call_data) 450 ->OnServerTrailingMetadata(*metadata); 451 if (r.ok()) return metadata; 452 return CancelledServerMetadataFromStatus(r); 453 }}); 454 } 455 456 template <typename FilterType> 457 void AddServerTrailingMetadata( 458 FilterType* channel_data, size_t call_offset, 459 ServerMetadataHandle (FilterType::Call::*)(ServerMetadataHandle), 460 std::vector<ServerTrailingMetadataOperator>& to) { 461 to.push_back(ServerTrailingMetadataOperator{ 462 channel_data, call_offset, 463 [](void* call_data, void*, ServerMetadataHandle metadata) { 464 return static_cast<typename FilterType::Call*>(call_data) 465 ->OnServerTrailingMetadata(std::move(metadata)); 466 }}); 467 } 468 469 template <typename FilterType> 470 void AddServerTrailingMetadata(FilterType*, size_t, const NoInterceptor*, 471 std::vector<ServerTrailingMetadataOperator>&) {} 472 473 // const NoInterceptor $EVENT 474 // These do nothing, and specifically DO NOT add an operation to the layout. 475 // Supported for fallible & infallible operations. 476 template <typename FilterType, typename T, const NoInterceptor* which> 477 struct AddOpImpl<FilterType, T, const NoInterceptor*, which> { 478 static void Add(FilterType*, size_t, Layout<T>&) {} 479 }; 480 481 // void $INTERCEPTOR_NAME($VALUE_TYPE&) 482 template <typename FilterType, typename T, 483 void (FilterType::Call::*impl)(typename T::element_type&)> 484 struct AddOpImpl<FilterType, T, 485 void (FilterType::Call::*)(typename T::element_type&), impl> { 486 static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) { 487 to.Add(0, 0, 488 Operator<T>{ 489 channel_data, 490 call_offset, 491 [](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> { 492 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 493 *value); 494 return ResultOr<T>{std::move(value), nullptr}; 495 }, 496 nullptr, 497 nullptr, 498 }); 499 } 500 }; 501 502 // void $INTERCEPTOR_NAME(const $VALUE_TYPE&) 503 template <typename FilterType, typename T, 504 void (FilterType::Call::*impl)(const typename T::element_type&)> 505 struct AddOpImpl<FilterType, T, 506 void (FilterType::Call::*)(const typename T::element_type&), 507 impl> { 508 static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) { 509 to.Add(0, 0, 510 Operator<T>{ 511 channel_data, 512 call_offset, 513 [](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> { 514 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 515 *value); 516 return ResultOr<T>{std::move(value), nullptr}; 517 }, 518 nullptr, 519 nullptr, 520 }); 521 } 522 }; 523 524 // void $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*) 525 template <typename FilterType, typename T, 526 void (FilterType::Call::*impl)(typename T::element_type&, 527 FilterType*)> 528 struct AddOpImpl< 529 FilterType, T, 530 void (FilterType::Call::*)(typename T::element_type&, FilterType*), impl> { 531 static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) { 532 to.Add(0, 0, 533 Operator<T>{ 534 channel_data, 535 call_offset, 536 [](void*, void* call_data, void* channel_data, 537 T value) -> Poll<ResultOr<T>> { 538 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 539 *value, static_cast<FilterType*>(channel_data)); 540 return ResultOr<T>{std::move(value), nullptr}; 541 }, 542 nullptr, 543 nullptr, 544 }); 545 } 546 }; 547 548 // $VALUE_HANDLE $INTERCEPTOR_NAME($VALUE_HANDLE, FilterType*) 549 template <typename FilterType, typename T, 550 T (FilterType::Call::*impl)(T, FilterType*)> 551 struct AddOpImpl<FilterType, T, T (FilterType::Call::*)(T, FilterType*), impl> { 552 static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) { 553 to.Add( 554 0, 0, 555 Operator<T>{ 556 channel_data, 557 call_offset, 558 [](void*, void* call_data, void* channel_data, 559 T value) -> Poll<ResultOr<T>> { 560 return ResultOr<T>{ 561 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 562 std::move(value), static_cast<FilterType*>(channel_data)), 563 nullptr}; 564 }, 565 nullptr, 566 nullptr, 567 }); 568 } 569 }; 570 571 // absl::Status $INTERCEPTOR_NAME($VALUE_TYPE&) 572 template <typename FilterType, typename T, 573 absl::Status (FilterType::Call::*impl)(typename T::element_type&)> 574 struct AddOpImpl<FilterType, T, 575 absl::Status (FilterType::Call::*)(typename T::element_type&), 576 impl> { 577 static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) { 578 to.Add( 579 0, 0, 580 Operator<T>{ 581 channel_data, 582 call_offset, 583 [](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> { 584 auto r = 585 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 586 *value); 587 if (r.ok()) return ResultOr<T>{std::move(value), nullptr}; 588 return ResultOr<T>{ 589 nullptr, StatusCast<ServerMetadataHandle>(std::move(r))}; 590 }, 591 nullptr, 592 nullptr, 593 }); 594 } 595 }; 596 597 // absl::Status $INTERCEPTOR_NAME(const $VALUE_TYPE&) 598 template <typename FilterType, typename T, 599 absl::Status (FilterType::Call::*impl)( 600 const typename T::element_type&)> 601 struct AddOpImpl< 602 FilterType, T, 603 absl::Status (FilterType::Call::*)(const typename T::element_type&), impl> { 604 static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) { 605 to.Add( 606 0, 0, 607 Operator<T>{ 608 channel_data, 609 call_offset, 610 [](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> { 611 auto r = 612 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 613 *value); 614 if (r.ok()) return ResultOr<T>{std::move(value), nullptr}; 615 return ResultOr<T>{ 616 nullptr, StatusCast<ServerMetadataHandle>(std::move(r))}; 617 }, 618 nullptr, 619 nullptr, 620 }); 621 } 622 }; 623 624 // absl::Status $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*) 625 template <typename FilterType, typename T, 626 absl::Status (FilterType::Call::*impl)(typename T::element_type&, 627 FilterType*)> 628 struct AddOpImpl<FilterType, T, 629 absl::Status (FilterType::Call::*)(typename T::element_type&, 630 FilterType*), 631 impl> { 632 static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) { 633 to.Add( 634 0, 0, 635 Operator<T>{ 636 channel_data, 637 call_offset, 638 [](void*, void* call_data, void* channel_data, 639 T value) -> Poll<ResultOr<T>> { 640 auto r = 641 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 642 *value, static_cast<FilterType*>(channel_data)); 643 if (IsStatusOk(r)) return ResultOr<T>{std::move(value), nullptr}; 644 return ResultOr<T>{ 645 nullptr, StatusCast<ServerMetadataHandle>(std::move(r))}; 646 }, 647 nullptr, 648 nullptr, 649 }); 650 } 651 }; 652 653 // absl::Status $INTERCEPTOR_NAME(const $VALUE_TYPE&, FilterType*) 654 template <typename FilterType, typename T, 655 absl::Status (FilterType::Call::*impl)( 656 const typename T::element_type&, FilterType*)> 657 struct AddOpImpl<FilterType, T, 658 absl::Status (FilterType::Call::*)( 659 const typename T::element_type&, FilterType*), 660 impl> { 661 static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) { 662 to.Add( 663 0, 0, 664 Operator<T>{ 665 channel_data, 666 call_offset, 667 [](void*, void* call_data, void* channel_data, 668 T value) -> Poll<ResultOr<T>> { 669 auto r = 670 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 671 *value, static_cast<FilterType*>(channel_data)); 672 if (IsStatusOk(r)) return ResultOr<T>{std::move(value), nullptr}; 673 return ResultOr<T>{ 674 nullptr, StatusCast<ServerMetadataHandle>(std::move(r))}; 675 }, 676 nullptr, 677 nullptr, 678 }); 679 } 680 }; 681 682 // absl::StatusOr<$VALUE_HANDLE> $INTERCEPTOR_NAME($VALUE_HANDLE, FilterType*) 683 template <typename FilterType, typename T, 684 absl::StatusOr<T> (FilterType::Call::*impl)(T, FilterType*)> 685 struct AddOpImpl<FilterType, T, 686 absl::StatusOr<T> (FilterType::Call::*)(T, FilterType*), 687 impl> { 688 static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) { 689 to.Add( 690 0, 0, 691 Operator<T>{ 692 channel_data, 693 call_offset, 694 [](void*, void* call_data, void* channel_data, 695 T value) -> Poll<ResultOr<T>> { 696 auto r = 697 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 698 std::move(value), static_cast<FilterType*>(channel_data)); 699 if (IsStatusOk(r)) return ResultOr<T>{std::move(*r), nullptr}; 700 return ResultOr<T>{ 701 nullptr, StatusCast<ServerMetadataHandle>(std::move(r))}; 702 }, 703 nullptr, 704 nullptr, 705 }); 706 } 707 }; 708 709 // ServerMetadataHandle $INTERCEPTOR_NAME($VALUE_TYPE&) 710 template <typename FilterType, typename T, 711 ServerMetadataHandle (FilterType::Call::*impl)( 712 typename T::element_type&)> 713 struct AddOpImpl<FilterType, T, 714 ServerMetadataHandle (FilterType::Call::*)( 715 typename T::element_type&), 716 impl> { 717 static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) { 718 to.Add( 719 0, 0, 720 Operator<T>{ 721 channel_data, 722 call_offset, 723 [](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> { 724 auto r = 725 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 726 *value); 727 if (r == nullptr) return ResultOr<T>{std::move(value), nullptr}; 728 return ResultOr<T>{ 729 nullptr, StatusCast<ServerMetadataHandle>(std::move(r))}; 730 }, 731 nullptr, 732 nullptr, 733 }); 734 } 735 }; 736 737 // ServerMetadataHandle $INTERCEPTOR_NAME(const $VALUE_TYPE&) 738 template <typename FilterType, typename T, 739 ServerMetadataHandle (FilterType::Call::*impl)( 740 const typename T::element_type&)> 741 struct AddOpImpl<FilterType, T, 742 ServerMetadataHandle (FilterType::Call::*)( 743 const typename T::element_type&), 744 impl> { 745 static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) { 746 to.Add( 747 0, 0, 748 Operator<T>{ 749 channel_data, 750 call_offset, 751 [](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> { 752 auto r = 753 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 754 *value); 755 if (r == nullptr) return ResultOr<T>{std::move(value), nullptr}; 756 return ResultOr<T>{ 757 nullptr, StatusCast<ServerMetadataHandle>(std::move(r))}; 758 }, 759 nullptr, 760 nullptr, 761 }); 762 } 763 }; 764 765 // ServerMetadataHandle $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*) 766 template <typename FilterType, typename T, 767 ServerMetadataHandle (FilterType::Call::*impl)( 768 typename T::element_type&, FilterType*)> 769 struct AddOpImpl<FilterType, T, 770 ServerMetadataHandle (FilterType::Call::*)( 771 typename T::element_type&, FilterType*), 772 impl> { 773 static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) { 774 to.Add( 775 0, 0, 776 Operator<T>{ 777 channel_data, 778 call_offset, 779 [](void*, void* call_data, void* channel_data, 780 T value) -> Poll<ResultOr<T>> { 781 auto r = 782 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 783 *value, static_cast<FilterType*>(channel_data)); 784 if (r == nullptr) return ResultOr<T>{std::move(value), nullptr}; 785 return ResultOr<T>{ 786 nullptr, StatusCast<ServerMetadataHandle>(std::move(r))}; 787 }, 788 nullptr, 789 nullptr, 790 }); 791 } 792 }; 793 794 // ServerMetadataHandle $INTERCEPTOR_NAME(const $VALUE_TYPE&, FilterType*) 795 template <typename FilterType, typename T, 796 ServerMetadataHandle (FilterType::Call::*impl)( 797 const typename T::element_type&, FilterType*)> 798 struct AddOpImpl<FilterType, T, 799 ServerMetadataHandle (FilterType::Call::*)( 800 const typename T::element_type&, FilterType*), 801 impl> { 802 static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) { 803 to.Add( 804 0, 0, 805 Operator<T>{ 806 channel_data, 807 call_offset, 808 [](void*, void* call_data, void* channel_data, 809 T value) -> Poll<ResultOr<T>> { 810 auto r = 811 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 812 *value, static_cast<FilterType*>(channel_data)); 813 if (r == nullptr) return ResultOr<T>{std::move(value), nullptr}; 814 return ResultOr<T>{ 815 nullptr, StatusCast<ServerMetadataHandle>(std::move(r))}; 816 }, 817 nullptr, 818 nullptr, 819 }); 820 } 821 }; 822 823 // PROMISE_RETURNING(absl::Status) $INTERCEPTOR_NAME($VALUE_TYPE&) 824 template <typename FilterType, typename T, typename R, 825 R (FilterType::Call::*impl)(typename T::element_type&)> 826 struct AddOpImpl< 827 FilterType, T, R (FilterType::Call::*)(typename T::element_type&), impl, 828 absl::enable_if_t<std::is_same<absl::Status, PromiseResult<R>>::value>> { 829 static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) { 830 class Promise { 831 public: 832 Promise(T value, typename FilterType::Call* call_data, FilterType*) 833 : value_(std::move(value)), impl_((call_data->*impl)(*value_)) {} 834 835 Poll<ResultOr<T>> PollOnce() { 836 auto p = impl_(); 837 auto* r = p.value_if_ready(); 838 if (r == nullptr) return Pending{}; 839 T value = std::move(value_); 840 this->~Promise(); 841 if (r->ok()) { 842 return ResultOr<T>{std::move(value), nullptr}; 843 } 844 return ResultOr<T>{nullptr, CancelledServerMetadataFromStatus(*r)}; 845 } 846 847 private: 848 GPR_NO_UNIQUE_ADDRESS T value_; 849 GPR_NO_UNIQUE_ADDRESS R impl_; 850 }; 851 to.Add(sizeof(Promise), alignof(Promise), 852 Operator<T>{ 853 channel_data, 854 call_offset, 855 [](void* promise_data, void* call_data, void* channel_data, 856 T value) -> Poll<ResultOr<T>> { 857 auto* promise = new (promise_data) 858 Promise(std::move(value), 859 static_cast<typename FilterType::Call*>(call_data), 860 static_cast<FilterType*>(channel_data)); 861 return promise->PollOnce(); 862 }, 863 [](void* promise_data) { 864 return static_cast<Promise*>(promise_data)->PollOnce(); 865 }, 866 [](void* promise_data) { 867 static_cast<Promise*>(promise_data)->~Promise(); 868 }, 869 }); 870 } 871 }; 872 873 // PROMISE_RETURNING(absl::Status) $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*) 874 template <typename FilterType, typename T, typename R, 875 R (FilterType::Call::*impl)(typename T::element_type&, FilterType*)> 876 struct AddOpImpl< 877 FilterType, T, 878 R (FilterType::Call::*)(typename T::element_type&, FilterType*), impl, 879 absl::enable_if_t<!std::is_same<R, absl::Status>::value && 880 std::is_same<absl::Status, PromiseResult<R>>::value>> { 881 static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) { 882 class Promise { 883 public: 884 Promise(T value, typename FilterType::Call* call_data, 885 FilterType* channel_data) 886 : value_(std::move(value)), 887 impl_((call_data->*impl)(*value_, channel_data)) {} 888 889 Poll<ResultOr<T>> PollOnce() { 890 auto p = impl_(); 891 auto* r = p.value_if_ready(); 892 if (r == nullptr) return Pending{}; 893 T value = std::move(value_); 894 this->~Promise(); 895 if (r->ok()) { 896 return ResultOr<T>{std::move(value), nullptr}; 897 } 898 return ResultOr<T>{nullptr, CancelledServerMetadataFromStatus(*r)}; 899 } 900 901 private: 902 GPR_NO_UNIQUE_ADDRESS T value_; 903 GPR_NO_UNIQUE_ADDRESS R impl_; 904 }; 905 to.Add(sizeof(Promise), alignof(Promise), 906 Operator<T>{ 907 channel_data, 908 call_offset, 909 [](void* promise_data, void* call_data, void* channel_data, 910 T value) -> Poll<ResultOr<T>> { 911 auto* promise = new (promise_data) 912 Promise(std::move(value), 913 static_cast<typename FilterType::Call*>(call_data), 914 static_cast<FilterType*>(channel_data)); 915 return promise->PollOnce(); 916 }, 917 [](void* promise_data) { 918 return static_cast<Promise*>(promise_data)->PollOnce(); 919 }, 920 [](void* promise_data) { 921 static_cast<Promise*>(promise_data)->~Promise(); 922 }, 923 }); 924 } 925 }; 926 927 // PROMISE_RETURNING(absl::StatusOr<$VALUE_HANDLE>) 928 // $INTERCEPTOR_NAME($VALUE_HANDLE, FilterType*) 929 template <typename FilterType, typename T, typename R, 930 R (FilterType::Call::*impl)(T, FilterType*)> 931 struct AddOpImpl<FilterType, T, R (FilterType::Call::*)(T, FilterType*), impl, 932 absl::enable_if_t<std::is_same<absl::StatusOr<T>, 933 PromiseResult<R>>::value>> { 934 static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) { 935 class Promise { 936 public: 937 Promise(T value, typename FilterType::Call* call_data, 938 FilterType* channel_data) 939 : impl_((call_data->*impl)(std::move(value), channel_data)) {} 940 941 Poll<ResultOr<T>> PollOnce() { 942 auto p = impl_(); 943 auto* r = p.value_if_ready(); 944 if (r == nullptr) return Pending{}; 945 this->~Promise(); 946 if (r->ok()) return ResultOr<T>{std::move(**r), nullptr}; 947 return ResultOr<T>{nullptr, 948 CancelledServerMetadataFromStatus(r->status())}; 949 } 950 951 private: 952 GPR_NO_UNIQUE_ADDRESS R impl_; 953 }; 954 to.Add(sizeof(Promise), alignof(Promise), 955 Operator<T>{ 956 channel_data, 957 call_offset, 958 [](void* promise_data, void* call_data, void* channel_data, 959 T value) -> Poll<ResultOr<T>> { 960 auto* promise = new (promise_data) 961 Promise(std::move(value), 962 static_cast<typename FilterType::Call*>(call_data), 963 static_cast<FilterType*>(channel_data)); 964 return promise->PollOnce(); 965 }, 966 [](void* promise_data) { 967 return static_cast<Promise*>(promise_data)->PollOnce(); 968 }, 969 [](void* promise_data) { 970 static_cast<Promise*>(promise_data)->~Promise(); 971 }, 972 }); 973 } 974 }; 975 976 struct ChannelDataDestructor { 977 void (*destroy)(void* channel_data); 978 void* channel_data; 979 }; 980 981 // StackData contains the main datastructures built up by this module. 982 // It's a complete representation of all the code that needs to be invoked 983 // to execute a call for a given set of filters. 984 // This structure is held at the channel layer and is shared between many 985 // in-flight calls. 986 struct StackData { 987 // Overall size and alignment of the call data for this stack. 988 size_t call_data_alignment = 1; 989 size_t call_data_size = 0; 990 // A complete list of filters for this call, so that we can construct the 991 // call data for each filter. 992 std::vector<FilterConstructor> filter_constructor; 993 std::vector<FilterDestructor> filter_destructor; 994 // For each kind of operation, a layout of the operations for this call. 995 // (there's some duplicate data here, and that's ok: we want to avoid 996 // pointer chasing as much as possible when executing a call) 997 Layout<ClientMetadataHandle> client_initial_metadata; 998 Layout<ServerMetadataHandle> server_initial_metadata; 999 Layout<MessageHandle> client_to_server_messages; 1000 std::vector<HalfCloseOperator> client_to_server_half_close; 1001 Layout<MessageHandle> server_to_client_messages; 1002 std::vector<ServerTrailingMetadataOperator> server_trailing_metadata; 1003 // A list of finalizers for this call. 1004 // We use a bespoke data structure here because finalizers can never be 1005 // asynchronous. 1006 std::vector<Finalizer> finalizers; 1007 // A list of functions to call when this stack data is destroyed 1008 // (to capture ownership of channel data) 1009 std::vector<ChannelDataDestructor> channel_data_destructors; 1010 1011 bool empty() const { 1012 return filter_constructor.empty() && filter_destructor.empty() && 1013 client_initial_metadata.ops.empty() && 1014 server_initial_metadata.ops.empty() && 1015 client_to_server_messages.ops.empty() && 1016 client_to_server_half_close.empty() && 1017 server_to_client_messages.ops.empty() && 1018 server_trailing_metadata.empty() && finalizers.empty() && 1019 channel_data_destructors.empty(); 1020 } 1021 1022 // Add one filter to the list of filters, and update alignment. 1023 // Returns the offset of the call data for this filter. 1024 // Specifically does not update any of the layouts or finalizers. 1025 // Callers are expected to do that themselves. 1026 // This separation enables separation of *testing* of filters, and since 1027 // this is a detail type it's felt that a slightly harder to hold API that 1028 // we have exactly one caller for is warranted for a more thorough testing 1029 // story. 1030 template <typename FilterType> 1031 absl::enable_if_t<!std::is_empty<typename FilterType::Call>::value, size_t> 1032 AddFilterConstructor(FilterType* channel_data) { 1033 const size_t alignment = alignof(typename FilterType::Call); 1034 call_data_alignment = std::max(call_data_alignment, alignment); 1035 if (call_data_size % alignment != 0) { 1036 call_data_size += alignment - call_data_size % alignment; 1037 } 1038 const size_t call_offset = call_data_size; 1039 call_data_size += sizeof(typename FilterType::Call); 1040 filter_constructor.push_back(FilterConstructor{ 1041 channel_data, 1042 call_offset, 1043 [](void* call_data, void* channel_data) { 1044 CallConstructor<FilterType>::Construct( 1045 call_data, static_cast<FilterType*>(channel_data)); 1046 }, 1047 }); 1048 return call_offset; 1049 } 1050 1051 template <typename FilterType> 1052 absl::enable_if_t< 1053 std::is_empty<typename FilterType::Call>::value && 1054 !std::is_trivially_constructible<typename FilterType::Call>::value, 1055 size_t> 1056 AddFilterConstructor(FilterType* channel_data) { 1057 const size_t alignment = alignof(typename FilterType::Call); 1058 call_data_alignment = std::max(call_data_alignment, alignment); 1059 filter_constructor.push_back(FilterConstructor{ 1060 channel_data, 1061 0, 1062 [](void* call_data, void* channel_data) { 1063 CallConstructor<FilterType>::Construct( 1064 call_data, static_cast<FilterType*>(channel_data)); 1065 }, 1066 }); 1067 return 0; 1068 } 1069 1070 template <typename FilterType> 1071 absl::enable_if_t< 1072 std::is_empty<typename FilterType::Call>::value && 1073 std::is_trivially_constructible<typename FilterType::Call>::value, 1074 size_t> 1075 AddFilterConstructor(FilterType*) { 1076 const size_t alignment = alignof(typename FilterType::Call); 1077 call_data_alignment = std::max(call_data_alignment, alignment); 1078 return 0; 1079 } 1080 1081 template <typename FilterType> 1082 absl::enable_if_t< 1083 !std::is_trivially_destructible<typename FilterType::Call>::value> 1084 AddFilterDestructor(size_t call_offset) { 1085 filter_destructor.push_back(FilterDestructor{ 1086 call_offset, 1087 [](void* call_data) { 1088 Destruct(static_cast<typename FilterType::Call*>(call_data)); 1089 }, 1090 }); 1091 } 1092 1093 template <typename FilterType> 1094 absl::enable_if_t< 1095 std::is_trivially_destructible<typename FilterType::Call>::value> 1096 AddFilterDestructor(size_t) {} 1097 1098 template <typename FilterType> 1099 size_t AddFilter(FilterType* filter) { 1100 const size_t call_offset = AddFilterConstructor(filter); 1101 AddFilterDestructor<FilterType>(call_offset); 1102 return call_offset; 1103 } 1104 1105 // Per operation adders - one for each interception point. 1106 // Delegate to AddOp() above. 1107 1108 template <typename FilterType> 1109 void AddClientInitialMetadataOp(FilterType* channel_data, 1110 size_t call_offset) { 1111 AddOp<decltype(&FilterType::Call::OnClientInitialMetadata), 1112 &FilterType::Call::OnClientInitialMetadata>(channel_data, call_offset, 1113 client_initial_metadata); 1114 } 1115 1116 template <typename FilterType> 1117 void AddServerInitialMetadataOp(FilterType* channel_data, 1118 size_t call_offset) { 1119 AddOp<decltype(&FilterType::Call::OnServerInitialMetadata), 1120 &FilterType::Call::OnServerInitialMetadata>(channel_data, call_offset, 1121 server_initial_metadata); 1122 } 1123 1124 template <typename FilterType> 1125 void AddClientToServerMessageOp(FilterType* channel_data, 1126 size_t call_offset) { 1127 AddOp<decltype(&FilterType::Call::OnClientToServerMessage), 1128 &FilterType::Call::OnClientToServerMessage>( 1129 channel_data, call_offset, client_to_server_messages); 1130 } 1131 1132 template <typename FilterType> 1133 void AddClientToServerHalfClose(FilterType* channel_data, 1134 size_t call_offset) { 1135 AddHalfClose(channel_data, call_offset, 1136 &FilterType::Call::OnClientToServerHalfClose, 1137 client_to_server_half_close); 1138 } 1139 1140 template <typename FilterType> 1141 void AddServerToClientMessageOp(FilterType* channel_data, 1142 size_t call_offset) { 1143 AddOp<decltype(&FilterType::Call::OnServerToClientMessage), 1144 &FilterType::Call::OnServerToClientMessage>( 1145 channel_data, call_offset, server_to_client_messages); 1146 } 1147 1148 template <typename FilterType> 1149 void AddServerTrailingMetadataOp(FilterType* channel_data, 1150 size_t call_offset) { 1151 AddServerTrailingMetadata(channel_data, call_offset, 1152 &FilterType::Call::OnServerTrailingMetadata, 1153 server_trailing_metadata); 1154 } 1155 1156 // Finalizer interception adders 1157 1158 template <typename FilterType> 1159 void AddFinalizer(FilterType*, size_t, const NoInterceptor* p) { 1160 DCHECK(p == &FilterType::Call::OnFinalize); 1161 } 1162 1163 template <typename FilterType> 1164 void AddFinalizer(FilterType* channel_data, size_t call_offset, 1165 void (FilterType::Call::*p)(const grpc_call_final_info*)) { 1166 DCHECK(p == &FilterType::Call::OnFinalize); 1167 finalizers.push_back(Finalizer{ 1168 channel_data, 1169 call_offset, 1170 [](void* call_data, void*, const grpc_call_final_info* final_info) { 1171 static_cast<typename FilterType::Call*>(call_data)->OnFinalize( 1172 final_info); 1173 }, 1174 }); 1175 } 1176 1177 template <typename FilterType> 1178 void AddFinalizer(FilterType* channel_data, size_t call_offset, 1179 void (FilterType::Call::*p)(const grpc_call_final_info*, 1180 FilterType*)) { 1181 DCHECK(p == &FilterType::Call::OnFinalize); 1182 finalizers.push_back(Finalizer{ 1183 channel_data, 1184 call_offset, 1185 [](void* call_data, void* channel_data, 1186 const grpc_call_final_info* final_info) { 1187 static_cast<typename FilterType::Call*>(call_data)->OnFinalize( 1188 final_info, static_cast<FilterType*>(channel_data)); 1189 }, 1190 }); 1191 } 1192 }; 1193 1194 // OperationExecutor is a helper class to execute a sequence of operations 1195 // from a layout on one value. 1196 // We instantiate one of these during the *Pull* promise for each operation 1197 // and wait for it to resolve. 1198 // At this layer the filters look like a list of transformations on the 1199 // value pushed. 1200 // An early-failing filter will cause subsequent filters to not execute. 1201 template <typename T> 1202 class OperationExecutor { 1203 public: 1204 OperationExecutor() = default; 1205 ~OperationExecutor(); 1206 OperationExecutor(const OperationExecutor&) = delete; 1207 OperationExecutor& operator=(const OperationExecutor&) = delete; 1208 OperationExecutor(OperationExecutor&& other) noexcept 1209 : ops_(other.ops_), end_ops_(other.end_ops_) { 1210 // Movable iff we're not running. 1211 DCHECK_EQ(other.promise_data_, nullptr); 1212 } 1213 OperationExecutor& operator=(OperationExecutor&& other) noexcept { 1214 DCHECK_EQ(other.promise_data_, nullptr); 1215 DCHECK_EQ(promise_data_, nullptr); 1216 ops_ = other.ops_; 1217 end_ops_ = other.end_ops_; 1218 return *this; 1219 } 1220 // IsRunning() is true if we're currently executing a sequence of operations. 1221 bool IsRunning() const { return promise_data_ != nullptr; } 1222 // Start executing a layout. May allocate space to store the relevant promise. 1223 // Returns the result of the first poll. 1224 // If the promise finishes, also destroy the promise data. 1225 Poll<ResultOr<T>> Start(const Layout<T>* layout, T input, void* call_data); 1226 // Continue executing a layout. Returns the result of the next poll. 1227 // If the promise finishes, also destroy the promise data. 1228 Poll<ResultOr<T>> Step(void* call_data); 1229 1230 private: 1231 // Start polling on the current step of the layout. 1232 // `input` is the current value (either the input to the first step, or the 1233 // so far transformed value) 1234 // `call_data` is the call data for the filter stack. 1235 // If this op finishes immediately then we iterative move to the next step. 1236 // If we reach the end up the ops, we return the overall poll result, 1237 // otherwise we return Pending. 1238 Poll<ResultOr<T>> InitStep(T input, void* call_data); 1239 // Continue polling on the current step of the layout. 1240 // Called on the next poll after InitStep returns pending. 1241 // If the promise is still pending, returns this. 1242 // If the promise completes we call into InitStep to continue execution 1243 // through the filters. 1244 Poll<ResultOr<T>> ContinueStep(void* call_data); 1245 1246 void* promise_data_ = nullptr; 1247 const Operator<T>* ops_; 1248 const Operator<T>* end_ops_; 1249 }; 1250 1251 template <typename T> 1252 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline OperationExecutor< 1253 T>::~OperationExecutor() { 1254 if (promise_data_ != nullptr) { 1255 ops_->early_destroy(promise_data_); 1256 gpr_free_aligned(promise_data_); 1257 } 1258 } 1259 1260 template <typename T> 1261 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ResultOr<T>> 1262 OperationExecutor<T>::Start(const Layout<T>* layout, T input, void* call_data) { 1263 ops_ = layout->ops.data(); 1264 end_ops_ = ops_ + layout->ops.size(); 1265 if (layout->promise_size == 0) { 1266 // No call state ==> instantaneously ready 1267 auto r = InitStep(std::move(input), call_data); 1268 CHECK(r.ready()); 1269 return r; 1270 } 1271 promise_data_ = 1272 gpr_malloc_aligned(layout->promise_size, layout->promise_alignment); 1273 return InitStep(std::move(input), call_data); 1274 } 1275 1276 template <typename T> 1277 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ResultOr<T>> 1278 OperationExecutor<T>::InitStep(T input, void* call_data) { 1279 CHECK(input != nullptr); 1280 while (true) { 1281 if (ops_ == end_ops_) { 1282 return ResultOr<T>{std::move(input), nullptr}; 1283 } 1284 auto p = 1285 ops_->promise_init(promise_data_, Offset(call_data, ops_->call_offset), 1286 ops_->channel_data, std::move(input)); 1287 if (auto* r = p.value_if_ready()) { 1288 if (r->ok == nullptr) return std::move(*r); 1289 input = std::move(r->ok); 1290 ++ops_; 1291 continue; 1292 } 1293 return Pending{}; 1294 } 1295 } 1296 1297 template <typename T> 1298 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ResultOr<T>> 1299 OperationExecutor<T>::Step(void* call_data) { 1300 DCHECK_NE(promise_data_, nullptr); 1301 auto p = ContinueStep(call_data); 1302 if (p.ready()) { 1303 gpr_free_aligned(promise_data_); 1304 promise_data_ = nullptr; 1305 } 1306 return p; 1307 } 1308 1309 template <typename T> 1310 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ResultOr<T>> 1311 OperationExecutor<T>::ContinueStep(void* call_data) { 1312 auto p = ops_->poll(promise_data_); 1313 if (auto* r = p.value_if_ready()) { 1314 if (r->ok == nullptr) return std::move(*r); 1315 ++ops_; 1316 return InitStep(std::move(r->ok), call_data); 1317 } 1318 return Pending{}; 1319 } 1320 1321 template <typename Fn> 1322 class ServerTrailingMetadataInterceptor { 1323 public: 1324 class Call { 1325 public: 1326 static const NoInterceptor OnClientInitialMetadata; 1327 static const NoInterceptor OnServerInitialMetadata; 1328 static const NoInterceptor OnClientToServerMessage; 1329 static const NoInterceptor OnClientToServerHalfClose; 1330 static const NoInterceptor OnServerToClientMessage; 1331 static const NoInterceptor OnFinalize; 1332 void OnServerTrailingMetadata(ServerMetadata& md, 1333 ServerTrailingMetadataInterceptor* filter) { 1334 filter->fn_(md); 1335 } 1336 }; 1337 1338 explicit ServerTrailingMetadataInterceptor(Fn fn) : fn_(std::move(fn)) {} 1339 1340 private: 1341 GPR_NO_UNIQUE_ADDRESS Fn fn_; 1342 }; 1343 template <typename Fn> 1344 const NoInterceptor 1345 ServerTrailingMetadataInterceptor<Fn>::Call::OnClientInitialMetadata; 1346 template <typename Fn> 1347 const NoInterceptor 1348 ServerTrailingMetadataInterceptor<Fn>::Call::OnServerInitialMetadata; 1349 template <typename Fn> 1350 const NoInterceptor 1351 ServerTrailingMetadataInterceptor<Fn>::Call::OnClientToServerMessage; 1352 template <typename Fn> 1353 const NoInterceptor 1354 ServerTrailingMetadataInterceptor<Fn>::Call::OnClientToServerHalfClose; 1355 template <typename Fn> 1356 const NoInterceptor 1357 ServerTrailingMetadataInterceptor<Fn>::Call::OnServerToClientMessage; 1358 template <typename Fn> 1359 const NoInterceptor ServerTrailingMetadataInterceptor<Fn>::Call::OnFinalize; 1360 1361 template <typename Fn> 1362 class ClientInitialMetadataInterceptor { 1363 public: 1364 class Call { 1365 public: 1366 auto OnClientInitialMetadata(ClientMetadata& md, 1367 ClientInitialMetadataInterceptor* filter) { 1368 return filter->fn_(md); 1369 } 1370 static const NoInterceptor OnServerInitialMetadata; 1371 static const NoInterceptor OnClientToServerMessage; 1372 static const NoInterceptor OnClientToServerHalfClose; 1373 static const NoInterceptor OnServerToClientMessage; 1374 static const NoInterceptor OnServerTrailingMetadata; 1375 static const NoInterceptor OnFinalize; 1376 }; 1377 1378 explicit ClientInitialMetadataInterceptor(Fn fn) : fn_(std::move(fn)) {} 1379 1380 private: 1381 GPR_NO_UNIQUE_ADDRESS Fn fn_; 1382 }; 1383 template <typename Fn> 1384 const NoInterceptor 1385 ClientInitialMetadataInterceptor<Fn>::Call::OnServerInitialMetadata; 1386 template <typename Fn> 1387 const NoInterceptor 1388 ClientInitialMetadataInterceptor<Fn>::Call::OnClientToServerMessage; 1389 template <typename Fn> 1390 const NoInterceptor 1391 ClientInitialMetadataInterceptor<Fn>::Call::OnClientToServerHalfClose; 1392 template <typename Fn> 1393 const NoInterceptor 1394 ClientInitialMetadataInterceptor<Fn>::Call::OnServerToClientMessage; 1395 template <typename Fn> 1396 const NoInterceptor 1397 ClientInitialMetadataInterceptor<Fn>::Call::OnServerTrailingMetadata; 1398 template <typename Fn> 1399 const NoInterceptor ClientInitialMetadataInterceptor<Fn>::Call::OnFinalize; 1400 1401 } // namespace filters_detail 1402 1403 namespace for_each_detail { 1404 template <void (CallState::*on_progress)()> 1405 struct NextValueTraits<filters_detail::NextMessage<on_progress>> { 1406 using NextMsg = filters_detail::NextMessage<on_progress>; 1407 using Value = MessageHandle; 1408 1409 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static NextValueType Type( 1410 const NextMsg& t) { 1411 if (!t.ok()) return NextValueType::kError; 1412 if (t.has_value()) return NextValueType::kValue; 1413 return NextValueType::kEndOfStream; 1414 } 1415 1416 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static MessageHandle TakeValue( 1417 NextMsg& t) { 1418 return t.TakeValue(); 1419 } 1420 }; 1421 } // namespace for_each_detail 1422 1423 template <void (CallState::*on_progress)()> 1424 struct FailureStatusCastImpl<filters_detail::NextMessage<on_progress>, 1425 StatusFlag> { 1426 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static filters_detail::NextMessage< 1427 on_progress> 1428 Cast(StatusFlag flag) { 1429 DCHECK_EQ(flag, Failure{}); 1430 return filters_detail::NextMessage<on_progress>(Failure{}); 1431 } 1432 }; 1433 1434 namespace promise_detail { 1435 template <void (CallState::*on_progress)()> 1436 struct TrySeqTraitsWithSfinae<filters_detail::NextMessage<on_progress>> { 1437 using UnwrappedType = MessageHandle; 1438 using WrappedType = filters_detail::NextMessage<on_progress>; 1439 template <typename Next> 1440 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static auto CallFactory( 1441 Next* next, WrappedType&& value) { 1442 return next->Make(value.TakeValue()); 1443 } 1444 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static bool IsOk( 1445 const WrappedType& value) { 1446 return value.ok(); 1447 } 1448 static const char* ErrorString(const WrappedType& status) { 1449 DCHECK(!status.ok()); 1450 return "failed"; 1451 } 1452 template <typename R> 1453 static R ReturnValue(WrappedType&& status) { 1454 DCHECK(!status.ok()); 1455 return WrappedType(Failure{}); 1456 } 1457 template <typename F, typename Elem> 1458 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static auto CallSeqFactory( 1459 F& f, Elem&& elem, WrappedType value) 1460 -> decltype(f(std::forward<Elem>(elem), std::declval<MessageHandle>())) { 1461 return f(std::forward<Elem>(elem), value.TakeValue()); 1462 } 1463 template <typename Result, typename RunNext> 1464 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Poll<Result> 1465 CheckResultAndRunNext(WrappedType prior, RunNext run_next) { 1466 if (!prior.ok()) return WrappedType(prior.status()); 1467 return run_next(std::move(prior)); 1468 } 1469 }; 1470 } // namespace promise_detail 1471 1472 using ServerToClientNextMessage = 1473 filters_detail::NextMessage<&CallState::FinishPullServerToClientMessage>; 1474 using ClientToServerNextMessage = 1475 filters_detail::NextMessage<&CallState::FinishPullClientToServerMessage>; 1476 1477 // Execution environment for a stack of filters. 1478 // This is a per-call object. 1479 class CallFilters { 1480 public: 1481 class StackBuilder; 1482 class StackTestSpouse; 1483 1484 // A stack is an opaque, immutable type that contains the data necessary to 1485 // execute a call through a given set of filters. 1486 // It's reference counted so that it can be shared between many calls. 1487 // It contains pointers to the individual filters, yet it does not own those 1488 // pointers: it's expected that some other object will track that ownership. 1489 class Stack : public RefCounted<Stack> { 1490 public: 1491 ~Stack() override; 1492 1493 private: 1494 friend class CallFilters; 1495 friend class StackBuilder; 1496 friend class StackTestSpouse; 1497 explicit Stack(filters_detail::StackData data) : data_(std::move(data)) {} 1498 const filters_detail::StackData data_; 1499 }; 1500 1501 // Build stacks... repeatedly call Add with each filter that contributes to 1502 // the stack, then call Build() to generate a ref counted Stack object. 1503 class StackBuilder { 1504 public: 1505 ~StackBuilder(); 1506 1507 template <typename FilterType> 1508 void Add(FilterType* filter) { 1509 const size_t call_offset = data_.AddFilter<FilterType>(filter); 1510 data_.AddClientInitialMetadataOp(filter, call_offset); 1511 data_.AddServerInitialMetadataOp(filter, call_offset); 1512 data_.AddClientToServerMessageOp(filter, call_offset); 1513 data_.AddClientToServerHalfClose(filter, call_offset); 1514 data_.AddServerToClientMessageOp(filter, call_offset); 1515 data_.AddServerTrailingMetadataOp(filter, call_offset); 1516 data_.AddFinalizer(filter, call_offset, &FilterType::Call::OnFinalize); 1517 } 1518 1519 void AddOwnedObject(void (*destroy)(void* p), void* p) { 1520 data_.channel_data_destructors.push_back({destroy, p}); 1521 } 1522 1523 template <typename T> 1524 void AddOwnedObject(RefCountedPtr<T> p) { 1525 AddOwnedObject([](void* p) { static_cast<T*>(p)->Unref(); }, p.release()); 1526 } 1527 1528 template <typename T> 1529 void AddOwnedObject(std::unique_ptr<T> p) { 1530 AddOwnedObject([](void* p) { delete static_cast<T*>(p); }, p.release()); 1531 } 1532 1533 template <typename Fn> 1534 void AddOnClientInitialMetadata(Fn fn) { 1535 auto filter = std::make_unique< 1536 filters_detail::ClientInitialMetadataInterceptor<Fn>>(std::move(fn)); 1537 Add(filter.get()); 1538 AddOwnedObject(std::move(filter)); 1539 } 1540 1541 template <typename Fn> 1542 void AddOnServerTrailingMetadata(Fn fn) { 1543 auto filter = std::make_unique< 1544 filters_detail::ServerTrailingMetadataInterceptor<Fn>>(std::move(fn)); 1545 Add(filter.get()); 1546 AddOwnedObject(std::move(filter)); 1547 } 1548 1549 RefCountedPtr<Stack> Build(); 1550 1551 private: 1552 filters_detail::StackData data_; 1553 }; 1554 1555 explicit CallFilters(ClientMetadataHandle client_initial_metadata) 1556 : call_data_(nullptr), 1557 push_client_initial_metadata_(std::move(client_initial_metadata)) {} 1558 ~CallFilters() { 1559 if (call_data_ != nullptr && call_data_ != &g_empty_call_data_) { 1560 for (const auto& stack : stacks_) { 1561 for (const auto& destructor : stack.stack->data_.filter_destructor) { 1562 destructor.call_destroy(filters_detail::Offset( 1563 call_data_, stack.call_data_offset + destructor.call_offset)); 1564 } 1565 } 1566 gpr_free_aligned(call_data_); 1567 } 1568 }; 1569 1570 CallFilters(const CallFilters&) = delete; 1571 CallFilters& operator=(const CallFilters&) = delete; 1572 CallFilters(CallFilters&&) = delete; 1573 CallFilters& operator=(CallFilters&&) = delete; 1574 1575 void AddStack(RefCountedPtr<Stack> stack) { 1576 if (stack->data_.empty()) return; 1577 stacks_.emplace_back(std::move(stack)); 1578 } 1579 void Start(); 1580 1581 // Access client initial metadata before it's processed 1582 ClientMetadata* unprocessed_client_initial_metadata() { 1583 return push_client_initial_metadata_.get(); 1584 } 1585 1586 private: 1587 template <typename Output, typename Input, 1588 Input(CallFilters::* input_location), 1589 filters_detail::Layout<Input>(filters_detail::StackData::* layout), 1590 void (CallState::*on_done)(), typename StackIterator> 1591 class MetadataExecutor { 1592 public: 1593 MetadataExecutor(CallFilters* filters, StackIterator stack_begin, 1594 StackIterator stack_end) 1595 : stack_current_(stack_begin), 1596 stack_end_(stack_end), 1597 filters_(filters) { 1598 DCHECK_NE((filters_->*input_location).get(), nullptr); 1599 } 1600 1601 Poll<ValueOrFailure<Output>> operator()() { 1602 if ((filters_->*input_location) != nullptr) { 1603 if (stack_current_ == stack_end_) { 1604 DCHECK_NE((filters_->*input_location).get(), nullptr); 1605 (filters_->call_state_.*on_done)(); 1606 return Output(std::move(filters_->*input_location)); 1607 } 1608 return FinishStep(executor_.Start( 1609 &(stack_current_->stack->data_.*layout), 1610 std::move(filters_->*input_location), filters_->call_data_)); 1611 } else { 1612 return FinishStep(executor_.Step(filters_->call_data_)); 1613 } 1614 } 1615 1616 private: 1617 Poll<ValueOrFailure<Output>> FinishStep( 1618 Poll<filters_detail::ResultOr<Input>> p) { 1619 auto* r = p.value_if_ready(); 1620 if (r == nullptr) return Pending{}; 1621 if (r->ok != nullptr) { 1622 ++stack_current_; 1623 if (stack_current_ == stack_end_) { 1624 (filters_->call_state_.*on_done)(); 1625 return ValueOrFailure<Output>{std::move(r->ok)}; 1626 } 1627 return FinishStep( 1628 executor_.Start(&(stack_current_->stack->data_.*layout), 1629 std::move(r->ok), filters_->call_data_)); 1630 } 1631 (filters_->call_state_.*on_done)(); 1632 filters_->PushServerTrailingMetadata(std::move(r->error)); 1633 return Failure{}; 1634 } 1635 1636 StackIterator stack_current_; 1637 StackIterator stack_end_; 1638 CallFilters* filters_; 1639 filters_detail::OperationExecutor<Input> executor_; 1640 }; 1641 1642 template <MessageHandle(CallFilters::* input_location), 1643 filters_detail::Layout<MessageHandle>( 1644 filters_detail::StackData::* layout), 1645 void (CallState::*on_done)(), typename StackIterator> 1646 class MessageExecutor { 1647 public: 1648 using NextMsg = filters_detail::NextMessage<on_done>; 1649 1650 MessageExecutor(CallFilters* filters, StackIterator stack_begin, 1651 StackIterator stack_end) 1652 : stack_current_(stack_begin), 1653 stack_end_(stack_end), 1654 filters_(filters) { 1655 DCHECK_NE((filters_->*input_location).get(), nullptr); 1656 } 1657 1658 Poll<NextMsg> operator()() { 1659 if ((filters_->*input_location) != nullptr) { 1660 if (stack_current_ == stack_end_) { 1661 DCHECK_NE((filters_->*input_location).get(), nullptr); 1662 return NextMsg(std::move(filters_->*input_location), 1663 &filters_->call_state_); 1664 } 1665 return FinishStep(executor_.Start( 1666 &(stack_current_->stack->data_.*layout), 1667 std::move(filters_->*input_location), filters_->call_data_)); 1668 } else { 1669 return FinishStep(executor_.Step(filters_->call_data_)); 1670 } 1671 } 1672 1673 private: 1674 Poll<NextMsg> FinishStep(Poll<filters_detail::ResultOr<MessageHandle>> p) { 1675 auto* r = p.value_if_ready(); 1676 if (r == nullptr) return Pending{}; 1677 if (r->ok != nullptr) { 1678 ++stack_current_; 1679 if (stack_current_ == stack_end_) { 1680 return NextMsg{std::move(r->ok), &filters_->call_state_}; 1681 } 1682 return FinishStep( 1683 executor_.Start(&(stack_current_->stack->data_.*layout), 1684 std::move(r->ok), filters_->call_data_)); 1685 } 1686 (filters_->call_state_.*on_done)(); 1687 filters_->PushServerTrailingMetadata(std::move(r->error)); 1688 return Failure{}; 1689 } 1690 1691 StackIterator stack_current_; 1692 StackIterator stack_end_; 1693 CallFilters* filters_; 1694 filters_detail::OperationExecutor<MessageHandle> executor_; 1695 }; 1696 1697 public: 1698 // Client: Fetch client initial metadata 1699 // Returns a promise that resolves to ValueOrFailure<ClientMetadataHandle> 1700 GRPC_MUST_USE_RESULT auto PullClientInitialMetadata() { 1701 call_state_.BeginPullClientInitialMetadata(); 1702 return MetadataExecutor<ClientMetadataHandle, ClientMetadataHandle, 1703 &CallFilters::push_client_initial_metadata_, 1704 &filters_detail::StackData::client_initial_metadata, 1705 &CallState::FinishPullClientInitialMetadata, 1706 StacksVector::const_iterator>( 1707 this, stacks_.cbegin(), stacks_.cend()); 1708 } 1709 // Server: Push server initial metadata 1710 // Returns a promise that resolves to a StatusFlag indicating success 1711 StatusFlag PushServerInitialMetadata(ServerMetadataHandle md) { 1712 push_server_initial_metadata_ = std::move(md); 1713 return call_state_.PushServerInitialMetadata(); 1714 } 1715 // Client: Fetch server initial metadata 1716 // Returns a promise that resolves to ValueOrFailure<ServerMetadataHandle> 1717 GRPC_MUST_USE_RESULT auto PullServerInitialMetadata() { 1718 return Seq( 1719 [this]() { 1720 return call_state_.PollPullServerInitialMetadataAvailable(); 1721 }, 1722 [this](bool has_server_initial_metadata) { 1723 return If( 1724 has_server_initial_metadata, 1725 [this]() { 1726 return Map( 1727 MetadataExecutor< 1728 absl::optional<ServerMetadataHandle>, 1729 ServerMetadataHandle, 1730 &CallFilters::push_server_initial_metadata_, 1731 &filters_detail::StackData::server_initial_metadata, 1732 &CallState::FinishPullServerInitialMetadata, 1733 StacksVector::const_reverse_iterator>( 1734 this, stacks_.crbegin(), stacks_.crend()), 1735 [](ValueOrFailure<absl::optional<ServerMetadataHandle>> r) { 1736 if (r.ok()) return std::move(*r); 1737 return absl::optional<ServerMetadataHandle>{}; 1738 }); 1739 }, 1740 []() { 1741 return Immediate(absl::optional<ServerMetadataHandle>{}); 1742 }); 1743 }); 1744 } 1745 // Client: Push client to server message 1746 // Returns a promise that resolves to a StatusFlag indicating success 1747 GRPC_MUST_USE_RESULT auto PushClientToServerMessage(MessageHandle message) { 1748 call_state_.BeginPushClientToServerMessage(); 1749 DCHECK_NE(message.get(), nullptr); 1750 DCHECK_EQ(push_client_to_server_message_.get(), nullptr); 1751 push_client_to_server_message_ = std::move(message); 1752 return [this]() { return call_state_.PollPushClientToServerMessage(); }; 1753 } 1754 // Client: Indicate that no more messages will be sent 1755 void FinishClientToServerSends() { call_state_.ClientToServerHalfClose(); } 1756 // Server: Fetch client to server message 1757 // Returns a promise that resolves to ClientToServerNextMessage 1758 GRPC_MUST_USE_RESULT auto PullClientToServerMessage() { 1759 return TrySeq( 1760 [this]() { 1761 return call_state_.PollPullClientToServerMessageAvailable(); 1762 }, 1763 [this](bool message_available) { 1764 return If( 1765 message_available, 1766 [this]() { 1767 return MessageExecutor< 1768 &CallFilters::push_client_to_server_message_, 1769 &filters_detail::StackData::client_to_server_messages, 1770 &CallState::FinishPullClientToServerMessage, 1771 StacksVector::const_iterator>(this, stacks_.cbegin(), 1772 stacks_.cend()); 1773 }, 1774 []() -> ClientToServerNextMessage { 1775 return ClientToServerNextMessage(); 1776 }); 1777 }); 1778 } 1779 // Server: Push server to client message 1780 // Returns a promise that resolves to a StatusFlag indicating success 1781 GRPC_MUST_USE_RESULT auto PushServerToClientMessage(MessageHandle message) { 1782 call_state_.BeginPushServerToClientMessage(); 1783 push_server_to_client_message_ = std::move(message); 1784 return [this]() { return call_state_.PollPushServerToClientMessage(); }; 1785 } 1786 // Server: Fetch server to client message 1787 // Returns a promise that resolves to ServerToClientNextMessage 1788 GRPC_MUST_USE_RESULT auto PullServerToClientMessage() { 1789 return TrySeq( 1790 [this]() { 1791 return call_state_.PollPullServerToClientMessageAvailable(); 1792 }, 1793 [this](bool message_available) { 1794 return If( 1795 message_available, 1796 [this]() { 1797 return MessageExecutor< 1798 &CallFilters::push_server_to_client_message_, 1799 &filters_detail::StackData::server_to_client_messages, 1800 &CallState::FinishPullServerToClientMessage, 1801 StacksVector::const_reverse_iterator>( 1802 this, stacks_.crbegin(), stacks_.crend()); 1803 }, 1804 []() -> ServerToClientNextMessage { 1805 return ServerToClientNextMessage(); 1806 }); 1807 }); 1808 } 1809 // Server: Indicate end of response 1810 // Closes the request entirely - no messages can be sent/received 1811 // If no server initial metadata has been sent, implies 1812 // NoServerInitialMetadata() called. 1813 void PushServerTrailingMetadata(ServerMetadataHandle md); 1814 // Optimized path to push cancellation onto the call 1815 void Cancel(); 1816 // Client: Fetch server trailing metadata 1817 // Returns a promise that resolves to ServerMetadataHandle 1818 GRPC_MUST_USE_RESULT auto PullServerTrailingMetadata() { 1819 return Map( 1820 [this]() { return call_state_.PollServerTrailingMetadataAvailable(); }, 1821 [this](Empty) { 1822 auto value = std::move(push_server_trailing_metadata_); 1823 if (call_data_ != nullptr) { 1824 for (auto it = stacks_.crbegin(); it != stacks_.crend(); ++it) { 1825 value = filters_detail::RunServerTrailingMetadata( 1826 it->stack->data_.server_trailing_metadata, 1827 filters_detail::Offset(call_data_, it->call_data_offset), 1828 std::move(value)); 1829 } 1830 } 1831 return value; 1832 }); 1833 } 1834 // Server: Wait for server trailing metadata to have been sent 1835 // Returns a promise that resolves to a StatusFlag indicating whether the 1836 // request was cancelled or not -- failure to send trailing metadata is 1837 // considered a cancellation, as is actual cancellation -- but not application 1838 // errors. 1839 GRPC_MUST_USE_RESULT auto WasCancelled() { 1840 return [this]() { return call_state_.PollWasCancelled(); }; 1841 } 1842 // Client & server: wait for server trailing metadata to be available, and 1843 // resolve to empty when it is. 1844 GRPC_MUST_USE_RESULT auto ServerTrailingMetadataWasPushed() { 1845 return 1846 [this]() { return call_state_.PollServerTrailingMetadataWasPushed(); }; 1847 } 1848 // Client & server: returns true if server trailing metadata has been pushed 1849 // *and* contained a cancellation, false otherwise. 1850 GRPC_MUST_USE_RESULT bool WasCancelledPushed() const { 1851 return call_state_.WasCancelledPushed(); 1852 } 1853 1854 // Returns true if server trailing metadata has been pulled 1855 bool WasServerTrailingMetadataPulled() const { 1856 return call_state_.WasServerTrailingMetadataPulled(); 1857 } 1858 1859 // Client & server: fill in final_info with the final status of the call. 1860 void Finalize(const grpc_call_final_info* final_info); 1861 1862 std::string DebugString() const; 1863 1864 private: 1865 void CancelDueToFailedPipeOperation(SourceLocation but_where = {}); 1866 1867 struct AddedStack { 1868 explicit AddedStack(RefCountedPtr<Stack> stack) 1869 : call_data_offset(std::numeric_limits<size_t>::max()), 1870 stack(std::move(stack)) {} 1871 size_t call_data_offset; 1872 RefCountedPtr<Stack> stack; 1873 }; 1874 1875 using StacksVector = absl::InlinedVector<AddedStack, 2>; 1876 1877 StacksVector stacks_; 1878 1879 CallState call_state_; 1880 1881 void* call_data_; 1882 ClientMetadataHandle push_client_initial_metadata_; 1883 ServerMetadataHandle push_server_initial_metadata_; 1884 MessageHandle push_client_to_server_message_; 1885 MessageHandle push_server_to_client_message_; 1886 ServerMetadataHandle push_server_trailing_metadata_; 1887 1888 static char g_empty_call_data_; 1889 }; 1890 1891 static_assert( 1892 filters_detail::ArgumentMustBeNextMessage< 1893 absl::remove_cvref_t<decltype(std::declval<CallFilters*>() 1894 ->PullServerToClientMessage()() 1895 .value())>>::value(), 1896 "PullServerToClientMessage must return a NextMessage"); 1897 1898 static_assert( 1899 filters_detail::ArgumentMustBeNextMessage< 1900 absl::remove_cvref_t<decltype(std::declval<CallFilters*>() 1901 ->PullClientToServerMessage()() 1902 .value())>>::value(), 1903 "PullServerToClientMessage must return a NextMessage"); 1904 1905 } // namespace grpc_core 1906 1907 #endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_FILTERS_H 1908