• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/http/httpcli.h"
22 
23 #include <limits.h>
24 
25 #include <string>
26 #include <utility>
27 
28 #include "absl/functional/bind_front.h"
29 #include "absl/status/status.h"
30 #include "absl/strings/str_format.h"
31 
32 #include <grpc/grpc.h>
33 #include <grpc/slice_buffer.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 
37 #include "src/core/lib/address_utils/sockaddr_utils.h"
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/channel/channel_args_preconditioning.h"
40 #include "src/core/lib/config/core_configuration.h"
41 #include "src/core/lib/gprpp/status_helper.h"
42 #include "src/core/lib/http/format_request.h"
43 #include "src/core/lib/http/parser.h"
44 #include "src/core/lib/iomgr/endpoint.h"
45 #include "src/core/lib/iomgr/iomgr_internal.h"
46 #include "src/core/lib/iomgr/pollset_set.h"
47 #include "src/core/lib/iomgr/resolve_address.h"
48 #include "src/core/lib/resource_quota/api.h"
49 #include "src/core/lib/security/credentials/credentials.h"
50 #include "src/core/lib/security/security_connector/security_connector.h"
51 #include "src/core/lib/slice/slice.h"
52 #include "src/core/lib/transport/error_utils.h"
53 #include "src/core/lib/transport/handshaker_registry.h"
54 #include "src/core/lib/transport/tcp_connect_handshaker.h"
55 
56 namespace grpc_core {
57 
58 namespace {
59 
60 grpc_httpcli_get_override g_get_override;
61 grpc_httpcli_post_override g_post_override;
62 grpc_httpcli_put_override g_put_override;
63 void (*g_test_only_on_handshake_done_intercept)(HttpRequest* req);
64 
65 }  // namespace
66 
Get(URI uri,const grpc_channel_args * channel_args,grpc_polling_entity * pollent,const grpc_http_request * request,Timestamp deadline,grpc_closure * on_done,grpc_http_response * response,RefCountedPtr<grpc_channel_credentials> channel_creds)67 OrphanablePtr<HttpRequest> HttpRequest::Get(
68     URI uri, const grpc_channel_args* channel_args,
69     grpc_polling_entity* pollent, const grpc_http_request* request,
70     Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
71     RefCountedPtr<grpc_channel_credentials> channel_creds) {
72   absl::optional<std::function<void()>> test_only_generate_response;
73   if (g_get_override != nullptr) {
74     test_only_generate_response = [request, uri, deadline, on_done,
75                                    response]() {
76       // Note that capturing request here assumes it will remain alive
77       // until after Start is called. This avoids making a copy as this
78       // code path is only used for test mocks.
79       g_get_override(request, uri.authority().c_str(), uri.path().c_str(),
80                      deadline, on_done, response);
81     };
82   }
83   std::string name =
84       absl::StrFormat("HTTP:GET:%s:%s", uri.authority(), uri.path());
85   const grpc_slice request_text = grpc_httpcli_format_get_request(
86       request, uri.authority().c_str(), uri.path().c_str());
87   return MakeOrphanable<HttpRequest>(
88       std::move(uri), request_text, response, deadline, channel_args, on_done,
89       pollent, name.c_str(), std::move(test_only_generate_response),
90       std::move(channel_creds));
91 }
92 
Post(URI uri,const grpc_channel_args * channel_args,grpc_polling_entity * pollent,const grpc_http_request * request,Timestamp deadline,grpc_closure * on_done,grpc_http_response * response,RefCountedPtr<grpc_channel_credentials> channel_creds)93 OrphanablePtr<HttpRequest> HttpRequest::Post(
94     URI uri, const grpc_channel_args* channel_args,
95     grpc_polling_entity* pollent, const grpc_http_request* request,
96     Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
97     RefCountedPtr<grpc_channel_credentials> channel_creds) {
98   absl::optional<std::function<void()>> test_only_generate_response;
99   if (g_post_override != nullptr) {
100     test_only_generate_response = [request, uri, deadline, on_done,
101                                    response]() {
102       g_post_override(request, uri.authority().c_str(), uri.path().c_str(),
103                       request->body, request->body_length, deadline, on_done,
104                       response);
105     };
106   }
107   std::string name =
108       absl::StrFormat("HTTP:POST:%s:%s", uri.authority(), uri.path());
109   const grpc_slice request_text = grpc_httpcli_format_post_request(
110       request, uri.authority().c_str(), uri.path().c_str());
111   return MakeOrphanable<HttpRequest>(
112       std::move(uri), request_text, response, deadline, channel_args, on_done,
113       pollent, name.c_str(), std::move(test_only_generate_response),
114       std::move(channel_creds));
115 }
116 
Put(URI uri,const grpc_channel_args * channel_args,grpc_polling_entity * pollent,const grpc_http_request * request,Timestamp deadline,grpc_closure * on_done,grpc_http_response * response,RefCountedPtr<grpc_channel_credentials> channel_creds)117 OrphanablePtr<HttpRequest> HttpRequest::Put(
118     URI uri, const grpc_channel_args* channel_args,
119     grpc_polling_entity* pollent, const grpc_http_request* request,
120     Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
121     RefCountedPtr<grpc_channel_credentials> channel_creds) {
122   absl::optional<std::function<void()>> test_only_generate_response;
123   if (g_put_override != nullptr) {
124     test_only_generate_response = [request, uri, deadline, on_done,
125                                    response]() {
126       g_put_override(request, uri.authority().c_str(), uri.path().c_str(),
127                      request->body, request->body_length, deadline, on_done,
128                      response);
129     };
130   }
131   std::string name =
132       absl::StrFormat("HTTP:PUT:%s:%s", uri.authority(), uri.path());
133   const grpc_slice request_text = grpc_httpcli_format_put_request(
134       request, uri.authority().c_str(), uri.path().c_str());
135   return MakeOrphanable<HttpRequest>(
136       std::move(uri), request_text, response, deadline, channel_args, on_done,
137       pollent, name.c_str(), std::move(test_only_generate_response),
138       std::move(channel_creds));
139 }
140 
SetOverride(grpc_httpcli_get_override get,grpc_httpcli_post_override post,grpc_httpcli_put_override put)141 void HttpRequest::SetOverride(grpc_httpcli_get_override get,
142                               grpc_httpcli_post_override post,
143                               grpc_httpcli_put_override put) {
144   g_get_override = get;
145   g_post_override = post;
146   g_put_override = put;
147 }
148 
TestOnlySetOnHandshakeDoneIntercept(void (* intercept)(HttpRequest * req))149 void HttpRequest::TestOnlySetOnHandshakeDoneIntercept(
150     void (*intercept)(HttpRequest* req)) {
151   g_test_only_on_handshake_done_intercept = intercept;
152 }
153 
HttpRequest(URI uri,const grpc_slice & request_text,grpc_http_response * response,Timestamp deadline,const grpc_channel_args * channel_args,grpc_closure * on_done,grpc_polling_entity * pollent,const char * name,absl::optional<std::function<void ()>> test_only_generate_response,RefCountedPtr<grpc_channel_credentials> channel_creds)154 HttpRequest::HttpRequest(
155     URI uri, const grpc_slice& request_text, grpc_http_response* response,
156     Timestamp deadline, const grpc_channel_args* channel_args,
157     grpc_closure* on_done, grpc_polling_entity* pollent, const char* name,
158     absl::optional<std::function<void()>> test_only_generate_response,
159     RefCountedPtr<grpc_channel_credentials> channel_creds)
160     : uri_(std::move(uri)),
161       request_text_(request_text),
162       deadline_(deadline),
163       channel_args_(CoreConfiguration::Get()
164                         .channel_args_preconditioning()
165                         .PreconditionChannelArgs(channel_args)
166                         .ToC()
167                         .release()),
168       channel_creds_(std::move(channel_creds)),
169       on_done_(on_done),
170       resource_quota_(ResourceQuotaFromChannelArgs(channel_args_)),
171       pollent_(pollent),
172       pollset_set_(grpc_pollset_set_create()),
173       test_only_generate_response_(std::move(test_only_generate_response)),
174       resolver_(GetDNSResolver()) {
175   grpc_http_parser_init(&parser_, GRPC_HTTP_RESPONSE, response);
176   grpc_slice_buffer_init(&incoming_);
177   grpc_slice_buffer_init(&outgoing_);
178   grpc_iomgr_register_object(&iomgr_obj_, name);
179   GRPC_CLOSURE_INIT(&on_read_, OnRead, this, grpc_schedule_on_exec_ctx);
180   GRPC_CLOSURE_INIT(&continue_on_read_after_schedule_on_exec_ctx_,
181                     ContinueOnReadAfterScheduleOnExecCtx, this,
182                     grpc_schedule_on_exec_ctx);
183   GRPC_CLOSURE_INIT(&done_write_, DoneWrite, this, grpc_schedule_on_exec_ctx);
184   GRPC_CLOSURE_INIT(&continue_done_write_after_schedule_on_exec_ctx_,
185                     ContinueDoneWriteAfterScheduleOnExecCtx, this,
186                     grpc_schedule_on_exec_ctx);
187   GPR_ASSERT(pollent);
188   grpc_polling_entity_add_to_pollset_set(pollent, pollset_set_);
189 }
190 
~HttpRequest()191 HttpRequest::~HttpRequest() {
192   grpc_channel_args_destroy(channel_args_);
193   grpc_http_parser_destroy(&parser_);
194   if (own_endpoint_ && ep_ != nullptr) {
195     grpc_endpoint_destroy(ep_);
196   }
197   CSliceUnref(request_text_);
198   grpc_iomgr_unregister_object(&iomgr_obj_);
199   grpc_slice_buffer_destroy(&incoming_);
200   grpc_slice_buffer_destroy(&outgoing_);
201   grpc_pollset_set_destroy(pollset_set_);
202 }
203 
Start()204 void HttpRequest::Start() {
205   MutexLock lock(&mu_);
206   if (test_only_generate_response_.has_value()) {
207     test_only_generate_response_.value()();
208     return;
209   }
210   Ref().release();  // ref held by pending DNS resolution
211   dns_request_handle_ = resolver_->LookupHostname(
212       absl::bind_front(&HttpRequest::OnResolved, this), uri_.authority(),
213       uri_.scheme(), kDefaultDNSRequestTimeout, pollset_set_,
214       /*name_server=*/"");
215 }
216 
Orphan()217 void HttpRequest::Orphan() {
218   {
219     MutexLock lock(&mu_);
220     GPR_ASSERT(!cancelled_);
221     cancelled_ = true;
222     // cancel potentially pending DNS resolution.
223     if (dns_request_handle_.has_value() &&
224         resolver_->Cancel(dns_request_handle_.value())) {
225       Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
226       Unref();
227     }
228     if (handshake_mgr_ != nullptr) {
229       // Shutdown will cancel any ongoing tcp connect.
230       handshake_mgr_->Shutdown(
231           GRPC_ERROR_CREATE("HTTP request cancelled during handshake"));
232     }
233     if (own_endpoint_ && ep_ != nullptr) {
234       grpc_endpoint_shutdown(ep_, GRPC_ERROR_CREATE("HTTP request cancelled"));
235     }
236   }
237   Unref();
238 }
239 
AppendError(grpc_error_handle error)240 void HttpRequest::AppendError(grpc_error_handle error) {
241   if (overall_error_.ok()) {
242     overall_error_ = GRPC_ERROR_CREATE("Failed HTTP/1 client request");
243   }
244   const grpc_resolved_address* addr = &addresses_[next_address_ - 1];
245   auto addr_text = grpc_sockaddr_to_uri(addr);
246   overall_error_ = grpc_error_add_child(
247       overall_error_,
248       grpc_error_set_str(
249           error, StatusStrProperty::kTargetAddress,
250           addr_text.ok() ? addr_text.value() : addr_text.status().ToString()));
251 }
252 
OnReadInternal(grpc_error_handle error)253 void HttpRequest::OnReadInternal(grpc_error_handle error) {
254   for (size_t i = 0; i < incoming_.count; i++) {
255     if (GRPC_SLICE_LENGTH(incoming_.slices[i])) {
256       have_read_byte_ = 1;
257       grpc_error_handle err =
258           grpc_http_parser_parse(&parser_, incoming_.slices[i], nullptr);
259       if (!err.ok()) {
260         Finish(err);
261         return;
262       }
263     }
264   }
265   if (cancelled_) {
266     Finish(GRPC_ERROR_CREATE_REFERENCING("HTTP1 request cancelled during read",
267                                          &overall_error_, 1));
268   } else if (error.ok()) {
269     DoRead();
270   } else if (!have_read_byte_) {
271     NextAddress(error);
272   } else {
273     Finish(grpc_http_parser_eof(&parser_));
274   }
275 }
276 
ContinueDoneWriteAfterScheduleOnExecCtx(void * arg,grpc_error_handle error)277 void HttpRequest::ContinueDoneWriteAfterScheduleOnExecCtx(
278     void* arg, grpc_error_handle error) {
279   RefCountedPtr<HttpRequest> req(static_cast<HttpRequest*>(arg));
280   MutexLock lock(&req->mu_);
281   if (error.ok() && !req->cancelled_) {
282     req->OnWritten();
283   } else {
284     req->NextAddress(error);
285   }
286 }
287 
StartWrite()288 void HttpRequest::StartWrite() {
289   CSliceRef(request_text_);
290   grpc_slice_buffer_add(&outgoing_, request_text_);
291   Ref().release();  // ref held by pending write
292   grpc_endpoint_write(ep_, &outgoing_, &done_write_, nullptr,
293                       /*max_frame_size=*/INT_MAX);
294 }
295 
OnHandshakeDone(void * arg,grpc_error_handle error)296 void HttpRequest::OnHandshakeDone(void* arg, grpc_error_handle error) {
297   auto* args = static_cast<HandshakerArgs*>(arg);
298   RefCountedPtr<HttpRequest> req(static_cast<HttpRequest*>(args->user_data));
299   if (g_test_only_on_handshake_done_intercept != nullptr) {
300     // Run this testing intercept before the lock so that it has a chance to
301     // do things like calling Orphan on the request
302     g_test_only_on_handshake_done_intercept(req.get());
303   }
304   MutexLock lock(&req->mu_);
305   req->own_endpoint_ = true;
306   if (!error.ok()) {
307     req->handshake_mgr_.reset();
308     req->NextAddress(error);
309     return;
310   }
311   // Handshake completed, so we own fields in args
312   grpc_slice_buffer_destroy(args->read_buffer);
313   gpr_free(args->read_buffer);
314   req->ep_ = args->endpoint;
315   req->handshake_mgr_.reset();
316   if (req->cancelled_) {
317     req->NextAddress(
318         GRPC_ERROR_CREATE("HTTP request cancelled during handshake"));
319     return;
320   }
321   req->StartWrite();
322 }
323 
DoHandshake(const grpc_resolved_address * addr)324 void HttpRequest::DoHandshake(const grpc_resolved_address* addr) {
325   // Create the security connector using the credentials and target name.
326   ChannelArgs args = ChannelArgs::FromC(channel_args_);
327   RefCountedPtr<grpc_channel_security_connector> sc =
328       channel_creds_->create_security_connector(
329           nullptr /*call_creds*/, uri_.authority().c_str(), &args);
330   if (sc == nullptr) {
331     Finish(GRPC_ERROR_CREATE_REFERENCING("failed to create security connector",
332                                          &overall_error_, 1));
333     return;
334   }
335   absl::StatusOr<std::string> address = grpc_sockaddr_to_uri(addr);
336   if (!address.ok()) {
337     Finish(GRPC_ERROR_CREATE_REFERENCING("Failed to extract URI from address",
338                                          &overall_error_, 1));
339     return;
340   }
341   args = args.SetObject(std::move(sc))
342              .Set(GRPC_ARG_TCP_HANDSHAKER_RESOLVED_ADDRESS, address.value());
343   // Start the handshake
344   handshake_mgr_ = MakeRefCounted<HandshakeManager>();
345   CoreConfiguration::Get().handshaker_registry().AddHandshakers(
346       HANDSHAKER_CLIENT, args, pollset_set_, handshake_mgr_.get());
347   Ref().release();  // ref held by pending handshake
348   grpc_endpoint* ep = ep_;
349   ep_ = nullptr;
350   own_endpoint_ = false;
351   handshake_mgr_->DoHandshake(ep, args, deadline_,
352                               /*acceptor=*/nullptr, OnHandshakeDone,
353                               /*user_data=*/this);
354 }
355 
NextAddress(grpc_error_handle error)356 void HttpRequest::NextAddress(grpc_error_handle error) {
357   if (!error.ok()) {
358     AppendError(error);
359   }
360   if (cancelled_) {
361     Finish(GRPC_ERROR_CREATE_REFERENCING("HTTP request was cancelled",
362                                          &overall_error_, 1));
363     return;
364   }
365   if (next_address_ == addresses_.size()) {
366     Finish(GRPC_ERROR_CREATE_REFERENCING("Failed HTTP requests to all targets",
367                                          &overall_error_, 1));
368     return;
369   }
370   const grpc_resolved_address* addr = &addresses_[next_address_++];
371   DoHandshake(addr);
372 }
373 
OnResolved(absl::StatusOr<std::vector<grpc_resolved_address>> addresses_or)374 void HttpRequest::OnResolved(
375     absl::StatusOr<std::vector<grpc_resolved_address>> addresses_or) {
376   RefCountedPtr<HttpRequest> unreffer(this);
377   MutexLock lock(&mu_);
378   dns_request_handle_.reset();
379   if (cancelled_) {
380     Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
381     return;
382   }
383   if (!addresses_or.ok()) {
384     Finish(absl_status_to_grpc_error(addresses_or.status()));
385     return;
386   }
387   addresses_ = std::move(*addresses_or);
388   next_address_ = 0;
389   NextAddress(absl::OkStatus());
390 }
391 
392 }  // namespace grpc_core
393