1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/surface/client_call.h"
16
17 #include <grpc/byte_buffer.h>
18 #include <grpc/compression.h>
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/grpc.h>
21 #include <grpc/impl/call.h>
22 #include <grpc/impl/propagation_bits.h>
23 #include <grpc/slice.h>
24 #include <grpc/slice_buffer.h>
25 #include <grpc/status.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/atm.h>
28 #include <grpc/support/port_platform.h>
29 #include <grpc/support/string_util.h>
30 #include <inttypes.h>
31 #include <limits.h>
32 #include <stdlib.h>
33 #include <string.h>
34
35 #include <algorithm>
36 #include <atomic>
37 #include <cstdint>
38 #include <memory>
39 #include <string>
40 #include <utility>
41
42 #include "absl/log/check.h"
43 #include "absl/status/status.h"
44 #include "absl/strings/string_view.h"
45 #include "src/core/lib/event_engine/event_engine_context.h"
46 #include "src/core/lib/promise/all_ok.h"
47 #include "src/core/lib/promise/status_flag.h"
48 #include "src/core/lib/promise/try_seq.h"
49 #include "src/core/lib/resource_quota/arena.h"
50 #include "src/core/lib/slice/slice_buffer.h"
51 #include "src/core/lib/surface/completion_queue.h"
52 #include "src/core/lib/transport/metadata.h"
53 #include "src/core/telemetry/stats.h"
54 #include "src/core/telemetry/stats_data.h"
55 #include "src/core/util/bitset.h"
56 #include "src/core/util/crash.h"
57 #include "src/core/util/latent_see.h"
58 #include "src/core/util/ref_counted.h"
59 #include "src/core/util/ref_counted_ptr.h"
60
61 namespace grpc_core {
62
63 namespace {
64
ValidateClientBatch(const grpc_op * ops,size_t nops)65 grpc_call_error ValidateClientBatch(const grpc_op* ops, size_t nops) {
66 BitSet<8> got_ops;
67 for (size_t op_idx = 0; op_idx < nops; op_idx++) {
68 const grpc_op& op = ops[op_idx];
69 switch (op.op) {
70 case GRPC_OP_SEND_INITIAL_METADATA:
71 if (!AreInitialMetadataFlagsValid(op.flags)) {
72 return GRPC_CALL_ERROR_INVALID_FLAGS;
73 }
74 if (!ValidateMetadata(op.data.send_initial_metadata.count,
75 op.data.send_initial_metadata.metadata)) {
76 return GRPC_CALL_ERROR_INVALID_METADATA;
77 }
78 break;
79 case GRPC_OP_SEND_MESSAGE:
80 if (!AreWriteFlagsValid(op.flags)) {
81 return GRPC_CALL_ERROR_INVALID_FLAGS;
82 }
83 break;
84 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
85 case GRPC_OP_RECV_INITIAL_METADATA:
86 case GRPC_OP_RECV_MESSAGE:
87 case GRPC_OP_RECV_STATUS_ON_CLIENT:
88 if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
89 break;
90 case GRPC_OP_RECV_CLOSE_ON_SERVER:
91 case GRPC_OP_SEND_STATUS_FROM_SERVER:
92 return GRPC_CALL_ERROR_NOT_ON_CLIENT;
93 }
94 if (got_ops.is_set(op.op)) return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
95 got_ops.set(op.op);
96 }
97 return GRPC_CALL_OK;
98 }
99
100 } // namespace
101
ClientCall(grpc_call *,uint32_t,grpc_completion_queue * cq,Slice path,absl::optional<Slice> authority,bool registered_method,Timestamp deadline,grpc_compression_options compression_options,RefCountedPtr<Arena> arena,RefCountedPtr<UnstartedCallDestination> destination)102 ClientCall::ClientCall(grpc_call*, uint32_t, grpc_completion_queue* cq,
103 Slice path, absl::optional<Slice> authority,
104 bool registered_method, Timestamp deadline,
105 grpc_compression_options compression_options,
106 RefCountedPtr<Arena> arena,
107 RefCountedPtr<UnstartedCallDestination> destination)
108 : Call(false, deadline, std::move(arena)),
109 DualRefCounted("ClientCall"),
110 cq_(cq),
111 call_destination_(std::move(destination)),
112 compression_options_(compression_options) {
113 global_stats().IncrementClientCallsCreated();
114 send_initial_metadata_->Set(HttpPathMetadata(), std::move(path));
115 if (authority.has_value()) {
116 send_initial_metadata_->Set(HttpAuthorityMetadata(), std::move(*authority));
117 }
118 send_initial_metadata_->Set(
119 GrpcRegisteredMethod(),
120 reinterpret_cast<void*>(static_cast<uintptr_t>(registered_method)));
121 if (deadline != Timestamp::InfFuture()) {
122 send_initial_metadata_->Set(GrpcTimeoutMetadata(), deadline);
123 UpdateDeadline(deadline);
124 }
125 }
126
StartBatch(const grpc_op * ops,size_t nops,void * notify_tag,bool is_notify_tag_closure)127 grpc_call_error ClientCall::StartBatch(const grpc_op* ops, size_t nops,
128 void* notify_tag,
129 bool is_notify_tag_closure) {
130 GRPC_LATENT_SEE_PARENT_SCOPE("ClientCall::StartBatch");
131 if (nops == 0) {
132 EndOpImmediately(cq_, notify_tag, is_notify_tag_closure);
133 return GRPC_CALL_OK;
134 }
135 const grpc_call_error validation_result = ValidateClientBatch(ops, nops);
136 if (validation_result != GRPC_CALL_OK) {
137 return validation_result;
138 }
139 CommitBatch(ops, nops, notify_tag, is_notify_tag_closure);
140 return GRPC_CALL_OK;
141 }
142
CancelWithError(grpc_error_handle error)143 void ClientCall::CancelWithError(grpc_error_handle error) {
144 cancel_status_.Set(new absl::Status(error));
145 auto cur_state = call_state_.load(std::memory_order_acquire);
146 while (true) {
147 GRPC_TRACE_LOG(call, INFO)
148 << DebugTag() << "CancelWithError " << GRPC_DUMP_ARGS(cur_state, error);
149 switch (cur_state) {
150 case kCancelled:
151 return;
152 case kUnstarted:
153 if (call_state_.compare_exchange_strong(cur_state, kCancelled,
154 std::memory_order_acq_rel,
155 std::memory_order_acquire)) {
156 ResetDeadline();
157 return;
158 }
159 break;
160 case kStarted:
161 started_call_initiator_.SpawnInfallible(
162 "CancelWithError", [self = WeakRefAsSubclass<ClientCall>(),
163 error = std::move(error)]() mutable {
164 self->started_call_initiator_.Cancel(std::move(error));
165 });
166 return;
167 default:
168 if (call_state_.compare_exchange_strong(cur_state, kCancelled,
169 std::memory_order_acq_rel,
170 std::memory_order_acquire)) {
171 ResetDeadline();
172 auto* unordered_start = reinterpret_cast<UnorderedStart*>(cur_state);
173 while (unordered_start != nullptr) {
174 auto next = unordered_start->next;
175 delete unordered_start;
176 unordered_start = next;
177 }
178 return;
179 }
180 }
181 }
182 }
183
184 template <typename Batch>
ScheduleCommittedBatch(Batch batch)185 void ClientCall::ScheduleCommittedBatch(Batch batch) {
186 GRPC_LATENT_SEE_INNER_SCOPE("ClientCall::ScheduleCommittedBatch");
187 auto cur_state = call_state_.load(std::memory_order_acquire);
188 while (true) {
189 switch (cur_state) {
190 case kUnstarted:
191 default: { // UnorderedStart
192 auto pending = std::make_unique<UnorderedStart>();
193 pending->start_pending_batch = [this,
194 batch = std::move(batch)]() mutable {
195 started_call_initiator_.SpawnInfallible(
196 "batch",
197 GRPC_LATENT_SEE_PROMISE("ClientCallBatch", std::move(batch)));
198 };
199 while (true) {
200 pending->next = reinterpret_cast<UnorderedStart*>(cur_state);
201 if (call_state_.compare_exchange_strong(
202 cur_state, reinterpret_cast<uintptr_t>(pending.get()),
203 std::memory_order_acq_rel, std::memory_order_acquire)) {
204 std::ignore = pending.release();
205 return;
206 }
207 if (cur_state == kStarted) {
208 pending->start_pending_batch();
209 return;
210 }
211 if (cur_state == kCancelled) {
212 return;
213 }
214 }
215 }
216 case kStarted:
217 started_call_initiator_.SpawnInfallible(
218 "batch",
219 GRPC_LATENT_SEE_PROMISE("ClientCallBatch", std::move(batch)));
220 return;
221 case kCancelled:
222 return;
223 }
224 }
225 }
226
StartCall(const grpc_op & send_initial_metadata_op)227 Party::WakeupHold ClientCall::StartCall(
228 const grpc_op& send_initial_metadata_op) {
229 GRPC_LATENT_SEE_INNER_SCOPE("ClientCall::StartCall");
230 auto cur_state = call_state_.load(std::memory_order_acquire);
231 CToMetadata(send_initial_metadata_op.data.send_initial_metadata.metadata,
232 send_initial_metadata_op.data.send_initial_metadata.count,
233 send_initial_metadata_.get());
234 PrepareOutgoingInitialMetadata(send_initial_metadata_op,
235 *send_initial_metadata_);
236 auto call = MakeCallPair(std::move(send_initial_metadata_), arena()->Ref());
237 started_call_initiator_ = std::move(call.initiator);
238 Party::WakeupHold wakeup_hold{started_call_initiator_.party()};
239 while (!StartCallMaybeUpdateState(cur_state, call.handler)) {
240 }
241 return wakeup_hold;
242 }
243
StartCallMaybeUpdateState(uintptr_t & cur_state,UnstartedCallHandler & handler)244 bool ClientCall::StartCallMaybeUpdateState(uintptr_t& cur_state,
245 UnstartedCallHandler& handler) {
246 GRPC_TRACE_LOG(call, INFO)
247 << DebugTag() << "StartCall " << GRPC_DUMP_ARGS(cur_state);
248 switch (cur_state) {
249 case kUnstarted:
250 if (call_state_.compare_exchange_strong(cur_state, kStarted,
251 std::memory_order_acq_rel,
252 std::memory_order_acquire)) {
253 call_destination_->StartCall(std::move(handler));
254 return true;
255 }
256 return false;
257 case kStarted:
258 Crash("StartCall called twice"); // probably we crash earlier...
259 case kCancelled:
260 return true;
261 default: { // UnorderedStart
262 if (call_state_.compare_exchange_strong(cur_state, kStarted,
263 std::memory_order_acq_rel,
264 std::memory_order_acquire)) {
265 call_destination_->StartCall(std::move(handler));
266 auto unordered_start = reinterpret_cast<UnorderedStart*>(cur_state);
267 while (unordered_start->next != nullptr) {
268 unordered_start->start_pending_batch();
269 auto next = unordered_start->next;
270 delete unordered_start;
271 unordered_start = next;
272 }
273 return true;
274 }
275 return false;
276 }
277 }
278 }
279
CommitBatch(const grpc_op * ops,size_t nops,void * notify_tag,bool is_notify_tag_closure)280 void ClientCall::CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
281 bool is_notify_tag_closure) {
282 GRPC_LATENT_SEE_INNER_SCOPE("ClientCall::CommitBatch");
283 if (nops == 1 && ops[0].op == GRPC_OP_SEND_INITIAL_METADATA) {
284 StartCall(ops[0]);
285 EndOpImmediately(cq_, notify_tag, is_notify_tag_closure);
286 return;
287 }
288 if (!is_notify_tag_closure) grpc_cq_begin_op(cq_, notify_tag);
289 BatchOpIndex op_index(ops, nops);
290 auto send_message =
291 op_index.OpHandler<GRPC_OP_SEND_MESSAGE>([this](const grpc_op& op) {
292 SliceBuffer send;
293 grpc_slice_buffer_swap(
294 &op.data.send_message.send_message->data.raw.slice_buffer,
295 send.c_slice_buffer());
296 auto msg = arena()->MakePooled<Message>(std::move(send), op.flags);
297 return [this, msg = std::move(msg)]() mutable {
298 return started_call_initiator_.PushMessage(std::move(msg));
299 };
300 });
301 auto send_close_from_client =
302 op_index.OpHandler<GRPC_OP_SEND_CLOSE_FROM_CLIENT>(
303 [this](const grpc_op&) {
304 return [this]() {
305 started_call_initiator_.FinishSends();
306 return Success{};
307 };
308 });
309 auto recv_message =
310 op_index.OpHandler<GRPC_OP_RECV_MESSAGE>([this](const grpc_op& op) {
311 return message_receiver_.MakeBatchOp(op, &started_call_initiator_);
312 });
313 auto recv_initial_metadata =
314 op_index.OpHandler<GRPC_OP_RECV_INITIAL_METADATA>([this](
315 const grpc_op& op) {
316 return [this,
317 array = op.data.recv_initial_metadata.recv_initial_metadata]() {
318 return Map(
319 started_call_initiator_.PullServerInitialMetadata(),
320 [this,
321 array](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
322 ServerMetadataHandle metadata;
323 if (!md.ok() || !md->has_value()) {
324 is_trailers_only_ = true;
325 metadata = Arena::MakePooledForOverwrite<ServerMetadata>();
326 } else {
327 metadata = std::move(md->value());
328 is_trailers_only_ =
329 metadata->get(GrpcTrailersOnly()).value_or(false);
330 }
331 ProcessIncomingInitialMetadata(*metadata);
332 PublishMetadataArray(metadata.get(), array, true);
333 received_initial_metadata_ = std::move(metadata);
334 return Success{};
335 });
336 };
337 });
338 auto primary_ops = AllOk<StatusFlag>(
339 TrySeq(std::move(send_message), std::move(send_close_from_client)),
340 TrySeq(std::move(recv_initial_metadata), std::move(recv_message)));
341 Party::WakeupHold wakeup_hold;
342 if (const grpc_op* op = op_index.op(GRPC_OP_SEND_INITIAL_METADATA)) {
343 wakeup_hold = StartCall(*op);
344 }
345 if (const grpc_op* op = op_index.op(GRPC_OP_RECV_STATUS_ON_CLIENT)) {
346 auto out_status = op->data.recv_status_on_client.status;
347 auto out_status_details = op->data.recv_status_on_client.status_details;
348 auto out_error_string = op->data.recv_status_on_client.error_string;
349 auto out_trailing_metadata =
350 op->data.recv_status_on_client.trailing_metadata;
351 auto make_read_trailing_metadata = [self = WeakRef(), out_status,
352 out_status_details, out_error_string,
353 out_trailing_metadata]() {
354 return Map(self->started_call_initiator_.PullServerTrailingMetadata(),
355 [self, out_status, out_status_details, out_error_string,
356 out_trailing_metadata](
357 ServerMetadataHandle server_trailing_metadata) {
358 self->OnReceivedStatus(std::move(server_trailing_metadata),
359 out_status, out_status_details,
360 out_error_string,
361 out_trailing_metadata);
362 return Success{};
363 });
364 };
365 ScheduleCommittedBatch(InfallibleBatch(
366 std::move(primary_ops),
367 OpHandler<GRPC_OP_RECV_STATUS_ON_CLIENT>(OnCancelFactory(
368 std::move(make_read_trailing_metadata),
369 [this, out_status, out_status_details, out_error_string,
370 out_trailing_metadata]() {
371 auto* status = cancel_status_.Get();
372 CHECK_NE(status, nullptr);
373 *out_status = static_cast<grpc_status_code>(status->code());
374 *out_status_details =
375 Slice::FromCopiedString(status->message()).TakeCSlice();
376 if (out_error_string != nullptr) {
377 *out_error_string = nullptr;
378 }
379 out_trailing_metadata->count = 0;
380 })),
381 is_notify_tag_closure, notify_tag, cq_));
382 } else {
383 ScheduleCommittedBatch(FallibleBatch(
384 std::move(primary_ops), is_notify_tag_closure, notify_tag, cq_));
385 }
386 }
387
OnReceivedStatus(ServerMetadataHandle server_trailing_metadata,grpc_status_code * out_status,grpc_slice * out_status_details,const char ** out_error_string,grpc_metadata_array * out_trailing_metadata)388 void ClientCall::OnReceivedStatus(ServerMetadataHandle server_trailing_metadata,
389 grpc_status_code* out_status,
390 grpc_slice* out_status_details,
391 const char** out_error_string,
392 grpc_metadata_array* out_trailing_metadata) {
393 saw_trailing_metadata_.store(true, std::memory_order_relaxed);
394 ResetDeadline();
395 GRPC_TRACE_LOG(call, INFO) << DebugTag() << "RecvStatusOnClient "
396 << server_trailing_metadata->DebugString();
397 const auto status = server_trailing_metadata->get(GrpcStatusMetadata())
398 .value_or(GRPC_STATUS_UNKNOWN);
399 *out_status = status;
400 Slice message_slice;
401 if (Slice* message =
402 server_trailing_metadata->get_pointer(GrpcMessageMetadata())) {
403 message_slice = message->Ref();
404 }
405 *out_status_details = message_slice.TakeCSlice();
406 if (out_error_string != nullptr) {
407 if (status != GRPC_STATUS_OK) {
408 *out_error_string =
409 gpr_strdup(MakeErrorString(server_trailing_metadata.get()).c_str());
410 } else {
411 *out_error_string = nullptr;
412 }
413 }
414 PublishMetadataArray(server_trailing_metadata.get(), out_trailing_metadata,
415 true);
416 received_trailing_metadata_ = std::move(server_trailing_metadata);
417 }
418
GetPeer()419 char* ClientCall::GetPeer() {
420 Slice peer_slice = GetPeerString();
421 if (!peer_slice.empty()) {
422 absl::string_view peer_string_view = peer_slice.as_string_view();
423 char* peer_string =
424 static_cast<char*>(gpr_malloc(peer_string_view.size() + 1));
425 memcpy(peer_string, peer_string_view.data(), peer_string_view.size());
426 peer_string[peer_string_view.size()] = '\0';
427 return peer_string;
428 }
429 return gpr_strdup("unknown");
430 }
431
MakeClientCall(grpc_call * parent_call,uint32_t propagation_mask,grpc_completion_queue * cq,Slice path,absl::optional<Slice> authority,bool registered_method,Timestamp deadline,grpc_compression_options compression_options,RefCountedPtr<Arena> arena,RefCountedPtr<UnstartedCallDestination> destination)432 grpc_call* MakeClientCall(grpc_call* parent_call, uint32_t propagation_mask,
433 grpc_completion_queue* cq, Slice path,
434 absl::optional<Slice> authority,
435 bool registered_method, Timestamp deadline,
436 grpc_compression_options compression_options,
437 RefCountedPtr<Arena> arena,
438 RefCountedPtr<UnstartedCallDestination> destination) {
439 DCHECK_NE(arena.get(), nullptr);
440 DCHECK_NE(arena->GetContext<grpc_event_engine::experimental::EventEngine>(),
441 nullptr);
442 return arena
443 ->New<ClientCall>(parent_call, propagation_mask, cq, std::move(path),
444 std::move(authority), registered_method, deadline,
445 compression_options, arena, destination)
446 ->c_ptr();
447 }
448
449 } // namespace grpc_core
450