1 /*
2 * nghttp2 - HTTP/2 C Library
3 *
4 * Copyright (c) 2012 Tatsuhiro Tsujikawa
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining
7 * a copy of this software and associated documentation files (the
8 * "Software"), to deal in the Software without restriction, including
9 * without limitation the rights to use, copy, modify, merge, publish,
10 * distribute, sublicense, and/or sell copies of the Software, and to
11 * permit persons to whom the Software is furnished to do so, subject to
12 * the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be
15 * included in all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 */
25 #include "shrpx_client_handler.h"
26
27 #ifdef HAVE_UNISTD_H
28 # include <unistd.h>
29 #endif // HAVE_UNISTD_H
30 #ifdef HAVE_SYS_SOCKET_H
31 # include <sys/socket.h>
32 #endif // HAVE_SYS_SOCKET_H
33 #ifdef HAVE_NETDB_H
34 # include <netdb.h>
35 #endif // HAVE_NETDB_H
36
37 #include <cerrno>
38
39 #include "shrpx_upstream.h"
40 #include "shrpx_http2_upstream.h"
41 #include "shrpx_https_upstream.h"
42 #include "shrpx_config.h"
43 #include "shrpx_http_downstream_connection.h"
44 #include "shrpx_http2_downstream_connection.h"
45 #include "shrpx_tls.h"
46 #include "shrpx_worker.h"
47 #include "shrpx_downstream_connection_pool.h"
48 #include "shrpx_downstream.h"
49 #include "shrpx_http2_session.h"
50 #include "shrpx_connect_blocker.h"
51 #include "shrpx_api_downstream_connection.h"
52 #include "shrpx_health_monitor_downstream_connection.h"
53 #include "shrpx_null_downstream_connection.h"
54 #ifdef ENABLE_HTTP3
55 # include "shrpx_http3_upstream.h"
56 #endif // ENABLE_HTTP3
57 #include "shrpx_log.h"
58 #include "util.h"
59 #include "template.h"
60 #include "tls.h"
61
62 using namespace nghttp2;
63
64 namespace shrpx {
65
66 namespace {
timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)67 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
68 auto conn = static_cast<Connection *>(w->data);
69 auto handler = static_cast<ClientHandler *>(conn->data);
70
71 if (LOG_ENABLED(INFO)) {
72 CLOG(INFO, handler) << "Time out";
73 }
74
75 delete handler;
76 }
77 } // namespace
78
79 namespace {
shutdowncb(struct ev_loop * loop,ev_timer * w,int revents)80 void shutdowncb(struct ev_loop *loop, ev_timer *w, int revents) {
81 auto handler = static_cast<ClientHandler *>(w->data);
82
83 if (LOG_ENABLED(INFO)) {
84 CLOG(INFO, handler) << "Close connection due to TLS renegotiation";
85 }
86
87 delete handler;
88 }
89 } // namespace
90
91 namespace {
readcb(struct ev_loop * loop,ev_io * w,int revents)92 void readcb(struct ev_loop *loop, ev_io *w, int revents) {
93 auto conn = static_cast<Connection *>(w->data);
94 auto handler = static_cast<ClientHandler *>(conn->data);
95
96 if (handler->do_read() != 0) {
97 delete handler;
98 return;
99 }
100 }
101 } // namespace
102
103 namespace {
writecb(struct ev_loop * loop,ev_io * w,int revents)104 void writecb(struct ev_loop *loop, ev_io *w, int revents) {
105 auto conn = static_cast<Connection *>(w->data);
106 auto handler = static_cast<ClientHandler *>(conn->data);
107
108 if (handler->do_write() != 0) {
109 delete handler;
110 return;
111 }
112 }
113 } // namespace
114
noop()115 int ClientHandler::noop() { return 0; }
116
read_clear()117 int ClientHandler::read_clear() {
118 auto should_break = false;
119 rb_.ensure_chunk();
120 for (;;) {
121 if (rb_.rleft() && on_read() != 0) {
122 return -1;
123 }
124 if (rb_.rleft() == 0) {
125 rb_.reset();
126 } else if (rb_.wleft() == 0) {
127 conn_.rlimit.stopw();
128 return 0;
129 }
130
131 if (!ev_is_active(&conn_.rev) || should_break) {
132 return 0;
133 }
134
135 auto nread = conn_.read_clear(rb_.last(), rb_.wleft());
136
137 if (nread == 0) {
138 if (rb_.rleft() == 0) {
139 rb_.release_chunk();
140 }
141 return 0;
142 }
143
144 if (nread < 0) {
145 return -1;
146 }
147
148 rb_.write(nread);
149 should_break = true;
150 }
151 }
152
write_clear()153 int ClientHandler::write_clear() {
154 std::array<iovec, 2> iov;
155
156 for (;;) {
157 if (on_write() != 0) {
158 return -1;
159 }
160
161 auto iovcnt = upstream_->response_riovec(iov.data(), iov.size());
162 if (iovcnt == 0) {
163 break;
164 }
165
166 auto nwrite = conn_.writev_clear(iov.data(), iovcnt);
167 if (nwrite < 0) {
168 return -1;
169 }
170
171 if (nwrite == 0) {
172 return 0;
173 }
174
175 upstream_->response_drain(nwrite);
176 }
177
178 conn_.wlimit.stopw();
179 ev_timer_stop(conn_.loop, &conn_.wt);
180
181 return 0;
182 }
183
proxy_protocol_peek_clear()184 int ClientHandler::proxy_protocol_peek_clear() {
185 rb_.ensure_chunk();
186
187 assert(rb_.rleft() == 0);
188
189 auto nread = conn_.peek_clear(rb_.last(), rb_.wleft());
190 if (nread < 0) {
191 return -1;
192 }
193 if (nread == 0) {
194 return 0;
195 }
196
197 if (LOG_ENABLED(INFO)) {
198 CLOG(INFO, this) << "PROXY-protocol: Peek " << nread
199 << " bytes from socket";
200 }
201
202 rb_.write(nread);
203
204 if (on_read() != 0) {
205 return -1;
206 }
207
208 rb_.reset();
209
210 return 0;
211 }
212
tls_handshake()213 int ClientHandler::tls_handshake() {
214 ev_timer_again(conn_.loop, &conn_.rt);
215
216 ERR_clear_error();
217
218 auto rv = conn_.tls_handshake();
219
220 if (rv == SHRPX_ERR_INPROGRESS) {
221 return 0;
222 }
223
224 if (rv < 0) {
225 return -1;
226 }
227
228 if (LOG_ENABLED(INFO)) {
229 CLOG(INFO, this) << "SSL/TLS handshake completed";
230 }
231
232 if (validate_next_proto() != 0) {
233 return -1;
234 }
235
236 read_ = &ClientHandler::read_tls;
237 write_ = &ClientHandler::write_tls;
238
239 return 0;
240 }
241
read_tls()242 int ClientHandler::read_tls() {
243 auto should_break = false;
244
245 ERR_clear_error();
246
247 rb_.ensure_chunk();
248
249 for (;;) {
250 // we should process buffered data first before we read EOF.
251 if (rb_.rleft() && on_read() != 0) {
252 return -1;
253 }
254 if (rb_.rleft() == 0) {
255 rb_.reset();
256 } else if (rb_.wleft() == 0) {
257 conn_.rlimit.stopw();
258 return 0;
259 }
260
261 if (!ev_is_active(&conn_.rev) || should_break) {
262 return 0;
263 }
264
265 auto nread = conn_.read_tls(rb_.last(), rb_.wleft());
266
267 if (nread == 0) {
268 if (rb_.rleft() == 0) {
269 rb_.release_chunk();
270 }
271 return 0;
272 }
273
274 if (nread < 0) {
275 return -1;
276 }
277
278 rb_.write(nread);
279 should_break = true;
280 }
281 }
282
write_tls()283 int ClientHandler::write_tls() {
284 struct iovec iov;
285
286 ERR_clear_error();
287
288 if (on_write() != 0) {
289 return -1;
290 }
291
292 auto iovcnt = upstream_->response_riovec(&iov, 1);
293 if (iovcnt == 0) {
294 conn_.start_tls_write_idle();
295
296 conn_.wlimit.stopw();
297 ev_timer_stop(conn_.loop, &conn_.wt);
298
299 return 0;
300 }
301
302 for (;;) {
303 auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len);
304 if (nwrite < 0) {
305 return -1;
306 }
307
308 if (nwrite == 0) {
309 return 0;
310 }
311
312 upstream_->response_drain(nwrite);
313
314 iovcnt = upstream_->response_riovec(&iov, 1);
315 if (iovcnt == 0) {
316 return 0;
317 }
318 }
319 }
320
#ifdef ENABLE_HTTP3
// Forwards one received QUIC datagram to the upstream.  upstream_ is
// assumed to be an Http3Upstream (unchecked static_cast).
int ClientHandler::read_quic(const UpstreamAddr *faddr,
                             const Address &remote_addr,
                             const Address &local_addr,
                             const ngtcp2_pkt_info &pi, const uint8_t *data,
                             size_t datalen) {
  return static_cast<Http3Upstream *>(upstream_.get())
      ->on_read(faddr, remote_addr, local_addr, pi, data, datalen);
}

// write_ handler for QUIC: the upstream does its own sending.
int ClientHandler::write_quic() {
  return upstream_->on_write();
}
#endif // ENABLE_HTTP3
334
upstream_noop()335 int ClientHandler::upstream_noop() { return 0; }
336
upstream_read()337 int ClientHandler::upstream_read() {
338 assert(upstream_);
339 if (upstream_->on_read() != 0) {
340 return -1;
341 }
342 return 0;
343 }
344
upstream_write()345 int ClientHandler::upstream_write() {
346 assert(upstream_);
347 if (upstream_->on_write() != 0) {
348 return -1;
349 }
350
351 if (get_should_close_after_write() && upstream_->response_empty()) {
352 return -1;
353 }
354
355 return 0;
356 }
357
upstream_http2_connhd_read()358 int ClientHandler::upstream_http2_connhd_read() {
359 auto nread = std::min(left_connhd_len_, rb_.rleft());
360 if (memcmp(&NGHTTP2_CLIENT_MAGIC[NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_],
361 rb_.pos(), nread) != 0) {
362 // There is no downgrade path here. Just drop the connection.
363 if (LOG_ENABLED(INFO)) {
364 CLOG(INFO, this) << "invalid client connection header";
365 }
366
367 return -1;
368 }
369
370 left_connhd_len_ -= nread;
371 rb_.drain(nread);
372 conn_.rlimit.startw();
373
374 if (left_connhd_len_ == 0) {
375 on_read_ = &ClientHandler::upstream_read;
376 // Run on_read to process data left in buffer since they are not
377 // notified further
378 if (on_read() != 0) {
379 return -1;
380 }
381 return 0;
382 }
383
384 return 0;
385 }
386
upstream_http1_connhd_read()387 int ClientHandler::upstream_http1_connhd_read() {
388 auto nread = std::min(left_connhd_len_, rb_.rleft());
389 if (memcmp(&NGHTTP2_CLIENT_MAGIC[NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_],
390 rb_.pos(), nread) != 0) {
391 if (LOG_ENABLED(INFO)) {
392 CLOG(INFO, this) << "This is HTTP/1.1 connection, "
393 << "but may be upgraded to HTTP/2 later.";
394 }
395
396 // Reset header length for later HTTP/2 upgrade
397 left_connhd_len_ = NGHTTP2_CLIENT_MAGIC_LEN;
398 on_read_ = &ClientHandler::upstream_read;
399 on_write_ = &ClientHandler::upstream_write;
400
401 if (on_read() != 0) {
402 return -1;
403 }
404
405 return 0;
406 }
407
408 left_connhd_len_ -= nread;
409 rb_.drain(nread);
410 conn_.rlimit.startw();
411
412 if (left_connhd_len_ == 0) {
413 if (LOG_ENABLED(INFO)) {
414 CLOG(INFO, this) << "direct HTTP/2 connection";
415 }
416
417 direct_http2_upgrade();
418 on_read_ = &ClientHandler::upstream_read;
419 on_write_ = &ClientHandler::upstream_write;
420
421 // Run on_read to process data left in buffer since they are not
422 // notified further
423 if (on_read() != 0) {
424 return -1;
425 }
426
427 return 0;
428 }
429
430 return 0;
431 }
432
ClientHandler(Worker * worker,int fd,SSL * ssl,const StringRef & ipaddr,const StringRef & port,int family,const UpstreamAddr * faddr)433 ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl,
434 const StringRef &ipaddr, const StringRef &port,
435 int family, const UpstreamAddr *faddr)
436 : // We use balloc_ for TLS session ID (64), ipaddr (IPv6) (39),
437 // port (5), forwarded-for (IPv6) (41), alpn (5), proxyproto
438 // ipaddr (15), proxyproto port (5), sni (32, estimated). we
439 // need terminal NULL byte for each. We also require 8 bytes
440 // header for each allocation. We align at 16 bytes boundary,
441 // so the required space is 64 + 48 + 16 + 48 + 16 + 16 + 16 +
442 // 32 + 8 + 8 * 8 = 328.
443 balloc_(512, 512),
444 rb_(worker->get_mcpool()),
445 conn_(worker->get_loop(), fd, ssl, worker->get_mcpool(),
446 get_config()->conn.upstream.timeout.write,
447 get_config()->conn.upstream.timeout.read,
448 get_config()->conn.upstream.ratelimit.write,
449 get_config()->conn.upstream.ratelimit.read, writecb, readcb,
450 timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
451 get_config()->tls.dyn_rec.idle_timeout,
452 faddr->quic ? Proto::HTTP3 : Proto::NONE),
453 ipaddr_(make_string_ref(balloc_, ipaddr)),
454 port_(make_string_ref(balloc_, port)),
455 faddr_(faddr),
456 worker_(worker),
457 left_connhd_len_(NGHTTP2_CLIENT_MAGIC_LEN),
458 affinity_hash_(0),
459 should_close_after_write_(false),
460 affinity_hash_computed_(false) {
461
462 ++worker_->get_worker_stat()->num_connections;
463
464 ev_timer_init(&reneg_shutdown_timer_, shutdowncb, 0., 0.);
465
466 reneg_shutdown_timer_.data = this;
467
468 if (!faddr->quic) {
469 conn_.rlimit.startw();
470 }
471 ev_timer_again(conn_.loop, &conn_.rt);
472
473 auto config = get_config();
474
475 if (!faddr->quic) {
476 if (faddr_->accept_proxy_protocol ||
477 config->conn.upstream.accept_proxy_protocol) {
478 read_ = &ClientHandler::proxy_protocol_peek_clear;
479 write_ = &ClientHandler::noop;
480 on_read_ = &ClientHandler::proxy_protocol_read;
481 on_write_ = &ClientHandler::upstream_noop;
482 } else {
483 setup_upstream_io_callback();
484 }
485 }
486
487 auto &fwdconf = config->http.forwarded;
488
489 if (fwdconf.params & FORWARDED_FOR) {
490 if (fwdconf.for_node_type == ForwardedNode::OBFUSCATED) {
491 // 1 for '_'
492 auto len = SHRPX_OBFUSCATED_NODE_LENGTH + 1;
493 // 1 for terminating NUL.
494 auto buf = make_byte_ref(balloc_, len + 1);
495 auto p = buf.base;
496 *p++ = '_';
497 p = util::random_alpha_digit(p, p + SHRPX_OBFUSCATED_NODE_LENGTH,
498 worker_->get_randgen());
499 *p = '\0';
500
501 forwarded_for_ = StringRef{buf.base, p};
502 } else {
503 init_forwarded_for(family, ipaddr_);
504 }
505 }
506 }
507
init_forwarded_for(int family,const StringRef & ipaddr)508 void ClientHandler::init_forwarded_for(int family, const StringRef &ipaddr) {
509 if (family == AF_INET6) {
510 // 2 for '[' and ']'
511 auto len = 2 + ipaddr.size();
512 // 1 for terminating NUL.
513 auto buf = make_byte_ref(balloc_, len + 1);
514 auto p = buf.base;
515 *p++ = '[';
516 p = std::copy(std::begin(ipaddr), std::end(ipaddr), p);
517 *p++ = ']';
518 *p = '\0';
519
520 forwarded_for_ = StringRef{buf.base, p};
521 } else {
522 // family == AF_INET or family == AF_UNIX
523 forwarded_for_ = ipaddr;
524 }
525 }
526
setup_upstream_io_callback()527 void ClientHandler::setup_upstream_io_callback() {
528 if (conn_.tls.ssl) {
529 conn_.prepare_server_handshake();
530 read_ = write_ = &ClientHandler::tls_handshake;
531 on_read_ = &ClientHandler::upstream_noop;
532 on_write_ = &ClientHandler::upstream_write;
533 } else {
534 // For non-TLS version, first create HttpsUpstream. It may be
535 // upgraded to HTTP/2 through HTTP Upgrade or direct HTTP/2
536 // connection.
537 upstream_ = std::make_unique<HttpsUpstream>(this);
538 alpn_ = StringRef::from_lit("http/1.1");
539 read_ = &ClientHandler::read_clear;
540 write_ = &ClientHandler::write_clear;
541 on_read_ = &ClientHandler::upstream_http1_connhd_read;
542 on_write_ = &ClientHandler::upstream_noop;
543 }
544 }
545
#ifdef ENABLE_HTTP3
// Takes ownership of an HTTP/3 upstream, routes writes through it and
// switches the read timeout to the configured HTTP/3 value.
void ClientHandler::setup_http3_upstream(
    std::unique_ptr<Http3Upstream> &&upstream) {
  upstream_ = std::move(upstream);
  write_ = &ClientHandler::write_quic;

  reset_upstream_read_timeout(get_config()->conn.upstream.timeout.http3_read);
}
#endif // ENABLE_HTTP3
557
~ClientHandler()558 ClientHandler::~ClientHandler() {
559 if (LOG_ENABLED(INFO)) {
560 CLOG(INFO, this) << "Deleting";
561 }
562
563 if (upstream_) {
564 upstream_->on_handler_delete();
565 }
566
567 auto worker_stat = worker_->get_worker_stat();
568 --worker_stat->num_connections;
569
570 if (worker_stat->num_connections == 0) {
571 worker_->schedule_clear_mcpool();
572 }
573
574 ev_timer_stop(conn_.loop, &reneg_shutdown_timer_);
575
576 // TODO If backend is http/2, and it is in CONNECTED state, signal
577 // it and make it loopbreak when output is zero.
578 if (worker_->get_graceful_shutdown() && worker_stat->num_connections == 0 &&
579 worker_stat->num_close_waits == 0) {
580 ev_break(conn_.loop);
581 }
582
583 if (LOG_ENABLED(INFO)) {
584 CLOG(INFO, this) << "Deleted";
585 }
586 }
587
get_upstream()588 Upstream *ClientHandler::get_upstream() { return upstream_.get(); }
589
get_loop() const590 struct ev_loop *ClientHandler::get_loop() const {
591 return conn_.loop;
592 }
593
reset_upstream_read_timeout(ev_tstamp t)594 void ClientHandler::reset_upstream_read_timeout(ev_tstamp t) {
595 conn_.rt.repeat = t;
596 if (ev_is_active(&conn_.rt)) {
597 ev_timer_again(conn_.loop, &conn_.rt);
598 }
599 }
600
reset_upstream_write_timeout(ev_tstamp t)601 void ClientHandler::reset_upstream_write_timeout(ev_tstamp t) {
602 conn_.wt.repeat = t;
603 if (ev_is_active(&conn_.wt)) {
604 ev_timer_again(conn_.loop, &conn_.wt);
605 }
606 }
607
repeat_read_timer()608 void ClientHandler::repeat_read_timer() {
609 ev_timer_again(conn_.loop, &conn_.rt);
610 }
611
stop_read_timer()612 void ClientHandler::stop_read_timer() { ev_timer_stop(conn_.loop, &conn_.rt); }
613
validate_next_proto()614 int ClientHandler::validate_next_proto() {
615 const unsigned char *next_proto = nullptr;
616 unsigned int next_proto_len = 0;
617
618 // First set callback for catch all cases
619 on_read_ = &ClientHandler::upstream_read;
620
621 #ifndef OPENSSL_NO_NEXTPROTONEG
622 SSL_get0_next_proto_negotiated(conn_.tls.ssl, &next_proto, &next_proto_len);
623 #endif // !OPENSSL_NO_NEXTPROTONEG
624 #if OPENSSL_VERSION_NUMBER >= 0x10002000L
625 if (next_proto == nullptr) {
626 SSL_get0_alpn_selected(conn_.tls.ssl, &next_proto, &next_proto_len);
627 }
628 #endif // OPENSSL_VERSION_NUMBER >= 0x10002000L
629
630 StringRef proto;
631
632 if (next_proto) {
633 proto = StringRef{next_proto, next_proto_len};
634
635 if (LOG_ENABLED(INFO)) {
636 CLOG(INFO, this) << "The negotiated next protocol: " << proto;
637 }
638 } else {
639 if (LOG_ENABLED(INFO)) {
640 CLOG(INFO, this) << "No protocol negotiated. Fallback to HTTP/1.1";
641 }
642
643 proto = StringRef::from_lit("http/1.1");
644 }
645
646 if (!tls::in_proto_list(get_config()->tls.npn_list, proto)) {
647 if (LOG_ENABLED(INFO)) {
648 CLOG(INFO, this) << "The negotiated protocol is not supported: " << proto;
649 }
650 return -1;
651 }
652
653 if (util::check_h2_is_selected(proto)) {
654 on_read_ = &ClientHandler::upstream_http2_connhd_read;
655
656 auto http2_upstream = std::make_unique<Http2Upstream>(this);
657
658 upstream_ = std::move(http2_upstream);
659 alpn_ = make_string_ref(balloc_, proto);
660
661 // At this point, input buffer is already filled with some bytes.
662 // The read callback is not called until new data come. So consume
663 // input buffer here.
664 if (on_read() != 0) {
665 return -1;
666 }
667
668 return 0;
669 }
670
671 if (proto == StringRef::from_lit("http/1.1")) {
672 upstream_ = std::make_unique<HttpsUpstream>(this);
673 alpn_ = StringRef::from_lit("http/1.1");
674
675 // At this point, input buffer is already filled with some bytes.
676 // The read callback is not called until new data come. So consume
677 // input buffer here.
678 if (on_read() != 0) {
679 return -1;
680 }
681
682 return 0;
683 }
684 if (LOG_ENABLED(INFO)) {
685 CLOG(INFO, this) << "The negotiated protocol is not supported";
686 }
687 return -1;
688 }
689
do_read()690 int ClientHandler::do_read() { return read_(*this); }
do_write()691 int ClientHandler::do_write() { return write_(*this); }
692
on_read()693 int ClientHandler::on_read() {
694 if (rb_.chunk_avail()) {
695 auto rv = on_read_(*this);
696 if (rv != 0) {
697 return rv;
698 }
699 }
700 conn_.handle_tls_pending_read();
701 return 0;
702 }
on_write()703 int ClientHandler::on_write() { return on_write_(*this); }
704
get_ipaddr() const705 const StringRef &ClientHandler::get_ipaddr() const { return ipaddr_; }
706
get_should_close_after_write() const707 bool ClientHandler::get_should_close_after_write() const {
708 return should_close_after_write_;
709 }
710
set_should_close_after_write(bool f)711 void ClientHandler::set_should_close_after_write(bool f) {
712 should_close_after_write_ = f;
713 }
714
pool_downstream_connection(std::unique_ptr<DownstreamConnection> dconn)715 void ClientHandler::pool_downstream_connection(
716 std::unique_ptr<DownstreamConnection> dconn) {
717 if (!dconn->poolable()) {
718 return;
719 }
720
721 dconn->set_client_handler(nullptr);
722
723 auto &group = dconn->get_downstream_addr_group();
724
725 if (LOG_ENABLED(INFO)) {
726 CLOG(INFO, this) << "Pooling downstream connection DCONN:" << dconn.get()
727 << " in group " << group;
728 }
729
730 auto addr = dconn->get_addr();
731 auto &dconn_pool = addr->dconn_pool;
732 dconn_pool->add_downstream_connection(std::move(dconn));
733 }
734
735 namespace {
736 // Computes 32bits hash for session affinity for IP address |ip|.
compute_affinity_from_ip(const StringRef & ip)737 uint32_t compute_affinity_from_ip(const StringRef &ip) {
738 int rv;
739 std::array<uint8_t, 32> buf;
740
741 rv = util::sha256(buf.data(), ip);
742 if (rv != 0) {
743 // Not sure when sha256 failed. Just fall back to another
744 // function.
745 return util::hash32(ip);
746 }
747
748 return (static_cast<uint32_t>(buf[0]) << 24) |
749 (static_cast<uint32_t>(buf[1]) << 16) |
750 (static_cast<uint32_t>(buf[2]) << 8) | static_cast<uint32_t>(buf[3]);
751 }
752 } // namespace
753
get_http2_session(const std::shared_ptr<DownstreamAddrGroup> & group,DownstreamAddr * addr)754 Http2Session *ClientHandler::get_http2_session(
755 const std::shared_ptr<DownstreamAddrGroup> &group, DownstreamAddr *addr) {
756 auto &shared_addr = group->shared_addr;
757
758 if (LOG_ENABLED(INFO)) {
759 CLOG(INFO, this) << "Selected DownstreamAddr=" << addr
760 << ", index=" << (addr - shared_addr->addrs.data());
761 }
762
763 for (auto session = addr->http2_extra_freelist.head; session;) {
764 auto next = session->dlnext;
765
766 if (session->max_concurrency_reached(0)) {
767 if (LOG_ENABLED(INFO)) {
768 CLOG(INFO, this)
769 << "Maximum streams have been reached for Http2Session(" << session
770 << "). Skip it";
771 }
772
773 session->remove_from_freelist();
774 session = next;
775
776 continue;
777 }
778
779 if (LOG_ENABLED(INFO)) {
780 CLOG(INFO, this) << "Use Http2Session " << session
781 << " from http2_extra_freelist";
782 }
783
784 if (session->max_concurrency_reached(1)) {
785 if (LOG_ENABLED(INFO)) {
786 CLOG(INFO, this) << "Maximum streams are reached for Http2Session("
787 << session << ").";
788 }
789
790 session->remove_from_freelist();
791 }
792 return session;
793 }
794
795 auto session = new Http2Session(conn_.loop, worker_->get_cl_ssl_ctx(),
796 worker_, group, addr);
797
798 if (LOG_ENABLED(INFO)) {
799 CLOG(INFO, this) << "Create new Http2Session " << session;
800 }
801
802 session->add_to_extra_freelist();
803
804 return session;
805 }
806
get_affinity_cookie(Downstream * downstream,const StringRef & cookie_name)807 uint32_t ClientHandler::get_affinity_cookie(Downstream *downstream,
808 const StringRef &cookie_name) {
809 auto h = downstream->find_affinity_cookie(cookie_name);
810 if (h) {
811 return h;
812 }
813
814 auto d = std::uniform_int_distribution<uint32_t>(1);
815 auto rh = d(worker_->get_randgen());
816 h = util::hash32(StringRef{reinterpret_cast<uint8_t *>(&rh),
817 reinterpret_cast<uint8_t *>(&rh) + sizeof(rh)});
818
819 downstream->renew_affinity_cookie(h);
820
821 return h;
822 }
823
824 namespace {
reschedule_addr(std::priority_queue<DownstreamAddrEntry,std::vector<DownstreamAddrEntry>,DownstreamAddrEntryGreater> & pq,DownstreamAddr * addr)825 void reschedule_addr(
826 std::priority_queue<DownstreamAddrEntry, std::vector<DownstreamAddrEntry>,
827 DownstreamAddrEntryGreater> &pq,
828 DownstreamAddr *addr) {
829 auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + addr->pending_penalty;
830 addr->cycle += penalty / addr->weight;
831 addr->pending_penalty = penalty % addr->weight;
832
833 pq.push(DownstreamAddrEntry{addr, addr->seq, addr->cycle});
834 addr->queued = true;
835 }
836 } // namespace
837
838 namespace {
reschedule_wg(std::priority_queue<WeightGroupEntry,std::vector<WeightGroupEntry>,WeightGroupEntryGreater> & pq,WeightGroup * wg)839 void reschedule_wg(
840 std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
841 WeightGroupEntryGreater> &pq,
842 WeightGroup *wg) {
843 auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + wg->pending_penalty;
844 wg->cycle += penalty / wg->weight;
845 wg->pending_penalty = penalty % wg->weight;
846
847 pq.push(WeightGroupEntry{wg, wg->seq, wg->cycle});
848 wg->queued = true;
849 }
850 } // namespace
851
get_downstream_addr(int & err,DownstreamAddrGroup * group,Downstream * downstream)852 DownstreamAddr *ClientHandler::get_downstream_addr(int &err,
853 DownstreamAddrGroup *group,
854 Downstream *downstream) {
855 err = 0;
856
857 switch (faddr_->alt_mode) {
858 case UpstreamAltMode::API:
859 case UpstreamAltMode::HEALTHMON:
860 assert(0);
861 default:
862 break;
863 }
864
865 auto &shared_addr = group->shared_addr;
866
867 if (shared_addr->affinity.type != SessionAffinity::NONE) {
868 uint32_t hash;
869 switch (shared_addr->affinity.type) {
870 case SessionAffinity::IP:
871 if (!affinity_hash_computed_) {
872 affinity_hash_ = compute_affinity_from_ip(ipaddr_);
873 affinity_hash_computed_ = true;
874 }
875 hash = affinity_hash_;
876 break;
877 case SessionAffinity::COOKIE:
878 if (shared_addr->affinity.cookie.stickiness ==
879 SessionAffinityCookieStickiness::STRICT) {
880 return get_downstream_addr_strict_affinity(err, shared_addr,
881 downstream);
882 }
883
884 hash = get_affinity_cookie(downstream, shared_addr->affinity.cookie.name);
885 break;
886 default:
887 assert(0);
888 }
889
890 const auto &affinity_hash = shared_addr->affinity_hash;
891
892 auto it = std::lower_bound(
893 std::begin(affinity_hash), std::end(affinity_hash), hash,
894 [](const AffinityHash &lhs, uint32_t rhs) { return lhs.hash < rhs; });
895
896 if (it == std::end(affinity_hash)) {
897 it = std::begin(affinity_hash);
898 }
899
900 auto aff_idx =
901 static_cast<size_t>(std::distance(std::begin(affinity_hash), it));
902 auto idx = (*it).idx;
903 auto addr = &shared_addr->addrs[idx];
904
905 if (addr->connect_blocker->blocked()) {
906 size_t i;
907 for (i = aff_idx + 1; i != aff_idx; ++i) {
908 if (i == shared_addr->affinity_hash.size()) {
909 i = 0;
910 }
911 addr = &shared_addr->addrs[shared_addr->affinity_hash[i].idx];
912 if (addr->connect_blocker->blocked()) {
913 continue;
914 }
915 break;
916 }
917 if (i == aff_idx) {
918 err = -1;
919 return nullptr;
920 }
921 }
922
923 return addr;
924 }
925
926 auto &wgpq = shared_addr->pq;
927
928 for (;;) {
929 if (wgpq.empty()) {
930 CLOG(INFO, this) << "No working downstream address found";
931 err = -1;
932 return nullptr;
933 }
934
935 auto wg = wgpq.top().wg;
936 wgpq.pop();
937 wg->queued = false;
938
939 for (;;) {
940 if (wg->pq.empty()) {
941 break;
942 }
943
944 auto addr = wg->pq.top().addr;
945 wg->pq.pop();
946 addr->queued = false;
947
948 if (addr->connect_blocker->blocked()) {
949 continue;
950 }
951
952 reschedule_addr(wg->pq, addr);
953 reschedule_wg(wgpq, wg);
954
955 return addr;
956 }
957 }
958 }
959
get_downstream_addr_strict_affinity(int & err,const std::shared_ptr<SharedDownstreamAddr> & shared_addr,Downstream * downstream)960 DownstreamAddr *ClientHandler::get_downstream_addr_strict_affinity(
961 int &err, const std::shared_ptr<SharedDownstreamAddr> &shared_addr,
962 Downstream *downstream) {
963 const auto &affinity_hash = shared_addr->affinity_hash;
964
965 auto h = downstream->find_affinity_cookie(shared_addr->affinity.cookie.name);
966 if (h) {
967 auto it = shared_addr->affinity_hash_map.find(h);
968 if (it != std::end(shared_addr->affinity_hash_map)) {
969 auto addr = &shared_addr->addrs[(*it).second];
970 if (!addr->connect_blocker->blocked()) {
971 return addr;
972 }
973 }
974 } else {
975 auto d = std::uniform_int_distribution<uint32_t>(1);
976 auto rh = d(worker_->get_randgen());
977 h = util::hash32(StringRef{reinterpret_cast<uint8_t *>(&rh),
978 reinterpret_cast<uint8_t *>(&rh) + sizeof(rh)});
979 }
980
981 // Client is not bound to a particular backend, or the bound backend
982 // is not found, or is blocked. Find new backend using h. Using
983 // existing h allows us to find new server in a deterministic way.
984 // It is preferable because multiple concurrent requests with the
985 // stale cookie might be in-flight.
986 auto it = std::lower_bound(
987 std::begin(affinity_hash), std::end(affinity_hash), h,
988 [](const AffinityHash &lhs, uint32_t rhs) { return lhs.hash < rhs; });
989
990 if (it == std::end(affinity_hash)) {
991 it = std::begin(affinity_hash);
992 }
993
994 auto aff_idx =
995 static_cast<size_t>(std::distance(std::begin(affinity_hash), it));
996 auto idx = (*it).idx;
997 auto addr = &shared_addr->addrs[idx];
998
999 if (addr->connect_blocker->blocked()) {
1000 size_t i;
1001 for (i = aff_idx + 1; i != aff_idx; ++i) {
1002 if (i == shared_addr->affinity_hash.size()) {
1003 i = 0;
1004 }
1005 addr = &shared_addr->addrs[shared_addr->affinity_hash[i].idx];
1006 if (addr->connect_blocker->blocked()) {
1007 continue;
1008 }
1009 break;
1010 }
1011 if (i == aff_idx) {
1012 err = -1;
1013 return nullptr;
1014 }
1015 }
1016
1017 downstream->renew_affinity_cookie(addr->affinity_hash);
1018
1019 return addr;
1020 }
1021
// Selects or creates the DownstreamConnection that will carry
// |downstream|'s request.
//
// Resolution order:
//   1. API / health-monitor frontends get their dedicated connection
//      types.
//   2. The backend address group is chosen from authority and path.
//      These are cached in req.orig_authority / req.orig_path on the
//      first call, so later calls (e.g. after a per-pattern mruby script
//      rewrites them) route on the original values.
//   3. Groups with redirect_if_not_tls fail with SHRPX_ERR_TLS_REQUIRED
//      on cleartext frontends.  Groups with shared_addr->dnf get a
//      NullDownstreamConnection.
//   4. A backend address is picked by get_downstream_addr().  For
//      Proto::HTTP1, a pooled connection is reused if available,
//      otherwise a new HttpDownstreamConnection is created.  Any other
//      protocol uses an Http2DownstreamConnection on a session from
//      get_http2_session().
//
// |err| is reset to 0 on entry and becomes non-zero only through the
// TLS-required check or get_downstream_addr().  Returns nullptr on
// failure.  Note that when the worker-wide connect blocker is active,
// nullptr is returned with |err| still 0.
std::unique_ptr<DownstreamConnection>
ClientHandler::get_downstream_connection(int &err, Downstream *downstream) {
  size_t group_idx;
  auto &downstreamconf = *worker_->get_downstream_config();
  auto &routerconf = downstreamconf.router;

  auto catch_all = downstreamconf.addr_group_catch_all;
  auto &groups = worker_->get_downstream_addr_groups();

  auto &req = downstream->request();

  err = 0;

  // Special-purpose frontends bypass backend routing entirely.
  switch (faddr_->alt_mode) {
  case UpstreamAltMode::API: {
    auto dconn = std::make_unique<APIDownstreamConnection>(worker_);
    dconn->set_client_handler(this);
    return dconn;
  }
  case UpstreamAltMode::HEALTHMON: {
    auto dconn = std::make_unique<HealthMonitorDownstreamConnection>();
    dconn->set_client_handler(this);
    return dconn;
  }
  default:
    break;
  }

  auto &balloc = downstream->get_block_allocator();

  StringRef authority, path;

  if (req.forwarded_once) {
    // Re-selection: reuse the values cached on the first pass.  With a
    // single group they are not needed because of the fast path below.
    if (groups.size() != 1) {
      authority = req.orig_authority;
      path = req.orig_path;
    }
  } else {
    // Authority source priority: TLS SNI (when sni_fwd is set), then the
    // request authority, then the Host header field.
    if (faddr_->sni_fwd) {
      authority = sni_;
    } else if (!req.authority.empty()) {
      authority = req.authority;
    } else {
      auto h = req.fs.header(http2::HD_HOST);
      if (h) {
        authority = h->value;
      }
    }

    // CONNECT method does not have path.  But we require path in
    // host-path mapping.  As workaround, we assume that path is
    // "/".  (Here |path| is just left empty for CONNECT; presumably the
    // matcher treats an empty path as "/" -- TODO confirm.)
    if (!req.regular_connect_method()) {
      path = req.path;
    }

    // Cache the authority and path used for the first-time backend
    // selection because per-pattern mruby script can change them.
    req.orig_authority = authority;
    req.orig_path = path;
    req.forwarded_once = true;
  }

  // Fast path.  If we have one group, it must be catch-all group.
  if (groups.size() == 1) {
    group_idx = 0;
  } else {
    group_idx = match_downstream_addr_group(routerconf, authority, path, groups,
                                            catch_all, balloc);
  }

  if (LOG_ENABLED(INFO)) {
    CLOG(INFO, this) << "Downstream address group_idx: " << group_idx;
  }

  if (groups[group_idx]->shared_addr->redirect_if_not_tls && !conn_.tls.ssl) {
    if (LOG_ENABLED(INFO)) {
      CLOG(INFO, this) << "Downstream address group " << group_idx
                       << " requires frontend TLS connection.";
    }
    err = SHRPX_ERR_TLS_REQUIRED;
    return nullptr;
  }

  auto &group = groups[group_idx];

  if (group->shared_addr->dnf) {
    auto dconn = std::make_unique<NullDownstreamConnection>(group);
    dconn->set_client_handler(this);
    return dconn;
  }

  auto addr = get_downstream_addr(err, group.get(), downstream);
  if (addr == nullptr) {
    return nullptr;
  }

  if (addr->proto == Proto::HTTP1) {
    // Prefer an idle pooled connection to this address.
    auto dconn = addr->dconn_pool->pop_downstream_connection();
    if (dconn) {
      dconn->set_client_handler(this);
      return dconn;
    }

    if (worker_->get_connect_blocker()->blocked()) {
      if (LOG_ENABLED(INFO)) {
        // NOTE(review): DCLOG is given |this| (a ClientHandler), so the
        // log prefix labels the handler as a DCONN; CLOG may have been
        // intended -- confirm.
        DCLOG(INFO, this)
            << "Worker wide backend connection was blocked temporarily";
      }
      return nullptr;
    }

    if (LOG_ENABLED(INFO)) {
      CLOG(INFO, this) << "Downstream connection pool is empty."
                       << " Create new one";
    }

    dconn = std::make_unique<HttpDownstreamConnection>(group, addr, conn_.loop,
                                                       worker_);
    dconn->set_client_handler(this);
    return dconn;
  }

  // NOTE(review): this message is also logged on the non-HTTP/1 path,
  // where no pool lookup was performed and a session may be reused.
  if (LOG_ENABLED(INFO)) {
    CLOG(INFO, this) << "Downstream connection pool is empty."
                     << " Create new one";
  }

  auto http2session = get_http2_session(group, addr);
  auto dconn = std::make_unique<Http2DownstreamConnection>(http2session);
  dconn->set_client_handler(this);
  return dconn;
}
1155
get_mcpool()1156 MemchunkPool *ClientHandler::get_mcpool() { return worker_->get_mcpool(); }
1157
get_ssl() const1158 SSL *ClientHandler::get_ssl() const { return conn_.tls.ssl; }
1159
direct_http2_upgrade()1160 void ClientHandler::direct_http2_upgrade() {
1161 upstream_ = std::make_unique<Http2Upstream>(this);
1162 alpn_ = StringRef::from_lit(NGHTTP2_CLEARTEXT_PROTO_VERSION_ID);
1163 on_read_ = &ClientHandler::upstream_read;
1164 write_ = &ClientHandler::write_clear;
1165 }
1166
perform_http2_upgrade(HttpsUpstream * http)1167 int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) {
1168 auto upstream = std::make_unique<Http2Upstream>(this);
1169
1170 auto output = upstream->get_response_buf();
1171
1172 // We might have written non-final header in response_buf, in this
1173 // case, response_state is still INITIAL. If this non-final header
1174 // and upgrade header fit in output buffer, do upgrade. Otherwise,
1175 // to avoid to send this non-final header as response body in HTTP/2
1176 // upstream, fail upgrade.
1177 auto downstream = http->get_downstream();
1178 auto input = downstream->get_response_buf();
1179
1180 if (upstream->upgrade_upstream(http) != 0) {
1181 return -1;
1182 }
1183 // http pointer is now owned by upstream.
1184 upstream_.release();
1185 // TODO We might get other version id in HTTP2-settings, if we
1186 // support aliasing for h2, but we just use library default for now.
1187 alpn_ = StringRef::from_lit(NGHTTP2_CLEARTEXT_PROTO_VERSION_ID);
1188 on_read_ = &ClientHandler::upstream_http2_connhd_read;
1189 write_ = &ClientHandler::write_clear;
1190
1191 input->remove(*output, input->rleft());
1192
1193 constexpr auto res =
1194 StringRef::from_lit("HTTP/1.1 101 Switching Protocols\r\n"
1195 "Connection: Upgrade\r\n"
1196 "Upgrade: " NGHTTP2_CLEARTEXT_PROTO_VERSION_ID "\r\n"
1197 "\r\n");
1198
1199 output->append(res);
1200 upstream_ = std::move(upstream);
1201
1202 signal_write();
1203 return 0;
1204 }
1205
get_http2_upgrade_allowed() const1206 bool ClientHandler::get_http2_upgrade_allowed() const { return !conn_.tls.ssl; }
1207
get_upstream_scheme() const1208 StringRef ClientHandler::get_upstream_scheme() const {
1209 if (conn_.tls.ssl) {
1210 return StringRef::from_lit("https");
1211 } else {
1212 return StringRef::from_lit("http");
1213 }
1214 }
1215
start_immediate_shutdown()1216 void ClientHandler::start_immediate_shutdown() {
1217 ev_timer_start(conn_.loop, &reneg_shutdown_timer_);
1218 }
1219
write_accesslog(Downstream * downstream)1220 void ClientHandler::write_accesslog(Downstream *downstream) {
1221 auto &req = downstream->request();
1222
1223 auto config = get_config();
1224
1225 if (!req.tstamp) {
1226 auto lgconf = log_config();
1227 lgconf->update_tstamp(std::chrono::system_clock::now());
1228 req.tstamp = lgconf->tstamp;
1229 }
1230
1231 upstream_accesslog(
1232 config->logging.access.format,
1233 LogSpec{
1234 downstream,
1235 ipaddr_,
1236 alpn_,
1237 sni_,
1238 conn_.tls.ssl,
1239 std::chrono::high_resolution_clock::now(), // request_end_time
1240 port_,
1241 faddr_->port,
1242 config->pid,
1243 });
1244 }
1245
get_rb()1246 ClientHandler::ReadBuf *ClientHandler::get_rb() { return &rb_; }
1247
signal_write()1248 void ClientHandler::signal_write() { conn_.wlimit.startw(); }
1249
get_rlimit()1250 RateLimit *ClientHandler::get_rlimit() { return &conn_.rlimit; }
get_wlimit()1251 RateLimit *ClientHandler::get_wlimit() { return &conn_.wlimit; }
1252
get_wev()1253 ev_io *ClientHandler::get_wev() { return &conn_.wev; }
1254
get_worker() const1255 Worker *ClientHandler::get_worker() const { return worker_; }
1256
1257 namespace {
parse_proxy_line_port(const uint8_t * first,const uint8_t * last)1258 ssize_t parse_proxy_line_port(const uint8_t *first, const uint8_t *last) {
1259 auto p = first;
1260 int32_t port = 0;
1261
1262 if (p == last) {
1263 return -1;
1264 }
1265
1266 if (*p == '0') {
1267 if (p + 1 != last && util::is_digit(*(p + 1))) {
1268 return -1;
1269 }
1270 return 1;
1271 }
1272
1273 for (; p != last && util::is_digit(*p); ++p) {
1274 port *= 10;
1275 port += *p - '0';
1276
1277 if (port > 65535) {
1278 return -1;
1279 }
1280 }
1281
1282 return p - first;
1283 }
1284 } // namespace
1285
on_proxy_protocol_finish()1286 int ClientHandler::on_proxy_protocol_finish() {
1287 auto len = rb_.pos() - rb_.begin();
1288
1289 assert(len);
1290
1291 if (LOG_ENABLED(INFO)) {
1292 CLOG(INFO, this) << "PROXY-protocol: Draining " << len
1293 << " bytes from socket";
1294 }
1295
1296 rb_.reset();
1297
1298 if (conn_.read_nolim_clear(rb_.pos(), len) < 0) {
1299 return -1;
1300 }
1301
1302 rb_.reset();
1303
1304 setup_upstream_io_callback();
1305
1306 return 0;
1307 }
1308
namespace {
// PROXY-protocol v2 header signature: the 12 octets
// "\r\n\r\n\0\r\nQUIT\n".  Being a string literal, the array also holds
// an implicit trailing NUL (13 elements), so code refers to the
// signature length via str_size(PROXY_PROTO_V2_SIG) rather than sizeof.
constexpr uint8_t PROXY_PROTO_V2_SIG[] =
    "\x0D\x0A\x0D\x0A\x00\x0D\x0A\x51\x55\x49\x54\x0A";

// PROXY-protocol v2 fixed header length: the signature followed by the
// ver_cmd, fam and 16-bit len fields (16 octets, assuming str_size()
// yields the 12 signature octets).
constexpr size_t PROXY_PROTO_V2_HDLEN =
    str_size(PROXY_PROTO_V2_SIG) + /* ver_cmd(1) + fam(1) + len(2) = */ 4;
} // namespace
1318
1319 // http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt
proxy_protocol_read()1320 int ClientHandler::proxy_protocol_read() {
1321 if (LOG_ENABLED(INFO)) {
1322 CLOG(INFO, this) << "PROXY-protocol: Started";
1323 }
1324
1325 auto first = rb_.pos();
1326
1327 if (rb_.rleft() >= PROXY_PROTO_V2_HDLEN &&
1328 (*(first + str_size(PROXY_PROTO_V2_SIG)) & 0xf0) == 0x20) {
1329 if (LOG_ENABLED(INFO)) {
1330 CLOG(INFO, this) << "PROXY-protocol: Detected v2 header signature";
1331 }
1332 return proxy_protocol_v2_read();
1333 }
1334
1335 // NULL character really destroys functions which expects NULL
1336 // terminated string. We won't expect it in PROXY protocol line, so
1337 // find it here.
1338 auto chrs = std::array<char, 2>{'\n', '\0'};
1339
1340 constexpr size_t MAX_PROXY_LINELEN = 107;
1341
1342 auto bufend = rb_.pos() + std::min(MAX_PROXY_LINELEN, rb_.rleft());
1343
1344 auto end =
1345 std::find_first_of(rb_.pos(), bufend, std::begin(chrs), std::end(chrs));
1346
1347 if (end == bufend || *end == '\0' || end == rb_.pos() || *(end - 1) != '\r') {
1348 if (LOG_ENABLED(INFO)) {
1349 CLOG(INFO, this) << "PROXY-protocol-v1: No ending CR LF sequence found";
1350 }
1351 return -1;
1352 }
1353
1354 --end;
1355
1356 constexpr auto HEADER = StringRef::from_lit("PROXY ");
1357
1358 if (static_cast<size_t>(end - rb_.pos()) < HEADER.size()) {
1359 if (LOG_ENABLED(INFO)) {
1360 CLOG(INFO, this) << "PROXY-protocol-v1: PROXY version 1 ID not found";
1361 }
1362 return -1;
1363 }
1364
1365 if (!util::streq(HEADER, StringRef{rb_.pos(), HEADER.size()})) {
1366 if (LOG_ENABLED(INFO)) {
1367 CLOG(INFO, this) << "PROXY-protocol-v1: Bad PROXY protocol version 1 ID";
1368 }
1369 return -1;
1370 }
1371
1372 rb_.drain(HEADER.size());
1373
1374 int family;
1375
1376 if (rb_.pos()[0] == 'T') {
1377 if (end - rb_.pos() < 5) {
1378 if (LOG_ENABLED(INFO)) {
1379 CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found";
1380 }
1381 return -1;
1382 }
1383
1384 if (rb_.pos()[1] != 'C' || rb_.pos()[2] != 'P') {
1385 if (LOG_ENABLED(INFO)) {
1386 CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
1387 }
1388 return -1;
1389 }
1390
1391 switch (rb_.pos()[3]) {
1392 case '4':
1393 family = AF_INET;
1394 break;
1395 case '6':
1396 family = AF_INET6;
1397 break;
1398 default:
1399 if (LOG_ENABLED(INFO)) {
1400 CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
1401 }
1402 return -1;
1403 }
1404
1405 rb_.drain(5);
1406 } else {
1407 if (end - rb_.pos() < 7) {
1408 if (LOG_ENABLED(INFO)) {
1409 CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found";
1410 }
1411 return -1;
1412 }
1413 if (!util::streq_l("UNKNOWN", rb_.pos(), 7)) {
1414 if (LOG_ENABLED(INFO)) {
1415 CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
1416 }
1417 return -1;
1418 }
1419
1420 rb_.drain(end + 2 - rb_.pos());
1421
1422 return on_proxy_protocol_finish();
1423 }
1424
1425 // source address
1426 auto token_end = std::find(rb_.pos(), end, ' ');
1427 if (token_end == end) {
1428 if (LOG_ENABLED(INFO)) {
1429 CLOG(INFO, this) << "PROXY-protocol-v1: Source address not found";
1430 }
1431 return -1;
1432 }
1433
1434 *token_end = '\0';
1435 if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos()), family)) {
1436 if (LOG_ENABLED(INFO)) {
1437 CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source address";
1438 }
1439 return -1;
1440 }
1441
1442 auto src_addr = rb_.pos();
1443 auto src_addrlen = token_end - rb_.pos();
1444
1445 rb_.drain(token_end - rb_.pos() + 1);
1446
1447 // destination address
1448 token_end = std::find(rb_.pos(), end, ' ');
1449 if (token_end == end) {
1450 if (LOG_ENABLED(INFO)) {
1451 CLOG(INFO, this) << "PROXY-protocol-v1: Destination address not found";
1452 }
1453 return -1;
1454 }
1455
1456 *token_end = '\0';
1457 if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos()), family)) {
1458 if (LOG_ENABLED(INFO)) {
1459 CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination address";
1460 }
1461 return -1;
1462 }
1463
1464 // Currently we don't use destination address
1465
1466 rb_.drain(token_end - rb_.pos() + 1);
1467
1468 // source port
1469 auto n = parse_proxy_line_port(rb_.pos(), end);
1470 if (n <= 0 || *(rb_.pos() + n) != ' ') {
1471 if (LOG_ENABLED(INFO)) {
1472 CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source port";
1473 }
1474 return -1;
1475 }
1476
1477 rb_.pos()[n] = '\0';
1478 auto src_port = rb_.pos();
1479 auto src_portlen = n;
1480
1481 rb_.drain(n + 1);
1482
1483 // destination port
1484 n = parse_proxy_line_port(rb_.pos(), end);
1485 if (n <= 0 || rb_.pos() + n != end) {
1486 if (LOG_ENABLED(INFO)) {
1487 CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination port";
1488 }
1489 return -1;
1490 }
1491
1492 // Currently we don't use destination port
1493
1494 rb_.drain(end + 2 - rb_.pos());
1495
1496 ipaddr_ =
1497 make_string_ref(balloc_, StringRef{src_addr, src_addr + src_addrlen});
1498 port_ = make_string_ref(balloc_, StringRef{src_port, src_port + src_portlen});
1499
1500 if (LOG_ENABLED(INFO)) {
1501 CLOG(INFO, this) << "PROXY-protocol-v1: Finished, " << (rb_.pos() - first)
1502 << " bytes read";
1503 }
1504
1505 auto config = get_config();
1506 auto &fwdconf = config->http.forwarded;
1507
1508 if ((fwdconf.params & FORWARDED_FOR) &&
1509 fwdconf.for_node_type == ForwardedNode::IP) {
1510 init_forwarded_for(family, ipaddr_);
1511 }
1512
1513 return on_proxy_protocol_finish();
1514 }
1515
proxy_protocol_v2_read()1516 int ClientHandler::proxy_protocol_v2_read() {
1517 // Assume that first str_size(PROXY_PROTO_V2_SIG) octets match v2
1518 // protocol signature and followed by the bytes which indicates v2.
1519 assert(rb_.rleft() >= PROXY_PROTO_V2_HDLEN);
1520
1521 auto p = rb_.pos() + str_size(PROXY_PROTO_V2_SIG);
1522
1523 assert(((*p) & 0xf0) == 0x20);
1524
1525 enum { LOCAL, PROXY } cmd;
1526
1527 auto cmd_bits = (*p++) & 0xf;
1528 switch (cmd_bits) {
1529 case 0x0:
1530 cmd = LOCAL;
1531 break;
1532 case 0x01:
1533 cmd = PROXY;
1534 break;
1535 default:
1536 if (LOG_ENABLED(INFO)) {
1537 CLOG(INFO, this) << "PROXY-protocol-v2: Unknown command " << log::hex
1538 << cmd_bits;
1539 }
1540 return -1;
1541 }
1542
1543 auto fam = *p++;
1544 uint16_t len;
1545 memcpy(&len, p, sizeof(len));
1546 len = ntohs(len);
1547
1548 p += sizeof(len);
1549
1550 if (LOG_ENABLED(INFO)) {
1551 CLOG(INFO, this) << "PROXY-protocol-v2: Detected family=" << log::hex << fam
1552 << ", len=" << log::dec << len;
1553 }
1554
1555 if (rb_.last() - p < len) {
1556 if (LOG_ENABLED(INFO)) {
1557 CLOG(INFO, this)
1558 << "PROXY-protocol-v2: Prematurely truncated header block; require "
1559 << len << " bytes, " << rb_.last() - p << " bytes left";
1560 }
1561 return -1;
1562 }
1563
1564 int family;
1565 std::array<char, std::max(INET_ADDRSTRLEN, INET6_ADDRSTRLEN)> src_addr,
1566 dst_addr;
1567 size_t addrlen;
1568
1569 switch (fam) {
1570 case 0x11:
1571 case 0x12:
1572 if (len < 12) {
1573 if (LOG_ENABLED(INFO)) {
1574 CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_INET addresses";
1575 }
1576 return -1;
1577 }
1578 family = AF_INET;
1579 addrlen = 4;
1580 break;
1581 case 0x21:
1582 case 0x22:
1583 if (len < 36) {
1584 if (LOG_ENABLED(INFO)) {
1585 CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_INET6 addresses";
1586 }
1587 return -1;
1588 }
1589 family = AF_INET6;
1590 addrlen = 16;
1591 break;
1592 case 0x31:
1593 case 0x32:
1594 if (len < 216) {
1595 if (LOG_ENABLED(INFO)) {
1596 CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_UNIX addresses";
1597 }
1598 return -1;
1599 }
1600 // fall through
1601 case 0x00: {
1602 // UNSPEC and UNIX are just ignored.
1603 if (LOG_ENABLED(INFO)) {
1604 CLOG(INFO, this) << "PROXY-protocol-v2: Ignore combination of address "
1605 "family and protocol "
1606 << log::hex << fam;
1607 }
1608 rb_.drain(PROXY_PROTO_V2_HDLEN + len);
1609 return on_proxy_protocol_finish();
1610 }
1611 default:
1612 if (LOG_ENABLED(INFO)) {
1613 CLOG(INFO, this) << "PROXY-protocol-v2: Unknown combination of address "
1614 "family and protocol "
1615 << log::hex << fam;
1616 }
1617 return -1;
1618 }
1619
1620 if (cmd != PROXY) {
1621 if (LOG_ENABLED(INFO)) {
1622 CLOG(INFO, this) << "PROXY-protocol-v2: Ignore non-PROXY command";
1623 }
1624 rb_.drain(PROXY_PROTO_V2_HDLEN + len);
1625 return on_proxy_protocol_finish();
1626 }
1627
1628 if (inet_ntop(family, p, src_addr.data(), src_addr.size()) == nullptr) {
1629 if (LOG_ENABLED(INFO)) {
1630 CLOG(INFO, this) << "PROXY-protocol-v2: Unable to parse source address";
1631 }
1632 return -1;
1633 }
1634
1635 p += addrlen;
1636
1637 if (inet_ntop(family, p, dst_addr.data(), dst_addr.size()) == nullptr) {
1638 if (LOG_ENABLED(INFO)) {
1639 CLOG(INFO, this)
1640 << "PROXY-protocol-v2: Unable to parse destination address";
1641 }
1642 return -1;
1643 }
1644
1645 p += addrlen;
1646
1647 uint16_t src_port;
1648
1649 memcpy(&src_port, p, sizeof(src_port));
1650 src_port = ntohs(src_port);
1651
1652 // We don't use destination port.
1653 p += 4;
1654
1655 ipaddr_ = make_string_ref(balloc_, StringRef{src_addr.data()});
1656 port_ = util::make_string_ref_uint(balloc_, src_port);
1657
1658 if (LOG_ENABLED(INFO)) {
1659 CLOG(INFO, this) << "PROXY-protocol-v2: Finished reading proxy addresses, "
1660 << p - rb_.pos() << " bytes read, "
1661 << PROXY_PROTO_V2_HDLEN + len - (p - rb_.pos())
1662 << " bytes left";
1663 }
1664
1665 auto config = get_config();
1666 auto &fwdconf = config->http.forwarded;
1667
1668 if ((fwdconf.params & FORWARDED_FOR) &&
1669 fwdconf.for_node_type == ForwardedNode::IP) {
1670 init_forwarded_for(family, ipaddr_);
1671 }
1672
1673 rb_.drain(PROXY_PROTO_V2_HDLEN + len);
1674 return on_proxy_protocol_finish();
1675 }
1676
get_forwarded_by() const1677 StringRef ClientHandler::get_forwarded_by() const {
1678 auto &fwdconf = get_config()->http.forwarded;
1679
1680 if (fwdconf.by_node_type == ForwardedNode::OBFUSCATED) {
1681 return fwdconf.by_obfuscated;
1682 }
1683
1684 return faddr_->hostport;
1685 }
1686
get_forwarded_for() const1687 StringRef ClientHandler::get_forwarded_for() const { return forwarded_for_; }
1688
get_upstream_addr() const1689 const UpstreamAddr *ClientHandler::get_upstream_addr() const { return faddr_; }
1690
get_connection()1691 Connection *ClientHandler::get_connection() { return &conn_; };
1692
set_tls_sni(const StringRef & sni)1693 void ClientHandler::set_tls_sni(const StringRef &sni) {
1694 sni_ = make_string_ref(balloc_, sni);
1695 }
1696
get_tls_sni() const1697 StringRef ClientHandler::get_tls_sni() const { return sni_; }
1698
get_alpn() const1699 StringRef ClientHandler::get_alpn() const { return alpn_; }
1700
get_block_allocator()1701 BlockAllocator &ClientHandler::get_block_allocator() { return balloc_; }
1702
set_alpn_from_conn()1703 void ClientHandler::set_alpn_from_conn() {
1704 const unsigned char *alpn;
1705 unsigned int alpnlen;
1706
1707 SSL_get0_alpn_selected(conn_.tls.ssl, &alpn, &alpnlen);
1708
1709 alpn_ = make_string_ref(balloc_, StringRef{alpn, alpnlen});
1710 }
1711
1712 } // namespace shrpx
1713