1 /*
2 * nghttp2 - HTTP/2 C Library
3 *
4 * Copyright (c) 2012 Tatsuhiro Tsujikawa
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining
7 * a copy of this software and associated documentation files (the
8 * "Software"), to deal in the Software without restriction, including
9 * without limitation the rights to use, copy, modify, merge, publish,
10 * distribute, sublicense, and/or sell copies of the Software, and to
11 * permit persons to whom the Software is furnished to do so, subject to
12 * the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be
15 * included in all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 */
25 #include "shrpx_client_handler.h"
26
27 #ifdef HAVE_UNISTD_H
28 # include <unistd.h>
29 #endif // HAVE_UNISTD_H
30 #ifdef HAVE_SYS_SOCKET_H
31 # include <sys/socket.h>
32 #endif // HAVE_SYS_SOCKET_H
33 #ifdef HAVE_NETDB_H
34 # include <netdb.h>
35 #endif // HAVE_NETDB_H
36
37 #include <cerrno>
38
39 #include "shrpx_upstream.h"
40 #include "shrpx_http2_upstream.h"
41 #include "shrpx_https_upstream.h"
42 #include "shrpx_config.h"
43 #include "shrpx_http_downstream_connection.h"
44 #include "shrpx_http2_downstream_connection.h"
45 #include "shrpx_tls.h"
46 #include "shrpx_worker.h"
47 #include "shrpx_downstream_connection_pool.h"
48 #include "shrpx_downstream.h"
49 #include "shrpx_http2_session.h"
50 #include "shrpx_connect_blocker.h"
51 #include "shrpx_api_downstream_connection.h"
52 #include "shrpx_health_monitor_downstream_connection.h"
53 #include "shrpx_log.h"
54 #include "util.h"
55 #include "template.h"
56 #include "tls.h"
57
58 using namespace nghttp2;
59
60 namespace shrpx {
61
62 namespace {
timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)63 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
64 auto conn = static_cast<Connection *>(w->data);
65 auto handler = static_cast<ClientHandler *>(conn->data);
66
67 if (LOG_ENABLED(INFO)) {
68 CLOG(INFO, handler) << "Time out";
69 }
70
71 delete handler;
72 }
73 } // namespace
74
75 namespace {
shutdowncb(struct ev_loop * loop,ev_timer * w,int revents)76 void shutdowncb(struct ev_loop *loop, ev_timer *w, int revents) {
77 auto handler = static_cast<ClientHandler *>(w->data);
78
79 if (LOG_ENABLED(INFO)) {
80 CLOG(INFO, handler) << "Close connection due to TLS renegotiation";
81 }
82
83 delete handler;
84 }
85 } // namespace
86
87 namespace {
readcb(struct ev_loop * loop,ev_io * w,int revents)88 void readcb(struct ev_loop *loop, ev_io *w, int revents) {
89 auto conn = static_cast<Connection *>(w->data);
90 auto handler = static_cast<ClientHandler *>(conn->data);
91
92 if (handler->do_read() != 0) {
93 delete handler;
94 return;
95 }
96 }
97 } // namespace
98
99 namespace {
writecb(struct ev_loop * loop,ev_io * w,int revents)100 void writecb(struct ev_loop *loop, ev_io *w, int revents) {
101 auto conn = static_cast<Connection *>(w->data);
102 auto handler = static_cast<ClientHandler *>(conn->data);
103
104 if (handler->do_write() != 0) {
105 delete handler;
106 return;
107 }
108 }
109 } // namespace
110
noop()111 int ClientHandler::noop() { return 0; }
112
read_clear()113 int ClientHandler::read_clear() {
114 auto should_break = false;
115 rb_.ensure_chunk();
116 for (;;) {
117 if (rb_.rleft() && on_read() != 0) {
118 return -1;
119 }
120 if (rb_.rleft() == 0) {
121 rb_.reset();
122 } else if (rb_.wleft() == 0) {
123 conn_.rlimit.stopw();
124 return 0;
125 }
126
127 if (!ev_is_active(&conn_.rev) || should_break) {
128 return 0;
129 }
130
131 auto nread = conn_.read_clear(rb_.last(), rb_.wleft());
132
133 if (nread == 0) {
134 if (rb_.rleft() == 0) {
135 rb_.release_chunk();
136 }
137 return 0;
138 }
139
140 if (nread < 0) {
141 return -1;
142 }
143
144 rb_.write(nread);
145 should_break = true;
146 }
147 }
148
write_clear()149 int ClientHandler::write_clear() {
150 std::array<iovec, 2> iov;
151
152 for (;;) {
153 if (on_write() != 0) {
154 return -1;
155 }
156
157 auto iovcnt = upstream_->response_riovec(iov.data(), iov.size());
158 if (iovcnt == 0) {
159 break;
160 }
161
162 auto nwrite = conn_.writev_clear(iov.data(), iovcnt);
163 if (nwrite < 0) {
164 return -1;
165 }
166
167 if (nwrite == 0) {
168 return 0;
169 }
170
171 upstream_->response_drain(nwrite);
172 }
173
174 conn_.wlimit.stopw();
175 ev_timer_stop(conn_.loop, &conn_.wt);
176
177 return 0;
178 }
179
tls_handshake()180 int ClientHandler::tls_handshake() {
181 ev_timer_again(conn_.loop, &conn_.rt);
182
183 ERR_clear_error();
184
185 auto rv = conn_.tls_handshake();
186
187 if (rv == SHRPX_ERR_INPROGRESS) {
188 return 0;
189 }
190
191 if (rv < 0) {
192 return -1;
193 }
194
195 if (LOG_ENABLED(INFO)) {
196 CLOG(INFO, this) << "SSL/TLS handshake completed";
197 }
198
199 if (validate_next_proto() != 0) {
200 return -1;
201 }
202
203 read_ = &ClientHandler::read_tls;
204 write_ = &ClientHandler::write_tls;
205
206 return 0;
207 }
208
read_tls()209 int ClientHandler::read_tls() {
210 auto should_break = false;
211
212 ERR_clear_error();
213
214 rb_.ensure_chunk();
215
216 for (;;) {
217 // we should process buffered data first before we read EOF.
218 if (rb_.rleft() && on_read() != 0) {
219 return -1;
220 }
221 if (rb_.rleft() == 0) {
222 rb_.reset();
223 } else if (rb_.wleft() == 0) {
224 conn_.rlimit.stopw();
225 return 0;
226 }
227
228 if (!ev_is_active(&conn_.rev) || should_break) {
229 return 0;
230 }
231
232 auto nread = conn_.read_tls(rb_.last(), rb_.wleft());
233
234 if (nread == 0) {
235 if (rb_.rleft() == 0) {
236 rb_.release_chunk();
237 }
238 return 0;
239 }
240
241 if (nread < 0) {
242 return -1;
243 }
244
245 rb_.write(nread);
246 should_break = true;
247 }
248 }
249
write_tls()250 int ClientHandler::write_tls() {
251 struct iovec iov;
252
253 ERR_clear_error();
254
255 if (on_write() != 0) {
256 return -1;
257 }
258
259 auto iovcnt = upstream_->response_riovec(&iov, 1);
260 if (iovcnt == 0) {
261 conn_.start_tls_write_idle();
262
263 conn_.wlimit.stopw();
264 ev_timer_stop(conn_.loop, &conn_.wt);
265
266 return 0;
267 }
268
269 for (;;) {
270 auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len);
271 if (nwrite < 0) {
272 return -1;
273 }
274
275 if (nwrite == 0) {
276 return 0;
277 }
278
279 upstream_->response_drain(nwrite);
280
281 iovcnt = upstream_->response_riovec(&iov, 1);
282 if (iovcnt == 0) {
283 return 0;
284 }
285 }
286 }
287
upstream_noop()288 int ClientHandler::upstream_noop() { return 0; }
289
upstream_read()290 int ClientHandler::upstream_read() {
291 assert(upstream_);
292 if (upstream_->on_read() != 0) {
293 return -1;
294 }
295 return 0;
296 }
297
upstream_write()298 int ClientHandler::upstream_write() {
299 assert(upstream_);
300 if (upstream_->on_write() != 0) {
301 return -1;
302 }
303
304 if (get_should_close_after_write() && upstream_->response_empty()) {
305 return -1;
306 }
307
308 return 0;
309 }
310
upstream_http2_connhd_read()311 int ClientHandler::upstream_http2_connhd_read() {
312 auto nread = std::min(left_connhd_len_, rb_.rleft());
313 if (memcmp(&NGHTTP2_CLIENT_MAGIC[NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_],
314 rb_.pos(), nread) != 0) {
315 // There is no downgrade path here. Just drop the connection.
316 if (LOG_ENABLED(INFO)) {
317 CLOG(INFO, this) << "invalid client connection header";
318 }
319
320 return -1;
321 }
322
323 left_connhd_len_ -= nread;
324 rb_.drain(nread);
325 conn_.rlimit.startw();
326
327 if (left_connhd_len_ == 0) {
328 on_read_ = &ClientHandler::upstream_read;
329 // Run on_read to process data left in buffer since they are not
330 // notified further
331 if (on_read() != 0) {
332 return -1;
333 }
334 return 0;
335 }
336
337 return 0;
338 }
339
upstream_http1_connhd_read()340 int ClientHandler::upstream_http1_connhd_read() {
341 auto nread = std::min(left_connhd_len_, rb_.rleft());
342 if (memcmp(&NGHTTP2_CLIENT_MAGIC[NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_],
343 rb_.pos(), nread) != 0) {
344 if (LOG_ENABLED(INFO)) {
345 CLOG(INFO, this) << "This is HTTP/1.1 connection, "
346 << "but may be upgraded to HTTP/2 later.";
347 }
348
349 // Reset header length for later HTTP/2 upgrade
350 left_connhd_len_ = NGHTTP2_CLIENT_MAGIC_LEN;
351 on_read_ = &ClientHandler::upstream_read;
352 on_write_ = &ClientHandler::upstream_write;
353
354 if (on_read() != 0) {
355 return -1;
356 }
357
358 return 0;
359 }
360
361 left_connhd_len_ -= nread;
362 rb_.drain(nread);
363 conn_.rlimit.startw();
364
365 if (left_connhd_len_ == 0) {
366 if (LOG_ENABLED(INFO)) {
367 CLOG(INFO, this) << "direct HTTP/2 connection";
368 }
369
370 direct_http2_upgrade();
371 on_read_ = &ClientHandler::upstream_read;
372 on_write_ = &ClientHandler::upstream_write;
373
374 // Run on_read to process data left in buffer since they are not
375 // notified further
376 if (on_read() != 0) {
377 return -1;
378 }
379
380 return 0;
381 }
382
383 return 0;
384 }
385
// Creates the handler for a frontend connection on descriptor |fd|
// served by |worker|.  |ssl| is passed on to conn_; the TLS handshake
// path is taken later if conn_.tls.ssl is set.  |ipaddr| and |port|
// are copied into balloc_.  |family| is only used to format the
// Forwarded "for" value.  |faddr| describes the frontend address and
// is stored as is (not copied).
//
// The constructor counts the connection in the worker statistics,
// starts the read watcher and read timer, and installs the initial
// I/O callbacks: PROXY protocol parsing if it is enabled for this
// frontend or globally, otherwise the regular upstream callbacks.
ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl,
                             const StringRef &ipaddr, const StringRef &port,
                             int family, const UpstreamAddr *faddr)
    : // We use balloc_ for TLS session ID (64), ipaddr (IPv6) (39),
      // port (5), forwarded-for (IPv6) (41), alpn (5), proxyproto
      // ipaddr (15), proxyproto port (5), sni (32, estimated).  We
      // need a terminating NUL byte for each.  We also require 8 bytes
      // header for each allocation.  We align at 16 bytes boundary,
      // so the required space is 64 + 48 + 16 + 48 + 16 + 16 + 16 +
      // 32 + 8 + 8 * 8 = 328.
      balloc_(512, 512),
      rb_(worker->get_mcpool()),
      conn_(worker->get_loop(), fd, ssl, worker->get_mcpool(),
            get_config()->conn.upstream.timeout.write,
            get_config()->conn.upstream.timeout.read,
            get_config()->conn.upstream.ratelimit.write,
            get_config()->conn.upstream.ratelimit.read, writecb, readcb,
            timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
            get_config()->tls.dyn_rec.idle_timeout, Proto::NONE),
      ipaddr_(make_string_ref(balloc_, ipaddr)),
      port_(make_string_ref(balloc_, port)),
      faddr_(faddr),
      worker_(worker),
      left_connhd_len_(NGHTTP2_CLIENT_MAGIC_LEN),
      affinity_hash_(0),
      should_close_after_write_(false),
      affinity_hash_computed_(false) {

  ++worker_->get_worker_stat()->num_connections;

  // The timer is only initialized here; it is started by
  // start_immediate_shutdown().
  ev_timer_init(&reneg_shutdown_timer_, shutdowncb, 0., 0.);

  reneg_shutdown_timer_.data = this;

  conn_.rlimit.startw();
  ev_timer_again(conn_.loop, &conn_.rt);

  auto config = get_config();

  if (faddr_->accept_proxy_protocol ||
      config->conn.upstream.accept_proxy_protocol) {
    // The PROXY protocol header is read in cleartext, and nothing is
    // written until it has been processed.
    read_ = &ClientHandler::read_clear;
    write_ = &ClientHandler::noop;
    on_read_ = &ClientHandler::proxy_protocol_read;
    on_write_ = &ClientHandler::upstream_noop;
  } else {
    setup_upstream_io_callback();
  }

  auto &fwdconf = config->http.forwarded;

  if (fwdconf.params & FORWARDED_FOR) {
    if (fwdconf.for_node_type == ForwardedNode::OBFUSCATED) {
      // Obfuscated node: '_' followed by random alphanumerics.
      // 1 for '_'
      auto len = SHRPX_OBFUSCATED_NODE_LENGTH + 1;
      // 1 for terminating NUL.
      auto buf = make_byte_ref(balloc_, len + 1);
      auto p = buf.base;
      *p++ = '_';
      p = util::random_alpha_digit(p, p + SHRPX_OBFUSCATED_NODE_LENGTH,
                                   worker_->get_randgen());
      *p = '\0';

      // The terminating NUL is not part of forwarded_for_.
      forwarded_for_ = StringRef{buf.base, p};
    } else {
      init_forwarded_for(family, ipaddr_);
    }
  }
}
455
init_forwarded_for(int family,const StringRef & ipaddr)456 void ClientHandler::init_forwarded_for(int family, const StringRef &ipaddr) {
457 if (family == AF_INET6) {
458 // 2 for '[' and ']'
459 auto len = 2 + ipaddr.size();
460 // 1 for terminating NUL.
461 auto buf = make_byte_ref(balloc_, len + 1);
462 auto p = buf.base;
463 *p++ = '[';
464 p = std::copy(std::begin(ipaddr), std::end(ipaddr), p);
465 *p++ = ']';
466 *p = '\0';
467
468 forwarded_for_ = StringRef{buf.base, p};
469 } else {
470 // family == AF_INET or family == AF_UNIX
471 forwarded_for_ = ipaddr;
472 }
473 }
474
setup_upstream_io_callback()475 void ClientHandler::setup_upstream_io_callback() {
476 if (conn_.tls.ssl) {
477 conn_.prepare_server_handshake();
478 read_ = write_ = &ClientHandler::tls_handshake;
479 on_read_ = &ClientHandler::upstream_noop;
480 on_write_ = &ClientHandler::upstream_write;
481 } else {
482 // For non-TLS version, first create HttpsUpstream. It may be
483 // upgraded to HTTP/2 through HTTP Upgrade or direct HTTP/2
484 // connection.
485 upstream_ = std::make_unique<HttpsUpstream>(this);
486 alpn_ = StringRef::from_lit("http/1.1");
487 read_ = &ClientHandler::read_clear;
488 write_ = &ClientHandler::write_clear;
489 on_read_ = &ClientHandler::upstream_http1_connhd_read;
490 on_write_ = &ClientHandler::upstream_noop;
491 }
492 }
493
~ClientHandler()494 ClientHandler::~ClientHandler() {
495 if (LOG_ENABLED(INFO)) {
496 CLOG(INFO, this) << "Deleting";
497 }
498
499 if (upstream_) {
500 upstream_->on_handler_delete();
501 }
502
503 auto worker_stat = worker_->get_worker_stat();
504 --worker_stat->num_connections;
505
506 if (worker_stat->num_connections == 0) {
507 worker_->schedule_clear_mcpool();
508 }
509
510 ev_timer_stop(conn_.loop, &reneg_shutdown_timer_);
511
512 // TODO If backend is http/2, and it is in CONNECTED state, signal
513 // it and make it loopbreak when output is zero.
514 if (worker_->get_graceful_shutdown() && worker_stat->num_connections == 0) {
515 ev_break(conn_.loop);
516 }
517
518 if (LOG_ENABLED(INFO)) {
519 CLOG(INFO, this) << "Deleted";
520 }
521 }
522
get_upstream()523 Upstream *ClientHandler::get_upstream() { return upstream_.get(); }
524
get_loop() const525 struct ev_loop *ClientHandler::get_loop() const {
526 return conn_.loop;
527 }
528
reset_upstream_read_timeout(ev_tstamp t)529 void ClientHandler::reset_upstream_read_timeout(ev_tstamp t) {
530 conn_.rt.repeat = t;
531 if (ev_is_active(&conn_.rt)) {
532 ev_timer_again(conn_.loop, &conn_.rt);
533 }
534 }
535
reset_upstream_write_timeout(ev_tstamp t)536 void ClientHandler::reset_upstream_write_timeout(ev_tstamp t) {
537 conn_.wt.repeat = t;
538 if (ev_is_active(&conn_.wt)) {
539 ev_timer_again(conn_.loop, &conn_.wt);
540 }
541 }
542
repeat_read_timer()543 void ClientHandler::repeat_read_timer() {
544 ev_timer_again(conn_.loop, &conn_.rt);
545 }
546
stop_read_timer()547 void ClientHandler::stop_read_timer() { ev_timer_stop(conn_.loop, &conn_.rt); }
548
// Inspects the protocol negotiated during the TLS handshake (NPN
// first, then ALPN, depending on what the OpenSSL build provides) and
// creates the matching upstream object.  If nothing was negotiated,
// "http/1.1" is assumed.  The protocol must appear in the configured
// tls.npn_list.  After the upstream is created, on_read() is run once
// to consume data that is already buffered.
//
// Returns 0 on success, or -1 if the protocol is not acceptable or
// processing the buffered data fails.
int ClientHandler::validate_next_proto() {
  const unsigned char *next_proto = nullptr;
  unsigned int next_proto_len = 0;

  // First set callback for catch all cases
  on_read_ = &ClientHandler::upstream_read;

#ifndef OPENSSL_NO_NEXTPROTONEG
  SSL_get0_next_proto_negotiated(conn_.tls.ssl, &next_proto, &next_proto_len);
#endif // !OPENSSL_NO_NEXTPROTONEG
#if OPENSSL_VERSION_NUMBER >= 0x10002000L
  // ALPN is consulted only if NPN did not yield a protocol.
  if (next_proto == nullptr) {
    SSL_get0_alpn_selected(conn_.tls.ssl, &next_proto, &next_proto_len);
  }
#endif // OPENSSL_VERSION_NUMBER >= 0x10002000L

  StringRef proto;

  if (next_proto) {
    proto = StringRef{next_proto, next_proto_len};

    if (LOG_ENABLED(INFO)) {
      CLOG(INFO, this) << "The negotiated next protocol: " << proto;
    }
  } else {
    if (LOG_ENABLED(INFO)) {
      CLOG(INFO, this) << "No protocol negotiated. Fallback to HTTP/1.1";
    }

    proto = StringRef::from_lit("http/1.1");
  }

  if (!tls::in_proto_list(get_config()->tls.npn_list, proto)) {
    if (LOG_ENABLED(INFO)) {
      CLOG(INFO, this) << "The negotiated protocol is not supported: " << proto;
    }
    return -1;
  }

  if (util::check_h2_is_selected(proto)) {
    // HTTP/2 over TLS still starts with the client connection
    // preface, so that is what must be read first.
    on_read_ = &ClientHandler::upstream_http2_connhd_read;

    auto http2_upstream = std::make_unique<Http2Upstream>(this);

    upstream_ = std::move(http2_upstream);
    // |proto| points at memory not owned by this object, so alpn_
    // keeps its own copy in balloc_.
    alpn_ = make_string_ref(balloc_, proto);

    // At this point, input buffer is already filled with some bytes.
    // The read callback is not called until new data come.  So consume
    // input buffer here.
    if (on_read() != 0) {
      return -1;
    }

    return 0;
  }

  if (proto == StringRef::from_lit("http/1.1")) {
    upstream_ = std::make_unique<HttpsUpstream>(this);
    alpn_ = StringRef::from_lit("http/1.1");

    // At this point, input buffer is already filled with some bytes.
    // The read callback is not called until new data come.  So consume
    // input buffer here.
    if (on_read() != 0) {
      return -1;
    }

    return 0;
  }
  // The protocol is in the configured list, but this function has no
  // upstream implementation for it.
  if (LOG_ENABLED(INFO)) {
    CLOG(INFO, this) << "The negotiated protocol is not supported";
  }
  return -1;
}
624
do_read()625 int ClientHandler::do_read() { return read_(*this); }
do_write()626 int ClientHandler::do_write() { return write_(*this); }
627
on_read()628 int ClientHandler::on_read() {
629 if (rb_.chunk_avail()) {
630 auto rv = on_read_(*this);
631 if (rv != 0) {
632 return rv;
633 }
634 }
635 conn_.handle_tls_pending_read();
636 return 0;
637 }
on_write()638 int ClientHandler::on_write() { return on_write_(*this); }
639
get_ipaddr() const640 const StringRef &ClientHandler::get_ipaddr() const { return ipaddr_; }
641
get_should_close_after_write() const642 bool ClientHandler::get_should_close_after_write() const {
643 return should_close_after_write_;
644 }
645
set_should_close_after_write(bool f)646 void ClientHandler::set_should_close_after_write(bool f) {
647 should_close_after_write_ = f;
648 }
649
pool_downstream_connection(std::unique_ptr<DownstreamConnection> dconn)650 void ClientHandler::pool_downstream_connection(
651 std::unique_ptr<DownstreamConnection> dconn) {
652 if (!dconn->poolable()) {
653 return;
654 }
655
656 dconn->set_client_handler(nullptr);
657
658 auto &group = dconn->get_downstream_addr_group();
659
660 if (LOG_ENABLED(INFO)) {
661 CLOG(INFO, this) << "Pooling downstream connection DCONN:" << dconn.get()
662 << " in group " << group;
663 }
664
665 auto addr = dconn->get_addr();
666 auto &dconn_pool = addr->dconn_pool;
667 dconn_pool->add_downstream_connection(std::move(dconn));
668 }
669
670 namespace {
671 // Computes 32bits hash for session affinity for IP address |ip|.
compute_affinity_from_ip(const StringRef & ip)672 uint32_t compute_affinity_from_ip(const StringRef &ip) {
673 int rv;
674 std::array<uint8_t, 32> buf;
675
676 rv = util::sha256(buf.data(), ip);
677 if (rv != 0) {
678 // Not sure when sha256 failed. Just fall back to another
679 // function.
680 return util::hash32(ip);
681 }
682
683 return (static_cast<uint32_t>(buf[0]) << 24) |
684 (static_cast<uint32_t>(buf[1]) << 16) |
685 (static_cast<uint32_t>(buf[2]) << 8) | static_cast<uint32_t>(buf[3]);
686 }
687 } // namespace
688
get_http2_session(const std::shared_ptr<DownstreamAddrGroup> & group,DownstreamAddr * addr)689 Http2Session *ClientHandler::get_http2_session(
690 const std::shared_ptr<DownstreamAddrGroup> &group, DownstreamAddr *addr) {
691 auto &shared_addr = group->shared_addr;
692
693 if (LOG_ENABLED(INFO)) {
694 CLOG(INFO, this) << "Selected DownstreamAddr=" << addr
695 << ", index=" << (addr - shared_addr->addrs.data());
696 }
697
698 for (auto session = addr->http2_extra_freelist.head; session;) {
699 auto next = session->dlnext;
700
701 if (session->max_concurrency_reached(0)) {
702 if (LOG_ENABLED(INFO)) {
703 CLOG(INFO, this)
704 << "Maximum streams have been reached for Http2Session(" << session
705 << "). Skip it";
706 }
707
708 session->remove_from_freelist();
709 session = next;
710
711 continue;
712 }
713
714 if (LOG_ENABLED(INFO)) {
715 CLOG(INFO, this) << "Use Http2Session " << session
716 << " from http2_extra_freelist";
717 }
718
719 if (session->max_concurrency_reached(1)) {
720 if (LOG_ENABLED(INFO)) {
721 CLOG(INFO, this) << "Maximum streams are reached for Http2Session("
722 << session << ").";
723 }
724
725 session->remove_from_freelist();
726 }
727 return session;
728 }
729
730 auto session = new Http2Session(conn_.loop, worker_->get_cl_ssl_ctx(),
731 worker_, group, addr);
732
733 if (LOG_ENABLED(INFO)) {
734 CLOG(INFO, this) << "Create new Http2Session " << session;
735 }
736
737 session->add_to_extra_freelist();
738
739 return session;
740 }
741
get_affinity_cookie(Downstream * downstream,const StringRef & cookie_name)742 uint32_t ClientHandler::get_affinity_cookie(Downstream *downstream,
743 const StringRef &cookie_name) {
744 auto h = downstream->find_affinity_cookie(cookie_name);
745 if (h) {
746 return h;
747 }
748
749 auto d = std::uniform_int_distribution<uint32_t>(
750 1, std::numeric_limits<uint32_t>::max());
751 auto rh = d(worker_->get_randgen());
752 h = util::hash32(StringRef{reinterpret_cast<uint8_t *>(&rh),
753 reinterpret_cast<uint8_t *>(&rh) + sizeof(rh)});
754
755 downstream->renew_affinity_cookie(h);
756
757 return h;
758 }
759
760 namespace {
reschedule_addr(std::priority_queue<DownstreamAddrEntry,std::vector<DownstreamAddrEntry>,DownstreamAddrEntryGreater> & pq,DownstreamAddr * addr)761 void reschedule_addr(
762 std::priority_queue<DownstreamAddrEntry, std::vector<DownstreamAddrEntry>,
763 DownstreamAddrEntryGreater> &pq,
764 DownstreamAddr *addr) {
765 auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + addr->pending_penalty;
766 addr->cycle += penalty / addr->weight;
767 addr->pending_penalty = penalty % addr->weight;
768
769 pq.push(DownstreamAddrEntry{addr, addr->seq, addr->cycle});
770 addr->queued = true;
771 }
772 } // namespace
773
774 namespace {
reschedule_wg(std::priority_queue<WeightGroupEntry,std::vector<WeightGroupEntry>,WeightGroupEntryGreater> & pq,WeightGroup * wg)775 void reschedule_wg(
776 std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
777 WeightGroupEntryGreater> &pq,
778 WeightGroup *wg) {
779 auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + wg->pending_penalty;
780 wg->cycle += penalty / wg->weight;
781 wg->pending_penalty = penalty % wg->weight;
782
783 pq.push(WeightGroupEntry{wg, wg->seq, wg->cycle});
784 wg->queued = true;
785 }
786 } // namespace
787
get_downstream_addr(int & err,DownstreamAddrGroup * group,Downstream * downstream)788 DownstreamAddr *ClientHandler::get_downstream_addr(int &err,
789 DownstreamAddrGroup *group,
790 Downstream *downstream) {
791 err = 0;
792
793 switch (faddr_->alt_mode) {
794 case UpstreamAltMode::API:
795 case UpstreamAltMode::HEALTHMON:
796 assert(0);
797 default:
798 break;
799 }
800
801 auto &shared_addr = group->shared_addr;
802
803 if (shared_addr->affinity.type != SessionAffinity::NONE) {
804 uint32_t hash;
805 switch (shared_addr->affinity.type) {
806 case SessionAffinity::IP:
807 if (!affinity_hash_computed_) {
808 affinity_hash_ = compute_affinity_from_ip(ipaddr_);
809 affinity_hash_computed_ = true;
810 }
811 hash = affinity_hash_;
812 break;
813 case SessionAffinity::COOKIE:
814 hash = get_affinity_cookie(downstream, shared_addr->affinity.cookie.name);
815 break;
816 default:
817 assert(0);
818 }
819
820 const auto &affinity_hash = shared_addr->affinity_hash;
821
822 auto it = std::lower_bound(
823 std::begin(affinity_hash), std::end(affinity_hash), hash,
824 [](const AffinityHash &lhs, uint32_t rhs) { return lhs.hash < rhs; });
825
826 if (it == std::end(affinity_hash)) {
827 it = std::begin(affinity_hash);
828 }
829
830 auto aff_idx =
831 static_cast<size_t>(std::distance(std::begin(affinity_hash), it));
832 auto idx = (*it).idx;
833 auto addr = &shared_addr->addrs[idx];
834
835 if (addr->connect_blocker->blocked()) {
836 size_t i;
837 for (i = aff_idx + 1; i != aff_idx; ++i) {
838 if (i == shared_addr->affinity_hash.size()) {
839 i = 0;
840 }
841 addr = &shared_addr->addrs[shared_addr->affinity_hash[i].idx];
842 if (addr->connect_blocker->blocked()) {
843 continue;
844 }
845 break;
846 }
847 if (i == aff_idx) {
848 err = -1;
849 return nullptr;
850 }
851 aff_idx = i;
852 }
853
854 return addr;
855 }
856
857 auto &wgpq = shared_addr->pq;
858
859 for (;;) {
860 if (wgpq.empty()) {
861 CLOG(INFO, this) << "No working downstream address found";
862 err = -1;
863 return nullptr;
864 }
865
866 auto wg = wgpq.top().wg;
867 wgpq.pop();
868 wg->queued = false;
869
870 for (;;) {
871 if (wg->pq.empty()) {
872 break;
873 }
874
875 auto addr = wg->pq.top().addr;
876 wg->pq.pop();
877 addr->queued = false;
878
879 if (addr->connect_blocker->blocked()) {
880 continue;
881 }
882
883 reschedule_addr(wg->pq, addr);
884 reschedule_wg(wgpq, wg);
885
886 return addr;
887 }
888 }
889 }
890
// Returns a backend connection for |downstream|, attached to this
// handler.
//
// API and health monitor frontends get their dedicated connection
// type.  Otherwise the backend group is chosen by routing on the
// request authority and path, an address is selected from the group,
// and then either a pooled HTTP/1 connection is reused, a new HTTP/1
// connection is created, or a connection on an HTTP/2 session is
// created.
//
// |err| is set to 0 on entry.  It is set to SHRPX_ERR_TLS_REQUIRED if
// the selected group requires a TLS frontend and this connection is
// cleartext, or to the error from get_downstream_addr() if no address
// is available; nullptr is returned in both cases.  nullptr is also
// returned, with |err| left at 0, if the worker wide connect blocker
// is blocked.
std::unique_ptr<DownstreamConnection>
ClientHandler::get_downstream_connection(int &err, Downstream *downstream) {
  size_t group_idx;
  auto &downstreamconf = *worker_->get_downstream_config();
  auto &routerconf = downstreamconf.router;

  auto catch_all = downstreamconf.addr_group_catch_all;
  auto &groups = worker_->get_downstream_addr_groups();

  auto &req = downstream->request();

  err = 0;

  switch (faddr_->alt_mode) {
  case UpstreamAltMode::API: {
    auto dconn = std::make_unique<APIDownstreamConnection>(worker_);
    dconn->set_client_handler(this);
    return dconn;
  }
  case UpstreamAltMode::HEALTHMON: {
    auto dconn = std::make_unique<HealthMonitorDownstreamConnection>();
    dconn->set_client_handler(this);
    return dconn;
  }
  default:
    break;
  }

  auto &balloc = downstream->get_block_allocator();

  StringRef authority, path;

  if (req.forwarded_once) {
    // Reuse the values cached on the first selection.  With a single
    // group they are not needed because no routing takes place.
    if (groups.size() != 1) {
      authority = req.orig_authority;
      path = req.orig_path;
    }
  } else {
    if (faddr_->sni_fwd) {
      authority = sni_;
    } else if (!req.authority.empty()) {
      authority = req.authority;
    } else {
      auto h = req.fs.header(http2::HD_HOST);
      if (h) {
        authority = h->value;
      }
    }

    // CONNECT method does not have path.  But we require path in
    // host-path mapping.  |path| is left empty in that case.
    // NOTE(review): the original comment said the path is assumed to
    // be "/"; presumably the router treats an empty path that way --
    // confirm against match_downstream_addr_group.
    if (!req.regular_connect_method()) {
      path = req.path;
    }

    // Cache the authority and path used for the first-time backend
    // selection because per-pattern mruby script can change them.
    req.orig_authority = authority;
    req.orig_path = path;
    req.forwarded_once = true;
  }

  // Fast path.  If we have one group, it must be catch-all group.
  if (groups.size() == 1) {
    group_idx = 0;
  } else {
    group_idx = match_downstream_addr_group(routerconf, authority, path, groups,
                                            catch_all, balloc);
  }

  if (LOG_ENABLED(INFO)) {
    CLOG(INFO, this) << "Downstream address group_idx: " << group_idx;
  }

  if (groups[group_idx]->shared_addr->redirect_if_not_tls && !conn_.tls.ssl) {
    if (LOG_ENABLED(INFO)) {
      CLOG(INFO, this) << "Downstream address group " << group_idx
                       << " requires frontend TLS connection.";
    }
    err = SHRPX_ERR_TLS_REQUIRED;
    return nullptr;
  }

  auto &group = groups[group_idx];
  auto addr = get_downstream_addr(err, group.get(), downstream);
  if (addr == nullptr) {
    // |err| has been set by get_downstream_addr().
    return nullptr;
  }

  if (addr->proto == Proto::HTTP1) {
    // Prefer an idle connection from the pool of this address.
    auto dconn = addr->dconn_pool->pop_downstream_connection();
    if (dconn) {
      dconn->set_client_handler(this);
      return dconn;
    }

    if (worker_->get_connect_blocker()->blocked()) {
      // NOTE(review): this is the only log in this file that uses
      // DCLOG with a ClientHandler pointer; the others use CLOG.
      if (LOG_ENABLED(INFO)) {
        DCLOG(INFO, this)
            << "Worker wide backend connection was blocked temporarily";
      }
      return nullptr;
    }

    if (LOG_ENABLED(INFO)) {
      CLOG(INFO, this) << "Downstream connection pool is empty."
                       << " Create new one";
    }

    dconn = std::make_unique<HttpDownstreamConnection>(group, addr, conn_.loop,
                                                       worker_);
    dconn->set_client_handler(this);
    return dconn;
  }

  // Every other backend protocol is served through an Http2Session.
  if (LOG_ENABLED(INFO)) {
    CLOG(INFO, this) << "Downstream connection pool is empty."
                     << " Create new one";
  }

  auto http2session = get_http2_session(group, addr);
  auto dconn = std::make_unique<Http2DownstreamConnection>(http2session);
  dconn->set_client_handler(this);
  return dconn;
}
1017
get_mcpool()1018 MemchunkPool *ClientHandler::get_mcpool() { return worker_->get_mcpool(); }
1019
get_ssl() const1020 SSL *ClientHandler::get_ssl() const { return conn_.tls.ssl; }
1021
direct_http2_upgrade()1022 void ClientHandler::direct_http2_upgrade() {
1023 upstream_ = std::make_unique<Http2Upstream>(this);
1024 alpn_ = StringRef::from_lit(NGHTTP2_CLEARTEXT_PROTO_VERSION_ID);
1025 on_read_ = &ClientHandler::upstream_read;
1026 write_ = &ClientHandler::write_clear;
1027 }
1028
// Upgrades the connection from HTTP/1.1 upstream |http| to HTTP/2 in
// response to an HTTP Upgrade request.  A new Http2Upstream takes
// over |http|, any response bytes already buffered for the HTTP/1
// downstream are moved to the new output buffer, and the 101
// Switching Protocols response is appended after them.
//
// Returns 0 on success, or -1 if the new upstream rejects the
// upgrade, in which case the current upstream is left untouched.
int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) {
  auto upstream = std::make_unique<Http2Upstream>(this);

  auto output = upstream->get_response_buf();

  // We might have written non-final header in response_buf, in this
  // case, response_state is still INITIAL.  If this non-final header
  // and upgrade header fit in output buffer, do upgrade.  Otherwise,
  // to avoid to send this non-final header as response body in HTTP/2
  // upstream, fail upgrade.
  // NOTE(review): no explicit size check is visible in this function;
  // confirm where the "fit" condition is enforced.
  auto downstream = http->get_downstream();
  auto input = downstream->get_response_buf();

  if (upstream->upgrade_upstream(http) != 0) {
    return -1;
  }
  // http pointer is now owned by upstream.  Release it from upstream_
  // without deleting it.
  upstream_.release();
  // TODO We might get other version id in HTTP2-settings, if we
  // support aliasing for h2, but we just use library default for now.
  alpn_ = StringRef::from_lit(NGHTTP2_CLEARTEXT_PROTO_VERSION_ID);
  // The client still has to send the HTTP/2 connection preface.
  on_read_ = &ClientHandler::upstream_http2_connhd_read;
  write_ = &ClientHandler::write_clear;

  // Move everything still buffered for the HTTP/1 response to the new
  // output buffer, ahead of the 101 response.
  input->remove(*output, input->rleft());

  constexpr auto res =
      StringRef::from_lit("HTTP/1.1 101 Switching Protocols\r\n"
                          "Connection: Upgrade\r\n"
                          "Upgrade: " NGHTTP2_CLEARTEXT_PROTO_VERSION_ID "\r\n"
                          "\r\n");

  output->append(res);
  upstream_ = std::move(upstream);

  signal_write();
  return 0;
}
1067
get_http2_upgrade_allowed() const1068 bool ClientHandler::get_http2_upgrade_allowed() const { return !conn_.tls.ssl; }
1069
get_upstream_scheme() const1070 StringRef ClientHandler::get_upstream_scheme() const {
1071 if (conn_.tls.ssl) {
1072 return StringRef::from_lit("https");
1073 } else {
1074 return StringRef::from_lit("http");
1075 }
1076 }
1077
start_immediate_shutdown()1078 void ClientHandler::start_immediate_shutdown() {
1079 ev_timer_start(conn_.loop, &reneg_shutdown_timer_);
1080 }
1081
write_accesslog(Downstream * downstream)1082 void ClientHandler::write_accesslog(Downstream *downstream) {
1083 auto &req = downstream->request();
1084
1085 auto config = get_config();
1086
1087 if (!req.tstamp) {
1088 auto lgconf = log_config();
1089 lgconf->update_tstamp(std::chrono::system_clock::now());
1090 req.tstamp = lgconf->tstamp;
1091 }
1092
1093 upstream_accesslog(
1094 config->logging.access.format,
1095 LogSpec{
1096 downstream,
1097 ipaddr_,
1098 alpn_,
1099 sni_,
1100 conn_.tls.ssl,
1101 std::chrono::high_resolution_clock::now(), // request_end_time
1102 port_,
1103 faddr_->port,
1104 config->pid,
1105 });
1106 }
1107
get_rb()1108 ClientHandler::ReadBuf *ClientHandler::get_rb() { return &rb_; }
1109
signal_write()1110 void ClientHandler::signal_write() { conn_.wlimit.startw(); }
1111
get_rlimit()1112 RateLimit *ClientHandler::get_rlimit() { return &conn_.rlimit; }
get_wlimit()1113 RateLimit *ClientHandler::get_wlimit() { return &conn_.wlimit; }
1114
get_wev()1115 ev_io *ClientHandler::get_wev() { return &conn_.wev; }
1116
get_worker() const1117 Worker *ClientHandler::get_worker() const { return worker_; }
1118
1119 namespace {
parse_proxy_line_port(const uint8_t * first,const uint8_t * last)1120 ssize_t parse_proxy_line_port(const uint8_t *first, const uint8_t *last) {
1121 auto p = first;
1122 int32_t port = 0;
1123
1124 if (p == last) {
1125 return -1;
1126 }
1127
1128 if (*p == '0') {
1129 if (p + 1 != last && util::is_digit(*(p + 1))) {
1130 return -1;
1131 }
1132 return 1;
1133 }
1134
1135 for (; p != last && util::is_digit(*p); ++p) {
1136 port *= 10;
1137 port += *p - '0';
1138
1139 if (port > 65535) {
1140 return -1;
1141 }
1142 }
1143
1144 return p - first;
1145 }
1146 } // namespace
1147
on_proxy_protocol_finish()1148 int ClientHandler::on_proxy_protocol_finish() {
1149 if (conn_.tls.ssl) {
1150 conn_.tls.rbuf.append(rb_.pos(), rb_.rleft());
1151 rb_.reset();
1152 }
1153
1154 setup_upstream_io_callback();
1155
1156 // Run on_read to process data left in buffer since they are not
1157 // notified further
1158 if (on_read() != 0) {
1159 return -1;
1160 }
1161
1162 return 0;
1163 }
1164
namespace {
// PROXY-protocol v2 header signature.  The literal spells the 12
// octets "\r\n\r\n\0\r\nQUIT\n"; the array additionally holds the
// terminating NUL of the string literal, so use str_size() rather
// than sizeof when the signature length is needed (presumably
// str_size() excludes that terminator -- confirm against its
// definition).
constexpr uint8_t PROXY_PROTO_V2_SIG[] =
    "\x0D\x0A\x0D\x0A\x00\x0D\x0A\x51\x55\x49\x54\x0A";

// PROXY-protocol v2 header length: the signature followed by the
// version/command octet, the family/protocol octet and the 2-octet
// length of the remainder of the header.
constexpr size_t PROXY_PROTO_V2_HDLEN =
    str_size(PROXY_PROTO_V2_SIG) + /* ver_cmd(1) + fam(1) + len(2) = */ 4;
} // namespace
1174
1175 // http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt
proxy_protocol_read()1176 int ClientHandler::proxy_protocol_read() {
1177 if (LOG_ENABLED(INFO)) {
1178 CLOG(INFO, this) << "PROXY-protocol: Started";
1179 }
1180
1181 auto first = rb_.pos();
1182
1183 if (rb_.rleft() >= PROXY_PROTO_V2_HDLEN &&
1184 (*(first + str_size(PROXY_PROTO_V2_SIG)) & 0xf0) == 0x20) {
1185 if (LOG_ENABLED(INFO)) {
1186 CLOG(INFO, this) << "PROXY-protocol: Detected v2 header signature";
1187 }
1188 return proxy_protocol_v2_read();
1189 }
1190
1191 // NULL character really destroys functions which expects NULL
1192 // terminated string. We won't expect it in PROXY protocol line, so
1193 // find it here.
1194 auto chrs = std::array<char, 2>{'\n', '\0'};
1195
1196 constexpr size_t MAX_PROXY_LINELEN = 107;
1197
1198 auto bufend = rb_.pos() + std::min(MAX_PROXY_LINELEN, rb_.rleft());
1199
1200 auto end =
1201 std::find_first_of(rb_.pos(), bufend, std::begin(chrs), std::end(chrs));
1202
1203 if (end == bufend || *end == '\0' || end == rb_.pos() || *(end - 1) != '\r') {
1204 if (LOG_ENABLED(INFO)) {
1205 CLOG(INFO, this) << "PROXY-protocol-v1: No ending CR LF sequence found";
1206 }
1207 return -1;
1208 }
1209
1210 --end;
1211
1212 constexpr auto HEADER = StringRef::from_lit("PROXY ");
1213
1214 if (static_cast<size_t>(end - rb_.pos()) < HEADER.size()) {
1215 if (LOG_ENABLED(INFO)) {
1216 CLOG(INFO, this) << "PROXY-protocol-v1: PROXY version 1 ID not found";
1217 }
1218 return -1;
1219 }
1220
1221 if (!util::streq(HEADER, StringRef{rb_.pos(), HEADER.size()})) {
1222 if (LOG_ENABLED(INFO)) {
1223 CLOG(INFO, this) << "PROXY-protocol-v1: Bad PROXY protocol version 1 ID";
1224 }
1225 return -1;
1226 }
1227
1228 rb_.drain(HEADER.size());
1229
1230 int family;
1231
1232 if (rb_.pos()[0] == 'T') {
1233 if (end - rb_.pos() < 5) {
1234 if (LOG_ENABLED(INFO)) {
1235 CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found";
1236 }
1237 return -1;
1238 }
1239
1240 if (rb_.pos()[1] != 'C' || rb_.pos()[2] != 'P') {
1241 if (LOG_ENABLED(INFO)) {
1242 CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
1243 }
1244 return -1;
1245 }
1246
1247 switch (rb_.pos()[3]) {
1248 case '4':
1249 family = AF_INET;
1250 break;
1251 case '6':
1252 family = AF_INET6;
1253 break;
1254 default:
1255 if (LOG_ENABLED(INFO)) {
1256 CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
1257 }
1258 return -1;
1259 }
1260
1261 rb_.drain(5);
1262 } else {
1263 if (end - rb_.pos() < 7) {
1264 if (LOG_ENABLED(INFO)) {
1265 CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found";
1266 }
1267 return -1;
1268 }
1269 if (!util::streq_l("UNKNOWN", rb_.pos(), 7)) {
1270 if (LOG_ENABLED(INFO)) {
1271 CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
1272 }
1273 return -1;
1274 }
1275
1276 rb_.drain(end + 2 - rb_.pos());
1277
1278 return on_proxy_protocol_finish();
1279 }
1280
1281 // source address
1282 auto token_end = std::find(rb_.pos(), end, ' ');
1283 if (token_end == end) {
1284 if (LOG_ENABLED(INFO)) {
1285 CLOG(INFO, this) << "PROXY-protocol-v1: Source address not found";
1286 }
1287 return -1;
1288 }
1289
1290 *token_end = '\0';
1291 if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos()), family)) {
1292 if (LOG_ENABLED(INFO)) {
1293 CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source address";
1294 }
1295 return -1;
1296 }
1297
1298 auto src_addr = rb_.pos();
1299 auto src_addrlen = token_end - rb_.pos();
1300
1301 rb_.drain(token_end - rb_.pos() + 1);
1302
1303 // destination address
1304 token_end = std::find(rb_.pos(), end, ' ');
1305 if (token_end == end) {
1306 if (LOG_ENABLED(INFO)) {
1307 CLOG(INFO, this) << "PROXY-protocol-v1: Destination address not found";
1308 }
1309 return -1;
1310 }
1311
1312 *token_end = '\0';
1313 if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos()), family)) {
1314 if (LOG_ENABLED(INFO)) {
1315 CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination address";
1316 }
1317 return -1;
1318 }
1319
1320 // Currently we don't use destination address
1321
1322 rb_.drain(token_end - rb_.pos() + 1);
1323
1324 // source port
1325 auto n = parse_proxy_line_port(rb_.pos(), end);
1326 if (n <= 0 || *(rb_.pos() + n) != ' ') {
1327 if (LOG_ENABLED(INFO)) {
1328 CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source port";
1329 }
1330 return -1;
1331 }
1332
1333 rb_.pos()[n] = '\0';
1334 auto src_port = rb_.pos();
1335 auto src_portlen = n;
1336
1337 rb_.drain(n + 1);
1338
1339 // destination port
1340 n = parse_proxy_line_port(rb_.pos(), end);
1341 if (n <= 0 || rb_.pos() + n != end) {
1342 if (LOG_ENABLED(INFO)) {
1343 CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination port";
1344 }
1345 return -1;
1346 }
1347
1348 // Currently we don't use destination port
1349
1350 rb_.drain(end + 2 - rb_.pos());
1351
1352 ipaddr_ =
1353 make_string_ref(balloc_, StringRef{src_addr, src_addr + src_addrlen});
1354 port_ = make_string_ref(balloc_, StringRef{src_port, src_port + src_portlen});
1355
1356 if (LOG_ENABLED(INFO)) {
1357 CLOG(INFO, this) << "PROXY-protocol-v1: Finished, " << (rb_.pos() - first)
1358 << " bytes read";
1359 }
1360
1361 auto config = get_config();
1362 auto &fwdconf = config->http.forwarded;
1363
1364 if ((fwdconf.params & FORWARDED_FOR) &&
1365 fwdconf.for_node_type == ForwardedNode::IP) {
1366 init_forwarded_for(family, ipaddr_);
1367 }
1368
1369 return on_proxy_protocol_finish();
1370 }
1371
proxy_protocol_v2_read()1372 int ClientHandler::proxy_protocol_v2_read() {
1373 // Assume that first str_size(PROXY_PROTO_V2_SIG) octets match v2
1374 // protocol signature and followed by the bytes which indicates v2.
1375 assert(rb_.rleft() >= PROXY_PROTO_V2_HDLEN);
1376
1377 auto p = rb_.pos() + str_size(PROXY_PROTO_V2_SIG);
1378
1379 assert(((*p) & 0xf0) == 0x20);
1380
1381 enum { LOCAL, PROXY } cmd;
1382
1383 auto cmd_bits = (*p++) & 0xf;
1384 switch (cmd_bits) {
1385 case 0x0:
1386 cmd = LOCAL;
1387 break;
1388 case 0x01:
1389 cmd = PROXY;
1390 break;
1391 default:
1392 if (LOG_ENABLED(INFO)) {
1393 CLOG(INFO, this) << "PROXY-protocol-v2: Unknown command " << log::hex
1394 << cmd_bits;
1395 }
1396 return -1;
1397 }
1398
1399 auto fam = *p++;
1400 uint16_t len;
1401 memcpy(&len, p, sizeof(len));
1402 len = ntohs(len);
1403
1404 p += sizeof(len);
1405
1406 if (LOG_ENABLED(INFO)) {
1407 CLOG(INFO, this) << "PROXY-protocol-v2: Detected family=" << log::hex << fam
1408 << ", len=" << log::dec << len;
1409 }
1410
1411 if (rb_.last() - p < len) {
1412 if (LOG_ENABLED(INFO)) {
1413 CLOG(INFO, this)
1414 << "PROXY-protocol-v2: Prematurely truncated header block; require "
1415 << len << " bytes, " << rb_.last() - p << " bytes left";
1416 }
1417 return -1;
1418 }
1419
1420 int family;
1421 std::array<char, std::max(INET_ADDRSTRLEN, INET6_ADDRSTRLEN)> src_addr,
1422 dst_addr;
1423 size_t addrlen;
1424
1425 switch (fam) {
1426 case 0x11:
1427 case 0x12:
1428 if (len < 12) {
1429 if (LOG_ENABLED(INFO)) {
1430 CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_INET addresses";
1431 }
1432 return -1;
1433 }
1434 family = AF_INET;
1435 addrlen = 4;
1436 break;
1437 case 0x21:
1438 case 0x22:
1439 if (len < 36) {
1440 if (LOG_ENABLED(INFO)) {
1441 CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_INET6 addresses";
1442 }
1443 return -1;
1444 }
1445 family = AF_INET6;
1446 addrlen = 16;
1447 break;
1448 case 0x31:
1449 case 0x32:
1450 if (len < 216) {
1451 if (LOG_ENABLED(INFO)) {
1452 CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_UNIX addresses";
1453 }
1454 return -1;
1455 }
1456 // fall through
1457 case 0x00: {
1458 // UNSPEC and UNIX are just ignored.
1459 if (LOG_ENABLED(INFO)) {
1460 CLOG(INFO, this) << "PROXY-protocol-v2: Ignore combination of address "
1461 "family and protocol "
1462 << log::hex << fam;
1463 }
1464 rb_.drain(PROXY_PROTO_V2_HDLEN + len);
1465 return on_proxy_protocol_finish();
1466 }
1467 default:
1468 if (LOG_ENABLED(INFO)) {
1469 CLOG(INFO, this) << "PROXY-protocol-v2: Unknown combination of address "
1470 "family and protocol "
1471 << log::hex << fam;
1472 }
1473 return -1;
1474 }
1475
1476 if (cmd != PROXY) {
1477 if (LOG_ENABLED(INFO)) {
1478 CLOG(INFO, this) << "PROXY-protocol-v2: Ignore non-PROXY command";
1479 }
1480 rb_.drain(PROXY_PROTO_V2_HDLEN + len);
1481 return on_proxy_protocol_finish();
1482 }
1483
1484 if (inet_ntop(family, p, src_addr.data(), src_addr.size()) == nullptr) {
1485 if (LOG_ENABLED(INFO)) {
1486 CLOG(INFO, this) << "PROXY-protocol-v2: Unable to parse source address";
1487 }
1488 return -1;
1489 }
1490
1491 p += addrlen;
1492
1493 if (inet_ntop(family, p, dst_addr.data(), dst_addr.size()) == nullptr) {
1494 if (LOG_ENABLED(INFO)) {
1495 CLOG(INFO, this)
1496 << "PROXY-protocol-v2: Unable to parse destination address";
1497 }
1498 return -1;
1499 }
1500
1501 p += addrlen;
1502
1503 uint16_t src_port;
1504
1505 memcpy(&src_port, p, sizeof(src_port));
1506 src_port = ntohs(src_port);
1507
1508 // We don't use destination port.
1509 p += 4;
1510
1511 ipaddr_ = make_string_ref(balloc_, StringRef{src_addr.data()});
1512 port_ = util::make_string_ref_uint(balloc_, src_port);
1513
1514 if (LOG_ENABLED(INFO)) {
1515 CLOG(INFO, this) << "PROXY-protocol-v2: Finished reading proxy addresses, "
1516 << p - rb_.pos() << " bytes read, "
1517 << PROXY_PROTO_V2_HDLEN + len - (p - rb_.pos())
1518 << " bytes left";
1519 }
1520
1521 auto config = get_config();
1522 auto &fwdconf = config->http.forwarded;
1523
1524 if ((fwdconf.params & FORWARDED_FOR) &&
1525 fwdconf.for_node_type == ForwardedNode::IP) {
1526 init_forwarded_for(family, ipaddr_);
1527 }
1528
1529 rb_.drain(PROXY_PROTO_V2_HDLEN + len);
1530 return on_proxy_protocol_finish();
1531 }
1532
get_forwarded_by() const1533 StringRef ClientHandler::get_forwarded_by() const {
1534 auto &fwdconf = get_config()->http.forwarded;
1535
1536 if (fwdconf.by_node_type == ForwardedNode::OBFUSCATED) {
1537 return fwdconf.by_obfuscated;
1538 }
1539
1540 return faddr_->hostport;
1541 }
1542
get_forwarded_for() const1543 StringRef ClientHandler::get_forwarded_for() const { return forwarded_for_; }
1544
get_upstream_addr() const1545 const UpstreamAddr *ClientHandler::get_upstream_addr() const { return faddr_; }
1546
get_connection()1547 Connection *ClientHandler::get_connection() { return &conn_; };
1548
set_tls_sni(const StringRef & sni)1549 void ClientHandler::set_tls_sni(const StringRef &sni) {
1550 sni_ = make_string_ref(balloc_, sni);
1551 }
1552
get_tls_sni() const1553 StringRef ClientHandler::get_tls_sni() const { return sni_; }
1554
get_alpn() const1555 StringRef ClientHandler::get_alpn() const { return alpn_; }
1556
get_block_allocator()1557 BlockAllocator &ClientHandler::get_block_allocator() { return balloc_; }
1558
1559 } // namespace shrpx
1560