• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/surface/client_call.h"
16 
17 #include <grpc/byte_buffer.h>
18 #include <grpc/compression.h>
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/grpc.h>
21 #include <grpc/impl/call.h>
22 #include <grpc/impl/propagation_bits.h>
23 #include <grpc/slice.h>
24 #include <grpc/slice_buffer.h>
25 #include <grpc/status.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/atm.h>
28 #include <grpc/support/port_platform.h>
29 #include <grpc/support/string_util.h>
30 #include <inttypes.h>
31 #include <limits.h>
32 #include <stdlib.h>
33 #include <string.h>
34 
35 #include <algorithm>
36 #include <atomic>
37 #include <cstdint>
38 #include <memory>
39 #include <string>
40 #include <utility>
41 
42 #include "absl/log/check.h"
43 #include "absl/status/status.h"
44 #include "absl/strings/string_view.h"
45 #include "src/core/lib/event_engine/event_engine_context.h"
46 #include "src/core/lib/promise/all_ok.h"
47 #include "src/core/lib/promise/status_flag.h"
48 #include "src/core/lib/promise/try_seq.h"
49 #include "src/core/lib/resource_quota/arena.h"
50 #include "src/core/lib/slice/slice_buffer.h"
51 #include "src/core/lib/surface/completion_queue.h"
52 #include "src/core/lib/transport/metadata.h"
53 #include "src/core/telemetry/stats.h"
54 #include "src/core/telemetry/stats_data.h"
55 #include "src/core/util/bitset.h"
56 #include "src/core/util/crash.h"
57 #include "src/core/util/latent_see.h"
58 #include "src/core/util/ref_counted.h"
59 #include "src/core/util/ref_counted_ptr.h"
60 
61 namespace grpc_core {
62 
63 namespace {
64 
ValidateClientBatch(const grpc_op * ops,size_t nops)65 grpc_call_error ValidateClientBatch(const grpc_op* ops, size_t nops) {
66   BitSet<8> got_ops;
67   for (size_t op_idx = 0; op_idx < nops; op_idx++) {
68     const grpc_op& op = ops[op_idx];
69     switch (op.op) {
70       case GRPC_OP_SEND_INITIAL_METADATA:
71         if (!AreInitialMetadataFlagsValid(op.flags)) {
72           return GRPC_CALL_ERROR_INVALID_FLAGS;
73         }
74         if (!ValidateMetadata(op.data.send_initial_metadata.count,
75                               op.data.send_initial_metadata.metadata)) {
76           return GRPC_CALL_ERROR_INVALID_METADATA;
77         }
78         break;
79       case GRPC_OP_SEND_MESSAGE:
80         if (!AreWriteFlagsValid(op.flags)) {
81           return GRPC_CALL_ERROR_INVALID_FLAGS;
82         }
83         break;
84       case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
85       case GRPC_OP_RECV_INITIAL_METADATA:
86       case GRPC_OP_RECV_MESSAGE:
87       case GRPC_OP_RECV_STATUS_ON_CLIENT:
88         if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
89         break;
90       case GRPC_OP_RECV_CLOSE_ON_SERVER:
91       case GRPC_OP_SEND_STATUS_FROM_SERVER:
92         return GRPC_CALL_ERROR_NOT_ON_CLIENT;
93     }
94     if (got_ops.is_set(op.op)) return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
95     got_ops.set(op.op);
96   }
97   return GRPC_CALL_OK;
98 }
99 
100 }  // namespace
101 
ClientCall(grpc_call *,uint32_t,grpc_completion_queue * cq,Slice path,absl::optional<Slice> authority,bool registered_method,Timestamp deadline,grpc_compression_options compression_options,RefCountedPtr<Arena> arena,RefCountedPtr<UnstartedCallDestination> destination)102 ClientCall::ClientCall(grpc_call*, uint32_t, grpc_completion_queue* cq,
103                        Slice path, absl::optional<Slice> authority,
104                        bool registered_method, Timestamp deadline,
105                        grpc_compression_options compression_options,
106                        RefCountedPtr<Arena> arena,
107                        RefCountedPtr<UnstartedCallDestination> destination)
108     : Call(false, deadline, std::move(arena)),
109       DualRefCounted("ClientCall"),
110       cq_(cq),
111       call_destination_(std::move(destination)),
112       compression_options_(compression_options) {
113   global_stats().IncrementClientCallsCreated();
114   send_initial_metadata_->Set(HttpPathMetadata(), std::move(path));
115   if (authority.has_value()) {
116     send_initial_metadata_->Set(HttpAuthorityMetadata(), std::move(*authority));
117   }
118   send_initial_metadata_->Set(
119       GrpcRegisteredMethod(),
120       reinterpret_cast<void*>(static_cast<uintptr_t>(registered_method)));
121   if (deadline != Timestamp::InfFuture()) {
122     send_initial_metadata_->Set(GrpcTimeoutMetadata(), deadline);
123     UpdateDeadline(deadline);
124   }
125 }
126 
StartBatch(const grpc_op * ops,size_t nops,void * notify_tag,bool is_notify_tag_closure)127 grpc_call_error ClientCall::StartBatch(const grpc_op* ops, size_t nops,
128                                        void* notify_tag,
129                                        bool is_notify_tag_closure) {
130   GRPC_LATENT_SEE_PARENT_SCOPE("ClientCall::StartBatch");
131   if (nops == 0) {
132     EndOpImmediately(cq_, notify_tag, is_notify_tag_closure);
133     return GRPC_CALL_OK;
134   }
135   const grpc_call_error validation_result = ValidateClientBatch(ops, nops);
136   if (validation_result != GRPC_CALL_OK) {
137     return validation_result;
138   }
139   CommitBatch(ops, nops, notify_tag, is_notify_tag_closure);
140   return GRPC_CALL_OK;
141 }
142 
CancelWithError(grpc_error_handle error)143 void ClientCall::CancelWithError(grpc_error_handle error) {
144   cancel_status_.Set(new absl::Status(error));
145   auto cur_state = call_state_.load(std::memory_order_acquire);
146   while (true) {
147     GRPC_TRACE_LOG(call, INFO)
148         << DebugTag() << "CancelWithError " << GRPC_DUMP_ARGS(cur_state, error);
149     switch (cur_state) {
150       case kCancelled:
151         return;
152       case kUnstarted:
153         if (call_state_.compare_exchange_strong(cur_state, kCancelled,
154                                                 std::memory_order_acq_rel,
155                                                 std::memory_order_acquire)) {
156           ResetDeadline();
157           return;
158         }
159         break;
160       case kStarted:
161         started_call_initiator_.SpawnInfallible(
162             "CancelWithError", [self = WeakRefAsSubclass<ClientCall>(),
163                                 error = std::move(error)]() mutable {
164               self->started_call_initiator_.Cancel(std::move(error));
165             });
166         return;
167       default:
168         if (call_state_.compare_exchange_strong(cur_state, kCancelled,
169                                                 std::memory_order_acq_rel,
170                                                 std::memory_order_acquire)) {
171           ResetDeadline();
172           auto* unordered_start = reinterpret_cast<UnorderedStart*>(cur_state);
173           while (unordered_start != nullptr) {
174             auto next = unordered_start->next;
175             delete unordered_start;
176             unordered_start = next;
177           }
178           return;
179         }
180     }
181   }
182 }
183 
184 template <typename Batch>
ScheduleCommittedBatch(Batch batch)185 void ClientCall::ScheduleCommittedBatch(Batch batch) {
186   GRPC_LATENT_SEE_INNER_SCOPE("ClientCall::ScheduleCommittedBatch");
187   auto cur_state = call_state_.load(std::memory_order_acquire);
188   while (true) {
189     switch (cur_state) {
190       case kUnstarted:
191       default: {  // UnorderedStart
192         auto pending = std::make_unique<UnorderedStart>();
193         pending->start_pending_batch = [this,
194                                         batch = std::move(batch)]() mutable {
195           started_call_initiator_.SpawnInfallible(
196               "batch",
197               GRPC_LATENT_SEE_PROMISE("ClientCallBatch", std::move(batch)));
198         };
199         while (true) {
200           pending->next = reinterpret_cast<UnorderedStart*>(cur_state);
201           if (call_state_.compare_exchange_strong(
202                   cur_state, reinterpret_cast<uintptr_t>(pending.get()),
203                   std::memory_order_acq_rel, std::memory_order_acquire)) {
204             std::ignore = pending.release();
205             return;
206           }
207           if (cur_state == kStarted) {
208             pending->start_pending_batch();
209             return;
210           }
211           if (cur_state == kCancelled) {
212             return;
213           }
214         }
215       }
216       case kStarted:
217         started_call_initiator_.SpawnInfallible(
218             "batch",
219             GRPC_LATENT_SEE_PROMISE("ClientCallBatch", std::move(batch)));
220         return;
221       case kCancelled:
222         return;
223     }
224   }
225 }
226 
StartCall(const grpc_op & send_initial_metadata_op)227 Party::WakeupHold ClientCall::StartCall(
228     const grpc_op& send_initial_metadata_op) {
229   GRPC_LATENT_SEE_INNER_SCOPE("ClientCall::StartCall");
230   auto cur_state = call_state_.load(std::memory_order_acquire);
231   CToMetadata(send_initial_metadata_op.data.send_initial_metadata.metadata,
232               send_initial_metadata_op.data.send_initial_metadata.count,
233               send_initial_metadata_.get());
234   PrepareOutgoingInitialMetadata(send_initial_metadata_op,
235                                  *send_initial_metadata_);
236   auto call = MakeCallPair(std::move(send_initial_metadata_), arena()->Ref());
237   started_call_initiator_ = std::move(call.initiator);
238   Party::WakeupHold wakeup_hold{started_call_initiator_.party()};
239   while (!StartCallMaybeUpdateState(cur_state, call.handler)) {
240   }
241   return wakeup_hold;
242 }
243 
StartCallMaybeUpdateState(uintptr_t & cur_state,UnstartedCallHandler & handler)244 bool ClientCall::StartCallMaybeUpdateState(uintptr_t& cur_state,
245                                            UnstartedCallHandler& handler) {
246   GRPC_TRACE_LOG(call, INFO)
247       << DebugTag() << "StartCall " << GRPC_DUMP_ARGS(cur_state);
248   switch (cur_state) {
249     case kUnstarted:
250       if (call_state_.compare_exchange_strong(cur_state, kStarted,
251                                               std::memory_order_acq_rel,
252                                               std::memory_order_acquire)) {
253         call_destination_->StartCall(std::move(handler));
254         return true;
255       }
256       return false;
257     case kStarted:
258       Crash("StartCall called twice");  // probably we crash earlier...
259     case kCancelled:
260       return true;
261     default: {  // UnorderedStart
262       if (call_state_.compare_exchange_strong(cur_state, kStarted,
263                                               std::memory_order_acq_rel,
264                                               std::memory_order_acquire)) {
265         call_destination_->StartCall(std::move(handler));
266         auto unordered_start = reinterpret_cast<UnorderedStart*>(cur_state);
267         while (unordered_start->next != nullptr) {
268           unordered_start->start_pending_batch();
269           auto next = unordered_start->next;
270           delete unordered_start;
271           unordered_start = next;
272         }
273         return true;
274       }
275       return false;
276     }
277   }
278 }
279 
// Turns an already-validated batch of ops into a promise and schedules it.
//
// `ops`/`nops` describe the batch. `notify_tag` and `is_notify_tag_closure`
// are forwarded to the batch wrappers (or to EndOpImmediately on the fast
// path) so completion can be signalled.
void ClientCall::CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
                             bool is_notify_tag_closure) {
  GRPC_LATENT_SEE_INNER_SCOPE("ClientCall::CommitBatch");
  // Fast path: a batch containing only SEND_INITIAL_METADATA just starts the
  // call and completes immediately; no promise needs to be built.
  if (nops == 1 && ops[0].op == GRPC_OP_SEND_INITIAL_METADATA) {
    StartCall(ops[0]);
    EndOpImmediately(cq_, notify_tag, is_notify_tag_closure);
    return;
  }
  // When the tag is not a closure, the completion queue is told that an
  // operation is pending for this tag.
  if (!is_notify_tag_closure) grpc_cq_begin_op(cq_, notify_tag);
  BatchOpIndex op_index(ops, nops);
  // SEND_MESSAGE: the slice buffer is swapped out of the caller's byte buffer
  // into a Message right away; the push itself happens when the promise runs.
  auto send_message =
      op_index.OpHandler<GRPC_OP_SEND_MESSAGE>([this](const grpc_op& op) {
        SliceBuffer send;
        grpc_slice_buffer_swap(
            &op.data.send_message.send_message->data.raw.slice_buffer,
            send.c_slice_buffer());
        auto msg = arena()->MakePooled<Message>(std::move(send), op.flags);
        return [this, msg = std::move(msg)]() mutable {
          return started_call_initiator_.PushMessage(std::move(msg));
        };
      });
  // SEND_CLOSE_FROM_CLIENT: finish the sending side of the call.
  auto send_close_from_client =
      op_index.OpHandler<GRPC_OP_SEND_CLOSE_FROM_CLIENT>(
          [this](const grpc_op&) {
            return [this]() {
              started_call_initiator_.FinishSends();
              return Success{};
            };
          });
  // RECV_MESSAGE: delegated to the message receiver.
  auto recv_message =
      op_index.OpHandler<GRPC_OP_RECV_MESSAGE>([this](const grpc_op& op) {
        return message_receiver_.MakeBatchOp(op, &started_call_initiator_);
      });
  // RECV_INITIAL_METADATA: pull the server's initial metadata and publish it
  // into the caller-supplied array.
  auto recv_initial_metadata =
      op_index.OpHandler<GRPC_OP_RECV_INITIAL_METADATA>([this](
                                                            const grpc_op& op) {
        return [this,
                array = op.data.recv_initial_metadata.recv_initial_metadata]() {
          return Map(
              started_call_initiator_.PullServerInitialMetadata(),
              [this,
               array](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
                ServerMetadataHandle metadata;
                if (!md.ok() || !md->has_value()) {
                  // The pull failed or produced no metadata: treat the
                  // response as trailers-only and publish a freshly created
                  // metadata object instead.
                  is_trailers_only_ = true;
                  metadata = Arena::MakePooledForOverwrite<ServerMetadata>();
                } else {
                  metadata = std::move(md->value());
                  is_trailers_only_ =
                      metadata->get(GrpcTrailersOnly()).value_or(false);
                }
                ProcessIncomingInitialMetadata(*metadata);
                PublishMetadataArray(metadata.get(), array, true);
                // Keep the metadata stored on the call after publishing.
                received_initial_metadata_ = std::move(metadata);
                return Success{};
              });
        };
      });
  // Sends are sequenced (message, then close) and receives are sequenced
  // (initial metadata, then message); the two sequences are combined with
  // AllOk.
  auto primary_ops = AllOk<StatusFlag>(
      TrySeq(std::move(send_message), std::move(send_close_from_client)),
      TrySeq(std::move(recv_initial_metadata), std::move(recv_message)));
  // If this batch also carries SEND_INITIAL_METADATA, start the call now and
  // keep the returned hold alive until this function returns.
  // NOTE(review): presumably the hold defers party wakeups until the batch
  // below has been scheduled -- confirm against Party::WakeupHold.
  Party::WakeupHold wakeup_hold;
  if (const grpc_op* op = op_index.op(GRPC_OP_SEND_INITIAL_METADATA)) {
    wakeup_hold = StartCall(*op);
  }
  if (const grpc_op* op = op_index.op(GRPC_OP_RECV_STATUS_ON_CLIENT)) {
    // Copy the caller's output pointers so the lambdas below do not depend on
    // `op` itself.
    auto out_status = op->data.recv_status_on_client.status;
    auto out_status_details = op->data.recv_status_on_client.status_details;
    auto out_error_string = op->data.recv_status_on_client.error_string;
    auto out_trailing_metadata =
        op->data.recv_status_on_client.trailing_metadata;
    // Normal completion: pull the server trailing metadata and report it via
    // OnReceivedStatus. The lambda captures a weak ref to this call.
    auto make_read_trailing_metadata = [self = WeakRef(), out_status,
                                        out_status_details, out_error_string,
                                        out_trailing_metadata]() {
      return Map(self->started_call_initiator_.PullServerTrailingMetadata(),
                 [self, out_status, out_status_details, out_error_string,
                  out_trailing_metadata](
                     ServerMetadataHandle server_trailing_metadata) {
                   self->OnReceivedStatus(std::move(server_trailing_metadata),
                                          out_status, out_status_details,
                                          out_error_string,
                                          out_trailing_metadata);
                   return Success{};
                 });
    };
    // Batches that receive status are wrapped as InfallibleBatch. The second
    // callable handed to OnCancelFactory fills the outputs from the recorded
    // cancel status (which must be set), with no error string and no
    // trailing metadata.
    ScheduleCommittedBatch(InfallibleBatch(
        std::move(primary_ops),
        OpHandler<GRPC_OP_RECV_STATUS_ON_CLIENT>(OnCancelFactory(
            std::move(make_read_trailing_metadata),
            [this, out_status, out_status_details, out_error_string,
             out_trailing_metadata]() {
              auto* status = cancel_status_.Get();
              CHECK_NE(status, nullptr);
              *out_status = static_cast<grpc_status_code>(status->code());
              *out_status_details =
                  Slice::FromCopiedString(status->message()).TakeCSlice();
              if (out_error_string != nullptr) {
                *out_error_string = nullptr;
              }
              out_trailing_metadata->count = 0;
            })),
        is_notify_tag_closure, notify_tag, cq_));
  } else {
    // Without RECV_STATUS_ON_CLIENT the batch is wrapped as FallibleBatch.
    ScheduleCommittedBatch(FallibleBatch(
        std::move(primary_ops), is_notify_tag_closure, notify_tag, cq_));
  }
}
387 
OnReceivedStatus(ServerMetadataHandle server_trailing_metadata,grpc_status_code * out_status,grpc_slice * out_status_details,const char ** out_error_string,grpc_metadata_array * out_trailing_metadata)388 void ClientCall::OnReceivedStatus(ServerMetadataHandle server_trailing_metadata,
389                                   grpc_status_code* out_status,
390                                   grpc_slice* out_status_details,
391                                   const char** out_error_string,
392                                   grpc_metadata_array* out_trailing_metadata) {
393   saw_trailing_metadata_.store(true, std::memory_order_relaxed);
394   ResetDeadline();
395   GRPC_TRACE_LOG(call, INFO) << DebugTag() << "RecvStatusOnClient "
396                              << server_trailing_metadata->DebugString();
397   const auto status = server_trailing_metadata->get(GrpcStatusMetadata())
398                           .value_or(GRPC_STATUS_UNKNOWN);
399   *out_status = status;
400   Slice message_slice;
401   if (Slice* message =
402           server_trailing_metadata->get_pointer(GrpcMessageMetadata())) {
403     message_slice = message->Ref();
404   }
405   *out_status_details = message_slice.TakeCSlice();
406   if (out_error_string != nullptr) {
407     if (status != GRPC_STATUS_OK) {
408       *out_error_string =
409           gpr_strdup(MakeErrorString(server_trailing_metadata.get()).c_str());
410     } else {
411       *out_error_string = nullptr;
412     }
413   }
414   PublishMetadataArray(server_trailing_metadata.get(), out_trailing_metadata,
415                        true);
416   received_trailing_metadata_ = std::move(server_trailing_metadata);
417 }
418 
GetPeer()419 char* ClientCall::GetPeer() {
420   Slice peer_slice = GetPeerString();
421   if (!peer_slice.empty()) {
422     absl::string_view peer_string_view = peer_slice.as_string_view();
423     char* peer_string =
424         static_cast<char*>(gpr_malloc(peer_string_view.size() + 1));
425     memcpy(peer_string, peer_string_view.data(), peer_string_view.size());
426     peer_string[peer_string_view.size()] = '\0';
427     return peer_string;
428   }
429   return gpr_strdup("unknown");
430 }
431 
MakeClientCall(grpc_call * parent_call,uint32_t propagation_mask,grpc_completion_queue * cq,Slice path,absl::optional<Slice> authority,bool registered_method,Timestamp deadline,grpc_compression_options compression_options,RefCountedPtr<Arena> arena,RefCountedPtr<UnstartedCallDestination> destination)432 grpc_call* MakeClientCall(grpc_call* parent_call, uint32_t propagation_mask,
433                           grpc_completion_queue* cq, Slice path,
434                           absl::optional<Slice> authority,
435                           bool registered_method, Timestamp deadline,
436                           grpc_compression_options compression_options,
437                           RefCountedPtr<Arena> arena,
438                           RefCountedPtr<UnstartedCallDestination> destination) {
439   DCHECK_NE(arena.get(), nullptr);
440   DCHECK_NE(arena->GetContext<grpc_event_engine::experimental::EventEngine>(),
441             nullptr);
442   return arena
443       ->New<ClientCall>(parent_call, propagation_mask, cq, std::move(path),
444                         std::move(authority), registered_method, deadline,
445                         compression_options, arena, destination)
446       ->c_ptr();
447 }
448 
449 }  // namespace grpc_core
450