1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/surface/server_call.h"
16
17 #include <grpc/byte_buffer.h>
18 #include <grpc/compression.h>
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/grpc.h>
21 #include <grpc/impl/call.h>
22 #include <grpc/impl/propagation_bits.h>
23 #include <grpc/slice.h>
24 #include <grpc/slice_buffer.h>
25 #include <grpc/status.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/atm.h>
28 #include <grpc/support/port_platform.h>
29 #include <grpc/support/string_util.h>
30 #include <inttypes.h>
31 #include <limits.h>
32 #include <stdlib.h>
33 #include <string.h>
34
35 #include <memory>
36 #include <string>
37 #include <utility>
38
39 #include "absl/log/check.h"
40 #include "absl/strings/string_view.h"
41 #include "src/core/lib/promise/all_ok.h"
42 #include "src/core/lib/promise/map.h"
43 #include "src/core/lib/promise/poll.h"
44 #include "src/core/lib/promise/status_flag.h"
45 #include "src/core/lib/promise/try_seq.h"
46 #include "src/core/lib/resource_quota/arena.h"
47 #include "src/core/lib/slice/slice_buffer.h"
48 #include "src/core/lib/surface/completion_queue.h"
49 #include "src/core/lib/transport/metadata.h"
50 #include "src/core/lib/transport/metadata_batch.h"
51 #include "src/core/server/server_interface.h"
52 #include "src/core/util/bitset.h"
53 #include "src/core/util/latent_see.h"
54
55 namespace grpc_core {
56
57 namespace {
58
ValidateServerBatch(const grpc_op * ops,size_t nops)59 grpc_call_error ValidateServerBatch(const grpc_op* ops, size_t nops) {
60 BitSet<8> got_ops;
61 for (size_t op_idx = 0; op_idx < nops; op_idx++) {
62 const grpc_op& op = ops[op_idx];
63 switch (op.op) {
64 case GRPC_OP_SEND_INITIAL_METADATA:
65 if (!AreInitialMetadataFlagsValid(op.flags)) {
66 return GRPC_CALL_ERROR_INVALID_FLAGS;
67 }
68 if (!ValidateMetadata(op.data.send_initial_metadata.count,
69 op.data.send_initial_metadata.metadata)) {
70 return GRPC_CALL_ERROR_INVALID_METADATA;
71 }
72 break;
73 case GRPC_OP_SEND_MESSAGE:
74 if (!AreWriteFlagsValid(op.flags)) {
75 return GRPC_CALL_ERROR_INVALID_FLAGS;
76 }
77 break;
78 case GRPC_OP_SEND_STATUS_FROM_SERVER:
79 if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
80 if (!ValidateMetadata(
81 op.data.send_status_from_server.trailing_metadata_count,
82 op.data.send_status_from_server.trailing_metadata)) {
83 return GRPC_CALL_ERROR_INVALID_METADATA;
84 }
85 break;
86 case GRPC_OP_RECV_MESSAGE:
87 case GRPC_OP_RECV_CLOSE_ON_SERVER:
88 if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
89 break;
90 case GRPC_OP_RECV_INITIAL_METADATA:
91 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
92 case GRPC_OP_RECV_STATUS_ON_CLIENT:
93 return GRPC_CALL_ERROR_NOT_ON_SERVER;
94 }
95 if (got_ops.is_set(op.op)) return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
96 got_ops.set(op.op);
97 }
98 return GRPC_CALL_OK;
99 }
100
101 } // namespace
102
StartBatch(const grpc_op * ops,size_t nops,void * notify_tag,bool is_notify_tag_closure)103 grpc_call_error ServerCall::StartBatch(const grpc_op* ops, size_t nops,
104 void* notify_tag,
105 bool is_notify_tag_closure) {
106 if (nops == 0) {
107 EndOpImmediately(cq_, notify_tag, is_notify_tag_closure);
108 return GRPC_CALL_OK;
109 }
110 const grpc_call_error validation_result = ValidateServerBatch(ops, nops);
111 if (validation_result != GRPC_CALL_OK) {
112 return validation_result;
113 }
114 CommitBatch(ops, nops, notify_tag, is_notify_tag_closure);
115 return GRPC_CALL_OK;
116 }
117
CommitBatch(const grpc_op * ops,size_t nops,void * notify_tag,bool is_notify_tag_closure)118 void ServerCall::CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
119 bool is_notify_tag_closure) {
120 BatchOpIndex op_index(ops, nops);
121 if (!is_notify_tag_closure) grpc_cq_begin_op(cq_, notify_tag);
122
123 auto commit_with_send_ops = [&](auto send_ops) {
124 auto recv_message =
125 op_index.OpHandler<GRPC_OP_RECV_MESSAGE>([this](const grpc_op& op) {
126 return message_receiver_.MakeBatchOp(op, &call_handler_);
127 });
128 auto primary_ops =
129 AllOk<StatusFlag>(std::move(send_ops), std::move(recv_message));
130 if (auto* op = op_index.op(GRPC_OP_RECV_CLOSE_ON_SERVER)) {
131 auto recv_trailing_metadata = OpHandler<GRPC_OP_RECV_CLOSE_ON_SERVER>(
132 [this, cancelled = op->data.recv_close_on_server.cancelled]() {
133 return Map(call_handler_.WasCancelled(),
134 [cancelled, this](bool result) -> Success {
135 saw_was_cancelled_.store(true,
136 std::memory_order_relaxed);
137 ResetDeadline();
138 *cancelled = result ? 1 : 0;
139 return Success{};
140 });
141 });
142 call_handler_.SpawnInfallible(
143 "final-batch",
144 GRPC_LATENT_SEE_PROMISE(
145 "ServerCallBatch",
146 InfallibleBatch(std::move(primary_ops),
147 std::move(recv_trailing_metadata),
148 is_notify_tag_closure, notify_tag, cq_)));
149 } else {
150 call_handler_.SpawnInfallible(
151 "batch", GRPC_LATENT_SEE_PROMISE(
152 "ServerCallBatch",
153 FallibleBatch(std::move(primary_ops),
154 is_notify_tag_closure, notify_tag, cq_)));
155 }
156 };
157
158 auto make_send_trailing_metadata = [this](const grpc_op& op) {
159 auto metadata = arena()->MakePooled<ServerMetadata>();
160 CToMetadata(op.data.send_status_from_server.trailing_metadata,
161 op.data.send_status_from_server.trailing_metadata_count,
162 metadata.get());
163 metadata->Set(GrpcStatusMetadata(), op.data.send_status_from_server.status);
164 if (auto* details = op.data.send_status_from_server.status_details) {
165 // TODO(ctiller): this should not be a copy, but we have
166 // callers that allocate and pass in a slice created with
167 // grpc_slice_from_static_string and then delete the string
168 // after passing it in, which shouldn't be a supported API.
169 metadata->Set(GrpcMessageMetadata(), Slice(grpc_slice_copy(*details)));
170 }
171 CHECK(metadata != nullptr);
172 return [this, metadata = std::move(metadata)]() mutable {
173 CHECK(metadata != nullptr);
174 return [this, metadata = std::move(metadata)]() mutable -> Poll<Success> {
175 CHECK(metadata != nullptr);
176 call_handler_.PushServerTrailingMetadata(std::move(metadata));
177 return Success{};
178 };
179 };
180 };
181
182 if (op_index.has_op(GRPC_OP_SEND_INITIAL_METADATA) &&
183 op_index.has_op(GRPC_OP_SEND_STATUS_FROM_SERVER) &&
184 !op_index.has_op(GRPC_OP_SEND_MESSAGE) &&
185 op_index.op(GRPC_OP_SEND_INITIAL_METADATA)
186 ->data.send_initial_metadata.count == 0) {
187 const auto& trailing_metadata =
188 *op_index.op(GRPC_OP_SEND_STATUS_FROM_SERVER);
189 commit_with_send_ops(OpHandler<GRPC_OP_SEND_STATUS_FROM_SERVER>(
190 make_send_trailing_metadata(trailing_metadata)));
191 } else {
192 auto send_initial_metadata =
193 op_index.OpHandler<GRPC_OP_SEND_INITIAL_METADATA>(
194 [this](const grpc_op& op) {
195 auto metadata = arena()->MakePooled<ServerMetadata>();
196 PrepareOutgoingInitialMetadata(op, *metadata);
197 CToMetadata(op.data.send_initial_metadata.metadata,
198 op.data.send_initial_metadata.count, metadata.get());
199 GRPC_TRACE_LOG(call, INFO)
200 << DebugTag() << "[call] Send initial metadata";
201 return [this, metadata = std::move(metadata)]() mutable {
202 return call_handler_.PushServerInitialMetadata(
203 std::move(metadata));
204 };
205 });
206 auto send_message =
207 op_index.OpHandler<GRPC_OP_SEND_MESSAGE>([this](const grpc_op& op) {
208 SliceBuffer send;
209 grpc_slice_buffer_swap(
210 &op.data.send_message.send_message->data.raw.slice_buffer,
211 send.c_slice_buffer());
212 auto msg = arena()->MakePooled<Message>(std::move(send), op.flags);
213 return [this, msg = std::move(msg)]() mutable {
214 return call_handler_.PushMessage(std::move(msg));
215 };
216 });
217 auto send_trailing_metadata =
218 op_index.OpHandler<GRPC_OP_SEND_STATUS_FROM_SERVER>(
219 make_send_trailing_metadata);
220 commit_with_send_ops(
221 TrySeq(AllOk<StatusFlag>(std::move(send_initial_metadata),
222 std::move(send_message)),
223 std::move(send_trailing_metadata)));
224 }
225 }
226
MakeServerCall(CallHandler call_handler,ClientMetadataHandle client_initial_metadata,ServerInterface * server,grpc_completion_queue * cq,grpc_metadata_array * publish_initial_metadata)227 grpc_call* MakeServerCall(CallHandler call_handler,
228 ClientMetadataHandle client_initial_metadata,
229 ServerInterface* server, grpc_completion_queue* cq,
230 grpc_metadata_array* publish_initial_metadata) {
231 PublishMetadataArray(client_initial_metadata.get(), publish_initial_metadata,
232 false);
233 // TODO(ctiller): ideally we'd put this in the arena with the CallHandler,
234 // but there's an ownership problem: CallHandler owns the arena, and so would
235 // get destroyed before the base class Call destructor runs, leading to
236 // UB/crash. Investigate another path.
237 return (new ServerCall(std::move(client_initial_metadata),
238 std::move(call_handler), server, cq))
239 ->c_ptr();
240 }
241
242 } // namespace grpc_core
243