• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_client_handler.h"
26 
27 #ifdef HAVE_UNISTD_H
28 #  include <unistd.h>
29 #endif // HAVE_UNISTD_H
30 #ifdef HAVE_SYS_SOCKET_H
31 #  include <sys/socket.h>
32 #endif // HAVE_SYS_SOCKET_H
33 #ifdef HAVE_NETDB_H
34 #  include <netdb.h>
35 #endif // HAVE_NETDB_H
36 
37 #include <cerrno>
38 
39 #include "shrpx_upstream.h"
40 #include "shrpx_http2_upstream.h"
41 #include "shrpx_https_upstream.h"
42 #include "shrpx_config.h"
43 #include "shrpx_http_downstream_connection.h"
44 #include "shrpx_http2_downstream_connection.h"
45 #include "shrpx_tls.h"
46 #include "shrpx_worker.h"
47 #include "shrpx_downstream_connection_pool.h"
48 #include "shrpx_downstream.h"
49 #include "shrpx_http2_session.h"
50 #include "shrpx_connect_blocker.h"
51 #include "shrpx_api_downstream_connection.h"
52 #include "shrpx_health_monitor_downstream_connection.h"
53 #include "shrpx_log.h"
54 #include "util.h"
55 #include "template.h"
56 #include "tls.h"
57 
58 using namespace nghttp2;
59 
60 namespace shrpx {
61 
62 namespace {
timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)63 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
64   auto conn = static_cast<Connection *>(w->data);
65   auto handler = static_cast<ClientHandler *>(conn->data);
66 
67   if (LOG_ENABLED(INFO)) {
68     CLOG(INFO, handler) << "Time out";
69   }
70 
71   delete handler;
72 }
73 } // namespace
74 
75 namespace {
shutdowncb(struct ev_loop * loop,ev_timer * w,int revents)76 void shutdowncb(struct ev_loop *loop, ev_timer *w, int revents) {
77   auto handler = static_cast<ClientHandler *>(w->data);
78 
79   if (LOG_ENABLED(INFO)) {
80     CLOG(INFO, handler) << "Close connection due to TLS renegotiation";
81   }
82 
83   delete handler;
84 }
85 } // namespace
86 
87 namespace {
readcb(struct ev_loop * loop,ev_io * w,int revents)88 void readcb(struct ev_loop *loop, ev_io *w, int revents) {
89   auto conn = static_cast<Connection *>(w->data);
90   auto handler = static_cast<ClientHandler *>(conn->data);
91 
92   if (handler->do_read() != 0) {
93     delete handler;
94     return;
95   }
96 }
97 } // namespace
98 
99 namespace {
writecb(struct ev_loop * loop,ev_io * w,int revents)100 void writecb(struct ev_loop *loop, ev_io *w, int revents) {
101   auto conn = static_cast<Connection *>(w->data);
102   auto handler = static_cast<ClientHandler *>(conn->data);
103 
104   if (handler->do_write() != 0) {
105     delete handler;
106     return;
107   }
108 }
109 } // namespace
110 
noop()111 int ClientHandler::noop() { return 0; }
112 
read_clear()113 int ClientHandler::read_clear() {
114   auto should_break = false;
115   rb_.ensure_chunk();
116   for (;;) {
117     if (rb_.rleft() && on_read() != 0) {
118       return -1;
119     }
120     if (rb_.rleft() == 0) {
121       rb_.reset();
122     } else if (rb_.wleft() == 0) {
123       conn_.rlimit.stopw();
124       return 0;
125     }
126 
127     if (!ev_is_active(&conn_.rev) || should_break) {
128       return 0;
129     }
130 
131     auto nread = conn_.read_clear(rb_.last(), rb_.wleft());
132 
133     if (nread == 0) {
134       if (rb_.rleft() == 0) {
135         rb_.release_chunk();
136       }
137       return 0;
138     }
139 
140     if (nread < 0) {
141       return -1;
142     }
143 
144     rb_.write(nread);
145     should_break = true;
146   }
147 }
148 
write_clear()149 int ClientHandler::write_clear() {
150   std::array<iovec, 2> iov;
151 
152   for (;;) {
153     if (on_write() != 0) {
154       return -1;
155     }
156 
157     auto iovcnt = upstream_->response_riovec(iov.data(), iov.size());
158     if (iovcnt == 0) {
159       break;
160     }
161 
162     auto nwrite = conn_.writev_clear(iov.data(), iovcnt);
163     if (nwrite < 0) {
164       return -1;
165     }
166 
167     if (nwrite == 0) {
168       return 0;
169     }
170 
171     upstream_->response_drain(nwrite);
172   }
173 
174   conn_.wlimit.stopw();
175   ev_timer_stop(conn_.loop, &conn_.wt);
176 
177   return 0;
178 }
179 
tls_handshake()180 int ClientHandler::tls_handshake() {
181   ev_timer_again(conn_.loop, &conn_.rt);
182 
183   ERR_clear_error();
184 
185   auto rv = conn_.tls_handshake();
186 
187   if (rv == SHRPX_ERR_INPROGRESS) {
188     return 0;
189   }
190 
191   if (rv < 0) {
192     return -1;
193   }
194 
195   if (LOG_ENABLED(INFO)) {
196     CLOG(INFO, this) << "SSL/TLS handshake completed";
197   }
198 
199   if (validate_next_proto() != 0) {
200     return -1;
201   }
202 
203   read_ = &ClientHandler::read_tls;
204   write_ = &ClientHandler::write_tls;
205 
206   return 0;
207 }
208 
read_tls()209 int ClientHandler::read_tls() {
210   auto should_break = false;
211 
212   ERR_clear_error();
213 
214   rb_.ensure_chunk();
215 
216   for (;;) {
217     // we should process buffered data first before we read EOF.
218     if (rb_.rleft() && on_read() != 0) {
219       return -1;
220     }
221     if (rb_.rleft() == 0) {
222       rb_.reset();
223     } else if (rb_.wleft() == 0) {
224       conn_.rlimit.stopw();
225       return 0;
226     }
227 
228     if (!ev_is_active(&conn_.rev) || should_break) {
229       return 0;
230     }
231 
232     auto nread = conn_.read_tls(rb_.last(), rb_.wleft());
233 
234     if (nread == 0) {
235       if (rb_.rleft() == 0) {
236         rb_.release_chunk();
237       }
238       return 0;
239     }
240 
241     if (nread < 0) {
242       return -1;
243     }
244 
245     rb_.write(nread);
246     should_break = true;
247   }
248 }
249 
write_tls()250 int ClientHandler::write_tls() {
251   struct iovec iov;
252 
253   ERR_clear_error();
254 
255   if (on_write() != 0) {
256     return -1;
257   }
258 
259   auto iovcnt = upstream_->response_riovec(&iov, 1);
260   if (iovcnt == 0) {
261     conn_.start_tls_write_idle();
262 
263     conn_.wlimit.stopw();
264     ev_timer_stop(conn_.loop, &conn_.wt);
265 
266     return 0;
267   }
268 
269   for (;;) {
270     auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len);
271     if (nwrite < 0) {
272       return -1;
273     }
274 
275     if (nwrite == 0) {
276       return 0;
277     }
278 
279     upstream_->response_drain(nwrite);
280 
281     iovcnt = upstream_->response_riovec(&iov, 1);
282     if (iovcnt == 0) {
283       return 0;
284     }
285   }
286 }
287 
upstream_noop()288 int ClientHandler::upstream_noop() { return 0; }
289 
upstream_read()290 int ClientHandler::upstream_read() {
291   assert(upstream_);
292   if (upstream_->on_read() != 0) {
293     return -1;
294   }
295   return 0;
296 }
297 
upstream_write()298 int ClientHandler::upstream_write() {
299   assert(upstream_);
300   if (upstream_->on_write() != 0) {
301     return -1;
302   }
303 
304   if (get_should_close_after_write() && upstream_->response_empty()) {
305     return -1;
306   }
307 
308   return 0;
309 }
310 
upstream_http2_connhd_read()311 int ClientHandler::upstream_http2_connhd_read() {
312   auto nread = std::min(left_connhd_len_, rb_.rleft());
313   if (memcmp(&NGHTTP2_CLIENT_MAGIC[NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_],
314              rb_.pos(), nread) != 0) {
315     // There is no downgrade path here. Just drop the connection.
316     if (LOG_ENABLED(INFO)) {
317       CLOG(INFO, this) << "invalid client connection header";
318     }
319 
320     return -1;
321   }
322 
323   left_connhd_len_ -= nread;
324   rb_.drain(nread);
325   conn_.rlimit.startw();
326 
327   if (left_connhd_len_ == 0) {
328     on_read_ = &ClientHandler::upstream_read;
329     // Run on_read to process data left in buffer since they are not
330     // notified further
331     if (on_read() != 0) {
332       return -1;
333     }
334     return 0;
335   }
336 
337   return 0;
338 }
339 
upstream_http1_connhd_read()340 int ClientHandler::upstream_http1_connhd_read() {
341   auto nread = std::min(left_connhd_len_, rb_.rleft());
342   if (memcmp(&NGHTTP2_CLIENT_MAGIC[NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_],
343              rb_.pos(), nread) != 0) {
344     if (LOG_ENABLED(INFO)) {
345       CLOG(INFO, this) << "This is HTTP/1.1 connection, "
346                        << "but may be upgraded to HTTP/2 later.";
347     }
348 
349     // Reset header length for later HTTP/2 upgrade
350     left_connhd_len_ = NGHTTP2_CLIENT_MAGIC_LEN;
351     on_read_ = &ClientHandler::upstream_read;
352     on_write_ = &ClientHandler::upstream_write;
353 
354     if (on_read() != 0) {
355       return -1;
356     }
357 
358     return 0;
359   }
360 
361   left_connhd_len_ -= nread;
362   rb_.drain(nread);
363   conn_.rlimit.startw();
364 
365   if (left_connhd_len_ == 0) {
366     if (LOG_ENABLED(INFO)) {
367       CLOG(INFO, this) << "direct HTTP/2 connection";
368     }
369 
370     direct_http2_upgrade();
371     on_read_ = &ClientHandler::upstream_read;
372     on_write_ = &ClientHandler::upstream_write;
373 
374     // Run on_read to process data left in buffer since they are not
375     // notified further
376     if (on_read() != 0) {
377       return -1;
378     }
379 
380     return 0;
381   }
382 
383   return 0;
384 }
385 
ClientHandler(Worker * worker,int fd,SSL * ssl,const StringRef & ipaddr,const StringRef & port,int family,const UpstreamAddr * faddr)386 ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl,
387                              const StringRef &ipaddr, const StringRef &port,
388                              int family, const UpstreamAddr *faddr)
389     : // We use balloc_ for TLS session ID (64), ipaddr (IPv6) (39),
390       // port (5), forwarded-for (IPv6) (41), alpn (5), proxyproto
391       // ipaddr (15), proxyproto port (5), sni (32, estimated).  we
392       // need terminal NULL byte for each.  We also require 8 bytes
393       // header for each allocation.  We align at 16 bytes boundary,
394       // so the required space is 64 + 48 + 16 + 48 + 16 + 16 + 16 +
395       // 32 + 8 + 8 * 8 = 328.
396       balloc_(512, 512),
397       rb_(worker->get_mcpool()),
398       conn_(worker->get_loop(), fd, ssl, worker->get_mcpool(),
399             get_config()->conn.upstream.timeout.write,
400             get_config()->conn.upstream.timeout.read,
401             get_config()->conn.upstream.ratelimit.write,
402             get_config()->conn.upstream.ratelimit.read, writecb, readcb,
403             timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
404             get_config()->tls.dyn_rec.idle_timeout, Proto::NONE),
405       ipaddr_(make_string_ref(balloc_, ipaddr)),
406       port_(make_string_ref(balloc_, port)),
407       faddr_(faddr),
408       worker_(worker),
409       left_connhd_len_(NGHTTP2_CLIENT_MAGIC_LEN),
410       affinity_hash_(0),
411       should_close_after_write_(false),
412       affinity_hash_computed_(false) {
413 
414   ++worker_->get_worker_stat()->num_connections;
415 
416   ev_timer_init(&reneg_shutdown_timer_, shutdowncb, 0., 0.);
417 
418   reneg_shutdown_timer_.data = this;
419 
420   conn_.rlimit.startw();
421   ev_timer_again(conn_.loop, &conn_.rt);
422 
423   auto config = get_config();
424 
425   if (faddr_->accept_proxy_protocol ||
426       config->conn.upstream.accept_proxy_protocol) {
427     read_ = &ClientHandler::read_clear;
428     write_ = &ClientHandler::noop;
429     on_read_ = &ClientHandler::proxy_protocol_read;
430     on_write_ = &ClientHandler::upstream_noop;
431   } else {
432     setup_upstream_io_callback();
433   }
434 
435   auto &fwdconf = config->http.forwarded;
436 
437   if (fwdconf.params & FORWARDED_FOR) {
438     if (fwdconf.for_node_type == ForwardedNode::OBFUSCATED) {
439       // 1 for '_'
440       auto len = SHRPX_OBFUSCATED_NODE_LENGTH + 1;
441       // 1 for terminating NUL.
442       auto buf = make_byte_ref(balloc_, len + 1);
443       auto p = buf.base;
444       *p++ = '_';
445       p = util::random_alpha_digit(p, p + SHRPX_OBFUSCATED_NODE_LENGTH,
446                                    worker_->get_randgen());
447       *p = '\0';
448 
449       forwarded_for_ = StringRef{buf.base, p};
450     } else {
451       init_forwarded_for(family, ipaddr_);
452     }
453   }
454 }
455 
init_forwarded_for(int family,const StringRef & ipaddr)456 void ClientHandler::init_forwarded_for(int family, const StringRef &ipaddr) {
457   if (family == AF_INET6) {
458     // 2 for '[' and ']'
459     auto len = 2 + ipaddr.size();
460     // 1 for terminating NUL.
461     auto buf = make_byte_ref(balloc_, len + 1);
462     auto p = buf.base;
463     *p++ = '[';
464     p = std::copy(std::begin(ipaddr), std::end(ipaddr), p);
465     *p++ = ']';
466     *p = '\0';
467 
468     forwarded_for_ = StringRef{buf.base, p};
469   } else {
470     // family == AF_INET or family == AF_UNIX
471     forwarded_for_ = ipaddr;
472   }
473 }
474 
setup_upstream_io_callback()475 void ClientHandler::setup_upstream_io_callback() {
476   if (conn_.tls.ssl) {
477     conn_.prepare_server_handshake();
478     read_ = write_ = &ClientHandler::tls_handshake;
479     on_read_ = &ClientHandler::upstream_noop;
480     on_write_ = &ClientHandler::upstream_write;
481   } else {
482     // For non-TLS version, first create HttpsUpstream. It may be
483     // upgraded to HTTP/2 through HTTP Upgrade or direct HTTP/2
484     // connection.
485     upstream_ = std::make_unique<HttpsUpstream>(this);
486     alpn_ = StringRef::from_lit("http/1.1");
487     read_ = &ClientHandler::read_clear;
488     write_ = &ClientHandler::write_clear;
489     on_read_ = &ClientHandler::upstream_http1_connhd_read;
490     on_write_ = &ClientHandler::upstream_noop;
491   }
492 }
493 
~ClientHandler()494 ClientHandler::~ClientHandler() {
495   if (LOG_ENABLED(INFO)) {
496     CLOG(INFO, this) << "Deleting";
497   }
498 
499   if (upstream_) {
500     upstream_->on_handler_delete();
501   }
502 
503   auto worker_stat = worker_->get_worker_stat();
504   --worker_stat->num_connections;
505 
506   if (worker_stat->num_connections == 0) {
507     worker_->schedule_clear_mcpool();
508   }
509 
510   ev_timer_stop(conn_.loop, &reneg_shutdown_timer_);
511 
512   // TODO If backend is http/2, and it is in CONNECTED state, signal
513   // it and make it loopbreak when output is zero.
514   if (worker_->get_graceful_shutdown() && worker_stat->num_connections == 0) {
515     ev_break(conn_.loop);
516   }
517 
518   if (LOG_ENABLED(INFO)) {
519     CLOG(INFO, this) << "Deleted";
520   }
521 }
522 
get_upstream()523 Upstream *ClientHandler::get_upstream() { return upstream_.get(); }
524 
get_loop() const525 struct ev_loop *ClientHandler::get_loop() const {
526   return conn_.loop;
527 }
528 
reset_upstream_read_timeout(ev_tstamp t)529 void ClientHandler::reset_upstream_read_timeout(ev_tstamp t) {
530   conn_.rt.repeat = t;
531   if (ev_is_active(&conn_.rt)) {
532     ev_timer_again(conn_.loop, &conn_.rt);
533   }
534 }
535 
reset_upstream_write_timeout(ev_tstamp t)536 void ClientHandler::reset_upstream_write_timeout(ev_tstamp t) {
537   conn_.wt.repeat = t;
538   if (ev_is_active(&conn_.wt)) {
539     ev_timer_again(conn_.loop, &conn_.wt);
540   }
541 }
542 
repeat_read_timer()543 void ClientHandler::repeat_read_timer() {
544   ev_timer_again(conn_.loop, &conn_.rt);
545 }
546 
stop_read_timer()547 void ClientHandler::stop_read_timer() { ev_timer_stop(conn_.loop, &conn_.rt); }
548 
validate_next_proto()549 int ClientHandler::validate_next_proto() {
550   const unsigned char *next_proto = nullptr;
551   unsigned int next_proto_len = 0;
552 
553   // First set callback for catch all cases
554   on_read_ = &ClientHandler::upstream_read;
555 
556 #ifndef OPENSSL_NO_NEXTPROTONEG
557   SSL_get0_next_proto_negotiated(conn_.tls.ssl, &next_proto, &next_proto_len);
558 #endif // !OPENSSL_NO_NEXTPROTONEG
559 #if OPENSSL_VERSION_NUMBER >= 0x10002000L
560   if (next_proto == nullptr) {
561     SSL_get0_alpn_selected(conn_.tls.ssl, &next_proto, &next_proto_len);
562   }
563 #endif // OPENSSL_VERSION_NUMBER >= 0x10002000L
564 
565   StringRef proto;
566 
567   if (next_proto) {
568     proto = StringRef{next_proto, next_proto_len};
569 
570     if (LOG_ENABLED(INFO)) {
571       CLOG(INFO, this) << "The negotiated next protocol: " << proto;
572     }
573   } else {
574     if (LOG_ENABLED(INFO)) {
575       CLOG(INFO, this) << "No protocol negotiated. Fallback to HTTP/1.1";
576     }
577 
578     proto = StringRef::from_lit("http/1.1");
579   }
580 
581   if (!tls::in_proto_list(get_config()->tls.npn_list, proto)) {
582     if (LOG_ENABLED(INFO)) {
583       CLOG(INFO, this) << "The negotiated protocol is not supported: " << proto;
584     }
585     return -1;
586   }
587 
588   if (util::check_h2_is_selected(proto)) {
589     on_read_ = &ClientHandler::upstream_http2_connhd_read;
590 
591     auto http2_upstream = std::make_unique<Http2Upstream>(this);
592 
593     upstream_ = std::move(http2_upstream);
594     alpn_ = make_string_ref(balloc_, proto);
595 
596     // At this point, input buffer is already filled with some bytes.
597     // The read callback is not called until new data come. So consume
598     // input buffer here.
599     if (on_read() != 0) {
600       return -1;
601     }
602 
603     return 0;
604   }
605 
606   if (proto == StringRef::from_lit("http/1.1")) {
607     upstream_ = std::make_unique<HttpsUpstream>(this);
608     alpn_ = StringRef::from_lit("http/1.1");
609 
610     // At this point, input buffer is already filled with some bytes.
611     // The read callback is not called until new data come. So consume
612     // input buffer here.
613     if (on_read() != 0) {
614       return -1;
615     }
616 
617     return 0;
618   }
619   if (LOG_ENABLED(INFO)) {
620     CLOG(INFO, this) << "The negotiated protocol is not supported";
621   }
622   return -1;
623 }
624 
do_read()625 int ClientHandler::do_read() { return read_(*this); }
do_write()626 int ClientHandler::do_write() { return write_(*this); }
627 
on_read()628 int ClientHandler::on_read() {
629   if (rb_.chunk_avail()) {
630     auto rv = on_read_(*this);
631     if (rv != 0) {
632       return rv;
633     }
634   }
635   conn_.handle_tls_pending_read();
636   return 0;
637 }
on_write()638 int ClientHandler::on_write() { return on_write_(*this); }
639 
get_ipaddr() const640 const StringRef &ClientHandler::get_ipaddr() const { return ipaddr_; }
641 
get_should_close_after_write() const642 bool ClientHandler::get_should_close_after_write() const {
643   return should_close_after_write_;
644 }
645 
set_should_close_after_write(bool f)646 void ClientHandler::set_should_close_after_write(bool f) {
647   should_close_after_write_ = f;
648 }
649 
pool_downstream_connection(std::unique_ptr<DownstreamConnection> dconn)650 void ClientHandler::pool_downstream_connection(
651     std::unique_ptr<DownstreamConnection> dconn) {
652   if (!dconn->poolable()) {
653     return;
654   }
655 
656   dconn->set_client_handler(nullptr);
657 
658   auto &group = dconn->get_downstream_addr_group();
659 
660   if (LOG_ENABLED(INFO)) {
661     CLOG(INFO, this) << "Pooling downstream connection DCONN:" << dconn.get()
662                      << " in group " << group;
663   }
664 
665   auto addr = dconn->get_addr();
666   auto &dconn_pool = addr->dconn_pool;
667   dconn_pool->add_downstream_connection(std::move(dconn));
668 }
669 
670 namespace {
671 // Computes 32bits hash for session affinity for IP address |ip|.
compute_affinity_from_ip(const StringRef & ip)672 uint32_t compute_affinity_from_ip(const StringRef &ip) {
673   int rv;
674   std::array<uint8_t, 32> buf;
675 
676   rv = util::sha256(buf.data(), ip);
677   if (rv != 0) {
678     // Not sure when sha256 failed.  Just fall back to another
679     // function.
680     return util::hash32(ip);
681   }
682 
683   return (static_cast<uint32_t>(buf[0]) << 24) |
684          (static_cast<uint32_t>(buf[1]) << 16) |
685          (static_cast<uint32_t>(buf[2]) << 8) | static_cast<uint32_t>(buf[3]);
686 }
687 } // namespace
688 
get_http2_session(const std::shared_ptr<DownstreamAddrGroup> & group,DownstreamAddr * addr)689 Http2Session *ClientHandler::get_http2_session(
690     const std::shared_ptr<DownstreamAddrGroup> &group, DownstreamAddr *addr) {
691   auto &shared_addr = group->shared_addr;
692 
693   if (LOG_ENABLED(INFO)) {
694     CLOG(INFO, this) << "Selected DownstreamAddr=" << addr
695                      << ", index=" << (addr - shared_addr->addrs.data());
696   }
697 
698   for (auto session = addr->http2_extra_freelist.head; session;) {
699     auto next = session->dlnext;
700 
701     if (session->max_concurrency_reached(0)) {
702       if (LOG_ENABLED(INFO)) {
703         CLOG(INFO, this)
704             << "Maximum streams have been reached for Http2Session(" << session
705             << ").  Skip it";
706       }
707 
708       session->remove_from_freelist();
709       session = next;
710 
711       continue;
712     }
713 
714     if (LOG_ENABLED(INFO)) {
715       CLOG(INFO, this) << "Use Http2Session " << session
716                        << " from http2_extra_freelist";
717     }
718 
719     if (session->max_concurrency_reached(1)) {
720       if (LOG_ENABLED(INFO)) {
721         CLOG(INFO, this) << "Maximum streams are reached for Http2Session("
722                          << session << ").";
723       }
724 
725       session->remove_from_freelist();
726     }
727     return session;
728   }
729 
730   auto session = new Http2Session(conn_.loop, worker_->get_cl_ssl_ctx(),
731                                   worker_, group, addr);
732 
733   if (LOG_ENABLED(INFO)) {
734     CLOG(INFO, this) << "Create new Http2Session " << session;
735   }
736 
737   session->add_to_extra_freelist();
738 
739   return session;
740 }
741 
get_affinity_cookie(Downstream * downstream,const StringRef & cookie_name)742 uint32_t ClientHandler::get_affinity_cookie(Downstream *downstream,
743                                             const StringRef &cookie_name) {
744   auto h = downstream->find_affinity_cookie(cookie_name);
745   if (h) {
746     return h;
747   }
748 
749   auto d = std::uniform_int_distribution<uint32_t>(
750       1, std::numeric_limits<uint32_t>::max());
751   auto rh = d(worker_->get_randgen());
752   h = util::hash32(StringRef{reinterpret_cast<uint8_t *>(&rh),
753                              reinterpret_cast<uint8_t *>(&rh) + sizeof(rh)});
754 
755   downstream->renew_affinity_cookie(h);
756 
757   return h;
758 }
759 
760 namespace {
reschedule_addr(std::priority_queue<DownstreamAddrEntry,std::vector<DownstreamAddrEntry>,DownstreamAddrEntryGreater> & pq,DownstreamAddr * addr)761 void reschedule_addr(
762     std::priority_queue<DownstreamAddrEntry, std::vector<DownstreamAddrEntry>,
763                         DownstreamAddrEntryGreater> &pq,
764     DownstreamAddr *addr) {
765   auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + addr->pending_penalty;
766   addr->cycle += penalty / addr->weight;
767   addr->pending_penalty = penalty % addr->weight;
768 
769   pq.push(DownstreamAddrEntry{addr, addr->seq, addr->cycle});
770   addr->queued = true;
771 }
772 } // namespace
773 
774 namespace {
reschedule_wg(std::priority_queue<WeightGroupEntry,std::vector<WeightGroupEntry>,WeightGroupEntryGreater> & pq,WeightGroup * wg)775 void reschedule_wg(
776     std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
777                         WeightGroupEntryGreater> &pq,
778     WeightGroup *wg) {
779   auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + wg->pending_penalty;
780   wg->cycle += penalty / wg->weight;
781   wg->pending_penalty = penalty % wg->weight;
782 
783   pq.push(WeightGroupEntry{wg, wg->seq, wg->cycle});
784   wg->queued = true;
785 }
786 } // namespace
787 
get_downstream_addr(int & err,DownstreamAddrGroup * group,Downstream * downstream)788 DownstreamAddr *ClientHandler::get_downstream_addr(int &err,
789                                                    DownstreamAddrGroup *group,
790                                                    Downstream *downstream) {
791   err = 0;
792 
793   switch (faddr_->alt_mode) {
794   case UpstreamAltMode::API:
795   case UpstreamAltMode::HEALTHMON:
796     assert(0);
797   default:
798     break;
799   }
800 
801   auto &shared_addr = group->shared_addr;
802 
803   if (shared_addr->affinity.type != SessionAffinity::NONE) {
804     uint32_t hash;
805     switch (shared_addr->affinity.type) {
806     case SessionAffinity::IP:
807       if (!affinity_hash_computed_) {
808         affinity_hash_ = compute_affinity_from_ip(ipaddr_);
809         affinity_hash_computed_ = true;
810       }
811       hash = affinity_hash_;
812       break;
813     case SessionAffinity::COOKIE:
814       hash = get_affinity_cookie(downstream, shared_addr->affinity.cookie.name);
815       break;
816     default:
817       assert(0);
818     }
819 
820     const auto &affinity_hash = shared_addr->affinity_hash;
821 
822     auto it = std::lower_bound(
823         std::begin(affinity_hash), std::end(affinity_hash), hash,
824         [](const AffinityHash &lhs, uint32_t rhs) { return lhs.hash < rhs; });
825 
826     if (it == std::end(affinity_hash)) {
827       it = std::begin(affinity_hash);
828     }
829 
830     auto aff_idx =
831         static_cast<size_t>(std::distance(std::begin(affinity_hash), it));
832     auto idx = (*it).idx;
833     auto addr = &shared_addr->addrs[idx];
834 
835     if (addr->connect_blocker->blocked()) {
836       size_t i;
837       for (i = aff_idx + 1; i != aff_idx; ++i) {
838         if (i == shared_addr->affinity_hash.size()) {
839           i = 0;
840         }
841         addr = &shared_addr->addrs[shared_addr->affinity_hash[i].idx];
842         if (addr->connect_blocker->blocked()) {
843           continue;
844         }
845         break;
846       }
847       if (i == aff_idx) {
848         err = -1;
849         return nullptr;
850       }
851       aff_idx = i;
852     }
853 
854     return addr;
855   }
856 
857   auto &wgpq = shared_addr->pq;
858 
859   for (;;) {
860     if (wgpq.empty()) {
861       CLOG(INFO, this) << "No working downstream address found";
862       err = -1;
863       return nullptr;
864     }
865 
866     auto wg = wgpq.top().wg;
867     wgpq.pop();
868     wg->queued = false;
869 
870     for (;;) {
871       if (wg->pq.empty()) {
872         break;
873       }
874 
875       auto addr = wg->pq.top().addr;
876       wg->pq.pop();
877       addr->queued = false;
878 
879       if (addr->connect_blocker->blocked()) {
880         continue;
881       }
882 
883       reschedule_addr(wg->pq, addr);
884       reschedule_wg(wgpq, wg);
885 
886       return addr;
887     }
888   }
889 }
890 
891 std::unique_ptr<DownstreamConnection>
get_downstream_connection(int & err,Downstream * downstream)892 ClientHandler::get_downstream_connection(int &err, Downstream *downstream) {
893   size_t group_idx;
894   auto &downstreamconf = *worker_->get_downstream_config();
895   auto &routerconf = downstreamconf.router;
896 
897   auto catch_all = downstreamconf.addr_group_catch_all;
898   auto &groups = worker_->get_downstream_addr_groups();
899 
900   auto &req = downstream->request();
901 
902   err = 0;
903 
904   switch (faddr_->alt_mode) {
905   case UpstreamAltMode::API: {
906     auto dconn = std::make_unique<APIDownstreamConnection>(worker_);
907     dconn->set_client_handler(this);
908     return dconn;
909   }
910   case UpstreamAltMode::HEALTHMON: {
911     auto dconn = std::make_unique<HealthMonitorDownstreamConnection>();
912     dconn->set_client_handler(this);
913     return dconn;
914   }
915   default:
916     break;
917   }
918 
919   auto &balloc = downstream->get_block_allocator();
920 
921   StringRef authority, path;
922 
923   if (req.forwarded_once) {
924     if (groups.size() != 1) {
925       authority = req.orig_authority;
926       path = req.orig_path;
927     }
928   } else {
929     if (faddr_->sni_fwd) {
930       authority = sni_;
931     } else if (!req.authority.empty()) {
932       authority = req.authority;
933     } else {
934       auto h = req.fs.header(http2::HD_HOST);
935       if (h) {
936         authority = h->value;
937       }
938     }
939 
940     // CONNECT method does not have path.  But we requires path in
941     // host-path mapping.  As workaround, we assume that path is
942     // "/".
943     if (!req.regular_connect_method()) {
944       path = req.path;
945     }
946 
947     // Cache the authority and path used for the first-time backend
948     // selection because per-pattern mruby script can change them.
949     req.orig_authority = authority;
950     req.orig_path = path;
951     req.forwarded_once = true;
952   }
953 
954   // Fast path.  If we have one group, it must be catch-all group.
955   if (groups.size() == 1) {
956     group_idx = 0;
957   } else {
958     group_idx = match_downstream_addr_group(routerconf, authority, path, groups,
959                                             catch_all, balloc);
960   }
961 
962   if (LOG_ENABLED(INFO)) {
963     CLOG(INFO, this) << "Downstream address group_idx: " << group_idx;
964   }
965 
966   if (groups[group_idx]->shared_addr->redirect_if_not_tls && !conn_.tls.ssl) {
967     if (LOG_ENABLED(INFO)) {
968       CLOG(INFO, this) << "Downstream address group " << group_idx
969                        << " requires frontend TLS connection.";
970     }
971     err = SHRPX_ERR_TLS_REQUIRED;
972     return nullptr;
973   }
974 
975   auto &group = groups[group_idx];
976   auto addr = get_downstream_addr(err, group.get(), downstream);
977   if (addr == nullptr) {
978     return nullptr;
979   }
980 
981   if (addr->proto == Proto::HTTP1) {
982     auto dconn = addr->dconn_pool->pop_downstream_connection();
983     if (dconn) {
984       dconn->set_client_handler(this);
985       return dconn;
986     }
987 
988     if (worker_->get_connect_blocker()->blocked()) {
989       if (LOG_ENABLED(INFO)) {
990         DCLOG(INFO, this)
991             << "Worker wide backend connection was blocked temporarily";
992       }
993       return nullptr;
994     }
995 
996     if (LOG_ENABLED(INFO)) {
997       CLOG(INFO, this) << "Downstream connection pool is empty."
998                        << " Create new one";
999     }
1000 
1001     dconn = std::make_unique<HttpDownstreamConnection>(group, addr, conn_.loop,
1002                                                        worker_);
1003     dconn->set_client_handler(this);
1004     return dconn;
1005   }
1006 
1007   if (LOG_ENABLED(INFO)) {
1008     CLOG(INFO, this) << "Downstream connection pool is empty."
1009                      << " Create new one";
1010   }
1011 
1012   auto http2session = get_http2_session(group, addr);
1013   auto dconn = std::make_unique<Http2DownstreamConnection>(http2session);
1014   dconn->set_client_handler(this);
1015   return dconn;
1016 }
1017 
get_mcpool()1018 MemchunkPool *ClientHandler::get_mcpool() { return worker_->get_mcpool(); }
1019 
get_ssl() const1020 SSL *ClientHandler::get_ssl() const { return conn_.tls.ssl; }
1021 
direct_http2_upgrade()1022 void ClientHandler::direct_http2_upgrade() {
1023   upstream_ = std::make_unique<Http2Upstream>(this);
1024   alpn_ = StringRef::from_lit(NGHTTP2_CLEARTEXT_PROTO_VERSION_ID);
1025   on_read_ = &ClientHandler::upstream_read;
1026   write_ = &ClientHandler::write_clear;
1027 }
1028 
perform_http2_upgrade(HttpsUpstream * http)1029 int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) {
1030   auto upstream = std::make_unique<Http2Upstream>(this);
1031 
1032   auto output = upstream->get_response_buf();
1033 
1034   // We might have written non-final header in response_buf, in this
1035   // case, response_state is still INITIAL.  If this non-final header
1036   // and upgrade header fit in output buffer, do upgrade.  Otherwise,
1037   // to avoid to send this non-final header as response body in HTTP/2
1038   // upstream, fail upgrade.
1039   auto downstream = http->get_downstream();
1040   auto input = downstream->get_response_buf();
1041 
1042   if (upstream->upgrade_upstream(http) != 0) {
1043     return -1;
1044   }
1045   // http pointer is now owned by upstream.
1046   upstream_.release();
1047   // TODO We might get other version id in HTTP2-settings, if we
1048   // support aliasing for h2, but we just use library default for now.
1049   alpn_ = StringRef::from_lit(NGHTTP2_CLEARTEXT_PROTO_VERSION_ID);
1050   on_read_ = &ClientHandler::upstream_http2_connhd_read;
1051   write_ = &ClientHandler::write_clear;
1052 
1053   input->remove(*output, input->rleft());
1054 
1055   constexpr auto res =
1056       StringRef::from_lit("HTTP/1.1 101 Switching Protocols\r\n"
1057                           "Connection: Upgrade\r\n"
1058                           "Upgrade: " NGHTTP2_CLEARTEXT_PROTO_VERSION_ID "\r\n"
1059                           "\r\n");
1060 
1061   output->append(res);
1062   upstream_ = std::move(upstream);
1063 
1064   signal_write();
1065   return 0;
1066 }
1067 
get_http2_upgrade_allowed() const1068 bool ClientHandler::get_http2_upgrade_allowed() const { return !conn_.tls.ssl; }
1069 
get_upstream_scheme() const1070 StringRef ClientHandler::get_upstream_scheme() const {
1071   if (conn_.tls.ssl) {
1072     return StringRef::from_lit("https");
1073   } else {
1074     return StringRef::from_lit("http");
1075   }
1076 }
1077 
start_immediate_shutdown()1078 void ClientHandler::start_immediate_shutdown() {
1079   ev_timer_start(conn_.loop, &reneg_shutdown_timer_);
1080 }
1081 
write_accesslog(Downstream * downstream)1082 void ClientHandler::write_accesslog(Downstream *downstream) {
1083   auto &req = downstream->request();
1084 
1085   auto config = get_config();
1086 
1087   if (!req.tstamp) {
1088     auto lgconf = log_config();
1089     lgconf->update_tstamp(std::chrono::system_clock::now());
1090     req.tstamp = lgconf->tstamp;
1091   }
1092 
1093   upstream_accesslog(
1094       config->logging.access.format,
1095       LogSpec{
1096           downstream,
1097           ipaddr_,
1098           alpn_,
1099           sni_,
1100           conn_.tls.ssl,
1101           std::chrono::high_resolution_clock::now(), // request_end_time
1102           port_,
1103           faddr_->port,
1104           config->pid,
1105       });
1106 }
1107 
get_rb()1108 ClientHandler::ReadBuf *ClientHandler::get_rb() { return &rb_; }
1109 
signal_write()1110 void ClientHandler::signal_write() { conn_.wlimit.startw(); }
1111 
get_rlimit()1112 RateLimit *ClientHandler::get_rlimit() { return &conn_.rlimit; }
get_wlimit()1113 RateLimit *ClientHandler::get_wlimit() { return &conn_.wlimit; }
1114 
get_wev()1115 ev_io *ClientHandler::get_wev() { return &conn_.wev; }
1116 
get_worker() const1117 Worker *ClientHandler::get_worker() const { return worker_; }
1118 
1119 namespace {
parse_proxy_line_port(const uint8_t * first,const uint8_t * last)1120 ssize_t parse_proxy_line_port(const uint8_t *first, const uint8_t *last) {
1121   auto p = first;
1122   int32_t port = 0;
1123 
1124   if (p == last) {
1125     return -1;
1126   }
1127 
1128   if (*p == '0') {
1129     if (p + 1 != last && util::is_digit(*(p + 1))) {
1130       return -1;
1131     }
1132     return 1;
1133   }
1134 
1135   for (; p != last && util::is_digit(*p); ++p) {
1136     port *= 10;
1137     port += *p - '0';
1138 
1139     if (port > 65535) {
1140       return -1;
1141     }
1142   }
1143 
1144   return p - first;
1145 }
1146 } // namespace
1147 
on_proxy_protocol_finish()1148 int ClientHandler::on_proxy_protocol_finish() {
1149   if (conn_.tls.ssl) {
1150     conn_.tls.rbuf.append(rb_.pos(), rb_.rleft());
1151     rb_.reset();
1152   }
1153 
1154   setup_upstream_io_callback();
1155 
1156   // Run on_read to process data left in buffer since they are not
1157   // notified further
1158   if (on_read() != 0) {
1159     return -1;
1160   }
1161 
1162   return 0;
1163 }
1164 
1165 namespace {
1166 // PROXY-protocol v2 header signature
1167 constexpr uint8_t PROXY_PROTO_V2_SIG[] =
1168     "\x0D\x0A\x0D\x0A\x00\x0D\x0A\x51\x55\x49\x54\x0A";
1169 
1170 // PROXY-protocol v2 header length
1171 constexpr size_t PROXY_PROTO_V2_HDLEN =
1172     str_size(PROXY_PROTO_V2_SIG) + /* ver_cmd(1) + fam(1) + len(2) = */ 4;
1173 } // namespace
1174 
1175 // http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt
proxy_protocol_read()1176 int ClientHandler::proxy_protocol_read() {
1177   if (LOG_ENABLED(INFO)) {
1178     CLOG(INFO, this) << "PROXY-protocol: Started";
1179   }
1180 
1181   auto first = rb_.pos();
1182 
1183   if (rb_.rleft() >= PROXY_PROTO_V2_HDLEN &&
1184       (*(first + str_size(PROXY_PROTO_V2_SIG)) & 0xf0) == 0x20) {
1185     if (LOG_ENABLED(INFO)) {
1186       CLOG(INFO, this) << "PROXY-protocol: Detected v2 header signature";
1187     }
1188     return proxy_protocol_v2_read();
1189   }
1190 
1191   // NULL character really destroys functions which expects NULL
1192   // terminated string.  We won't expect it in PROXY protocol line, so
1193   // find it here.
1194   auto chrs = std::array<char, 2>{'\n', '\0'};
1195 
1196   constexpr size_t MAX_PROXY_LINELEN = 107;
1197 
1198   auto bufend = rb_.pos() + std::min(MAX_PROXY_LINELEN, rb_.rleft());
1199 
1200   auto end =
1201       std::find_first_of(rb_.pos(), bufend, std::begin(chrs), std::end(chrs));
1202 
1203   if (end == bufend || *end == '\0' || end == rb_.pos() || *(end - 1) != '\r') {
1204     if (LOG_ENABLED(INFO)) {
1205       CLOG(INFO, this) << "PROXY-protocol-v1: No ending CR LF sequence found";
1206     }
1207     return -1;
1208   }
1209 
1210   --end;
1211 
1212   constexpr auto HEADER = StringRef::from_lit("PROXY ");
1213 
1214   if (static_cast<size_t>(end - rb_.pos()) < HEADER.size()) {
1215     if (LOG_ENABLED(INFO)) {
1216       CLOG(INFO, this) << "PROXY-protocol-v1: PROXY version 1 ID not found";
1217     }
1218     return -1;
1219   }
1220 
1221   if (!util::streq(HEADER, StringRef{rb_.pos(), HEADER.size()})) {
1222     if (LOG_ENABLED(INFO)) {
1223       CLOG(INFO, this) << "PROXY-protocol-v1: Bad PROXY protocol version 1 ID";
1224     }
1225     return -1;
1226   }
1227 
1228   rb_.drain(HEADER.size());
1229 
1230   int family;
1231 
1232   if (rb_.pos()[0] == 'T') {
1233     if (end - rb_.pos() < 5) {
1234       if (LOG_ENABLED(INFO)) {
1235         CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found";
1236       }
1237       return -1;
1238     }
1239 
1240     if (rb_.pos()[1] != 'C' || rb_.pos()[2] != 'P') {
1241       if (LOG_ENABLED(INFO)) {
1242         CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
1243       }
1244       return -1;
1245     }
1246 
1247     switch (rb_.pos()[3]) {
1248     case '4':
1249       family = AF_INET;
1250       break;
1251     case '6':
1252       family = AF_INET6;
1253       break;
1254     default:
1255       if (LOG_ENABLED(INFO)) {
1256         CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
1257       }
1258       return -1;
1259     }
1260 
1261     rb_.drain(5);
1262   } else {
1263     if (end - rb_.pos() < 7) {
1264       if (LOG_ENABLED(INFO)) {
1265         CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found";
1266       }
1267       return -1;
1268     }
1269     if (!util::streq_l("UNKNOWN", rb_.pos(), 7)) {
1270       if (LOG_ENABLED(INFO)) {
1271         CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
1272       }
1273       return -1;
1274     }
1275 
1276     rb_.drain(end + 2 - rb_.pos());
1277 
1278     return on_proxy_protocol_finish();
1279   }
1280 
1281   // source address
1282   auto token_end = std::find(rb_.pos(), end, ' ');
1283   if (token_end == end) {
1284     if (LOG_ENABLED(INFO)) {
1285       CLOG(INFO, this) << "PROXY-protocol-v1: Source address not found";
1286     }
1287     return -1;
1288   }
1289 
1290   *token_end = '\0';
1291   if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos()), family)) {
1292     if (LOG_ENABLED(INFO)) {
1293       CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source address";
1294     }
1295     return -1;
1296   }
1297 
1298   auto src_addr = rb_.pos();
1299   auto src_addrlen = token_end - rb_.pos();
1300 
1301   rb_.drain(token_end - rb_.pos() + 1);
1302 
1303   // destination address
1304   token_end = std::find(rb_.pos(), end, ' ');
1305   if (token_end == end) {
1306     if (LOG_ENABLED(INFO)) {
1307       CLOG(INFO, this) << "PROXY-protocol-v1: Destination address not found";
1308     }
1309     return -1;
1310   }
1311 
1312   *token_end = '\0';
1313   if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos()), family)) {
1314     if (LOG_ENABLED(INFO)) {
1315       CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination address";
1316     }
1317     return -1;
1318   }
1319 
1320   // Currently we don't use destination address
1321 
1322   rb_.drain(token_end - rb_.pos() + 1);
1323 
1324   // source port
1325   auto n = parse_proxy_line_port(rb_.pos(), end);
1326   if (n <= 0 || *(rb_.pos() + n) != ' ') {
1327     if (LOG_ENABLED(INFO)) {
1328       CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source port";
1329     }
1330     return -1;
1331   }
1332 
1333   rb_.pos()[n] = '\0';
1334   auto src_port = rb_.pos();
1335   auto src_portlen = n;
1336 
1337   rb_.drain(n + 1);
1338 
1339   // destination  port
1340   n = parse_proxy_line_port(rb_.pos(), end);
1341   if (n <= 0 || rb_.pos() + n != end) {
1342     if (LOG_ENABLED(INFO)) {
1343       CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination port";
1344     }
1345     return -1;
1346   }
1347 
1348   // Currently we don't use destination port
1349 
1350   rb_.drain(end + 2 - rb_.pos());
1351 
1352   ipaddr_ =
1353       make_string_ref(balloc_, StringRef{src_addr, src_addr + src_addrlen});
1354   port_ = make_string_ref(balloc_, StringRef{src_port, src_port + src_portlen});
1355 
1356   if (LOG_ENABLED(INFO)) {
1357     CLOG(INFO, this) << "PROXY-protocol-v1: Finished, " << (rb_.pos() - first)
1358                      << " bytes read";
1359   }
1360 
1361   auto config = get_config();
1362   auto &fwdconf = config->http.forwarded;
1363 
1364   if ((fwdconf.params & FORWARDED_FOR) &&
1365       fwdconf.for_node_type == ForwardedNode::IP) {
1366     init_forwarded_for(family, ipaddr_);
1367   }
1368 
1369   return on_proxy_protocol_finish();
1370 }
1371 
proxy_protocol_v2_read()1372 int ClientHandler::proxy_protocol_v2_read() {
1373   // Assume that first str_size(PROXY_PROTO_V2_SIG) octets match v2
1374   // protocol signature and followed by the bytes which indicates v2.
1375   assert(rb_.rleft() >= PROXY_PROTO_V2_HDLEN);
1376 
1377   auto p = rb_.pos() + str_size(PROXY_PROTO_V2_SIG);
1378 
1379   assert(((*p) & 0xf0) == 0x20);
1380 
1381   enum { LOCAL, PROXY } cmd;
1382 
1383   auto cmd_bits = (*p++) & 0xf;
1384   switch (cmd_bits) {
1385   case 0x0:
1386     cmd = LOCAL;
1387     break;
1388   case 0x01:
1389     cmd = PROXY;
1390     break;
1391   default:
1392     if (LOG_ENABLED(INFO)) {
1393       CLOG(INFO, this) << "PROXY-protocol-v2: Unknown command " << log::hex
1394                        << cmd_bits;
1395     }
1396     return -1;
1397   }
1398 
1399   auto fam = *p++;
1400   uint16_t len;
1401   memcpy(&len, p, sizeof(len));
1402   len = ntohs(len);
1403 
1404   p += sizeof(len);
1405 
1406   if (LOG_ENABLED(INFO)) {
1407     CLOG(INFO, this) << "PROXY-protocol-v2: Detected family=" << log::hex << fam
1408                      << ", len=" << log::dec << len;
1409   }
1410 
1411   if (rb_.last() - p < len) {
1412     if (LOG_ENABLED(INFO)) {
1413       CLOG(INFO, this)
1414           << "PROXY-protocol-v2: Prematurely truncated header block; require "
1415           << len << " bytes, " << rb_.last() - p << " bytes left";
1416     }
1417     return -1;
1418   }
1419 
1420   int family;
1421   std::array<char, std::max(INET_ADDRSTRLEN, INET6_ADDRSTRLEN)> src_addr,
1422       dst_addr;
1423   size_t addrlen;
1424 
1425   switch (fam) {
1426   case 0x11:
1427   case 0x12:
1428     if (len < 12) {
1429       if (LOG_ENABLED(INFO)) {
1430         CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_INET addresses";
1431       }
1432       return -1;
1433     }
1434     family = AF_INET;
1435     addrlen = 4;
1436     break;
1437   case 0x21:
1438   case 0x22:
1439     if (len < 36) {
1440       if (LOG_ENABLED(INFO)) {
1441         CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_INET6 addresses";
1442       }
1443       return -1;
1444     }
1445     family = AF_INET6;
1446     addrlen = 16;
1447     break;
1448   case 0x31:
1449   case 0x32:
1450     if (len < 216) {
1451       if (LOG_ENABLED(INFO)) {
1452         CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_UNIX addresses";
1453       }
1454       return -1;
1455     }
1456     // fall through
1457   case 0x00: {
1458     // UNSPEC and UNIX are just ignored.
1459     if (LOG_ENABLED(INFO)) {
1460       CLOG(INFO, this) << "PROXY-protocol-v2: Ignore combination of address "
1461                           "family and protocol "
1462                        << log::hex << fam;
1463     }
1464     rb_.drain(PROXY_PROTO_V2_HDLEN + len);
1465     return on_proxy_protocol_finish();
1466   }
1467   default:
1468     if (LOG_ENABLED(INFO)) {
1469       CLOG(INFO, this) << "PROXY-protocol-v2: Unknown combination of address "
1470                           "family and protocol "
1471                        << log::hex << fam;
1472     }
1473     return -1;
1474   }
1475 
1476   if (cmd != PROXY) {
1477     if (LOG_ENABLED(INFO)) {
1478       CLOG(INFO, this) << "PROXY-protocol-v2: Ignore non-PROXY command";
1479     }
1480     rb_.drain(PROXY_PROTO_V2_HDLEN + len);
1481     return on_proxy_protocol_finish();
1482   }
1483 
1484   if (inet_ntop(family, p, src_addr.data(), src_addr.size()) == nullptr) {
1485     if (LOG_ENABLED(INFO)) {
1486       CLOG(INFO, this) << "PROXY-protocol-v2: Unable to parse source address";
1487     }
1488     return -1;
1489   }
1490 
1491   p += addrlen;
1492 
1493   if (inet_ntop(family, p, dst_addr.data(), dst_addr.size()) == nullptr) {
1494     if (LOG_ENABLED(INFO)) {
1495       CLOG(INFO, this)
1496           << "PROXY-protocol-v2: Unable to parse destination address";
1497     }
1498     return -1;
1499   }
1500 
1501   p += addrlen;
1502 
1503   uint16_t src_port;
1504 
1505   memcpy(&src_port, p, sizeof(src_port));
1506   src_port = ntohs(src_port);
1507 
1508   // We don't use destination port.
1509   p += 4;
1510 
1511   ipaddr_ = make_string_ref(balloc_, StringRef{src_addr.data()});
1512   port_ = util::make_string_ref_uint(balloc_, src_port);
1513 
1514   if (LOG_ENABLED(INFO)) {
1515     CLOG(INFO, this) << "PROXY-protocol-v2: Finished reading proxy addresses, "
1516                      << p - rb_.pos() << " bytes read, "
1517                      << PROXY_PROTO_V2_HDLEN + len - (p - rb_.pos())
1518                      << " bytes left";
1519   }
1520 
1521   auto config = get_config();
1522   auto &fwdconf = config->http.forwarded;
1523 
1524   if ((fwdconf.params & FORWARDED_FOR) &&
1525       fwdconf.for_node_type == ForwardedNode::IP) {
1526     init_forwarded_for(family, ipaddr_);
1527   }
1528 
1529   rb_.drain(PROXY_PROTO_V2_HDLEN + len);
1530   return on_proxy_protocol_finish();
1531 }
1532 
get_forwarded_by() const1533 StringRef ClientHandler::get_forwarded_by() const {
1534   auto &fwdconf = get_config()->http.forwarded;
1535 
1536   if (fwdconf.by_node_type == ForwardedNode::OBFUSCATED) {
1537     return fwdconf.by_obfuscated;
1538   }
1539 
1540   return faddr_->hostport;
1541 }
1542 
get_forwarded_for() const1543 StringRef ClientHandler::get_forwarded_for() const { return forwarded_for_; }
1544 
get_upstream_addr() const1545 const UpstreamAddr *ClientHandler::get_upstream_addr() const { return faddr_; }
1546 
get_connection()1547 Connection *ClientHandler::get_connection() { return &conn_; };
1548 
set_tls_sni(const StringRef & sni)1549 void ClientHandler::set_tls_sni(const StringRef &sni) {
1550   sni_ = make_string_ref(balloc_, sni);
1551 }
1552 
get_tls_sni() const1553 StringRef ClientHandler::get_tls_sni() const { return sni_; }
1554 
get_alpn() const1555 StringRef ClientHandler::get_alpn() const { return alpn_; }
1556 
get_block_allocator()1557 BlockAllocator &ClientHandler::get_block_allocator() { return balloc_; }
1558 
1559 } // namespace shrpx
1560