• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use std::error::Error as StdError;
2 use std::future::Future;
3 use std::marker::Unpin;
4 use std::pin::Pin;
5 use std::task::{Context, Poll};
6 #[cfg(feature = "runtime")]
7 use std::time::Duration;
8 
9 use bytes::Bytes;
10 use h2::server::{Connection, Handshake, SendResponse};
11 use h2::{Reason, RecvStream};
12 use http::{Method, Request};
13 use pin_project_lite::pin_project;
14 use tokio::io::{AsyncRead, AsyncWrite};
15 use tracing::{debug, trace, warn};
16 
17 use super::{ping, PipeToSendStream, SendBuf};
18 use crate::body::HttpBody;
19 use crate::common::date;
20 use crate::common::exec::ConnStreamExec;
21 use crate::ext::Protocol;
22 use crate::headers;
23 use crate::proto::h2::ping::Recorder;
24 use crate::proto::h2::{H2Upgraded, UpgradedSendStream};
25 use crate::proto::Dispatched;
26 use crate::service::HttpService;
27 
28 use crate::upgrade::{OnUpgrade, Pending, Upgraded};
29 use crate::{Body, Response};
30 
// Our defaults are chosen for the "majority" case, which usually are not
// resource constrained, and so the spec default of 64kb can be too limiting
// for performance.
//
// At the same time, a server more often has multiple clients connected, and
// so is more likely to use more resources than a client would.

/// Default for `Config::initial_conn_window_size`.
const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024; // 1mb
/// Default for `Config::initial_stream_window_size`.
const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024; // 1mb
/// Default for `Config::max_frame_size`.
const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb
/// Default for `Config::max_send_buffer_size`.
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 400; // 400kb
/// Default for `Config::max_header_list_size`.
const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 16 << 20; // 16 MB "sane default" taken from golang http2
/// Default for `Config::max_local_error_reset_streams` (wrapped in `Some`).
const DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS: usize = 1024;
43 
/// Settings consumed by `Server::new` to configure the `h2` server builder
/// and the ping (BDP / keep-alive) machinery.
#[derive(Clone, Debug)]
pub(crate) struct Config {
    /// When `true`, `Server::new` seeds the ping config's
    /// `bdp_initial_window` with `initial_stream_window_size`; otherwise it
    /// is left as `None`.
    pub(crate) adaptive_window: bool,
    /// Forwarded to the builder's `initial_connection_window_size`.
    pub(crate) initial_conn_window_size: u32,
    /// Forwarded to the builder's `initial_window_size`.
    pub(crate) initial_stream_window_size: u32,
    /// Forwarded to the builder's `max_frame_size`.
    pub(crate) max_frame_size: u32,
    /// When `true`, `enable_connect_protocol()` is called on the builder.
    pub(crate) enable_connect_protocol: bool,
    /// Forwarded to the builder's `max_concurrent_streams` only when `Some`.
    pub(crate) max_concurrent_streams: Option<u32>,
    /// Forwarded to the builder's `max_pending_accept_reset_streams` only
    /// when `Some`.
    pub(crate) max_pending_accept_reset_streams: Option<usize>,
    /// Forwarded as-is (including `None`) to the builder's
    /// `max_local_error_reset_streams`.
    pub(crate) max_local_error_reset_streams: Option<usize>,
    /// Copied into the ping config's `keep_alive_interval`.
    #[cfg(feature = "runtime")]
    pub(crate) keep_alive_interval: Option<Duration>,
    /// Copied into the ping config's `keep_alive_timeout`.
    #[cfg(feature = "runtime")]
    pub(crate) keep_alive_timeout: Duration,
    /// Forwarded to the builder's `max_send_buffer_size`.
    pub(crate) max_send_buffer_size: usize,
    /// Forwarded to the builder's `max_header_list_size`.
    pub(crate) max_header_list_size: u32,
}
61 
62 impl Default for Config {
default() -> Config63     fn default() -> Config {
64         Config {
65             adaptive_window: false,
66             initial_conn_window_size: DEFAULT_CONN_WINDOW,
67             initial_stream_window_size: DEFAULT_STREAM_WINDOW,
68             max_frame_size: DEFAULT_MAX_FRAME_SIZE,
69             enable_connect_protocol: false,
70             max_concurrent_streams: None,
71             max_pending_accept_reset_streams: None,
72             max_local_error_reset_streams: Some(DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS),
73             #[cfg(feature = "runtime")]
74             keep_alive_interval: None,
75             #[cfg(feature = "runtime")]
76             keep_alive_timeout: Duration::from_secs(20),
77             max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
78             max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE,
79         }
80     }
81 }
82 
pin_project! {
    // Future that drives one HTTP/2 server connection from handshake until
    // the connection is finished.
    pub(crate) struct Server<T, S, B, E>
    where
        S: HttpService<Body>,
        B: HttpBody,
    {
        // Executor that accepted per-stream futures are handed to.
        exec: E,
        // User service; polled for readiness and called once per request.
        service: S,
        // Connection state machine (handshaking / serving / closed).
        state: State<T, B>,
    }
}
94 
/// Lifecycle of a `Server` connection.
enum State<T, B>
where
    B: HttpBody,
{
    /// The h2 handshake future has not completed yet.
    Handshaking {
        /// Ping settings kept until the handshake finishes; used to decide
        /// whether a ping channel is created for the connection.
        ping_config: ping::Config,
        /// The in-progress h2 handshake.
        hs: Handshake<T, SendBuf<B::Data>>,
    },
    /// Handshake done; requests are being accepted and dispatched.
    Serving(Serving<T, B>),
    /// Set when `graceful_shutdown` is called while still handshaking.
    Closed,
}
106 
/// State of a connection whose handshake has completed.
struct Serving<T, B>
where
    B: HttpBody,
{
    /// Ping recorder/ponger pair; `None` when the ping config is not enabled.
    ping: Option<(ping::Recorder, ping::Ponger)>,
    /// The established h2 server connection.
    conn: Connection<T, SendBuf<B::Data>>,
    /// Error from a failed `service.poll_ready`; once set, `poll_server`
    /// stops accepting and returns this error after the connection closes.
    closing: Option<crate::Error>,
}
115 
116 impl<T, S, B, E> Server<T, S, B, E>
117 where
118     T: AsyncRead + AsyncWrite + Unpin,
119     S: HttpService<Body, ResBody = B>,
120     S::Error: Into<Box<dyn StdError + Send + Sync>>,
121     B: HttpBody + 'static,
122     E: ConnStreamExec<S::Future, B>,
123 {
new(io: T, service: S, config: &Config, exec: E) -> Server<T, S, B, E>124     pub(crate) fn new(io: T, service: S, config: &Config, exec: E) -> Server<T, S, B, E> {
125         let mut builder = h2::server::Builder::default();
126         builder
127             .initial_window_size(config.initial_stream_window_size)
128             .initial_connection_window_size(config.initial_conn_window_size)
129             .max_frame_size(config.max_frame_size)
130             .max_header_list_size(config.max_header_list_size)
131             .max_local_error_reset_streams(config.max_local_error_reset_streams)
132             .max_send_buffer_size(config.max_send_buffer_size);
133         if let Some(max) = config.max_concurrent_streams {
134             builder.max_concurrent_streams(max);
135         }
136         if let Some(max) = config.max_pending_accept_reset_streams {
137             builder.max_pending_accept_reset_streams(max);
138         }
139         if config.enable_connect_protocol {
140             builder.enable_connect_protocol();
141         }
142         let handshake = builder.handshake(io);
143 
144         let bdp = if config.adaptive_window {
145             Some(config.initial_stream_window_size)
146         } else {
147             None
148         };
149 
150         let ping_config = ping::Config {
151             bdp_initial_window: bdp,
152             #[cfg(feature = "runtime")]
153             keep_alive_interval: config.keep_alive_interval,
154             #[cfg(feature = "runtime")]
155             keep_alive_timeout: config.keep_alive_timeout,
156             // If keep-alive is enabled for servers, always enabled while
157             // idle, so it can more aggressively close dead connections.
158             #[cfg(feature = "runtime")]
159             keep_alive_while_idle: true,
160         };
161 
162         Server {
163             exec,
164             state: State::Handshaking {
165                 ping_config,
166                 hs: handshake,
167             },
168             service,
169         }
170     }
171 
graceful_shutdown(&mut self)172     pub(crate) fn graceful_shutdown(&mut self) {
173         trace!("graceful_shutdown");
174         match self.state {
175             State::Handshaking { .. } => {
176                 // fall-through, to replace state with Closed
177             }
178             State::Serving(ref mut srv) => {
179                 if srv.closing.is_none() {
180                     srv.conn.graceful_shutdown();
181                 }
182                 return;
183             }
184             State::Closed => {
185                 return;
186             }
187         }
188         self.state = State::Closed;
189     }
190 }
191 
192 impl<T, S, B, E> Future for Server<T, S, B, E>
193 where
194     T: AsyncRead + AsyncWrite + Unpin,
195     S: HttpService<Body, ResBody = B>,
196     S::Error: Into<Box<dyn StdError + Send + Sync>>,
197     B: HttpBody + 'static,
198     E: ConnStreamExec<S::Future, B>,
199 {
200     type Output = crate::Result<Dispatched>;
201 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>202     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
203         let me = &mut *self;
204         loop {
205             let next = match me.state {
206                 State::Handshaking {
207                     ref mut hs,
208                     ref ping_config,
209                 } => {
210                     let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?;
211                     let ping = if ping_config.is_enabled() {
212                         let pp = conn.ping_pong().expect("conn.ping_pong");
213                         Some(ping::channel(pp, ping_config.clone()))
214                     } else {
215                         None
216                     };
217                     State::Serving(Serving {
218                         ping,
219                         conn,
220                         closing: None,
221                     })
222                 }
223                 State::Serving(ref mut srv) => {
224                     ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?;
225                     return Poll::Ready(Ok(Dispatched::Shutdown));
226                 }
227                 State::Closed => {
228                     // graceful_shutdown was called before handshaking finished,
229                     // nothing to do here...
230                     return Poll::Ready(Ok(Dispatched::Shutdown));
231                 }
232             };
233             me.state = next;
234         }
235     }
236 }
237 
238 impl<T, B> Serving<T, B>
239 where
240     T: AsyncRead + AsyncWrite + Unpin,
241     B: HttpBody + 'static,
242 {
poll_server<S, E>( &mut self, cx: &mut Context<'_>, service: &mut S, exec: &mut E, ) -> Poll<crate::Result<()>> where S: HttpService<Body, ResBody = B>, S::Error: Into<Box<dyn StdError + Send + Sync>>, E: ConnStreamExec<S::Future, B>,243     fn poll_server<S, E>(
244         &mut self,
245         cx: &mut Context<'_>,
246         service: &mut S,
247         exec: &mut E,
248     ) -> Poll<crate::Result<()>>
249     where
250         S: HttpService<Body, ResBody = B>,
251         S::Error: Into<Box<dyn StdError + Send + Sync>>,
252         E: ConnStreamExec<S::Future, B>,
253     {
254         if self.closing.is_none() {
255             loop {
256                 self.poll_ping(cx);
257 
258                 // Check that the service is ready to accept a new request.
259                 //
260                 // - If not, just drive the connection some.
261                 // - If ready, try to accept a new request from the connection.
262                 match service.poll_ready(cx) {
263                     Poll::Ready(Ok(())) => (),
264                     Poll::Pending => {
265                         // use `poll_closed` instead of `poll_accept`,
266                         // in order to avoid accepting a request.
267                         ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
268                         trace!("incoming connection complete");
269                         return Poll::Ready(Ok(()));
270                     }
271                     Poll::Ready(Err(err)) => {
272                         let err = crate::Error::new_user_service(err);
273                         debug!("service closed: {}", err);
274 
275                         let reason = err.h2_reason();
276                         if reason == Reason::NO_ERROR {
277                             // NO_ERROR is only used for graceful shutdowns...
278                             trace!("interpreting NO_ERROR user error as graceful_shutdown");
279                             self.conn.graceful_shutdown();
280                         } else {
281                             trace!("abruptly shutting down with {:?}", reason);
282                             self.conn.abrupt_shutdown(reason);
283                         }
284                         self.closing = Some(err);
285                         break;
286                     }
287                 }
288 
289                 // When the service is ready, accepts an incoming request.
290                 match ready!(self.conn.poll_accept(cx)) {
291                     Some(Ok((req, mut respond))) => {
292                         trace!("incoming request");
293                         let content_length = headers::content_length_parse_all(req.headers());
294                         let ping = self
295                             .ping
296                             .as_ref()
297                             .map(|ping| ping.0.clone())
298                             .unwrap_or_else(ping::disabled);
299 
300                         // Record the headers received
301                         ping.record_non_data();
302 
303                         let is_connect = req.method() == Method::CONNECT;
304                         let (mut parts, stream) = req.into_parts();
305                         let (mut req, connect_parts) = if !is_connect {
306                             (
307                                 Request::from_parts(
308                                     parts,
309                                     crate::Body::h2(stream, content_length.into(), ping),
310                                 ),
311                                 None,
312                             )
313                         } else {
314                             if content_length.map_or(false, |len| len != 0) {
315                                 warn!("h2 connect request with non-zero body not supported");
316                                 respond.send_reset(h2::Reason::INTERNAL_ERROR);
317                                 return Poll::Ready(Ok(()));
318                             }
319                             let (pending, upgrade) = crate::upgrade::pending();
320                             debug_assert!(parts.extensions.get::<OnUpgrade>().is_none());
321                             parts.extensions.insert(upgrade);
322                             (
323                                 Request::from_parts(parts, crate::Body::empty()),
324                                 Some(ConnectParts {
325                                     pending,
326                                     ping,
327                                     recv_stream: stream,
328                                 }),
329                             )
330                         };
331 
332                         if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() {
333                             req.extensions_mut().insert(Protocol::from_inner(protocol));
334                         }
335 
336                         let fut = H2Stream::new(service.call(req), connect_parts, respond);
337                         exec.execute_h2stream(fut);
338                     }
339                     Some(Err(e)) => {
340                         return Poll::Ready(Err(crate::Error::new_h2(e)));
341                     }
342                     None => {
343                         // no more incoming streams...
344                         if let Some((ref ping, _)) = self.ping {
345                             ping.ensure_not_timed_out()?;
346                         }
347 
348                         trace!("incoming connection complete");
349                         return Poll::Ready(Ok(()));
350                     }
351                 }
352             }
353         }
354 
355         debug_assert!(
356             self.closing.is_some(),
357             "poll_server broke loop without closing"
358         );
359 
360         ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
361 
362         Poll::Ready(Err(self.closing.take().expect("polled after error")))
363     }
364 
poll_ping(&mut self, cx: &mut Context<'_>)365     fn poll_ping(&mut self, cx: &mut Context<'_>) {
366         if let Some((_, ref mut estimator)) = self.ping {
367             match estimator.poll(cx) {
368                 Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
369                     self.conn.set_target_window_size(wnd);
370                     let _ = self.conn.set_initial_window_size(wnd);
371                 }
372                 #[cfg(feature = "runtime")]
373                 Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
374                     debug!("keep-alive timed out, closing connection");
375                     self.conn.abrupt_shutdown(h2::Reason::NO_ERROR);
376                 }
377                 Poll::Pending => {}
378             }
379         }
380     }
381 }
382 
pin_project! {
    // Future handling a single accepted request: it waits for the service's
    // response, sends it, and then streams the response body.
    #[allow(missing_debug_implementations)]
    pub struct H2Stream<F, B>
    where
        B: HttpBody,
    {
        // Handle used to send the response (or a reset) for this stream.
        reply: SendResponse<SendBuf<B::Data>>,
        // Current phase: awaiting the service future, or piping the body.
        #[pin]
        state: H2StreamState<F, B>,
    }
}
394 
pin_project! {
    // Phases of an `H2Stream`.
    #[project = H2StreamStateProj]
    enum H2StreamState<F, B>
    where
        B: HttpBody,
    {
        // Waiting for the service future to produce a response.
        Service {
            #[pin]
            fut: F,
            // Present only for CONNECT requests; taken when the response
            // arrives.
            connect_parts: Option<ConnectParts>,
        },
        // Response head sent; forwarding the body to the send stream.
        Body {
            #[pin]
            pipe: PipeToSendStream<B>,
        },
    }
}
412 
/// Pieces of a CONNECT request kept aside until the service responds, so a
/// successful response can be turned into an upgraded connection.
struct ConnectParts {
    /// Fulfilled with the `Upgraded` I/O object on a successful response.
    pending: Pending,
    /// Ping recorder handed over to the upgraded stream.
    ping: Recorder,
    /// Receive half of the request stream, handed over to the upgraded stream.
    recv_stream: RecvStream,
}
418 
419 impl<F, B> H2Stream<F, B>
420 where
421     B: HttpBody,
422 {
new( fut: F, connect_parts: Option<ConnectParts>, respond: SendResponse<SendBuf<B::Data>>, ) -> H2Stream<F, B>423     fn new(
424         fut: F,
425         connect_parts: Option<ConnectParts>,
426         respond: SendResponse<SendBuf<B::Data>>,
427     ) -> H2Stream<F, B> {
428         H2Stream {
429             reply: respond,
430             state: H2StreamState::Service { fut, connect_parts },
431         }
432     }
433 }
434 
// Sends a response head on `$stream.reply` and evaluates to the resulting
// send stream. If sending fails, the stream is reset with INTERNAL_ERROR and
// the *enclosing function* returns `Poll::Ready(Err(..))` with the h2 error.
macro_rules! reply {
    ($stream:expr, $response:expr, $end_of_stream:expr) => {{
        match $stream.reply.send_response($response, $end_of_stream) {
            Err(send_err) => {
                debug!("send response error: {}", send_err);
                $stream.reply.send_reset(Reason::INTERNAL_ERROR);
                return Poll::Ready(Err(crate::Error::new_h2(send_err)));
            }
            Ok(send_stream) => send_stream,
        }
    }};
}
447 
448 impl<F, B, E> H2Stream<F, B>
449 where
450     F: Future<Output = Result<Response<B>, E>>,
451     B: HttpBody,
452     B::Data: 'static,
453     B::Error: Into<Box<dyn StdError + Send + Sync>>,
454     E: Into<Box<dyn StdError + Send + Sync>>,
455 {
poll2(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>>456     fn poll2(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
457         let mut me = self.project();
458         loop {
459             let next = match me.state.as_mut().project() {
460                 H2StreamStateProj::Service {
461                     fut: h,
462                     connect_parts,
463                 } => {
464                     let res = match h.poll(cx) {
465                         Poll::Ready(Ok(r)) => r,
466                         Poll::Pending => {
467                             // Response is not yet ready, so we want to check if the client has sent a
468                             // RST_STREAM frame which would cancel the current request.
469                             if let Poll::Ready(reason) =
470                                 me.reply.poll_reset(cx).map_err(crate::Error::new_h2)?
471                             {
472                                 debug!("stream received RST_STREAM: {:?}", reason);
473                                 return Poll::Ready(Err(crate::Error::new_h2(reason.into())));
474                             }
475                             return Poll::Pending;
476                         }
477                         Poll::Ready(Err(e)) => {
478                             let err = crate::Error::new_user_service(e);
479                             warn!("http2 service errored: {}", err);
480                             me.reply.send_reset(err.h2_reason());
481                             return Poll::Ready(Err(err));
482                         }
483                     };
484 
485                     let (head, body) = res.into_parts();
486                     let mut res = ::http::Response::from_parts(head, ());
487                     super::strip_connection_headers(res.headers_mut(), false);
488 
489                     // set Date header if it isn't already set...
490                     res.headers_mut()
491                         .entry(::http::header::DATE)
492                         .or_insert_with(date::update_and_header_value);
493 
494                     if let Some(connect_parts) = connect_parts.take() {
495                         if res.status().is_success() {
496                             if headers::content_length_parse_all(res.headers())
497                                 .map_or(false, |len| len != 0)
498                             {
499                                 warn!("h2 successful response to CONNECT request with body not supported");
500                                 me.reply.send_reset(h2::Reason::INTERNAL_ERROR);
501                                 return Poll::Ready(Err(crate::Error::new_user_header()));
502                             }
503                             let send_stream = reply!(me, res, false);
504                             connect_parts.pending.fulfill(Upgraded::new(
505                                 H2Upgraded {
506                                     ping: connect_parts.ping,
507                                     recv_stream: connect_parts.recv_stream,
508                                     send_stream: unsafe { UpgradedSendStream::new(send_stream) },
509                                     buf: Bytes::new(),
510                                 },
511                                 Bytes::new(),
512                             ));
513                             return Poll::Ready(Ok(()));
514                         }
515                     }
516 
517                     if !body.is_end_stream() {
518                         // automatically set Content-Length from body...
519                         if let Some(len) = body.size_hint().exact() {
520                             headers::set_content_length_if_missing(res.headers_mut(), len);
521                         }
522 
523                         let body_tx = reply!(me, res, false);
524                         H2StreamState::Body {
525                             pipe: PipeToSendStream::new(body, body_tx),
526                         }
527                     } else {
528                         reply!(me, res, true);
529                         return Poll::Ready(Ok(()));
530                     }
531                 }
532                 H2StreamStateProj::Body { pipe } => {
533                     return pipe.poll(cx);
534                 }
535             };
536             me.state.set(next);
537         }
538     }
539 }
540 
541 impl<F, B, E> Future for H2Stream<F, B>
542 where
543     F: Future<Output = Result<Response<B>, E>>,
544     B: HttpBody,
545     B::Data: 'static,
546     B::Error: Into<Box<dyn StdError + Send + Sync>>,
547     E: Into<Box<dyn StdError + Send + Sync>>,
548 {
549     type Output = ();
550 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>551     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
552         self.poll2(cx).map(|res| {
553             if let Err(e) = res {
554                 debug!("stream error: {}", e);
555             }
556         })
557     }
558 }
559