• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/surface/call_utils.h"
16 
17 #include <grpc/byte_buffer.h>
18 #include <grpc/compression.h>
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/grpc.h>
21 #include <grpc/impl/call.h>
22 #include <grpc/impl/propagation_bits.h>
23 #include <grpc/slice.h>
24 #include <grpc/slice_buffer.h>
25 #include <grpc/status.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/atm.h>
28 #include <grpc/support/port_platform.h>
29 #include <grpc/support/string_util.h>
30 #include <inttypes.h>
31 #include <limits.h>
32 #include <stdlib.h>
33 #include <string.h>
34 
35 #include <algorithm>
36 #include <atomic>
37 #include <cstdint>
38 #include <memory>
39 #include <string>
40 #include <type_traits>
41 #include <utility>
42 
43 #include "absl/log/check.h"
44 #include "absl/log/log.h"
45 #include "absl/status/status.h"
46 #include "absl/strings/str_cat.h"
47 #include "absl/strings/str_format.h"
48 #include "absl/strings/string_view.h"
49 #include "src/core/lib/channel/status_util.h"
50 #include "src/core/lib/iomgr/exec_ctx.h"
51 #include "src/core/lib/promise/activity.h"
52 #include "src/core/lib/promise/context.h"
53 #include "src/core/lib/promise/poll.h"
54 #include "src/core/lib/promise/status_flag.h"
55 #include "src/core/lib/slice/slice_buffer.h"
56 #include "src/core/lib/slice/slice_internal.h"
57 #include "src/core/lib/surface/completion_queue.h"
58 #include "src/core/lib/surface/validate_metadata.h"
59 #include "src/core/lib/transport/metadata.h"
60 #include "src/core/lib/transport/metadata_batch.h"
61 #include "src/core/util/crash.h"
62 #include "src/core/util/debug_location.h"
63 #include "src/core/util/match.h"
64 
65 namespace grpc_core {
66 
PublishMetadataArray(grpc_metadata_batch * md,grpc_metadata_array * array,bool is_client)67 void PublishMetadataArray(grpc_metadata_batch* md, grpc_metadata_array* array,
68                           bool is_client) {
69   const auto md_count = md->count();
70   if (md_count > array->capacity) {
71     array->capacity =
72         std::max(array->capacity + md->count(), array->capacity * 3 / 2);
73     array->metadata = static_cast<grpc_metadata*>(
74         gpr_realloc(array->metadata, sizeof(grpc_metadata) * array->capacity));
75   }
76   PublishToAppEncoder encoder(array, md, is_client);
77   md->Encode(&encoder);
78 }
79 
CToMetadata(grpc_metadata * metadata,size_t count,grpc_metadata_batch * b)80 void CToMetadata(grpc_metadata* metadata, size_t count,
81                  grpc_metadata_batch* b) {
82   for (size_t i = 0; i < count; i++) {
83     grpc_metadata* md = &metadata[i];
84     auto key = StringViewFromSlice(md->key);
85     // Filter "content-length metadata"
86     if (key == "content-length") continue;
87     b->Append(key, Slice(CSliceRef(md->value)),
88               [md](absl::string_view error, const Slice& value) {
89                 VLOG(2) << "Append error: key=" << StringViewFromSlice(md->key)
90                         << " error=" << error
91                         << " value=" << value.as_string_view();
92               });
93   }
94 }
95 
GrpcOpTypeName(grpc_op_type op)96 const char* GrpcOpTypeName(grpc_op_type op) {
97   switch (op) {
98     case GRPC_OP_SEND_INITIAL_METADATA:
99       return "SendInitialMetadata";
100     case GRPC_OP_SEND_MESSAGE:
101       return "SendMessage";
102     case GRPC_OP_SEND_STATUS_FROM_SERVER:
103       return "SendStatusFromServer";
104     case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
105       return "SendCloseFromClient";
106     case GRPC_OP_RECV_MESSAGE:
107       return "RecvMessage";
108     case GRPC_OP_RECV_CLOSE_ON_SERVER:
109       return "RecvCloseOnServer";
110     case GRPC_OP_RECV_INITIAL_METADATA:
111       return "RecvInitialMetadata";
112     case GRPC_OP_RECV_STATUS_ON_CLIENT:
113       return "RecvStatusOnClient";
114   }
115   Crash("Unreachable");
116 }
117 
118 ////////////////////////////////////////////////////////////////////////
119 // WaitForCqEndOp
120 
operator ()()121 Poll<Empty> WaitForCqEndOp::operator()() {
122   GRPC_TRACE_LOG(promise_primitives, INFO)
123       << Activity::current()->DebugTag() << "WaitForCqEndOp[" << this << "] "
124       << StateString(state_);
125   if (auto* n = absl::get_if<NotStarted>(&state_)) {
126     if (n->is_closure) {
127       ExecCtx::Run(DEBUG_LOCATION, static_cast<grpc_closure*>(n->tag),
128                    std::move(n->error));
129       return Empty{};
130     } else {
131       auto not_started = std::move(*n);
132       auto& started =
133           state_.emplace<Started>(GetContext<Activity>()->MakeOwningWaker());
134       grpc_cq_end_op(
135           not_started.cq, not_started.tag, std::move(not_started.error),
136           [](void* p, grpc_cq_completion*) {
137             auto started = static_cast<Started*>(p);
138             auto wakeup = std::move(started->waker);
139             started->done.store(true, std::memory_order_release);
140             wakeup.Wakeup();
141           },
142           &started, &started.completion);
143     }
144   }
145   auto& started = absl::get<Started>(state_);
146   if (started.done.load(std::memory_order_acquire)) {
147     return Empty{};
148   } else {
149     return Pending{};
150   }
151 }
152 
StateString(const State & state)153 std::string WaitForCqEndOp::StateString(const State& state) {
154   return Match(
155       state,
156       [](const NotStarted& x) {
157         return absl::StrFormat(
158             "NotStarted{is_closure=%s, tag=%p, error=%s, cq=%p}",
159             x.is_closure ? "true" : "false", x.tag, x.error.ToString(), x.cq);
160       },
161       [](const Started& x) {
162         return absl::StrFormat(
163             "Started{completion=%p, done=%s}", &x.completion,
164             x.done.load(std::memory_order_relaxed) ? "true" : "false");
165       },
166       [](const Invalid&) -> std::string { return "Invalid{}"; });
167 }
168 
169 ////////////////////////////////////////////////////////////////////////
170 // MakeErrorString
171 
MakeErrorString(const ServerMetadata * trailing_metadata)172 std::string MakeErrorString(const ServerMetadata* trailing_metadata) {
173   std::string out = absl::StrCat(
174       trailing_metadata->get(GrpcStatusFromWire()).value_or(false)
175           ? "Error received from peer"
176           : "Error generated by client",
177       " grpc_status: ",
178       grpc_status_code_to_string(trailing_metadata->get(GrpcStatusMetadata())
179                                      .value_or(GRPC_STATUS_UNKNOWN)));
180   if (const Slice* message =
181           trailing_metadata->get_pointer(GrpcMessageMetadata())) {
182     absl::StrAppend(&out, "\ngrpc_message: ", message->as_string_view());
183   }
184   if (auto annotations = trailing_metadata->get_pointer(GrpcStatusContext())) {
185     absl::StrAppend(&out, "\nStatus Context:");
186     for (const std::string& annotation : *annotations) {
187       absl::StrAppend(&out, "\n  ", annotation);
188     }
189   }
190   return out;
191 }
192 
ValidateMetadata(size_t count,grpc_metadata * metadata)193 bool ValidateMetadata(size_t count, grpc_metadata* metadata) {
194   if (count > INT_MAX) {
195     return false;
196   }
197   for (size_t i = 0; i < count; i++) {
198     grpc_metadata* md = &metadata[i];
199     if (!GRPC_LOG_IF_ERROR("validate_metadata",
200                            grpc_validate_header_key_is_legal(md->key))) {
201       return false;
202     } else if (!grpc_is_binary_header_internal(md->key) &&
203                !GRPC_LOG_IF_ERROR(
204                    "validate_metadata",
205                    grpc_validate_header_nonbin_value_is_legal(md->value))) {
206       return false;
207     } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
208       // HTTP2 hpack encoding has a maximum limit.
209       return false;
210     }
211   }
212   return true;
213 }
214 
EndOpImmediately(grpc_completion_queue * cq,void * notify_tag,bool is_notify_tag_closure)215 void EndOpImmediately(grpc_completion_queue* cq, void* notify_tag,
216                       bool is_notify_tag_closure) {
217   if (!is_notify_tag_closure) {
218     CHECK(grpc_cq_begin_op(cq, notify_tag));
219     grpc_cq_end_op(
220         cq, notify_tag, absl::OkStatus(),
221         [](void*, grpc_cq_completion* completion) { gpr_free(completion); },
222         nullptr,
223         static_cast<grpc_cq_completion*>(
224             gpr_malloc(sizeof(grpc_cq_completion))));
225   } else {
226     Closure::Run(DEBUG_LOCATION, static_cast<grpc_closure*>(notify_tag),
227                  absl::OkStatus());
228   }
229 }
230 
231 }  // namespace grpc_core
232