• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_http_downstream_connection.h"
26 
27 #include "ssl_compat.h"
28 
29 #ifdef NGHTTP2_OPENSSL_IS_WOLFSSL
30 #  include <wolfssl/options.h>
31 #  include <wolfssl/openssl/rand.h>
32 #else // !NGHTTP2_OPENSSL_IS_WOLFSSL
33 #  include <openssl/rand.h>
34 #endif // !NGHTTP2_OPENSSL_IS_WOLFSSL
35 
36 #include "shrpx_client_handler.h"
37 #include "shrpx_upstream.h"
38 #include "shrpx_downstream.h"
39 #include "shrpx_config.h"
40 #include "shrpx_error.h"
41 #include "shrpx_http.h"
42 #include "shrpx_log_config.h"
43 #include "shrpx_connect_blocker.h"
44 #include "shrpx_downstream_connection_pool.h"
45 #include "shrpx_worker.h"
46 #include "shrpx_http2_session.h"
47 #include "shrpx_tls.h"
48 #include "shrpx_log.h"
49 #include "http2.h"
50 #include "util.h"
51 
52 using namespace nghttp2;
53 
54 namespace shrpx {
55 
56 namespace {
timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)57 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
58   auto conn = static_cast<Connection *>(w->data);
59   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
60 
61   if (w == &conn->rt && !conn->expired_rt()) {
62     return;
63   }
64 
65   if (LOG_ENABLED(INFO)) {
66     DCLOG(INFO, dconn) << "Time out";
67   }
68 
69   auto downstream = dconn->get_downstream();
70   auto upstream = downstream->get_upstream();
71   auto handler = upstream->get_client_handler();
72   auto &resp = downstream->response();
73 
74   // Do this so that dconn is not pooled
75   resp.connection_close = true;
76 
77   if (upstream->downstream_error(dconn, Downstream::EVENT_TIMEOUT) != 0) {
78     delete handler;
79   }
80 }
81 } // namespace
82 
83 namespace {
retry_downstream_connection(Downstream * downstream,unsigned int status_code)84 void retry_downstream_connection(Downstream *downstream,
85                                  unsigned int status_code) {
86   auto upstream = downstream->get_upstream();
87   auto handler = upstream->get_client_handler();
88 
89   assert(!downstream->get_request_header_sent());
90 
91   downstream->add_retry();
92 
93   if (downstream->no_more_retry()) {
94     delete handler;
95     return;
96   }
97 
98   downstream->pop_downstream_connection();
99   auto buf = downstream->get_request_buf();
100   buf->reset();
101 
102   int rv;
103 
104   for (;;) {
105     auto ndconn = handler->get_downstream_connection(rv, downstream);
106     if (!ndconn) {
107       break;
108     }
109     if (downstream->attach_downstream_connection(std::move(ndconn)) != 0) {
110       continue;
111     }
112     if (downstream->push_request_headers() == 0) {
113       return;
114     }
115   }
116 
117   downstream->set_request_state(DownstreamState::CONNECT_FAIL);
118 
119   if (rv == SHRPX_ERR_TLS_REQUIRED) {
120     rv = upstream->on_downstream_abort_request_with_https_redirect(downstream);
121   } else {
122     rv = upstream->on_downstream_abort_request(downstream, status_code);
123   }
124 
125   if (rv != 0) {
126     delete handler;
127   }
128 }
129 } // namespace
130 
131 namespace {
connect_timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)132 void connect_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
133   auto conn = static_cast<Connection *>(w->data);
134   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
135   auto addr = dconn->get_addr();
136   auto raddr = dconn->get_raddr();
137 
138   DCLOG(WARN, dconn) << "Connect time out; addr="
139                      << util::to_numeric_addr(raddr);
140 
141   downstream_failure(addr, raddr);
142 
143   auto downstream = dconn->get_downstream();
144 
145   retry_downstream_connection(downstream, 504);
146 }
147 } // namespace
148 
149 namespace {
backend_retry(Downstream * downstream)150 void backend_retry(Downstream *downstream) {
151   retry_downstream_connection(downstream, 502);
152 }
153 } // namespace
154 
155 namespace {
readcb(struct ev_loop * loop,ev_io * w,int revents)156 void readcb(struct ev_loop *loop, ev_io *w, int revents) {
157   int rv;
158   auto conn = static_cast<Connection *>(w->data);
159   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
160   auto downstream = dconn->get_downstream();
161   auto upstream = downstream->get_upstream();
162   auto handler = upstream->get_client_handler();
163 
164   rv = upstream->downstream_read(dconn);
165   if (rv != 0) {
166     if (rv == SHRPX_ERR_RETRY) {
167       backend_retry(downstream);
168       return;
169     }
170 
171     delete handler;
172   }
173 }
174 } // namespace
175 
176 namespace {
writecb(struct ev_loop * loop,ev_io * w,int revents)177 void writecb(struct ev_loop *loop, ev_io *w, int revents) {
178   int rv;
179   auto conn = static_cast<Connection *>(w->data);
180   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
181   auto downstream = dconn->get_downstream();
182   auto upstream = downstream->get_upstream();
183   auto handler = upstream->get_client_handler();
184 
185   rv = upstream->downstream_write(dconn);
186   if (rv == SHRPX_ERR_RETRY) {
187     backend_retry(downstream);
188     return;
189   }
190 
191   if (rv != 0) {
192     delete handler;
193   }
194 }
195 } // namespace
196 
197 namespace {
connectcb(struct ev_loop * loop,ev_io * w,int revents)198 void connectcb(struct ev_loop *loop, ev_io *w, int revents) {
199   auto conn = static_cast<Connection *>(w->data);
200   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
201   auto downstream = dconn->get_downstream();
202   if (dconn->connected() != 0) {
203     backend_retry(downstream);
204     return;
205   }
206   writecb(loop, w, revents);
207 }
208 } // namespace
209 
HttpDownstreamConnection(const std::shared_ptr<DownstreamAddrGroup> & group,DownstreamAddr * addr,struct ev_loop * loop,Worker * worker)210 HttpDownstreamConnection::HttpDownstreamConnection(
211   const std::shared_ptr<DownstreamAddrGroup> &group, DownstreamAddr *addr,
212   struct ev_loop *loop, Worker *worker)
213   : conn_(loop, -1, nullptr, worker->get_mcpool(),
214           group->shared_addr->timeout.write, group->shared_addr->timeout.read,
215           {}, {}, connectcb, readcb, connect_timeoutcb, this,
216           get_config()->tls.dyn_rec.warmup_threshold,
217           get_config()->tls.dyn_rec.idle_timeout, Proto::HTTP1),
218     on_read_(&HttpDownstreamConnection::noop),
219     on_write_(&HttpDownstreamConnection::noop),
220     signal_write_(&HttpDownstreamConnection::noop),
221     worker_(worker),
222     ssl_ctx_(worker->get_cl_ssl_ctx()),
223     group_(group),
224     addr_(addr),
225     raddr_(nullptr),
226     ioctrl_(&conn_.rlimit),
227     response_htp_{0},
228     first_write_done_(false),
229     reusable_(true),
230     request_header_written_(false) {}
231 
~HttpDownstreamConnection()232 HttpDownstreamConnection::~HttpDownstreamConnection() {
233   if (LOG_ENABLED(INFO)) {
234     DCLOG(INFO, this) << "Deleted";
235   }
236 
237   if (dns_query_) {
238     auto dns_tracker = worker_->get_dns_tracker();
239     dns_tracker->cancel(dns_query_.get());
240   }
241 }
242 
attach_downstream(Downstream * downstream)243 int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
244   int rv;
245 
246   if (LOG_ENABLED(INFO)) {
247     DCLOG(INFO, this) << "Attaching to DOWNSTREAM:" << downstream;
248   }
249 
250   downstream_ = downstream;
251 
252   rv = initiate_connection();
253   if (rv != 0) {
254     downstream_ = nullptr;
255     return rv;
256   }
257 
258   return 0;
259 }
260 
261 namespace {
262 int htp_msg_begincb(llhttp_t *htp);
263 int htp_hdr_keycb(llhttp_t *htp, const char *data, size_t len);
264 int htp_hdr_valcb(llhttp_t *htp, const char *data, size_t len);
265 int htp_hdrs_completecb(llhttp_t *htp);
266 int htp_bodycb(llhttp_t *htp, const char *data, size_t len);
267 int htp_msg_completecb(llhttp_t *htp);
268 } // namespace
269 
270 namespace {
271 constexpr llhttp_settings_t htp_hooks = {
272   htp_msg_begincb,     // llhttp_cb      on_message_begin;
273   nullptr,             // llhttp_data_cb on_url;
274   nullptr,             // llhttp_data_cb on_status;
275   nullptr,             // llhttp_data_cb on_method;
276   nullptr,             // llhttp_data_cb on_version;
277   htp_hdr_keycb,       // llhttp_data_cb on_header_field;
278   htp_hdr_valcb,       // llhttp_data_cb on_header_value;
279   nullptr,             // llhttp_data_cb on_chunk_extension_name;
280   nullptr,             // llhttp_data_cb on_chunk_extension_value;
281   htp_hdrs_completecb, // llhttp_cb      on_headers_complete;
282   htp_bodycb,          // llhttp_data_cb on_body;
283   htp_msg_completecb,  // llhttp_cb      on_message_complete;
284   nullptr,             // llhttp_cb      on_url_complete;
285   nullptr,             // llhttp_cb      on_status_complete;
286   nullptr,             // llhttp_cb      on_method_complete;
287   nullptr,             // llhttp_cb      on_version_complete;
288   nullptr,             // llhttp_cb      on_header_field_complete;
289   nullptr,             // llhttp_cb      on_header_value_complete;
290   nullptr,             // llhttp_cb      on_chunk_extension_name_complete;
291   nullptr,             // llhttp_cb      on_chunk_extension_value_complete;
292   nullptr,             // llhttp_cb      on_chunk_header;
293   nullptr,             // llhttp_cb      on_chunk_complete;
294   nullptr,             // llhttp_cb      on_reset;
295 };
296 } // namespace
297 
initiate_connection()298 int HttpDownstreamConnection::initiate_connection() {
299   int rv;
300 
301   auto worker_blocker = worker_->get_connect_blocker();
302   if (worker_blocker->blocked()) {
303     if (LOG_ENABLED(INFO)) {
304       DCLOG(INFO, this)
305         << "Worker wide backend connection was blocked temporarily";
306     }
307     return SHRPX_ERR_NETWORK;
308   }
309 
310   auto &downstreamconf = *worker_->get_downstream_config();
311 
312   if (conn_.fd == -1) {
313     auto check_dns_result = dns_query_.get() != nullptr;
314 
315     if (check_dns_result) {
316       assert(addr_->dns);
317     }
318 
319     auto &connect_blocker = addr_->connect_blocker;
320 
321     if (connect_blocker->blocked()) {
322       if (LOG_ENABLED(INFO)) {
323         DCLOG(INFO, this) << "Backend server " << addr_->host << ":"
324                           << addr_->port << " was not available temporarily";
325       }
326 
327       return SHRPX_ERR_NETWORK;
328     }
329 
330     Address *raddr;
331 
332     if (addr_->dns) {
333       if (!check_dns_result) {
334         auto dns_query = std::make_unique<DNSQuery>(
335           addr_->host, [this](DNSResolverStatus status, const Address *result) {
336             int rv;
337 
338             if (status == DNSResolverStatus::OK) {
339               *this->resolved_addr_ = *result;
340             }
341 
342             rv = this->initiate_connection();
343             if (rv != 0) {
344               // This callback destroys |this|.
345               auto downstream = this->downstream_;
346               backend_retry(downstream);
347             }
348           });
349 
350         auto dns_tracker = worker_->get_dns_tracker();
351 
352         if (!resolved_addr_) {
353           resolved_addr_ = std::make_unique<Address>();
354         }
355         switch (dns_tracker->resolve(resolved_addr_.get(), dns_query.get())) {
356         case DNSResolverStatus::ERROR:
357           downstream_failure(addr_, nullptr);
358           return SHRPX_ERR_NETWORK;
359         case DNSResolverStatus::RUNNING:
360           dns_query_ = std::move(dns_query);
361           return 0;
362         case DNSResolverStatus::OK:
363           break;
364         default:
365           assert(0);
366         }
367       } else {
368         switch (dns_query_->status) {
369         case DNSResolverStatus::ERROR:
370           dns_query_.reset();
371           downstream_failure(addr_, nullptr);
372           return SHRPX_ERR_NETWORK;
373         case DNSResolverStatus::OK:
374           dns_query_.reset();
375           break;
376         default:
377           assert(0);
378         }
379       }
380 
381       raddr = resolved_addr_.get();
382       util::set_port(*resolved_addr_, addr_->port);
383     } else {
384       raddr = &addr_->addr;
385     }
386 
387     conn_.fd = util::create_nonblock_socket(raddr->su.storage.ss_family);
388 
389     if (conn_.fd == -1) {
390       auto error = errno;
391       DCLOG(WARN, this) << "socket() failed; addr="
392                         << util::to_numeric_addr(raddr) << ", errno=" << error;
393 
394       worker_blocker->on_failure();
395 
396       return SHRPX_ERR_NETWORK;
397     }
398 
399     worker_blocker->on_success();
400 
401     rv = connect(conn_.fd, &raddr->su.sa, raddr->len);
402     if (rv != 0 && errno != EINPROGRESS) {
403       auto error = errno;
404       DCLOG(WARN, this) << "connect() failed; addr="
405                         << util::to_numeric_addr(raddr) << ", errno=" << error;
406 
407       downstream_failure(addr_, raddr);
408 
409       return SHRPX_ERR_NETWORK;
410     }
411 
412     if (LOG_ENABLED(INFO)) {
413       DCLOG(INFO, this) << "Connecting to downstream server";
414     }
415 
416     raddr_ = raddr;
417 
418     if (addr_->tls) {
419       assert(ssl_ctx_);
420 
421       auto ssl = tls::create_ssl(ssl_ctx_);
422       if (!ssl) {
423         return -1;
424       }
425 
426       tls::setup_downstream_http1_alpn(ssl);
427 
428       conn_.set_ssl(ssl);
429       conn_.tls.client_session_cache = &addr_->tls_session_cache;
430 
431       auto sni_name =
432         addr_->sni.empty() ? StringRef{addr_->host} : StringRef{addr_->sni};
433       if (!util::numeric_host(sni_name.data())) {
434         SSL_set_tlsext_host_name(conn_.tls.ssl, sni_name.data());
435       }
436 
437       auto session = tls::reuse_tls_session(addr_->tls_session_cache);
438       if (session) {
439         SSL_set_session(conn_.tls.ssl, session);
440         SSL_SESSION_free(session);
441       }
442 
443       conn_.prepare_client_handshake();
444     }
445 
446     ev_io_set(&conn_.wev, conn_.fd, EV_WRITE);
447     ev_io_set(&conn_.rev, conn_.fd, EV_READ);
448 
449     conn_.wlimit.startw();
450 
451     conn_.wt.repeat = downstreamconf.timeout.connect;
452     ev_timer_again(conn_.loop, &conn_.wt);
453   } else {
454     // we may set read timer cb to idle_timeoutcb.  Reset again.
455     ev_set_cb(&conn_.rt, timeoutcb);
456     if (conn_.read_timeout < group_->shared_addr->timeout.read) {
457       conn_.read_timeout = group_->shared_addr->timeout.read;
458       conn_.last_read = std::chrono::steady_clock::now();
459     } else {
460       conn_.again_rt(group_->shared_addr->timeout.read);
461     }
462 
463     ev_set_cb(&conn_.rev, readcb);
464 
465     on_write_ = &HttpDownstreamConnection::write_first;
466     first_write_done_ = false;
467     request_header_written_ = false;
468   }
469 
470   llhttp_init(&response_htp_, HTTP_RESPONSE, &htp_hooks);
471   response_htp_.data = downstream_;
472 
473   return 0;
474 }
475 
push_request_headers()476 int HttpDownstreamConnection::push_request_headers() {
477   if (request_header_written_) {
478     signal_write();
479     return 0;
480   }
481 
482   const auto &downstream_hostport = addr_->hostport;
483   const auto &req = downstream_->request();
484 
485   auto &balloc = downstream_->get_block_allocator();
486 
487   auto connect_method = req.regular_connect_method();
488 
489   auto config = get_config();
490   auto &httpconf = config->http;
491 
492   request_header_written_ = true;
493 
494   // For HTTP/1.0 request, there is no authority in request.  In that
495   // case, we use backend server's host nonetheless.
496   auto authority = StringRef(downstream_hostport);
497   auto no_host_rewrite =
498     httpconf.no_host_rewrite || config->http2_proxy || connect_method;
499 
500   if (no_host_rewrite && !req.authority.empty()) {
501     authority = req.authority;
502   }
503 
504   downstream_->set_request_downstream_host(authority);
505 
506   auto buf = downstream_->get_request_buf();
507 
508   // Assume that method and request path do not contain \r\n.
509   auto meth = http2::to_method_string(
510     req.connect_proto == ConnectProto::WEBSOCKET ? HTTP_GET : req.method);
511   buf->append(meth);
512   buf->append(' ');
513 
514   if (connect_method) {
515     buf->append(authority);
516   } else if (config->http2_proxy) {
517     // Construct absolute-form request target because we are going to
518     // send a request to a HTTP/1 proxy.
519     assert(!req.scheme.empty());
520     buf->append(req.scheme);
521     buf->append("://");
522     buf->append(authority);
523     buf->append(req.path);
524   } else if (req.method == HTTP_OPTIONS && req.path.empty()) {
525     // Server-wide OPTIONS
526     buf->append("*");
527   } else {
528     buf->append(req.path);
529   }
530   buf->append(" HTTP/1.1\r\nHost: ");
531   buf->append(authority);
532   buf->append("\r\n");
533 
534   auto &fwdconf = httpconf.forwarded;
535   auto &xffconf = httpconf.xff;
536   auto &xfpconf = httpconf.xfp;
537   auto &earlydataconf = httpconf.early_data;
538 
539   uint32_t build_flags =
540     (fwdconf.strip_incoming ? http2::HDOP_STRIP_FORWARDED : 0) |
541     (xffconf.strip_incoming ? http2::HDOP_STRIP_X_FORWARDED_FOR : 0) |
542     (xfpconf.strip_incoming ? http2::HDOP_STRIP_X_FORWARDED_PROTO : 0) |
543     (earlydataconf.strip_incoming ? http2::HDOP_STRIP_EARLY_DATA : 0) |
544     ((req.http_major == 3 || req.http_major == 2)
545        ? http2::HDOP_STRIP_SEC_WEBSOCKET_KEY
546        : 0);
547 
548   http2::build_http1_headers_from_headers(buf, req.fs.headers(), build_flags);
549 
550   auto cookie = downstream_->assemble_request_cookie();
551   if (!cookie.empty()) {
552     buf->append("Cookie: ");
553     buf->append(cookie);
554     buf->append("\r\n");
555   }
556 
557   // set transfer-encoding only when content-length is unknown and
558   // request body is expected.
559   if (req.method != HTTP_CONNECT && req.http2_expect_body &&
560       req.fs.content_length == -1) {
561     downstream_->set_chunked_request(true);
562     buf->append("Transfer-Encoding: chunked\r\n");
563   }
564 
565   if (req.connect_proto == ConnectProto::WEBSOCKET) {
566     if (req.http_major == 3 || req.http_major == 2) {
567       std::array<uint8_t, 16> nonce;
568       if (RAND_bytes(nonce.data(), nonce.size()) != 1) {
569         return -1;
570       }
571       auto iov = make_byte_ref(balloc, base64::encode_length(nonce.size()) + 1);
572       auto p =
573         base64::encode(std::begin(nonce), std::end(nonce), std::begin(iov));
574       *p = '\0';
575       auto key = StringRef{std::span{std::begin(iov), p}};
576       downstream_->set_ws_key(key);
577 
578       buf->append("Sec-Websocket-Key: ");
579       buf->append(key);
580       buf->append("\r\n");
581     }
582 
583     buf->append("Upgrade: websocket\r\nConnection: Upgrade\r\n");
584   } else if (!connect_method && req.upgrade_request) {
585     auto connection = req.fs.header(http2::HD_CONNECTION);
586     if (connection) {
587       buf->append("Connection: ");
588       buf->append((*connection).value);
589       buf->append("\r\n");
590     }
591 
592     auto upgrade = req.fs.header(http2::HD_UPGRADE);
593     if (upgrade) {
594       buf->append("Upgrade: ");
595       buf->append((*upgrade).value);
596       buf->append("\r\n");
597     }
598   } else if (req.connection_close) {
599     buf->append("Connection: close\r\n");
600   }
601 
602   auto upstream = downstream_->get_upstream();
603   auto handler = upstream->get_client_handler();
604 
605 #if defined(NGHTTP2_GENUINE_OPENSSL) ||                                        \
606   defined(NGHTTP2_OPENSSL_IS_BORINGSSL) || defined(NGHTTP2_OPENSSL_IS_WOLFSSL)
607   auto conn = handler->get_connection();
608 
609   if (conn->tls.ssl && !SSL_is_init_finished(conn->tls.ssl)) {
610     buf->append("Early-Data: 1\r\n");
611   }
612 #endif // NGHTTP2_GENUINE_OPENSSL || NGHTTP2_OPENSSL_IS_BORINGSSL ||
613        // NGHTTP2_OPENSSL_IS_WOLFSSL
614 
615   auto fwd =
616     fwdconf.strip_incoming ? nullptr : req.fs.header(http2::HD_FORWARDED);
617 
618   if (fwdconf.params) {
619     auto params = fwdconf.params;
620 
621     if (config->http2_proxy || connect_method) {
622       params &= ~FORWARDED_PROTO;
623     }
624 
625     auto value = http::create_forwarded(
626       balloc, params, handler->get_forwarded_by(), handler->get_forwarded_for(),
627       req.authority, req.scheme);
628 
629     if (fwd || !value.empty()) {
630       buf->append("Forwarded: ");
631       if (fwd) {
632         buf->append(fwd->value);
633 
634         if (!value.empty()) {
635           buf->append(", ");
636         }
637       }
638       buf->append(value);
639       buf->append("\r\n");
640     }
641   } else if (fwd) {
642     buf->append("Forwarded: ");
643     buf->append(fwd->value);
644     buf->append("\r\n");
645   }
646 
647   auto xff =
648     xffconf.strip_incoming ? nullptr : req.fs.header(http2::HD_X_FORWARDED_FOR);
649 
650   if (xffconf.add) {
651     buf->append("X-Forwarded-For: ");
652     if (xff) {
653       buf->append((*xff).value);
654       buf->append(", ");
655     }
656     buf->append(client_handler_->get_ipaddr());
657     buf->append("\r\n");
658   } else if (xff) {
659     buf->append("X-Forwarded-For: ");
660     buf->append((*xff).value);
661     buf->append("\r\n");
662   }
663   if (!config->http2_proxy && !connect_method) {
664     auto xfp = xfpconf.strip_incoming
665                  ? nullptr
666                  : req.fs.header(http2::HD_X_FORWARDED_PROTO);
667 
668     if (xfpconf.add) {
669       buf->append("X-Forwarded-Proto: ");
670       if (xfp) {
671         buf->append((*xfp).value);
672         buf->append(", ");
673       }
674       assert(!req.scheme.empty());
675       buf->append(req.scheme);
676       buf->append("\r\n");
677     } else if (xfp) {
678       buf->append("X-Forwarded-Proto: ");
679       buf->append((*xfp).value);
680       buf->append("\r\n");
681     }
682   }
683   auto via = req.fs.header(http2::HD_VIA);
684   if (httpconf.no_via) {
685     if (via) {
686       buf->append("Via: ");
687       buf->append((*via).value);
688       buf->append("\r\n");
689     }
690   } else {
691     buf->append("Via: ");
692     if (via) {
693       buf->append((*via).value);
694       buf->append(", ");
695     }
696     std::array<char, 16> viabuf;
697     auto end = http::create_via_header_value(viabuf.data(), req.http_major,
698                                              req.http_minor);
699     buf->append(viabuf.data(), end - viabuf.data());
700     buf->append("\r\n");
701   }
702 
703   for (auto &p : httpconf.add_request_headers) {
704     buf->append(p.name);
705     buf->append(": ");
706     buf->append(p.value);
707     buf->append("\r\n");
708   }
709 
710   buf->append("\r\n");
711 
712   if (LOG_ENABLED(INFO)) {
713     std::string nhdrs;
714     for (auto chunk = buf->head; chunk; chunk = chunk->next) {
715       nhdrs.append(chunk->pos, chunk->last);
716     }
717     if (log_config()->errorlog_tty) {
718       nhdrs = http::colorizeHeaders(nhdrs.c_str());
719     }
720     DCLOG(INFO, this) << "HTTP request headers. stream_id="
721                       << downstream_->get_stream_id() << "\n"
722                       << nhdrs;
723   }
724 
725   // Don't call signal_write() if we anticipate request body.  We call
726   // signal_write() when we received request body chunk, and it
727   // enables us to send headers and data in one writev system call.
728   if (req.method == HTTP_CONNECT ||
729       downstream_->get_blocked_request_buf()->rleft() ||
730       (!req.http2_expect_body && req.fs.content_length == 0) ||
731       downstream_->get_expect_100_continue()) {
732     signal_write();
733   }
734 
735   return 0;
736 }
737 
process_blocked_request_buf()738 int HttpDownstreamConnection::process_blocked_request_buf() {
739   auto src = downstream_->get_blocked_request_buf();
740 
741   if (src->rleft()) {
742     auto dest = downstream_->get_request_buf();
743     auto chunked = downstream_->get_chunked_request();
744     if (chunked) {
745       auto chunk_size_hex = util::utox(src->rleft());
746       dest->append(chunk_size_hex);
747       dest->append("\r\n");
748     }
749 
750     src->copy(*dest);
751 
752     if (chunked) {
753       dest->append("\r\n");
754     }
755   }
756 
757   if (downstream_->get_blocked_request_data_eof() &&
758       downstream_->get_chunked_request()) {
759     end_upload_data_chunk();
760   }
761 
762   return 0;
763 }
764 
push_upload_data_chunk(const uint8_t * data,size_t datalen)765 int HttpDownstreamConnection::push_upload_data_chunk(const uint8_t *data,
766                                                      size_t datalen) {
767   if (!downstream_->get_request_header_sent()) {
768     auto output = downstream_->get_blocked_request_buf();
769     auto &req = downstream_->request();
770     output->append(data, datalen);
771     req.unconsumed_body_length += datalen;
772     if (request_header_written_) {
773       signal_write();
774     }
775     return 0;
776   }
777 
778   auto chunked = downstream_->get_chunked_request();
779   auto output = downstream_->get_request_buf();
780 
781   if (chunked) {
782     auto chunk_size_hex = util::utox(datalen);
783     output->append(chunk_size_hex);
784     output->append("\r\n");
785   }
786 
787   output->append(data, datalen);
788 
789   if (chunked) {
790     output->append("\r\n");
791   }
792 
793   signal_write();
794 
795   return 0;
796 }
797 
end_upload_data()798 int HttpDownstreamConnection::end_upload_data() {
799   if (!downstream_->get_request_header_sent()) {
800     downstream_->set_blocked_request_data_eof(true);
801     if (request_header_written_) {
802       signal_write();
803     }
804     return 0;
805   }
806 
807   signal_write();
808 
809   if (!downstream_->get_chunked_request()) {
810     return 0;
811   }
812 
813   end_upload_data_chunk();
814 
815   return 0;
816 }
817 
end_upload_data_chunk()818 void HttpDownstreamConnection::end_upload_data_chunk() {
819   const auto &req = downstream_->request();
820 
821   auto output = downstream_->get_request_buf();
822   const auto &trailers = req.fs.trailers();
823   if (trailers.empty()) {
824     output->append("0\r\n\r\n");
825   } else {
826     output->append("0\r\n");
827     http2::build_http1_headers_from_headers(output, trailers,
828                                             http2::HDOP_STRIP_ALL);
829     output->append("\r\n");
830   }
831 }
832 
833 namespace {
remove_from_pool(HttpDownstreamConnection * dconn)834 void remove_from_pool(HttpDownstreamConnection *dconn) {
835   auto addr = dconn->get_addr();
836   auto &dconn_pool = addr->dconn_pool;
837   dconn_pool->remove_downstream_connection(dconn);
838 }
839 } // namespace
840 
841 namespace {
idle_readcb(struct ev_loop * loop,ev_io * w,int revents)842 void idle_readcb(struct ev_loop *loop, ev_io *w, int revents) {
843   auto conn = static_cast<Connection *>(w->data);
844   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
845   if (LOG_ENABLED(INFO)) {
846     DCLOG(INFO, dconn) << "Idle connection EOF";
847   }
848 
849   remove_from_pool(dconn);
850   // dconn was deleted
851 }
852 } // namespace
853 
854 namespace {
idle_timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)855 void idle_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
856   auto conn = static_cast<Connection *>(w->data);
857   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
858 
859   if (w == &conn->rt && !conn->expired_rt()) {
860     return;
861   }
862 
863   if (LOG_ENABLED(INFO)) {
864     DCLOG(INFO, dconn) << "Idle connection timeout";
865   }
866 
867   remove_from_pool(dconn);
868   // dconn was deleted
869 }
870 } // namespace
871 
detach_downstream(Downstream * downstream)872 void HttpDownstreamConnection::detach_downstream(Downstream *downstream) {
873   if (LOG_ENABLED(INFO)) {
874     DCLOG(INFO, this) << "Detaching from DOWNSTREAM:" << downstream;
875   }
876   downstream_ = nullptr;
877 
878   ev_set_cb(&conn_.rev, idle_readcb);
879   ioctrl_.force_resume_read();
880 
881   auto &downstreamconf = *worker_->get_downstream_config();
882 
883   ev_set_cb(&conn_.rt, idle_timeoutcb);
884   if (conn_.read_timeout < downstreamconf.timeout.idle_read) {
885     conn_.read_timeout = downstreamconf.timeout.idle_read;
886     conn_.last_read = std::chrono::steady_clock::now();
887   } else {
888     conn_.again_rt(downstreamconf.timeout.idle_read);
889   }
890 
891   conn_.wlimit.stopw();
892   ev_timer_stop(conn_.loop, &conn_.wt);
893 }
894 
pause_read(IOCtrlReason reason)895 void HttpDownstreamConnection::pause_read(IOCtrlReason reason) {
896   ioctrl_.pause_read(reason);
897 }
898 
resume_read(IOCtrlReason reason,size_t consumed)899 int HttpDownstreamConnection::resume_read(IOCtrlReason reason,
900                                           size_t consumed) {
901   auto &downstreamconf = *worker_->get_downstream_config();
902 
903   if (downstream_->get_response_buf()->rleft() <=
904       downstreamconf.request_buffer_size / 2) {
905     ioctrl_.resume_read(reason);
906   }
907 
908   return 0;
909 }
910 
force_resume_read()911 void HttpDownstreamConnection::force_resume_read() {
912   ioctrl_.force_resume_read();
913 }
914 
915 namespace {
htp_msg_begincb(llhttp_t * htp)916 int htp_msg_begincb(llhttp_t *htp) {
917   auto downstream = static_cast<Downstream *>(htp->data);
918 
919   if (downstream->get_response_state() != DownstreamState::INITIAL) {
920     return -1;
921   }
922 
923   return 0;
924 }
925 } // namespace
926 
927 namespace {
htp_hdrs_completecb(llhttp_t * htp)928 int htp_hdrs_completecb(llhttp_t *htp) {
929   auto downstream = static_cast<Downstream *>(htp->data);
930   auto upstream = downstream->get_upstream();
931   auto handler = upstream->get_client_handler();
932   const auto &req = downstream->request();
933   auto &resp = downstream->response();
934   int rv;
935 
936   auto &balloc = downstream->get_block_allocator();
937 
938   for (auto &kv : resp.fs.headers()) {
939     kv.value = util::rstrip(balloc, kv.value);
940 
941     if (kv.token == http2::HD_TRANSFER_ENCODING &&
942         !http2::check_transfer_encoding(kv.value)) {
943       return -1;
944     }
945   }
946 
947   auto config = get_config();
948   auto &loggingconf = config->logging;
949 
950   resp.http_status = htp->status_code;
951   resp.http_major = htp->http_major;
952   resp.http_minor = htp->http_minor;
953 
954   if (resp.http_major > 1 || req.http_minor > 1) {
955     resp.http_major = 1;
956     resp.http_minor = 1;
957     return -1;
958   }
959 
960   auto dconn = downstream->get_downstream_connection();
961 
962   downstream->set_downstream_addr_group(dconn->get_downstream_addr_group());
963   downstream->set_addr(dconn->get_addr());
964 
965   // Server MUST NOT send Transfer-Encoding with a status code 1xx or
966   // 204.  Also server MUST NOT send Transfer-Encoding with a status
967   // code 2xx to a CONNECT request.  Same holds true with
968   // Content-Length.
969   if (resp.http_status == 204) {
970     if (resp.fs.header(http2::HD_TRANSFER_ENCODING)) {
971       return -1;
972     }
973     // Some server send content-length: 0 for 204.  Until they get
974     // fixed, we accept, but ignore it.
975 
976     // Calling parse_content_length() detects duplicated
977     // content-length header fields.
978     if (resp.fs.parse_content_length() != 0) {
979       return -1;
980     }
981     if (resp.fs.content_length == 0) {
982       resp.fs.erase_content_length_and_transfer_encoding();
983     } else if (resp.fs.content_length != -1) {
984       return -1;
985     }
986   } else if (resp.http_status / 100 == 1 ||
987              (resp.http_status / 100 == 2 && req.method == HTTP_CONNECT)) {
988     // Server MUST NOT send Content-Length and Transfer-Encoding in
989     // these responses.
990     resp.fs.erase_content_length_and_transfer_encoding();
991   } else if (resp.fs.parse_content_length() != 0) {
992     downstream->set_response_state(DownstreamState::MSG_BAD_HEADER);
993     return -1;
994   }
995 
996   // Check upgrade before processing non-final response, since if
997   // upgrade succeeded, 101 response is treated as final in nghttpx.
998   downstream->check_upgrade_fulfilled_http1();
999 
1000   if (downstream->get_non_final_response()) {
1001     // Reset content-length because we reuse same Downstream for the
1002     // next response.
1003     resp.fs.content_length = -1;
1004     // For non-final response code, we just call
1005     // on_downstream_header_complete() without changing response
1006     // state.
1007     rv = upstream->on_downstream_header_complete(downstream);
1008 
1009     if (rv != 0) {
1010       return -1;
1011     }
1012 
1013     // Ignore response body for non-final response.
1014     return 1;
1015   }
1016 
1017   resp.connection_close = !llhttp_should_keep_alive(htp);
1018   downstream->set_response_state(DownstreamState::HEADER_COMPLETE);
1019   downstream->inspect_http1_response();
1020 
1021   if (htp->flags & F_CHUNKED) {
1022     downstream->set_chunked_response(true);
1023   }
1024 
1025   auto transfer_encoding = resp.fs.header(http2::HD_TRANSFER_ENCODING);
1026   if (transfer_encoding && !downstream->get_chunked_response()) {
1027     resp.connection_close = true;
1028   }
1029 
1030   if (downstream->get_upgraded()) {
1031     // content-length must be ignored for upgraded connection.
1032     resp.fs.content_length = -1;
1033     resp.connection_close = true;
1034     // transfer-encoding not applied to upgraded connection
1035     downstream->set_chunked_response(false);
1036   } else if (http2::legacy_http1(req.http_major, req.http_minor)) {
1037     if (resp.fs.content_length == -1) {
1038       resp.connection_close = true;
1039     }
1040     downstream->set_chunked_response(false);
1041   } else if (!downstream->expect_response_body()) {
1042     downstream->set_chunked_response(false);
1043   }
1044 
1045   if (loggingconf.access.write_early && downstream->accesslog_ready()) {
1046     handler->write_accesslog(downstream);
1047     downstream->set_accesslog_written(true);
1048   }
1049 
1050   if (upstream->on_downstream_header_complete(downstream) != 0) {
1051     return -1;
1052   }
1053 
1054   if (downstream->get_upgraded()) {
1055     // Upgrade complete, read until EOF in both ends
1056     if (upstream->resume_read(SHRPX_NO_BUFFER, downstream, 0) != 0) {
1057       return -1;
1058     }
1059     downstream->set_request_state(DownstreamState::HEADER_COMPLETE);
1060     if (LOG_ENABLED(INFO)) {
1061       LOG(INFO) << "HTTP upgrade success. stream_id="
1062                 << downstream->get_stream_id();
1063     }
1064   }
1065 
1066   // Ignore the response body. HEAD response may contain
1067   // Content-Length or Transfer-Encoding: chunked.  Some server send
1068   // 304 status code with nonzero Content-Length, but without response
1069   // body. See
1070   // https://tools.ietf.org/html/rfc7230#section-3.3
1071 
1072   // TODO It seems that the cases other than HEAD are handled by
1073   // llhttp.  Need test.
1074   return !http2::expect_response_body(req.method, resp.http_status);
1075 }
1076 } // namespace
1077 
1078 namespace {
ensure_header_field_buffer(const Downstream * downstream,const HttpConfig & httpconf,size_t len)1079 int ensure_header_field_buffer(const Downstream *downstream,
1080                                const HttpConfig &httpconf, size_t len) {
1081   auto &resp = downstream->response();
1082 
1083   if (resp.fs.buffer_size() + len > httpconf.response_header_field_buffer) {
1084     if (LOG_ENABLED(INFO)) {
1085       DLOG(INFO, downstream)
1086         << "Too large header header field size=" << resp.fs.buffer_size() + len;
1087     }
1088     return -1;
1089   }
1090 
1091   return 0;
1092 }
1093 } // namespace
1094 
1095 namespace {
ensure_max_header_fields(const Downstream * downstream,const HttpConfig & httpconf)1096 int ensure_max_header_fields(const Downstream *downstream,
1097                              const HttpConfig &httpconf) {
1098   auto &resp = downstream->response();
1099 
1100   if (resp.fs.num_fields() >= httpconf.max_response_header_fields) {
1101     if (LOG_ENABLED(INFO)) {
1102       DLOG(INFO, downstream)
1103         << "Too many header field num=" << resp.fs.num_fields() + 1;
1104     }
1105     return -1;
1106   }
1107 
1108   return 0;
1109 }
1110 } // namespace
1111 
1112 namespace {
htp_hdr_keycb(llhttp_t * htp,const char * data,size_t len)1113 int htp_hdr_keycb(llhttp_t *htp, const char *data, size_t len) {
1114   auto downstream = static_cast<Downstream *>(htp->data);
1115   auto &resp = downstream->response();
1116   auto &httpconf = get_config()->http;
1117 
1118   if (ensure_header_field_buffer(downstream, httpconf, len) != 0) {
1119     return -1;
1120   }
1121 
1122   if (downstream->get_response_state() == DownstreamState::INITIAL) {
1123     if (resp.fs.header_key_prev()) {
1124       resp.fs.append_last_header_key(data, len);
1125     } else {
1126       if (ensure_max_header_fields(downstream, httpconf) != 0) {
1127         return -1;
1128       }
1129       resp.fs.alloc_add_header_name(StringRef{data, len});
1130     }
1131   } else {
1132     // trailer part
1133     if (resp.fs.trailer_key_prev()) {
1134       resp.fs.append_last_trailer_key(data, len);
1135     } else {
1136       if (ensure_max_header_fields(downstream, httpconf) != 0) {
1137         // Could not ignore this trailer field easily, since we may
1138         // get its value in htp_hdr_valcb, and it will be added to
1139         // wrong place or crash if trailer fields are currently empty.
1140         return -1;
1141       }
1142       resp.fs.alloc_add_trailer_name(StringRef{data, len});
1143     }
1144   }
1145   return 0;
1146 }
1147 } // namespace
1148 
1149 namespace {
htp_hdr_valcb(llhttp_t * htp,const char * data,size_t len)1150 int htp_hdr_valcb(llhttp_t *htp, const char *data, size_t len) {
1151   auto downstream = static_cast<Downstream *>(htp->data);
1152   auto &resp = downstream->response();
1153   auto &httpconf = get_config()->http;
1154 
1155   if (ensure_header_field_buffer(downstream, httpconf, len) != 0) {
1156     return -1;
1157   }
1158 
1159   if (downstream->get_response_state() == DownstreamState::INITIAL) {
1160     resp.fs.append_last_header_value(data, len);
1161   } else {
1162     resp.fs.append_last_trailer_value(data, len);
1163   }
1164   return 0;
1165 }
1166 } // namespace
1167 
1168 namespace {
htp_bodycb(llhttp_t * htp,const char * data,size_t len)1169 int htp_bodycb(llhttp_t *htp, const char *data, size_t len) {
1170   auto downstream = static_cast<Downstream *>(htp->data);
1171   auto &resp = downstream->response();
1172 
1173   resp.recv_body_length += len;
1174 
1175   return downstream->get_upstream()->on_downstream_body(
1176     downstream, reinterpret_cast<const uint8_t *>(data), len, true);
1177 }
1178 } // namespace
1179 
1180 namespace {
htp_msg_completecb(llhttp_t * htp)1181 int htp_msg_completecb(llhttp_t *htp) {
1182   auto downstream = static_cast<Downstream *>(htp->data);
1183   auto &resp = downstream->response();
1184   auto &balloc = downstream->get_block_allocator();
1185 
1186   for (auto &kv : resp.fs.trailers()) {
1187     kv.value = util::rstrip(balloc, kv.value);
1188   }
1189 
1190   // llhttp does not treat "200 connection established" response
1191   // against CONNECT request, and in that case, this function is not
1192   // called.  But if HTTP Upgrade is made (e.g., WebSocket), this
1193   // function is called, and llhttp_execute() returns just after that.
1194   if (downstream->get_upgraded()) {
1195     return 0;
1196   }
1197 
1198   if (downstream->get_non_final_response()) {
1199     downstream->reset_response();
1200 
1201     return 0;
1202   }
1203 
1204   downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1205   // Block reading another response message from (broken?)
1206   // server. This callback is not called if the connection is
1207   // tunneled.
1208   downstream->pause_read(SHRPX_MSG_BLOCK);
1209   return downstream->get_upstream()->on_downstream_body_complete(downstream);
1210 }
1211 } // namespace
1212 
write_first()1213 int HttpDownstreamConnection::write_first() {
1214   int rv;
1215 
1216   process_blocked_request_buf();
1217 
1218   if (conn_.tls.ssl) {
1219     rv = write_tls();
1220   } else {
1221     rv = write_clear();
1222   }
1223 
1224   if (rv != 0) {
1225     return SHRPX_ERR_RETRY;
1226   }
1227 
1228   if (conn_.tls.ssl) {
1229     on_write_ = &HttpDownstreamConnection::write_tls;
1230   } else {
1231     on_write_ = &HttpDownstreamConnection::write_clear;
1232   }
1233 
1234   first_write_done_ = true;
1235   downstream_->set_request_header_sent(true);
1236 
1237   auto buf = downstream_->get_blocked_request_buf();
1238   buf->reset();
1239 
1240   // upstream->resume_read() might be called in
1241   // write_tls()/write_clear(), but before blocked_request_buf_ is
1242   // reset.  So upstream read might still be blocked.  Let's do it
1243   // again here.
1244   auto input = downstream_->get_request_buf();
1245   if (input->rleft() == 0) {
1246     auto upstream = downstream_->get_upstream();
1247     auto &req = downstream_->request();
1248 
1249     upstream->resume_read(SHRPX_NO_BUFFER, downstream_,
1250                           req.unconsumed_body_length);
1251   }
1252 
1253   return 0;
1254 }
1255 
read_clear()1256 int HttpDownstreamConnection::read_clear() {
1257   conn_.last_read = std::chrono::steady_clock::now();
1258 
1259   std::array<uint8_t, 16_k> buf;
1260   int rv;
1261 
1262   for (;;) {
1263     auto nread = conn_.read_clear(buf.data(), buf.size());
1264     if (nread == 0) {
1265       return 0;
1266     }
1267 
1268     if (nread < 0) {
1269       if (nread == SHRPX_ERR_EOF && !downstream_->get_upgraded()) {
1270         auto htperr = llhttp_finish(&response_htp_);
1271         if (htperr != HPE_OK) {
1272           if (LOG_ENABLED(INFO)) {
1273             DCLOG(INFO, this) << "HTTP response ended prematurely: "
1274                               << llhttp_errno_name(htperr);
1275           }
1276 
1277           return -1;
1278         }
1279       }
1280 
1281       return nread;
1282     }
1283 
1284     rv = process_input(buf.data(), nread);
1285     if (rv != 0) {
1286       return rv;
1287     }
1288 
1289     if (!ev_is_active(&conn_.rev)) {
1290       return 0;
1291     }
1292   }
1293 }
1294 
write_clear()1295 int HttpDownstreamConnection::write_clear() {
1296   conn_.last_read = std::chrono::steady_clock::now();
1297 
1298   auto upstream = downstream_->get_upstream();
1299   auto input = downstream_->get_request_buf();
1300 
1301   std::array<struct iovec, MAX_WR_IOVCNT> iov;
1302 
1303   while (input->rleft() > 0) {
1304     auto iovcnt = input->riovec(iov.data(), iov.size());
1305 
1306     auto nwrite = conn_.writev_clear(iov.data(), iovcnt);
1307 
1308     if (nwrite == 0) {
1309       return 0;
1310     }
1311 
1312     if (nwrite < 0) {
1313       if (!first_write_done_) {
1314         return nwrite;
1315       }
1316       // We may have pending data in receive buffer which may contain
1317       // part of response body.  So keep reading.  Invoke read event
1318       // to get read(2) error just in case.
1319       ev_feed_event(conn_.loop, &conn_.rev, EV_READ);
1320       on_write_ = &HttpDownstreamConnection::noop;
1321       reusable_ = false;
1322       break;
1323     }
1324 
1325     input->drain(nwrite);
1326   }
1327 
1328   conn_.wlimit.stopw();
1329   ev_timer_stop(conn_.loop, &conn_.wt);
1330 
1331   if (input->rleft() == 0) {
1332     auto &req = downstream_->request();
1333 
1334     upstream->resume_read(SHRPX_NO_BUFFER, downstream_,
1335                           req.unconsumed_body_length);
1336   }
1337 
1338   return 0;
1339 }
1340 
tls_handshake()1341 int HttpDownstreamConnection::tls_handshake() {
1342   ERR_clear_error();
1343 
1344   conn_.last_read = std::chrono::steady_clock::now();
1345 
1346   auto rv = conn_.tls_handshake();
1347   if (rv == SHRPX_ERR_INPROGRESS) {
1348     return 0;
1349   }
1350 
1351   if (rv < 0) {
1352     downstream_failure(addr_, raddr_);
1353 
1354     return rv;
1355   }
1356 
1357   if (LOG_ENABLED(INFO)) {
1358     DCLOG(INFO, this) << "SSL/TLS handshake completed";
1359   }
1360 
1361   if (!get_config()->tls.insecure &&
1362       tls::check_cert(conn_.tls.ssl, addr_, raddr_) != 0) {
1363     downstream_failure(addr_, raddr_);
1364 
1365     return -1;
1366   }
1367 
1368   auto &connect_blocker = addr_->connect_blocker;
1369 
1370   signal_write_ = &HttpDownstreamConnection::actual_signal_write;
1371 
1372   connect_blocker->on_success();
1373 
1374   ev_set_cb(&conn_.rt, timeoutcb);
1375   ev_set_cb(&conn_.wt, timeoutcb);
1376 
1377   on_read_ = &HttpDownstreamConnection::read_tls;
1378   on_write_ = &HttpDownstreamConnection::write_first;
1379 
1380   // TODO Check negotiated ALPN
1381 
1382   return on_write();
1383 }
1384 
read_tls()1385 int HttpDownstreamConnection::read_tls() {
1386   conn_.last_read = std::chrono::steady_clock::now();
1387 
1388   ERR_clear_error();
1389 
1390   std::array<uint8_t, 16_k> buf;
1391   int rv;
1392 
1393   for (;;) {
1394     auto nread = conn_.read_tls(buf.data(), buf.size());
1395     if (nread == 0) {
1396       return 0;
1397     }
1398 
1399     if (nread < 0) {
1400       if (nread == SHRPX_ERR_EOF && !downstream_->get_upgraded()) {
1401         auto htperr = llhttp_finish(&response_htp_);
1402         if (htperr != HPE_OK) {
1403           if (LOG_ENABLED(INFO)) {
1404             DCLOG(INFO, this) << "HTTP response ended prematurely: "
1405                               << llhttp_errno_name(htperr);
1406           }
1407 
1408           return -1;
1409         }
1410       }
1411 
1412       return nread;
1413     }
1414 
1415     rv = process_input(buf.data(), nread);
1416     if (rv != 0) {
1417       return rv;
1418     }
1419 
1420     if (!ev_is_active(&conn_.rev)) {
1421       return 0;
1422     }
1423   }
1424 }
1425 
write_tls()1426 int HttpDownstreamConnection::write_tls() {
1427   conn_.last_read = std::chrono::steady_clock::now();
1428 
1429   ERR_clear_error();
1430 
1431   auto upstream = downstream_->get_upstream();
1432   auto input = downstream_->get_request_buf();
1433 
1434   struct iovec iov;
1435 
1436   while (input->rleft() > 0) {
1437     auto iovcnt = input->riovec(&iov, 1);
1438     if (iovcnt != 1) {
1439       assert(0);
1440       return -1;
1441     }
1442     auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len);
1443 
1444     if (nwrite == 0) {
1445       return 0;
1446     }
1447 
1448     if (nwrite < 0) {
1449       if (!first_write_done_) {
1450         return nwrite;
1451       }
1452       // We may have pending data in receive buffer which may contain
1453       // part of response body.  So keep reading.  Invoke read event
1454       // to get read(2) error just in case.
1455       ev_feed_event(conn_.loop, &conn_.rev, EV_READ);
1456       on_write_ = &HttpDownstreamConnection::noop;
1457       reusable_ = false;
1458       break;
1459     }
1460 
1461     input->drain(nwrite);
1462   }
1463 
1464   conn_.wlimit.stopw();
1465   ev_timer_stop(conn_.loop, &conn_.wt);
1466 
1467   if (input->rleft() == 0) {
1468     auto &req = downstream_->request();
1469 
1470     upstream->resume_read(SHRPX_NO_BUFFER, downstream_,
1471                           req.unconsumed_body_length);
1472   }
1473 
1474   return 0;
1475 }
1476 
process_input(const uint8_t * data,size_t datalen)1477 int HttpDownstreamConnection::process_input(const uint8_t *data,
1478                                             size_t datalen) {
1479   int rv;
1480 
1481   if (downstream_->get_upgraded()) {
1482     // For upgraded connection, just pass data to the upstream.
1483     rv = downstream_->get_upstream()->on_downstream_body(downstream_, data,
1484                                                          datalen, true);
1485     if (rv != 0) {
1486       return rv;
1487     }
1488 
1489     if (downstream_->response_buf_full()) {
1490       downstream_->pause_read(SHRPX_NO_BUFFER);
1491       return 0;
1492     }
1493 
1494     return 0;
1495   }
1496 
1497   auto htperr = llhttp_execute(&response_htp_,
1498                                reinterpret_cast<const char *>(data), datalen);
1499   auto nproc = htperr == HPE_OK
1500                  ? datalen
1501                  : static_cast<size_t>(reinterpret_cast<const uint8_t *>(
1502                                          llhttp_get_error_pos(&response_htp_)) -
1503                                        data);
1504 
1505   if (htperr != HPE_OK &&
1506       (!downstream_->get_upgraded() || htperr != HPE_PAUSED_UPGRADE)) {
1507     // Handling early return (in other words, response was hijacked by
1508     // mruby scripting).
1509     if (downstream_->get_response_state() == DownstreamState::MSG_COMPLETE) {
1510       return SHRPX_ERR_DCONN_CANCELED;
1511     }
1512 
1513     if (LOG_ENABLED(INFO)) {
1514       DCLOG(INFO, this) << "HTTP parser failure: "
1515                         << "(" << llhttp_errno_name(htperr) << ") "
1516                         << llhttp_get_error_reason(&response_htp_);
1517     }
1518 
1519     return -1;
1520   }
1521 
1522   if (downstream_->get_upgraded()) {
1523     if (nproc < datalen) {
1524       // Data from data + nproc are for upgraded protocol.
1525       rv = downstream_->get_upstream()->on_downstream_body(
1526         downstream_, data + nproc, datalen - nproc, true);
1527       if (rv != 0) {
1528         return rv;
1529       }
1530 
1531       if (downstream_->response_buf_full()) {
1532         downstream_->pause_read(SHRPX_NO_BUFFER);
1533         return 0;
1534       }
1535     }
1536     return 0;
1537   }
1538 
1539   if (downstream_->response_buf_full()) {
1540     downstream_->pause_read(SHRPX_NO_BUFFER);
1541     return 0;
1542   }
1543 
1544   return 0;
1545 }
1546 
connected()1547 int HttpDownstreamConnection::connected() {
1548   auto &connect_blocker = addr_->connect_blocker;
1549 
1550   auto sock_error = util::get_socket_error(conn_.fd);
1551   if (sock_error != 0) {
1552     conn_.wlimit.stopw();
1553 
1554     DCLOG(WARN, this) << "Backend connect failed; addr="
1555                       << util::to_numeric_addr(raddr_)
1556                       << ": errno=" << sock_error;
1557 
1558     downstream_failure(addr_, raddr_);
1559 
1560     return -1;
1561   }
1562 
1563   if (LOG_ENABLED(INFO)) {
1564     DCLOG(INFO, this) << "Connected to downstream host";
1565   }
1566 
1567   // Reset timeout for write.  Previously, we set timeout for connect.
1568   conn_.wt.repeat = group_->shared_addr->timeout.write;
1569   ev_timer_again(conn_.loop, &conn_.wt);
1570 
1571   conn_.rlimit.startw();
1572   conn_.again_rt();
1573 
1574   ev_set_cb(&conn_.wev, writecb);
1575 
1576   if (conn_.tls.ssl) {
1577     on_read_ = &HttpDownstreamConnection::tls_handshake;
1578     on_write_ = &HttpDownstreamConnection::tls_handshake;
1579 
1580     return 0;
1581   }
1582 
1583   signal_write_ = &HttpDownstreamConnection::actual_signal_write;
1584 
1585   connect_blocker->on_success();
1586 
1587   ev_set_cb(&conn_.rt, timeoutcb);
1588   ev_set_cb(&conn_.wt, timeoutcb);
1589 
1590   on_read_ = &HttpDownstreamConnection::read_clear;
1591   on_write_ = &HttpDownstreamConnection::write_first;
1592 
1593   return 0;
1594 }
1595 
on_read()1596 int HttpDownstreamConnection::on_read() { return on_read_(*this); }
1597 
on_write()1598 int HttpDownstreamConnection::on_write() { return on_write_(*this); }
1599 
on_upstream_change(Upstream * upstream)1600 void HttpDownstreamConnection::on_upstream_change(Upstream *upstream) {}
1601 
signal_write()1602 void HttpDownstreamConnection::signal_write() { signal_write_(*this); }
1603 
actual_signal_write()1604 int HttpDownstreamConnection::actual_signal_write() {
1605   ev_feed_event(conn_.loop, &conn_.wev, EV_WRITE);
1606   return 0;
1607 }
1608 
noop()1609 int HttpDownstreamConnection::noop() { return 0; }
1610 
1611 const std::shared_ptr<DownstreamAddrGroup> &
get_downstream_addr_group() const1612 HttpDownstreamConnection::get_downstream_addr_group() const {
1613   return group_;
1614 }
1615 
get_addr() const1616 DownstreamAddr *HttpDownstreamConnection::get_addr() const { return addr_; }
1617 
poolable() const1618 bool HttpDownstreamConnection::poolable() const {
1619   return !group_->retired && reusable_;
1620 }
1621 
get_raddr() const1622 const Address *HttpDownstreamConnection::get_raddr() const { return raddr_; }
1623 
1624 } // namespace shrpx
1625