• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_client_handler.h"
26 
27 #ifdef HAVE_UNISTD_H
28 #  include <unistd.h>
29 #endif // HAVE_UNISTD_H
30 #ifdef HAVE_SYS_SOCKET_H
31 #  include <sys/socket.h>
32 #endif // HAVE_SYS_SOCKET_H
33 #ifdef HAVE_NETDB_H
34 #  include <netdb.h>
35 #endif // HAVE_NETDB_H
36 
37 #include <cerrno>
38 
39 #include "shrpx_upstream.h"
40 #include "shrpx_http2_upstream.h"
41 #include "shrpx_https_upstream.h"
42 #include "shrpx_config.h"
43 #include "shrpx_http_downstream_connection.h"
44 #include "shrpx_http2_downstream_connection.h"
45 #include "shrpx_tls.h"
46 #include "shrpx_worker.h"
47 #include "shrpx_downstream_connection_pool.h"
48 #include "shrpx_downstream.h"
49 #include "shrpx_http2_session.h"
50 #include "shrpx_connect_blocker.h"
51 #include "shrpx_api_downstream_connection.h"
52 #include "shrpx_health_monitor_downstream_connection.h"
53 #include "shrpx_null_downstream_connection.h"
54 #ifdef ENABLE_HTTP3
55 #  include "shrpx_http3_upstream.h"
56 #endif // ENABLE_HTTP3
57 #include "shrpx_log.h"
58 #include "util.h"
59 #include "template.h"
60 #include "tls.h"
61 
62 using namespace nghttp2;
63 
64 namespace shrpx {
65 
66 namespace {
timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)67 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
68   auto conn = static_cast<Connection *>(w->data);
69   auto handler = static_cast<ClientHandler *>(conn->data);
70 
71   if (LOG_ENABLED(INFO)) {
72     CLOG(INFO, handler) << "Time out";
73   }
74 
75   delete handler;
76 }
77 } // namespace
78 
79 namespace {
shutdowncb(struct ev_loop * loop,ev_timer * w,int revents)80 void shutdowncb(struct ev_loop *loop, ev_timer *w, int revents) {
81   auto handler = static_cast<ClientHandler *>(w->data);
82 
83   if (LOG_ENABLED(INFO)) {
84     CLOG(INFO, handler) << "Close connection due to TLS renegotiation";
85   }
86 
87   delete handler;
88 }
89 } // namespace
90 
91 namespace {
readcb(struct ev_loop * loop,ev_io * w,int revents)92 void readcb(struct ev_loop *loop, ev_io *w, int revents) {
93   auto conn = static_cast<Connection *>(w->data);
94   auto handler = static_cast<ClientHandler *>(conn->data);
95 
96   if (handler->do_read() != 0) {
97     delete handler;
98     return;
99   }
100 }
101 } // namespace
102 
103 namespace {
writecb(struct ev_loop * loop,ev_io * w,int revents)104 void writecb(struct ev_loop *loop, ev_io *w, int revents) {
105   auto conn = static_cast<Connection *>(w->data);
106   auto handler = static_cast<ClientHandler *>(conn->data);
107 
108   if (handler->do_write() != 0) {
109     delete handler;
110     return;
111   }
112 }
113 } // namespace
114 
noop()115 int ClientHandler::noop() { return 0; }
116 
read_clear()117 int ClientHandler::read_clear() {
118   auto should_break = false;
119   rb_.ensure_chunk();
120   for (;;) {
121     if (rb_.rleft() && on_read() != 0) {
122       return -1;
123     }
124     if (rb_.rleft() == 0) {
125       rb_.reset();
126     } else if (rb_.wleft() == 0) {
127       conn_.rlimit.stopw();
128       return 0;
129     }
130 
131     if (!ev_is_active(&conn_.rev) || should_break) {
132       return 0;
133     }
134 
135     auto nread = conn_.read_clear(rb_.last(), rb_.wleft());
136 
137     if (nread == 0) {
138       if (rb_.rleft() == 0) {
139         rb_.release_chunk();
140       }
141       return 0;
142     }
143 
144     if (nread < 0) {
145       return -1;
146     }
147 
148     rb_.write(nread);
149     should_break = true;
150   }
151 }
152 
write_clear()153 int ClientHandler::write_clear() {
154   std::array<iovec, 2> iov;
155 
156   for (;;) {
157     if (on_write() != 0) {
158       return -1;
159     }
160 
161     auto iovcnt = upstream_->response_riovec(iov.data(), iov.size());
162     if (iovcnt == 0) {
163       break;
164     }
165 
166     auto nwrite = conn_.writev_clear(iov.data(), iovcnt);
167     if (nwrite < 0) {
168       return -1;
169     }
170 
171     if (nwrite == 0) {
172       return 0;
173     }
174 
175     upstream_->response_drain(nwrite);
176   }
177 
178   conn_.wlimit.stopw();
179   ev_timer_stop(conn_.loop, &conn_.wt);
180 
181   return 0;
182 }
183 
proxy_protocol_peek_clear()184 int ClientHandler::proxy_protocol_peek_clear() {
185   rb_.ensure_chunk();
186 
187   assert(rb_.rleft() == 0);
188 
189   auto nread = conn_.peek_clear(rb_.last(), rb_.wleft());
190   if (nread < 0) {
191     return -1;
192   }
193   if (nread == 0) {
194     return 0;
195   }
196 
197   if (LOG_ENABLED(INFO)) {
198     CLOG(INFO, this) << "PROXY-protocol: Peek " << nread
199                      << " bytes from socket";
200   }
201 
202   rb_.write(nread);
203 
204   if (on_read() != 0) {
205     return -1;
206   }
207 
208   rb_.reset();
209 
210   return 0;
211 }
212 
tls_handshake()213 int ClientHandler::tls_handshake() {
214   ev_timer_again(conn_.loop, &conn_.rt);
215 
216   ERR_clear_error();
217 
218   auto rv = conn_.tls_handshake();
219 
220   if (rv == SHRPX_ERR_INPROGRESS) {
221     return 0;
222   }
223 
224   if (rv < 0) {
225     return -1;
226   }
227 
228   if (LOG_ENABLED(INFO)) {
229     CLOG(INFO, this) << "SSL/TLS handshake completed";
230   }
231 
232   if (validate_next_proto() != 0) {
233     return -1;
234   }
235 
236   read_ = &ClientHandler::read_tls;
237   write_ = &ClientHandler::write_tls;
238 
239   return 0;
240 }
241 
read_tls()242 int ClientHandler::read_tls() {
243   auto should_break = false;
244 
245   ERR_clear_error();
246 
247   rb_.ensure_chunk();
248 
249   for (;;) {
250     // we should process buffered data first before we read EOF.
251     if (rb_.rleft() && on_read() != 0) {
252       return -1;
253     }
254     if (rb_.rleft() == 0) {
255       rb_.reset();
256     } else if (rb_.wleft() == 0) {
257       conn_.rlimit.stopw();
258       return 0;
259     }
260 
261     if (!ev_is_active(&conn_.rev) || should_break) {
262       return 0;
263     }
264 
265     auto nread = conn_.read_tls(rb_.last(), rb_.wleft());
266 
267     if (nread == 0) {
268       if (rb_.rleft() == 0) {
269         rb_.release_chunk();
270       }
271       return 0;
272     }
273 
274     if (nread < 0) {
275       return -1;
276     }
277 
278     rb_.write(nread);
279     should_break = true;
280   }
281 }
282 
write_tls()283 int ClientHandler::write_tls() {
284   struct iovec iov;
285 
286   ERR_clear_error();
287 
288   if (on_write() != 0) {
289     return -1;
290   }
291 
292   auto iovcnt = upstream_->response_riovec(&iov, 1);
293   if (iovcnt == 0) {
294     conn_.start_tls_write_idle();
295 
296     conn_.wlimit.stopw();
297     ev_timer_stop(conn_.loop, &conn_.wt);
298 
299     return 0;
300   }
301 
302   for (;;) {
303     auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len);
304     if (nwrite < 0) {
305       return -1;
306     }
307 
308     if (nwrite == 0) {
309       return 0;
310     }
311 
312     upstream_->response_drain(nwrite);
313 
314     iovcnt = upstream_->response_riovec(&iov, 1);
315     if (iovcnt == 0) {
316       return 0;
317     }
318   }
319 }
320 
321 #ifdef ENABLE_HTTP3
read_quic(const UpstreamAddr * faddr,const Address & remote_addr,const Address & local_addr,const ngtcp2_pkt_info & pi,const uint8_t * data,size_t datalen)322 int ClientHandler::read_quic(const UpstreamAddr *faddr,
323                              const Address &remote_addr,
324                              const Address &local_addr,
325                              const ngtcp2_pkt_info &pi, const uint8_t *data,
326                              size_t datalen) {
327   auto upstream = static_cast<Http3Upstream *>(upstream_.get());
328 
329   return upstream->on_read(faddr, remote_addr, local_addr, pi, data, datalen);
330 }
331 
write_quic()332 int ClientHandler::write_quic() { return upstream_->on_write(); }
333 #endif // ENABLE_HTTP3
334 
upstream_noop()335 int ClientHandler::upstream_noop() { return 0; }
336 
upstream_read()337 int ClientHandler::upstream_read() {
338   assert(upstream_);
339   if (upstream_->on_read() != 0) {
340     return -1;
341   }
342   return 0;
343 }
344 
upstream_write()345 int ClientHandler::upstream_write() {
346   assert(upstream_);
347   if (upstream_->on_write() != 0) {
348     return -1;
349   }
350 
351   if (get_should_close_after_write() && upstream_->response_empty()) {
352     return -1;
353   }
354 
355   return 0;
356 }
357 
upstream_http2_connhd_read()358 int ClientHandler::upstream_http2_connhd_read() {
359   auto nread = std::min(left_connhd_len_, rb_.rleft());
360   if (memcmp(&NGHTTP2_CLIENT_MAGIC[NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_],
361              rb_.pos(), nread) != 0) {
362     // There is no downgrade path here. Just drop the connection.
363     if (LOG_ENABLED(INFO)) {
364       CLOG(INFO, this) << "invalid client connection header";
365     }
366 
367     return -1;
368   }
369 
370   left_connhd_len_ -= nread;
371   rb_.drain(nread);
372   conn_.rlimit.startw();
373 
374   if (left_connhd_len_ == 0) {
375     on_read_ = &ClientHandler::upstream_read;
376     // Run on_read to process data left in buffer since they are not
377     // notified further
378     if (on_read() != 0) {
379       return -1;
380     }
381     return 0;
382   }
383 
384   return 0;
385 }
386 
upstream_http1_connhd_read()387 int ClientHandler::upstream_http1_connhd_read() {
388   auto nread = std::min(left_connhd_len_, rb_.rleft());
389   if (memcmp(&NGHTTP2_CLIENT_MAGIC[NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_],
390              rb_.pos(), nread) != 0) {
391     if (LOG_ENABLED(INFO)) {
392       CLOG(INFO, this) << "This is HTTP/1.1 connection, "
393                        << "but may be upgraded to HTTP/2 later.";
394     }
395 
396     // Reset header length for later HTTP/2 upgrade
397     left_connhd_len_ = NGHTTP2_CLIENT_MAGIC_LEN;
398     on_read_ = &ClientHandler::upstream_read;
399     on_write_ = &ClientHandler::upstream_write;
400 
401     if (on_read() != 0) {
402       return -1;
403     }
404 
405     return 0;
406   }
407 
408   left_connhd_len_ -= nread;
409   rb_.drain(nread);
410   conn_.rlimit.startw();
411 
412   if (left_connhd_len_ == 0) {
413     if (LOG_ENABLED(INFO)) {
414       CLOG(INFO, this) << "direct HTTP/2 connection";
415     }
416 
417     direct_http2_upgrade();
418     on_read_ = &ClientHandler::upstream_read;
419     on_write_ = &ClientHandler::upstream_write;
420 
421     // Run on_read to process data left in buffer since they are not
422     // notified further
423     if (on_read() != 0) {
424       return -1;
425     }
426 
427     return 0;
428   }
429 
430   return 0;
431 }
432 
ClientHandler(Worker * worker,int fd,SSL * ssl,const StringRef & ipaddr,const StringRef & port,int family,const UpstreamAddr * faddr)433 ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl,
434                              const StringRef &ipaddr, const StringRef &port,
435                              int family, const UpstreamAddr *faddr)
436     : // We use balloc_ for TLS session ID (64), ipaddr (IPv6) (39),
437       // port (5), forwarded-for (IPv6) (41), alpn (5), proxyproto
438       // ipaddr (15), proxyproto port (5), sni (32, estimated).  we
439       // need terminal NULL byte for each.  We also require 8 bytes
440       // header for each allocation.  We align at 16 bytes boundary,
441       // so the required space is 64 + 48 + 16 + 48 + 16 + 16 + 16 +
442       // 32 + 8 + 8 * 8 = 328.
443       balloc_(512, 512),
444       rb_(worker->get_mcpool()),
445       conn_(worker->get_loop(), fd, ssl, worker->get_mcpool(),
446             get_config()->conn.upstream.timeout.write,
447             get_config()->conn.upstream.timeout.read,
448             get_config()->conn.upstream.ratelimit.write,
449             get_config()->conn.upstream.ratelimit.read, writecb, readcb,
450             timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
451             get_config()->tls.dyn_rec.idle_timeout,
452             faddr->quic ? Proto::HTTP3 : Proto::NONE),
453       ipaddr_(make_string_ref(balloc_, ipaddr)),
454       port_(make_string_ref(balloc_, port)),
455       faddr_(faddr),
456       worker_(worker),
457       left_connhd_len_(NGHTTP2_CLIENT_MAGIC_LEN),
458       affinity_hash_(0),
459       should_close_after_write_(false),
460       affinity_hash_computed_(false) {
461 
462   ++worker_->get_worker_stat()->num_connections;
463 
464   ev_timer_init(&reneg_shutdown_timer_, shutdowncb, 0., 0.);
465 
466   reneg_shutdown_timer_.data = this;
467 
468   if (!faddr->quic) {
469     conn_.rlimit.startw();
470   }
471   ev_timer_again(conn_.loop, &conn_.rt);
472 
473   auto config = get_config();
474 
475   if (!faddr->quic) {
476     if (faddr_->accept_proxy_protocol ||
477         config->conn.upstream.accept_proxy_protocol) {
478       read_ = &ClientHandler::proxy_protocol_peek_clear;
479       write_ = &ClientHandler::noop;
480       on_read_ = &ClientHandler::proxy_protocol_read;
481       on_write_ = &ClientHandler::upstream_noop;
482     } else {
483       setup_upstream_io_callback();
484     }
485   }
486 
487   auto &fwdconf = config->http.forwarded;
488 
489   if (fwdconf.params & FORWARDED_FOR) {
490     if (fwdconf.for_node_type == ForwardedNode::OBFUSCATED) {
491       // 1 for '_'
492       auto len = SHRPX_OBFUSCATED_NODE_LENGTH + 1;
493       // 1 for terminating NUL.
494       auto buf = make_byte_ref(balloc_, len + 1);
495       auto p = buf.base;
496       *p++ = '_';
497       p = util::random_alpha_digit(p, p + SHRPX_OBFUSCATED_NODE_LENGTH,
498                                    worker_->get_randgen());
499       *p = '\0';
500 
501       forwarded_for_ = StringRef{buf.base, p};
502     } else {
503       init_forwarded_for(family, ipaddr_);
504     }
505   }
506 }
507 
init_forwarded_for(int family,const StringRef & ipaddr)508 void ClientHandler::init_forwarded_for(int family, const StringRef &ipaddr) {
509   if (family == AF_INET6) {
510     // 2 for '[' and ']'
511     auto len = 2 + ipaddr.size();
512     // 1 for terminating NUL.
513     auto buf = make_byte_ref(balloc_, len + 1);
514     auto p = buf.base;
515     *p++ = '[';
516     p = std::copy(std::begin(ipaddr), std::end(ipaddr), p);
517     *p++ = ']';
518     *p = '\0';
519 
520     forwarded_for_ = StringRef{buf.base, p};
521   } else {
522     // family == AF_INET or family == AF_UNIX
523     forwarded_for_ = ipaddr;
524   }
525 }
526 
setup_upstream_io_callback()527 void ClientHandler::setup_upstream_io_callback() {
528   if (conn_.tls.ssl) {
529     conn_.prepare_server_handshake();
530     read_ = write_ = &ClientHandler::tls_handshake;
531     on_read_ = &ClientHandler::upstream_noop;
532     on_write_ = &ClientHandler::upstream_write;
533   } else {
534     // For non-TLS version, first create HttpsUpstream. It may be
535     // upgraded to HTTP/2 through HTTP Upgrade or direct HTTP/2
536     // connection.
537     upstream_ = std::make_unique<HttpsUpstream>(this);
538     alpn_ = StringRef::from_lit("http/1.1");
539     read_ = &ClientHandler::read_clear;
540     write_ = &ClientHandler::write_clear;
541     on_read_ = &ClientHandler::upstream_http1_connhd_read;
542     on_write_ = &ClientHandler::upstream_noop;
543   }
544 }
545 
546 #ifdef ENABLE_HTTP3
setup_http3_upstream(std::unique_ptr<Http3Upstream> && upstream)547 void ClientHandler::setup_http3_upstream(
548     std::unique_ptr<Http3Upstream> &&upstream) {
549   upstream_ = std::move(upstream);
550   write_ = &ClientHandler::write_quic;
551 
552   auto config = get_config();
553 
554   reset_upstream_read_timeout(config->conn.upstream.timeout.http3_read);
555 }
556 #endif // ENABLE_HTTP3
557 
~ClientHandler()558 ClientHandler::~ClientHandler() {
559   if (LOG_ENABLED(INFO)) {
560     CLOG(INFO, this) << "Deleting";
561   }
562 
563   if (upstream_) {
564     upstream_->on_handler_delete();
565   }
566 
567   auto worker_stat = worker_->get_worker_stat();
568   --worker_stat->num_connections;
569 
570   if (worker_stat->num_connections == 0) {
571     worker_->schedule_clear_mcpool();
572   }
573 
574   ev_timer_stop(conn_.loop, &reneg_shutdown_timer_);
575 
576   // TODO If backend is http/2, and it is in CONNECTED state, signal
577   // it and make it loopbreak when output is zero.
578   if (worker_->get_graceful_shutdown() && worker_stat->num_connections == 0 &&
579       worker_stat->num_close_waits == 0) {
580     ev_break(conn_.loop);
581   }
582 
583   if (LOG_ENABLED(INFO)) {
584     CLOG(INFO, this) << "Deleted";
585   }
586 }
587 
get_upstream()588 Upstream *ClientHandler::get_upstream() { return upstream_.get(); }
589 
get_loop() const590 struct ev_loop *ClientHandler::get_loop() const {
591   return conn_.loop;
592 }
593 
reset_upstream_read_timeout(ev_tstamp t)594 void ClientHandler::reset_upstream_read_timeout(ev_tstamp t) {
595   conn_.rt.repeat = t;
596   if (ev_is_active(&conn_.rt)) {
597     ev_timer_again(conn_.loop, &conn_.rt);
598   }
599 }
600 
reset_upstream_write_timeout(ev_tstamp t)601 void ClientHandler::reset_upstream_write_timeout(ev_tstamp t) {
602   conn_.wt.repeat = t;
603   if (ev_is_active(&conn_.wt)) {
604     ev_timer_again(conn_.loop, &conn_.wt);
605   }
606 }
607 
repeat_read_timer()608 void ClientHandler::repeat_read_timer() {
609   ev_timer_again(conn_.loop, &conn_.rt);
610 }
611 
stop_read_timer()612 void ClientHandler::stop_read_timer() { ev_timer_stop(conn_.loop, &conn_.rt); }
613 
validate_next_proto()614 int ClientHandler::validate_next_proto() {
615   const unsigned char *next_proto = nullptr;
616   unsigned int next_proto_len = 0;
617 
618   // First set callback for catch all cases
619   on_read_ = &ClientHandler::upstream_read;
620 
621 #ifndef OPENSSL_NO_NEXTPROTONEG
622   SSL_get0_next_proto_negotiated(conn_.tls.ssl, &next_proto, &next_proto_len);
623 #endif // !OPENSSL_NO_NEXTPROTONEG
624 #if OPENSSL_VERSION_NUMBER >= 0x10002000L
625   if (next_proto == nullptr) {
626     SSL_get0_alpn_selected(conn_.tls.ssl, &next_proto, &next_proto_len);
627   }
628 #endif // OPENSSL_VERSION_NUMBER >= 0x10002000L
629 
630   StringRef proto;
631 
632   if (next_proto) {
633     proto = StringRef{next_proto, next_proto_len};
634 
635     if (LOG_ENABLED(INFO)) {
636       CLOG(INFO, this) << "The negotiated next protocol: " << proto;
637     }
638   } else {
639     if (LOG_ENABLED(INFO)) {
640       CLOG(INFO, this) << "No protocol negotiated. Fallback to HTTP/1.1";
641     }
642 
643     proto = StringRef::from_lit("http/1.1");
644   }
645 
646   if (!tls::in_proto_list(get_config()->tls.npn_list, proto)) {
647     if (LOG_ENABLED(INFO)) {
648       CLOG(INFO, this) << "The negotiated protocol is not supported: " << proto;
649     }
650     return -1;
651   }
652 
653   if (util::check_h2_is_selected(proto)) {
654     on_read_ = &ClientHandler::upstream_http2_connhd_read;
655 
656     auto http2_upstream = std::make_unique<Http2Upstream>(this);
657 
658     upstream_ = std::move(http2_upstream);
659     alpn_ = make_string_ref(balloc_, proto);
660 
661     // At this point, input buffer is already filled with some bytes.
662     // The read callback is not called until new data come. So consume
663     // input buffer here.
664     if (on_read() != 0) {
665       return -1;
666     }
667 
668     return 0;
669   }
670 
671   if (proto == StringRef::from_lit("http/1.1")) {
672     upstream_ = std::make_unique<HttpsUpstream>(this);
673     alpn_ = StringRef::from_lit("http/1.1");
674 
675     // At this point, input buffer is already filled with some bytes.
676     // The read callback is not called until new data come. So consume
677     // input buffer here.
678     if (on_read() != 0) {
679       return -1;
680     }
681 
682     return 0;
683   }
684   if (LOG_ENABLED(INFO)) {
685     CLOG(INFO, this) << "The negotiated protocol is not supported";
686   }
687   return -1;
688 }
689 
do_read()690 int ClientHandler::do_read() { return read_(*this); }
do_write()691 int ClientHandler::do_write() { return write_(*this); }
692 
on_read()693 int ClientHandler::on_read() {
694   if (rb_.chunk_avail()) {
695     auto rv = on_read_(*this);
696     if (rv != 0) {
697       return rv;
698     }
699   }
700   conn_.handle_tls_pending_read();
701   return 0;
702 }
on_write()703 int ClientHandler::on_write() { return on_write_(*this); }
704 
get_ipaddr() const705 const StringRef &ClientHandler::get_ipaddr() const { return ipaddr_; }
706 
get_should_close_after_write() const707 bool ClientHandler::get_should_close_after_write() const {
708   return should_close_after_write_;
709 }
710 
set_should_close_after_write(bool f)711 void ClientHandler::set_should_close_after_write(bool f) {
712   should_close_after_write_ = f;
713 }
714 
pool_downstream_connection(std::unique_ptr<DownstreamConnection> dconn)715 void ClientHandler::pool_downstream_connection(
716     std::unique_ptr<DownstreamConnection> dconn) {
717   if (!dconn->poolable()) {
718     return;
719   }
720 
721   dconn->set_client_handler(nullptr);
722 
723   auto &group = dconn->get_downstream_addr_group();
724 
725   if (LOG_ENABLED(INFO)) {
726     CLOG(INFO, this) << "Pooling downstream connection DCONN:" << dconn.get()
727                      << " in group " << group;
728   }
729 
730   auto addr = dconn->get_addr();
731   auto &dconn_pool = addr->dconn_pool;
732   dconn_pool->add_downstream_connection(std::move(dconn));
733 }
734 
735 namespace {
736 // Computes 32bits hash for session affinity for IP address |ip|.
compute_affinity_from_ip(const StringRef & ip)737 uint32_t compute_affinity_from_ip(const StringRef &ip) {
738   int rv;
739   std::array<uint8_t, 32> buf;
740 
741   rv = util::sha256(buf.data(), ip);
742   if (rv != 0) {
743     // Not sure when sha256 failed.  Just fall back to another
744     // function.
745     return util::hash32(ip);
746   }
747 
748   return (static_cast<uint32_t>(buf[0]) << 24) |
749          (static_cast<uint32_t>(buf[1]) << 16) |
750          (static_cast<uint32_t>(buf[2]) << 8) | static_cast<uint32_t>(buf[3]);
751 }
752 } // namespace
753 
get_http2_session(const std::shared_ptr<DownstreamAddrGroup> & group,DownstreamAddr * addr)754 Http2Session *ClientHandler::get_http2_session(
755     const std::shared_ptr<DownstreamAddrGroup> &group, DownstreamAddr *addr) {
756   auto &shared_addr = group->shared_addr;
757 
758   if (LOG_ENABLED(INFO)) {
759     CLOG(INFO, this) << "Selected DownstreamAddr=" << addr
760                      << ", index=" << (addr - shared_addr->addrs.data());
761   }
762 
763   for (auto session = addr->http2_extra_freelist.head; session;) {
764     auto next = session->dlnext;
765 
766     if (session->max_concurrency_reached(0)) {
767       if (LOG_ENABLED(INFO)) {
768         CLOG(INFO, this)
769             << "Maximum streams have been reached for Http2Session(" << session
770             << ").  Skip it";
771       }
772 
773       session->remove_from_freelist();
774       session = next;
775 
776       continue;
777     }
778 
779     if (LOG_ENABLED(INFO)) {
780       CLOG(INFO, this) << "Use Http2Session " << session
781                        << " from http2_extra_freelist";
782     }
783 
784     if (session->max_concurrency_reached(1)) {
785       if (LOG_ENABLED(INFO)) {
786         CLOG(INFO, this) << "Maximum streams are reached for Http2Session("
787                          << session << ").";
788       }
789 
790       session->remove_from_freelist();
791     }
792     return session;
793   }
794 
795   auto session = new Http2Session(conn_.loop, worker_->get_cl_ssl_ctx(),
796                                   worker_, group, addr);
797 
798   if (LOG_ENABLED(INFO)) {
799     CLOG(INFO, this) << "Create new Http2Session " << session;
800   }
801 
802   session->add_to_extra_freelist();
803 
804   return session;
805 }
806 
get_affinity_cookie(Downstream * downstream,const StringRef & cookie_name)807 uint32_t ClientHandler::get_affinity_cookie(Downstream *downstream,
808                                             const StringRef &cookie_name) {
809   auto h = downstream->find_affinity_cookie(cookie_name);
810   if (h) {
811     return h;
812   }
813 
814   auto d = std::uniform_int_distribution<uint32_t>(1);
815   auto rh = d(worker_->get_randgen());
816   h = util::hash32(StringRef{reinterpret_cast<uint8_t *>(&rh),
817                              reinterpret_cast<uint8_t *>(&rh) + sizeof(rh)});
818 
819   downstream->renew_affinity_cookie(h);
820 
821   return h;
822 }
823 
824 namespace {
reschedule_addr(std::priority_queue<DownstreamAddrEntry,std::vector<DownstreamAddrEntry>,DownstreamAddrEntryGreater> & pq,DownstreamAddr * addr)825 void reschedule_addr(
826     std::priority_queue<DownstreamAddrEntry, std::vector<DownstreamAddrEntry>,
827                         DownstreamAddrEntryGreater> &pq,
828     DownstreamAddr *addr) {
829   auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + addr->pending_penalty;
830   addr->cycle += penalty / addr->weight;
831   addr->pending_penalty = penalty % addr->weight;
832 
833   pq.push(DownstreamAddrEntry{addr, addr->seq, addr->cycle});
834   addr->queued = true;
835 }
836 } // namespace
837 
838 namespace {
reschedule_wg(std::priority_queue<WeightGroupEntry,std::vector<WeightGroupEntry>,WeightGroupEntryGreater> & pq,WeightGroup * wg)839 void reschedule_wg(
840     std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
841                         WeightGroupEntryGreater> &pq,
842     WeightGroup *wg) {
843   auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + wg->pending_penalty;
844   wg->cycle += penalty / wg->weight;
845   wg->pending_penalty = penalty % wg->weight;
846 
847   pq.push(WeightGroupEntry{wg, wg->seq, wg->cycle});
848   wg->queued = true;
849 }
850 } // namespace
851 
get_downstream_addr(int & err,DownstreamAddrGroup * group,Downstream * downstream)852 DownstreamAddr *ClientHandler::get_downstream_addr(int &err,
853                                                    DownstreamAddrGroup *group,
854                                                    Downstream *downstream) {
855   err = 0;
856 
857   switch (faddr_->alt_mode) {
858   case UpstreamAltMode::API:
859   case UpstreamAltMode::HEALTHMON:
860     assert(0);
861   default:
862     break;
863   }
864 
865   auto &shared_addr = group->shared_addr;
866 
867   if (shared_addr->affinity.type != SessionAffinity::NONE) {
868     uint32_t hash;
869     switch (shared_addr->affinity.type) {
870     case SessionAffinity::IP:
871       if (!affinity_hash_computed_) {
872         affinity_hash_ = compute_affinity_from_ip(ipaddr_);
873         affinity_hash_computed_ = true;
874       }
875       hash = affinity_hash_;
876       break;
877     case SessionAffinity::COOKIE:
878       if (shared_addr->affinity.cookie.stickiness ==
879           SessionAffinityCookieStickiness::STRICT) {
880         return get_downstream_addr_strict_affinity(err, shared_addr,
881                                                    downstream);
882       }
883 
884       hash = get_affinity_cookie(downstream, shared_addr->affinity.cookie.name);
885       break;
886     default:
887       assert(0);
888     }
889 
890     const auto &affinity_hash = shared_addr->affinity_hash;
891 
892     auto it = std::lower_bound(
893         std::begin(affinity_hash), std::end(affinity_hash), hash,
894         [](const AffinityHash &lhs, uint32_t rhs) { return lhs.hash < rhs; });
895 
896     if (it == std::end(affinity_hash)) {
897       it = std::begin(affinity_hash);
898     }
899 
900     auto aff_idx =
901         static_cast<size_t>(std::distance(std::begin(affinity_hash), it));
902     auto idx = (*it).idx;
903     auto addr = &shared_addr->addrs[idx];
904 
905     if (addr->connect_blocker->blocked()) {
906       size_t i;
907       for (i = aff_idx + 1; i != aff_idx; ++i) {
908         if (i == shared_addr->affinity_hash.size()) {
909           i = 0;
910         }
911         addr = &shared_addr->addrs[shared_addr->affinity_hash[i].idx];
912         if (addr->connect_blocker->blocked()) {
913           continue;
914         }
915         break;
916       }
917       if (i == aff_idx) {
918         err = -1;
919         return nullptr;
920       }
921     }
922 
923     return addr;
924   }
925 
926   auto &wgpq = shared_addr->pq;
927 
928   for (;;) {
929     if (wgpq.empty()) {
930       CLOG(INFO, this) << "No working downstream address found";
931       err = -1;
932       return nullptr;
933     }
934 
935     auto wg = wgpq.top().wg;
936     wgpq.pop();
937     wg->queued = false;
938 
939     for (;;) {
940       if (wg->pq.empty()) {
941         break;
942       }
943 
944       auto addr = wg->pq.top().addr;
945       wg->pq.pop();
946       addr->queued = false;
947 
948       if (addr->connect_blocker->blocked()) {
949         continue;
950       }
951 
952       reschedule_addr(wg->pq, addr);
953       reschedule_wg(wgpq, wg);
954 
955       return addr;
956     }
957   }
958 }
959 
get_downstream_addr_strict_affinity(int & err,const std::shared_ptr<SharedDownstreamAddr> & shared_addr,Downstream * downstream)960 DownstreamAddr *ClientHandler::get_downstream_addr_strict_affinity(
961     int &err, const std::shared_ptr<SharedDownstreamAddr> &shared_addr,
962     Downstream *downstream) {
963   const auto &affinity_hash = shared_addr->affinity_hash;
964 
965   auto h = downstream->find_affinity_cookie(shared_addr->affinity.cookie.name);
966   if (h) {
967     auto it = shared_addr->affinity_hash_map.find(h);
968     if (it != std::end(shared_addr->affinity_hash_map)) {
969       auto addr = &shared_addr->addrs[(*it).second];
970       if (!addr->connect_blocker->blocked()) {
971         return addr;
972       }
973     }
974   } else {
975     auto d = std::uniform_int_distribution<uint32_t>(1);
976     auto rh = d(worker_->get_randgen());
977     h = util::hash32(StringRef{reinterpret_cast<uint8_t *>(&rh),
978                                reinterpret_cast<uint8_t *>(&rh) + sizeof(rh)});
979   }
980 
981   // Client is not bound to a particular backend, or the bound backend
982   // is not found, or is blocked.  Find new backend using h.  Using
983   // existing h allows us to find new server in a deterministic way.
984   // It is preferable because multiple concurrent requests with the
985   // stale cookie might be in-flight.
986   auto it = std::lower_bound(
987       std::begin(affinity_hash), std::end(affinity_hash), h,
988       [](const AffinityHash &lhs, uint32_t rhs) { return lhs.hash < rhs; });
989 
990   if (it == std::end(affinity_hash)) {
991     it = std::begin(affinity_hash);
992   }
993 
994   auto aff_idx =
995       static_cast<size_t>(std::distance(std::begin(affinity_hash), it));
996   auto idx = (*it).idx;
997   auto addr = &shared_addr->addrs[idx];
998 
999   if (addr->connect_blocker->blocked()) {
1000     size_t i;
1001     for (i = aff_idx + 1; i != aff_idx; ++i) {
1002       if (i == shared_addr->affinity_hash.size()) {
1003         i = 0;
1004       }
1005       addr = &shared_addr->addrs[shared_addr->affinity_hash[i].idx];
1006       if (addr->connect_blocker->blocked()) {
1007         continue;
1008       }
1009       break;
1010     }
1011     if (i == aff_idx) {
1012       err = -1;
1013       return nullptr;
1014     }
1015   }
1016 
1017   downstream->renew_affinity_cookie(addr->affinity_hash);
1018 
1019   return addr;
1020 }
1021 
1022 std::unique_ptr<DownstreamConnection>
get_downstream_connection(int & err,Downstream * downstream)1023 ClientHandler::get_downstream_connection(int &err, Downstream *downstream) {
1024   size_t group_idx;
1025   auto &downstreamconf = *worker_->get_downstream_config();
1026   auto &routerconf = downstreamconf.router;
1027 
1028   auto catch_all = downstreamconf.addr_group_catch_all;
1029   auto &groups = worker_->get_downstream_addr_groups();
1030 
1031   auto &req = downstream->request();
1032 
1033   err = 0;
1034 
1035   switch (faddr_->alt_mode) {
1036   case UpstreamAltMode::API: {
1037     auto dconn = std::make_unique<APIDownstreamConnection>(worker_);
1038     dconn->set_client_handler(this);
1039     return dconn;
1040   }
1041   case UpstreamAltMode::HEALTHMON: {
1042     auto dconn = std::make_unique<HealthMonitorDownstreamConnection>();
1043     dconn->set_client_handler(this);
1044     return dconn;
1045   }
1046   default:
1047     break;
1048   }
1049 
1050   auto &balloc = downstream->get_block_allocator();
1051 
1052   StringRef authority, path;
1053 
1054   if (req.forwarded_once) {
1055     if (groups.size() != 1) {
1056       authority = req.orig_authority;
1057       path = req.orig_path;
1058     }
1059   } else {
1060     if (faddr_->sni_fwd) {
1061       authority = sni_;
1062     } else if (!req.authority.empty()) {
1063       authority = req.authority;
1064     } else {
1065       auto h = req.fs.header(http2::HD_HOST);
1066       if (h) {
1067         authority = h->value;
1068       }
1069     }
1070 
1071     // CONNECT method does not have path.  But we requires path in
1072     // host-path mapping.  As workaround, we assume that path is
1073     // "/".
1074     if (!req.regular_connect_method()) {
1075       path = req.path;
1076     }
1077 
1078     // Cache the authority and path used for the first-time backend
1079     // selection because per-pattern mruby script can change them.
1080     req.orig_authority = authority;
1081     req.orig_path = path;
1082     req.forwarded_once = true;
1083   }
1084 
1085   // Fast path.  If we have one group, it must be catch-all group.
1086   if (groups.size() == 1) {
1087     group_idx = 0;
1088   } else {
1089     group_idx = match_downstream_addr_group(routerconf, authority, path, groups,
1090                                             catch_all, balloc);
1091   }
1092 
1093   if (LOG_ENABLED(INFO)) {
1094     CLOG(INFO, this) << "Downstream address group_idx: " << group_idx;
1095   }
1096 
1097   if (groups[group_idx]->shared_addr->redirect_if_not_tls && !conn_.tls.ssl) {
1098     if (LOG_ENABLED(INFO)) {
1099       CLOG(INFO, this) << "Downstream address group " << group_idx
1100                        << " requires frontend TLS connection.";
1101     }
1102     err = SHRPX_ERR_TLS_REQUIRED;
1103     return nullptr;
1104   }
1105 
1106   auto &group = groups[group_idx];
1107 
1108   if (group->shared_addr->dnf) {
1109     auto dconn = std::make_unique<NullDownstreamConnection>(group);
1110     dconn->set_client_handler(this);
1111     return dconn;
1112   }
1113 
1114   auto addr = get_downstream_addr(err, group.get(), downstream);
1115   if (addr == nullptr) {
1116     return nullptr;
1117   }
1118 
1119   if (addr->proto == Proto::HTTP1) {
1120     auto dconn = addr->dconn_pool->pop_downstream_connection();
1121     if (dconn) {
1122       dconn->set_client_handler(this);
1123       return dconn;
1124     }
1125 
1126     if (worker_->get_connect_blocker()->blocked()) {
1127       if (LOG_ENABLED(INFO)) {
1128         DCLOG(INFO, this)
1129             << "Worker wide backend connection was blocked temporarily";
1130       }
1131       return nullptr;
1132     }
1133 
1134     if (LOG_ENABLED(INFO)) {
1135       CLOG(INFO, this) << "Downstream connection pool is empty."
1136                        << " Create new one";
1137     }
1138 
1139     dconn = std::make_unique<HttpDownstreamConnection>(group, addr, conn_.loop,
1140                                                        worker_);
1141     dconn->set_client_handler(this);
1142     return dconn;
1143   }
1144 
1145   if (LOG_ENABLED(INFO)) {
1146     CLOG(INFO, this) << "Downstream connection pool is empty."
1147                      << " Create new one";
1148   }
1149 
1150   auto http2session = get_http2_session(group, addr);
1151   auto dconn = std::make_unique<Http2DownstreamConnection>(http2session);
1152   dconn->set_client_handler(this);
1153   return dconn;
1154 }
1155 
get_mcpool()1156 MemchunkPool *ClientHandler::get_mcpool() { return worker_->get_mcpool(); }
1157 
get_ssl() const1158 SSL *ClientHandler::get_ssl() const { return conn_.tls.ssl; }
1159 
direct_http2_upgrade()1160 void ClientHandler::direct_http2_upgrade() {
1161   upstream_ = std::make_unique<Http2Upstream>(this);
1162   alpn_ = StringRef::from_lit(NGHTTP2_CLEARTEXT_PROTO_VERSION_ID);
1163   on_read_ = &ClientHandler::upstream_read;
1164   write_ = &ClientHandler::write_clear;
1165 }
1166 
perform_http2_upgrade(HttpsUpstream * http)1167 int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) {
1168   auto upstream = std::make_unique<Http2Upstream>(this);
1169 
1170   auto output = upstream->get_response_buf();
1171 
1172   // We might have written non-final header in response_buf, in this
1173   // case, response_state is still INITIAL.  If this non-final header
1174   // and upgrade header fit in output buffer, do upgrade.  Otherwise,
1175   // to avoid to send this non-final header as response body in HTTP/2
1176   // upstream, fail upgrade.
1177   auto downstream = http->get_downstream();
1178   auto input = downstream->get_response_buf();
1179 
1180   if (upstream->upgrade_upstream(http) != 0) {
1181     return -1;
1182   }
1183   // http pointer is now owned by upstream.
1184   upstream_.release();
1185   // TODO We might get other version id in HTTP2-settings, if we
1186   // support aliasing for h2, but we just use library default for now.
1187   alpn_ = StringRef::from_lit(NGHTTP2_CLEARTEXT_PROTO_VERSION_ID);
1188   on_read_ = &ClientHandler::upstream_http2_connhd_read;
1189   write_ = &ClientHandler::write_clear;
1190 
1191   input->remove(*output, input->rleft());
1192 
1193   constexpr auto res =
1194       StringRef::from_lit("HTTP/1.1 101 Switching Protocols\r\n"
1195                           "Connection: Upgrade\r\n"
1196                           "Upgrade: " NGHTTP2_CLEARTEXT_PROTO_VERSION_ID "\r\n"
1197                           "\r\n");
1198 
1199   output->append(res);
1200   upstream_ = std::move(upstream);
1201 
1202   signal_write();
1203   return 0;
1204 }
1205 
get_http2_upgrade_allowed() const1206 bool ClientHandler::get_http2_upgrade_allowed() const { return !conn_.tls.ssl; }
1207 
get_upstream_scheme() const1208 StringRef ClientHandler::get_upstream_scheme() const {
1209   if (conn_.tls.ssl) {
1210     return StringRef::from_lit("https");
1211   } else {
1212     return StringRef::from_lit("http");
1213   }
1214 }
1215 
start_immediate_shutdown()1216 void ClientHandler::start_immediate_shutdown() {
1217   ev_timer_start(conn_.loop, &reneg_shutdown_timer_);
1218 }
1219 
write_accesslog(Downstream * downstream)1220 void ClientHandler::write_accesslog(Downstream *downstream) {
1221   auto &req = downstream->request();
1222 
1223   auto config = get_config();
1224 
1225   if (!req.tstamp) {
1226     auto lgconf = log_config();
1227     lgconf->update_tstamp(std::chrono::system_clock::now());
1228     req.tstamp = lgconf->tstamp;
1229   }
1230 
1231   upstream_accesslog(
1232       config->logging.access.format,
1233       LogSpec{
1234           downstream,
1235           ipaddr_,
1236           alpn_,
1237           sni_,
1238           conn_.tls.ssl,
1239           std::chrono::high_resolution_clock::now(), // request_end_time
1240           port_,
1241           faddr_->port,
1242           config->pid,
1243       });
1244 }
1245 
get_rb()1246 ClientHandler::ReadBuf *ClientHandler::get_rb() { return &rb_; }
1247 
signal_write()1248 void ClientHandler::signal_write() { conn_.wlimit.startw(); }
1249 
get_rlimit()1250 RateLimit *ClientHandler::get_rlimit() { return &conn_.rlimit; }
get_wlimit()1251 RateLimit *ClientHandler::get_wlimit() { return &conn_.wlimit; }
1252 
get_wev()1253 ev_io *ClientHandler::get_wev() { return &conn_.wev; }
1254 
get_worker() const1255 Worker *ClientHandler::get_worker() const { return worker_; }
1256 
1257 namespace {
parse_proxy_line_port(const uint8_t * first,const uint8_t * last)1258 ssize_t parse_proxy_line_port(const uint8_t *first, const uint8_t *last) {
1259   auto p = first;
1260   int32_t port = 0;
1261 
1262   if (p == last) {
1263     return -1;
1264   }
1265 
1266   if (*p == '0') {
1267     if (p + 1 != last && util::is_digit(*(p + 1))) {
1268       return -1;
1269     }
1270     return 1;
1271   }
1272 
1273   for (; p != last && util::is_digit(*p); ++p) {
1274     port *= 10;
1275     port += *p - '0';
1276 
1277     if (port > 65535) {
1278       return -1;
1279     }
1280   }
1281 
1282   return p - first;
1283 }
1284 } // namespace
1285 
on_proxy_protocol_finish()1286 int ClientHandler::on_proxy_protocol_finish() {
1287   auto len = rb_.pos() - rb_.begin();
1288 
1289   assert(len);
1290 
1291   if (LOG_ENABLED(INFO)) {
1292     CLOG(INFO, this) << "PROXY-protocol: Draining " << len
1293                      << " bytes from socket";
1294   }
1295 
1296   rb_.reset();
1297 
1298   if (conn_.read_nolim_clear(rb_.pos(), len) < 0) {
1299     return -1;
1300   }
1301 
1302   rb_.reset();
1303 
1304   setup_upstream_io_callback();
1305 
1306   return 0;
1307 }
1308 
1309 namespace {
1310 // PROXY-protocol v2 header signature
1311 constexpr uint8_t PROXY_PROTO_V2_SIG[] =
1312     "\x0D\x0A\x0D\x0A\x00\x0D\x0A\x51\x55\x49\x54\x0A";
1313 
1314 // PROXY-protocol v2 header length
1315 constexpr size_t PROXY_PROTO_V2_HDLEN =
1316     str_size(PROXY_PROTO_V2_SIG) + /* ver_cmd(1) + fam(1) + len(2) = */ 4;
1317 } // namespace
1318 
1319 // http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt
proxy_protocol_read()1320 int ClientHandler::proxy_protocol_read() {
1321   if (LOG_ENABLED(INFO)) {
1322     CLOG(INFO, this) << "PROXY-protocol: Started";
1323   }
1324 
1325   auto first = rb_.pos();
1326 
1327   if (rb_.rleft() >= PROXY_PROTO_V2_HDLEN &&
1328       (*(first + str_size(PROXY_PROTO_V2_SIG)) & 0xf0) == 0x20) {
1329     if (LOG_ENABLED(INFO)) {
1330       CLOG(INFO, this) << "PROXY-protocol: Detected v2 header signature";
1331     }
1332     return proxy_protocol_v2_read();
1333   }
1334 
1335   // NULL character really destroys functions which expects NULL
1336   // terminated string.  We won't expect it in PROXY protocol line, so
1337   // find it here.
1338   auto chrs = std::array<char, 2>{'\n', '\0'};
1339 
1340   constexpr size_t MAX_PROXY_LINELEN = 107;
1341 
1342   auto bufend = rb_.pos() + std::min(MAX_PROXY_LINELEN, rb_.rleft());
1343 
1344   auto end =
1345       std::find_first_of(rb_.pos(), bufend, std::begin(chrs), std::end(chrs));
1346 
1347   if (end == bufend || *end == '\0' || end == rb_.pos() || *(end - 1) != '\r') {
1348     if (LOG_ENABLED(INFO)) {
1349       CLOG(INFO, this) << "PROXY-protocol-v1: No ending CR LF sequence found";
1350     }
1351     return -1;
1352   }
1353 
1354   --end;
1355 
1356   constexpr auto HEADER = StringRef::from_lit("PROXY ");
1357 
1358   if (static_cast<size_t>(end - rb_.pos()) < HEADER.size()) {
1359     if (LOG_ENABLED(INFO)) {
1360       CLOG(INFO, this) << "PROXY-protocol-v1: PROXY version 1 ID not found";
1361     }
1362     return -1;
1363   }
1364 
1365   if (!util::streq(HEADER, StringRef{rb_.pos(), HEADER.size()})) {
1366     if (LOG_ENABLED(INFO)) {
1367       CLOG(INFO, this) << "PROXY-protocol-v1: Bad PROXY protocol version 1 ID";
1368     }
1369     return -1;
1370   }
1371 
1372   rb_.drain(HEADER.size());
1373 
1374   int family;
1375 
1376   if (rb_.pos()[0] == 'T') {
1377     if (end - rb_.pos() < 5) {
1378       if (LOG_ENABLED(INFO)) {
1379         CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found";
1380       }
1381       return -1;
1382     }
1383 
1384     if (rb_.pos()[1] != 'C' || rb_.pos()[2] != 'P') {
1385       if (LOG_ENABLED(INFO)) {
1386         CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
1387       }
1388       return -1;
1389     }
1390 
1391     switch (rb_.pos()[3]) {
1392     case '4':
1393       family = AF_INET;
1394       break;
1395     case '6':
1396       family = AF_INET6;
1397       break;
1398     default:
1399       if (LOG_ENABLED(INFO)) {
1400         CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
1401       }
1402       return -1;
1403     }
1404 
1405     rb_.drain(5);
1406   } else {
1407     if (end - rb_.pos() < 7) {
1408       if (LOG_ENABLED(INFO)) {
1409         CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found";
1410       }
1411       return -1;
1412     }
1413     if (!util::streq_l("UNKNOWN", rb_.pos(), 7)) {
1414       if (LOG_ENABLED(INFO)) {
1415         CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
1416       }
1417       return -1;
1418     }
1419 
1420     rb_.drain(end + 2 - rb_.pos());
1421 
1422     return on_proxy_protocol_finish();
1423   }
1424 
1425   // source address
1426   auto token_end = std::find(rb_.pos(), end, ' ');
1427   if (token_end == end) {
1428     if (LOG_ENABLED(INFO)) {
1429       CLOG(INFO, this) << "PROXY-protocol-v1: Source address not found";
1430     }
1431     return -1;
1432   }
1433 
1434   *token_end = '\0';
1435   if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos()), family)) {
1436     if (LOG_ENABLED(INFO)) {
1437       CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source address";
1438     }
1439     return -1;
1440   }
1441 
1442   auto src_addr = rb_.pos();
1443   auto src_addrlen = token_end - rb_.pos();
1444 
1445   rb_.drain(token_end - rb_.pos() + 1);
1446 
1447   // destination address
1448   token_end = std::find(rb_.pos(), end, ' ');
1449   if (token_end == end) {
1450     if (LOG_ENABLED(INFO)) {
1451       CLOG(INFO, this) << "PROXY-protocol-v1: Destination address not found";
1452     }
1453     return -1;
1454   }
1455 
1456   *token_end = '\0';
1457   if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos()), family)) {
1458     if (LOG_ENABLED(INFO)) {
1459       CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination address";
1460     }
1461     return -1;
1462   }
1463 
1464   // Currently we don't use destination address
1465 
1466   rb_.drain(token_end - rb_.pos() + 1);
1467 
1468   // source port
1469   auto n = parse_proxy_line_port(rb_.pos(), end);
1470   if (n <= 0 || *(rb_.pos() + n) != ' ') {
1471     if (LOG_ENABLED(INFO)) {
1472       CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source port";
1473     }
1474     return -1;
1475   }
1476 
1477   rb_.pos()[n] = '\0';
1478   auto src_port = rb_.pos();
1479   auto src_portlen = n;
1480 
1481   rb_.drain(n + 1);
1482 
1483   // destination  port
1484   n = parse_proxy_line_port(rb_.pos(), end);
1485   if (n <= 0 || rb_.pos() + n != end) {
1486     if (LOG_ENABLED(INFO)) {
1487       CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination port";
1488     }
1489     return -1;
1490   }
1491 
1492   // Currently we don't use destination port
1493 
1494   rb_.drain(end + 2 - rb_.pos());
1495 
1496   ipaddr_ =
1497       make_string_ref(balloc_, StringRef{src_addr, src_addr + src_addrlen});
1498   port_ = make_string_ref(balloc_, StringRef{src_port, src_port + src_portlen});
1499 
1500   if (LOG_ENABLED(INFO)) {
1501     CLOG(INFO, this) << "PROXY-protocol-v1: Finished, " << (rb_.pos() - first)
1502                      << " bytes read";
1503   }
1504 
1505   auto config = get_config();
1506   auto &fwdconf = config->http.forwarded;
1507 
1508   if ((fwdconf.params & FORWARDED_FOR) &&
1509       fwdconf.for_node_type == ForwardedNode::IP) {
1510     init_forwarded_for(family, ipaddr_);
1511   }
1512 
1513   return on_proxy_protocol_finish();
1514 }
1515 
proxy_protocol_v2_read()1516 int ClientHandler::proxy_protocol_v2_read() {
1517   // Assume that first str_size(PROXY_PROTO_V2_SIG) octets match v2
1518   // protocol signature and followed by the bytes which indicates v2.
1519   assert(rb_.rleft() >= PROXY_PROTO_V2_HDLEN);
1520 
1521   auto p = rb_.pos() + str_size(PROXY_PROTO_V2_SIG);
1522 
1523   assert(((*p) & 0xf0) == 0x20);
1524 
1525   enum { LOCAL, PROXY } cmd;
1526 
1527   auto cmd_bits = (*p++) & 0xf;
1528   switch (cmd_bits) {
1529   case 0x0:
1530     cmd = LOCAL;
1531     break;
1532   case 0x01:
1533     cmd = PROXY;
1534     break;
1535   default:
1536     if (LOG_ENABLED(INFO)) {
1537       CLOG(INFO, this) << "PROXY-protocol-v2: Unknown command " << log::hex
1538                        << cmd_bits;
1539     }
1540     return -1;
1541   }
1542 
1543   auto fam = *p++;
1544   uint16_t len;
1545   memcpy(&len, p, sizeof(len));
1546   len = ntohs(len);
1547 
1548   p += sizeof(len);
1549 
1550   if (LOG_ENABLED(INFO)) {
1551     CLOG(INFO, this) << "PROXY-protocol-v2: Detected family=" << log::hex << fam
1552                      << ", len=" << log::dec << len;
1553   }
1554 
1555   if (rb_.last() - p < len) {
1556     if (LOG_ENABLED(INFO)) {
1557       CLOG(INFO, this)
1558           << "PROXY-protocol-v2: Prematurely truncated header block; require "
1559           << len << " bytes, " << rb_.last() - p << " bytes left";
1560     }
1561     return -1;
1562   }
1563 
1564   int family;
1565   std::array<char, std::max(INET_ADDRSTRLEN, INET6_ADDRSTRLEN)> src_addr,
1566       dst_addr;
1567   size_t addrlen;
1568 
1569   switch (fam) {
1570   case 0x11:
1571   case 0x12:
1572     if (len < 12) {
1573       if (LOG_ENABLED(INFO)) {
1574         CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_INET addresses";
1575       }
1576       return -1;
1577     }
1578     family = AF_INET;
1579     addrlen = 4;
1580     break;
1581   case 0x21:
1582   case 0x22:
1583     if (len < 36) {
1584       if (LOG_ENABLED(INFO)) {
1585         CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_INET6 addresses";
1586       }
1587       return -1;
1588     }
1589     family = AF_INET6;
1590     addrlen = 16;
1591     break;
1592   case 0x31:
1593   case 0x32:
1594     if (len < 216) {
1595       if (LOG_ENABLED(INFO)) {
1596         CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_UNIX addresses";
1597       }
1598       return -1;
1599     }
1600     // fall through
1601   case 0x00: {
1602     // UNSPEC and UNIX are just ignored.
1603     if (LOG_ENABLED(INFO)) {
1604       CLOG(INFO, this) << "PROXY-protocol-v2: Ignore combination of address "
1605                           "family and protocol "
1606                        << log::hex << fam;
1607     }
1608     rb_.drain(PROXY_PROTO_V2_HDLEN + len);
1609     return on_proxy_protocol_finish();
1610   }
1611   default:
1612     if (LOG_ENABLED(INFO)) {
1613       CLOG(INFO, this) << "PROXY-protocol-v2: Unknown combination of address "
1614                           "family and protocol "
1615                        << log::hex << fam;
1616     }
1617     return -1;
1618   }
1619 
1620   if (cmd != PROXY) {
1621     if (LOG_ENABLED(INFO)) {
1622       CLOG(INFO, this) << "PROXY-protocol-v2: Ignore non-PROXY command";
1623     }
1624     rb_.drain(PROXY_PROTO_V2_HDLEN + len);
1625     return on_proxy_protocol_finish();
1626   }
1627 
1628   if (inet_ntop(family, p, src_addr.data(), src_addr.size()) == nullptr) {
1629     if (LOG_ENABLED(INFO)) {
1630       CLOG(INFO, this) << "PROXY-protocol-v2: Unable to parse source address";
1631     }
1632     return -1;
1633   }
1634 
1635   p += addrlen;
1636 
1637   if (inet_ntop(family, p, dst_addr.data(), dst_addr.size()) == nullptr) {
1638     if (LOG_ENABLED(INFO)) {
1639       CLOG(INFO, this)
1640           << "PROXY-protocol-v2: Unable to parse destination address";
1641     }
1642     return -1;
1643   }
1644 
1645   p += addrlen;
1646 
1647   uint16_t src_port;
1648 
1649   memcpy(&src_port, p, sizeof(src_port));
1650   src_port = ntohs(src_port);
1651 
1652   // We don't use destination port.
1653   p += 4;
1654 
1655   ipaddr_ = make_string_ref(balloc_, StringRef{src_addr.data()});
1656   port_ = util::make_string_ref_uint(balloc_, src_port);
1657 
1658   if (LOG_ENABLED(INFO)) {
1659     CLOG(INFO, this) << "PROXY-protocol-v2: Finished reading proxy addresses, "
1660                      << p - rb_.pos() << " bytes read, "
1661                      << PROXY_PROTO_V2_HDLEN + len - (p - rb_.pos())
1662                      << " bytes left";
1663   }
1664 
1665   auto config = get_config();
1666   auto &fwdconf = config->http.forwarded;
1667 
1668   if ((fwdconf.params & FORWARDED_FOR) &&
1669       fwdconf.for_node_type == ForwardedNode::IP) {
1670     init_forwarded_for(family, ipaddr_);
1671   }
1672 
1673   rb_.drain(PROXY_PROTO_V2_HDLEN + len);
1674   return on_proxy_protocol_finish();
1675 }
1676 
get_forwarded_by() const1677 StringRef ClientHandler::get_forwarded_by() const {
1678   auto &fwdconf = get_config()->http.forwarded;
1679 
1680   if (fwdconf.by_node_type == ForwardedNode::OBFUSCATED) {
1681     return fwdconf.by_obfuscated;
1682   }
1683 
1684   return faddr_->hostport;
1685 }
1686 
get_forwarded_for() const1687 StringRef ClientHandler::get_forwarded_for() const { return forwarded_for_; }
1688 
get_upstream_addr() const1689 const UpstreamAddr *ClientHandler::get_upstream_addr() const { return faddr_; }
1690 
get_connection()1691 Connection *ClientHandler::get_connection() { return &conn_; };
1692 
set_tls_sni(const StringRef & sni)1693 void ClientHandler::set_tls_sni(const StringRef &sni) {
1694   sni_ = make_string_ref(balloc_, sni);
1695 }
1696 
get_tls_sni() const1697 StringRef ClientHandler::get_tls_sni() const { return sni_; }
1698 
get_alpn() const1699 StringRef ClientHandler::get_alpn() const { return alpn_; }
1700 
get_block_allocator()1701 BlockAllocator &ClientHandler::get_block_allocator() { return balloc_; }
1702 
set_alpn_from_conn()1703 void ClientHandler::set_alpn_from_conn() {
1704   const unsigned char *alpn;
1705   unsigned int alpnlen;
1706 
1707   SSL_get0_alpn_selected(conn_.tls.ssl, &alpn, &alpnlen);
1708 
1709   alpn_ = make_string_ref(balloc_, StringRef{alpn, alpnlen});
1710 }
1711 
1712 } // namespace shrpx
1713