• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! HTTP/2 Server Connections
2 
3 use std::error::Error as StdError;
4 use std::fmt;
5 use std::future::Future;
6 use std::marker::Unpin;
7 use std::pin::Pin;
8 use std::task::{Context, Poll};
9 use std::time::Duration;
10 
11 use pin_project_lite::pin_project;
12 use tokio::io::{AsyncRead, AsyncWrite};
13 
14 use crate::body::{Body as IncomingBody, HttpBody as Body};
15 use crate::common::exec::ConnStreamExec;
16 use crate::proto;
17 use crate::service::HttpService;
18 
19 pin_project! {
20     /// A future binding an HTTP/2 connection with a Service.
21     ///
22     /// Polling this future will drive HTTP forward.
23     #[must_use = "futures do nothing unless polled"]
24     pub struct Connection<T, S, E>
25     where
26         S: HttpService<IncomingBody>,
27     {
28         conn: proto::h2::Server<T, S, S::ResBody, E>,
29     }
30 }
31 
32 /// A configuration builder for HTTP/2 server connections.
33 #[derive(Clone, Debug)]
34 pub struct Builder<E> {
35     exec: E,
36     h2_builder: proto::h2::server::Config,
37 }
38 
39 // ===== impl Connection =====
40 
41 impl<I, S, E> fmt::Debug for Connection<I, S, E>
42 where
43     S: HttpService<IncomingBody>,
44 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result45     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46         f.debug_struct("Connection").finish()
47     }
48 }
49 
50 impl<I, B, S, E> Connection<I, S, E>
51 where
52     S: HttpService<IncomingBody, ResBody = B>,
53     S::Error: Into<Box<dyn StdError + Send + Sync>>,
54     I: AsyncRead + AsyncWrite + Unpin,
55     B: Body + 'static,
56     B::Error: Into<Box<dyn StdError + Send + Sync>>,
57     E: ConnStreamExec<S::Future, B>,
58 {
59     /// Start a graceful shutdown process for this connection.
60     ///
61     /// This `Connection` should continue to be polled until shutdown
62     /// can finish.
63     ///
64     /// # Note
65     ///
66     /// This should only be called while the `Connection` future is still
67     /// pending. If called after `Connection::poll` has resolved, this does
68     /// nothing.
graceful_shutdown(mut self: Pin<&mut Self>)69     pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
70         self.conn.graceful_shutdown();
71     }
72 }
73 
74 impl<I, B, S, E> Future for Connection<I, S, E>
75 where
76     S: HttpService<IncomingBody, ResBody = B>,
77     S::Error: Into<Box<dyn StdError + Send + Sync>>,
78     I: AsyncRead + AsyncWrite + Unpin + 'static,
79     B: Body + 'static,
80     B::Error: Into<Box<dyn StdError + Send + Sync>>,
81     E: ConnStreamExec<S::Future, B>,
82 {
83     type Output = crate::Result<()>;
84 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>85     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
86         match ready!(Pin::new(&mut self.conn).poll(cx)) {
87             Ok(_done) => {
88                 //TODO: the proto::h2::Server no longer needs to return
89                 //the Dispatched enum
90                 Poll::Ready(Ok(()))
91             }
92             Err(e) => Poll::Ready(Err(e)),
93         }
94     }
95 }
96 
97 // ===== impl Builder =====
98 
99 impl<E> Builder<E> {
100     /// Create a new connection builder.
101     ///
102     /// This starts with the default options, and an executor.
new(exec: E) -> Self103     pub fn new(exec: E) -> Self {
104         Self {
105             exec: exec,
106             h2_builder: Default::default(),
107         }
108     }
109 
110     /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
111     /// stream-level flow control.
112     ///
113     /// Passing `None` will do nothing.
114     ///
115     /// If not set, hyper will use a default.
116     ///
117     /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self118     pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
119         if let Some(sz) = sz.into() {
120             self.h2_builder.adaptive_window = false;
121             self.h2_builder.initial_stream_window_size = sz;
122         }
123         self
124     }
125 
126     /// Sets the max connection-level flow control for HTTP2.
127     ///
128     /// Passing `None` will do nothing.
129     ///
130     /// If not set, hyper will use a default.
initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self131     pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
132         if let Some(sz) = sz.into() {
133             self.h2_builder.adaptive_window = false;
134             self.h2_builder.initial_conn_window_size = sz;
135         }
136         self
137     }
138 
139     /// Sets whether to use an adaptive flow control.
140     ///
141     /// Enabling this will override the limits set in
142     /// `initial_stream_window_size` and
143     /// `initial_connection_window_size`.
adaptive_window(&mut self, enabled: bool) -> &mut Self144     pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
145         use proto::h2::SPEC_WINDOW_SIZE;
146 
147         self.h2_builder.adaptive_window = enabled;
148         if enabled {
149             self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
150             self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
151         }
152         self
153     }
154 
155     /// Sets the maximum frame size to use for HTTP2.
156     ///
157     /// Passing `None` will do nothing.
158     ///
159     /// If not set, hyper will use a default.
max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self160     pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
161         if let Some(sz) = sz.into() {
162             self.h2_builder.max_frame_size = sz;
163         }
164         self
165     }
166 
167     /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
168     /// connections.
169     ///
170     /// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing.
171     ///
172     /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self173     pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
174         self.h2_builder.max_concurrent_streams = max.into();
175         self
176     }
177 
178     /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
179     ///
180     /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2).
181     /// As of v0.3.17, it is 20.
182     ///
183     /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self184     pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
185         self.h2_builder.max_pending_accept_reset_streams = max.into();
186         self
187     }
188 
189     /// Sets an interval for HTTP2 Ping frames should be sent to keep a
190     /// connection alive.
191     ///
192     /// Pass `None` to disable HTTP2 keep-alive.
193     ///
194     /// Default is currently disabled.
195     ///
196     /// # Cargo Feature
197     ///
keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self198     pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
199         self.h2_builder.keep_alive_interval = interval.into();
200         self
201     }
202 
203     /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
204     ///
205     /// If the ping is not acknowledged within the timeout, the connection will
206     /// be closed. Does nothing if `keep_alive_interval` is disabled.
207     ///
208     /// Default is 20 seconds.
209     ///
210     /// # Cargo Feature
211     ///
keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self212     pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
213         self.h2_builder.keep_alive_timeout = timeout;
214         self
215     }
216 
217     /// Set the maximum write buffer size for each HTTP/2 stream.
218     ///
219     /// Default is currently ~400KB, but may change.
220     ///
221     /// # Panics
222     ///
223     /// The value must be no larger than `u32::MAX`.
max_send_buf_size(&mut self, max: usize) -> &mut Self224     pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
225         assert!(max <= std::u32::MAX as usize);
226         self.h2_builder.max_send_buffer_size = max;
227         self
228     }
229 
230     /// Enables the [extended CONNECT protocol].
231     ///
232     /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
enable_connect_protocol(&mut self) -> &mut Self233     pub fn enable_connect_protocol(&mut self) -> &mut Self {
234         self.h2_builder.enable_connect_protocol = true;
235         self
236     }
237 
238     /// Sets the max size of received header frames.
239     ///
240     /// Default is currently ~16MB, but may change.
max_header_list_size(&mut self, max: u32) -> &mut Self241     pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
242         self.h2_builder.max_header_list_size = max;
243         self
244     }
245 
246     // /// Set the timer used in background tasks.
247     // pub fn timer<M>(&mut self, timer: M) -> &mut Self
248     // where
249     //     M: Timer + Send + Sync + 'static,
250     // {
251     //     self.timer = Time::Timer(Arc::new(timer));
252     //     self
253     // }
254 
255     /// Bind a connection together with a [`Service`](crate::service::Service).
256     ///
257     /// This returns a Future that must be polled in order for HTTP to be
258     /// driven on the connection.
serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E> where S: HttpService<IncomingBody, ResBody = Bd>, S::Error: Into<Box<dyn StdError + Send + Sync>>, Bd: Body + 'static, Bd::Error: Into<Box<dyn StdError + Send + Sync>>, I: AsyncRead + AsyncWrite + Unpin, E: ConnStreamExec<S::Future, Bd>,259     pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E>
260     where
261         S: HttpService<IncomingBody, ResBody = Bd>,
262         S::Error: Into<Box<dyn StdError + Send + Sync>>,
263         Bd: Body + 'static,
264         Bd::Error: Into<Box<dyn StdError + Send + Sync>>,
265         I: AsyncRead + AsyncWrite + Unpin,
266         E: ConnStreamExec<S::Future, Bd>,
267     {
268         let proto = proto::h2::Server::new(io, service, &self.h2_builder, self.exec.clone());
269         Connection { conn: proto }
270     }
271 }
272