// // // Copyright 2015 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // #include "src/core/util/http_client/httpcli.h" #include #include #include #include #include #include #include #include "absl/functional/bind_front.h" #include "absl/log/check.h" #include "absl/status/status.h" #include "absl/strings/str_format.h" #include "src/core/config/core_configuration.h" #include "src/core/handshaker/handshaker.h" #include "src/core/handshaker/handshaker_registry.h" #include "src/core/handshaker/tcp_connect/tcp_connect_handshaker.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args_preconditioning.h" #include "src/core/lib/event_engine/resolved_address_internal.h" #include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/resource_quota/api.h" #include "src/core/lib/security/credentials/credentials.h" #include "src/core/lib/security/security_connector/security_connector.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/transport/error_utils.h" #include "src/core/util/http_client/format_request.h" #include "src/core/util/http_client/parser.h" #include "src/core/util/status_helper.h" namespace grpc_core { namespace { using grpc_event_engine::experimental::EventEngine; using grpc_event_engine::experimental::ResolvedAddressToURI; grpc_httpcli_get_override g_get_override; grpc_httpcli_post_override g_post_override; grpc_httpcli_put_override g_put_override; void (*g_test_only_on_handshake_done_intercept)(HttpRequest* req); } // namespace OrphanablePtr HttpRequest::Get( URI uri, const grpc_channel_args* channel_args, grpc_polling_entity* pollent, const grpc_http_request* request, Timestamp deadline, grpc_closure* on_done, grpc_http_response* response, RefCountedPtr channel_creds) { absl::optional> test_only_generate_response; if (g_get_override != nullptr) { test_only_generate_response = [request, uri, deadline, on_done, response]() { // Note that capturing request here assumes it will remain alive // until after Start is called. This avoids making a copy as this // code path is only used for test mocks. return g_get_override(request, uri, deadline, on_done, response); }; } std::string name = absl::StrFormat("HTTP:GET:%s:%s", uri.authority(), uri.path()); const grpc_slice request_text = grpc_httpcli_format_get_request(request, uri.authority().c_str(), uri.EncodedPathAndQueryParams().c_str()); return MakeOrphanable( std::move(uri), request_text, response, deadline, channel_args, on_done, pollent, name.c_str(), std::move(test_only_generate_response), std::move(channel_creds)); } OrphanablePtr HttpRequest::Post( URI uri, const grpc_channel_args* channel_args, grpc_polling_entity* pollent, const grpc_http_request* request, Timestamp deadline, grpc_closure* on_done, grpc_http_response* response, RefCountedPtr channel_creds) { absl::optional> test_only_generate_response; if (g_post_override != nullptr) { test_only_generate_response = [request, uri, deadline, on_done, response]() { return g_post_override( request, uri, absl::string_view(request->body, request->body_length), deadline, on_done, response); }; } std::string name = absl::StrFormat("HTTP:POST:%s:%s", uri.authority(), uri.path()); const grpc_slice request_text = grpc_httpcli_format_post_request(request, uri.authority().c_str(), uri.EncodedPathAndQueryParams().c_str()); return MakeOrphanable( std::move(uri), request_text, response, deadline, channel_args, on_done, pollent, name.c_str(), std::move(test_only_generate_response), std::move(channel_creds)); } OrphanablePtr HttpRequest::Put( URI uri, const grpc_channel_args* channel_args, grpc_polling_entity* pollent, const grpc_http_request* request, Timestamp deadline, grpc_closure* on_done, grpc_http_response* response, RefCountedPtr channel_creds) { absl::optional> test_only_generate_response; if (g_put_override != nullptr) { test_only_generate_response = [request, uri, deadline, on_done, response]() { return g_put_override( request, uri, absl::string_view(request->body, request->body_length), deadline, on_done, response); }; } std::string name = absl::StrFormat("HTTP:PUT:%s:%s", uri.authority(), uri.path()); const grpc_slice request_text = grpc_httpcli_format_put_request(request, uri.authority().c_str(), uri.EncodedPathAndQueryParams().c_str()); return MakeOrphanable( std::move(uri), request_text, response, deadline, channel_args, on_done, pollent, name.c_str(), std::move(test_only_generate_response), std::move(channel_creds)); } void HttpRequest::SetOverride(grpc_httpcli_get_override get, grpc_httpcli_post_override post, grpc_httpcli_put_override put) { g_get_override = get; g_post_override = post; g_put_override = put; } void HttpRequest::TestOnlySetOnHandshakeDoneIntercept( void (*intercept)(HttpRequest* req)) { g_test_only_on_handshake_done_intercept = intercept; } HttpRequest::HttpRequest( URI uri, const grpc_slice& request_text, grpc_http_response* response, Timestamp deadline, const grpc_channel_args* channel_args, grpc_closure* on_done, grpc_polling_entity* pollent, const char* name, absl::optional> test_only_generate_response, RefCountedPtr channel_creds) : uri_(std::move(uri)), request_text_(request_text), deadline_(deadline), channel_args_(CoreConfiguration::Get() .channel_args_preconditioning() .PreconditionChannelArgs(channel_args) .ToC() .release()), channel_creds_(std::move(channel_creds)), on_done_(on_done), resource_quota_(ResourceQuotaFromChannelArgs(channel_args_)), pollent_(pollent), pollset_set_(grpc_pollset_set_create()), test_only_generate_response_(std::move(test_only_generate_response)), use_event_engine_dns_resolver_(IsEventEngineDnsNonClientChannelEnabled()), resolver_(!use_event_engine_dns_resolver_ ? GetDNSResolver() : nullptr), ee_resolver_( use_event_engine_dns_resolver_ ? ChannelArgs::FromC(channel_args_) .GetObjectRef() ->GetDNSResolver( EventEngine::DNSResolver::ResolverOptions()) : absl::InternalError("EventEngine DNS is not enabled")) { grpc_http_parser_init(&parser_, GRPC_HTTP_RESPONSE, response); grpc_slice_buffer_init(&incoming_); grpc_slice_buffer_init(&outgoing_); grpc_iomgr_register_object(&iomgr_obj_, name); GRPC_CLOSURE_INIT(&on_read_, OnRead, this, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&continue_on_read_after_schedule_on_exec_ctx_, ContinueOnReadAfterScheduleOnExecCtx, this, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&done_write_, DoneWrite, this, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&continue_done_write_after_schedule_on_exec_ctx_, ContinueDoneWriteAfterScheduleOnExecCtx, this, grpc_schedule_on_exec_ctx); CHECK(pollent); grpc_polling_entity_add_to_pollset_set(pollent, pollset_set_); } HttpRequest::~HttpRequest() { grpc_channel_args_destroy(channel_args_); grpc_http_parser_destroy(&parser_); ep_.reset(); CSliceUnref(request_text_); grpc_iomgr_unregister_object(&iomgr_obj_); grpc_slice_buffer_destroy(&incoming_); grpc_slice_buffer_destroy(&outgoing_); grpc_pollset_set_destroy(pollset_set_); } void HttpRequest::Start() { MutexLock lock(&mu_); if (test_only_generate_response_.has_value()) { if (test_only_generate_response_.value()()) return; } if (use_event_engine_dns_resolver_ && !ee_resolver_.ok()) { Finish(ee_resolver_.status()); return; } Ref().release(); // ref held by pending DNS resolution if (use_event_engine_dns_resolver_) { (*ee_resolver_) ->LookupHostname( [this](absl::StatusOr> addresses_or) { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; OnResolved(addresses_or); }, uri_.authority(), uri_.scheme()); } else { dns_request_handle_ = resolver_->LookupHostname( [this](absl::StatusOr> addresses) { if (addresses.ok()) { std::vector ee_addresses; for (const auto& addr : *addresses) { ee_addresses.push_back( grpc_event_engine::experimental::CreateResolvedAddress(addr)); } OnResolved(ee_addresses); } else { OnResolved(addresses.status()); } }, uri_.authority(), uri_.scheme(), kDefaultDNSRequestTimeout, pollset_set_, /*name_server=*/""); } } void HttpRequest::Orphan() { { MutexLock lock(&mu_); CHECK(!cancelled_); cancelled_ = true; // cancel potentially pending DNS resolution. if (use_event_engine_dns_resolver_) { if (*ee_resolver_ != nullptr) { ee_resolver_->reset(); } } else { if (dns_request_handle_.has_value() && resolver_->Cancel(dns_request_handle_.value())) { Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution")); Unref(); } } if (handshake_mgr_ != nullptr) { // Shutdown will cancel any ongoing tcp connect. handshake_mgr_->Shutdown( GRPC_ERROR_CREATE("HTTP request cancelled during handshake")); } ep_.reset(); } Unref(); } void HttpRequest::AppendError(grpc_error_handle error) { if (overall_error_.ok()) { overall_error_ = GRPC_ERROR_CREATE("Failed HTTP/1 client request"); } auto addr_text = ResolvedAddressToURI(addresses_[next_address_ - 1]); if (addr_text.ok()) error = AddMessagePrefix(*addr_text, std::move(error)); overall_error_ = grpc_error_add_child(overall_error_, std::move(error)); } void HttpRequest::OnReadInternal(grpc_error_handle error) { for (size_t i = 0; i < incoming_.count; i++) { GRPC_TRACE_LOG(http1, INFO) << "HTTP response data: " << StringViewFromSlice(incoming_.slices[i]); if (GRPC_SLICE_LENGTH(incoming_.slices[i])) { have_read_byte_ = 1; grpc_error_handle err = grpc_http_parser_parse(&parser_, incoming_.slices[i], nullptr); if (!err.ok()) { Finish(err); return; } } } if (cancelled_) { Finish(GRPC_ERROR_CREATE_REFERENCING("HTTP1 request cancelled during read", &overall_error_, 1)); } else if (error.ok()) { DoRead(); } else if (!have_read_byte_) { NextAddress(error); } else { Finish(grpc_http_parser_eof(&parser_)); } } void HttpRequest::ContinueDoneWriteAfterScheduleOnExecCtx( void* arg, grpc_error_handle error) { RefCountedPtr req(static_cast(arg)); MutexLock lock(&req->mu_); if (error.ok() && !req->cancelled_) { req->OnWritten(); } else { req->NextAddress(error); } } void HttpRequest::StartWrite() { GRPC_TRACE_LOG(http1, INFO) << "Sending HTTP1 request: " << StringViewFromSlice(request_text_); CSliceRef(request_text_); grpc_slice_buffer_add(&outgoing_, request_text_); Ref().release(); // ref held by pending write grpc_endpoint_write(ep_.get(), &outgoing_, &done_write_, nullptr, /*max_frame_size=*/INT_MAX); } void HttpRequest::OnHandshakeDone(absl::StatusOr result) { if (g_test_only_on_handshake_done_intercept != nullptr) { // Run this testing intercept before the lock so that it has a chance to // do things like calling Orphan on the request g_test_only_on_handshake_done_intercept(this); } MutexLock lock(&mu_); if (!result.ok()) { handshake_mgr_.reset(); NextAddress(result.status()); return; } // Handshake completed, so get the endpoint. ep_ = std::move((*result)->endpoint); handshake_mgr_.reset(); if (cancelled_) { NextAddress(GRPC_ERROR_CREATE("HTTP request cancelled during handshake")); return; } StartWrite(); } void HttpRequest::DoHandshake(const EventEngine::ResolvedAddress& addr) { // Create the security connector using the credentials and target name. ChannelArgs args = ChannelArgs::FromC(channel_args_); RefCountedPtr sc = channel_creds_->create_security_connector( nullptr /*call_creds*/, uri_.authority().c_str(), &args); if (sc == nullptr) { Finish(GRPC_ERROR_CREATE_REFERENCING("failed to create security connector", &overall_error_, 1)); return; } absl::StatusOr address = ResolvedAddressToURI(addr); if (!address.ok()) { Finish(GRPC_ERROR_CREATE_REFERENCING("Failed to extract URI from address", &overall_error_, 1)); return; } args = args.SetObject(std::move(sc)) .Set(GRPC_ARG_TCP_HANDSHAKER_RESOLVED_ADDRESS, address.value()); // Start the handshake handshake_mgr_ = MakeRefCounted(); CoreConfiguration::Get().handshaker_registry().AddHandshakers( HANDSHAKER_CLIENT, args, pollset_set_, handshake_mgr_.get()); handshake_mgr_->DoHandshake( nullptr, args, deadline_, /*acceptor=*/nullptr, [self = Ref()](absl::StatusOr result) { self->OnHandshakeDone(std::move(result)); }); } void HttpRequest::NextAddress(grpc_error_handle error) { if (!error.ok()) { AppendError(error); } if (cancelled_) { Finish(GRPC_ERROR_CREATE_REFERENCING("HTTP request was cancelled", &overall_error_, 1)); return; } if (next_address_ == addresses_.size()) { Finish(GRPC_ERROR_CREATE_REFERENCING("Failed HTTP requests to all targets", &overall_error_, 1)); return; } DoHandshake(addresses_[next_address_++]); } void HttpRequest::OnResolved( absl::StatusOr> addresses_or) { RefCountedPtr unreffer(this); MutexLock lock(&mu_); if (use_event_engine_dns_resolver_) { ee_resolver_->reset(); } else { dns_request_handle_.reset(); } if (cancelled_) { Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution")); return; } if (!addresses_or.ok()) { Finish(absl_status_to_grpc_error(addresses_or.status())); return; } addresses_ = std::move(*addresses_or); next_address_ = 0; NextAddress(absl::OkStatus()); } } // namespace grpc_core