• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/util/http_client/httpcli.h"
20 
21 #include <grpc/grpc.h>
22 #include <grpc/slice_buffer.h>
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/port_platform.h>
25 #include <limits.h>
26 
27 #include <string>
28 #include <utility>
29 
30 #include "absl/functional/bind_front.h"
31 #include "absl/log/check.h"
32 #include "absl/status/status.h"
33 #include "absl/strings/str_format.h"
34 #include "src/core/config/core_configuration.h"
35 #include "src/core/handshaker/handshaker.h"
36 #include "src/core/handshaker/handshaker_registry.h"
37 #include "src/core/handshaker/tcp_connect/tcp_connect_handshaker.h"
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/channel/channel_args_preconditioning.h"
40 #include "src/core/lib/event_engine/resolved_address_internal.h"
41 #include "src/core/lib/event_engine/tcp_socket_utils.h"
42 #include "src/core/lib/iomgr/endpoint.h"
43 #include "src/core/lib/iomgr/iomgr_internal.h"
44 #include "src/core/lib/iomgr/pollset_set.h"
45 #include "src/core/lib/iomgr/resolve_address.h"
46 #include "src/core/lib/resource_quota/api.h"
47 #include "src/core/lib/security/credentials/credentials.h"
48 #include "src/core/lib/security/security_connector/security_connector.h"
49 #include "src/core/lib/slice/slice.h"
50 #include "src/core/lib/transport/error_utils.h"
51 #include "src/core/util/http_client/format_request.h"
52 #include "src/core/util/http_client/parser.h"
53 #include "src/core/util/status_helper.h"
54 
55 namespace grpc_core {
56 
57 namespace {
58 
59 using grpc_event_engine::experimental::EventEngine;
60 using grpc_event_engine::experimental::ResolvedAddressToURI;
61 
62 grpc_httpcli_get_override g_get_override;
63 grpc_httpcli_post_override g_post_override;
64 grpc_httpcli_put_override g_put_override;
65 void (*g_test_only_on_handshake_done_intercept)(HttpRequest* req);
66 
67 }  // namespace
68 
Get(URI uri,const grpc_channel_args * channel_args,grpc_polling_entity * pollent,const grpc_http_request * request,Timestamp deadline,grpc_closure * on_done,grpc_http_response * response,RefCountedPtr<grpc_channel_credentials> channel_creds)69 OrphanablePtr<HttpRequest> HttpRequest::Get(
70     URI uri, const grpc_channel_args* channel_args,
71     grpc_polling_entity* pollent, const grpc_http_request* request,
72     Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
73     RefCountedPtr<grpc_channel_credentials> channel_creds) {
74   absl::optional<std::function<bool()>> test_only_generate_response;
75   if (g_get_override != nullptr) {
76     test_only_generate_response = [request, uri, deadline, on_done,
77                                    response]() {
78       // Note that capturing request here assumes it will remain alive
79       // until after Start is called. This avoids making a copy as this
80       // code path is only used for test mocks.
81       return g_get_override(request, uri, deadline, on_done, response);
82     };
83   }
84   std::string name =
85       absl::StrFormat("HTTP:GET:%s:%s", uri.authority(), uri.path());
86   const grpc_slice request_text =
87       grpc_httpcli_format_get_request(request, uri.authority().c_str(),
88                                       uri.EncodedPathAndQueryParams().c_str());
89   return MakeOrphanable<HttpRequest>(
90       std::move(uri), request_text, response, deadline, channel_args, on_done,
91       pollent, name.c_str(), std::move(test_only_generate_response),
92       std::move(channel_creds));
93 }
94 
Post(URI uri,const grpc_channel_args * channel_args,grpc_polling_entity * pollent,const grpc_http_request * request,Timestamp deadline,grpc_closure * on_done,grpc_http_response * response,RefCountedPtr<grpc_channel_credentials> channel_creds)95 OrphanablePtr<HttpRequest> HttpRequest::Post(
96     URI uri, const grpc_channel_args* channel_args,
97     grpc_polling_entity* pollent, const grpc_http_request* request,
98     Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
99     RefCountedPtr<grpc_channel_credentials> channel_creds) {
100   absl::optional<std::function<bool()>> test_only_generate_response;
101   if (g_post_override != nullptr) {
102     test_only_generate_response = [request, uri, deadline, on_done,
103                                    response]() {
104       return g_post_override(
105           request, uri, absl::string_view(request->body, request->body_length),
106           deadline, on_done, response);
107     };
108   }
109   std::string name =
110       absl::StrFormat("HTTP:POST:%s:%s", uri.authority(), uri.path());
111   const grpc_slice request_text =
112       grpc_httpcli_format_post_request(request, uri.authority().c_str(),
113                                        uri.EncodedPathAndQueryParams().c_str());
114   return MakeOrphanable<HttpRequest>(
115       std::move(uri), request_text, response, deadline, channel_args, on_done,
116       pollent, name.c_str(), std::move(test_only_generate_response),
117       std::move(channel_creds));
118 }
119 
Put(URI uri,const grpc_channel_args * channel_args,grpc_polling_entity * pollent,const grpc_http_request * request,Timestamp deadline,grpc_closure * on_done,grpc_http_response * response,RefCountedPtr<grpc_channel_credentials> channel_creds)120 OrphanablePtr<HttpRequest> HttpRequest::Put(
121     URI uri, const grpc_channel_args* channel_args,
122     grpc_polling_entity* pollent, const grpc_http_request* request,
123     Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
124     RefCountedPtr<grpc_channel_credentials> channel_creds) {
125   absl::optional<std::function<bool()>> test_only_generate_response;
126   if (g_put_override != nullptr) {
127     test_only_generate_response = [request, uri, deadline, on_done,
128                                    response]() {
129       return g_put_override(
130           request, uri, absl::string_view(request->body, request->body_length),
131           deadline, on_done, response);
132     };
133   }
134   std::string name =
135       absl::StrFormat("HTTP:PUT:%s:%s", uri.authority(), uri.path());
136   const grpc_slice request_text =
137       grpc_httpcli_format_put_request(request, uri.authority().c_str(),
138                                       uri.EncodedPathAndQueryParams().c_str());
139   return MakeOrphanable<HttpRequest>(
140       std::move(uri), request_text, response, deadline, channel_args, on_done,
141       pollent, name.c_str(), std::move(test_only_generate_response),
142       std::move(channel_creds));
143 }
144 
SetOverride(grpc_httpcli_get_override get,grpc_httpcli_post_override post,grpc_httpcli_put_override put)145 void HttpRequest::SetOverride(grpc_httpcli_get_override get,
146                               grpc_httpcli_post_override post,
147                               grpc_httpcli_put_override put) {
148   g_get_override = get;
149   g_post_override = post;
150   g_put_override = put;
151 }
152 
TestOnlySetOnHandshakeDoneIntercept(void (* intercept)(HttpRequest * req))153 void HttpRequest::TestOnlySetOnHandshakeDoneIntercept(
154     void (*intercept)(HttpRequest* req)) {
155   g_test_only_on_handshake_done_intercept = intercept;
156 }
157 
HttpRequest(URI uri,const grpc_slice & request_text,grpc_http_response * response,Timestamp deadline,const grpc_channel_args * channel_args,grpc_closure * on_done,grpc_polling_entity * pollent,const char * name,absl::optional<std::function<bool ()>> test_only_generate_response,RefCountedPtr<grpc_channel_credentials> channel_creds)158 HttpRequest::HttpRequest(
159     URI uri, const grpc_slice& request_text, grpc_http_response* response,
160     Timestamp deadline, const grpc_channel_args* channel_args,
161     grpc_closure* on_done, grpc_polling_entity* pollent, const char* name,
162     absl::optional<std::function<bool()>> test_only_generate_response,
163     RefCountedPtr<grpc_channel_credentials> channel_creds)
164     : uri_(std::move(uri)),
165       request_text_(request_text),
166       deadline_(deadline),
167       channel_args_(CoreConfiguration::Get()
168                         .channel_args_preconditioning()
169                         .PreconditionChannelArgs(channel_args)
170                         .ToC()
171                         .release()),
172       channel_creds_(std::move(channel_creds)),
173       on_done_(on_done),
174       resource_quota_(ResourceQuotaFromChannelArgs(channel_args_)),
175       pollent_(pollent),
176       pollset_set_(grpc_pollset_set_create()),
177       test_only_generate_response_(std::move(test_only_generate_response)),
178       use_event_engine_dns_resolver_(IsEventEngineDnsNonClientChannelEnabled()),
179       resolver_(!use_event_engine_dns_resolver_ ? GetDNSResolver() : nullptr),
180       ee_resolver_(
181           use_event_engine_dns_resolver_
182               ? ChannelArgs::FromC(channel_args_)
183                     .GetObjectRef<EventEngine>()
184                     ->GetDNSResolver(
185                         EventEngine::DNSResolver::ResolverOptions())
186               : absl::InternalError("EventEngine DNS is not enabled")) {
187   grpc_http_parser_init(&parser_, GRPC_HTTP_RESPONSE, response);
188   grpc_slice_buffer_init(&incoming_);
189   grpc_slice_buffer_init(&outgoing_);
190   grpc_iomgr_register_object(&iomgr_obj_, name);
191   GRPC_CLOSURE_INIT(&on_read_, OnRead, this, grpc_schedule_on_exec_ctx);
192   GRPC_CLOSURE_INIT(&continue_on_read_after_schedule_on_exec_ctx_,
193                     ContinueOnReadAfterScheduleOnExecCtx, this,
194                     grpc_schedule_on_exec_ctx);
195   GRPC_CLOSURE_INIT(&done_write_, DoneWrite, this, grpc_schedule_on_exec_ctx);
196   GRPC_CLOSURE_INIT(&continue_done_write_after_schedule_on_exec_ctx_,
197                     ContinueDoneWriteAfterScheduleOnExecCtx, this,
198                     grpc_schedule_on_exec_ctx);
199   CHECK(pollent);
200   grpc_polling_entity_add_to_pollset_set(pollent, pollset_set_);
201 }
202 
~HttpRequest()203 HttpRequest::~HttpRequest() {
204   grpc_channel_args_destroy(channel_args_);
205   grpc_http_parser_destroy(&parser_);
206   ep_.reset();
207   CSliceUnref(request_text_);
208   grpc_iomgr_unregister_object(&iomgr_obj_);
209   grpc_slice_buffer_destroy(&incoming_);
210   grpc_slice_buffer_destroy(&outgoing_);
211   grpc_pollset_set_destroy(pollset_set_);
212 }
213 
Start()214 void HttpRequest::Start() {
215   MutexLock lock(&mu_);
216   if (test_only_generate_response_.has_value()) {
217     if (test_only_generate_response_.value()()) return;
218   }
219   if (use_event_engine_dns_resolver_ && !ee_resolver_.ok()) {
220     Finish(ee_resolver_.status());
221     return;
222   }
223   Ref().release();  // ref held by pending DNS resolution
224   if (use_event_engine_dns_resolver_) {
225     (*ee_resolver_)
226         ->LookupHostname(
227             [this](absl::StatusOr<std::vector<EventEngine::ResolvedAddress>>
228                        addresses_or) {
229               ApplicationCallbackExecCtx callback_exec_ctx;
230               ExecCtx exec_ctx;
231               OnResolved(addresses_or);
232             },
233             uri_.authority(), uri_.scheme());
234   } else {
235     dns_request_handle_ = resolver_->LookupHostname(
236         [this](absl::StatusOr<std::vector<grpc_resolved_address>> addresses) {
237           if (addresses.ok()) {
238             std::vector<EventEngine::ResolvedAddress> ee_addresses;
239             for (const auto& addr : *addresses) {
240               ee_addresses.push_back(
241                   grpc_event_engine::experimental::CreateResolvedAddress(addr));
242             }
243             OnResolved(ee_addresses);
244           } else {
245             OnResolved(addresses.status());
246           }
247         },
248         uri_.authority(), uri_.scheme(), kDefaultDNSRequestTimeout,
249         pollset_set_,
250         /*name_server=*/"");
251   }
252 }
253 
Orphan()254 void HttpRequest::Orphan() {
255   {
256     MutexLock lock(&mu_);
257     CHECK(!cancelled_);
258     cancelled_ = true;
259     // cancel potentially pending DNS resolution.
260     if (use_event_engine_dns_resolver_) {
261       if (*ee_resolver_ != nullptr) {
262         ee_resolver_->reset();
263       }
264     } else {
265       if (dns_request_handle_.has_value() &&
266           resolver_->Cancel(dns_request_handle_.value())) {
267         Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
268         Unref();
269       }
270     }
271     if (handshake_mgr_ != nullptr) {
272       // Shutdown will cancel any ongoing tcp connect.
273       handshake_mgr_->Shutdown(
274           GRPC_ERROR_CREATE("HTTP request cancelled during handshake"));
275     }
276     ep_.reset();
277   }
278   Unref();
279 }
280 
AppendError(grpc_error_handle error)281 void HttpRequest::AppendError(grpc_error_handle error) {
282   if (overall_error_.ok()) {
283     overall_error_ = GRPC_ERROR_CREATE("Failed HTTP/1 client request");
284   }
285   auto addr_text = ResolvedAddressToURI(addresses_[next_address_ - 1]);
286   if (addr_text.ok()) error = AddMessagePrefix(*addr_text, std::move(error));
287   overall_error_ = grpc_error_add_child(overall_error_, std::move(error));
288 }
289 
OnReadInternal(grpc_error_handle error)290 void HttpRequest::OnReadInternal(grpc_error_handle error) {
291   for (size_t i = 0; i < incoming_.count; i++) {
292     GRPC_TRACE_LOG(http1, INFO)
293         << "HTTP response data: " << StringViewFromSlice(incoming_.slices[i]);
294     if (GRPC_SLICE_LENGTH(incoming_.slices[i])) {
295       have_read_byte_ = 1;
296       grpc_error_handle err =
297           grpc_http_parser_parse(&parser_, incoming_.slices[i], nullptr);
298       if (!err.ok()) {
299         Finish(err);
300         return;
301       }
302     }
303   }
304   if (cancelled_) {
305     Finish(GRPC_ERROR_CREATE_REFERENCING("HTTP1 request cancelled during read",
306                                          &overall_error_, 1));
307   } else if (error.ok()) {
308     DoRead();
309   } else if (!have_read_byte_) {
310     NextAddress(error);
311   } else {
312     Finish(grpc_http_parser_eof(&parser_));
313   }
314 }
315 
ContinueDoneWriteAfterScheduleOnExecCtx(void * arg,grpc_error_handle error)316 void HttpRequest::ContinueDoneWriteAfterScheduleOnExecCtx(
317     void* arg, grpc_error_handle error) {
318   RefCountedPtr<HttpRequest> req(static_cast<HttpRequest*>(arg));
319   MutexLock lock(&req->mu_);
320   if (error.ok() && !req->cancelled_) {
321     req->OnWritten();
322   } else {
323     req->NextAddress(error);
324   }
325 }
326 
StartWrite()327 void HttpRequest::StartWrite() {
328   GRPC_TRACE_LOG(http1, INFO)
329       << "Sending HTTP1 request: " << StringViewFromSlice(request_text_);
330   CSliceRef(request_text_);
331   grpc_slice_buffer_add(&outgoing_, request_text_);
332   Ref().release();  // ref held by pending write
333   grpc_endpoint_write(ep_.get(), &outgoing_, &done_write_, nullptr,
334                       /*max_frame_size=*/INT_MAX);
335 }
336 
OnHandshakeDone(absl::StatusOr<HandshakerArgs * > result)337 void HttpRequest::OnHandshakeDone(absl::StatusOr<HandshakerArgs*> result) {
338   if (g_test_only_on_handshake_done_intercept != nullptr) {
339     // Run this testing intercept before the lock so that it has a chance to
340     // do things like calling Orphan on the request
341     g_test_only_on_handshake_done_intercept(this);
342   }
343   MutexLock lock(&mu_);
344   if (!result.ok()) {
345     handshake_mgr_.reset();
346     NextAddress(result.status());
347     return;
348   }
349   // Handshake completed, so get the endpoint.
350   ep_ = std::move((*result)->endpoint);
351   handshake_mgr_.reset();
352   if (cancelled_) {
353     NextAddress(GRPC_ERROR_CREATE("HTTP request cancelled during handshake"));
354     return;
355   }
356   StartWrite();
357 }
358 
DoHandshake(const EventEngine::ResolvedAddress & addr)359 void HttpRequest::DoHandshake(const EventEngine::ResolvedAddress& addr) {
360   // Create the security connector using the credentials and target name.
361   ChannelArgs args = ChannelArgs::FromC(channel_args_);
362   RefCountedPtr<grpc_channel_security_connector> sc =
363       channel_creds_->create_security_connector(
364           nullptr /*call_creds*/, uri_.authority().c_str(), &args);
365   if (sc == nullptr) {
366     Finish(GRPC_ERROR_CREATE_REFERENCING("failed to create security connector",
367                                          &overall_error_, 1));
368     return;
369   }
370   absl::StatusOr<std::string> address = ResolvedAddressToURI(addr);
371   if (!address.ok()) {
372     Finish(GRPC_ERROR_CREATE_REFERENCING("Failed to extract URI from address",
373                                          &overall_error_, 1));
374     return;
375   }
376   args = args.SetObject(std::move(sc))
377              .Set(GRPC_ARG_TCP_HANDSHAKER_RESOLVED_ADDRESS, address.value());
378   // Start the handshake
379   handshake_mgr_ = MakeRefCounted<HandshakeManager>();
380   CoreConfiguration::Get().handshaker_registry().AddHandshakers(
381       HANDSHAKER_CLIENT, args, pollset_set_, handshake_mgr_.get());
382   handshake_mgr_->DoHandshake(
383       nullptr, args, deadline_, /*acceptor=*/nullptr,
384       [self = Ref()](absl::StatusOr<HandshakerArgs*> result) {
385         self->OnHandshakeDone(std::move(result));
386       });
387 }
388 
NextAddress(grpc_error_handle error)389 void HttpRequest::NextAddress(grpc_error_handle error) {
390   if (!error.ok()) {
391     AppendError(error);
392   }
393   if (cancelled_) {
394     Finish(GRPC_ERROR_CREATE_REFERENCING("HTTP request was cancelled",
395                                          &overall_error_, 1));
396     return;
397   }
398   if (next_address_ == addresses_.size()) {
399     Finish(GRPC_ERROR_CREATE_REFERENCING("Failed HTTP requests to all targets",
400                                          &overall_error_, 1));
401     return;
402   }
403   DoHandshake(addresses_[next_address_++]);
404 }
405 
OnResolved(absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> addresses_or)406 void HttpRequest::OnResolved(
407     absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> addresses_or) {
408   RefCountedPtr<HttpRequest> unreffer(this);
409   MutexLock lock(&mu_);
410   if (use_event_engine_dns_resolver_) {
411     ee_resolver_->reset();
412   } else {
413     dns_request_handle_.reset();
414   }
415   if (cancelled_) {
416     Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
417     return;
418   }
419   if (!addresses_or.ok()) {
420     Finish(absl_status_to_grpc_error(addresses_or.status()));
421     return;
422   }
423   addresses_ = std::move(*addresses_or);
424   next_address_ = 0;
425   NextAddress(absl::OkStatus());
426 }
427 
428 }  // namespace grpc_core
429