1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/util/http_client/httpcli.h"
20
21 #include <grpc/grpc.h>
22 #include <grpc/slice_buffer.h>
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/port_platform.h>
25 #include <limits.h>
26
27 #include <string>
28 #include <utility>
29
30 #include "absl/functional/bind_front.h"
31 #include "absl/log/check.h"
32 #include "absl/status/status.h"
33 #include "absl/strings/str_format.h"
34 #include "src/core/config/core_configuration.h"
35 #include "src/core/handshaker/handshaker.h"
36 #include "src/core/handshaker/handshaker_registry.h"
37 #include "src/core/handshaker/tcp_connect/tcp_connect_handshaker.h"
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/channel/channel_args_preconditioning.h"
40 #include "src/core/lib/event_engine/resolved_address_internal.h"
41 #include "src/core/lib/event_engine/tcp_socket_utils.h"
42 #include "src/core/lib/iomgr/endpoint.h"
43 #include "src/core/lib/iomgr/iomgr_internal.h"
44 #include "src/core/lib/iomgr/pollset_set.h"
45 #include "src/core/lib/iomgr/resolve_address.h"
46 #include "src/core/lib/resource_quota/api.h"
47 #include "src/core/lib/security/credentials/credentials.h"
48 #include "src/core/lib/security/security_connector/security_connector.h"
49 #include "src/core/lib/slice/slice.h"
50 #include "src/core/lib/transport/error_utils.h"
51 #include "src/core/util/http_client/format_request.h"
52 #include "src/core/util/http_client/parser.h"
53 #include "src/core/util/status_helper.h"
54
55 namespace grpc_core {
56
57 namespace {
58
59 using grpc_event_engine::experimental::EventEngine;
60 using grpc_event_engine::experimental::ResolvedAddressToURI;
61
62 grpc_httpcli_get_override g_get_override;
63 grpc_httpcli_post_override g_post_override;
64 grpc_httpcli_put_override g_put_override;
65 void (*g_test_only_on_handshake_done_intercept)(HttpRequest* req);
66
67 } // namespace
68
Get(URI uri,const grpc_channel_args * channel_args,grpc_polling_entity * pollent,const grpc_http_request * request,Timestamp deadline,grpc_closure * on_done,grpc_http_response * response,RefCountedPtr<grpc_channel_credentials> channel_creds)69 OrphanablePtr<HttpRequest> HttpRequest::Get(
70 URI uri, const grpc_channel_args* channel_args,
71 grpc_polling_entity* pollent, const grpc_http_request* request,
72 Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
73 RefCountedPtr<grpc_channel_credentials> channel_creds) {
74 absl::optional<std::function<bool()>> test_only_generate_response;
75 if (g_get_override != nullptr) {
76 test_only_generate_response = [request, uri, deadline, on_done,
77 response]() {
78 // Note that capturing request here assumes it will remain alive
79 // until after Start is called. This avoids making a copy as this
80 // code path is only used for test mocks.
81 return g_get_override(request, uri, deadline, on_done, response);
82 };
83 }
84 std::string name =
85 absl::StrFormat("HTTP:GET:%s:%s", uri.authority(), uri.path());
86 const grpc_slice request_text =
87 grpc_httpcli_format_get_request(request, uri.authority().c_str(),
88 uri.EncodedPathAndQueryParams().c_str());
89 return MakeOrphanable<HttpRequest>(
90 std::move(uri), request_text, response, deadline, channel_args, on_done,
91 pollent, name.c_str(), std::move(test_only_generate_response),
92 std::move(channel_creds));
93 }
94
Post(URI uri,const grpc_channel_args * channel_args,grpc_polling_entity * pollent,const grpc_http_request * request,Timestamp deadline,grpc_closure * on_done,grpc_http_response * response,RefCountedPtr<grpc_channel_credentials> channel_creds)95 OrphanablePtr<HttpRequest> HttpRequest::Post(
96 URI uri, const grpc_channel_args* channel_args,
97 grpc_polling_entity* pollent, const grpc_http_request* request,
98 Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
99 RefCountedPtr<grpc_channel_credentials> channel_creds) {
100 absl::optional<std::function<bool()>> test_only_generate_response;
101 if (g_post_override != nullptr) {
102 test_only_generate_response = [request, uri, deadline, on_done,
103 response]() {
104 return g_post_override(
105 request, uri, absl::string_view(request->body, request->body_length),
106 deadline, on_done, response);
107 };
108 }
109 std::string name =
110 absl::StrFormat("HTTP:POST:%s:%s", uri.authority(), uri.path());
111 const grpc_slice request_text =
112 grpc_httpcli_format_post_request(request, uri.authority().c_str(),
113 uri.EncodedPathAndQueryParams().c_str());
114 return MakeOrphanable<HttpRequest>(
115 std::move(uri), request_text, response, deadline, channel_args, on_done,
116 pollent, name.c_str(), std::move(test_only_generate_response),
117 std::move(channel_creds));
118 }
119
Put(URI uri,const grpc_channel_args * channel_args,grpc_polling_entity * pollent,const grpc_http_request * request,Timestamp deadline,grpc_closure * on_done,grpc_http_response * response,RefCountedPtr<grpc_channel_credentials> channel_creds)120 OrphanablePtr<HttpRequest> HttpRequest::Put(
121 URI uri, const grpc_channel_args* channel_args,
122 grpc_polling_entity* pollent, const grpc_http_request* request,
123 Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
124 RefCountedPtr<grpc_channel_credentials> channel_creds) {
125 absl::optional<std::function<bool()>> test_only_generate_response;
126 if (g_put_override != nullptr) {
127 test_only_generate_response = [request, uri, deadline, on_done,
128 response]() {
129 return g_put_override(
130 request, uri, absl::string_view(request->body, request->body_length),
131 deadline, on_done, response);
132 };
133 }
134 std::string name =
135 absl::StrFormat("HTTP:PUT:%s:%s", uri.authority(), uri.path());
136 const grpc_slice request_text =
137 grpc_httpcli_format_put_request(request, uri.authority().c_str(),
138 uri.EncodedPathAndQueryParams().c_str());
139 return MakeOrphanable<HttpRequest>(
140 std::move(uri), request_text, response, deadline, channel_args, on_done,
141 pollent, name.c_str(), std::move(test_only_generate_response),
142 std::move(channel_creds));
143 }
144
SetOverride(grpc_httpcli_get_override get,grpc_httpcli_post_override post,grpc_httpcli_put_override put)145 void HttpRequest::SetOverride(grpc_httpcli_get_override get,
146 grpc_httpcli_post_override post,
147 grpc_httpcli_put_override put) {
148 g_get_override = get;
149 g_post_override = post;
150 g_put_override = put;
151 }
152
TestOnlySetOnHandshakeDoneIntercept(void (* intercept)(HttpRequest * req))153 void HttpRequest::TestOnlySetOnHandshakeDoneIntercept(
154 void (*intercept)(HttpRequest* req)) {
155 g_test_only_on_handshake_done_intercept = intercept;
156 }
157
HttpRequest(URI uri,const grpc_slice & request_text,grpc_http_response * response,Timestamp deadline,const grpc_channel_args * channel_args,grpc_closure * on_done,grpc_polling_entity * pollent,const char * name,absl::optional<std::function<bool ()>> test_only_generate_response,RefCountedPtr<grpc_channel_credentials> channel_creds)158 HttpRequest::HttpRequest(
159 URI uri, const grpc_slice& request_text, grpc_http_response* response,
160 Timestamp deadline, const grpc_channel_args* channel_args,
161 grpc_closure* on_done, grpc_polling_entity* pollent, const char* name,
162 absl::optional<std::function<bool()>> test_only_generate_response,
163 RefCountedPtr<grpc_channel_credentials> channel_creds)
164 : uri_(std::move(uri)),
165 request_text_(request_text),
166 deadline_(deadline),
167 channel_args_(CoreConfiguration::Get()
168 .channel_args_preconditioning()
169 .PreconditionChannelArgs(channel_args)
170 .ToC()
171 .release()),
172 channel_creds_(std::move(channel_creds)),
173 on_done_(on_done),
174 resource_quota_(ResourceQuotaFromChannelArgs(channel_args_)),
175 pollent_(pollent),
176 pollset_set_(grpc_pollset_set_create()),
177 test_only_generate_response_(std::move(test_only_generate_response)),
178 use_event_engine_dns_resolver_(IsEventEngineDnsNonClientChannelEnabled()),
179 resolver_(!use_event_engine_dns_resolver_ ? GetDNSResolver() : nullptr),
180 ee_resolver_(
181 use_event_engine_dns_resolver_
182 ? ChannelArgs::FromC(channel_args_)
183 .GetObjectRef<EventEngine>()
184 ->GetDNSResolver(
185 EventEngine::DNSResolver::ResolverOptions())
186 : absl::InternalError("EventEngine DNS is not enabled")) {
187 grpc_http_parser_init(&parser_, GRPC_HTTP_RESPONSE, response);
188 grpc_slice_buffer_init(&incoming_);
189 grpc_slice_buffer_init(&outgoing_);
190 grpc_iomgr_register_object(&iomgr_obj_, name);
191 GRPC_CLOSURE_INIT(&on_read_, OnRead, this, grpc_schedule_on_exec_ctx);
192 GRPC_CLOSURE_INIT(&continue_on_read_after_schedule_on_exec_ctx_,
193 ContinueOnReadAfterScheduleOnExecCtx, this,
194 grpc_schedule_on_exec_ctx);
195 GRPC_CLOSURE_INIT(&done_write_, DoneWrite, this, grpc_schedule_on_exec_ctx);
196 GRPC_CLOSURE_INIT(&continue_done_write_after_schedule_on_exec_ctx_,
197 ContinueDoneWriteAfterScheduleOnExecCtx, this,
198 grpc_schedule_on_exec_ctx);
199 CHECK(pollent);
200 grpc_polling_entity_add_to_pollset_set(pollent, pollset_set_);
201 }
202
~HttpRequest()203 HttpRequest::~HttpRequest() {
204 grpc_channel_args_destroy(channel_args_);
205 grpc_http_parser_destroy(&parser_);
206 ep_.reset();
207 CSliceUnref(request_text_);
208 grpc_iomgr_unregister_object(&iomgr_obj_);
209 grpc_slice_buffer_destroy(&incoming_);
210 grpc_slice_buffer_destroy(&outgoing_);
211 grpc_pollset_set_destroy(pollset_set_);
212 }
213
Start()214 void HttpRequest::Start() {
215 MutexLock lock(&mu_);
216 if (test_only_generate_response_.has_value()) {
217 if (test_only_generate_response_.value()()) return;
218 }
219 if (use_event_engine_dns_resolver_ && !ee_resolver_.ok()) {
220 Finish(ee_resolver_.status());
221 return;
222 }
223 Ref().release(); // ref held by pending DNS resolution
224 if (use_event_engine_dns_resolver_) {
225 (*ee_resolver_)
226 ->LookupHostname(
227 [this](absl::StatusOr<std::vector<EventEngine::ResolvedAddress>>
228 addresses_or) {
229 ApplicationCallbackExecCtx callback_exec_ctx;
230 ExecCtx exec_ctx;
231 OnResolved(addresses_or);
232 },
233 uri_.authority(), uri_.scheme());
234 } else {
235 dns_request_handle_ = resolver_->LookupHostname(
236 [this](absl::StatusOr<std::vector<grpc_resolved_address>> addresses) {
237 if (addresses.ok()) {
238 std::vector<EventEngine::ResolvedAddress> ee_addresses;
239 for (const auto& addr : *addresses) {
240 ee_addresses.push_back(
241 grpc_event_engine::experimental::CreateResolvedAddress(addr));
242 }
243 OnResolved(ee_addresses);
244 } else {
245 OnResolved(addresses.status());
246 }
247 },
248 uri_.authority(), uri_.scheme(), kDefaultDNSRequestTimeout,
249 pollset_set_,
250 /*name_server=*/"");
251 }
252 }
253
Orphan()254 void HttpRequest::Orphan() {
255 {
256 MutexLock lock(&mu_);
257 CHECK(!cancelled_);
258 cancelled_ = true;
259 // cancel potentially pending DNS resolution.
260 if (use_event_engine_dns_resolver_) {
261 if (*ee_resolver_ != nullptr) {
262 ee_resolver_->reset();
263 }
264 } else {
265 if (dns_request_handle_.has_value() &&
266 resolver_->Cancel(dns_request_handle_.value())) {
267 Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
268 Unref();
269 }
270 }
271 if (handshake_mgr_ != nullptr) {
272 // Shutdown will cancel any ongoing tcp connect.
273 handshake_mgr_->Shutdown(
274 GRPC_ERROR_CREATE("HTTP request cancelled during handshake"));
275 }
276 ep_.reset();
277 }
278 Unref();
279 }
280
AppendError(grpc_error_handle error)281 void HttpRequest::AppendError(grpc_error_handle error) {
282 if (overall_error_.ok()) {
283 overall_error_ = GRPC_ERROR_CREATE("Failed HTTP/1 client request");
284 }
285 auto addr_text = ResolvedAddressToURI(addresses_[next_address_ - 1]);
286 if (addr_text.ok()) error = AddMessagePrefix(*addr_text, std::move(error));
287 overall_error_ = grpc_error_add_child(overall_error_, std::move(error));
288 }
289
OnReadInternal(grpc_error_handle error)290 void HttpRequest::OnReadInternal(grpc_error_handle error) {
291 for (size_t i = 0; i < incoming_.count; i++) {
292 GRPC_TRACE_LOG(http1, INFO)
293 << "HTTP response data: " << StringViewFromSlice(incoming_.slices[i]);
294 if (GRPC_SLICE_LENGTH(incoming_.slices[i])) {
295 have_read_byte_ = 1;
296 grpc_error_handle err =
297 grpc_http_parser_parse(&parser_, incoming_.slices[i], nullptr);
298 if (!err.ok()) {
299 Finish(err);
300 return;
301 }
302 }
303 }
304 if (cancelled_) {
305 Finish(GRPC_ERROR_CREATE_REFERENCING("HTTP1 request cancelled during read",
306 &overall_error_, 1));
307 } else if (error.ok()) {
308 DoRead();
309 } else if (!have_read_byte_) {
310 NextAddress(error);
311 } else {
312 Finish(grpc_http_parser_eof(&parser_));
313 }
314 }
315
ContinueDoneWriteAfterScheduleOnExecCtx(void * arg,grpc_error_handle error)316 void HttpRequest::ContinueDoneWriteAfterScheduleOnExecCtx(
317 void* arg, grpc_error_handle error) {
318 RefCountedPtr<HttpRequest> req(static_cast<HttpRequest*>(arg));
319 MutexLock lock(&req->mu_);
320 if (error.ok() && !req->cancelled_) {
321 req->OnWritten();
322 } else {
323 req->NextAddress(error);
324 }
325 }
326
StartWrite()327 void HttpRequest::StartWrite() {
328 GRPC_TRACE_LOG(http1, INFO)
329 << "Sending HTTP1 request: " << StringViewFromSlice(request_text_);
330 CSliceRef(request_text_);
331 grpc_slice_buffer_add(&outgoing_, request_text_);
332 Ref().release(); // ref held by pending write
333 grpc_endpoint_write(ep_.get(), &outgoing_, &done_write_, nullptr,
334 /*max_frame_size=*/INT_MAX);
335 }
336
OnHandshakeDone(absl::StatusOr<HandshakerArgs * > result)337 void HttpRequest::OnHandshakeDone(absl::StatusOr<HandshakerArgs*> result) {
338 if (g_test_only_on_handshake_done_intercept != nullptr) {
339 // Run this testing intercept before the lock so that it has a chance to
340 // do things like calling Orphan on the request
341 g_test_only_on_handshake_done_intercept(this);
342 }
343 MutexLock lock(&mu_);
344 if (!result.ok()) {
345 handshake_mgr_.reset();
346 NextAddress(result.status());
347 return;
348 }
349 // Handshake completed, so get the endpoint.
350 ep_ = std::move((*result)->endpoint);
351 handshake_mgr_.reset();
352 if (cancelled_) {
353 NextAddress(GRPC_ERROR_CREATE("HTTP request cancelled during handshake"));
354 return;
355 }
356 StartWrite();
357 }
358
DoHandshake(const EventEngine::ResolvedAddress & addr)359 void HttpRequest::DoHandshake(const EventEngine::ResolvedAddress& addr) {
360 // Create the security connector using the credentials and target name.
361 ChannelArgs args = ChannelArgs::FromC(channel_args_);
362 RefCountedPtr<grpc_channel_security_connector> sc =
363 channel_creds_->create_security_connector(
364 nullptr /*call_creds*/, uri_.authority().c_str(), &args);
365 if (sc == nullptr) {
366 Finish(GRPC_ERROR_CREATE_REFERENCING("failed to create security connector",
367 &overall_error_, 1));
368 return;
369 }
370 absl::StatusOr<std::string> address = ResolvedAddressToURI(addr);
371 if (!address.ok()) {
372 Finish(GRPC_ERROR_CREATE_REFERENCING("Failed to extract URI from address",
373 &overall_error_, 1));
374 return;
375 }
376 args = args.SetObject(std::move(sc))
377 .Set(GRPC_ARG_TCP_HANDSHAKER_RESOLVED_ADDRESS, address.value());
378 // Start the handshake
379 handshake_mgr_ = MakeRefCounted<HandshakeManager>();
380 CoreConfiguration::Get().handshaker_registry().AddHandshakers(
381 HANDSHAKER_CLIENT, args, pollset_set_, handshake_mgr_.get());
382 handshake_mgr_->DoHandshake(
383 nullptr, args, deadline_, /*acceptor=*/nullptr,
384 [self = Ref()](absl::StatusOr<HandshakerArgs*> result) {
385 self->OnHandshakeDone(std::move(result));
386 });
387 }
388
NextAddress(grpc_error_handle error)389 void HttpRequest::NextAddress(grpc_error_handle error) {
390 if (!error.ok()) {
391 AppendError(error);
392 }
393 if (cancelled_) {
394 Finish(GRPC_ERROR_CREATE_REFERENCING("HTTP request was cancelled",
395 &overall_error_, 1));
396 return;
397 }
398 if (next_address_ == addresses_.size()) {
399 Finish(GRPC_ERROR_CREATE_REFERENCING("Failed HTTP requests to all targets",
400 &overall_error_, 1));
401 return;
402 }
403 DoHandshake(addresses_[next_address_++]);
404 }
405
OnResolved(absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> addresses_or)406 void HttpRequest::OnResolved(
407 absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> addresses_or) {
408 RefCountedPtr<HttpRequest> unreffer(this);
409 MutexLock lock(&mu_);
410 if (use_event_engine_dns_resolver_) {
411 ee_resolver_->reset();
412 } else {
413 dns_request_handle_.reset();
414 }
415 if (cancelled_) {
416 Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
417 return;
418 }
419 if (!addresses_or.ok()) {
420 Finish(absl_status_to_grpc_error(addresses_or.status()));
421 return;
422 }
423 addresses_ = std::move(*addresses_or);
424 next_address_ = 0;
425 NextAddress(absl::OkStatus());
426 }
427
428 } // namespace grpc_core
429