1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/http/httpcli.h"
22
23 #include <limits.h>
24
25 #include <string>
26 #include <utility>
27
28 #include "absl/functional/bind_front.h"
29 #include "absl/status/status.h"
30 #include "absl/strings/str_format.h"
31
32 #include <grpc/grpc.h>
33 #include <grpc/slice_buffer.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36
37 #include "src/core/lib/address_utils/sockaddr_utils.h"
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/channel/channel_args_preconditioning.h"
40 #include "src/core/lib/config/core_configuration.h"
41 #include "src/core/lib/gprpp/status_helper.h"
42 #include "src/core/lib/http/format_request.h"
43 #include "src/core/lib/http/parser.h"
44 #include "src/core/lib/iomgr/endpoint.h"
45 #include "src/core/lib/iomgr/iomgr_internal.h"
46 #include "src/core/lib/iomgr/pollset_set.h"
47 #include "src/core/lib/iomgr/resolve_address.h"
48 #include "src/core/lib/resource_quota/api.h"
49 #include "src/core/lib/security/credentials/credentials.h"
50 #include "src/core/lib/security/security_connector/security_connector.h"
51 #include "src/core/lib/slice/slice.h"
52 #include "src/core/lib/transport/error_utils.h"
53 #include "src/core/lib/transport/handshaker_registry.h"
54 #include "src/core/lib/transport/tcp_connect_handshaker.h"
55
56 namespace grpc_core {
57
58 namespace {
59
60 grpc_httpcli_get_override g_get_override;
61 grpc_httpcli_post_override g_post_override;
62 grpc_httpcli_put_override g_put_override;
63 void (*g_test_only_on_handshake_done_intercept)(HttpRequest* req);
64
65 } // namespace
66
Get(URI uri,const grpc_channel_args * channel_args,grpc_polling_entity * pollent,const grpc_http_request * request,Timestamp deadline,grpc_closure * on_done,grpc_http_response * response,RefCountedPtr<grpc_channel_credentials> channel_creds)67 OrphanablePtr<HttpRequest> HttpRequest::Get(
68 URI uri, const grpc_channel_args* channel_args,
69 grpc_polling_entity* pollent, const grpc_http_request* request,
70 Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
71 RefCountedPtr<grpc_channel_credentials> channel_creds) {
72 absl::optional<std::function<void()>> test_only_generate_response;
73 if (g_get_override != nullptr) {
74 test_only_generate_response = [request, uri, deadline, on_done,
75 response]() {
76 // Note that capturing request here assumes it will remain alive
77 // until after Start is called. This avoids making a copy as this
78 // code path is only used for test mocks.
79 g_get_override(request, uri.authority().c_str(), uri.path().c_str(),
80 deadline, on_done, response);
81 };
82 }
83 std::string name =
84 absl::StrFormat("HTTP:GET:%s:%s", uri.authority(), uri.path());
85 const grpc_slice request_text = grpc_httpcli_format_get_request(
86 request, uri.authority().c_str(), uri.path().c_str());
87 return MakeOrphanable<HttpRequest>(
88 std::move(uri), request_text, response, deadline, channel_args, on_done,
89 pollent, name.c_str(), std::move(test_only_generate_response),
90 std::move(channel_creds));
91 }
92
Post(URI uri,const grpc_channel_args * channel_args,grpc_polling_entity * pollent,const grpc_http_request * request,Timestamp deadline,grpc_closure * on_done,grpc_http_response * response,RefCountedPtr<grpc_channel_credentials> channel_creds)93 OrphanablePtr<HttpRequest> HttpRequest::Post(
94 URI uri, const grpc_channel_args* channel_args,
95 grpc_polling_entity* pollent, const grpc_http_request* request,
96 Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
97 RefCountedPtr<grpc_channel_credentials> channel_creds) {
98 absl::optional<std::function<void()>> test_only_generate_response;
99 if (g_post_override != nullptr) {
100 test_only_generate_response = [request, uri, deadline, on_done,
101 response]() {
102 g_post_override(request, uri.authority().c_str(), uri.path().c_str(),
103 request->body, request->body_length, deadline, on_done,
104 response);
105 };
106 }
107 std::string name =
108 absl::StrFormat("HTTP:POST:%s:%s", uri.authority(), uri.path());
109 const grpc_slice request_text = grpc_httpcli_format_post_request(
110 request, uri.authority().c_str(), uri.path().c_str());
111 return MakeOrphanable<HttpRequest>(
112 std::move(uri), request_text, response, deadline, channel_args, on_done,
113 pollent, name.c_str(), std::move(test_only_generate_response),
114 std::move(channel_creds));
115 }
116
Put(URI uri,const grpc_channel_args * channel_args,grpc_polling_entity * pollent,const grpc_http_request * request,Timestamp deadline,grpc_closure * on_done,grpc_http_response * response,RefCountedPtr<grpc_channel_credentials> channel_creds)117 OrphanablePtr<HttpRequest> HttpRequest::Put(
118 URI uri, const grpc_channel_args* channel_args,
119 grpc_polling_entity* pollent, const grpc_http_request* request,
120 Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
121 RefCountedPtr<grpc_channel_credentials> channel_creds) {
122 absl::optional<std::function<void()>> test_only_generate_response;
123 if (g_put_override != nullptr) {
124 test_only_generate_response = [request, uri, deadline, on_done,
125 response]() {
126 g_put_override(request, uri.authority().c_str(), uri.path().c_str(),
127 request->body, request->body_length, deadline, on_done,
128 response);
129 };
130 }
131 std::string name =
132 absl::StrFormat("HTTP:PUT:%s:%s", uri.authority(), uri.path());
133 const grpc_slice request_text = grpc_httpcli_format_put_request(
134 request, uri.authority().c_str(), uri.path().c_str());
135 return MakeOrphanable<HttpRequest>(
136 std::move(uri), request_text, response, deadline, channel_args, on_done,
137 pollent, name.c_str(), std::move(test_only_generate_response),
138 std::move(channel_creds));
139 }
140
SetOverride(grpc_httpcli_get_override get,grpc_httpcli_post_override post,grpc_httpcli_put_override put)141 void HttpRequest::SetOverride(grpc_httpcli_get_override get,
142 grpc_httpcli_post_override post,
143 grpc_httpcli_put_override put) {
144 g_get_override = get;
145 g_post_override = post;
146 g_put_override = put;
147 }
148
TestOnlySetOnHandshakeDoneIntercept(void (* intercept)(HttpRequest * req))149 void HttpRequest::TestOnlySetOnHandshakeDoneIntercept(
150 void (*intercept)(HttpRequest* req)) {
151 g_test_only_on_handshake_done_intercept = intercept;
152 }
153
HttpRequest(URI uri,const grpc_slice & request_text,grpc_http_response * response,Timestamp deadline,const grpc_channel_args * channel_args,grpc_closure * on_done,grpc_polling_entity * pollent,const char * name,absl::optional<std::function<void ()>> test_only_generate_response,RefCountedPtr<grpc_channel_credentials> channel_creds)154 HttpRequest::HttpRequest(
155 URI uri, const grpc_slice& request_text, grpc_http_response* response,
156 Timestamp deadline, const grpc_channel_args* channel_args,
157 grpc_closure* on_done, grpc_polling_entity* pollent, const char* name,
158 absl::optional<std::function<void()>> test_only_generate_response,
159 RefCountedPtr<grpc_channel_credentials> channel_creds)
160 : uri_(std::move(uri)),
161 request_text_(request_text),
162 deadline_(deadline),
163 channel_args_(CoreConfiguration::Get()
164 .channel_args_preconditioning()
165 .PreconditionChannelArgs(channel_args)
166 .ToC()
167 .release()),
168 channel_creds_(std::move(channel_creds)),
169 on_done_(on_done),
170 resource_quota_(ResourceQuotaFromChannelArgs(channel_args_)),
171 pollent_(pollent),
172 pollset_set_(grpc_pollset_set_create()),
173 test_only_generate_response_(std::move(test_only_generate_response)),
174 resolver_(GetDNSResolver()) {
175 grpc_http_parser_init(&parser_, GRPC_HTTP_RESPONSE, response);
176 grpc_slice_buffer_init(&incoming_);
177 grpc_slice_buffer_init(&outgoing_);
178 grpc_iomgr_register_object(&iomgr_obj_, name);
179 GRPC_CLOSURE_INIT(&on_read_, OnRead, this, grpc_schedule_on_exec_ctx);
180 GRPC_CLOSURE_INIT(&continue_on_read_after_schedule_on_exec_ctx_,
181 ContinueOnReadAfterScheduleOnExecCtx, this,
182 grpc_schedule_on_exec_ctx);
183 GRPC_CLOSURE_INIT(&done_write_, DoneWrite, this, grpc_schedule_on_exec_ctx);
184 GRPC_CLOSURE_INIT(&continue_done_write_after_schedule_on_exec_ctx_,
185 ContinueDoneWriteAfterScheduleOnExecCtx, this,
186 grpc_schedule_on_exec_ctx);
187 GPR_ASSERT(pollent);
188 grpc_polling_entity_add_to_pollset_set(pollent, pollset_set_);
189 }
190
~HttpRequest()191 HttpRequest::~HttpRequest() {
192 grpc_channel_args_destroy(channel_args_);
193 grpc_http_parser_destroy(&parser_);
194 if (own_endpoint_ && ep_ != nullptr) {
195 grpc_endpoint_destroy(ep_);
196 }
197 CSliceUnref(request_text_);
198 grpc_iomgr_unregister_object(&iomgr_obj_);
199 grpc_slice_buffer_destroy(&incoming_);
200 grpc_slice_buffer_destroy(&outgoing_);
201 grpc_pollset_set_destroy(pollset_set_);
202 }
203
Start()204 void HttpRequest::Start() {
205 MutexLock lock(&mu_);
206 if (test_only_generate_response_.has_value()) {
207 test_only_generate_response_.value()();
208 return;
209 }
210 Ref().release(); // ref held by pending DNS resolution
211 dns_request_handle_ = resolver_->LookupHostname(
212 absl::bind_front(&HttpRequest::OnResolved, this), uri_.authority(),
213 uri_.scheme(), kDefaultDNSRequestTimeout, pollset_set_,
214 /*name_server=*/"");
215 }
216
Orphan()217 void HttpRequest::Orphan() {
218 {
219 MutexLock lock(&mu_);
220 GPR_ASSERT(!cancelled_);
221 cancelled_ = true;
222 // cancel potentially pending DNS resolution.
223 if (dns_request_handle_.has_value() &&
224 resolver_->Cancel(dns_request_handle_.value())) {
225 Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
226 Unref();
227 }
228 if (handshake_mgr_ != nullptr) {
229 // Shutdown will cancel any ongoing tcp connect.
230 handshake_mgr_->Shutdown(
231 GRPC_ERROR_CREATE("HTTP request cancelled during handshake"));
232 }
233 if (own_endpoint_ && ep_ != nullptr) {
234 grpc_endpoint_shutdown(ep_, GRPC_ERROR_CREATE("HTTP request cancelled"));
235 }
236 }
237 Unref();
238 }
239
AppendError(grpc_error_handle error)240 void HttpRequest::AppendError(grpc_error_handle error) {
241 if (overall_error_.ok()) {
242 overall_error_ = GRPC_ERROR_CREATE("Failed HTTP/1 client request");
243 }
244 const grpc_resolved_address* addr = &addresses_[next_address_ - 1];
245 auto addr_text = grpc_sockaddr_to_uri(addr);
246 overall_error_ = grpc_error_add_child(
247 overall_error_,
248 grpc_error_set_str(
249 error, StatusStrProperty::kTargetAddress,
250 addr_text.ok() ? addr_text.value() : addr_text.status().ToString()));
251 }
252
OnReadInternal(grpc_error_handle error)253 void HttpRequest::OnReadInternal(grpc_error_handle error) {
254 for (size_t i = 0; i < incoming_.count; i++) {
255 if (GRPC_SLICE_LENGTH(incoming_.slices[i])) {
256 have_read_byte_ = 1;
257 grpc_error_handle err =
258 grpc_http_parser_parse(&parser_, incoming_.slices[i], nullptr);
259 if (!err.ok()) {
260 Finish(err);
261 return;
262 }
263 }
264 }
265 if (cancelled_) {
266 Finish(GRPC_ERROR_CREATE_REFERENCING("HTTP1 request cancelled during read",
267 &overall_error_, 1));
268 } else if (error.ok()) {
269 DoRead();
270 } else if (!have_read_byte_) {
271 NextAddress(error);
272 } else {
273 Finish(grpc_http_parser_eof(&parser_));
274 }
275 }
276
ContinueDoneWriteAfterScheduleOnExecCtx(void * arg,grpc_error_handle error)277 void HttpRequest::ContinueDoneWriteAfterScheduleOnExecCtx(
278 void* arg, grpc_error_handle error) {
279 RefCountedPtr<HttpRequest> req(static_cast<HttpRequest*>(arg));
280 MutexLock lock(&req->mu_);
281 if (error.ok() && !req->cancelled_) {
282 req->OnWritten();
283 } else {
284 req->NextAddress(error);
285 }
286 }
287
StartWrite()288 void HttpRequest::StartWrite() {
289 CSliceRef(request_text_);
290 grpc_slice_buffer_add(&outgoing_, request_text_);
291 Ref().release(); // ref held by pending write
292 grpc_endpoint_write(ep_, &outgoing_, &done_write_, nullptr,
293 /*max_frame_size=*/INT_MAX);
294 }
295
OnHandshakeDone(void * arg,grpc_error_handle error)296 void HttpRequest::OnHandshakeDone(void* arg, grpc_error_handle error) {
297 auto* args = static_cast<HandshakerArgs*>(arg);
298 RefCountedPtr<HttpRequest> req(static_cast<HttpRequest*>(args->user_data));
299 if (g_test_only_on_handshake_done_intercept != nullptr) {
300 // Run this testing intercept before the lock so that it has a chance to
301 // do things like calling Orphan on the request
302 g_test_only_on_handshake_done_intercept(req.get());
303 }
304 MutexLock lock(&req->mu_);
305 req->own_endpoint_ = true;
306 if (!error.ok()) {
307 req->handshake_mgr_.reset();
308 req->NextAddress(error);
309 return;
310 }
311 // Handshake completed, so we own fields in args
312 grpc_slice_buffer_destroy(args->read_buffer);
313 gpr_free(args->read_buffer);
314 req->ep_ = args->endpoint;
315 req->handshake_mgr_.reset();
316 if (req->cancelled_) {
317 req->NextAddress(
318 GRPC_ERROR_CREATE("HTTP request cancelled during handshake"));
319 return;
320 }
321 req->StartWrite();
322 }
323
DoHandshake(const grpc_resolved_address * addr)324 void HttpRequest::DoHandshake(const grpc_resolved_address* addr) {
325 // Create the security connector using the credentials and target name.
326 ChannelArgs args = ChannelArgs::FromC(channel_args_);
327 RefCountedPtr<grpc_channel_security_connector> sc =
328 channel_creds_->create_security_connector(
329 nullptr /*call_creds*/, uri_.authority().c_str(), &args);
330 if (sc == nullptr) {
331 Finish(GRPC_ERROR_CREATE_REFERENCING("failed to create security connector",
332 &overall_error_, 1));
333 return;
334 }
335 absl::StatusOr<std::string> address = grpc_sockaddr_to_uri(addr);
336 if (!address.ok()) {
337 Finish(GRPC_ERROR_CREATE_REFERENCING("Failed to extract URI from address",
338 &overall_error_, 1));
339 return;
340 }
341 args = args.SetObject(std::move(sc))
342 .Set(GRPC_ARG_TCP_HANDSHAKER_RESOLVED_ADDRESS, address.value());
343 // Start the handshake
344 handshake_mgr_ = MakeRefCounted<HandshakeManager>();
345 CoreConfiguration::Get().handshaker_registry().AddHandshakers(
346 HANDSHAKER_CLIENT, args, pollset_set_, handshake_mgr_.get());
347 Ref().release(); // ref held by pending handshake
348 grpc_endpoint* ep = ep_;
349 ep_ = nullptr;
350 own_endpoint_ = false;
351 handshake_mgr_->DoHandshake(ep, args, deadline_,
352 /*acceptor=*/nullptr, OnHandshakeDone,
353 /*user_data=*/this);
354 }
355
NextAddress(grpc_error_handle error)356 void HttpRequest::NextAddress(grpc_error_handle error) {
357 if (!error.ok()) {
358 AppendError(error);
359 }
360 if (cancelled_) {
361 Finish(GRPC_ERROR_CREATE_REFERENCING("HTTP request was cancelled",
362 &overall_error_, 1));
363 return;
364 }
365 if (next_address_ == addresses_.size()) {
366 Finish(GRPC_ERROR_CREATE_REFERENCING("Failed HTTP requests to all targets",
367 &overall_error_, 1));
368 return;
369 }
370 const grpc_resolved_address* addr = &addresses_[next_address_++];
371 DoHandshake(addr);
372 }
373
OnResolved(absl::StatusOr<std::vector<grpc_resolved_address>> addresses_or)374 void HttpRequest::OnResolved(
375 absl::StatusOr<std::vector<grpc_resolved_address>> addresses_or) {
376 RefCountedPtr<HttpRequest> unreffer(this);
377 MutexLock lock(&mu_);
378 dns_request_handle_.reset();
379 if (cancelled_) {
380 Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
381 return;
382 }
383 if (!addresses_or.ok()) {
384 Finish(absl_status_to_grpc_error(addresses_or.status()));
385 return;
386 }
387 addresses_ = std::move(*addresses_or);
388 next_address_ = 0;
389 NextAddress(absl::OkStatus());
390 }
391
392 } // namespace grpc_core
393