• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/surface/server_call.h"
16 
17 #include <grpc/byte_buffer.h>
18 #include <grpc/compression.h>
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/grpc.h>
21 #include <grpc/impl/call.h>
22 #include <grpc/impl/propagation_bits.h>
23 #include <grpc/slice.h>
24 #include <grpc/slice_buffer.h>
25 #include <grpc/status.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/atm.h>
28 #include <grpc/support/port_platform.h>
29 #include <grpc/support/string_util.h>
30 #include <inttypes.h>
31 #include <limits.h>
32 #include <stdlib.h>
33 #include <string.h>
34 
35 #include <memory>
36 #include <string>
37 #include <utility>
38 
39 #include "absl/log/check.h"
40 #include "absl/strings/string_view.h"
41 #include "src/core/lib/promise/all_ok.h"
42 #include "src/core/lib/promise/map.h"
43 #include "src/core/lib/promise/poll.h"
44 #include "src/core/lib/promise/status_flag.h"
45 #include "src/core/lib/promise/try_seq.h"
46 #include "src/core/lib/resource_quota/arena.h"
47 #include "src/core/lib/slice/slice_buffer.h"
48 #include "src/core/lib/surface/completion_queue.h"
49 #include "src/core/lib/transport/metadata.h"
50 #include "src/core/lib/transport/metadata_batch.h"
51 #include "src/core/server/server_interface.h"
52 #include "src/core/util/bitset.h"
53 #include "src/core/util/latent_see.h"
54 
55 namespace grpc_core {
56 
57 namespace {
58 
ValidateServerBatch(const grpc_op * ops,size_t nops)59 grpc_call_error ValidateServerBatch(const grpc_op* ops, size_t nops) {
60   BitSet<8> got_ops;
61   for (size_t op_idx = 0; op_idx < nops; op_idx++) {
62     const grpc_op& op = ops[op_idx];
63     switch (op.op) {
64       case GRPC_OP_SEND_INITIAL_METADATA:
65         if (!AreInitialMetadataFlagsValid(op.flags)) {
66           return GRPC_CALL_ERROR_INVALID_FLAGS;
67         }
68         if (!ValidateMetadata(op.data.send_initial_metadata.count,
69                               op.data.send_initial_metadata.metadata)) {
70           return GRPC_CALL_ERROR_INVALID_METADATA;
71         }
72         break;
73       case GRPC_OP_SEND_MESSAGE:
74         if (!AreWriteFlagsValid(op.flags)) {
75           return GRPC_CALL_ERROR_INVALID_FLAGS;
76         }
77         break;
78       case GRPC_OP_SEND_STATUS_FROM_SERVER:
79         if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
80         if (!ValidateMetadata(
81                 op.data.send_status_from_server.trailing_metadata_count,
82                 op.data.send_status_from_server.trailing_metadata)) {
83           return GRPC_CALL_ERROR_INVALID_METADATA;
84         }
85         break;
86       case GRPC_OP_RECV_MESSAGE:
87       case GRPC_OP_RECV_CLOSE_ON_SERVER:
88         if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
89         break;
90       case GRPC_OP_RECV_INITIAL_METADATA:
91       case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
92       case GRPC_OP_RECV_STATUS_ON_CLIENT:
93         return GRPC_CALL_ERROR_NOT_ON_SERVER;
94     }
95     if (got_ops.is_set(op.op)) return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
96     got_ops.set(op.op);
97   }
98   return GRPC_CALL_OK;
99 }
100 
101 }  // namespace
102 
StartBatch(const grpc_op * ops,size_t nops,void * notify_tag,bool is_notify_tag_closure)103 grpc_call_error ServerCall::StartBatch(const grpc_op* ops, size_t nops,
104                                        void* notify_tag,
105                                        bool is_notify_tag_closure) {
106   if (nops == 0) {
107     EndOpImmediately(cq_, notify_tag, is_notify_tag_closure);
108     return GRPC_CALL_OK;
109   }
110   const grpc_call_error validation_result = ValidateServerBatch(ops, nops);
111   if (validation_result != GRPC_CALL_OK) {
112     return validation_result;
113   }
114   CommitBatch(ops, nops, notify_tag, is_notify_tag_closure);
115   return GRPC_CALL_OK;
116 }
117 
CommitBatch(const grpc_op * ops,size_t nops,void * notify_tag,bool is_notify_tag_closure)118 void ServerCall::CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
119                              bool is_notify_tag_closure) {
120   BatchOpIndex op_index(ops, nops);
121   if (!is_notify_tag_closure) grpc_cq_begin_op(cq_, notify_tag);
122 
123   auto commit_with_send_ops = [&](auto send_ops) {
124     auto recv_message =
125         op_index.OpHandler<GRPC_OP_RECV_MESSAGE>([this](const grpc_op& op) {
126           return message_receiver_.MakeBatchOp(op, &call_handler_);
127         });
128     auto primary_ops =
129         AllOk<StatusFlag>(std::move(send_ops), std::move(recv_message));
130     if (auto* op = op_index.op(GRPC_OP_RECV_CLOSE_ON_SERVER)) {
131       auto recv_trailing_metadata = OpHandler<GRPC_OP_RECV_CLOSE_ON_SERVER>(
132           [this, cancelled = op->data.recv_close_on_server.cancelled]() {
133             return Map(call_handler_.WasCancelled(),
134                        [cancelled, this](bool result) -> Success {
135                          saw_was_cancelled_.store(true,
136                                                   std::memory_order_relaxed);
137                          ResetDeadline();
138                          *cancelled = result ? 1 : 0;
139                          return Success{};
140                        });
141           });
142       call_handler_.SpawnInfallible(
143           "final-batch",
144           GRPC_LATENT_SEE_PROMISE(
145               "ServerCallBatch",
146               InfallibleBatch(std::move(primary_ops),
147                               std::move(recv_trailing_metadata),
148                               is_notify_tag_closure, notify_tag, cq_)));
149     } else {
150       call_handler_.SpawnInfallible(
151           "batch", GRPC_LATENT_SEE_PROMISE(
152                        "ServerCallBatch",
153                        FallibleBatch(std::move(primary_ops),
154                                      is_notify_tag_closure, notify_tag, cq_)));
155     }
156   };
157 
158   auto make_send_trailing_metadata = [this](const grpc_op& op) {
159     auto metadata = arena()->MakePooled<ServerMetadata>();
160     CToMetadata(op.data.send_status_from_server.trailing_metadata,
161                 op.data.send_status_from_server.trailing_metadata_count,
162                 metadata.get());
163     metadata->Set(GrpcStatusMetadata(), op.data.send_status_from_server.status);
164     if (auto* details = op.data.send_status_from_server.status_details) {
165       // TODO(ctiller): this should not be a copy, but we have
166       // callers that allocate and pass in a slice created with
167       // grpc_slice_from_static_string and then delete the string
168       // after passing it in, which shouldn't be a supported API.
169       metadata->Set(GrpcMessageMetadata(), Slice(grpc_slice_copy(*details)));
170     }
171     CHECK(metadata != nullptr);
172     return [this, metadata = std::move(metadata)]() mutable {
173       CHECK(metadata != nullptr);
174       return [this, metadata = std::move(metadata)]() mutable -> Poll<Success> {
175         CHECK(metadata != nullptr);
176         call_handler_.PushServerTrailingMetadata(std::move(metadata));
177         return Success{};
178       };
179     };
180   };
181 
182   if (op_index.has_op(GRPC_OP_SEND_INITIAL_METADATA) &&
183       op_index.has_op(GRPC_OP_SEND_STATUS_FROM_SERVER) &&
184       !op_index.has_op(GRPC_OP_SEND_MESSAGE) &&
185       op_index.op(GRPC_OP_SEND_INITIAL_METADATA)
186               ->data.send_initial_metadata.count == 0) {
187     const auto& trailing_metadata =
188         *op_index.op(GRPC_OP_SEND_STATUS_FROM_SERVER);
189     commit_with_send_ops(OpHandler<GRPC_OP_SEND_STATUS_FROM_SERVER>(
190         make_send_trailing_metadata(trailing_metadata)));
191   } else {
192     auto send_initial_metadata =
193         op_index.OpHandler<GRPC_OP_SEND_INITIAL_METADATA>(
194             [this](const grpc_op& op) {
195               auto metadata = arena()->MakePooled<ServerMetadata>();
196               PrepareOutgoingInitialMetadata(op, *metadata);
197               CToMetadata(op.data.send_initial_metadata.metadata,
198                           op.data.send_initial_metadata.count, metadata.get());
199               GRPC_TRACE_LOG(call, INFO)
200                   << DebugTag() << "[call] Send initial metadata";
201               return [this, metadata = std::move(metadata)]() mutable {
202                 return call_handler_.PushServerInitialMetadata(
203                     std::move(metadata));
204               };
205             });
206     auto send_message =
207         op_index.OpHandler<GRPC_OP_SEND_MESSAGE>([this](const grpc_op& op) {
208           SliceBuffer send;
209           grpc_slice_buffer_swap(
210               &op.data.send_message.send_message->data.raw.slice_buffer,
211               send.c_slice_buffer());
212           auto msg = arena()->MakePooled<Message>(std::move(send), op.flags);
213           return [this, msg = std::move(msg)]() mutable {
214             return call_handler_.PushMessage(std::move(msg));
215           };
216         });
217     auto send_trailing_metadata =
218         op_index.OpHandler<GRPC_OP_SEND_STATUS_FROM_SERVER>(
219             make_send_trailing_metadata);
220     commit_with_send_ops(
221         TrySeq(AllOk<StatusFlag>(std::move(send_initial_metadata),
222                                  std::move(send_message)),
223                std::move(send_trailing_metadata)));
224   }
225 }
226 
MakeServerCall(CallHandler call_handler,ClientMetadataHandle client_initial_metadata,ServerInterface * server,grpc_completion_queue * cq,grpc_metadata_array * publish_initial_metadata)227 grpc_call* MakeServerCall(CallHandler call_handler,
228                           ClientMetadataHandle client_initial_metadata,
229                           ServerInterface* server, grpc_completion_queue* cq,
230                           grpc_metadata_array* publish_initial_metadata) {
231   PublishMetadataArray(client_initial_metadata.get(), publish_initial_metadata,
232                        false);
233   // TODO(ctiller): ideally we'd put this in the arena with the CallHandler,
234   // but there's an ownership problem: CallHandler owns the arena, and so would
235   // get destroyed before the base class Call destructor runs, leading to
236   // UB/crash. Investigate another path.
237   return (new ServerCall(std::move(client_initial_metadata),
238                          std::move(call_handler), server, cq))
239       ->c_ptr();
240 }
241 
242 }  // namespace grpc_core
243