/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//! Defines a backing task to keep an HTTP/3 connection running

use crate::boot_time;
use crate::boot_time::BootTime;
use log::{debug, warn};
use quiche::h3;
use std::collections::HashMap;
use std::default::Default;
use std::future;
use std::io;
use std::pin::Pin;
use thiserror::Error;
use tokio::net::UdpSocket;
use tokio::select;
use tokio::sync::{mpsc, oneshot, watch};

use super::Status;

#[derive(Error, Debug)]
pub enum Error {
    #[error("network IO error: {0}")]
    Network(#[from] io::Error),
    #[error("QUIC error: {0}")]
    Quic(#[from] quiche::Error),
    #[error("HTTP/3 error: {0}")]
    H3(#[from] h3::Error),
    #[error("Response delivery error: {0}")]
    StreamSend(#[from] mpsc::error::SendError<Stream>),
    #[error("Connection closed")]
    Closed,
}

pub type Result<T> = std::result::Result<T, Error>;

#[derive(Debug)]
/// HTTP/3 Request to be sent on the connection
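///
/// A rough sketch of issuing a request through the channel that feeds [`drive`] and awaiting
/// its response (not from the original source; `request_tx` and `dns_query_headers` are
/// hypothetical values owned by the caller):
///
/// ```ignore
/// // Build the one-shot channel the connection task will answer on.
/// let (response_tx, response_rx) = oneshot::channel();
/// let request = Request {
///     headers: dns_query_headers, // hypothetical, pre-built h3::Header list for the DoH query
///     expiry: None,               // no deadline in this example
///     response_tx,
/// };
/// request_tx.send(request).await?; // request_tx is the sender half paired with drive()'s request_rx
/// let response: Stream = response_rx.await?;
/// ```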
pub struct Request {
    /// Request headers
    pub headers: Vec<h3::Header>,
    /// Expiry time for the request, relative to `CLOCK_BOOTTIME`
    pub expiry: Option<BootTime>,
    /// Channel to send the response to
    pub response_tx: oneshot::Sender<Stream>,
}

#[derive(Debug)]
/// HTTP/3 Response
pub struct Stream {
    /// Response headers
    pub headers: Vec<h3::Header>,
    /// Response body
    pub data: Vec<u8>,
    /// Error code if stream was reset
    pub error: Option<u64>,
}

impl Stream {
    fn new(headers: Vec<h3::Header>) -> Self {
        Self { headers, data: Vec::new(), error: None }
    }
}

const MAX_UDP_PACKET_SIZE: usize = 65536;

struct Driver {
    request_rx: mpsc::Receiver<Request>,
    status_tx: watch::Sender<Status>,
    quiche_conn: Pin<Box<quiche::Connection>>,
    socket: UdpSocket,
    // This buffer is large; boxing it keeps it off the stack and
    // prevents it from being copied when the driver is moved.
    buffer: Box<[u8; MAX_UDP_PACKET_SIZE]>,
    net_id: u32,
    // Used to check if the connection has entered the closing or draining state. A connection
    // enters the closing state when the sender half of the request_rx channel has been dropped.
    // Note that we can't check whether the channel is closed without potentially receiving a
    // message, and if we poll a dead receiver in a select! it will immediately return None. As a
    // result, we need this flag to gate whether or not to include .recv() in our select!
    closing: bool,
}

struct H3Driver {
    driver: Driver,
    // h3_conn sometimes can't "fit" a request in its available windows.
    // This value holds a peeked request in that case, waiting for
    // transmission to become possible.
    buffered_request: Option<Request>,
    h3_conn: h3::Connection,
    requests: HashMap<u64, Request>,
    streams: HashMap<u64, Stream>,
}

async fn optional_timeout(timeout: Option<boot_time::Duration>, net_id: u32) {
    debug!("optional_timeout: timeout={:?}, network {}", timeout, net_id);
    match timeout {
        Some(timeout) => boot_time::sleep(timeout).await,
        None => future::pending().await,
    }
}

/// Creates a future which, when polled, will handle events related to an HTTP/3 connection.
/// The returned error will explain why the connection terminated.
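///
/// A minimal usage sketch, not taken from the original source: the channel capacity, the
/// initial `Status` value, and the already-connected `quiche_conn`/`socket` pair are all
/// assumptions made for illustration.
///
/// ```ignore
/// let (request_tx, request_rx) = mpsc::channel(100); // hypothetical capacity
/// let (status_tx, status_rx) = watch::channel(Status::QUIC); // assumed initial status
/// // `quiche_conn` and `socket` must already be set up to talk to the DoH server.
/// tokio::spawn(drive(request_rx, status_tx, quiche_conn, socket, net_id));
/// // Submit requests via `request_tx`; watch `status_rx` for QUIC/H3/Dead transitions.
/// ```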
pub async fn drive(
    request_rx: mpsc::Receiver<Request>,
    status_tx: watch::Sender<Status>,
    quiche_conn: Pin<Box<quiche::Connection>>,
    socket: UdpSocket,
    net_id: u32,
) -> Result<()> {
    Driver::new(request_rx, status_tx, quiche_conn, socket, net_id).drive().await
}

impl Driver {
    fn new(
        request_rx: mpsc::Receiver<Request>,
        status_tx: watch::Sender<Status>,
        quiche_conn: Pin<Box<quiche::Connection>>,
        socket: UdpSocket,
        net_id: u32,
    ) -> Self {
        Self {
            request_rx,
            status_tx,
            quiche_conn,
            socket,
            buffer: Box::new([0; MAX_UDP_PACKET_SIZE]),
            net_id,
            closing: false,
        }
    }

    async fn drive(mut self) -> Result<()> {
        // Prime connection
        self.flush_tx().await?;
        loop {
            self = self.drive_once().await?
        }
    }

    fn handle_closed(&self) -> Result<()> {
        if self.quiche_conn.is_closed() {
            // TODO: Also log local_error() once Quiche 0.10.0 is available.
            debug!(
                "Connection {} closed on network {}, peer_error={:x?}",
                self.quiche_conn.trace_id(),
                self.net_id,
                self.quiche_conn.peer_error()
            );
            // We don't care if the receiver has hung up
            let _ = self.status_tx.send(Status::Dead { session: self.quiche_conn.session() });
            Err(Error::Closed)
        } else {
            Ok(())
        }
    }

    fn handle_draining(&mut self) {
        if self.quiche_conn.is_draining() && !self.closing {
            // TODO: Also log local_error() once Quiche 0.10.0 is available.
            debug!(
                "Connection {} is draining on network {}, peer_error={:x?}",
                self.quiche_conn.trace_id(),
                self.net_id,
                self.quiche_conn.peer_error()
            );
            // We don't care if the receiver has hung up
            let _ = self.status_tx.send(Status::Dead { session: self.quiche_conn.session() });

            self.request_rx.close();
            // Drain the pending DNS requests from the queue so that their corresponding future
            // tasks return an error quickly rather than time out. However, the DNS requests
            // that have already been sent will still time out.
            // TODO: re-issue the outstanding DNS requests, e.g. by passing H3Driver.requests
            // along with Status::Dead to the `Network` so that it can re-issue them.
            while self.request_rx.try_recv().is_ok() {}
            self.closing = true;
        }
    }

    async fn drive_once(mut self) -> Result<Self> {
        let timer = optional_timeout(self.quiche_conn.timeout(), self.net_id);
        select! {
            // If a quiche timer would fire, call its callback
            _ = timer => {
                debug!("Driver: Timer expired on network {}", self.net_id);
                self.quiche_conn.on_timeout()
            }
            // If we got packets from our peer, pass them to quiche
            Ok((size, from)) = self.socket.recv_from(self.buffer.as_mut()) => {
                self.quiche_conn.recv(&mut self.buffer[..size], quiche::RecvInfo { from })?;
                debug!("Received {} bytes on network {}", size, self.net_id);
            }
        };
        // Any of the actions in the select could require us to send packets to the peer
        self.flush_tx().await?;

        // If the QUIC connection is live but the HTTP/3 layer is not, try to bring it up
        if self.quiche_conn.is_established() {
            debug!(
                "Connection {} established on network {}",
                self.quiche_conn.trace_id(),
                self.net_id
            );
            let h3_config = h3::Config::new()?;
            let h3_conn = h3::Connection::with_transport(&mut self.quiche_conn, &h3_config)?;
            self = H3Driver::new(self, h3_conn).drive().await?;
            let _ = self.status_tx.send(Status::QUIC);
        }

        // If the connection has entered the draining state (the server is closing the
        // connection), tell the status watcher not to use the connection. Also, per the Quiche
        // documentation, the connection should not be dropped until is_closed() returns true.
        // This tokio task will become unowned and get dropped when is_closed() returns true.
        self.handle_draining();

        // If the connection has closed, tear down
        self.handle_closed()?;

        Ok(self)
    }

    async fn flush_tx(&mut self) -> Result<()> {
        let send_buf = self.buffer.as_mut();
        loop {
            match self.quiche_conn.send(send_buf) {
                Err(quiche::Error::Done) => return Ok(()),
                Err(e) => return Err(e.into()),
                Ok((valid_len, send_info)) => {
                    self.socket.send_to(&send_buf[..valid_len], send_info.to).await?;
                    debug!("Sent {} bytes on network {}", valid_len, self.net_id);
                }
            }
        }
    }
}

impl H3Driver {
    fn new(driver: Driver, h3_conn: h3::Connection) -> Self {
        Self {
            driver,
            h3_conn,
            requests: HashMap::new(),
            streams: HashMap::new(),
            buffered_request: None,
        }
    }

    async fn drive(mut self) -> Result<Driver> {
        let _ = self.driver.status_tx.send(Status::H3);
        loop {
            if let Err(e) = self.drive_once().await {
                let _ = self
                    .driver
                    .status_tx
                    .send(Status::Dead { session: self.driver.quiche_conn.session() });
                return Err(e);
            }
        }
    }

    async fn drive_once(&mut self) -> Result<()> {
        // We can't call self.driver.drive_once at the same time as
        // self.driver.request_rx.recv() due to ownership
        let timer = optional_timeout(self.driver.quiche_conn.timeout(), self.driver.net_id);
        // If we've buffered a request (due to the connection being full),
        // try to resend that first
        if let Some(request) = self.buffered_request.take() {
            self.handle_request(request)?;
        }
        select! {
            // Only attempt to enqueue new requests if we have no buffered request and aren't
            // closing
            msg = self.driver.request_rx.recv(), if !self.driver.closing && self.buffered_request.is_none() => {
                match msg {
                    Some(request) => self.handle_request(request)?,
                    None => self.shutdown(true, b"DONE").await?,
                }
            },
            // If a quiche timer would fire, call its callback
            _ = timer => {
                debug!("H3Driver: Timer expired on network {}", self.driver.net_id);
                self.driver.quiche_conn.on_timeout()
            }
            // If we got packets from our peer, pass them to quiche
            Ok((size, from)) = self.driver.socket.recv_from(self.driver.buffer.as_mut()) => {
                self.driver.quiche_conn.recv(&mut self.driver.buffer[..size], quiche::RecvInfo { from }).map(|_| ())?;
                debug!("Received {} bytes on network {}", size, self.driver.net_id);
            }
        };

        // Any of the actions in the select could require us to send packets to the peer
        self.driver.flush_tx().await?;

        // Process any incoming HTTP/3 events
        self.flush_h3().await?;

        // If the connection has entered the draining state (the server is closing the
        // connection), tell the status watcher not to use the connection. Also, per the Quiche
        // documentation, the connection should not be dropped until is_closed() returns true.
        // This tokio task will become unowned and get dropped when is_closed() returns true.
        self.driver.handle_draining();

        // If the connection has closed, tear down
        self.driver.handle_closed()
    }

    fn handle_request(&mut self, request: Request) -> Result<()> {
        debug!("Handling DNS request on network {}, stats={:?}, peer_streams_left_bidi={}, peer_streams_left_uni={}",
            self.driver.net_id, self.driver.quiche_conn.stats(), self.driver.quiche_conn.peer_streams_left_bidi(), self.driver.quiche_conn.peer_streams_left_uni());
        // If the request has already timed out, don't issue it to the server.
        if let Some(expiry) = request.expiry {
            if BootTime::now() > expiry {
                warn!("Abandoning expired DNS request");
                return Ok(());
            }
        }
        let stream_id =
            // If h3_conn says the stream is blocked, this error is recoverable just by trying
            // again once the stream has made progress. Buffer the request for a later retry.
            match self.h3_conn.send_request(&mut self.driver.quiche_conn, &request.headers, true) {
                Err(h3::Error::StreamBlocked) | Err(h3::Error::TransportError(quiche::Error::StreamLimit)) => {
                    // We only call handle_request on a value that has just come out of
                    // buffered_request, or when buffered_request is empty. This assert just
                    // validates that we don't break that assumption later, as it could result in
                    // requests being dropped on the floor under high load.
                    debug!("Stream has become blocked, buffering one request.");
                    assert!(self.buffered_request.is_none());
                    self.buffered_request = Some(request);
                    return Ok(())
                }
                result => result?,
            };
        debug!(
            "Handled DNS request: stream ID {}, network {}, stream_capacity={:?}",
            stream_id,
            self.driver.net_id,
            self.driver.quiche_conn.stream_capacity(stream_id)
        );
        self.requests.insert(stream_id, request);
        Ok(())
    }

    async fn recv_body(&mut self, stream_id: u64) -> Result<()> {
        const STREAM_READ_CHUNK: usize = 4096;
        if let Some(stream) = self.streams.get_mut(&stream_id) {
            loop {
                let base_len = stream.data.len();
                stream.data.resize(base_len + STREAM_READ_CHUNK, 0);
                match self.h3_conn.recv_body(
                    &mut self.driver.quiche_conn,
                    stream_id,
                    &mut stream.data[base_len..],
                ) {
                    Err(h3::Error::Done) => {
                        stream.data.truncate(base_len);
                        return Ok(());
                    }
                    Err(e) => {
                        debug!("recv_body: Error={:?}", e);
                        stream.data.truncate(base_len);
                        return Err(e.into());
                    }
                    Ok(recvd) => {
                        stream.data.truncate(base_len + recvd);
                        debug!(
                            "Got {} bytes of response data from stream ID {} on network {}",
                            recvd, stream_id, self.driver.net_id
                        );
                    }
                }
            }
        } else {
            warn!("Received body for untracked stream ID {}", stream_id);
        }
        Ok(())
    }

    fn discard_datagram(&mut self, _flow_id: u64) -> Result<()> {
        loop {
            match self.h3_conn.recv_dgram(&mut self.driver.quiche_conn, self.driver.buffer.as_mut())
            {
                Err(h3::Error::Done) => return Ok(()),
                Err(e) => return Err(e.into()),
                _ => (),
            }
        }
    }

    async fn flush_h3(&mut self) -> Result<()> {
        loop {
            match self.h3_conn.poll(&mut self.driver.quiche_conn) {
                Err(h3::Error::Done) => return Ok(()),
                Err(e) => return Err(e.into()),
                Ok((stream_id, event)) => self.process_h3_event(stream_id, event).await?,
            }
        }
    }

    async fn process_h3_event(&mut self, stream_id: u64, event: h3::Event) -> Result<()> {
        if !self.requests.contains_key(&stream_id) {
            warn!("Received event {:?} for stream_id {} without a request.", event, stream_id);
        }
        match event {
            h3::Event::Headers { list, has_body } => {
                debug!(
                    "process_h3_event: h3::Event::Headers on stream ID {}, network {}",
                    stream_id, self.driver.net_id
                );
                let stream = Stream::new(list);
                if self.streams.insert(stream_id, stream).is_some() {
                    warn!("Re-using stream ID {} before it was completed.", stream_id)
                }
                if !has_body {
                    self.respond(stream_id);
                }
            }
            h3::Event::Data => {
                debug!(
                    "process_h3_event: h3::Event::Data on stream ID {}, network {}",
                    stream_id, self.driver.net_id
                );
                self.recv_body(stream_id).await?;
            }
            h3::Event::Finished => {
                debug!(
                    "process_h3_event: h3::Event::Finished on stream ID {}, network {}",
                    stream_id, self.driver.net_id
                );
                self.respond(stream_id)
            }
            // This clause is for quiche 0.10.x; we're still on 0.9.x
            //h3::Event::Reset(e) => {
            //    self.streams.get_mut(&stream_id).map(|stream| stream.error = Some(e));
            //    self.respond(stream_id);
            //}
            h3::Event::Datagram => {
                warn!("Unexpected Datagram received");
                // We don't care if something went wrong with the datagram; we didn't
                // want it anyway.
                let _ = self.discard_datagram(stream_id);
            }
            h3::Event::GoAway => self.shutdown(false, b"SERVER GOAWAY").await?,
        }
        Ok(())
    }

    async fn shutdown(&mut self, send_goaway: bool, msg: &[u8]) -> Result<()> {
        debug!(
            "Closing connection {} on network {} with msg {:?}",
            self.driver.quiche_conn.trace_id(),
            self.driver.net_id,
            msg
        );
        self.driver.request_rx.close();
        while self.driver.request_rx.recv().await.is_some() {}
        self.driver.closing = true;
        if send_goaway {
            self.h3_conn.send_goaway(&mut self.driver.quiche_conn, 0)?;
        }
        if self.driver.quiche_conn.close(true, 0, msg).is_err() {
            warn!("Trying to close an already closed QUIC connection");
        }
        Ok(())
    }

    fn respond(&mut self, stream_id: u64) {
        match (self.streams.remove(&stream_id), self.requests.remove(&stream_id)) {
            (Some(stream), Some(request)) => {
                debug!(
                    "Sending answer back to resolv, stream ID: {}, network {}",
                    stream_id, self.driver.net_id
                );
                // We don't care about the error, because it means the requestor has left.
                let _ = request.response_tx.send(stream);
            }
            (None, _) => warn!("Tried to deliver untracked stream {}", stream_id),
            (_, None) => warn!("Tried to deliver stream {} to untracked requestor", stream_id),
        }
    }
}