1 use std::error::Error as StdError; 2 use std::future::Future; 3 use std::marker::Unpin; 4 use std::pin::Pin; 5 use std::task::{Context, Poll}; 6 #[cfg(feature = "runtime")] 7 use std::time::Duration; 8 9 use bytes::Bytes; 10 use h2::server::{Connection, Handshake, SendResponse}; 11 use h2::{Reason, RecvStream}; 12 use http::{Method, Request}; 13 use pin_project_lite::pin_project; 14 use tokio::io::{AsyncRead, AsyncWrite}; 15 use tracing::{debug, trace, warn}; 16 17 use super::{ping, PipeToSendStream, SendBuf}; 18 use crate::body::HttpBody; 19 use crate::common::date; 20 use crate::common::exec::ConnStreamExec; 21 use crate::ext::Protocol; 22 use crate::headers; 23 use crate::proto::h2::ping::Recorder; 24 use crate::proto::h2::{H2Upgraded, UpgradedSendStream}; 25 use crate::proto::Dispatched; 26 use crate::service::HttpService; 27 28 use crate::upgrade::{OnUpgrade, Pending, Upgraded}; 29 use crate::{Body, Response}; 30 31 // Our defaults are chosen for the "majority" case, which usually are not 32 // resource constrained, and so the spec default of 64kb can be too limiting 33 // for performance. 34 // 35 // At the same time, a server more often has multiple clients connected, and 36 // so is more likely to use more resources than a client would. 37 const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024; // 1mb 38 const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024; // 1mb 39 const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb 40 const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 400; // 400kb 41 const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 16 << 20; // 16 MB "sane default" taken from golang http2 42 const DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS: usize = 1024; 43 44 #[derive(Clone, Debug)] 45 pub(crate) struct Config { 46 pub(crate) adaptive_window: bool, 47 pub(crate) initial_conn_window_size: u32, 48 pub(crate) initial_stream_window_size: u32, 49 pub(crate) max_frame_size: u32, 50 pub(crate) enable_connect_protocol: bool, 51 pub(crate) max_concurrent_streams: Option<u32>, 52 pub(crate) max_pending_accept_reset_streams: Option<usize>, 53 pub(crate) max_local_error_reset_streams: Option<usize>, 54 #[cfg(feature = "runtime")] 55 pub(crate) keep_alive_interval: Option<Duration>, 56 #[cfg(feature = "runtime")] 57 pub(crate) keep_alive_timeout: Duration, 58 pub(crate) max_send_buffer_size: usize, 59 pub(crate) max_header_list_size: u32, 60 } 61 62 impl Default for Config { default() -> Config63 fn default() -> Config { 64 Config { 65 adaptive_window: false, 66 initial_conn_window_size: DEFAULT_CONN_WINDOW, 67 initial_stream_window_size: DEFAULT_STREAM_WINDOW, 68 max_frame_size: DEFAULT_MAX_FRAME_SIZE, 69 enable_connect_protocol: false, 70 max_concurrent_streams: None, 71 max_pending_accept_reset_streams: None, 72 max_local_error_reset_streams: Some(DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS), 73 #[cfg(feature = "runtime")] 74 keep_alive_interval: None, 75 #[cfg(feature = "runtime")] 76 keep_alive_timeout: Duration::from_secs(20), 77 max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE, 78 max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE, 79 } 80 } 81 } 82 83 pin_project! { 84 pub(crate) struct Server<T, S, B, E> 85 where 86 S: HttpService<Body>, 87 B: HttpBody, 88 { 89 exec: E, 90 service: S, 91 state: State<T, B>, 92 } 93 } 94 95 enum State<T, B> 96 where 97 B: HttpBody, 98 { 99 Handshaking { 100 ping_config: ping::Config, 101 hs: Handshake<T, SendBuf<B::Data>>, 102 }, 103 Serving(Serving<T, B>), 104 Closed, 105 } 106 107 struct Serving<T, B> 108 where 109 B: HttpBody, 110 { 111 ping: Option<(ping::Recorder, ping::Ponger)>, 112 conn: Connection<T, SendBuf<B::Data>>, 113 closing: Option<crate::Error>, 114 } 115 116 impl<T, S, B, E> Server<T, S, B, E> 117 where 118 T: AsyncRead + AsyncWrite + Unpin, 119 S: HttpService<Body, ResBody = B>, 120 S::Error: Into<Box<dyn StdError + Send + Sync>>, 121 B: HttpBody + 'static, 122 E: ConnStreamExec<S::Future, B>, 123 { new(io: T, service: S, config: &Config, exec: E) -> Server<T, S, B, E>124 pub(crate) fn new(io: T, service: S, config: &Config, exec: E) -> Server<T, S, B, E> { 125 let mut builder = h2::server::Builder::default(); 126 builder 127 .initial_window_size(config.initial_stream_window_size) 128 .initial_connection_window_size(config.initial_conn_window_size) 129 .max_frame_size(config.max_frame_size) 130 .max_header_list_size(config.max_header_list_size) 131 .max_local_error_reset_streams(config.max_local_error_reset_streams) 132 .max_send_buffer_size(config.max_send_buffer_size); 133 if let Some(max) = config.max_concurrent_streams { 134 builder.max_concurrent_streams(max); 135 } 136 if let Some(max) = config.max_pending_accept_reset_streams { 137 builder.max_pending_accept_reset_streams(max); 138 } 139 if config.enable_connect_protocol { 140 builder.enable_connect_protocol(); 141 } 142 let handshake = builder.handshake(io); 143 144 let bdp = if config.adaptive_window { 145 Some(config.initial_stream_window_size) 146 } else { 147 None 148 }; 149 150 let ping_config = ping::Config { 151 bdp_initial_window: bdp, 152 #[cfg(feature = "runtime")] 153 keep_alive_interval: config.keep_alive_interval, 154 #[cfg(feature = "runtime")] 155 keep_alive_timeout: config.keep_alive_timeout, 156 // If keep-alive is enabled for servers, always enabled while 157 // idle, so it can more aggressively close dead connections. 158 #[cfg(feature = "runtime")] 159 keep_alive_while_idle: true, 160 }; 161 162 Server { 163 exec, 164 state: State::Handshaking { 165 ping_config, 166 hs: handshake, 167 }, 168 service, 169 } 170 } 171 graceful_shutdown(&mut self)172 pub(crate) fn graceful_shutdown(&mut self) { 173 trace!("graceful_shutdown"); 174 match self.state { 175 State::Handshaking { .. } => { 176 // fall-through, to replace state with Closed 177 } 178 State::Serving(ref mut srv) => { 179 if srv.closing.is_none() { 180 srv.conn.graceful_shutdown(); 181 } 182 return; 183 } 184 State::Closed => { 185 return; 186 } 187 } 188 self.state = State::Closed; 189 } 190 } 191 192 impl<T, S, B, E> Future for Server<T, S, B, E> 193 where 194 T: AsyncRead + AsyncWrite + Unpin, 195 S: HttpService<Body, ResBody = B>, 196 S::Error: Into<Box<dyn StdError + Send + Sync>>, 197 B: HttpBody + 'static, 198 E: ConnStreamExec<S::Future, B>, 199 { 200 type Output = crate::Result<Dispatched>; 201 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>202 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 203 let me = &mut *self; 204 loop { 205 let next = match me.state { 206 State::Handshaking { 207 ref mut hs, 208 ref ping_config, 209 } => { 210 let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?; 211 let ping = if ping_config.is_enabled() { 212 let pp = conn.ping_pong().expect("conn.ping_pong"); 213 Some(ping::channel(pp, ping_config.clone())) 214 } else { 215 None 216 }; 217 State::Serving(Serving { 218 ping, 219 conn, 220 closing: None, 221 }) 222 } 223 State::Serving(ref mut srv) => { 224 ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?; 225 return Poll::Ready(Ok(Dispatched::Shutdown)); 226 } 227 State::Closed => { 228 // graceful_shutdown was called before handshaking finished, 229 // nothing to do here... 230 return Poll::Ready(Ok(Dispatched::Shutdown)); 231 } 232 }; 233 me.state = next; 234 } 235 } 236 } 237 238 impl<T, B> Serving<T, B> 239 where 240 T: AsyncRead + AsyncWrite + Unpin, 241 B: HttpBody + 'static, 242 { poll_server<S, E>( &mut self, cx: &mut Context<'_>, service: &mut S, exec: &mut E, ) -> Poll<crate::Result<()>> where S: HttpService<Body, ResBody = B>, S::Error: Into<Box<dyn StdError + Send + Sync>>, E: ConnStreamExec<S::Future, B>,243 fn poll_server<S, E>( 244 &mut self, 245 cx: &mut Context<'_>, 246 service: &mut S, 247 exec: &mut E, 248 ) -> Poll<crate::Result<()>> 249 where 250 S: HttpService<Body, ResBody = B>, 251 S::Error: Into<Box<dyn StdError + Send + Sync>>, 252 E: ConnStreamExec<S::Future, B>, 253 { 254 if self.closing.is_none() { 255 loop { 256 self.poll_ping(cx); 257 258 // Check that the service is ready to accept a new request. 259 // 260 // - If not, just drive the connection some. 261 // - If ready, try to accept a new request from the connection. 262 match service.poll_ready(cx) { 263 Poll::Ready(Ok(())) => (), 264 Poll::Pending => { 265 // use `poll_closed` instead of `poll_accept`, 266 // in order to avoid accepting a request. 267 ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?; 268 trace!("incoming connection complete"); 269 return Poll::Ready(Ok(())); 270 } 271 Poll::Ready(Err(err)) => { 272 let err = crate::Error::new_user_service(err); 273 debug!("service closed: {}", err); 274 275 let reason = err.h2_reason(); 276 if reason == Reason::NO_ERROR { 277 // NO_ERROR is only used for graceful shutdowns... 278 trace!("interpreting NO_ERROR user error as graceful_shutdown"); 279 self.conn.graceful_shutdown(); 280 } else { 281 trace!("abruptly shutting down with {:?}", reason); 282 self.conn.abrupt_shutdown(reason); 283 } 284 self.closing = Some(err); 285 break; 286 } 287 } 288 289 // When the service is ready, accepts an incoming request. 290 match ready!(self.conn.poll_accept(cx)) { 291 Some(Ok((req, mut respond))) => { 292 trace!("incoming request"); 293 let content_length = headers::content_length_parse_all(req.headers()); 294 let ping = self 295 .ping 296 .as_ref() 297 .map(|ping| ping.0.clone()) 298 .unwrap_or_else(ping::disabled); 299 300 // Record the headers received 301 ping.record_non_data(); 302 303 let is_connect = req.method() == Method::CONNECT; 304 let (mut parts, stream) = req.into_parts(); 305 let (mut req, connect_parts) = if !is_connect { 306 ( 307 Request::from_parts( 308 parts, 309 crate::Body::h2(stream, content_length.into(), ping), 310 ), 311 None, 312 ) 313 } else { 314 if content_length.map_or(false, |len| len != 0) { 315 warn!("h2 connect request with non-zero body not supported"); 316 respond.send_reset(h2::Reason::INTERNAL_ERROR); 317 return Poll::Ready(Ok(())); 318 } 319 let (pending, upgrade) = crate::upgrade::pending(); 320 debug_assert!(parts.extensions.get::<OnUpgrade>().is_none()); 321 parts.extensions.insert(upgrade); 322 ( 323 Request::from_parts(parts, crate::Body::empty()), 324 Some(ConnectParts { 325 pending, 326 ping, 327 recv_stream: stream, 328 }), 329 ) 330 }; 331 332 if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() { 333 req.extensions_mut().insert(Protocol::from_inner(protocol)); 334 } 335 336 let fut = H2Stream::new(service.call(req), connect_parts, respond); 337 exec.execute_h2stream(fut); 338 } 339 Some(Err(e)) => { 340 return Poll::Ready(Err(crate::Error::new_h2(e))); 341 } 342 None => { 343 // no more incoming streams... 344 if let Some((ref ping, _)) = self.ping { 345 ping.ensure_not_timed_out()?; 346 } 347 348 trace!("incoming connection complete"); 349 return Poll::Ready(Ok(())); 350 } 351 } 352 } 353 } 354 355 debug_assert!( 356 self.closing.is_some(), 357 "poll_server broke loop without closing" 358 ); 359 360 ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?; 361 362 Poll::Ready(Err(self.closing.take().expect("polled after error"))) 363 } 364 poll_ping(&mut self, cx: &mut Context<'_>)365 fn poll_ping(&mut self, cx: &mut Context<'_>) { 366 if let Some((_, ref mut estimator)) = self.ping { 367 match estimator.poll(cx) { 368 Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => { 369 self.conn.set_target_window_size(wnd); 370 let _ = self.conn.set_initial_window_size(wnd); 371 } 372 #[cfg(feature = "runtime")] 373 Poll::Ready(ping::Ponged::KeepAliveTimedOut) => { 374 debug!("keep-alive timed out, closing connection"); 375 self.conn.abrupt_shutdown(h2::Reason::NO_ERROR); 376 } 377 Poll::Pending => {} 378 } 379 } 380 } 381 } 382 383 pin_project! { 384 #[allow(missing_debug_implementations)] 385 pub struct H2Stream<F, B> 386 where 387 B: HttpBody, 388 { 389 reply: SendResponse<SendBuf<B::Data>>, 390 #[pin] 391 state: H2StreamState<F, B>, 392 } 393 } 394 395 pin_project! { 396 #[project = H2StreamStateProj] 397 enum H2StreamState<F, B> 398 where 399 B: HttpBody, 400 { 401 Service { 402 #[pin] 403 fut: F, 404 connect_parts: Option<ConnectParts>, 405 }, 406 Body { 407 #[pin] 408 pipe: PipeToSendStream<B>, 409 }, 410 } 411 } 412 413 struct ConnectParts { 414 pending: Pending, 415 ping: Recorder, 416 recv_stream: RecvStream, 417 } 418 419 impl<F, B> H2Stream<F, B> 420 where 421 B: HttpBody, 422 { new( fut: F, connect_parts: Option<ConnectParts>, respond: SendResponse<SendBuf<B::Data>>, ) -> H2Stream<F, B>423 fn new( 424 fut: F, 425 connect_parts: Option<ConnectParts>, 426 respond: SendResponse<SendBuf<B::Data>>, 427 ) -> H2Stream<F, B> { 428 H2Stream { 429 reply: respond, 430 state: H2StreamState::Service { fut, connect_parts }, 431 } 432 } 433 } 434 435 macro_rules! reply { 436 ($me:expr, $res:expr, $eos:expr) => {{ 437 match $me.reply.send_response($res, $eos) { 438 Ok(tx) => tx, 439 Err(e) => { 440 debug!("send response error: {}", e); 441 $me.reply.send_reset(Reason::INTERNAL_ERROR); 442 return Poll::Ready(Err(crate::Error::new_h2(e))); 443 } 444 } 445 }}; 446 } 447 448 impl<F, B, E> H2Stream<F, B> 449 where 450 F: Future<Output = Result<Response<B>, E>>, 451 B: HttpBody, 452 B::Data: 'static, 453 B::Error: Into<Box<dyn StdError + Send + Sync>>, 454 E: Into<Box<dyn StdError + Send + Sync>>, 455 { poll2(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>>456 fn poll2(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { 457 let mut me = self.project(); 458 loop { 459 let next = match me.state.as_mut().project() { 460 H2StreamStateProj::Service { 461 fut: h, 462 connect_parts, 463 } => { 464 let res = match h.poll(cx) { 465 Poll::Ready(Ok(r)) => r, 466 Poll::Pending => { 467 // Response is not yet ready, so we want to check if the client has sent a 468 // RST_STREAM frame which would cancel the current request. 469 if let Poll::Ready(reason) = 470 me.reply.poll_reset(cx).map_err(crate::Error::new_h2)? 471 { 472 debug!("stream received RST_STREAM: {:?}", reason); 473 return Poll::Ready(Err(crate::Error::new_h2(reason.into()))); 474 } 475 return Poll::Pending; 476 } 477 Poll::Ready(Err(e)) => { 478 let err = crate::Error::new_user_service(e); 479 warn!("http2 service errored: {}", err); 480 me.reply.send_reset(err.h2_reason()); 481 return Poll::Ready(Err(err)); 482 } 483 }; 484 485 let (head, body) = res.into_parts(); 486 let mut res = ::http::Response::from_parts(head, ()); 487 super::strip_connection_headers(res.headers_mut(), false); 488 489 // set Date header if it isn't already set... 490 res.headers_mut() 491 .entry(::http::header::DATE) 492 .or_insert_with(date::update_and_header_value); 493 494 if let Some(connect_parts) = connect_parts.take() { 495 if res.status().is_success() { 496 if headers::content_length_parse_all(res.headers()) 497 .map_or(false, |len| len != 0) 498 { 499 warn!("h2 successful response to CONNECT request with body not supported"); 500 me.reply.send_reset(h2::Reason::INTERNAL_ERROR); 501 return Poll::Ready(Err(crate::Error::new_user_header())); 502 } 503 let send_stream = reply!(me, res, false); 504 connect_parts.pending.fulfill(Upgraded::new( 505 H2Upgraded { 506 ping: connect_parts.ping, 507 recv_stream: connect_parts.recv_stream, 508 send_stream: unsafe { UpgradedSendStream::new(send_stream) }, 509 buf: Bytes::new(), 510 }, 511 Bytes::new(), 512 )); 513 return Poll::Ready(Ok(())); 514 } 515 } 516 517 if !body.is_end_stream() { 518 // automatically set Content-Length from body... 519 if let Some(len) = body.size_hint().exact() { 520 headers::set_content_length_if_missing(res.headers_mut(), len); 521 } 522 523 let body_tx = reply!(me, res, false); 524 H2StreamState::Body { 525 pipe: PipeToSendStream::new(body, body_tx), 526 } 527 } else { 528 reply!(me, res, true); 529 return Poll::Ready(Ok(())); 530 } 531 } 532 H2StreamStateProj::Body { pipe } => { 533 return pipe.poll(cx); 534 } 535 }; 536 me.state.set(next); 537 } 538 } 539 } 540 541 impl<F, B, E> Future for H2Stream<F, B> 542 where 543 F: Future<Output = Result<Response<B>, E>>, 544 B: HttpBody, 545 B::Data: 'static, 546 B::Error: Into<Box<dyn StdError + Send + Sync>>, 547 E: Into<Box<dyn StdError + Send + Sync>>, 548 { 549 type Output = (); 550 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>551 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 552 self.poll2(cx).map(|res| { 553 if let Err(e) = res { 554 debug!("stream error: {}", e); 555 } 556 }) 557 } 558 } 559