• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H
16 #define GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include "absl/types/optional.h"
21 #include "src/core/lib/debug/trace.h"
22 #include "src/core/lib/promise/activity.h"
23 #include "src/core/lib/promise/poll.h"
24 #include "src/core/lib/promise/status_flag.h"
25 #include "src/core/util/crash.h"
26 
27 namespace grpc_core {
28 
29 class CallState {
30  public:
31   CallState();
32 
33   /////////////////////////////////////////////////////////////////////////////
34   // Misc events
35 
36   // Start the call: allows pulls to proceed
37   void Start();
38 
39   /////////////////////////////////////////////////////////////////////////////
40   // PUSH: client -> server
41 
42   // Poll for the next message pull to be started.
43   // This can be used for flow control by waiting for the reader to request
44   // data, then providing flow control tokens to read, and finally pushing the
45   // message.
46   Poll<StatusFlag> PollPullClientToServerMessageStarted();
47 
48   // Begin a message push.
49   void BeginPushClientToServerMessage();
50 
51   // Poll for the push to be completed (up to FinishPullClientToServerMessage).
52   Poll<StatusFlag> PollPushClientToServerMessage();
53 
54   // Note that the client has half-closed the stream.
55   void ClientToServerHalfClose();
56 
57   /////////////////////////////////////////////////////////////////////////////
58   // PULL: client -> server
59 
60   // Begin pulling client initial metadata.
61   void BeginPullClientInitialMetadata();
62   // Finish pulling client initial metadata.
63   void FinishPullClientInitialMetadata();
64   // Poll for the next message pull to be available.
65   // Resolves to true if a message is available, false if the call is
66   // half-closed, and Failure if the call is cancelled.
67   Poll<ValueOrFailure<bool>> PollPullClientToServerMessageAvailable();
68   // Finish pulling a message.
69   void FinishPullClientToServerMessage();
70 
71   /////////////////////////////////////////////////////////////////////////////
72   // PUSH: server -> client
73 
74   // Push server initial metadata (instantaneous).
75   StatusFlag PushServerInitialMetadata();
76   // Poll for the next message pull to be started.
77   // This can be used for flow control by waiting for the reader to request
78   // data, then providing flow control tokens to read, and finally pushing the
79   // message.
80   Poll<StatusFlag> PollPullServerToClientMessageStarted();
81   // Begin a message push.
82   void BeginPushServerToClientMessage();
83   // Poll for the push to be completed (up to FinishPullServerToClientMessage).
84   Poll<StatusFlag> PollPushServerToClientMessage();
85   // Push server trailing metadata.
86   // This is idempotent: only the first call will have any effect.
87   // Returns true if this is the first call.
88   bool PushServerTrailingMetadata(bool cancel);
89 
90   /////////////////////////////////////////////////////////////////////////////
91   // PULL: server -> client
92 
93   // Poll for initial metadata to be available.
94   Poll<bool> PollPullServerInitialMetadataAvailable();
95   // Finish pulling server initial metadata.
96   void FinishPullServerInitialMetadata();
97   // Poll for the next message pull to be available.
98   // Resolves to true if a message is available, false if trailing metadata is
99   // ready, and Failure if the call is cancelled.
100   Poll<ValueOrFailure<bool>> PollPullServerToClientMessageAvailable();
101   // Finish pulling a message.
102   void FinishPullServerToClientMessage();
103   // Poll for trailing metadata to be available.
104   Poll<Empty> PollServerTrailingMetadataAvailable();
105   // Instantaneously return true if server trailing metadata has been pulled.
106   bool WasServerTrailingMetadataPulled() const;
107   // Resolves after server trailing metadata has been pulled, to true if the
108   // call was cancelled, and false otherwise.
109   Poll<bool> PollWasCancelled();
110   // Return true if server trailing metadata has been pushed *and* that push was
111   // a cancellation.
112   bool WasCancelledPushed() const;
113   // Resolves after server trailing metadata has been pushed, regardless of
114   // whether the call was cancelled.
115   Poll<Empty> PollServerTrailingMetadataWasPushed();
116 
117   /////////////////////////////////////////////////////////////////////////////
118   // Debug
119   std::string DebugString() const;
120 
121   friend std::ostream& operator<<(std::ostream& out,
122                                   const CallState& call_state) {
123     return out << call_state.DebugString();
124   }
125 
126  private:
127   enum class ClientToServerPullState : uint16_t {
128     // Ready to read: client initial metadata is there, but not yet processed
129     kBegin,
130     // Processing client initial metadata
131     kProcessingClientInitialMetadata,
132     // Main call loop: not reading
133     kIdle,
134     // Main call loop: reading but no message available
135     kReading,
136     // Main call loop: processing one message
137     kProcessingClientToServerMessage,
138     // Processing complete
139     kTerminated,
140   };
ClientToServerPullStateString(ClientToServerPullState state)141   static const char* ClientToServerPullStateString(
142       ClientToServerPullState state) {
143     switch (state) {
144       case ClientToServerPullState::kBegin:
145         return "Begin";
146       case ClientToServerPullState::kProcessingClientInitialMetadata:
147         return "ProcessingClientInitialMetadata";
148       case ClientToServerPullState::kIdle:
149         return "Idle";
150       case ClientToServerPullState::kReading:
151         return "Reading";
152       case ClientToServerPullState::kProcessingClientToServerMessage:
153         return "ProcessingClientToServerMessage";
154       case ClientToServerPullState::kTerminated:
155         return "Terminated";
156     }
157   }
158   template <typename Sink>
AbslStringify(Sink & out,ClientToServerPullState state)159   friend void AbslStringify(Sink& out, ClientToServerPullState state) {
160     out.Append(ClientToServerPullStateString(state));
161   }
162   friend std::ostream& operator<<(std::ostream& out,
163                                   ClientToServerPullState state) {
164     return out << ClientToServerPullStateString(state);
165   }
166   enum class ClientToServerPushState : uint16_t {
167     kIdle,
168     kPushedMessage,
169     kPushedHalfClose,
170     kPushedMessageAndHalfClosed,
171     kFinished,
172   };
ClientToServerPushStateString(ClientToServerPushState state)173   static const char* ClientToServerPushStateString(
174       ClientToServerPushState state) {
175     switch (state) {
176       case ClientToServerPushState::kIdle:
177         return "Idle";
178       case ClientToServerPushState::kPushedMessage:
179         return "PushedMessage";
180       case ClientToServerPushState::kPushedHalfClose:
181         return "PushedHalfClose";
182       case ClientToServerPushState::kPushedMessageAndHalfClosed:
183         return "PushedMessageAndHalfClosed";
184       case ClientToServerPushState::kFinished:
185         return "Finished";
186     }
187   }
188   template <typename Sink>
AbslStringify(Sink & out,ClientToServerPushState state)189   friend void AbslStringify(Sink& out, ClientToServerPushState state) {
190     out.Append(ClientToServerPushStateString(state));
191   }
192   friend std::ostream& operator<<(std::ostream& out,
193                                   ClientToServerPushState state) {
194     return out << ClientToServerPushStateString(state);
195   }
196   enum class ServerToClientPullState : uint16_t {
197     // Not yet started: cannot read
198     kUnstarted,
199     kUnstartedReading,
200     kStarted,
201     kStartedReading,
202     // Processing server initial metadata
203     kProcessingServerInitialMetadata,
204     kProcessingServerInitialMetadataReading,
205     // Main call loop: not reading
206     kIdle,
207     // Main call loop: reading but no message available
208     kReading,
209     // Main call loop: processing one message
210     kProcessingServerToClientMessage,
211     kTerminated,
212   };
ServerToClientPullStateString(ServerToClientPullState state)213   static const char* ServerToClientPullStateString(
214       ServerToClientPullState state) {
215     switch (state) {
216       case ServerToClientPullState::kUnstarted:
217         return "Unstarted";
218       case ServerToClientPullState::kUnstartedReading:
219         return "UnstartedReading";
220       case ServerToClientPullState::kStarted:
221         return "Started";
222       case ServerToClientPullState::kStartedReading:
223         return "StartedReading";
224       case ServerToClientPullState::kProcessingServerInitialMetadata:
225         return "ProcessingServerInitialMetadata";
226       case ServerToClientPullState::kProcessingServerInitialMetadataReading:
227         return "ProcessingServerInitialMetadataReading";
228       case ServerToClientPullState::kIdle:
229         return "Idle";
230       case ServerToClientPullState::kReading:
231         return "Reading";
232       case ServerToClientPullState::kProcessingServerToClientMessage:
233         return "ProcessingServerToClientMessage";
234       case ServerToClientPullState::kTerminated:
235         return "Terminated";
236     }
237   }
238   template <typename Sink>
AbslStringify(Sink & out,ServerToClientPullState state)239   friend void AbslStringify(Sink& out, ServerToClientPullState state) {
240     out.Append(ServerToClientPullStateString(state));
241   }
242   friend std::ostream& operator<<(std::ostream& out,
243                                   ServerToClientPullState state) {
244     return out << ServerToClientPullStateString(state);
245   }
246   enum class ServerToClientPushState : uint16_t {
247     kStart,
248     kPushedMessageWithoutInitialMetadata,
249     kPushedServerInitialMetadata,
250     kPushedServerInitialMetadataAndPushedMessage,
251     kTrailersOnly,
252     kIdle,
253     kPushedMessage,
254     kFinished,
255   };
ServerToClientPushStateString(ServerToClientPushState state)256   static const char* ServerToClientPushStateString(
257       ServerToClientPushState state) {
258     switch (state) {
259       case ServerToClientPushState::kStart:
260         return "Start";
261       case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
262         return "PushedMessageWithoutInitialMetadata";
263       case ServerToClientPushState::kPushedServerInitialMetadata:
264         return "PushedServerInitialMetadata";
265       case ServerToClientPushState::
266           kPushedServerInitialMetadataAndPushedMessage:
267         return "PushedServerInitialMetadataAndPushedMessage";
268       case ServerToClientPushState::kTrailersOnly:
269         return "TrailersOnly";
270       case ServerToClientPushState::kIdle:
271         return "Idle";
272       case ServerToClientPushState::kPushedMessage:
273         return "PushedMessage";
274       case ServerToClientPushState::kFinished:
275         return "Finished";
276     }
277   }
278   template <typename Sink>
AbslStringify(Sink & out,ServerToClientPushState state)279   friend void AbslStringify(Sink& out, ServerToClientPushState state) {
280     out.Append(ServerToClientPushStateString(state));
281   }
282   friend std::ostream& operator<<(std::ostream& out,
283                                   ServerToClientPushState state) {
284     return out << ServerToClientPushStateString(state);
285   }
286   enum class ServerTrailingMetadataState : uint16_t {
287     kNotPushed,
288     kPushed,
289     kPushedCancel,
290     kPulled,
291     kPulledCancel,
292   };
ServerTrailingMetadataStateString(ServerTrailingMetadataState state)293   static const char* ServerTrailingMetadataStateString(
294       ServerTrailingMetadataState state) {
295     switch (state) {
296       case ServerTrailingMetadataState::kNotPushed:
297         return "NotPushed";
298       case ServerTrailingMetadataState::kPushed:
299         return "Pushed";
300       case ServerTrailingMetadataState::kPushedCancel:
301         return "PushedCancel";
302       case ServerTrailingMetadataState::kPulled:
303         return "Pulled";
304       case ServerTrailingMetadataState::kPulledCancel:
305         return "PulledCancel";
306     }
307   }
308   template <typename Sink>
AbslStringify(Sink & out,ServerTrailingMetadataState state)309   friend void AbslStringify(Sink& out, ServerTrailingMetadataState state) {
310     out.Append(ServerTrailingMetadataStateString(state));
311   }
312   friend std::ostream& operator<<(std::ostream& out,
313                                   ServerTrailingMetadataState state) {
314     return out << ServerTrailingMetadataStateString(state);
315   }
316   ClientToServerPullState client_to_server_pull_state_ : 3;
317   ClientToServerPushState client_to_server_push_state_ : 3;
318   ServerToClientPullState server_to_client_pull_state_ : 4;
319   ServerToClientPushState server_to_client_push_state_ : 3;
320   ServerTrailingMetadataState server_trailing_metadata_state_ : 3;
321   IntraActivityWaiter client_to_server_pull_waiter_;
322   IntraActivityWaiter server_to_client_pull_waiter_;
323   IntraActivityWaiter client_to_server_push_waiter_;
324   IntraActivityWaiter server_to_client_push_waiter_;
325   IntraActivityWaiter server_trailing_metadata_waiter_;
326 };
327 
CallState()328 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline CallState::CallState()
329     : client_to_server_pull_state_(ClientToServerPullState::kBegin),
330       client_to_server_push_state_(ClientToServerPushState::kIdle),
331       server_to_client_pull_state_(ServerToClientPullState::kUnstarted),
332       server_to_client_push_state_(ServerToClientPushState::kStart),
333       server_trailing_metadata_state_(ServerTrailingMetadataState::kNotPushed) {
334 }
335 
Start()336 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void CallState::Start() {
337   GRPC_TRACE_LOG(call_state, INFO)
338       << "[call_state] Start: "
339       << GRPC_DUMP_ARGS(this, server_to_client_pull_state_);
340   switch (server_to_client_pull_state_) {
341     case ServerToClientPullState::kUnstarted:
342       server_to_client_pull_state_ = ServerToClientPullState::kStarted;
343       server_to_client_pull_waiter_.Wake();
344       break;
345     case ServerToClientPullState::kUnstartedReading:
346       server_to_client_pull_state_ = ServerToClientPullState::kStartedReading;
347       server_to_client_pull_waiter_.Wake();
348       break;
349     case ServerToClientPullState::kStarted:
350     case ServerToClientPullState::kStartedReading:
351     case ServerToClientPullState::kProcessingServerInitialMetadata:
352     case ServerToClientPullState::kProcessingServerInitialMetadataReading:
353     case ServerToClientPullState::kIdle:
354     case ServerToClientPullState::kReading:
355     case ServerToClientPullState::kProcessingServerToClientMessage:
356       LOG(FATAL) << "Start called twice; "
357                  << GRPC_DUMP_ARGS(server_to_client_pull_state_);
358     case ServerToClientPullState::kTerminated:
359       break;
360   }
361 }
362 
363 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
BeginPushClientToServerMessage()364 CallState::BeginPushClientToServerMessage() {
365   GRPC_TRACE_LOG(call_state, INFO)
366       << "[call_state] BeginPushClientToServerMessage: "
367       << GRPC_DUMP_ARGS(this, client_to_server_push_state_,
368                         client_to_server_push_waiter_);
369   switch (client_to_server_push_state_) {
370     case ClientToServerPushState::kIdle:
371       client_to_server_push_state_ = ClientToServerPushState::kPushedMessage;
372       client_to_server_push_waiter_.Wake();
373       break;
374     case ClientToServerPushState::kPushedMessage:
375     case ClientToServerPushState::kPushedMessageAndHalfClosed:
376       LOG(FATAL) << "PushClientToServerMessage called twice concurrently;"
377                  << GRPC_DUMP_ARGS(client_to_server_push_state_);
378       break;
379     case ClientToServerPushState::kPushedHalfClose:
380       LOG(FATAL) << "PushClientToServerMessage called after half-close; "
381                  << GRPC_DUMP_ARGS(client_to_server_push_state_);
382       break;
383     case ClientToServerPushState::kFinished:
384       break;
385   }
386 }
387 
388 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<StatusFlag>
PollPushClientToServerMessage()389 CallState::PollPushClientToServerMessage() {
390   GRPC_TRACE_LOG(call_state, INFO)
391       << "[call_state] PollPushClientToServerMessage: "
392       << GRPC_DUMP_ARGS(this, client_to_server_push_state_);
393   switch (client_to_server_push_state_) {
394     case ClientToServerPushState::kIdle:
395     case ClientToServerPushState::kPushedHalfClose:
396       return Success{};
397     case ClientToServerPushState::kPushedMessage:
398     case ClientToServerPushState::kPushedMessageAndHalfClosed:
399       return client_to_server_push_waiter_.pending();
400     case ClientToServerPushState::kFinished:
401       return Failure{};
402   }
403   Crash("Unreachable");
404 }
405 
406 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
ClientToServerHalfClose()407 CallState::ClientToServerHalfClose() {
408   GRPC_TRACE_LOG(call_state, INFO)
409       << "[call_state] ClientToServerHalfClose: "
410       << GRPC_DUMP_ARGS(this, client_to_server_push_state_);
411   switch (client_to_server_push_state_) {
412     case ClientToServerPushState::kIdle:
413       client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose;
414       client_to_server_push_waiter_.Wake();
415       break;
416     case ClientToServerPushState::kPushedMessage:
417       client_to_server_push_state_ =
418           ClientToServerPushState::kPushedMessageAndHalfClosed;
419       break;
420     case ClientToServerPushState::kPushedHalfClose:
421     case ClientToServerPushState::kPushedMessageAndHalfClosed:
422       LOG(FATAL) << "ClientToServerHalfClose called twice;"
423                  << GRPC_DUMP_ARGS(client_to_server_push_state_);
424       break;
425     case ClientToServerPushState::kFinished:
426       break;
427   }
428 }
429 
430 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
BeginPullClientInitialMetadata()431 CallState::BeginPullClientInitialMetadata() {
432   GRPC_TRACE_LOG(call_state, INFO)
433       << "[call_state] BeginPullClientInitialMetadata: "
434       << GRPC_DUMP_ARGS(this, client_to_server_pull_state_);
435   switch (client_to_server_pull_state_) {
436     case ClientToServerPullState::kBegin:
437       client_to_server_pull_state_ =
438           ClientToServerPullState::kProcessingClientInitialMetadata;
439       break;
440     case ClientToServerPullState::kProcessingClientInitialMetadata:
441     case ClientToServerPullState::kIdle:
442     case ClientToServerPullState::kReading:
443     case ClientToServerPullState::kProcessingClientToServerMessage:
444       LOG(FATAL) << "BeginPullClientInitialMetadata called twice; "
445                  << GRPC_DUMP_ARGS(client_to_server_pull_state_);
446       break;
447     case ClientToServerPullState::kTerminated:
448       break;
449   }
450 }
451 
452 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
FinishPullClientInitialMetadata()453 CallState::FinishPullClientInitialMetadata() {
454   GRPC_TRACE_LOG(call_state, INFO)
455       << "[call_state] FinishPullClientInitialMetadata: "
456       << GRPC_DUMP_ARGS(this, client_to_server_pull_state_);
457   switch (client_to_server_pull_state_) {
458     case ClientToServerPullState::kBegin:
459       LOG(FATAL) << "FinishPullClientInitialMetadata called before Begin; "
460                  << GRPC_DUMP_ARGS(client_to_server_pull_state_);
461       break;
462     case ClientToServerPullState::kProcessingClientInitialMetadata:
463       client_to_server_pull_state_ = ClientToServerPullState::kIdle;
464       client_to_server_pull_waiter_.Wake();
465       break;
466     case ClientToServerPullState::kIdle:
467     case ClientToServerPullState::kReading:
468     case ClientToServerPullState::kProcessingClientToServerMessage:
469       LOG(FATAL) << "Out of order FinishPullClientInitialMetadata"
470                  << GRPC_DUMP_ARGS(client_to_server_pull_state_);
471       break;
472     case ClientToServerPullState::kTerminated:
473       break;
474   }
475 }
476 
477 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ValueOrFailure<bool>>
PollPullClientToServerMessageAvailable()478 CallState::PollPullClientToServerMessageAvailable() {
479   GRPC_TRACE_LOG(call_state, INFO)
480       << "[call_state] PollPullClientToServerMessageAvailable: "
481       << GRPC_DUMP_ARGS(this, client_to_server_pull_state_,
482                         client_to_server_push_state_);
483   switch (client_to_server_pull_state_) {
484     case ClientToServerPullState::kBegin:
485     case ClientToServerPullState::kProcessingClientInitialMetadata:
486       return client_to_server_pull_waiter_.pending();
487     case ClientToServerPullState::kIdle:
488       client_to_server_pull_state_ = ClientToServerPullState::kReading;
489       client_to_server_pull_waiter_.Wake();
490       ABSL_FALLTHROUGH_INTENDED;
491     case ClientToServerPullState::kReading:
492       break;
493     case ClientToServerPullState::kProcessingClientToServerMessage:
494       LOG(FATAL) << "PollPullClientToServerMessageAvailable called while "
495                     "processing a message; "
496                  << GRPC_DUMP_ARGS(client_to_server_pull_state_);
497       break;
498     case ClientToServerPullState::kTerminated:
499       return Failure{};
500   }
501   DCHECK_EQ(client_to_server_pull_state_, ClientToServerPullState::kReading);
502   switch (client_to_server_push_state_) {
503     case ClientToServerPushState::kIdle:
504       return client_to_server_push_waiter_.pending();
505     case ClientToServerPushState::kPushedMessage:
506     case ClientToServerPushState::kPushedMessageAndHalfClosed:
507       client_to_server_pull_state_ =
508           ClientToServerPullState::kProcessingClientToServerMessage;
509       return true;
510     case ClientToServerPushState::kPushedHalfClose:
511       return false;
512     case ClientToServerPushState::kFinished:
513       client_to_server_pull_state_ = ClientToServerPullState::kTerminated;
514       return Failure{};
515   }
516   Crash("Unreachable");
517 }
518 
519 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<StatusFlag>
PollPullClientToServerMessageStarted()520 CallState::PollPullClientToServerMessageStarted() {
521   GRPC_TRACE_LOG(call_state, INFO)
522       << "[call_state] PollPullClientToServerMessageStarted: "
523       << GRPC_DUMP_ARGS(this, client_to_server_pull_state_);
524   switch (client_to_server_pull_state_) {
525     case ClientToServerPullState::kBegin:
526     case ClientToServerPullState::kProcessingClientInitialMetadata:
527     case ClientToServerPullState::kIdle:
528       return client_to_server_pull_waiter_.pending();
529     case ClientToServerPullState::kReading:
530     case ClientToServerPullState::kProcessingClientToServerMessage:
531       return Success{};
532     case ClientToServerPullState::kTerminated:
533       return Failure{};
534   }
535   Crash("Unreachable");
536 }
537 
538 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
FinishPullClientToServerMessage()539 CallState::FinishPullClientToServerMessage() {
540   GRPC_TRACE_LOG(call_state, INFO)
541       << "[call_state] FinishPullClientToServerMessage: "
542       << GRPC_DUMP_ARGS(this, client_to_server_pull_state_,
543                         client_to_server_push_state_);
544   switch (client_to_server_pull_state_) {
545     case ClientToServerPullState::kBegin:
546     case ClientToServerPullState::kProcessingClientInitialMetadata:
547       LOG(FATAL) << "FinishPullClientToServerMessage called before Begin; "
548                  << GRPC_DUMP_ARGS(client_to_server_pull_state_,
549                                    client_to_server_push_state_);
550       break;
551     case ClientToServerPullState::kIdle:
552       LOG(FATAL) << "FinishPullClientToServerMessage called twice; "
553                  << GRPC_DUMP_ARGS(client_to_server_pull_state_,
554                                    client_to_server_push_state_);
555       break;
556     case ClientToServerPullState::kReading:
557       LOG(FATAL) << "FinishPullClientToServerMessage called before "
558                     "PollPullClientToServerMessageAvailable; "
559                  << GRPC_DUMP_ARGS(client_to_server_pull_state_,
560                                    client_to_server_push_state_);
561       break;
562     case ClientToServerPullState::kProcessingClientToServerMessage:
563       client_to_server_pull_state_ = ClientToServerPullState::kIdle;
564       client_to_server_pull_waiter_.Wake();
565       break;
566     case ClientToServerPullState::kTerminated:
567       break;
568   }
569   switch (client_to_server_push_state_) {
570     case ClientToServerPushState::kPushedMessage:
571       client_to_server_push_state_ = ClientToServerPushState::kIdle;
572       client_to_server_push_waiter_.Wake();
573       break;
574     case ClientToServerPushState::kIdle:
575     case ClientToServerPushState::kPushedHalfClose:
576       LOG(FATAL) << "FinishPullClientToServerMessage called without a message; "
577                  << GRPC_DUMP_ARGS(client_to_server_pull_state_,
578                                    client_to_server_push_state_);
579       break;
580     case ClientToServerPushState::kPushedMessageAndHalfClosed:
581       client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose;
582       client_to_server_push_waiter_.Wake();
583       break;
584     case ClientToServerPushState::kFinished:
585       break;
586   }
587 }
588 
589 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline StatusFlag
PushServerInitialMetadata()590 CallState::PushServerInitialMetadata() {
591   GRPC_TRACE_LOG(call_state, INFO)
592       << "[call_state] PushServerInitialMetadata: "
593       << GRPC_DUMP_ARGS(this, server_to_client_push_state_,
594                         server_trailing_metadata_state_);
595   if (server_trailing_metadata_state_ !=
596       ServerTrailingMetadataState::kNotPushed) {
597     return Failure{};
598   }
599   switch (server_to_client_push_state_) {
600     case ServerToClientPushState::kStart:
601       server_to_client_push_state_ =
602           ServerToClientPushState::kPushedServerInitialMetadata;
603       break;
604     case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
605       server_to_client_push_state_ =
606           ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage;
607       break;
608     case ServerToClientPushState::kPushedServerInitialMetadata:
609     case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
610     case ServerToClientPushState::kTrailersOnly:
611     case ServerToClientPushState::kIdle:
612     case ServerToClientPushState::kPushedMessage:
613       LOG(FATAL) << "PushServerInitialMetadata called twice; "
614                  << GRPC_DUMP_ARGS(server_to_client_push_state_);
615       break;
616     case ServerToClientPushState::kFinished:
617       break;
618   }
619   server_to_client_push_waiter_.Wake();
620   return Success{};
621 }
622 
623 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
BeginPushServerToClientMessage()624 CallState::BeginPushServerToClientMessage() {
625   GRPC_TRACE_LOG(call_state, INFO)
626       << "[call_state] BeginPushServerToClientMessage: "
627       << GRPC_DUMP_ARGS(this, server_to_client_push_state_);
628   switch (server_to_client_push_state_) {
629     case ServerToClientPushState::kStart:
630       server_to_client_push_state_ =
631           ServerToClientPushState::kPushedMessageWithoutInitialMetadata;
632       break;
633     case ServerToClientPushState::kPushedServerInitialMetadata:
634       server_to_client_push_state_ =
635           ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage;
636       break;
637     case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
638     case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
639     case ServerToClientPushState::kPushedMessage:
640       LOG(FATAL) << "BeginPushServerToClientMessage called twice concurrently; "
641                  << GRPC_DUMP_ARGS(server_to_client_push_state_);
642       break;
643     case ServerToClientPushState::kTrailersOnly:
644       // Will fail in poll.
645       break;
646     case ServerToClientPushState::kIdle:
647       server_to_client_push_state_ = ServerToClientPushState::kPushedMessage;
648       server_to_client_push_waiter_.Wake();
649       break;
650     case ServerToClientPushState::kFinished:
651       break;
652   }
653 }
654 
655 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<StatusFlag>
PollPushServerToClientMessage()656 CallState::PollPushServerToClientMessage() {
657   GRPC_TRACE_LOG(call_state, INFO)
658       << "[call_state] PollPushServerToClientMessage: "
659       << GRPC_DUMP_ARGS(this, server_to_client_push_state_);
660   switch (server_to_client_push_state_) {
661     case ServerToClientPushState::kStart:
662     case ServerToClientPushState::kPushedServerInitialMetadata:
663       LOG(FATAL) << "PollPushServerToClientMessage called before "
664                  << "PushServerInitialMetadata; "
665                  << GRPC_DUMP_ARGS(server_to_client_push_state_);
666     case ServerToClientPushState::kTrailersOnly:
667       return false;
668     case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
669     case ServerToClientPushState::kPushedMessage:
670     case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
671       return server_to_client_push_waiter_.pending();
672     case ServerToClientPushState::kIdle:
673       return Success{};
674     case ServerToClientPushState::kFinished:
675       return Failure{};
676   }
677   Crash("Unreachable");
678 }
679 
680 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline bool
PushServerTrailingMetadata(bool cancel)681 CallState::PushServerTrailingMetadata(bool cancel) {
682   GRPC_TRACE_LOG(call_state, INFO)
683       << "[call_state] PushServerTrailingMetadata: "
684       << GRPC_DUMP_ARGS(this, cancel, server_trailing_metadata_state_,
685                         server_to_client_push_state_,
686                         client_to_server_push_state_,
687                         server_trailing_metadata_waiter_);
688   if (server_trailing_metadata_state_ !=
689       ServerTrailingMetadataState::kNotPushed) {
690     return false;
691   }
692   server_trailing_metadata_state_ =
693       cancel ? ServerTrailingMetadataState::kPushedCancel
694              : ServerTrailingMetadataState::kPushed;
695   server_trailing_metadata_waiter_.Wake();
696   switch (server_to_client_push_state_) {
697     case ServerToClientPushState::kStart:
698       server_to_client_push_state_ = ServerToClientPushState::kTrailersOnly;
699       server_to_client_push_waiter_.Wake();
700       break;
701     case ServerToClientPushState::kPushedServerInitialMetadata:
702     case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
703     case ServerToClientPushState::kPushedMessage:
704     case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
705       if (cancel) {
706         server_to_client_push_state_ = ServerToClientPushState::kFinished;
707         server_to_client_push_waiter_.Wake();
708       }
709       break;
710     case ServerToClientPushState::kIdle:
711       if (cancel) {
712         server_to_client_push_state_ = ServerToClientPushState::kFinished;
713         server_to_client_push_waiter_.Wake();
714       }
715       break;
716     case ServerToClientPushState::kFinished:
717     case ServerToClientPushState::kTrailersOnly:
718       break;
719   }
720   switch (client_to_server_push_state_) {
721     case ClientToServerPushState::kIdle:
722       client_to_server_push_state_ = ClientToServerPushState::kFinished;
723       client_to_server_push_waiter_.Wake();
724       break;
725     case ClientToServerPushState::kPushedMessage:
726     case ClientToServerPushState::kPushedMessageAndHalfClosed:
727       client_to_server_push_state_ = ClientToServerPushState::kFinished;
728       client_to_server_push_waiter_.Wake();
729       break;
730     case ClientToServerPushState::kPushedHalfClose:
731     case ClientToServerPushState::kFinished:
732       break;
733   }
734   return true;
735 }
736 
737 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<bool>
PollPullServerInitialMetadataAvailable()738 CallState::PollPullServerInitialMetadataAvailable() {
739   GRPC_TRACE_LOG(call_state, INFO)
740       << "[call_state] PollPullServerInitialMetadataAvailable: "
741       << GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
742                         server_to_client_push_state_);
743   bool reading;
744   switch (server_to_client_pull_state_) {
745     case ServerToClientPullState::kUnstarted:
746     case ServerToClientPullState::kUnstartedReading:
747       if (server_to_client_push_state_ ==
748           ServerToClientPushState::kTrailersOnly) {
749         server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
750         return false;
751       }
752       server_to_client_push_waiter_.pending();
753       return server_to_client_pull_waiter_.pending();
754     case ServerToClientPullState::kStartedReading:
755       reading = true;
756       break;
757     case ServerToClientPullState::kStarted:
758       reading = false;
759       break;
760     case ServerToClientPullState::kProcessingServerInitialMetadata:
761     case ServerToClientPullState::kProcessingServerInitialMetadataReading:
762     case ServerToClientPullState::kIdle:
763     case ServerToClientPullState::kReading:
764     case ServerToClientPullState::kProcessingServerToClientMessage:
765       LOG(FATAL) << "PollPullServerInitialMetadataAvailable called twice; "
766                  << GRPC_DUMP_ARGS(server_to_client_pull_state_,
767                                    server_to_client_push_state_);
768     case ServerToClientPullState::kTerminated:
769       return false;
770   }
771   DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kStarted ||
772          server_to_client_pull_state_ ==
773              ServerToClientPullState::kStartedReading)
774       << server_to_client_pull_state_;
775   switch (server_to_client_push_state_) {
776     case ServerToClientPushState::kStart:
777     case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
778       return server_to_client_push_waiter_.pending();
779     case ServerToClientPushState::kPushedServerInitialMetadata:
780     case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
781       server_to_client_pull_state_ =
782           reading
783               ? ServerToClientPullState::kProcessingServerInitialMetadataReading
784               : ServerToClientPullState::kProcessingServerInitialMetadata;
785       server_to_client_pull_waiter_.Wake();
786       return true;
787     case ServerToClientPushState::kIdle:
788     case ServerToClientPushState::kPushedMessage:
789       LOG(FATAL)
790           << "PollPullServerInitialMetadataAvailable after metadata processed; "
791           << GRPC_DUMP_ARGS(server_to_client_pull_state_,
792                             server_to_client_push_state_);
793     case ServerToClientPushState::kFinished:
794       server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
795       server_to_client_pull_waiter_.Wake();
796       return false;
797     case ServerToClientPushState::kTrailersOnly:
798       return false;
799   }
800   Crash("Unreachable");
801 }
802 
803 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
FinishPullServerInitialMetadata()804 CallState::FinishPullServerInitialMetadata() {
805   GRPC_TRACE_LOG(call_state, INFO)
806       << "[call_state] FinishPullServerInitialMetadata: "
807       << GRPC_DUMP_ARGS(this, server_to_client_pull_state_);
808   switch (server_to_client_pull_state_) {
809     case ServerToClientPullState::kUnstarted:
810     case ServerToClientPullState::kUnstartedReading:
811       LOG(FATAL) << "FinishPullServerInitialMetadata called before Start";
812     case ServerToClientPullState::kStarted:
813     case ServerToClientPullState::kStartedReading:
814       CHECK_EQ(server_to_client_push_state_,
815                ServerToClientPushState::kTrailersOnly);
816       return;
817     case ServerToClientPullState::kProcessingServerInitialMetadata:
818       server_to_client_pull_state_ = ServerToClientPullState::kIdle;
819       server_to_client_pull_waiter_.Wake();
820       break;
821     case ServerToClientPullState::kProcessingServerInitialMetadataReading:
822       server_to_client_pull_state_ = ServerToClientPullState::kReading;
823       server_to_client_pull_waiter_.Wake();
824       break;
825     case ServerToClientPullState::kIdle:
826     case ServerToClientPullState::kReading:
827     case ServerToClientPullState::kProcessingServerToClientMessage:
828       LOG(FATAL) << "Out of order FinishPullServerInitialMetadata; "
829                  << GRPC_DUMP_ARGS(server_to_client_pull_state_,
830                                    server_to_client_push_state_);
831     case ServerToClientPullState::kTerminated:
832       return;
833   }
834   DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kIdle ||
835          server_to_client_pull_state_ == ServerToClientPullState::kReading)
836       << server_to_client_pull_state_;
837   switch (server_to_client_push_state_) {
838     case ServerToClientPushState::kStart:
839     case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
840       LOG(FATAL) << "FinishPullServerInitialMetadata called before initial "
841                     "metadata consumed; "
842                  << GRPC_DUMP_ARGS(server_to_client_pull_state_,
843                                    server_to_client_push_state_);
844     case ServerToClientPushState::kPushedServerInitialMetadata:
845       server_to_client_push_state_ = ServerToClientPushState::kIdle;
846       server_to_client_push_waiter_.Wake();
847       break;
848     case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
849       server_to_client_push_state_ = ServerToClientPushState::kPushedMessage;
850       server_to_client_push_waiter_.Wake();
851       break;
852     case ServerToClientPushState::kIdle:
853     case ServerToClientPushState::kPushedMessage:
854     case ServerToClientPushState::kTrailersOnly:
855     case ServerToClientPushState::kFinished:
856       LOG(FATAL) << "FinishPullServerInitialMetadata called twice; "
857                  << GRPC_DUMP_ARGS(server_to_client_pull_state_,
858                                    server_to_client_push_state_);
859   }
860 }
861 
862 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ValueOrFailure<bool>>
PollPullServerToClientMessageAvailable()863 CallState::PollPullServerToClientMessageAvailable() {
864   GRPC_TRACE_LOG(call_state, INFO)
865       << "[call_state] PollPullServerToClientMessageAvailable: "
866       << GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
867                         server_to_client_push_state_,
868                         server_trailing_metadata_state_);
869   switch (server_to_client_pull_state_) {
870     case ServerToClientPullState::kUnstarted:
871       server_to_client_pull_state_ = ServerToClientPullState::kUnstartedReading;
872       return server_to_client_pull_waiter_.pending();
873     case ServerToClientPullState::kProcessingServerInitialMetadata:
874       server_to_client_pull_state_ =
875           ServerToClientPullState::kProcessingServerInitialMetadataReading;
876       return server_to_client_pull_waiter_.pending();
877     case ServerToClientPullState::kUnstartedReading:
878     case ServerToClientPullState::kProcessingServerInitialMetadataReading:
879       return server_to_client_pull_waiter_.pending();
880     case ServerToClientPullState::kStarted:
881       server_to_client_pull_state_ = ServerToClientPullState::kStartedReading;
882       ABSL_FALLTHROUGH_INTENDED;
883     case ServerToClientPullState::kStartedReading:
884       if (server_to_client_push_state_ ==
885           ServerToClientPushState::kTrailersOnly) {
886         return false;
887       }
888       return server_to_client_pull_waiter_.pending();
889     case ServerToClientPullState::kIdle:
890       server_to_client_pull_state_ = ServerToClientPullState::kReading;
891       server_to_client_pull_waiter_.Wake();
892       ABSL_FALLTHROUGH_INTENDED;
893     case ServerToClientPullState::kReading:
894       break;
895     case ServerToClientPullState::kProcessingServerToClientMessage:
896       LOG(FATAL) << "PollPullServerToClientMessageAvailable called while "
897                     "processing a message; "
898                  << GRPC_DUMP_ARGS(server_to_client_pull_state_,
899                                    server_to_client_push_state_);
900     case ServerToClientPullState::kTerminated:
901       return Failure{};
902   }
903   DCHECK_EQ(server_to_client_pull_state_, ServerToClientPullState::kReading);
904   switch (server_to_client_push_state_) {
905     case ServerToClientPushState::kStart:
906     case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
907     case ServerToClientPushState::kPushedServerInitialMetadata:
908     case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
909       return server_to_client_push_waiter_.pending();
910     case ServerToClientPushState::kIdle:
911       if (server_trailing_metadata_state_ !=
912           ServerTrailingMetadataState::kNotPushed) {
913         return false;
914       }
915       server_trailing_metadata_waiter_.pending();
916       return server_to_client_push_waiter_.pending();
917     case ServerToClientPushState::kTrailersOnly:
918       DCHECK_NE(server_trailing_metadata_state_,
919                 ServerTrailingMetadataState::kNotPushed);
920       return false;
921     case ServerToClientPushState::kPushedMessage:
922       server_to_client_pull_state_ =
923           ServerToClientPullState::kProcessingServerToClientMessage;
924       server_to_client_pull_waiter_.Wake();
925       return true;
926     case ServerToClientPushState::kFinished:
927       server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
928       server_to_client_pull_waiter_.Wake();
929       return Failure{};
930   }
931   Crash("Unreachable");
932 }
933 
934 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<StatusFlag>
PollPullServerToClientMessageStarted()935 CallState::PollPullServerToClientMessageStarted() {
936   GRPC_TRACE_LOG(call_state, INFO)
937       << "[call_state] PollPullClientToServerMessageStarted: "
938       << GRPC_DUMP_ARGS(this, server_to_client_pull_state_);
939   switch (server_to_client_pull_state_) {
940     case ServerToClientPullState::kUnstarted:
941     case ServerToClientPullState::kUnstartedReading:
942     case ServerToClientPullState::kStarted:
943     case ServerToClientPullState::kProcessingServerInitialMetadata:
944     case ServerToClientPullState::kProcessingServerInitialMetadataReading:
945     case ServerToClientPullState::kIdle:
946       return server_to_client_pull_waiter_.pending();
947     case ServerToClientPullState::kStartedReading:
948     case ServerToClientPullState::kReading:
949     case ServerToClientPullState::kProcessingServerToClientMessage:
950       return Success{};
951     case ServerToClientPullState::kTerminated:
952       return Failure{};
953   }
954   Crash("Unreachable");
955 }
956 
957 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
FinishPullServerToClientMessage()958 CallState::FinishPullServerToClientMessage() {
959   GRPC_TRACE_LOG(call_state, INFO)
960       << "[call_state] FinishPullServerToClientMessage: "
961       << GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
962                         server_to_client_push_state_);
963   switch (server_to_client_pull_state_) {
964     case ServerToClientPullState::kUnstarted:
965     case ServerToClientPullState::kUnstartedReading:
966     case ServerToClientPullState::kStarted:
967     case ServerToClientPullState::kStartedReading:
968     case ServerToClientPullState::kProcessingServerInitialMetadata:
969     case ServerToClientPullState::kProcessingServerInitialMetadataReading:
970       LOG(FATAL) << "FinishPullServerToClientMessage called before metadata "
971                     "available; "
972                  << GRPC_DUMP_ARGS(server_to_client_pull_state_,
973                                    server_to_client_push_state_);
974     case ServerToClientPullState::kIdle:
975       LOG(FATAL) << "FinishPullServerToClientMessage called twice; "
976                  << GRPC_DUMP_ARGS(server_to_client_pull_state_,
977                                    server_to_client_push_state_);
978     case ServerToClientPullState::kReading:
979       LOG(FATAL) << "FinishPullServerToClientMessage called before "
980                  << "PollPullServerToClientMessageAvailable; "
981                  << GRPC_DUMP_ARGS(server_to_client_pull_state_,
982                                    server_to_client_push_state_);
983     case ServerToClientPullState::kProcessingServerToClientMessage:
984       server_to_client_pull_state_ = ServerToClientPullState::kIdle;
985       server_to_client_pull_waiter_.Wake();
986       break;
987     case ServerToClientPullState::kTerminated:
988       break;
989   }
990   switch (server_to_client_push_state_) {
991     case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
992     case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
993     case ServerToClientPushState::kPushedServerInitialMetadata:
994     case ServerToClientPushState::kStart:
995       LOG(FATAL) << "FinishPullServerToClientMessage called before initial "
996                     "metadata consumed; "
997                  << GRPC_DUMP_ARGS(server_to_client_pull_state_,
998                                    server_to_client_push_state_);
999     case ServerToClientPushState::kTrailersOnly:
1000       LOG(FATAL) << "FinishPullServerToClientMessage called after "
1001                     "PushServerTrailingMetadata; "
1002                  << GRPC_DUMP_ARGS(server_to_client_pull_state_,
1003                                    server_to_client_push_state_);
1004     case ServerToClientPushState::kPushedMessage:
1005       server_to_client_push_state_ = ServerToClientPushState::kIdle;
1006       server_to_client_push_waiter_.Wake();
1007       break;
1008     case ServerToClientPushState::kIdle:
1009       LOG(FATAL) << "FinishPullServerToClientMessage called without a message; "
1010                  << GRPC_DUMP_ARGS(server_to_client_pull_state_,
1011                                    server_to_client_push_state_);
1012     case ServerToClientPushState::kFinished:
1013       break;
1014   }
1015 }
1016 
1017 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<Empty>
PollServerTrailingMetadataAvailable()1018 CallState::PollServerTrailingMetadataAvailable() {
1019   GRPC_TRACE_LOG(call_state, INFO)
1020       << "[call_state] PollServerTrailingMetadataAvailable: "
1021       << GRPC_DUMP_ARGS(
1022              this, server_to_client_pull_state_, server_to_client_push_state_,
1023              server_trailing_metadata_state_, server_trailing_metadata_waiter_);
1024   switch (server_to_client_pull_state_) {
1025     case ServerToClientPullState::kProcessingServerInitialMetadata:
1026     case ServerToClientPullState::kProcessingServerToClientMessage:
1027     case ServerToClientPullState::kProcessingServerInitialMetadataReading:
1028     case ServerToClientPullState::kUnstartedReading:
1029       return server_to_client_pull_waiter_.pending();
1030     case ServerToClientPullState::kStartedReading:
1031     case ServerToClientPullState::kReading:
1032       switch (server_to_client_push_state_) {
1033         case ServerToClientPushState::kTrailersOnly:
1034         case ServerToClientPushState::kIdle:
1035         case ServerToClientPushState::kStart:
1036         case ServerToClientPushState::kFinished:
1037           if (server_trailing_metadata_state_ !=
1038               ServerTrailingMetadataState::kNotPushed) {
1039             break;  // Ready for processing
1040           }
1041           ABSL_FALLTHROUGH_INTENDED;
1042         case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
1043         case ServerToClientPushState::kPushedServerInitialMetadata:
1044         case ServerToClientPushState::
1045             kPushedServerInitialMetadataAndPushedMessage:
1046         case ServerToClientPushState::kPushedMessage:
1047           server_to_client_push_waiter_.pending();
1048           return server_to_client_pull_waiter_.pending();
1049       }
1050       break;
1051     case ServerToClientPullState::kStarted:
1052     case ServerToClientPullState::kUnstarted:
1053     case ServerToClientPullState::kIdle:
1054       if (server_trailing_metadata_state_ !=
1055           ServerTrailingMetadataState::kNotPushed) {
1056         break;  // Ready for processing
1057       }
1058       return server_trailing_metadata_waiter_.pending();
1059     case ServerToClientPullState::kTerminated:
1060       break;
1061   }
1062   server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
1063   server_to_client_pull_waiter_.Wake();
1064   switch (server_trailing_metadata_state_) {
1065     case ServerTrailingMetadataState::kPushed:
1066       server_trailing_metadata_state_ = ServerTrailingMetadataState::kPulled;
1067       server_trailing_metadata_waiter_.Wake();
1068       break;
1069     case ServerTrailingMetadataState::kPushedCancel:
1070       server_trailing_metadata_state_ =
1071           ServerTrailingMetadataState::kPulledCancel;
1072       server_trailing_metadata_waiter_.Wake();
1073       break;
1074     case ServerTrailingMetadataState::kNotPushed:
1075     case ServerTrailingMetadataState::kPulled:
1076     case ServerTrailingMetadataState::kPulledCancel:
1077       LOG(FATAL) << "PollServerTrailingMetadataAvailable completed twice; "
1078                  << GRPC_DUMP_ARGS(server_to_client_pull_state_,
1079                                    server_trailing_metadata_state_);
1080   }
1081   return Empty{};
1082 }
1083 
1084 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline bool
WasServerTrailingMetadataPulled()1085 CallState::WasServerTrailingMetadataPulled() const {
1086   switch (server_trailing_metadata_state_) {
1087     case ServerTrailingMetadataState::kNotPushed:
1088     case ServerTrailingMetadataState::kPushed:
1089     case ServerTrailingMetadataState::kPushedCancel:
1090       return false;
1091     case ServerTrailingMetadataState::kPulled:
1092     case ServerTrailingMetadataState::kPulledCancel:
1093       return true;
1094   }
1095   GPR_UNREACHABLE_CODE(Crash("unreachable"));
1096 }
1097 
1098 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<bool>
PollWasCancelled()1099 CallState::PollWasCancelled() {
1100   GRPC_TRACE_LOG(call_state, INFO)
1101       << "[call_state] PollWasCancelled: "
1102       << GRPC_DUMP_ARGS(this, server_trailing_metadata_state_);
1103   switch (server_trailing_metadata_state_) {
1104     case ServerTrailingMetadataState::kNotPushed:
1105     case ServerTrailingMetadataState::kPushed:
1106     case ServerTrailingMetadataState::kPushedCancel: {
1107       return server_trailing_metadata_waiter_.pending();
1108     }
1109     case ServerTrailingMetadataState::kPulled:
1110       return false;
1111     case ServerTrailingMetadataState::kPulledCancel:
1112       return true;
1113   }
1114   Crash("Unreachable");
1115 }
1116 
WasCancelledPushed()1117 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline bool CallState::WasCancelledPushed()
1118     const {
1119   GRPC_TRACE_LOG(call_state, INFO)
1120       << "[call_state] PollWasCancelledPushed: "
1121       << GRPC_DUMP_ARGS(this, server_trailing_metadata_state_);
1122   switch (server_trailing_metadata_state_) {
1123     case ServerTrailingMetadataState::kNotPushed:
1124     case ServerTrailingMetadataState::kPulled:
1125     case ServerTrailingMetadataState::kPushed:
1126       return false;
1127     case ServerTrailingMetadataState::kPushedCancel:
1128     case ServerTrailingMetadataState::kPulledCancel:
1129       return true;
1130   }
1131   Crash("Unreachable");
1132 }
1133 
1134 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<Empty>
PollServerTrailingMetadataWasPushed()1135 CallState::PollServerTrailingMetadataWasPushed() {
1136   GRPC_TRACE_LOG(call_state, INFO)
1137       << "[call_state] PollWasCancelled: "
1138       << GRPC_DUMP_ARGS(this, server_trailing_metadata_state_);
1139   switch (server_trailing_metadata_state_) {
1140     case ServerTrailingMetadataState::kNotPushed: {
1141       return server_trailing_metadata_waiter_.pending();
1142     }
1143     case ServerTrailingMetadataState::kPushed:
1144     case ServerTrailingMetadataState::kPushedCancel:
1145     case ServerTrailingMetadataState::kPulled:
1146     case ServerTrailingMetadataState::kPulledCancel:
1147       return Empty{};
1148   }
1149   Crash("Unreachable");
1150 }
1151 
1152 }  // namespace grpc_core
1153 
1154 #endif  // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H
1155