1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/surface/call_utils.h"
16
17 #include <grpc/byte_buffer.h>
18 #include <grpc/compression.h>
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/grpc.h>
21 #include <grpc/impl/call.h>
22 #include <grpc/impl/propagation_bits.h>
23 #include <grpc/slice.h>
24 #include <grpc/slice_buffer.h>
25 #include <grpc/status.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/atm.h>
28 #include <grpc/support/port_platform.h>
29 #include <grpc/support/string_util.h>
30 #include <inttypes.h>
31 #include <limits.h>
32 #include <stdlib.h>
33 #include <string.h>
34
35 #include <algorithm>
36 #include <atomic>
37 #include <cstdint>
38 #include <memory>
39 #include <string>
40 #include <type_traits>
41 #include <utility>
42
43 #include "absl/log/check.h"
44 #include "absl/log/log.h"
45 #include "absl/status/status.h"
46 #include "absl/strings/str_cat.h"
47 #include "absl/strings/str_format.h"
48 #include "absl/strings/string_view.h"
49 #include "src/core/lib/channel/status_util.h"
50 #include "src/core/lib/iomgr/exec_ctx.h"
51 #include "src/core/lib/promise/activity.h"
52 #include "src/core/lib/promise/context.h"
53 #include "src/core/lib/promise/poll.h"
54 #include "src/core/lib/promise/status_flag.h"
55 #include "src/core/lib/slice/slice_buffer.h"
56 #include "src/core/lib/slice/slice_internal.h"
57 #include "src/core/lib/surface/completion_queue.h"
58 #include "src/core/lib/surface/validate_metadata.h"
59 #include "src/core/lib/transport/metadata.h"
60 #include "src/core/lib/transport/metadata_batch.h"
61 #include "src/core/util/crash.h"
62 #include "src/core/util/debug_location.h"
63 #include "src/core/util/match.h"
64
65 namespace grpc_core {
66
PublishMetadataArray(grpc_metadata_batch * md,grpc_metadata_array * array,bool is_client)67 void PublishMetadataArray(grpc_metadata_batch* md, grpc_metadata_array* array,
68 bool is_client) {
69 const auto md_count = md->count();
70 if (md_count > array->capacity) {
71 array->capacity =
72 std::max(array->capacity + md->count(), array->capacity * 3 / 2);
73 array->metadata = static_cast<grpc_metadata*>(
74 gpr_realloc(array->metadata, sizeof(grpc_metadata) * array->capacity));
75 }
76 PublishToAppEncoder encoder(array, md, is_client);
77 md->Encode(&encoder);
78 }
79
CToMetadata(grpc_metadata * metadata,size_t count,grpc_metadata_batch * b)80 void CToMetadata(grpc_metadata* metadata, size_t count,
81 grpc_metadata_batch* b) {
82 for (size_t i = 0; i < count; i++) {
83 grpc_metadata* md = &metadata[i];
84 auto key = StringViewFromSlice(md->key);
85 // Filter "content-length metadata"
86 if (key == "content-length") continue;
87 b->Append(key, Slice(CSliceRef(md->value)),
88 [md](absl::string_view error, const Slice& value) {
89 VLOG(2) << "Append error: key=" << StringViewFromSlice(md->key)
90 << " error=" << error
91 << " value=" << value.as_string_view();
92 });
93 }
94 }
95
GrpcOpTypeName(grpc_op_type op)96 const char* GrpcOpTypeName(grpc_op_type op) {
97 switch (op) {
98 case GRPC_OP_SEND_INITIAL_METADATA:
99 return "SendInitialMetadata";
100 case GRPC_OP_SEND_MESSAGE:
101 return "SendMessage";
102 case GRPC_OP_SEND_STATUS_FROM_SERVER:
103 return "SendStatusFromServer";
104 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
105 return "SendCloseFromClient";
106 case GRPC_OP_RECV_MESSAGE:
107 return "RecvMessage";
108 case GRPC_OP_RECV_CLOSE_ON_SERVER:
109 return "RecvCloseOnServer";
110 case GRPC_OP_RECV_INITIAL_METADATA:
111 return "RecvInitialMetadata";
112 case GRPC_OP_RECV_STATUS_ON_CLIENT:
113 return "RecvStatusOnClient";
114 }
115 Crash("Unreachable");
116 }
117
118 ////////////////////////////////////////////////////////////////////////
119 // WaitForCqEndOp
120
operator ()()121 Poll<Empty> WaitForCqEndOp::operator()() {
122 GRPC_TRACE_LOG(promise_primitives, INFO)
123 << Activity::current()->DebugTag() << "WaitForCqEndOp[" << this << "] "
124 << StateString(state_);
125 if (auto* n = absl::get_if<NotStarted>(&state_)) {
126 if (n->is_closure) {
127 ExecCtx::Run(DEBUG_LOCATION, static_cast<grpc_closure*>(n->tag),
128 std::move(n->error));
129 return Empty{};
130 } else {
131 auto not_started = std::move(*n);
132 auto& started =
133 state_.emplace<Started>(GetContext<Activity>()->MakeOwningWaker());
134 grpc_cq_end_op(
135 not_started.cq, not_started.tag, std::move(not_started.error),
136 [](void* p, grpc_cq_completion*) {
137 auto started = static_cast<Started*>(p);
138 auto wakeup = std::move(started->waker);
139 started->done.store(true, std::memory_order_release);
140 wakeup.Wakeup();
141 },
142 &started, &started.completion);
143 }
144 }
145 auto& started = absl::get<Started>(state_);
146 if (started.done.load(std::memory_order_acquire)) {
147 return Empty{};
148 } else {
149 return Pending{};
150 }
151 }
152
StateString(const State & state)153 std::string WaitForCqEndOp::StateString(const State& state) {
154 return Match(
155 state,
156 [](const NotStarted& x) {
157 return absl::StrFormat(
158 "NotStarted{is_closure=%s, tag=%p, error=%s, cq=%p}",
159 x.is_closure ? "true" : "false", x.tag, x.error.ToString(), x.cq);
160 },
161 [](const Started& x) {
162 return absl::StrFormat(
163 "Started{completion=%p, done=%s}", &x.completion,
164 x.done.load(std::memory_order_relaxed) ? "true" : "false");
165 },
166 [](const Invalid&) -> std::string { return "Invalid{}"; });
167 }
168
169 ////////////////////////////////////////////////////////////////////////
170 // MakeErrorString
171
MakeErrorString(const ServerMetadata * trailing_metadata)172 std::string MakeErrorString(const ServerMetadata* trailing_metadata) {
173 std::string out = absl::StrCat(
174 trailing_metadata->get(GrpcStatusFromWire()).value_or(false)
175 ? "Error received from peer"
176 : "Error generated by client",
177 " grpc_status: ",
178 grpc_status_code_to_string(trailing_metadata->get(GrpcStatusMetadata())
179 .value_or(GRPC_STATUS_UNKNOWN)));
180 if (const Slice* message =
181 trailing_metadata->get_pointer(GrpcMessageMetadata())) {
182 absl::StrAppend(&out, "\ngrpc_message: ", message->as_string_view());
183 }
184 if (auto annotations = trailing_metadata->get_pointer(GrpcStatusContext())) {
185 absl::StrAppend(&out, "\nStatus Context:");
186 for (const std::string& annotation : *annotations) {
187 absl::StrAppend(&out, "\n ", annotation);
188 }
189 }
190 return out;
191 }
192
ValidateMetadata(size_t count,grpc_metadata * metadata)193 bool ValidateMetadata(size_t count, grpc_metadata* metadata) {
194 if (count > INT_MAX) {
195 return false;
196 }
197 for (size_t i = 0; i < count; i++) {
198 grpc_metadata* md = &metadata[i];
199 if (!GRPC_LOG_IF_ERROR("validate_metadata",
200 grpc_validate_header_key_is_legal(md->key))) {
201 return false;
202 } else if (!grpc_is_binary_header_internal(md->key) &&
203 !GRPC_LOG_IF_ERROR(
204 "validate_metadata",
205 grpc_validate_header_nonbin_value_is_legal(md->value))) {
206 return false;
207 } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
208 // HTTP2 hpack encoding has a maximum limit.
209 return false;
210 }
211 }
212 return true;
213 }
214
EndOpImmediately(grpc_completion_queue * cq,void * notify_tag,bool is_notify_tag_closure)215 void EndOpImmediately(grpc_completion_queue* cq, void* notify_tag,
216 bool is_notify_tag_closure) {
217 if (!is_notify_tag_closure) {
218 CHECK(grpc_cq_begin_op(cq, notify_tag));
219 grpc_cq_end_op(
220 cq, notify_tag, absl::OkStatus(),
221 [](void*, grpc_cq_completion* completion) { gpr_free(completion); },
222 nullptr,
223 static_cast<grpc_cq_completion*>(
224 gpr_malloc(sizeof(grpc_cq_completion))));
225 } else {
226 Closure::Run(DEBUG_LOCATION, static_cast<grpc_closure*>(notify_tag),
227 absl::OkStatus());
228 }
229 }
230
231 } // namespace grpc_core
232