1 //! HTTP/2 Server Connections 2 3 use std::error::Error as StdError; 4 use std::fmt; 5 use std::future::Future; 6 use std::marker::Unpin; 7 use std::pin::Pin; 8 use std::task::{Context, Poll}; 9 use std::time::Duration; 10 11 use pin_project_lite::pin_project; 12 use tokio::io::{AsyncRead, AsyncWrite}; 13 14 use crate::body::{Body as IncomingBody, HttpBody as Body}; 15 use crate::common::exec::ConnStreamExec; 16 use crate::proto; 17 use crate::service::HttpService; 18 19 pin_project! { 20 /// A future binding an HTTP/2 connection with a Service. 21 /// 22 /// Polling this future will drive HTTP forward. 23 #[must_use = "futures do nothing unless polled"] 24 pub struct Connection<T, S, E> 25 where 26 S: HttpService<IncomingBody>, 27 { 28 conn: proto::h2::Server<T, S, S::ResBody, E>, 29 } 30 } 31 32 /// A configuration builder for HTTP/2 server connections. 33 #[derive(Clone, Debug)] 34 pub struct Builder<E> { 35 exec: E, 36 h2_builder: proto::h2::server::Config, 37 } 38 39 // ===== impl Connection ===== 40 41 impl<I, S, E> fmt::Debug for Connection<I, S, E> 42 where 43 S: HttpService<IncomingBody>, 44 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result45 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 46 f.debug_struct("Connection").finish() 47 } 48 } 49 50 impl<I, B, S, E> Connection<I, S, E> 51 where 52 S: HttpService<IncomingBody, ResBody = B>, 53 S::Error: Into<Box<dyn StdError + Send + Sync>>, 54 I: AsyncRead + AsyncWrite + Unpin, 55 B: Body + 'static, 56 B::Error: Into<Box<dyn StdError + Send + Sync>>, 57 E: ConnStreamExec<S::Future, B>, 58 { 59 /// Start a graceful shutdown process for this connection. 60 /// 61 /// This `Connection` should continue to be polled until shutdown 62 /// can finish. 63 /// 64 /// # Note 65 /// 66 /// This should only be called while the `Connection` future is still 67 /// pending. If called after `Connection::poll` has resolved, this does 68 /// nothing. graceful_shutdown(mut self: Pin<&mut Self>)69 pub fn graceful_shutdown(mut self: Pin<&mut Self>) { 70 self.conn.graceful_shutdown(); 71 } 72 } 73 74 impl<I, B, S, E> Future for Connection<I, S, E> 75 where 76 S: HttpService<IncomingBody, ResBody = B>, 77 S::Error: Into<Box<dyn StdError + Send + Sync>>, 78 I: AsyncRead + AsyncWrite + Unpin + 'static, 79 B: Body + 'static, 80 B::Error: Into<Box<dyn StdError + Send + Sync>>, 81 E: ConnStreamExec<S::Future, B>, 82 { 83 type Output = crate::Result<()>; 84 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>85 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 86 match ready!(Pin::new(&mut self.conn).poll(cx)) { 87 Ok(_done) => { 88 //TODO: the proto::h2::Server no longer needs to return 89 //the Dispatched enum 90 Poll::Ready(Ok(())) 91 } 92 Err(e) => Poll::Ready(Err(e)), 93 } 94 } 95 } 96 97 // ===== impl Builder ===== 98 99 impl<E> Builder<E> { 100 /// Create a new connection builder. 101 /// 102 /// This starts with the default options, and an executor. new(exec: E) -> Self103 pub fn new(exec: E) -> Self { 104 Self { 105 exec: exec, 106 h2_builder: Default::default(), 107 } 108 } 109 110 /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 111 /// stream-level flow control. 112 /// 113 /// Passing `None` will do nothing. 114 /// 115 /// If not set, hyper will use a default. 116 /// 117 /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self118 pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self { 119 if let Some(sz) = sz.into() { 120 self.h2_builder.adaptive_window = false; 121 self.h2_builder.initial_stream_window_size = sz; 122 } 123 self 124 } 125 126 /// Sets the max connection-level flow control for HTTP2. 127 /// 128 /// Passing `None` will do nothing. 129 /// 130 /// If not set, hyper will use a default. initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self131 pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self { 132 if let Some(sz) = sz.into() { 133 self.h2_builder.adaptive_window = false; 134 self.h2_builder.initial_conn_window_size = sz; 135 } 136 self 137 } 138 139 /// Sets whether to use an adaptive flow control. 140 /// 141 /// Enabling this will override the limits set in 142 /// `initial_stream_window_size` and 143 /// `initial_connection_window_size`. adaptive_window(&mut self, enabled: bool) -> &mut Self144 pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self { 145 use proto::h2::SPEC_WINDOW_SIZE; 146 147 self.h2_builder.adaptive_window = enabled; 148 if enabled { 149 self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE; 150 self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE; 151 } 152 self 153 } 154 155 /// Sets the maximum frame size to use for HTTP2. 156 /// 157 /// Passing `None` will do nothing. 158 /// 159 /// If not set, hyper will use a default. max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self160 pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self { 161 if let Some(sz) = sz.into() { 162 self.h2_builder.max_frame_size = sz; 163 } 164 self 165 } 166 167 /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2 168 /// connections. 169 /// 170 /// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing. 171 /// 172 /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self173 pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self { 174 self.h2_builder.max_concurrent_streams = max.into(); 175 self 176 } 177 178 /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent. 179 /// 180 /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2). 181 /// As of v0.3.17, it is 20. 182 /// 183 /// See <https://github.com/hyperium/hyper/issues/2877> for more information. max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self184 pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self { 185 self.h2_builder.max_pending_accept_reset_streams = max.into(); 186 self 187 } 188 189 /// Sets an interval for HTTP2 Ping frames should be sent to keep a 190 /// connection alive. 191 /// 192 /// Pass `None` to disable HTTP2 keep-alive. 193 /// 194 /// Default is currently disabled. 195 /// 196 /// # Cargo Feature 197 /// keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self198 pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self { 199 self.h2_builder.keep_alive_interval = interval.into(); 200 self 201 } 202 203 /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. 204 /// 205 /// If the ping is not acknowledged within the timeout, the connection will 206 /// be closed. Does nothing if `keep_alive_interval` is disabled. 207 /// 208 /// Default is 20 seconds. 209 /// 210 /// # Cargo Feature 211 /// keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self212 pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { 213 self.h2_builder.keep_alive_timeout = timeout; 214 self 215 } 216 217 /// Set the maximum write buffer size for each HTTP/2 stream. 218 /// 219 /// Default is currently ~400KB, but may change. 220 /// 221 /// # Panics 222 /// 223 /// The value must be no larger than `u32::MAX`. max_send_buf_size(&mut self, max: usize) -> &mut Self224 pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self { 225 assert!(max <= std::u32::MAX as usize); 226 self.h2_builder.max_send_buffer_size = max; 227 self 228 } 229 230 /// Enables the [extended CONNECT protocol]. 231 /// 232 /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 enable_connect_protocol(&mut self) -> &mut Self233 pub fn enable_connect_protocol(&mut self) -> &mut Self { 234 self.h2_builder.enable_connect_protocol = true; 235 self 236 } 237 238 /// Sets the max size of received header frames. 239 /// 240 /// Default is currently ~16MB, but may change. max_header_list_size(&mut self, max: u32) -> &mut Self241 pub fn max_header_list_size(&mut self, max: u32) -> &mut Self { 242 self.h2_builder.max_header_list_size = max; 243 self 244 } 245 246 // /// Set the timer used in background tasks. 247 // pub fn timer<M>(&mut self, timer: M) -> &mut Self 248 // where 249 // M: Timer + Send + Sync + 'static, 250 // { 251 // self.timer = Time::Timer(Arc::new(timer)); 252 // self 253 // } 254 255 /// Bind a connection together with a [`Service`](crate::service::Service). 256 /// 257 /// This returns a Future that must be polled in order for HTTP to be 258 /// driven on the connection. serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E> where S: HttpService<IncomingBody, ResBody = Bd>, S::Error: Into<Box<dyn StdError + Send + Sync>>, Bd: Body + 'static, Bd::Error: Into<Box<dyn StdError + Send + Sync>>, I: AsyncRead + AsyncWrite + Unpin, E: ConnStreamExec<S::Future, Bd>,259 pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E> 260 where 261 S: HttpService<IncomingBody, ResBody = Bd>, 262 S::Error: Into<Box<dyn StdError + Send + Sync>>, 263 Bd: Body + 'static, 264 Bd::Error: Into<Box<dyn StdError + Send + Sync>>, 265 I: AsyncRead + AsyncWrite + Unpin, 266 E: ConnStreamExec<S::Future, Bd>, 267 { 268 let proto = proto::h2::Server::new(io, service, &self.h2_builder, self.exec.clone()); 269 Connection { conn: proto } 270 } 271 } 272