1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #ifndef GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H
16 #define GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H
17
18 #include <grpc/support/port_platform.h>
19
20 #include "absl/types/optional.h"
21 #include "src/core/lib/debug/trace.h"
22 #include "src/core/lib/promise/activity.h"
23 #include "src/core/lib/promise/poll.h"
24 #include "src/core/lib/promise/status_flag.h"
25 #include "src/core/util/crash.h"
26
27 namespace grpc_core {
28
29 class CallState {
30 public:
31 CallState();
32
33 /////////////////////////////////////////////////////////////////////////////
34 // Misc events
35
36 // Start the call: allows pulls to proceed
37 void Start();
38
39 /////////////////////////////////////////////////////////////////////////////
40 // PUSH: client -> server
41
42 // Poll for the next message pull to be started.
43 // This can be used for flow control by waiting for the reader to request
44 // data, then providing flow control tokens to read, and finally pushing the
45 // message.
46 Poll<StatusFlag> PollPullClientToServerMessageStarted();
47
48 // Begin a message push.
49 void BeginPushClientToServerMessage();
50
51 // Poll for the push to be completed (up to FinishPullClientToServerMessage).
52 Poll<StatusFlag> PollPushClientToServerMessage();
53
54 // Note that the client has half-closed the stream.
55 void ClientToServerHalfClose();
56
57 /////////////////////////////////////////////////////////////////////////////
58 // PULL: client -> server
59
60 // Begin pulling client initial metadata.
61 void BeginPullClientInitialMetadata();
62 // Finish pulling client initial metadata.
63 void FinishPullClientInitialMetadata();
64 // Poll for the next message pull to be available.
65 // Resolves to true if a message is available, false if the call is
66 // half-closed, and Failure if the call is cancelled.
67 Poll<ValueOrFailure<bool>> PollPullClientToServerMessageAvailable();
68 // Finish pulling a message.
69 void FinishPullClientToServerMessage();
70
71 /////////////////////////////////////////////////////////////////////////////
72 // PUSH: server -> client
73
74 // Push server initial metadata (instantaneous).
75 StatusFlag PushServerInitialMetadata();
76 // Poll for the next message pull to be started.
77 // This can be used for flow control by waiting for the reader to request
78 // data, then providing flow control tokens to read, and finally pushing the
79 // message.
80 Poll<StatusFlag> PollPullServerToClientMessageStarted();
81 // Begin a message push.
82 void BeginPushServerToClientMessage();
83 // Poll for the push to be completed (up to FinishPullServerToClientMessage).
84 Poll<StatusFlag> PollPushServerToClientMessage();
85 // Push server trailing metadata.
86 // This is idempotent: only the first call will have any effect.
87 // Returns true if this is the first call.
88 bool PushServerTrailingMetadata(bool cancel);
89
90 /////////////////////////////////////////////////////////////////////////////
91 // PULL: server -> client
92
93 // Poll for initial metadata to be available.
94 Poll<bool> PollPullServerInitialMetadataAvailable();
95 // Finish pulling server initial metadata.
96 void FinishPullServerInitialMetadata();
97 // Poll for the next message pull to be available.
98 // Resolves to true if a message is available, false if trailing metadata is
99 // ready, and Failure if the call is cancelled.
100 Poll<ValueOrFailure<bool>> PollPullServerToClientMessageAvailable();
101 // Finish pulling a message.
102 void FinishPullServerToClientMessage();
103 // Poll for trailing metadata to be available.
104 Poll<Empty> PollServerTrailingMetadataAvailable();
105 // Instantaneously return true if server trailing metadata has been pulled.
106 bool WasServerTrailingMetadataPulled() const;
107 // Resolves after server trailing metadata has been pulled, to true if the
108 // call was cancelled, and false otherwise.
109 Poll<bool> PollWasCancelled();
110 // Return true if server trailing metadata has been pushed *and* that push was
111 // a cancellation.
112 bool WasCancelledPushed() const;
113 // Resolves after server trailing metadata has been pushed, regardless of
114 // whether the call was cancelled.
115 Poll<Empty> PollServerTrailingMetadataWasPushed();
116
117 /////////////////////////////////////////////////////////////////////////////
118 // Debug
119 std::string DebugString() const;
120
121 friend std::ostream& operator<<(std::ostream& out,
122 const CallState& call_state) {
123 return out << call_state.DebugString();
124 }
125
126 private:
127 enum class ClientToServerPullState : uint16_t {
128 // Ready to read: client initial metadata is there, but not yet processed
129 kBegin,
130 // Processing client initial metadata
131 kProcessingClientInitialMetadata,
132 // Main call loop: not reading
133 kIdle,
134 // Main call loop: reading but no message available
135 kReading,
136 // Main call loop: processing one message
137 kProcessingClientToServerMessage,
138 // Processing complete
139 kTerminated,
140 };
ClientToServerPullStateString(ClientToServerPullState state)141 static const char* ClientToServerPullStateString(
142 ClientToServerPullState state) {
143 switch (state) {
144 case ClientToServerPullState::kBegin:
145 return "Begin";
146 case ClientToServerPullState::kProcessingClientInitialMetadata:
147 return "ProcessingClientInitialMetadata";
148 case ClientToServerPullState::kIdle:
149 return "Idle";
150 case ClientToServerPullState::kReading:
151 return "Reading";
152 case ClientToServerPullState::kProcessingClientToServerMessage:
153 return "ProcessingClientToServerMessage";
154 case ClientToServerPullState::kTerminated:
155 return "Terminated";
156 }
157 }
158 template <typename Sink>
AbslStringify(Sink & out,ClientToServerPullState state)159 friend void AbslStringify(Sink& out, ClientToServerPullState state) {
160 out.Append(ClientToServerPullStateString(state));
161 }
162 friend std::ostream& operator<<(std::ostream& out,
163 ClientToServerPullState state) {
164 return out << ClientToServerPullStateString(state);
165 }
166 enum class ClientToServerPushState : uint16_t {
167 kIdle,
168 kPushedMessage,
169 kPushedHalfClose,
170 kPushedMessageAndHalfClosed,
171 kFinished,
172 };
ClientToServerPushStateString(ClientToServerPushState state)173 static const char* ClientToServerPushStateString(
174 ClientToServerPushState state) {
175 switch (state) {
176 case ClientToServerPushState::kIdle:
177 return "Idle";
178 case ClientToServerPushState::kPushedMessage:
179 return "PushedMessage";
180 case ClientToServerPushState::kPushedHalfClose:
181 return "PushedHalfClose";
182 case ClientToServerPushState::kPushedMessageAndHalfClosed:
183 return "PushedMessageAndHalfClosed";
184 case ClientToServerPushState::kFinished:
185 return "Finished";
186 }
187 }
188 template <typename Sink>
AbslStringify(Sink & out,ClientToServerPushState state)189 friend void AbslStringify(Sink& out, ClientToServerPushState state) {
190 out.Append(ClientToServerPushStateString(state));
191 }
192 friend std::ostream& operator<<(std::ostream& out,
193 ClientToServerPushState state) {
194 return out << ClientToServerPushStateString(state);
195 }
196 enum class ServerToClientPullState : uint16_t {
197 // Not yet started: cannot read
198 kUnstarted,
199 kUnstartedReading,
200 kStarted,
201 kStartedReading,
202 // Processing server initial metadata
203 kProcessingServerInitialMetadata,
204 kProcessingServerInitialMetadataReading,
205 // Main call loop: not reading
206 kIdle,
207 // Main call loop: reading but no message available
208 kReading,
209 // Main call loop: processing one message
210 kProcessingServerToClientMessage,
211 kTerminated,
212 };
ServerToClientPullStateString(ServerToClientPullState state)213 static const char* ServerToClientPullStateString(
214 ServerToClientPullState state) {
215 switch (state) {
216 case ServerToClientPullState::kUnstarted:
217 return "Unstarted";
218 case ServerToClientPullState::kUnstartedReading:
219 return "UnstartedReading";
220 case ServerToClientPullState::kStarted:
221 return "Started";
222 case ServerToClientPullState::kStartedReading:
223 return "StartedReading";
224 case ServerToClientPullState::kProcessingServerInitialMetadata:
225 return "ProcessingServerInitialMetadata";
226 case ServerToClientPullState::kProcessingServerInitialMetadataReading:
227 return "ProcessingServerInitialMetadataReading";
228 case ServerToClientPullState::kIdle:
229 return "Idle";
230 case ServerToClientPullState::kReading:
231 return "Reading";
232 case ServerToClientPullState::kProcessingServerToClientMessage:
233 return "ProcessingServerToClientMessage";
234 case ServerToClientPullState::kTerminated:
235 return "Terminated";
236 }
237 }
238 template <typename Sink>
AbslStringify(Sink & out,ServerToClientPullState state)239 friend void AbslStringify(Sink& out, ServerToClientPullState state) {
240 out.Append(ServerToClientPullStateString(state));
241 }
242 friend std::ostream& operator<<(std::ostream& out,
243 ServerToClientPullState state) {
244 return out << ServerToClientPullStateString(state);
245 }
246 enum class ServerToClientPushState : uint16_t {
247 kStart,
248 kPushedMessageWithoutInitialMetadata,
249 kPushedServerInitialMetadata,
250 kPushedServerInitialMetadataAndPushedMessage,
251 kTrailersOnly,
252 kIdle,
253 kPushedMessage,
254 kFinished,
255 };
ServerToClientPushStateString(ServerToClientPushState state)256 static const char* ServerToClientPushStateString(
257 ServerToClientPushState state) {
258 switch (state) {
259 case ServerToClientPushState::kStart:
260 return "Start";
261 case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
262 return "PushedMessageWithoutInitialMetadata";
263 case ServerToClientPushState::kPushedServerInitialMetadata:
264 return "PushedServerInitialMetadata";
265 case ServerToClientPushState::
266 kPushedServerInitialMetadataAndPushedMessage:
267 return "PushedServerInitialMetadataAndPushedMessage";
268 case ServerToClientPushState::kTrailersOnly:
269 return "TrailersOnly";
270 case ServerToClientPushState::kIdle:
271 return "Idle";
272 case ServerToClientPushState::kPushedMessage:
273 return "PushedMessage";
274 case ServerToClientPushState::kFinished:
275 return "Finished";
276 }
277 }
278 template <typename Sink>
AbslStringify(Sink & out,ServerToClientPushState state)279 friend void AbslStringify(Sink& out, ServerToClientPushState state) {
280 out.Append(ServerToClientPushStateString(state));
281 }
282 friend std::ostream& operator<<(std::ostream& out,
283 ServerToClientPushState state) {
284 return out << ServerToClientPushStateString(state);
285 }
286 enum class ServerTrailingMetadataState : uint16_t {
287 kNotPushed,
288 kPushed,
289 kPushedCancel,
290 kPulled,
291 kPulledCancel,
292 };
ServerTrailingMetadataStateString(ServerTrailingMetadataState state)293 static const char* ServerTrailingMetadataStateString(
294 ServerTrailingMetadataState state) {
295 switch (state) {
296 case ServerTrailingMetadataState::kNotPushed:
297 return "NotPushed";
298 case ServerTrailingMetadataState::kPushed:
299 return "Pushed";
300 case ServerTrailingMetadataState::kPushedCancel:
301 return "PushedCancel";
302 case ServerTrailingMetadataState::kPulled:
303 return "Pulled";
304 case ServerTrailingMetadataState::kPulledCancel:
305 return "PulledCancel";
306 }
307 }
308 template <typename Sink>
AbslStringify(Sink & out,ServerTrailingMetadataState state)309 friend void AbslStringify(Sink& out, ServerTrailingMetadataState state) {
310 out.Append(ServerTrailingMetadataStateString(state));
311 }
312 friend std::ostream& operator<<(std::ostream& out,
313 ServerTrailingMetadataState state) {
314 return out << ServerTrailingMetadataStateString(state);
315 }
316 ClientToServerPullState client_to_server_pull_state_ : 3;
317 ClientToServerPushState client_to_server_push_state_ : 3;
318 ServerToClientPullState server_to_client_pull_state_ : 4;
319 ServerToClientPushState server_to_client_push_state_ : 3;
320 ServerTrailingMetadataState server_trailing_metadata_state_ : 3;
321 IntraActivityWaiter client_to_server_pull_waiter_;
322 IntraActivityWaiter server_to_client_pull_waiter_;
323 IntraActivityWaiter client_to_server_push_waiter_;
324 IntraActivityWaiter server_to_client_push_waiter_;
325 IntraActivityWaiter server_trailing_metadata_waiter_;
326 };
327
CallState()328 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline CallState::CallState()
329 : client_to_server_pull_state_(ClientToServerPullState::kBegin),
330 client_to_server_push_state_(ClientToServerPushState::kIdle),
331 server_to_client_pull_state_(ServerToClientPullState::kUnstarted),
332 server_to_client_push_state_(ServerToClientPushState::kStart),
333 server_trailing_metadata_state_(ServerTrailingMetadataState::kNotPushed) {
334 }
335
Start()336 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void CallState::Start() {
337 GRPC_TRACE_LOG(call_state, INFO)
338 << "[call_state] Start: "
339 << GRPC_DUMP_ARGS(this, server_to_client_pull_state_);
340 switch (server_to_client_pull_state_) {
341 case ServerToClientPullState::kUnstarted:
342 server_to_client_pull_state_ = ServerToClientPullState::kStarted;
343 server_to_client_pull_waiter_.Wake();
344 break;
345 case ServerToClientPullState::kUnstartedReading:
346 server_to_client_pull_state_ = ServerToClientPullState::kStartedReading;
347 server_to_client_pull_waiter_.Wake();
348 break;
349 case ServerToClientPullState::kStarted:
350 case ServerToClientPullState::kStartedReading:
351 case ServerToClientPullState::kProcessingServerInitialMetadata:
352 case ServerToClientPullState::kProcessingServerInitialMetadataReading:
353 case ServerToClientPullState::kIdle:
354 case ServerToClientPullState::kReading:
355 case ServerToClientPullState::kProcessingServerToClientMessage:
356 LOG(FATAL) << "Start called twice; "
357 << GRPC_DUMP_ARGS(server_to_client_pull_state_);
358 case ServerToClientPullState::kTerminated:
359 break;
360 }
361 }
362
363 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
BeginPushClientToServerMessage()364 CallState::BeginPushClientToServerMessage() {
365 GRPC_TRACE_LOG(call_state, INFO)
366 << "[call_state] BeginPushClientToServerMessage: "
367 << GRPC_DUMP_ARGS(this, client_to_server_push_state_,
368 client_to_server_push_waiter_);
369 switch (client_to_server_push_state_) {
370 case ClientToServerPushState::kIdle:
371 client_to_server_push_state_ = ClientToServerPushState::kPushedMessage;
372 client_to_server_push_waiter_.Wake();
373 break;
374 case ClientToServerPushState::kPushedMessage:
375 case ClientToServerPushState::kPushedMessageAndHalfClosed:
376 LOG(FATAL) << "PushClientToServerMessage called twice concurrently;"
377 << GRPC_DUMP_ARGS(client_to_server_push_state_);
378 break;
379 case ClientToServerPushState::kPushedHalfClose:
380 LOG(FATAL) << "PushClientToServerMessage called after half-close; "
381 << GRPC_DUMP_ARGS(client_to_server_push_state_);
382 break;
383 case ClientToServerPushState::kFinished:
384 break;
385 }
386 }
387
388 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<StatusFlag>
PollPushClientToServerMessage()389 CallState::PollPushClientToServerMessage() {
390 GRPC_TRACE_LOG(call_state, INFO)
391 << "[call_state] PollPushClientToServerMessage: "
392 << GRPC_DUMP_ARGS(this, client_to_server_push_state_);
393 switch (client_to_server_push_state_) {
394 case ClientToServerPushState::kIdle:
395 case ClientToServerPushState::kPushedHalfClose:
396 return Success{};
397 case ClientToServerPushState::kPushedMessage:
398 case ClientToServerPushState::kPushedMessageAndHalfClosed:
399 return client_to_server_push_waiter_.pending();
400 case ClientToServerPushState::kFinished:
401 return Failure{};
402 }
403 Crash("Unreachable");
404 }
405
406 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
ClientToServerHalfClose()407 CallState::ClientToServerHalfClose() {
408 GRPC_TRACE_LOG(call_state, INFO)
409 << "[call_state] ClientToServerHalfClose: "
410 << GRPC_DUMP_ARGS(this, client_to_server_push_state_);
411 switch (client_to_server_push_state_) {
412 case ClientToServerPushState::kIdle:
413 client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose;
414 client_to_server_push_waiter_.Wake();
415 break;
416 case ClientToServerPushState::kPushedMessage:
417 client_to_server_push_state_ =
418 ClientToServerPushState::kPushedMessageAndHalfClosed;
419 break;
420 case ClientToServerPushState::kPushedHalfClose:
421 case ClientToServerPushState::kPushedMessageAndHalfClosed:
422 LOG(FATAL) << "ClientToServerHalfClose called twice;"
423 << GRPC_DUMP_ARGS(client_to_server_push_state_);
424 break;
425 case ClientToServerPushState::kFinished:
426 break;
427 }
428 }
429
430 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
BeginPullClientInitialMetadata()431 CallState::BeginPullClientInitialMetadata() {
432 GRPC_TRACE_LOG(call_state, INFO)
433 << "[call_state] BeginPullClientInitialMetadata: "
434 << GRPC_DUMP_ARGS(this, client_to_server_pull_state_);
435 switch (client_to_server_pull_state_) {
436 case ClientToServerPullState::kBegin:
437 client_to_server_pull_state_ =
438 ClientToServerPullState::kProcessingClientInitialMetadata;
439 break;
440 case ClientToServerPullState::kProcessingClientInitialMetadata:
441 case ClientToServerPullState::kIdle:
442 case ClientToServerPullState::kReading:
443 case ClientToServerPullState::kProcessingClientToServerMessage:
444 LOG(FATAL) << "BeginPullClientInitialMetadata called twice; "
445 << GRPC_DUMP_ARGS(client_to_server_pull_state_);
446 break;
447 case ClientToServerPullState::kTerminated:
448 break;
449 }
450 }
451
452 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
FinishPullClientInitialMetadata()453 CallState::FinishPullClientInitialMetadata() {
454 GRPC_TRACE_LOG(call_state, INFO)
455 << "[call_state] FinishPullClientInitialMetadata: "
456 << GRPC_DUMP_ARGS(this, client_to_server_pull_state_);
457 switch (client_to_server_pull_state_) {
458 case ClientToServerPullState::kBegin:
459 LOG(FATAL) << "FinishPullClientInitialMetadata called before Begin; "
460 << GRPC_DUMP_ARGS(client_to_server_pull_state_);
461 break;
462 case ClientToServerPullState::kProcessingClientInitialMetadata:
463 client_to_server_pull_state_ = ClientToServerPullState::kIdle;
464 client_to_server_pull_waiter_.Wake();
465 break;
466 case ClientToServerPullState::kIdle:
467 case ClientToServerPullState::kReading:
468 case ClientToServerPullState::kProcessingClientToServerMessage:
469 LOG(FATAL) << "Out of order FinishPullClientInitialMetadata"
470 << GRPC_DUMP_ARGS(client_to_server_pull_state_);
471 break;
472 case ClientToServerPullState::kTerminated:
473 break;
474 }
475 }
476
477 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ValueOrFailure<bool>>
PollPullClientToServerMessageAvailable()478 CallState::PollPullClientToServerMessageAvailable() {
479 GRPC_TRACE_LOG(call_state, INFO)
480 << "[call_state] PollPullClientToServerMessageAvailable: "
481 << GRPC_DUMP_ARGS(this, client_to_server_pull_state_,
482 client_to_server_push_state_);
483 switch (client_to_server_pull_state_) {
484 case ClientToServerPullState::kBegin:
485 case ClientToServerPullState::kProcessingClientInitialMetadata:
486 return client_to_server_pull_waiter_.pending();
487 case ClientToServerPullState::kIdle:
488 client_to_server_pull_state_ = ClientToServerPullState::kReading;
489 client_to_server_pull_waiter_.Wake();
490 ABSL_FALLTHROUGH_INTENDED;
491 case ClientToServerPullState::kReading:
492 break;
493 case ClientToServerPullState::kProcessingClientToServerMessage:
494 LOG(FATAL) << "PollPullClientToServerMessageAvailable called while "
495 "processing a message; "
496 << GRPC_DUMP_ARGS(client_to_server_pull_state_);
497 break;
498 case ClientToServerPullState::kTerminated:
499 return Failure{};
500 }
501 DCHECK_EQ(client_to_server_pull_state_, ClientToServerPullState::kReading);
502 switch (client_to_server_push_state_) {
503 case ClientToServerPushState::kIdle:
504 return client_to_server_push_waiter_.pending();
505 case ClientToServerPushState::kPushedMessage:
506 case ClientToServerPushState::kPushedMessageAndHalfClosed:
507 client_to_server_pull_state_ =
508 ClientToServerPullState::kProcessingClientToServerMessage;
509 return true;
510 case ClientToServerPushState::kPushedHalfClose:
511 return false;
512 case ClientToServerPushState::kFinished:
513 client_to_server_pull_state_ = ClientToServerPullState::kTerminated;
514 return Failure{};
515 }
516 Crash("Unreachable");
517 }
518
519 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<StatusFlag>
PollPullClientToServerMessageStarted()520 CallState::PollPullClientToServerMessageStarted() {
521 GRPC_TRACE_LOG(call_state, INFO)
522 << "[call_state] PollPullClientToServerMessageStarted: "
523 << GRPC_DUMP_ARGS(this, client_to_server_pull_state_);
524 switch (client_to_server_pull_state_) {
525 case ClientToServerPullState::kBegin:
526 case ClientToServerPullState::kProcessingClientInitialMetadata:
527 case ClientToServerPullState::kIdle:
528 return client_to_server_pull_waiter_.pending();
529 case ClientToServerPullState::kReading:
530 case ClientToServerPullState::kProcessingClientToServerMessage:
531 return Success{};
532 case ClientToServerPullState::kTerminated:
533 return Failure{};
534 }
535 Crash("Unreachable");
536 }
537
538 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
FinishPullClientToServerMessage()539 CallState::FinishPullClientToServerMessage() {
540 GRPC_TRACE_LOG(call_state, INFO)
541 << "[call_state] FinishPullClientToServerMessage: "
542 << GRPC_DUMP_ARGS(this, client_to_server_pull_state_,
543 client_to_server_push_state_);
544 switch (client_to_server_pull_state_) {
545 case ClientToServerPullState::kBegin:
546 case ClientToServerPullState::kProcessingClientInitialMetadata:
547 LOG(FATAL) << "FinishPullClientToServerMessage called before Begin; "
548 << GRPC_DUMP_ARGS(client_to_server_pull_state_,
549 client_to_server_push_state_);
550 break;
551 case ClientToServerPullState::kIdle:
552 LOG(FATAL) << "FinishPullClientToServerMessage called twice; "
553 << GRPC_DUMP_ARGS(client_to_server_pull_state_,
554 client_to_server_push_state_);
555 break;
556 case ClientToServerPullState::kReading:
557 LOG(FATAL) << "FinishPullClientToServerMessage called before "
558 "PollPullClientToServerMessageAvailable; "
559 << GRPC_DUMP_ARGS(client_to_server_pull_state_,
560 client_to_server_push_state_);
561 break;
562 case ClientToServerPullState::kProcessingClientToServerMessage:
563 client_to_server_pull_state_ = ClientToServerPullState::kIdle;
564 client_to_server_pull_waiter_.Wake();
565 break;
566 case ClientToServerPullState::kTerminated:
567 break;
568 }
569 switch (client_to_server_push_state_) {
570 case ClientToServerPushState::kPushedMessage:
571 client_to_server_push_state_ = ClientToServerPushState::kIdle;
572 client_to_server_push_waiter_.Wake();
573 break;
574 case ClientToServerPushState::kIdle:
575 case ClientToServerPushState::kPushedHalfClose:
576 LOG(FATAL) << "FinishPullClientToServerMessage called without a message; "
577 << GRPC_DUMP_ARGS(client_to_server_pull_state_,
578 client_to_server_push_state_);
579 break;
580 case ClientToServerPushState::kPushedMessageAndHalfClosed:
581 client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose;
582 client_to_server_push_waiter_.Wake();
583 break;
584 case ClientToServerPushState::kFinished:
585 break;
586 }
587 }
588
589 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline StatusFlag
PushServerInitialMetadata()590 CallState::PushServerInitialMetadata() {
591 GRPC_TRACE_LOG(call_state, INFO)
592 << "[call_state] PushServerInitialMetadata: "
593 << GRPC_DUMP_ARGS(this, server_to_client_push_state_,
594 server_trailing_metadata_state_);
595 if (server_trailing_metadata_state_ !=
596 ServerTrailingMetadataState::kNotPushed) {
597 return Failure{};
598 }
599 switch (server_to_client_push_state_) {
600 case ServerToClientPushState::kStart:
601 server_to_client_push_state_ =
602 ServerToClientPushState::kPushedServerInitialMetadata;
603 break;
604 case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
605 server_to_client_push_state_ =
606 ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage;
607 break;
608 case ServerToClientPushState::kPushedServerInitialMetadata:
609 case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
610 case ServerToClientPushState::kTrailersOnly:
611 case ServerToClientPushState::kIdle:
612 case ServerToClientPushState::kPushedMessage:
613 LOG(FATAL) << "PushServerInitialMetadata called twice; "
614 << GRPC_DUMP_ARGS(server_to_client_push_state_);
615 break;
616 case ServerToClientPushState::kFinished:
617 break;
618 }
619 server_to_client_push_waiter_.Wake();
620 return Success{};
621 }
622
623 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
BeginPushServerToClientMessage()624 CallState::BeginPushServerToClientMessage() {
625 GRPC_TRACE_LOG(call_state, INFO)
626 << "[call_state] BeginPushServerToClientMessage: "
627 << GRPC_DUMP_ARGS(this, server_to_client_push_state_);
628 switch (server_to_client_push_state_) {
629 case ServerToClientPushState::kStart:
630 server_to_client_push_state_ =
631 ServerToClientPushState::kPushedMessageWithoutInitialMetadata;
632 break;
633 case ServerToClientPushState::kPushedServerInitialMetadata:
634 server_to_client_push_state_ =
635 ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage;
636 break;
637 case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
638 case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
639 case ServerToClientPushState::kPushedMessage:
640 LOG(FATAL) << "BeginPushServerToClientMessage called twice concurrently; "
641 << GRPC_DUMP_ARGS(server_to_client_push_state_);
642 break;
643 case ServerToClientPushState::kTrailersOnly:
644 // Will fail in poll.
645 break;
646 case ServerToClientPushState::kIdle:
647 server_to_client_push_state_ = ServerToClientPushState::kPushedMessage;
648 server_to_client_push_waiter_.Wake();
649 break;
650 case ServerToClientPushState::kFinished:
651 break;
652 }
653 }
654
655 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<StatusFlag>
PollPushServerToClientMessage()656 CallState::PollPushServerToClientMessage() {
657 GRPC_TRACE_LOG(call_state, INFO)
658 << "[call_state] PollPushServerToClientMessage: "
659 << GRPC_DUMP_ARGS(this, server_to_client_push_state_);
660 switch (server_to_client_push_state_) {
661 case ServerToClientPushState::kStart:
662 case ServerToClientPushState::kPushedServerInitialMetadata:
663 LOG(FATAL) << "PollPushServerToClientMessage called before "
664 << "PushServerInitialMetadata; "
665 << GRPC_DUMP_ARGS(server_to_client_push_state_);
666 case ServerToClientPushState::kTrailersOnly:
667 return false;
668 case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
669 case ServerToClientPushState::kPushedMessage:
670 case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
671 return server_to_client_push_waiter_.pending();
672 case ServerToClientPushState::kIdle:
673 return Success{};
674 case ServerToClientPushState::kFinished:
675 return Failure{};
676 }
677 Crash("Unreachable");
678 }
679
680 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline bool
PushServerTrailingMetadata(bool cancel)681 CallState::PushServerTrailingMetadata(bool cancel) {
682 GRPC_TRACE_LOG(call_state, INFO)
683 << "[call_state] PushServerTrailingMetadata: "
684 << GRPC_DUMP_ARGS(this, cancel, server_trailing_metadata_state_,
685 server_to_client_push_state_,
686 client_to_server_push_state_,
687 server_trailing_metadata_waiter_);
688 if (server_trailing_metadata_state_ !=
689 ServerTrailingMetadataState::kNotPushed) {
690 return false;
691 }
692 server_trailing_metadata_state_ =
693 cancel ? ServerTrailingMetadataState::kPushedCancel
694 : ServerTrailingMetadataState::kPushed;
695 server_trailing_metadata_waiter_.Wake();
696 switch (server_to_client_push_state_) {
697 case ServerToClientPushState::kStart:
698 server_to_client_push_state_ = ServerToClientPushState::kTrailersOnly;
699 server_to_client_push_waiter_.Wake();
700 break;
701 case ServerToClientPushState::kPushedServerInitialMetadata:
702 case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
703 case ServerToClientPushState::kPushedMessage:
704 case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
705 if (cancel) {
706 server_to_client_push_state_ = ServerToClientPushState::kFinished;
707 server_to_client_push_waiter_.Wake();
708 }
709 break;
710 case ServerToClientPushState::kIdle:
711 if (cancel) {
712 server_to_client_push_state_ = ServerToClientPushState::kFinished;
713 server_to_client_push_waiter_.Wake();
714 }
715 break;
716 case ServerToClientPushState::kFinished:
717 case ServerToClientPushState::kTrailersOnly:
718 break;
719 }
720 switch (client_to_server_push_state_) {
721 case ClientToServerPushState::kIdle:
722 client_to_server_push_state_ = ClientToServerPushState::kFinished;
723 client_to_server_push_waiter_.Wake();
724 break;
725 case ClientToServerPushState::kPushedMessage:
726 case ClientToServerPushState::kPushedMessageAndHalfClosed:
727 client_to_server_push_state_ = ClientToServerPushState::kFinished;
728 client_to_server_push_waiter_.Wake();
729 break;
730 case ClientToServerPushState::kPushedHalfClose:
731 case ClientToServerPushState::kFinished:
732 break;
733 }
734 return true;
735 }
736
737 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<bool>
PollPullServerInitialMetadataAvailable()738 CallState::PollPullServerInitialMetadataAvailable() {
739 GRPC_TRACE_LOG(call_state, INFO)
740 << "[call_state] PollPullServerInitialMetadataAvailable: "
741 << GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
742 server_to_client_push_state_);
743 bool reading;
744 switch (server_to_client_pull_state_) {
745 case ServerToClientPullState::kUnstarted:
746 case ServerToClientPullState::kUnstartedReading:
747 if (server_to_client_push_state_ ==
748 ServerToClientPushState::kTrailersOnly) {
749 server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
750 return false;
751 }
752 server_to_client_push_waiter_.pending();
753 return server_to_client_pull_waiter_.pending();
754 case ServerToClientPullState::kStartedReading:
755 reading = true;
756 break;
757 case ServerToClientPullState::kStarted:
758 reading = false;
759 break;
760 case ServerToClientPullState::kProcessingServerInitialMetadata:
761 case ServerToClientPullState::kProcessingServerInitialMetadataReading:
762 case ServerToClientPullState::kIdle:
763 case ServerToClientPullState::kReading:
764 case ServerToClientPullState::kProcessingServerToClientMessage:
765 LOG(FATAL) << "PollPullServerInitialMetadataAvailable called twice; "
766 << GRPC_DUMP_ARGS(server_to_client_pull_state_,
767 server_to_client_push_state_);
768 case ServerToClientPullState::kTerminated:
769 return false;
770 }
771 DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kStarted ||
772 server_to_client_pull_state_ ==
773 ServerToClientPullState::kStartedReading)
774 << server_to_client_pull_state_;
775 switch (server_to_client_push_state_) {
776 case ServerToClientPushState::kStart:
777 case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
778 return server_to_client_push_waiter_.pending();
779 case ServerToClientPushState::kPushedServerInitialMetadata:
780 case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
781 server_to_client_pull_state_ =
782 reading
783 ? ServerToClientPullState::kProcessingServerInitialMetadataReading
784 : ServerToClientPullState::kProcessingServerInitialMetadata;
785 server_to_client_pull_waiter_.Wake();
786 return true;
787 case ServerToClientPushState::kIdle:
788 case ServerToClientPushState::kPushedMessage:
789 LOG(FATAL)
790 << "PollPullServerInitialMetadataAvailable after metadata processed; "
791 << GRPC_DUMP_ARGS(server_to_client_pull_state_,
792 server_to_client_push_state_);
793 case ServerToClientPushState::kFinished:
794 server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
795 server_to_client_pull_waiter_.Wake();
796 return false;
797 case ServerToClientPushState::kTrailersOnly:
798 return false;
799 }
800 Crash("Unreachable");
801 }
802
803 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
FinishPullServerInitialMetadata()804 CallState::FinishPullServerInitialMetadata() {
805 GRPC_TRACE_LOG(call_state, INFO)
806 << "[call_state] FinishPullServerInitialMetadata: "
807 << GRPC_DUMP_ARGS(this, server_to_client_pull_state_);
808 switch (server_to_client_pull_state_) {
809 case ServerToClientPullState::kUnstarted:
810 case ServerToClientPullState::kUnstartedReading:
811 LOG(FATAL) << "FinishPullServerInitialMetadata called before Start";
812 case ServerToClientPullState::kStarted:
813 case ServerToClientPullState::kStartedReading:
814 CHECK_EQ(server_to_client_push_state_,
815 ServerToClientPushState::kTrailersOnly);
816 return;
817 case ServerToClientPullState::kProcessingServerInitialMetadata:
818 server_to_client_pull_state_ = ServerToClientPullState::kIdle;
819 server_to_client_pull_waiter_.Wake();
820 break;
821 case ServerToClientPullState::kProcessingServerInitialMetadataReading:
822 server_to_client_pull_state_ = ServerToClientPullState::kReading;
823 server_to_client_pull_waiter_.Wake();
824 break;
825 case ServerToClientPullState::kIdle:
826 case ServerToClientPullState::kReading:
827 case ServerToClientPullState::kProcessingServerToClientMessage:
828 LOG(FATAL) << "Out of order FinishPullServerInitialMetadata; "
829 << GRPC_DUMP_ARGS(server_to_client_pull_state_,
830 server_to_client_push_state_);
831 case ServerToClientPullState::kTerminated:
832 return;
833 }
834 DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kIdle ||
835 server_to_client_pull_state_ == ServerToClientPullState::kReading)
836 << server_to_client_pull_state_;
837 switch (server_to_client_push_state_) {
838 case ServerToClientPushState::kStart:
839 case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
840 LOG(FATAL) << "FinishPullServerInitialMetadata called before initial "
841 "metadata consumed; "
842 << GRPC_DUMP_ARGS(server_to_client_pull_state_,
843 server_to_client_push_state_);
844 case ServerToClientPushState::kPushedServerInitialMetadata:
845 server_to_client_push_state_ = ServerToClientPushState::kIdle;
846 server_to_client_push_waiter_.Wake();
847 break;
848 case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
849 server_to_client_push_state_ = ServerToClientPushState::kPushedMessage;
850 server_to_client_push_waiter_.Wake();
851 break;
852 case ServerToClientPushState::kIdle:
853 case ServerToClientPushState::kPushedMessage:
854 case ServerToClientPushState::kTrailersOnly:
855 case ServerToClientPushState::kFinished:
856 LOG(FATAL) << "FinishPullServerInitialMetadata called twice; "
857 << GRPC_DUMP_ARGS(server_to_client_pull_state_,
858 server_to_client_push_state_);
859 }
860 }
861
862 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ValueOrFailure<bool>>
PollPullServerToClientMessageAvailable()863 CallState::PollPullServerToClientMessageAvailable() {
864 GRPC_TRACE_LOG(call_state, INFO)
865 << "[call_state] PollPullServerToClientMessageAvailable: "
866 << GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
867 server_to_client_push_state_,
868 server_trailing_metadata_state_);
869 switch (server_to_client_pull_state_) {
870 case ServerToClientPullState::kUnstarted:
871 server_to_client_pull_state_ = ServerToClientPullState::kUnstartedReading;
872 return server_to_client_pull_waiter_.pending();
873 case ServerToClientPullState::kProcessingServerInitialMetadata:
874 server_to_client_pull_state_ =
875 ServerToClientPullState::kProcessingServerInitialMetadataReading;
876 return server_to_client_pull_waiter_.pending();
877 case ServerToClientPullState::kUnstartedReading:
878 case ServerToClientPullState::kProcessingServerInitialMetadataReading:
879 return server_to_client_pull_waiter_.pending();
880 case ServerToClientPullState::kStarted:
881 server_to_client_pull_state_ = ServerToClientPullState::kStartedReading;
882 ABSL_FALLTHROUGH_INTENDED;
883 case ServerToClientPullState::kStartedReading:
884 if (server_to_client_push_state_ ==
885 ServerToClientPushState::kTrailersOnly) {
886 return false;
887 }
888 return server_to_client_pull_waiter_.pending();
889 case ServerToClientPullState::kIdle:
890 server_to_client_pull_state_ = ServerToClientPullState::kReading;
891 server_to_client_pull_waiter_.Wake();
892 ABSL_FALLTHROUGH_INTENDED;
893 case ServerToClientPullState::kReading:
894 break;
895 case ServerToClientPullState::kProcessingServerToClientMessage:
896 LOG(FATAL) << "PollPullServerToClientMessageAvailable called while "
897 "processing a message; "
898 << GRPC_DUMP_ARGS(server_to_client_pull_state_,
899 server_to_client_push_state_);
900 case ServerToClientPullState::kTerminated:
901 return Failure{};
902 }
903 DCHECK_EQ(server_to_client_pull_state_, ServerToClientPullState::kReading);
904 switch (server_to_client_push_state_) {
905 case ServerToClientPushState::kStart:
906 case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
907 case ServerToClientPushState::kPushedServerInitialMetadata:
908 case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
909 return server_to_client_push_waiter_.pending();
910 case ServerToClientPushState::kIdle:
911 if (server_trailing_metadata_state_ !=
912 ServerTrailingMetadataState::kNotPushed) {
913 return false;
914 }
915 server_trailing_metadata_waiter_.pending();
916 return server_to_client_push_waiter_.pending();
917 case ServerToClientPushState::kTrailersOnly:
918 DCHECK_NE(server_trailing_metadata_state_,
919 ServerTrailingMetadataState::kNotPushed);
920 return false;
921 case ServerToClientPushState::kPushedMessage:
922 server_to_client_pull_state_ =
923 ServerToClientPullState::kProcessingServerToClientMessage;
924 server_to_client_pull_waiter_.Wake();
925 return true;
926 case ServerToClientPushState::kFinished:
927 server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
928 server_to_client_pull_waiter_.Wake();
929 return Failure{};
930 }
931 Crash("Unreachable");
932 }
933
934 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<StatusFlag>
PollPullServerToClientMessageStarted()935 CallState::PollPullServerToClientMessageStarted() {
936 GRPC_TRACE_LOG(call_state, INFO)
937 << "[call_state] PollPullClientToServerMessageStarted: "
938 << GRPC_DUMP_ARGS(this, server_to_client_pull_state_);
939 switch (server_to_client_pull_state_) {
940 case ServerToClientPullState::kUnstarted:
941 case ServerToClientPullState::kUnstartedReading:
942 case ServerToClientPullState::kStarted:
943 case ServerToClientPullState::kProcessingServerInitialMetadata:
944 case ServerToClientPullState::kProcessingServerInitialMetadataReading:
945 case ServerToClientPullState::kIdle:
946 return server_to_client_pull_waiter_.pending();
947 case ServerToClientPullState::kStartedReading:
948 case ServerToClientPullState::kReading:
949 case ServerToClientPullState::kProcessingServerToClientMessage:
950 return Success{};
951 case ServerToClientPullState::kTerminated:
952 return Failure{};
953 }
954 Crash("Unreachable");
955 }
956
957 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
FinishPullServerToClientMessage()958 CallState::FinishPullServerToClientMessage() {
959 GRPC_TRACE_LOG(call_state, INFO)
960 << "[call_state] FinishPullServerToClientMessage: "
961 << GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
962 server_to_client_push_state_);
963 switch (server_to_client_pull_state_) {
964 case ServerToClientPullState::kUnstarted:
965 case ServerToClientPullState::kUnstartedReading:
966 case ServerToClientPullState::kStarted:
967 case ServerToClientPullState::kStartedReading:
968 case ServerToClientPullState::kProcessingServerInitialMetadata:
969 case ServerToClientPullState::kProcessingServerInitialMetadataReading:
970 LOG(FATAL) << "FinishPullServerToClientMessage called before metadata "
971 "available; "
972 << GRPC_DUMP_ARGS(server_to_client_pull_state_,
973 server_to_client_push_state_);
974 case ServerToClientPullState::kIdle:
975 LOG(FATAL) << "FinishPullServerToClientMessage called twice; "
976 << GRPC_DUMP_ARGS(server_to_client_pull_state_,
977 server_to_client_push_state_);
978 case ServerToClientPullState::kReading:
979 LOG(FATAL) << "FinishPullServerToClientMessage called before "
980 << "PollPullServerToClientMessageAvailable; "
981 << GRPC_DUMP_ARGS(server_to_client_pull_state_,
982 server_to_client_push_state_);
983 case ServerToClientPullState::kProcessingServerToClientMessage:
984 server_to_client_pull_state_ = ServerToClientPullState::kIdle;
985 server_to_client_pull_waiter_.Wake();
986 break;
987 case ServerToClientPullState::kTerminated:
988 break;
989 }
990 switch (server_to_client_push_state_) {
991 case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
992 case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
993 case ServerToClientPushState::kPushedServerInitialMetadata:
994 case ServerToClientPushState::kStart:
995 LOG(FATAL) << "FinishPullServerToClientMessage called before initial "
996 "metadata consumed; "
997 << GRPC_DUMP_ARGS(server_to_client_pull_state_,
998 server_to_client_push_state_);
999 case ServerToClientPushState::kTrailersOnly:
1000 LOG(FATAL) << "FinishPullServerToClientMessage called after "
1001 "PushServerTrailingMetadata; "
1002 << GRPC_DUMP_ARGS(server_to_client_pull_state_,
1003 server_to_client_push_state_);
1004 case ServerToClientPushState::kPushedMessage:
1005 server_to_client_push_state_ = ServerToClientPushState::kIdle;
1006 server_to_client_push_waiter_.Wake();
1007 break;
1008 case ServerToClientPushState::kIdle:
1009 LOG(FATAL) << "FinishPullServerToClientMessage called without a message; "
1010 << GRPC_DUMP_ARGS(server_to_client_pull_state_,
1011 server_to_client_push_state_);
1012 case ServerToClientPushState::kFinished:
1013 break;
1014 }
1015 }
1016
1017 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<Empty>
PollServerTrailingMetadataAvailable()1018 CallState::PollServerTrailingMetadataAvailable() {
1019 GRPC_TRACE_LOG(call_state, INFO)
1020 << "[call_state] PollServerTrailingMetadataAvailable: "
1021 << GRPC_DUMP_ARGS(
1022 this, server_to_client_pull_state_, server_to_client_push_state_,
1023 server_trailing_metadata_state_, server_trailing_metadata_waiter_);
1024 switch (server_to_client_pull_state_) {
1025 case ServerToClientPullState::kProcessingServerInitialMetadata:
1026 case ServerToClientPullState::kProcessingServerToClientMessage:
1027 case ServerToClientPullState::kProcessingServerInitialMetadataReading:
1028 case ServerToClientPullState::kUnstartedReading:
1029 return server_to_client_pull_waiter_.pending();
1030 case ServerToClientPullState::kStartedReading:
1031 case ServerToClientPullState::kReading:
1032 switch (server_to_client_push_state_) {
1033 case ServerToClientPushState::kTrailersOnly:
1034 case ServerToClientPushState::kIdle:
1035 case ServerToClientPushState::kStart:
1036 case ServerToClientPushState::kFinished:
1037 if (server_trailing_metadata_state_ !=
1038 ServerTrailingMetadataState::kNotPushed) {
1039 break; // Ready for processing
1040 }
1041 ABSL_FALLTHROUGH_INTENDED;
1042 case ServerToClientPushState::kPushedMessageWithoutInitialMetadata:
1043 case ServerToClientPushState::kPushedServerInitialMetadata:
1044 case ServerToClientPushState::
1045 kPushedServerInitialMetadataAndPushedMessage:
1046 case ServerToClientPushState::kPushedMessage:
1047 server_to_client_push_waiter_.pending();
1048 return server_to_client_pull_waiter_.pending();
1049 }
1050 break;
1051 case ServerToClientPullState::kStarted:
1052 case ServerToClientPullState::kUnstarted:
1053 case ServerToClientPullState::kIdle:
1054 if (server_trailing_metadata_state_ !=
1055 ServerTrailingMetadataState::kNotPushed) {
1056 break; // Ready for processing
1057 }
1058 return server_trailing_metadata_waiter_.pending();
1059 case ServerToClientPullState::kTerminated:
1060 break;
1061 }
1062 server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
1063 server_to_client_pull_waiter_.Wake();
1064 switch (server_trailing_metadata_state_) {
1065 case ServerTrailingMetadataState::kPushed:
1066 server_trailing_metadata_state_ = ServerTrailingMetadataState::kPulled;
1067 server_trailing_metadata_waiter_.Wake();
1068 break;
1069 case ServerTrailingMetadataState::kPushedCancel:
1070 server_trailing_metadata_state_ =
1071 ServerTrailingMetadataState::kPulledCancel;
1072 server_trailing_metadata_waiter_.Wake();
1073 break;
1074 case ServerTrailingMetadataState::kNotPushed:
1075 case ServerTrailingMetadataState::kPulled:
1076 case ServerTrailingMetadataState::kPulledCancel:
1077 LOG(FATAL) << "PollServerTrailingMetadataAvailable completed twice; "
1078 << GRPC_DUMP_ARGS(server_to_client_pull_state_,
1079 server_trailing_metadata_state_);
1080 }
1081 return Empty{};
1082 }
1083
1084 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline bool
WasServerTrailingMetadataPulled()1085 CallState::WasServerTrailingMetadataPulled() const {
1086 switch (server_trailing_metadata_state_) {
1087 case ServerTrailingMetadataState::kNotPushed:
1088 case ServerTrailingMetadataState::kPushed:
1089 case ServerTrailingMetadataState::kPushedCancel:
1090 return false;
1091 case ServerTrailingMetadataState::kPulled:
1092 case ServerTrailingMetadataState::kPulledCancel:
1093 return true;
1094 }
1095 GPR_UNREACHABLE_CODE(Crash("unreachable"));
1096 }
1097
1098 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<bool>
PollWasCancelled()1099 CallState::PollWasCancelled() {
1100 GRPC_TRACE_LOG(call_state, INFO)
1101 << "[call_state] PollWasCancelled: "
1102 << GRPC_DUMP_ARGS(this, server_trailing_metadata_state_);
1103 switch (server_trailing_metadata_state_) {
1104 case ServerTrailingMetadataState::kNotPushed:
1105 case ServerTrailingMetadataState::kPushed:
1106 case ServerTrailingMetadataState::kPushedCancel: {
1107 return server_trailing_metadata_waiter_.pending();
1108 }
1109 case ServerTrailingMetadataState::kPulled:
1110 return false;
1111 case ServerTrailingMetadataState::kPulledCancel:
1112 return true;
1113 }
1114 Crash("Unreachable");
1115 }
1116
WasCancelledPushed()1117 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline bool CallState::WasCancelledPushed()
1118 const {
1119 GRPC_TRACE_LOG(call_state, INFO)
1120 << "[call_state] PollWasCancelledPushed: "
1121 << GRPC_DUMP_ARGS(this, server_trailing_metadata_state_);
1122 switch (server_trailing_metadata_state_) {
1123 case ServerTrailingMetadataState::kNotPushed:
1124 case ServerTrailingMetadataState::kPulled:
1125 case ServerTrailingMetadataState::kPushed:
1126 return false;
1127 case ServerTrailingMetadataState::kPushedCancel:
1128 case ServerTrailingMetadataState::kPulledCancel:
1129 return true;
1130 }
1131 Crash("Unreachable");
1132 }
1133
1134 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<Empty>
PollServerTrailingMetadataWasPushed()1135 CallState::PollServerTrailingMetadataWasPushed() {
1136 GRPC_TRACE_LOG(call_state, INFO)
1137 << "[call_state] PollWasCancelled: "
1138 << GRPC_DUMP_ARGS(this, server_trailing_metadata_state_);
1139 switch (server_trailing_metadata_state_) {
1140 case ServerTrailingMetadataState::kNotPushed: {
1141 return server_trailing_metadata_waiter_.pending();
1142 }
1143 case ServerTrailingMetadataState::kPushed:
1144 case ServerTrailingMetadataState::kPushedCancel:
1145 case ServerTrailingMetadataState::kPulled:
1146 case ServerTrailingMetadataState::kPulledCancel:
1147 return Empty{};
1148 }
1149 Crash("Unreachable");
1150 }
1151
1152 } // namespace grpc_core
1153
1154 #endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H
1155