• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/transport/call_spine.h"
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "absl/functional/any_invocable.h"
20 #include "src/core/lib/event_engine/event_engine_context.h"
21 #include "src/core/lib/promise/for_each.h"
22 #include "src/core/lib/promise/try_seq.h"
23 
24 namespace grpc_core {
25 
ForwardCall(CallHandler call_handler,CallInitiator call_initiator,absl::AnyInvocable<void (ServerMetadata &)> on_server_trailing_metadata_from_initiator)26 void ForwardCall(CallHandler call_handler, CallInitiator call_initiator,
27                  absl::AnyInvocable<void(ServerMetadata&)>
28                      on_server_trailing_metadata_from_initiator) {
29   call_handler.AddChildCall(call_initiator);
30   // Read messages from handler into initiator.
31   call_handler.SpawnInfallible("read_messages", [call_handler,
32                                                  call_initiator]() mutable {
33     return Seq(ForEach(MessagesFrom(call_handler),
34                        [call_initiator](MessageHandle msg) mutable {
35                          // Need to spawn a job into the initiator's activity to
36                          // push the message in.
37                          return call_initiator.SpawnPushMessage(std::move(msg));
38                        }),
39                [call_initiator](StatusFlag result) mutable {
40                  if (result.ok()) {
41                    call_initiator.SpawnFinishSends();
42                  }
43                });
44   });
45   call_initiator.SpawnInfallible(
46       "read_the_things",
47       [call_initiator, call_handler,
48        on_server_trailing_metadata_from_initiator =
49            std::move(on_server_trailing_metadata_from_initiator)]() mutable {
50         return Seq(
51             call_initiator.CancelIfFails(
52                 TrySeq(call_initiator.PullServerInitialMetadata(),
53                        [call_handler, call_initiator](
54                            absl::optional<ServerMetadataHandle> md) mutable {
55                          const bool has_md = md.has_value();
56                          return If(
57                              has_md,
58                              [&call_handler, &call_initiator,
59                               md = std::move(md)]() mutable {
60                                call_handler.SpawnPushServerInitialMetadata(
61                                    std::move(*md));
62                                return ForEach(
63                                    MessagesFrom(call_initiator),
64                                    [call_handler](MessageHandle msg) mutable {
65                                      return call_handler.SpawnPushMessage(
66                                          std::move(msg));
67                                    });
68                              },
69                              []() -> StatusFlag { return Success{}; });
70                        })),
71             call_initiator.PullServerTrailingMetadata(),
72             [call_handler,
73              on_server_trailing_metadata_from_initiator =
74                  std::move(on_server_trailing_metadata_from_initiator)](
75                 ServerMetadataHandle md) mutable {
76               on_server_trailing_metadata_from_initiator(*md);
77               call_handler.SpawnPushServerTrailingMetadata(std::move(md));
78             });
79       });
80 }
81 
MakeCallPair(ClientMetadataHandle client_initial_metadata,RefCountedPtr<Arena> arena)82 CallInitiatorAndHandler MakeCallPair(
83     ClientMetadataHandle client_initial_metadata, RefCountedPtr<Arena> arena) {
84   DCHECK_NE(arena.get(), nullptr);
85   DCHECK_NE(arena->GetContext<grpc_event_engine::experimental::EventEngine>(),
86             nullptr);
87   auto spine =
88       CallSpine::Create(std::move(client_initial_metadata), std::move(arena));
89   return {CallInitiator(spine), UnstartedCallHandler(spine)};
90 }
91 
92 }  // namespace grpc_core
93