1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/transport/call_spine.h"
16
17 #include <grpc/support/port_platform.h>
18
19 #include "absl/functional/any_invocable.h"
20 #include "src/core/lib/event_engine/event_engine_context.h"
21 #include "src/core/lib/promise/for_each.h"
22 #include "src/core/lib/promise/try_seq.h"
23
24 namespace grpc_core {
25
ForwardCall(CallHandler call_handler,CallInitiator call_initiator,absl::AnyInvocable<void (ServerMetadata &)> on_server_trailing_metadata_from_initiator)26 void ForwardCall(CallHandler call_handler, CallInitiator call_initiator,
27 absl::AnyInvocable<void(ServerMetadata&)>
28 on_server_trailing_metadata_from_initiator) {
29 call_handler.AddChildCall(call_initiator);
30 // Read messages from handler into initiator.
31 call_handler.SpawnInfallible("read_messages", [call_handler,
32 call_initiator]() mutable {
33 return Seq(ForEach(MessagesFrom(call_handler),
34 [call_initiator](MessageHandle msg) mutable {
35 // Need to spawn a job into the initiator's activity to
36 // push the message in.
37 return call_initiator.SpawnPushMessage(std::move(msg));
38 }),
39 [call_initiator](StatusFlag result) mutable {
40 if (result.ok()) {
41 call_initiator.SpawnFinishSends();
42 }
43 });
44 });
45 call_initiator.SpawnInfallible(
46 "read_the_things",
47 [call_initiator, call_handler,
48 on_server_trailing_metadata_from_initiator =
49 std::move(on_server_trailing_metadata_from_initiator)]() mutable {
50 return Seq(
51 call_initiator.CancelIfFails(
52 TrySeq(call_initiator.PullServerInitialMetadata(),
53 [call_handler, call_initiator](
54 absl::optional<ServerMetadataHandle> md) mutable {
55 const bool has_md = md.has_value();
56 return If(
57 has_md,
58 [&call_handler, &call_initiator,
59 md = std::move(md)]() mutable {
60 call_handler.SpawnPushServerInitialMetadata(
61 std::move(*md));
62 return ForEach(
63 MessagesFrom(call_initiator),
64 [call_handler](MessageHandle msg) mutable {
65 return call_handler.SpawnPushMessage(
66 std::move(msg));
67 });
68 },
69 []() -> StatusFlag { return Success{}; });
70 })),
71 call_initiator.PullServerTrailingMetadata(),
72 [call_handler,
73 on_server_trailing_metadata_from_initiator =
74 std::move(on_server_trailing_metadata_from_initiator)](
75 ServerMetadataHandle md) mutable {
76 on_server_trailing_metadata_from_initiator(*md);
77 call_handler.SpawnPushServerTrailingMetadata(std::move(md));
78 });
79 });
80 }
81
MakeCallPair(ClientMetadataHandle client_initial_metadata,RefCountedPtr<Arena> arena)82 CallInitiatorAndHandler MakeCallPair(
83 ClientMetadataHandle client_initial_metadata, RefCountedPtr<Arena> arena) {
84 DCHECK_NE(arena.get(), nullptr);
85 DCHECK_NE(arena->GetContext<grpc_event_engine::experimental::EventEngine>(),
86 nullptr);
87 auto spine =
88 CallSpine::Create(std::move(client_initial_metadata), std::move(arena));
89 return {CallInitiator(spine), UnstartedCallHandler(spine)};
90 }
91
92 } // namespace grpc_core
93