• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_TRANSPORT_CALL_FILTERS_H
16 #define GRPC_SRC_CORE_LIB_TRANSPORT_CALL_FILTERS_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <cstdint>
21 #include <limits>
22 #include <memory>
23 #include <ostream>
24 #include <type_traits>
25 
26 #include "absl/log/check.h"
27 #include "src/core/lib/promise/for_each.h"
28 #include "src/core/lib/promise/if.h"
29 #include "src/core/lib/promise/latch.h"
30 #include "src/core/lib/promise/map.h"
31 #include "src/core/lib/promise/promise.h"
32 #include "src/core/lib/promise/seq.h"
33 #include "src/core/lib/promise/status_flag.h"
34 #include "src/core/lib/promise/try_seq.h"
35 #include "src/core/lib/transport/call_final_info.h"
36 #include "src/core/lib/transport/call_state.h"
37 #include "src/core/lib/transport/message.h"
38 #include "src/core/lib/transport/metadata.h"
39 #include "src/core/util/dump_args.h"
40 #include "src/core/util/ref_counted.h"
41 #include "src/core/util/ref_counted_ptr.h"
42 
43 // CallFilters tracks a list of filters that are attached to a call.
44 // At a high level, a filter (for the purposes of this module) is a class
45 // that has a Call member class, and a set of methods that are called
46 // for each major event in the lifetime of a call.
47 //
48 // The Call member class must have the following members:
49 // - OnClientInitialMetadata  - $VALUE_TYPE = ClientMetadata
50 // - OnServerInitialMetadata  - $VALUE_TYPE = ServerMetadata
51 // - OnServerToClientMessage  - $VALUE_TYPE = Message
52 // - OnClientToServerMessage  - $VALUE_TYPE = Message
53 // - OnClientToServerHalfClose - no value
54 // - OnServerTrailingMetadata - $VALUE_TYPE = ServerMetadata
55 // - OnFinalize               - special, see below
56 // These members define an interception point for a particular event in
57 // the call lifecycle.
58 //
59 // The type of these members matters, and is selectable by the class
60 // author. For $INTERCEPTOR_NAME in the above list:
61 // - static const NoInterceptor $INTERCEPTOR_NAME:
62 //   defines that this filter does not intercept this event.
63 //   there is zero runtime cost added to handling that event by this filter.
64 // - void $INTERCEPTOR_NAME($VALUE_TYPE&):
65 //   the filter intercepts this event, and can modify the value.
66 //   it never fails.
67 // - absl::Status $INTERCEPTOR_NAME($VALUE_TYPE&):
68 //   the filter intercepts this event, and can modify the value.
69 //   it can fail, in which case the call will be aborted.
70 // - ServerMetadataHandle $INTERCEPTOR_NAME($VALUE_TYPE&)
71 //   the filter intercepts this event, and can modify the value.
72 //   the filter can return nullptr for success, or a metadata handle for
73 //   failure (in which case the call will be aborted).
74 //   useful for cases where the exact metadata returned needs to be customized.
75 // - void $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*):
76 //   the filter intercepts this event, and can modify the value.
77 //   it can access the channel via the second argument.
78 //   it never fails.
79 // - absl::Status $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*):
80 //   the filter intercepts this event, and can modify the value.
81 //   it can access the channel via the second argument.
82 //   it can fail, in which case the call will be aborted.
83 // - ServerMetadataHandle $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*)
84 //   the filter intercepts this event, and can modify the value.
85 //   it can access the channel via the second argument.
86 //   the filter can return nullptr for success, or a metadata handle for
87 //   failure (in which case the call will be aborted).
88 //   useful for cases where the exact metadata returned needs to be customized.
89 // It's also acceptable to return a promise that resolves to the
90 // relevant return type listed above.
91 //
92 // Finally, OnFinalize is added to intercept call finalization.
93 // It must have one of the signatures:
94 // - static const NoInterceptor OnFinalize:
95 //   the filter does not intercept call finalization.
96 // - void OnFinalize(const grpc_call_final_info*):
97 //   the filter intercepts call finalization.
98 // - void OnFinalize(const grpc_call_final_info*, FilterType*):
99 //   the filter intercepts call finalization.
100 //
101 // The constructor of the Call object can either take a pointer to the channel
102 // object, or not take any arguments.
103 //
104 // *THIS MODULE* holds no opinion on what members the channel part of the
105 // filter should or should not have, but does require that it have a stable
106 // pointer for the lifetime of a call (ownership is expected to happen
107 // elsewhere).
108 
109 namespace grpc_core {
110 
// Tag type to indicate no interception.
// A filter declares an interception point with this type to signal that it
// does not intercept that particular event.
// In C++14 we declare these as (for example):
//   static const NoInterceptor OnClientInitialMetadata;
// and out-of-line provide the definition:
//   const NoInterceptor MyFilter::Call::OnClientInitialMetadata;
// In C++17 and later we can use inline variables instead:
//   inline static const NoInterceptor OnClientInitialMetadata;
struct NoInterceptor {};
122 
123 namespace filters_detail {
124 
125 // Flow control across pipe stages.
126 // This ends up being exceedingly subtle - essentially we need to ensure that
127 // across a series of pipes we have no more than one outstanding message at a
128 // time - but those pipes are for the most part independent.
129 // How we achieve this is that this NextMessage object holds both the message
130 // and a completion token - the last owning NextMessage instance will call
131 // the on_progress method on the referenced CallState - and at that point that
132 // CallState will allow the next message to be sent through it.
133 // Next, the ForEach promise combiner explicitly holds onto the wrapper object
134 // owning the result (this object) and extracts the message from it, but doesn't
135 // dispose that instance until the action promise for the ForEach iteration
136 // completes, ensuring most callers need do nothing special to have the
137 // flow control work correctly.
138 template <void (CallState::*on_progress)()>
139 class NextMessage {
140  public:
~NextMessage()141   ~NextMessage() {
142     if (message_ != end_of_stream() && message_ != error() &&
143         message_ != taken()) {
144       delete message_;
145     }
146     if (call_state_ != nullptr) {
147       (call_state_->*on_progress)();
148     }
149   }
150 
151   NextMessage() = default;
NextMessage(Failure)152   explicit NextMessage(Failure) : message_(error()), call_state_(nullptr) {}
NextMessage(MessageHandle message,CallState * call_state)153   NextMessage(MessageHandle message, CallState* call_state) {
154     DCHECK_NE(call_state, nullptr);
155     DCHECK_NE(message.get(), nullptr);
156     DCHECK(message.get_deleter().has_freelist());
157     message_ = message.release();
158     call_state_ = call_state;
159   }
160   NextMessage(const NextMessage& other) = delete;
161   NextMessage& operator=(const NextMessage& other) = delete;
NextMessage(NextMessage && other)162   NextMessage(NextMessage&& other) noexcept
163       : message_(std::exchange(other.message_, taken())),
164         call_state_(std::exchange(other.call_state_, nullptr)) {}
165   NextMessage& operator=(NextMessage&& other) noexcept {
166     if (message_ != end_of_stream() && message_ != error() &&
167         message_ != taken()) {
168       delete message_;
169     }
170     if (call_state_ != nullptr) {
171       (call_state_->*on_progress)();
172     }
173     message_ = std::exchange(other.message_, taken());
174     call_state_ = std::exchange(other.call_state_, nullptr);
175     return *this;
176   }
177 
ok()178   bool ok() const {
179     DCHECK_NE(message_, taken());
180     return message_ != error();
181   }
has_value()182   bool has_value() const {
183     DCHECK_NE(message_, taken());
184     DCHECK(ok());
185     return message_ != end_of_stream();
186   }
status()187   StatusFlag status() const { return StatusFlag(ok()); }
value()188   Message& value() {
189     DCHECK_NE(message_, taken());
190     DCHECK(ok());
191     DCHECK(has_value());
192     return *message_;
193   }
TakeValue()194   MessageHandle TakeValue() {
195     DCHECK_NE(message_, taken());
196     DCHECK(ok());
197     DCHECK(has_value());
198     return MessageHandle(std::exchange(message_, taken()),
199                          Arena::PooledDeleter());
200   }
progressed()201   bool progressed() const { return call_state_ == nullptr; }
Progress()202   void Progress() {
203     DCHECK(!progressed());
204     (call_state_->*on_progress)();
205     call_state_ = nullptr;
206   }
207 
208  private:
end_of_stream()209   static Message* end_of_stream() { return nullptr; }
error()210   static Message* error() { return reinterpret_cast<Message*>(1); }
taken()211   static Message* taken() { return reinterpret_cast<Message*>(2); }
212   Message* message_ = end_of_stream();
213   CallState* call_state_ = nullptr;
214 };
215 
// Trait usable only with NextMessage instantiations.
// The primary template is declared here without a definition, so asking for
// ArgumentMustBeNextMessage<T>::value() with any other T does not compile
// unless a definition is provided elsewhere.
template <typename T>
struct ArgumentMustBeNextMessage;
// Specialization for NextMessage<on_progress>: value() is always true.
template <void (CallState::*on_progress)()>
struct ArgumentMustBeNextMessage<NextMessage<on_progress>> {
  static constexpr bool value() { return true; }
};
222 
// Returns the address located `amt` bytes after `base`.
inline void* Offset(void* base, size_t amt) {
  char* const bytes = static_cast<char*>(base);
  return bytes + amt;
}
226 
// Constructor record for one call filter.
// Carries what is needed to locate and initialize that filter's call data.
struct FilterConstructor {
  // Shape of the initializer: it receives the filter's call data slot and the
  // filter's channel data.
  using CallInit = void (*)(void* call_data, void* channel_data);

  // Channel data belonging to this filter
  void* channel_data;
  // Where this filter's call data starts inside the call data memory
  // allocation
  size_t call_offset;
  // Initializer for this filter's call data
  CallInit call_init;
};
239 
// Destructor record for one call filter.
struct FilterDestructor {
  // Shape of the destroy callback: it receives the filter's call data slot.
  using CallDestroy = void (*)(void* call_data);

  // Where this filter's call data starts inside the call data memory
  // allocation
  size_t call_offset;
  // Tears down this filter's call data
  CallDestroy call_destroy;
};
248 
// Builds a FilterType::Call inside pre-allocated storage.
// Primary template: the Call object is constructed without arguments and the
// channel pointer is ignored.
template <typename FilterType, typename = void>
struct CallConstructor {
  static void Construct(void* call_data, FilterType*) {
    using CallType = typename FilterType::Call;
    new (call_data) CallType();
  }
};
255 
256 template <typename FilterType>
257 struct CallConstructor<FilterType,
258                        absl::void_t<decltype(typename FilterType::Call(
259                            static_cast<FilterType*>(nullptr)))>> {
260   static void Construct(void* call_data, FilterType* channel) {
261     new (call_data) typename FilterType::Call(channel);
262   }
263 };
264 
265 // Result of a filter operation
266 // Can be either ok (if ok is non-null) or an error.
267 // Only one pointer can be set.
268 template <typename T>
269 struct ResultOr {
270   ResultOr(T ok, ServerMetadataHandle error)
271       : ok(std::move(ok)), error(std::move(error)) {
272     CHECK((this->ok == nullptr) ^ (this->error == nullptr));
273   }
274   T ok;
275   ServerMetadataHandle error;
276 };
277 
278 // One filter operation metadata
279 // Given a value of type T, produces a promise of type ResultOr<T>.
280 template <typename T>
281 struct Operator {
282   using Arg = T;
283   // Pointer to corresponding channel data for this filter
284   void* channel_data;
285   // Offset of the call data for this filter within the call data memory
286   size_t call_offset;
287   // Initialize the promise data for this filter, and poll once.
288   // Return the result of the poll.
289   // If the promise finishes, also destroy the promise data!
290   Poll<ResultOr<T>> (*promise_init)(void* promise_data, void* call_data,
291                                     void* channel_data, T value);
292   // Poll the promise data for this filter.
293   // If the promise finishes, also destroy the promise data!
294   // Note that if the promise always finishes on the first poll, then supplying
295   // this method is unnecessary (as it will never be called).
296   Poll<ResultOr<T>> (*poll)(void* promise_data);
297   // Destroy the promise data for this filter for an in-progress operation
298   // before the promise finishes.
299   // Note that if the promise always finishes on the first poll, then supplying
300   // this method is unnecessary (as it will never be called).
301   void (*early_destroy)(void* promise_data);
302 };
303 
// One half-close interception entry for a filter.
struct HalfCloseOperator {
  // Shape of the callback: it receives the filter's call data slot and the
  // filter's channel data.
  using HalfCloseFn = void (*)(void* call_data, void* channel_data);

  // Channel data belonging to this filter
  void* channel_data;
  // Where this filter's call data starts inside the call data memory
  size_t call_offset;
  // Callback run for this filter on half-close
  HalfCloseFn half_close;
};
311 
312 struct ServerTrailingMetadataOperator {
313   // Pointer to corresponding channel data for this filter
314   void* channel_data;
315   // Offset of the call data for this filter within the call data memory
316   size_t call_offset;
317   ServerMetadataHandle (*server_trailing_metadata)(
318       void* call_data, void* channel_data, ServerMetadataHandle metadata);
319 };
320 
321 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void RunHalfClose(
322     absl::Span<const HalfCloseOperator> ops, void* call_data) {
323   for (const auto& op : ops) {
324     op.half_close(Offset(call_data, op.call_offset), op.channel_data);
325   }
326 }
327 
328 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline ServerMetadataHandle
329 RunServerTrailingMetadata(absl::Span<const ServerTrailingMetadataOperator> ops,
330                           void* call_data, ServerMetadataHandle md) {
331   for (auto& op : ops) {
332     md = op.server_trailing_metadata(Offset(call_data, op.call_offset),
333                                      op.channel_data, std::move(md));
334   }
335   return md;
336 }
337 
338 // One call finalizer
339 struct Finalizer {
340   void* channel_data;
341   size_t call_offset;
342   void (*final)(void* call_data, void* channel_data,
343                 const grpc_call_final_info* final_info);
344 };
345 
346 // A layout of operations for a given filter stack
347 // This includes which operations, how much memory is required, what alignment.
348 template <typename T>
349 struct Layout {
350   size_t promise_size = 0;
351   size_t promise_alignment = 0;
352   std::vector<Operator<T>> ops;
353 
354   void Add(size_t filter_promise_size, size_t filter_promise_alignment,
355            Operator<T> op) {
356     promise_size = std::max(promise_size, filter_promise_size);
357     promise_alignment = std::max(promise_alignment, filter_promise_alignment);
358     ops.push_back(op);
359   }
360 
361   void Reverse() { absl::c_reverse(ops); }
362 };
363 
364 // AddOp and friends
365 // These are helpers to wrap a member function on a class into an operation
366 // and attach it to a layout.
367 // There's a generic wrapper function `AddOp` for each of fallible and
368 // infallible operations.
369 // There are then specializations of AddOpImpl for each kind of member function
370 // an operation could have.
371 // Each specialization has an `Add` member function for the kinds of operations
372 // it supports: some only support fallible, some only support infallible, some
373 // support both.
374 
375 template <typename FilterType, typename T, typename FunctionImpl,
376           FunctionImpl impl, typename SfinaeVoid = void>
377 struct AddOpImpl;
378 
379 template <typename FunctionImpl, FunctionImpl impl, typename FilterType,
380           typename T>
381 void AddOp(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
382   AddOpImpl<FilterType, T, FunctionImpl, impl>::Add(channel_data, call_offset,
383                                                     to);
384 }
385 
386 template <typename FilterType>
387 void AddHalfClose(FilterType* channel_data, size_t call_offset,
388                   void (FilterType::Call::*)(),
389                   std::vector<HalfCloseOperator>& to) {
390   to.push_back(
391       HalfCloseOperator{channel_data, call_offset, [](void* call_data, void*) {
392                           static_cast<typename FilterType::Call*>(call_data)
393                               ->OnClientToServerHalfClose();
394                         }});
395 }
396 
397 template <typename FilterType>
398 void AddHalfClose(FilterType* channel_data, size_t call_offset,
399                   void (FilterType::Call::*)(FilterType*),
400                   std::vector<HalfCloseOperator>& to) {
401   to.push_back(HalfCloseOperator{
402       channel_data, call_offset, [](void* call_data, void* channel_data) {
403         static_cast<typename FilterType::Call*>(call_data)
404             ->OnClientToServerHalfClose(static_cast<FilterType*>(channel_data));
405       }});
406 }
407 
// Overload taking a NoInterceptor pointer: the filter does not intercept
// half-close, so nothing is appended to the operator list.
template <typename FilterType>
void AddHalfClose(FilterType*, size_t, const NoInterceptor*,
                  std::vector<HalfCloseOperator>&) {}
411 
412 template <typename FilterType>
413 void AddServerTrailingMetadata(
414     FilterType* channel_data, size_t call_offset,
415     void (FilterType::Call::*)(ServerMetadata&),
416     std::vector<ServerTrailingMetadataOperator>& to) {
417   to.push_back(ServerTrailingMetadataOperator{
418       channel_data, call_offset,
419       [](void* call_data, void*, ServerMetadataHandle metadata) {
420         static_cast<typename FilterType::Call*>(call_data)
421             ->OnServerTrailingMetadata(*metadata);
422         return metadata;
423       }});
424 }
425 
426 template <typename FilterType>
427 void AddServerTrailingMetadata(
428     FilterType* channel_data, size_t call_offset,
429     void (FilterType::Call::*)(ServerMetadata&, FilterType*),
430     std::vector<ServerTrailingMetadataOperator>& to) {
431   to.push_back(ServerTrailingMetadataOperator{
432       channel_data, call_offset,
433       [](void* call_data, void* channel_data, ServerMetadataHandle metadata) {
434         static_cast<typename FilterType::Call*>(call_data)
435             ->OnServerTrailingMetadata(*metadata,
436                                        static_cast<FilterType*>(channel_data));
437         return metadata;
438       }});
439 }
440 
441 template <typename FilterType>
442 void AddServerTrailingMetadata(
443     FilterType* channel_data, size_t call_offset,
444     absl::Status (FilterType::Call::*)(ServerMetadata&),
445     std::vector<ServerTrailingMetadataOperator>& to) {
446   to.push_back(ServerTrailingMetadataOperator{
447       channel_data, call_offset,
448       [](void* call_data, void*, ServerMetadataHandle metadata) {
449         auto r = static_cast<typename FilterType::Call*>(call_data)
450                      ->OnServerTrailingMetadata(*metadata);
451         if (r.ok()) return metadata;
452         return CancelledServerMetadataFromStatus(r);
453       }});
454 }
455 
456 template <typename FilterType>
457 void AddServerTrailingMetadata(
458     FilterType* channel_data, size_t call_offset,
459     ServerMetadataHandle (FilterType::Call::*)(ServerMetadataHandle),
460     std::vector<ServerTrailingMetadataOperator>& to) {
461   to.push_back(ServerTrailingMetadataOperator{
462       channel_data, call_offset,
463       [](void* call_data, void*, ServerMetadataHandle metadata) {
464         return static_cast<typename FilterType::Call*>(call_data)
465             ->OnServerTrailingMetadata(std::move(metadata));
466       }});
467 }
468 
// Overload taking a NoInterceptor pointer: the filter does not intercept
// server trailing metadata, so nothing is appended to the operator list.
template <typename FilterType>
void AddServerTrailingMetadata(FilterType*, size_t, const NoInterceptor*,
                               std::vector<ServerTrailingMetadataOperator>&) {}
472 
// const NoInterceptor $EVENT
// These do nothing, and specifically DO NOT add an operation to the layout.
// Supported for fallible & infallible operations.
template <typename FilterType, typename T, const NoInterceptor* which>
struct AddOpImpl<FilterType, T, const NoInterceptor*, which> {
  // Intentionally empty: the layout is left untouched.
  static void Add(FilterType*, size_t, Layout<T>&) {}
};
480 
481 // void $INTERCEPTOR_NAME($VALUE_TYPE&)
482 template <typename FilterType, typename T,
483           void (FilterType::Call::*impl)(typename T::element_type&)>
484 struct AddOpImpl<FilterType, T,
485                  void (FilterType::Call::*)(typename T::element_type&), impl> {
486   static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
487     to.Add(0, 0,
488            Operator<T>{
489                channel_data,
490                call_offset,
491                [](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> {
492                  (static_cast<typename FilterType::Call*>(call_data)->*impl)(
493                      *value);
494                  return ResultOr<T>{std::move(value), nullptr};
495                },
496                nullptr,
497                nullptr,
498            });
499   }
500 };
501 
502 // void $INTERCEPTOR_NAME(const $VALUE_TYPE&)
503 template <typename FilterType, typename T,
504           void (FilterType::Call::*impl)(const typename T::element_type&)>
505 struct AddOpImpl<FilterType, T,
506                  void (FilterType::Call::*)(const typename T::element_type&),
507                  impl> {
508   static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
509     to.Add(0, 0,
510            Operator<T>{
511                channel_data,
512                call_offset,
513                [](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> {
514                  (static_cast<typename FilterType::Call*>(call_data)->*impl)(
515                      *value);
516                  return ResultOr<T>{std::move(value), nullptr};
517                },
518                nullptr,
519                nullptr,
520            });
521   }
522 };
523 
524 // void $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*)
525 template <typename FilterType, typename T,
526           void (FilterType::Call::*impl)(typename T::element_type&,
527                                          FilterType*)>
528 struct AddOpImpl<
529     FilterType, T,
530     void (FilterType::Call::*)(typename T::element_type&, FilterType*), impl> {
531   static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
532     to.Add(0, 0,
533            Operator<T>{
534                channel_data,
535                call_offset,
536                [](void*, void* call_data, void* channel_data,
537                   T value) -> Poll<ResultOr<T>> {
538                  (static_cast<typename FilterType::Call*>(call_data)->*impl)(
539                      *value, static_cast<FilterType*>(channel_data));
540                  return ResultOr<T>{std::move(value), nullptr};
541                },
542                nullptr,
543                nullptr,
544            });
545   }
546 };
547 
548 // $VALUE_HANDLE $INTERCEPTOR_NAME($VALUE_HANDLE, FilterType*)
549 template <typename FilterType, typename T,
550           T (FilterType::Call::*impl)(T, FilterType*)>
551 struct AddOpImpl<FilterType, T, T (FilterType::Call::*)(T, FilterType*), impl> {
552   static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
553     to.Add(
554         0, 0,
555         Operator<T>{
556             channel_data,
557             call_offset,
558             [](void*, void* call_data, void* channel_data,
559                T value) -> Poll<ResultOr<T>> {
560               return ResultOr<T>{
561                   (static_cast<typename FilterType::Call*>(call_data)->*impl)(
562                       std::move(value), static_cast<FilterType*>(channel_data)),
563                   nullptr};
564             },
565             nullptr,
566             nullptr,
567         });
568   }
569 };
570 
571 // absl::Status $INTERCEPTOR_NAME($VALUE_TYPE&)
572 template <typename FilterType, typename T,
573           absl::Status (FilterType::Call::*impl)(typename T::element_type&)>
574 struct AddOpImpl<FilterType, T,
575                  absl::Status (FilterType::Call::*)(typename T::element_type&),
576                  impl> {
577   static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
578     to.Add(
579         0, 0,
580         Operator<T>{
581             channel_data,
582             call_offset,
583             [](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> {
584               auto r =
585                   (static_cast<typename FilterType::Call*>(call_data)->*impl)(
586                       *value);
587               if (r.ok()) return ResultOr<T>{std::move(value), nullptr};
588               return ResultOr<T>{
589                   nullptr, StatusCast<ServerMetadataHandle>(std::move(r))};
590             },
591             nullptr,
592             nullptr,
593         });
594   }
595 };
596 
597 // absl::Status $INTERCEPTOR_NAME(const $VALUE_TYPE&)
598 template <typename FilterType, typename T,
599           absl::Status (FilterType::Call::*impl)(
600               const typename T::element_type&)>
601 struct AddOpImpl<
602     FilterType, T,
603     absl::Status (FilterType::Call::*)(const typename T::element_type&), impl> {
604   static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
605     to.Add(
606         0, 0,
607         Operator<T>{
608             channel_data,
609             call_offset,
610             [](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> {
611               auto r =
612                   (static_cast<typename FilterType::Call*>(call_data)->*impl)(
613                       *value);
614               if (r.ok()) return ResultOr<T>{std::move(value), nullptr};
615               return ResultOr<T>{
616                   nullptr, StatusCast<ServerMetadataHandle>(std::move(r))};
617             },
618             nullptr,
619             nullptr,
620         });
621   }
622 };
623 
624 // absl::Status $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*)
625 template <typename FilterType, typename T,
626           absl::Status (FilterType::Call::*impl)(typename T::element_type&,
627                                                  FilterType*)>
628 struct AddOpImpl<FilterType, T,
629                  absl::Status (FilterType::Call::*)(typename T::element_type&,
630                                                     FilterType*),
631                  impl> {
632   static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
633     to.Add(
634         0, 0,
635         Operator<T>{
636             channel_data,
637             call_offset,
638             [](void*, void* call_data, void* channel_data,
639                T value) -> Poll<ResultOr<T>> {
640               auto r =
641                   (static_cast<typename FilterType::Call*>(call_data)->*impl)(
642                       *value, static_cast<FilterType*>(channel_data));
643               if (IsStatusOk(r)) return ResultOr<T>{std::move(value), nullptr};
644               return ResultOr<T>{
645                   nullptr, StatusCast<ServerMetadataHandle>(std::move(r))};
646             },
647             nullptr,
648             nullptr,
649         });
650   }
651 };
652 
653 // absl::Status $INTERCEPTOR_NAME(const $VALUE_TYPE&, FilterType*)
654 template <typename FilterType, typename T,
655           absl::Status (FilterType::Call::*impl)(
656               const typename T::element_type&, FilterType*)>
657 struct AddOpImpl<FilterType, T,
658                  absl::Status (FilterType::Call::*)(
659                      const typename T::element_type&, FilterType*),
660                  impl> {
661   static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
662     to.Add(
663         0, 0,
664         Operator<T>{
665             channel_data,
666             call_offset,
667             [](void*, void* call_data, void* channel_data,
668                T value) -> Poll<ResultOr<T>> {
669               auto r =
670                   (static_cast<typename FilterType::Call*>(call_data)->*impl)(
671                       *value, static_cast<FilterType*>(channel_data));
672               if (IsStatusOk(r)) return ResultOr<T>{std::move(value), nullptr};
673               return ResultOr<T>{
674                   nullptr, StatusCast<ServerMetadataHandle>(std::move(r))};
675             },
676             nullptr,
677             nullptr,
678         });
679   }
680 };
681 
682 // absl::StatusOr<$VALUE_HANDLE> $INTERCEPTOR_NAME($VALUE_HANDLE, FilterType*)
683 template <typename FilterType, typename T,
684           absl::StatusOr<T> (FilterType::Call::*impl)(T, FilterType*)>
685 struct AddOpImpl<FilterType, T,
686                  absl::StatusOr<T> (FilterType::Call::*)(T, FilterType*),
687                  impl> {
688   static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
689     to.Add(
690         0, 0,
691         Operator<T>{
692             channel_data,
693             call_offset,
694             [](void*, void* call_data, void* channel_data,
695                T value) -> Poll<ResultOr<T>> {
696               auto r =
697                   (static_cast<typename FilterType::Call*>(call_data)->*impl)(
698                       std::move(value), static_cast<FilterType*>(channel_data));
699               if (IsStatusOk(r)) return ResultOr<T>{std::move(*r), nullptr};
700               return ResultOr<T>{
701                   nullptr, StatusCast<ServerMetadataHandle>(std::move(r))};
702             },
703             nullptr,
704             nullptr,
705         });
706   }
707 };
708 
709 // ServerMetadataHandle $INTERCEPTOR_NAME($VALUE_TYPE&)
710 template <typename FilterType, typename T,
711           ServerMetadataHandle (FilterType::Call::*impl)(
712               typename T::element_type&)>
713 struct AddOpImpl<FilterType, T,
714                  ServerMetadataHandle (FilterType::Call::*)(
715                      typename T::element_type&),
716                  impl> {
717   static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
718     to.Add(
719         0, 0,
720         Operator<T>{
721             channel_data,
722             call_offset,
723             [](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> {
724               auto r =
725                   (static_cast<typename FilterType::Call*>(call_data)->*impl)(
726                       *value);
727               if (r == nullptr) return ResultOr<T>{std::move(value), nullptr};
728               return ResultOr<T>{
729                   nullptr, StatusCast<ServerMetadataHandle>(std::move(r))};
730             },
731             nullptr,
732             nullptr,
733         });
734   }
735 };
736 
737 // ServerMetadataHandle $INTERCEPTOR_NAME(const $VALUE_TYPE&)
738 template <typename FilterType, typename T,
739           ServerMetadataHandle (FilterType::Call::*impl)(
740               const typename T::element_type&)>
741 struct AddOpImpl<FilterType, T,
742                  ServerMetadataHandle (FilterType::Call::*)(
743                      const typename T::element_type&),
744                  impl> {
745   static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
746     to.Add(
747         0, 0,
748         Operator<T>{
749             channel_data,
750             call_offset,
751             [](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> {
752               auto r =
753                   (static_cast<typename FilterType::Call*>(call_data)->*impl)(
754                       *value);
755               if (r == nullptr) return ResultOr<T>{std::move(value), nullptr};
756               return ResultOr<T>{
757                   nullptr, StatusCast<ServerMetadataHandle>(std::move(r))};
758             },
759             nullptr,
760             nullptr,
761         });
762   }
763 };
764 
765 // ServerMetadataHandle $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*)
766 template <typename FilterType, typename T,
767           ServerMetadataHandle (FilterType::Call::*impl)(
768               typename T::element_type&, FilterType*)>
769 struct AddOpImpl<FilterType, T,
770                  ServerMetadataHandle (FilterType::Call::*)(
771                      typename T::element_type&, FilterType*),
772                  impl> {
773   static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
774     to.Add(
775         0, 0,
776         Operator<T>{
777             channel_data,
778             call_offset,
779             [](void*, void* call_data, void* channel_data,
780                T value) -> Poll<ResultOr<T>> {
781               auto r =
782                   (static_cast<typename FilterType::Call*>(call_data)->*impl)(
783                       *value, static_cast<FilterType*>(channel_data));
784               if (r == nullptr) return ResultOr<T>{std::move(value), nullptr};
785               return ResultOr<T>{
786                   nullptr, StatusCast<ServerMetadataHandle>(std::move(r))};
787             },
788             nullptr,
789             nullptr,
790         });
791   }
792 };
793 
794 // ServerMetadataHandle $INTERCEPTOR_NAME(const $VALUE_TYPE&, FilterType*)
795 template <typename FilterType, typename T,
796           ServerMetadataHandle (FilterType::Call::*impl)(
797               const typename T::element_type&, FilterType*)>
798 struct AddOpImpl<FilterType, T,
799                  ServerMetadataHandle (FilterType::Call::*)(
800                      const typename T::element_type&, FilterType*),
801                  impl> {
802   static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
803     to.Add(
804         0, 0,
805         Operator<T>{
806             channel_data,
807             call_offset,
808             [](void*, void* call_data, void* channel_data,
809                T value) -> Poll<ResultOr<T>> {
810               auto r =
811                   (static_cast<typename FilterType::Call*>(call_data)->*impl)(
812                       *value, static_cast<FilterType*>(channel_data));
813               if (r == nullptr) return ResultOr<T>{std::move(value), nullptr};
814               return ResultOr<T>{
815                   nullptr, StatusCast<ServerMetadataHandle>(std::move(r))};
816             },
817             nullptr,
818             nullptr,
819         });
820   }
821 };
822 
823 // PROMISE_RETURNING(absl::Status) $INTERCEPTOR_NAME($VALUE_TYPE&)
824 template <typename FilterType, typename T, typename R,
825           R (FilterType::Call::*impl)(typename T::element_type&)>
826 struct AddOpImpl<
827     FilterType, T, R (FilterType::Call::*)(typename T::element_type&), impl,
828     absl::enable_if_t<std::is_same<absl::Status, PromiseResult<R>>::value>> {
829   static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
830     class Promise {
831      public:
832       Promise(T value, typename FilterType::Call* call_data, FilterType*)
833           : value_(std::move(value)), impl_((call_data->*impl)(*value_)) {}
834 
835       Poll<ResultOr<T>> PollOnce() {
836         auto p = impl_();
837         auto* r = p.value_if_ready();
838         if (r == nullptr) return Pending{};
839         T value = std::move(value_);
840         this->~Promise();
841         if (r->ok()) {
842           return ResultOr<T>{std::move(value), nullptr};
843         }
844         return ResultOr<T>{nullptr, CancelledServerMetadataFromStatus(*r)};
845       }
846 
847      private:
848       GPR_NO_UNIQUE_ADDRESS T value_;
849       GPR_NO_UNIQUE_ADDRESS R impl_;
850     };
851     to.Add(sizeof(Promise), alignof(Promise),
852            Operator<T>{
853                channel_data,
854                call_offset,
855                [](void* promise_data, void* call_data, void* channel_data,
856                   T value) -> Poll<ResultOr<T>> {
857                  auto* promise = new (promise_data)
858                      Promise(std::move(value),
859                              static_cast<typename FilterType::Call*>(call_data),
860                              static_cast<FilterType*>(channel_data));
861                  return promise->PollOnce();
862                },
863                [](void* promise_data) {
864                  return static_cast<Promise*>(promise_data)->PollOnce();
865                },
866                [](void* promise_data) {
867                  static_cast<Promise*>(promise_data)->~Promise();
868                },
869            });
870   }
871 };
872 
873 // PROMISE_RETURNING(absl::Status) $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*)
874 template <typename FilterType, typename T, typename R,
875           R (FilterType::Call::*impl)(typename T::element_type&, FilterType*)>
876 struct AddOpImpl<
877     FilterType, T,
878     R (FilterType::Call::*)(typename T::element_type&, FilterType*), impl,
879     absl::enable_if_t<!std::is_same<R, absl::Status>::value &&
880                       std::is_same<absl::Status, PromiseResult<R>>::value>> {
881   static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
882     class Promise {
883      public:
884       Promise(T value, typename FilterType::Call* call_data,
885               FilterType* channel_data)
886           : value_(std::move(value)),
887             impl_((call_data->*impl)(*value_, channel_data)) {}
888 
889       Poll<ResultOr<T>> PollOnce() {
890         auto p = impl_();
891         auto* r = p.value_if_ready();
892         if (r == nullptr) return Pending{};
893         T value = std::move(value_);
894         this->~Promise();
895         if (r->ok()) {
896           return ResultOr<T>{std::move(value), nullptr};
897         }
898         return ResultOr<T>{nullptr, CancelledServerMetadataFromStatus(*r)};
899       }
900 
901      private:
902       GPR_NO_UNIQUE_ADDRESS T value_;
903       GPR_NO_UNIQUE_ADDRESS R impl_;
904     };
905     to.Add(sizeof(Promise), alignof(Promise),
906            Operator<T>{
907                channel_data,
908                call_offset,
909                [](void* promise_data, void* call_data, void* channel_data,
910                   T value) -> Poll<ResultOr<T>> {
911                  auto* promise = new (promise_data)
912                      Promise(std::move(value),
913                              static_cast<typename FilterType::Call*>(call_data),
914                              static_cast<FilterType*>(channel_data));
915                  return promise->PollOnce();
916                },
917                [](void* promise_data) {
918                  return static_cast<Promise*>(promise_data)->PollOnce();
919                },
920                [](void* promise_data) {
921                  static_cast<Promise*>(promise_data)->~Promise();
922                },
923            });
924   }
925 };
926 
927 // PROMISE_RETURNING(absl::StatusOr<$VALUE_HANDLE>)
928 // $INTERCEPTOR_NAME($VALUE_HANDLE, FilterType*)
929 template <typename FilterType, typename T, typename R,
930           R (FilterType::Call::*impl)(T, FilterType*)>
931 struct AddOpImpl<FilterType, T, R (FilterType::Call::*)(T, FilterType*), impl,
932                  absl::enable_if_t<std::is_same<absl::StatusOr<T>,
933                                                 PromiseResult<R>>::value>> {
934   static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
935     class Promise {
936      public:
937       Promise(T value, typename FilterType::Call* call_data,
938               FilterType* channel_data)
939           : impl_((call_data->*impl)(std::move(value), channel_data)) {}
940 
941       Poll<ResultOr<T>> PollOnce() {
942         auto p = impl_();
943         auto* r = p.value_if_ready();
944         if (r == nullptr) return Pending{};
945         this->~Promise();
946         if (r->ok()) return ResultOr<T>{std::move(**r), nullptr};
947         return ResultOr<T>{nullptr,
948                            CancelledServerMetadataFromStatus(r->status())};
949       }
950 
951      private:
952       GPR_NO_UNIQUE_ADDRESS R impl_;
953     };
954     to.Add(sizeof(Promise), alignof(Promise),
955            Operator<T>{
956                channel_data,
957                call_offset,
958                [](void* promise_data, void* call_data, void* channel_data,
959                   T value) -> Poll<ResultOr<T>> {
960                  auto* promise = new (promise_data)
961                      Promise(std::move(value),
962                              static_cast<typename FilterType::Call*>(call_data),
963                              static_cast<FilterType*>(channel_data));
964                  return promise->PollOnce();
965                },
966                [](void* promise_data) {
967                  return static_cast<Promise*>(promise_data)->PollOnce();
968                },
969                [](void* promise_data) {
970                  static_cast<Promise*>(promise_data)->~Promise();
971                },
972            });
973   }
974 };
975 
// Pairs an opaque pointer with the function that disposes of it.
// Field order matters: instances are brace-initialized as {destroy, pointer}.
struct ChannelDataDestructor {
  // Called with `channel_data` to release it.
  void (*destroy)(void*);
  // The object handed to `destroy`.
  void* channel_data;
};
980 
981 // StackData contains the main datastructures built up by this module.
982 // It's a complete representation of all the code that needs to be invoked
983 // to execute a call for a given set of filters.
984 // This structure is held at the channel layer and is shared between many
985 // in-flight calls.
986 struct StackData {
987   // Overall size and alignment of the call data for this stack.
988   size_t call_data_alignment = 1;
989   size_t call_data_size = 0;
990   // A complete list of filters for this call, so that we can construct the
991   // call data for each filter.
992   std::vector<FilterConstructor> filter_constructor;
993   std::vector<FilterDestructor> filter_destructor;
994   // For each kind of operation, a layout of the operations for this call.
995   // (there's some duplicate data here, and that's ok: we want to avoid
996   // pointer chasing as much as possible when executing a call)
997   Layout<ClientMetadataHandle> client_initial_metadata;
998   Layout<ServerMetadataHandle> server_initial_metadata;
999   Layout<MessageHandle> client_to_server_messages;
1000   std::vector<HalfCloseOperator> client_to_server_half_close;
1001   Layout<MessageHandle> server_to_client_messages;
1002   std::vector<ServerTrailingMetadataOperator> server_trailing_metadata;
1003   // A list of finalizers for this call.
1004   // We use a bespoke data structure here because finalizers can never be
1005   // asynchronous.
1006   std::vector<Finalizer> finalizers;
1007   // A list of functions to call when this stack data is destroyed
1008   // (to capture ownership of channel data)
1009   std::vector<ChannelDataDestructor> channel_data_destructors;
1010 
1011   bool empty() const {
1012     return filter_constructor.empty() && filter_destructor.empty() &&
1013            client_initial_metadata.ops.empty() &&
1014            server_initial_metadata.ops.empty() &&
1015            client_to_server_messages.ops.empty() &&
1016            client_to_server_half_close.empty() &&
1017            server_to_client_messages.ops.empty() &&
1018            server_trailing_metadata.empty() && finalizers.empty() &&
1019            channel_data_destructors.empty();
1020   }
1021 
1022   // Add one filter to the list of filters, and update alignment.
1023   // Returns the offset of the call data for this filter.
1024   // Specifically does not update any of the layouts or finalizers.
1025   // Callers are expected to do that themselves.
1026   // This separation enables separation of *testing* of filters, and since
1027   // this is a detail type it's felt that a slightly harder to hold API that
1028   // we have exactly one caller for is warranted for a more thorough testing
1029   // story.
1030   template <typename FilterType>
1031   absl::enable_if_t<!std::is_empty<typename FilterType::Call>::value, size_t>
1032   AddFilterConstructor(FilterType* channel_data) {
1033     const size_t alignment = alignof(typename FilterType::Call);
1034     call_data_alignment = std::max(call_data_alignment, alignment);
1035     if (call_data_size % alignment != 0) {
1036       call_data_size += alignment - call_data_size % alignment;
1037     }
1038     const size_t call_offset = call_data_size;
1039     call_data_size += sizeof(typename FilterType::Call);
1040     filter_constructor.push_back(FilterConstructor{
1041         channel_data,
1042         call_offset,
1043         [](void* call_data, void* channel_data) {
1044           CallConstructor<FilterType>::Construct(
1045               call_data, static_cast<FilterType*>(channel_data));
1046         },
1047     });
1048     return call_offset;
1049   }
1050 
1051   template <typename FilterType>
1052   absl::enable_if_t<
1053       std::is_empty<typename FilterType::Call>::value &&
1054           !std::is_trivially_constructible<typename FilterType::Call>::value,
1055       size_t>
1056   AddFilterConstructor(FilterType* channel_data) {
1057     const size_t alignment = alignof(typename FilterType::Call);
1058     call_data_alignment = std::max(call_data_alignment, alignment);
1059     filter_constructor.push_back(FilterConstructor{
1060         channel_data,
1061         0,
1062         [](void* call_data, void* channel_data) {
1063           CallConstructor<FilterType>::Construct(
1064               call_data, static_cast<FilterType*>(channel_data));
1065         },
1066     });
1067     return 0;
1068   }
1069 
1070   template <typename FilterType>
1071   absl::enable_if_t<
1072       std::is_empty<typename FilterType::Call>::value &&
1073           std::is_trivially_constructible<typename FilterType::Call>::value,
1074       size_t>
1075   AddFilterConstructor(FilterType*) {
1076     const size_t alignment = alignof(typename FilterType::Call);
1077     call_data_alignment = std::max(call_data_alignment, alignment);
1078     return 0;
1079   }
1080 
1081   template <typename FilterType>
1082   absl::enable_if_t<
1083       !std::is_trivially_destructible<typename FilterType::Call>::value>
1084   AddFilterDestructor(size_t call_offset) {
1085     filter_destructor.push_back(FilterDestructor{
1086         call_offset,
1087         [](void* call_data) {
1088           Destruct(static_cast<typename FilterType::Call*>(call_data));
1089         },
1090     });
1091   }
1092 
1093   template <typename FilterType>
1094   absl::enable_if_t<
1095       std::is_trivially_destructible<typename FilterType::Call>::value>
1096   AddFilterDestructor(size_t) {}
1097 
1098   template <typename FilterType>
1099   size_t AddFilter(FilterType* filter) {
1100     const size_t call_offset = AddFilterConstructor(filter);
1101     AddFilterDestructor<FilterType>(call_offset);
1102     return call_offset;
1103   }
1104 
1105   // Per operation adders - one for each interception point.
1106   // Delegate to AddOp() above.
1107 
1108   template <typename FilterType>
1109   void AddClientInitialMetadataOp(FilterType* channel_data,
1110                                   size_t call_offset) {
1111     AddOp<decltype(&FilterType::Call::OnClientInitialMetadata),
1112           &FilterType::Call::OnClientInitialMetadata>(channel_data, call_offset,
1113                                                       client_initial_metadata);
1114   }
1115 
1116   template <typename FilterType>
1117   void AddServerInitialMetadataOp(FilterType* channel_data,
1118                                   size_t call_offset) {
1119     AddOp<decltype(&FilterType::Call::OnServerInitialMetadata),
1120           &FilterType::Call::OnServerInitialMetadata>(channel_data, call_offset,
1121                                                       server_initial_metadata);
1122   }
1123 
1124   template <typename FilterType>
1125   void AddClientToServerMessageOp(FilterType* channel_data,
1126                                   size_t call_offset) {
1127     AddOp<decltype(&FilterType::Call::OnClientToServerMessage),
1128           &FilterType::Call::OnClientToServerMessage>(
1129         channel_data, call_offset, client_to_server_messages);
1130   }
1131 
1132   template <typename FilterType>
1133   void AddClientToServerHalfClose(FilterType* channel_data,
1134                                   size_t call_offset) {
1135     AddHalfClose(channel_data, call_offset,
1136                  &FilterType::Call::OnClientToServerHalfClose,
1137                  client_to_server_half_close);
1138   }
1139 
1140   template <typename FilterType>
1141   void AddServerToClientMessageOp(FilterType* channel_data,
1142                                   size_t call_offset) {
1143     AddOp<decltype(&FilterType::Call::OnServerToClientMessage),
1144           &FilterType::Call::OnServerToClientMessage>(
1145         channel_data, call_offset, server_to_client_messages);
1146   }
1147 
1148   template <typename FilterType>
1149   void AddServerTrailingMetadataOp(FilterType* channel_data,
1150                                    size_t call_offset) {
1151     AddServerTrailingMetadata(channel_data, call_offset,
1152                               &FilterType::Call::OnServerTrailingMetadata,
1153                               server_trailing_metadata);
1154   }
1155 
1156   // Finalizer interception adders
1157 
1158   template <typename FilterType>
1159   void AddFinalizer(FilterType*, size_t, const NoInterceptor* p) {
1160     DCHECK(p == &FilterType::Call::OnFinalize);
1161   }
1162 
1163   template <typename FilterType>
1164   void AddFinalizer(FilterType* channel_data, size_t call_offset,
1165                     void (FilterType::Call::*p)(const grpc_call_final_info*)) {
1166     DCHECK(p == &FilterType::Call::OnFinalize);
1167     finalizers.push_back(Finalizer{
1168         channel_data,
1169         call_offset,
1170         [](void* call_data, void*, const grpc_call_final_info* final_info) {
1171           static_cast<typename FilterType::Call*>(call_data)->OnFinalize(
1172               final_info);
1173         },
1174     });
1175   }
1176 
1177   template <typename FilterType>
1178   void AddFinalizer(FilterType* channel_data, size_t call_offset,
1179                     void (FilterType::Call::*p)(const grpc_call_final_info*,
1180                                                 FilterType*)) {
1181     DCHECK(p == &FilterType::Call::OnFinalize);
1182     finalizers.push_back(Finalizer{
1183         channel_data,
1184         call_offset,
1185         [](void* call_data, void* channel_data,
1186            const grpc_call_final_info* final_info) {
1187           static_cast<typename FilterType::Call*>(call_data)->OnFinalize(
1188               final_info, static_cast<FilterType*>(channel_data));
1189         },
1190     });
1191   }
1192 };
1193 
1194 // OperationExecutor is a helper class to execute a sequence of operations
1195 // from a layout on one value.
1196 // We instantiate one of these during the *Pull* promise for each operation
1197 // and wait for it to resolve.
1198 // At this layer the filters look like a list of transformations on the
1199 // value pushed.
1200 // An early-failing filter will cause subsequent filters to not execute.
1201 template <typename T>
1202 class OperationExecutor {
1203  public:
1204   OperationExecutor() = default;
1205   ~OperationExecutor();
1206   OperationExecutor(const OperationExecutor&) = delete;
1207   OperationExecutor& operator=(const OperationExecutor&) = delete;
1208   OperationExecutor(OperationExecutor&& other) noexcept
1209       : ops_(other.ops_), end_ops_(other.end_ops_) {
1210     // Movable iff we're not running.
1211     DCHECK_EQ(other.promise_data_, nullptr);
1212   }
1213   OperationExecutor& operator=(OperationExecutor&& other) noexcept {
1214     DCHECK_EQ(other.promise_data_, nullptr);
1215     DCHECK_EQ(promise_data_, nullptr);
1216     ops_ = other.ops_;
1217     end_ops_ = other.end_ops_;
1218     return *this;
1219   }
1220   // IsRunning() is true if we're currently executing a sequence of operations.
1221   bool IsRunning() const { return promise_data_ != nullptr; }
1222   // Start executing a layout. May allocate space to store the relevant promise.
1223   // Returns the result of the first poll.
1224   // If the promise finishes, also destroy the promise data.
1225   Poll<ResultOr<T>> Start(const Layout<T>* layout, T input, void* call_data);
1226   // Continue executing a layout. Returns the result of the next poll.
1227   // If the promise finishes, also destroy the promise data.
1228   Poll<ResultOr<T>> Step(void* call_data);
1229 
1230  private:
1231   // Start polling on the current step of the layout.
1232   // `input` is the current value (either the input to the first step, or the
1233   // so far transformed value)
1234   // `call_data` is the call data for the filter stack.
1235   // If this op finishes immediately then we iterative move to the next step.
1236   // If we reach the end up the ops, we return the overall poll result,
1237   // otherwise we return Pending.
1238   Poll<ResultOr<T>> InitStep(T input, void* call_data);
1239   // Continue polling on the current step of the layout.
1240   // Called on the next poll after InitStep returns pending.
1241   // If the promise is still pending, returns this.
1242   // If the promise completes we call into InitStep to continue execution
1243   // through the filters.
1244   Poll<ResultOr<T>> ContinueStep(void* call_data);
1245 
1246   void* promise_data_ = nullptr;
1247   const Operator<T>* ops_;
1248   const Operator<T>* end_ops_;
1249 };
1250 
1251 template <typename T>
1252 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline OperationExecutor<
1253     T>::~OperationExecutor() {
1254   if (promise_data_ != nullptr) {
1255     ops_->early_destroy(promise_data_);
1256     gpr_free_aligned(promise_data_);
1257   }
1258 }
1259 
1260 template <typename T>
1261 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ResultOr<T>>
1262 OperationExecutor<T>::Start(const Layout<T>* layout, T input, void* call_data) {
1263   ops_ = layout->ops.data();
1264   end_ops_ = ops_ + layout->ops.size();
1265   if (layout->promise_size == 0) {
1266     // No call state ==> instantaneously ready
1267     auto r = InitStep(std::move(input), call_data);
1268     CHECK(r.ready());
1269     return r;
1270   }
1271   promise_data_ =
1272       gpr_malloc_aligned(layout->promise_size, layout->promise_alignment);
1273   return InitStep(std::move(input), call_data);
1274 }
1275 
1276 template <typename T>
1277 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ResultOr<T>>
1278 OperationExecutor<T>::InitStep(T input, void* call_data) {
1279   CHECK(input != nullptr);
1280   while (true) {
1281     if (ops_ == end_ops_) {
1282       return ResultOr<T>{std::move(input), nullptr};
1283     }
1284     auto p =
1285         ops_->promise_init(promise_data_, Offset(call_data, ops_->call_offset),
1286                            ops_->channel_data, std::move(input));
1287     if (auto* r = p.value_if_ready()) {
1288       if (r->ok == nullptr) return std::move(*r);
1289       input = std::move(r->ok);
1290       ++ops_;
1291       continue;
1292     }
1293     return Pending{};
1294   }
1295 }
1296 
1297 template <typename T>
1298 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ResultOr<T>>
1299 OperationExecutor<T>::Step(void* call_data) {
1300   DCHECK_NE(promise_data_, nullptr);
1301   auto p = ContinueStep(call_data);
1302   if (p.ready()) {
1303     gpr_free_aligned(promise_data_);
1304     promise_data_ = nullptr;
1305   }
1306   return p;
1307 }
1308 
1309 template <typename T>
1310 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ResultOr<T>>
1311 OperationExecutor<T>::ContinueStep(void* call_data) {
1312   auto p = ops_->poll(promise_data_);
1313   if (auto* r = p.value_if_ready()) {
1314     if (r->ok == nullptr) return std::move(*r);
1315     ++ops_;
1316     return InitStep(std::move(r->ok), call_data);
1317   }
1318   return Pending{};
1319 }
1320 
1321 template <typename Fn>
1322 class ServerTrailingMetadataInterceptor {
1323  public:
1324   class Call {
1325    public:
1326     static const NoInterceptor OnClientInitialMetadata;
1327     static const NoInterceptor OnServerInitialMetadata;
1328     static const NoInterceptor OnClientToServerMessage;
1329     static const NoInterceptor OnClientToServerHalfClose;
1330     static const NoInterceptor OnServerToClientMessage;
1331     static const NoInterceptor OnFinalize;
1332     void OnServerTrailingMetadata(ServerMetadata& md,
1333                                   ServerTrailingMetadataInterceptor* filter) {
1334       filter->fn_(md);
1335     }
1336   };
1337 
1338   explicit ServerTrailingMetadataInterceptor(Fn fn) : fn_(std::move(fn)) {}
1339 
1340  private:
1341   GPR_NO_UNIQUE_ADDRESS Fn fn_;
1342 };
1343 template <typename Fn>
1344 const NoInterceptor
1345     ServerTrailingMetadataInterceptor<Fn>::Call::OnClientInitialMetadata;
1346 template <typename Fn>
1347 const NoInterceptor
1348     ServerTrailingMetadataInterceptor<Fn>::Call::OnServerInitialMetadata;
1349 template <typename Fn>
1350 const NoInterceptor
1351     ServerTrailingMetadataInterceptor<Fn>::Call::OnClientToServerMessage;
1352 template <typename Fn>
1353 const NoInterceptor
1354     ServerTrailingMetadataInterceptor<Fn>::Call::OnClientToServerHalfClose;
1355 template <typename Fn>
1356 const NoInterceptor
1357     ServerTrailingMetadataInterceptor<Fn>::Call::OnServerToClientMessage;
1358 template <typename Fn>
1359 const NoInterceptor ServerTrailingMetadataInterceptor<Fn>::Call::OnFinalize;
1360 
1361 template <typename Fn>
1362 class ClientInitialMetadataInterceptor {
1363  public:
1364   class Call {
1365    public:
1366     auto OnClientInitialMetadata(ClientMetadata& md,
1367                                  ClientInitialMetadataInterceptor* filter) {
1368       return filter->fn_(md);
1369     }
1370     static const NoInterceptor OnServerInitialMetadata;
1371     static const NoInterceptor OnClientToServerMessage;
1372     static const NoInterceptor OnClientToServerHalfClose;
1373     static const NoInterceptor OnServerToClientMessage;
1374     static const NoInterceptor OnServerTrailingMetadata;
1375     static const NoInterceptor OnFinalize;
1376   };
1377 
1378   explicit ClientInitialMetadataInterceptor(Fn fn) : fn_(std::move(fn)) {}
1379 
1380  private:
1381   GPR_NO_UNIQUE_ADDRESS Fn fn_;
1382 };
1383 template <typename Fn>
1384 const NoInterceptor
1385     ClientInitialMetadataInterceptor<Fn>::Call::OnServerInitialMetadata;
1386 template <typename Fn>
1387 const NoInterceptor
1388     ClientInitialMetadataInterceptor<Fn>::Call::OnClientToServerMessage;
1389 template <typename Fn>
1390 const NoInterceptor
1391     ClientInitialMetadataInterceptor<Fn>::Call::OnClientToServerHalfClose;
1392 template <typename Fn>
1393 const NoInterceptor
1394     ClientInitialMetadataInterceptor<Fn>::Call::OnServerToClientMessage;
1395 template <typename Fn>
1396 const NoInterceptor
1397     ClientInitialMetadataInterceptor<Fn>::Call::OnServerTrailingMetadata;
1398 template <typename Fn>
1399 const NoInterceptor ClientInitialMetadataInterceptor<Fn>::Call::OnFinalize;
1400 
1401 }  // namespace filters_detail
1402 
1403 namespace for_each_detail {
1404 template <void (CallState::*on_progress)()>
1405 struct NextValueTraits<filters_detail::NextMessage<on_progress>> {
1406   using NextMsg = filters_detail::NextMessage<on_progress>;
1407   using Value = MessageHandle;
1408 
1409   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static NextValueType Type(
1410       const NextMsg& t) {
1411     if (!t.ok()) return NextValueType::kError;
1412     if (t.has_value()) return NextValueType::kValue;
1413     return NextValueType::kEndOfStream;
1414   }
1415 
1416   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static MessageHandle TakeValue(
1417       NextMsg& t) {
1418     return t.TakeValue();
1419   }
1420 };
1421 }  // namespace for_each_detail
1422 
1423 template <void (CallState::*on_progress)()>
1424 struct FailureStatusCastImpl<filters_detail::NextMessage<on_progress>,
1425                              StatusFlag> {
1426   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static filters_detail::NextMessage<
1427       on_progress>
1428   Cast(StatusFlag flag) {
1429     DCHECK_EQ(flag, Failure{});
1430     return filters_detail::NextMessage<on_progress>(Failure{});
1431   }
1432 };
1433 
1434 namespace promise_detail {
1435 template <void (CallState::*on_progress)()>
1436 struct TrySeqTraitsWithSfinae<filters_detail::NextMessage<on_progress>> {
1437   using UnwrappedType = MessageHandle;
1438   using WrappedType = filters_detail::NextMessage<on_progress>;
1439   template <typename Next>
1440   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static auto CallFactory(
1441       Next* next, WrappedType&& value) {
1442     return next->Make(value.TakeValue());
1443   }
1444   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static bool IsOk(
1445       const WrappedType& value) {
1446     return value.ok();
1447   }
1448   static const char* ErrorString(const WrappedType& status) {
1449     DCHECK(!status.ok());
1450     return "failed";
1451   }
1452   template <typename R>
1453   static R ReturnValue(WrappedType&& status) {
1454     DCHECK(!status.ok());
1455     return WrappedType(Failure{});
1456   }
1457   template <typename F, typename Elem>
1458   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static auto CallSeqFactory(
1459       F& f, Elem&& elem, WrappedType value)
1460       -> decltype(f(std::forward<Elem>(elem), std::declval<MessageHandle>())) {
1461     return f(std::forward<Elem>(elem), value.TakeValue());
1462   }
1463   template <typename Result, typename RunNext>
1464   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Poll<Result>
1465   CheckResultAndRunNext(WrappedType prior, RunNext run_next) {
1466     if (!prior.ok()) return WrappedType(prior.status());
1467     return run_next(std::move(prior));
1468   }
1469 };
1470 }  // namespace promise_detail
1471 
1472 using ServerToClientNextMessage =
1473     filters_detail::NextMessage<&CallState::FinishPullServerToClientMessage>;
1474 using ClientToServerNextMessage =
1475     filters_detail::NextMessage<&CallState::FinishPullClientToServerMessage>;
1476 
1477 // Execution environment for a stack of filters.
1478 // This is a per-call object.
1479 class CallFilters {
1480  public:
1481   class StackBuilder;
1482   class StackTestSpouse;
1483 
1484   // A stack is an opaque, immutable type that contains the data necessary to
1485   // execute a call through a given set of filters.
1486   // It's reference counted so that it can be shared between many calls.
1487   // It contains pointers to the individual filters, yet it does not own those
1488   // pointers: it's expected that some other object will track that ownership.
1489   class Stack : public RefCounted<Stack> {
1490    public:
1491     ~Stack() override;
1492 
1493    private:
1494     friend class CallFilters;
1495     friend class StackBuilder;
1496     friend class StackTestSpouse;
1497     explicit Stack(filters_detail::StackData data) : data_(std::move(data)) {}
1498     const filters_detail::StackData data_;
1499   };
1500 
1501   // Build stacks... repeatedly call Add with each filter that contributes to
1502   // the stack, then call Build() to generate a ref counted Stack object.
1503   class StackBuilder {
1504    public:
1505     ~StackBuilder();
1506 
1507     template <typename FilterType>
1508     void Add(FilterType* filter) {
1509       const size_t call_offset = data_.AddFilter<FilterType>(filter);
1510       data_.AddClientInitialMetadataOp(filter, call_offset);
1511       data_.AddServerInitialMetadataOp(filter, call_offset);
1512       data_.AddClientToServerMessageOp(filter, call_offset);
1513       data_.AddClientToServerHalfClose(filter, call_offset);
1514       data_.AddServerToClientMessageOp(filter, call_offset);
1515       data_.AddServerTrailingMetadataOp(filter, call_offset);
1516       data_.AddFinalizer(filter, call_offset, &FilterType::Call::OnFinalize);
1517     }
1518 
1519     void AddOwnedObject(void (*destroy)(void* p), void* p) {
1520       data_.channel_data_destructors.push_back({destroy, p});
1521     }
1522 
1523     template <typename T>
1524     void AddOwnedObject(RefCountedPtr<T> p) {
1525       AddOwnedObject([](void* p) { static_cast<T*>(p)->Unref(); }, p.release());
1526     }
1527 
1528     template <typename T>
1529     void AddOwnedObject(std::unique_ptr<T> p) {
1530       AddOwnedObject([](void* p) { delete static_cast<T*>(p); }, p.release());
1531     }
1532 
1533     template <typename Fn>
1534     void AddOnClientInitialMetadata(Fn fn) {
1535       auto filter = std::make_unique<
1536           filters_detail::ClientInitialMetadataInterceptor<Fn>>(std::move(fn));
1537       Add(filter.get());
1538       AddOwnedObject(std::move(filter));
1539     }
1540 
1541     template <typename Fn>
1542     void AddOnServerTrailingMetadata(Fn fn) {
1543       auto filter = std::make_unique<
1544           filters_detail::ServerTrailingMetadataInterceptor<Fn>>(std::move(fn));
1545       Add(filter.get());
1546       AddOwnedObject(std::move(filter));
1547     }
1548 
1549     RefCountedPtr<Stack> Build();
1550 
1551    private:
1552     filters_detail::StackData data_;
1553   };
1554 
1555   explicit CallFilters(ClientMetadataHandle client_initial_metadata)
1556       : call_data_(nullptr),
1557         push_client_initial_metadata_(std::move(client_initial_metadata)) {}
1558   ~CallFilters() {
1559     if (call_data_ != nullptr && call_data_ != &g_empty_call_data_) {
1560       for (const auto& stack : stacks_) {
1561         for (const auto& destructor : stack.stack->data_.filter_destructor) {
1562           destructor.call_destroy(filters_detail::Offset(
1563               call_data_, stack.call_data_offset + destructor.call_offset));
1564         }
1565       }
1566       gpr_free_aligned(call_data_);
1567     }
1568   };
1569 
1570   CallFilters(const CallFilters&) = delete;
1571   CallFilters& operator=(const CallFilters&) = delete;
1572   CallFilters(CallFilters&&) = delete;
1573   CallFilters& operator=(CallFilters&&) = delete;
1574 
1575   void AddStack(RefCountedPtr<Stack> stack) {
1576     if (stack->data_.empty()) return;
1577     stacks_.emplace_back(std::move(stack));
1578   }
1579   void Start();
1580 
1581   // Access client initial metadata before it's processed
1582   ClientMetadata* unprocessed_client_initial_metadata() {
1583     return push_client_initial_metadata_.get();
1584   }
1585 
1586  private:
1587   template <typename Output, typename Input,
1588             Input(CallFilters::* input_location),
1589             filters_detail::Layout<Input>(filters_detail::StackData::* layout),
1590             void (CallState::*on_done)(), typename StackIterator>
1591   class MetadataExecutor {
1592    public:
1593     MetadataExecutor(CallFilters* filters, StackIterator stack_begin,
1594                      StackIterator stack_end)
1595         : stack_current_(stack_begin),
1596           stack_end_(stack_end),
1597           filters_(filters) {
1598       DCHECK_NE((filters_->*input_location).get(), nullptr);
1599     }
1600 
1601     Poll<ValueOrFailure<Output>> operator()() {
1602       if ((filters_->*input_location) != nullptr) {
1603         if (stack_current_ == stack_end_) {
1604           DCHECK_NE((filters_->*input_location).get(), nullptr);
1605           (filters_->call_state_.*on_done)();
1606           return Output(std::move(filters_->*input_location));
1607         }
1608         return FinishStep(executor_.Start(
1609             &(stack_current_->stack->data_.*layout),
1610             std::move(filters_->*input_location), filters_->call_data_));
1611       } else {
1612         return FinishStep(executor_.Step(filters_->call_data_));
1613       }
1614     }
1615 
1616    private:
1617     Poll<ValueOrFailure<Output>> FinishStep(
1618         Poll<filters_detail::ResultOr<Input>> p) {
1619       auto* r = p.value_if_ready();
1620       if (r == nullptr) return Pending{};
1621       if (r->ok != nullptr) {
1622         ++stack_current_;
1623         if (stack_current_ == stack_end_) {
1624           (filters_->call_state_.*on_done)();
1625           return ValueOrFailure<Output>{std::move(r->ok)};
1626         }
1627         return FinishStep(
1628             executor_.Start(&(stack_current_->stack->data_.*layout),
1629                             std::move(r->ok), filters_->call_data_));
1630       }
1631       (filters_->call_state_.*on_done)();
1632       filters_->PushServerTrailingMetadata(std::move(r->error));
1633       return Failure{};
1634     }
1635 
1636     StackIterator stack_current_;
1637     StackIterator stack_end_;
1638     CallFilters* filters_;
1639     filters_detail::OperationExecutor<Input> executor_;
1640   };
1641 
1642   template <MessageHandle(CallFilters::* input_location),
1643             filters_detail::Layout<MessageHandle>(
1644                 filters_detail::StackData::* layout),
1645             void (CallState::*on_done)(), typename StackIterator>
1646   class MessageExecutor {
1647    public:
1648     using NextMsg = filters_detail::NextMessage<on_done>;
1649 
1650     MessageExecutor(CallFilters* filters, StackIterator stack_begin,
1651                     StackIterator stack_end)
1652         : stack_current_(stack_begin),
1653           stack_end_(stack_end),
1654           filters_(filters) {
1655       DCHECK_NE((filters_->*input_location).get(), nullptr);
1656     }
1657 
1658     Poll<NextMsg> operator()() {
1659       if ((filters_->*input_location) != nullptr) {
1660         if (stack_current_ == stack_end_) {
1661           DCHECK_NE((filters_->*input_location).get(), nullptr);
1662           return NextMsg(std::move(filters_->*input_location),
1663                          &filters_->call_state_);
1664         }
1665         return FinishStep(executor_.Start(
1666             &(stack_current_->stack->data_.*layout),
1667             std::move(filters_->*input_location), filters_->call_data_));
1668       } else {
1669         return FinishStep(executor_.Step(filters_->call_data_));
1670       }
1671     }
1672 
1673    private:
1674     Poll<NextMsg> FinishStep(Poll<filters_detail::ResultOr<MessageHandle>> p) {
1675       auto* r = p.value_if_ready();
1676       if (r == nullptr) return Pending{};
1677       if (r->ok != nullptr) {
1678         ++stack_current_;
1679         if (stack_current_ == stack_end_) {
1680           return NextMsg{std::move(r->ok), &filters_->call_state_};
1681         }
1682         return FinishStep(
1683             executor_.Start(&(stack_current_->stack->data_.*layout),
1684                             std::move(r->ok), filters_->call_data_));
1685       }
1686       (filters_->call_state_.*on_done)();
1687       filters_->PushServerTrailingMetadata(std::move(r->error));
1688       return Failure{};
1689     }
1690 
1691     StackIterator stack_current_;
1692     StackIterator stack_end_;
1693     CallFilters* filters_;
1694     filters_detail::OperationExecutor<MessageHandle> executor_;
1695   };
1696 
1697  public:
1698   // Client: Fetch client initial metadata
1699   // Returns a promise that resolves to ValueOrFailure<ClientMetadataHandle>
1700   GRPC_MUST_USE_RESULT auto PullClientInitialMetadata() {
1701     call_state_.BeginPullClientInitialMetadata();
1702     return MetadataExecutor<ClientMetadataHandle, ClientMetadataHandle,
1703                             &CallFilters::push_client_initial_metadata_,
1704                             &filters_detail::StackData::client_initial_metadata,
1705                             &CallState::FinishPullClientInitialMetadata,
1706                             StacksVector::const_iterator>(
1707         this, stacks_.cbegin(), stacks_.cend());
1708   }
1709   // Server: Push server initial metadata
1710   // Returns a promise that resolves to a StatusFlag indicating success
1711   StatusFlag PushServerInitialMetadata(ServerMetadataHandle md) {
1712     push_server_initial_metadata_ = std::move(md);
1713     return call_state_.PushServerInitialMetadata();
1714   }
1715   // Client: Fetch server initial metadata
1716   // Returns a promise that resolves to ValueOrFailure<ServerMetadataHandle>
1717   GRPC_MUST_USE_RESULT auto PullServerInitialMetadata() {
1718     return Seq(
1719         [this]() {
1720           return call_state_.PollPullServerInitialMetadataAvailable();
1721         },
1722         [this](bool has_server_initial_metadata) {
1723           return If(
1724               has_server_initial_metadata,
1725               [this]() {
1726                 return Map(
1727                     MetadataExecutor<
1728                         absl::optional<ServerMetadataHandle>,
1729                         ServerMetadataHandle,
1730                         &CallFilters::push_server_initial_metadata_,
1731                         &filters_detail::StackData::server_initial_metadata,
1732                         &CallState::FinishPullServerInitialMetadata,
1733                         StacksVector::const_reverse_iterator>(
1734                         this, stacks_.crbegin(), stacks_.crend()),
1735                     [](ValueOrFailure<absl::optional<ServerMetadataHandle>> r) {
1736                       if (r.ok()) return std::move(*r);
1737                       return absl::optional<ServerMetadataHandle>{};
1738                     });
1739               },
1740               []() {
1741                 return Immediate(absl::optional<ServerMetadataHandle>{});
1742               });
1743         });
1744   }
1745   // Client: Push client to server message
1746   // Returns a promise that resolves to a StatusFlag indicating success
1747   GRPC_MUST_USE_RESULT auto PushClientToServerMessage(MessageHandle message) {
1748     call_state_.BeginPushClientToServerMessage();
1749     DCHECK_NE(message.get(), nullptr);
1750     DCHECK_EQ(push_client_to_server_message_.get(), nullptr);
1751     push_client_to_server_message_ = std::move(message);
1752     return [this]() { return call_state_.PollPushClientToServerMessage(); };
1753   }
1754   // Client: Indicate that no more messages will be sent
1755   void FinishClientToServerSends() { call_state_.ClientToServerHalfClose(); }
1756   // Server: Fetch client to server message
1757   // Returns a promise that resolves to ClientToServerNextMessage
1758   GRPC_MUST_USE_RESULT auto PullClientToServerMessage() {
1759     return TrySeq(
1760         [this]() {
1761           return call_state_.PollPullClientToServerMessageAvailable();
1762         },
1763         [this](bool message_available) {
1764           return If(
1765               message_available,
1766               [this]() {
1767                 return MessageExecutor<
1768                     &CallFilters::push_client_to_server_message_,
1769                     &filters_detail::StackData::client_to_server_messages,
1770                     &CallState::FinishPullClientToServerMessage,
1771                     StacksVector::const_iterator>(this, stacks_.cbegin(),
1772                                                   stacks_.cend());
1773               },
1774               []() -> ClientToServerNextMessage {
1775                 return ClientToServerNextMessage();
1776               });
1777         });
1778   }
1779   // Server: Push server to client message
1780   // Returns a promise that resolves to a StatusFlag indicating success
1781   GRPC_MUST_USE_RESULT auto PushServerToClientMessage(MessageHandle message) {
1782     call_state_.BeginPushServerToClientMessage();
1783     push_server_to_client_message_ = std::move(message);
1784     return [this]() { return call_state_.PollPushServerToClientMessage(); };
1785   }
1786   // Server: Fetch server to client message
1787   // Returns a promise that resolves to ServerToClientNextMessage
1788   GRPC_MUST_USE_RESULT auto PullServerToClientMessage() {
1789     return TrySeq(
1790         [this]() {
1791           return call_state_.PollPullServerToClientMessageAvailable();
1792         },
1793         [this](bool message_available) {
1794           return If(
1795               message_available,
1796               [this]() {
1797                 return MessageExecutor<
1798                     &CallFilters::push_server_to_client_message_,
1799                     &filters_detail::StackData::server_to_client_messages,
1800                     &CallState::FinishPullServerToClientMessage,
1801                     StacksVector::const_reverse_iterator>(
1802                     this, stacks_.crbegin(), stacks_.crend());
1803               },
1804               []() -> ServerToClientNextMessage {
1805                 return ServerToClientNextMessage();
1806               });
1807         });
1808   }
1809   // Server: Indicate end of response
1810   // Closes the request entirely - no messages can be sent/received
1811   // If no server initial metadata has been sent, implies
1812   // NoServerInitialMetadata() called.
1813   void PushServerTrailingMetadata(ServerMetadataHandle md);
1814   // Optimized path to push cancellation onto the call
1815   void Cancel();
1816   // Client: Fetch server trailing metadata
1817   // Returns a promise that resolves to ServerMetadataHandle
1818   GRPC_MUST_USE_RESULT auto PullServerTrailingMetadata() {
1819     return Map(
1820         [this]() { return call_state_.PollServerTrailingMetadataAvailable(); },
1821         [this](Empty) {
1822           auto value = std::move(push_server_trailing_metadata_);
1823           if (call_data_ != nullptr) {
1824             for (auto it = stacks_.crbegin(); it != stacks_.crend(); ++it) {
1825               value = filters_detail::RunServerTrailingMetadata(
1826                   it->stack->data_.server_trailing_metadata,
1827                   filters_detail::Offset(call_data_, it->call_data_offset),
1828                   std::move(value));
1829             }
1830           }
1831           return value;
1832         });
1833   }
1834   // Server: Wait for server trailing metadata to have been sent
1835   // Returns a promise that resolves to a StatusFlag indicating whether the
1836   // request was cancelled or not -- failure to send trailing metadata is
1837   // considered a cancellation, as is actual cancellation -- but not application
1838   // errors.
1839   GRPC_MUST_USE_RESULT auto WasCancelled() {
1840     return [this]() { return call_state_.PollWasCancelled(); };
1841   }
1842   // Client & server: wait for server trailing metadata to be available, and
1843   // resolve to empty when it is.
1844   GRPC_MUST_USE_RESULT auto ServerTrailingMetadataWasPushed() {
1845     return
1846         [this]() { return call_state_.PollServerTrailingMetadataWasPushed(); };
1847   }
1848   // Client & server: returns true if server trailing metadata has been pushed
1849   // *and* contained a cancellation, false otherwise.
1850   GRPC_MUST_USE_RESULT bool WasCancelledPushed() const {
1851     return call_state_.WasCancelledPushed();
1852   }
1853 
1854   // Returns true if server trailing metadata has been pulled
1855   bool WasServerTrailingMetadataPulled() const {
1856     return call_state_.WasServerTrailingMetadataPulled();
1857   }
1858 
1859   // Client & server: fill in final_info with the final status of the call.
1860   void Finalize(const grpc_call_final_info* final_info);
1861 
1862   std::string DebugString() const;
1863 
1864  private:
1865   void CancelDueToFailedPipeOperation(SourceLocation but_where = {});
1866 
1867   struct AddedStack {
1868     explicit AddedStack(RefCountedPtr<Stack> stack)
1869         : call_data_offset(std::numeric_limits<size_t>::max()),
1870           stack(std::move(stack)) {}
1871     size_t call_data_offset;
1872     RefCountedPtr<Stack> stack;
1873   };
1874 
1875   using StacksVector = absl::InlinedVector<AddedStack, 2>;
1876 
1877   StacksVector stacks_;
1878 
1879   CallState call_state_;
1880 
1881   void* call_data_;
1882   ClientMetadataHandle push_client_initial_metadata_;
1883   ServerMetadataHandle push_server_initial_metadata_;
1884   MessageHandle push_client_to_server_message_;
1885   MessageHandle push_server_to_client_message_;
1886   ServerMetadataHandle push_server_trailing_metadata_;
1887 
1888   static char g_empty_call_data_;
1889 };
1890 
1891 static_assert(
1892     filters_detail::ArgumentMustBeNextMessage<
1893         absl::remove_cvref_t<decltype(std::declval<CallFilters*>()
1894                                           ->PullServerToClientMessage()()
1895                                           .value())>>::value(),
1896     "PullServerToClientMessage must return a NextMessage");
1897 
1898 static_assert(
1899     filters_detail::ArgumentMustBeNextMessage<
1900         absl::remove_cvref_t<decltype(std::declval<CallFilters*>()
1901                                           ->PullClientToServerMessage()()
1902                                           .value())>>::value(),
1903     "PullServerToClientMessage must return a NextMessage");
1904 
1905 }  // namespace grpc_core
1906 
1907 #endif  // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_FILTERS_H
1908