• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2 * Copyright (C) 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 *
7 *      http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 
16 //! Defines a backing task to keep a HTTP/3 connection running
17 
18 use crate::boot_time;
19 use crate::boot_time::BootTime;
20 use log::{debug, warn};
21 use quiche::h3;
22 use std::collections::HashMap;
23 use std::default::Default;
24 use std::future;
25 use std::io;
26 use std::pin::Pin;
27 use thiserror::Error;
28 use tokio::net::UdpSocket;
29 use tokio::select;
30 use tokio::sync::{mpsc, oneshot, watch};
31 
32 use super::Status;
33 
34 #[derive(Error, Debug)]
35 pub enum Error {
36     #[error("network IO error: {0}")]
37     Network(#[from] io::Error),
38     #[error("QUIC error: {0}")]
39     Quic(#[from] quiche::Error),
40     #[error("HTTP/3 error: {0}")]
41     H3(#[from] h3::Error),
42     #[error("Response delivery error: {0}")]
43     StreamSend(#[from] mpsc::error::SendError<Stream>),
44     #[error("Connection closed")]
45     Closed,
46 }
47 
48 pub type Result<T> = std::result::Result<T, Error>;
49 
50 #[derive(Debug)]
51 /// HTTP/3 Request to be sent on the connection
52 pub struct Request {
53     /// Request headers
54     pub headers: Vec<h3::Header>,
55     /// Expiry time for the request, relative to `CLOCK_BOOTTIME`
56     pub expiry: Option<BootTime>,
57     /// Channel to send the response to
58     pub response_tx: oneshot::Sender<Stream>,
59 }
60 
61 #[derive(Debug)]
62 /// HTTP/3 Response
63 pub struct Stream {
64     /// Response headers
65     pub headers: Vec<h3::Header>,
66     /// Response body
67     pub data: Vec<u8>,
68     /// Error code if stream was reset
69     pub error: Option<u64>,
70 }
71 
72 impl Stream {
new(headers: Vec<h3::Header>) -> Self73     fn new(headers: Vec<h3::Header>) -> Self {
74         Self { headers, data: Vec::new(), error: None }
75     }
76 }
77 
/// Length in bytes of the driver's packet buffer (64 KiB).
const MAX_UDP_PACKET_SIZE: usize = 1 << 16;
79 
80 struct Driver {
81     request_rx: mpsc::Receiver<Request>,
82     status_tx: watch::Sender<Status>,
83     quiche_conn: Pin<Box<quiche::Connection>>,
84     socket: UdpSocket,
85     // This buffer is large, boxing it will keep it
86     // off the stack and prevent it being copied during
87     // moves of the driver.
88     buffer: Box<[u8; MAX_UDP_PACKET_SIZE]>,
89     net_id: u32,
90     // Used to check if the connection has entered closing or draining state. A connection can
91     // enter closing state if the sender of request_rx's channel has been dropped.
92     // Note that we can't check if a receiver is dead without potentially receiving a message, and
93     // if we poll on a dead receiver in a select! it will immediately return None. As a result, we
94     // need this to gate whether or not to include .recv() in our select!
95     closing: bool,
96 }
97 
98 struct H3Driver {
99     driver: Driver,
100     // h3_conn sometimes can't "fit" a request in its available windows.
101     // This value holds a peeked request in that case, waiting for
102     // transmission to become possible.
103     buffered_request: Option<Request>,
104     h3_conn: h3::Connection,
105     requests: HashMap<u64, Request>,
106     streams: HashMap<u64, Stream>,
107 }
108 
optional_timeout(timeout: Option<boot_time::Duration>, net_id: u32)109 async fn optional_timeout(timeout: Option<boot_time::Duration>, net_id: u32) {
110     debug!("optional_timeout: timeout={:?}, network {}", timeout, net_id);
111     match timeout {
112         Some(timeout) => boot_time::sleep(timeout).await,
113         None => future::pending().await,
114     }
115 }
116 
117 /// Creates a future which when polled will handle events related to a HTTP/3 connection.
118 /// The returned error code will explain why the connection terminated.
drive( request_rx: mpsc::Receiver<Request>, status_tx: watch::Sender<Status>, quiche_conn: Pin<Box<quiche::Connection>>, socket: UdpSocket, net_id: u32, ) -> Result<()>119 pub async fn drive(
120     request_rx: mpsc::Receiver<Request>,
121     status_tx: watch::Sender<Status>,
122     quiche_conn: Pin<Box<quiche::Connection>>,
123     socket: UdpSocket,
124     net_id: u32,
125 ) -> Result<()> {
126     Driver::new(request_rx, status_tx, quiche_conn, socket, net_id).drive().await
127 }
128 
129 impl Driver {
new( request_rx: mpsc::Receiver<Request>, status_tx: watch::Sender<Status>, quiche_conn: Pin<Box<quiche::Connection>>, socket: UdpSocket, net_id: u32, ) -> Self130     fn new(
131         request_rx: mpsc::Receiver<Request>,
132         status_tx: watch::Sender<Status>,
133         quiche_conn: Pin<Box<quiche::Connection>>,
134         socket: UdpSocket,
135         net_id: u32,
136     ) -> Self {
137         Self {
138             request_rx,
139             status_tx,
140             quiche_conn,
141             socket,
142             buffer: Box::new([0; MAX_UDP_PACKET_SIZE]),
143             net_id,
144             closing: false,
145         }
146     }
147 
drive(mut self) -> Result<()>148     async fn drive(mut self) -> Result<()> {
149         // Prime connection
150         self.flush_tx().await?;
151         loop {
152             self = self.drive_once().await?
153         }
154     }
155 
handle_closed(&self) -> Result<()>156     fn handle_closed(&self) -> Result<()> {
157         if self.quiche_conn.is_closed() {
158             // TODO: Also log local_error() once Quiche 0.10.0 is available.
159             debug!(
160                 "Connection {} closed on network {}, peer_error={:x?}",
161                 self.quiche_conn.trace_id(),
162                 self.net_id,
163                 self.quiche_conn.peer_error()
164             );
165             // We don't care if the receiver has hung up
166             let _ = self.status_tx.send(Status::Dead { session: self.quiche_conn.session() });
167             Err(Error::Closed)
168         } else {
169             Ok(())
170         }
171     }
172 
handle_draining(&mut self)173     fn handle_draining(&mut self) {
174         if self.quiche_conn.is_draining() && !self.closing {
175             // TODO: Also log local_error() once Quiche 0.10.0 is available.
176             debug!(
177                 "Connection {} is draining on network {}, peer_error={:x?}",
178                 self.quiche_conn.trace_id(),
179                 self.net_id,
180                 self.quiche_conn.peer_error()
181             );
182             // We don't care if the receiver has hung up
183             let _ = self.status_tx.send(Status::Dead { session: self.quiche_conn.session() });
184 
185             self.request_rx.close();
186             // Drain the pending DNS requests from the queue to make their corresponding future
187             // tasks return some error quickly rather than timeout. However, the DNS requests
188             // that has been sent will still time out.
189             // TODO: re-issue the outstanding DNS requests, such as passing H3Driver.requests
190             // along with Status::Dead to the `Network` that can re-issue the DNS requests.
191             while self.request_rx.try_recv().is_ok() {}
192             self.closing = true;
193         }
194     }
195 
drive_once(mut self) -> Result<Self>196     async fn drive_once(mut self) -> Result<Self> {
197         let timer = optional_timeout(self.quiche_conn.timeout(), self.net_id);
198         select! {
199             // If a quiche timer would fire, call their callback
200             _ = timer => {
201                 debug!("Driver: Timer expired on network {}", self.net_id);
202                 self.quiche_conn.on_timeout()
203             }
204             // If we got packets from our peer, pass them to quiche
205             Ok((size, from)) = self.socket.recv_from(self.buffer.as_mut()) => {
206                 self.quiche_conn.recv(&mut self.buffer[..size], quiche::RecvInfo { from })?;
207                 debug!("Received {} bytes on network {}", size, self.net_id);
208             }
209         };
210         // Any of the actions in the select could require us to send packets to the peer
211         self.flush_tx().await?;
212 
213         // If the QUIC connection is live, but the HTTP/3 is not, try to bring it up
214         if self.quiche_conn.is_established() {
215             debug!(
216                 "Connection {} established on network {}",
217                 self.quiche_conn.trace_id(),
218                 self.net_id
219             );
220             let h3_config = h3::Config::new()?;
221             let h3_conn = h3::Connection::with_transport(&mut self.quiche_conn, &h3_config)?;
222             self = H3Driver::new(self, h3_conn).drive().await?;
223             let _ = self.status_tx.send(Status::QUIC);
224         }
225 
226         // If the connection has entered draining state (the server is closing the connection),
227         // tell the status watcher not to use the connection. Besides, per Quiche document,
228         // the connection should not be dropped until is_closed() returns true.
229         // This tokio task will become unowned and get dropped when is_closed() returns true.
230         self.handle_draining();
231 
232         // If the connection has closed, tear down
233         self.handle_closed()?;
234 
235         Ok(self)
236     }
237 
flush_tx(&mut self) -> Result<()>238     async fn flush_tx(&mut self) -> Result<()> {
239         let send_buf = self.buffer.as_mut();
240         loop {
241             match self.quiche_conn.send(send_buf) {
242                 Err(quiche::Error::Done) => return Ok(()),
243                 Err(e) => return Err(e.into()),
244                 Ok((valid_len, send_info)) => {
245                     self.socket.send_to(&send_buf[..valid_len], send_info.to).await?;
246                     debug!("Sent {} bytes on network {}", valid_len, self.net_id);
247                 }
248             }
249         }
250     }
251 }
252 
253 impl H3Driver {
new(driver: Driver, h3_conn: h3::Connection) -> Self254     fn new(driver: Driver, h3_conn: h3::Connection) -> Self {
255         Self {
256             driver,
257             h3_conn,
258             requests: HashMap::new(),
259             streams: HashMap::new(),
260             buffered_request: None,
261         }
262     }
263 
drive(mut self) -> Result<Driver>264     async fn drive(mut self) -> Result<Driver> {
265         let _ = self.driver.status_tx.send(Status::H3);
266         loop {
267             if let Err(e) = self.drive_once().await {
268                 let _ = self
269                     .driver
270                     .status_tx
271                     .send(Status::Dead { session: self.driver.quiche_conn.session() });
272                 return Err(e)
273             }
274         }
275     }
276 
drive_once(&mut self) -> Result<()>277     async fn drive_once(&mut self) -> Result<()> {
278         // We can't call self.driver.drive_once at the same time as
279         // self.driver.request_rx.recv() due to ownership
280         let timer = optional_timeout(self.driver.quiche_conn.timeout(), self.driver.net_id);
281         // If we've buffered a request (due to the connection being full)
282         // try to resend that first
283         if let Some(request) = self.buffered_request.take() {
284             self.handle_request(request)?;
285         }
286         select! {
287             // Only attempt to enqueue new requests if we have no buffered request and aren't
288             // closing
289             msg = self.driver.request_rx.recv(), if !self.driver.closing && self.buffered_request.is_none() => {
290                 match msg {
291                     Some(request) => self.handle_request(request)?,
292                     None => self.shutdown(true, b"DONE").await?,
293                 }
294             },
295             // If a quiche timer would fire, call their callback
296             _ = timer => {
297                 debug!("H3Driver: Timer expired on network {}", self.driver.net_id);
298                 self.driver.quiche_conn.on_timeout()
299             }
300             // If we got packets from our peer, pass them to quiche
301             Ok((size, from)) = self.driver.socket.recv_from(self.driver.buffer.as_mut()) => {
302                 self.driver.quiche_conn.recv(&mut self.driver.buffer[..size], quiche::RecvInfo { from }).map(|_| ())?;
303                 debug!("Received {} bytes on network {}", size, self.driver.net_id);
304             }
305         };
306 
307         // Any of the actions in the select could require us to send packets to the peer
308         self.driver.flush_tx().await?;
309 
310         // Process any incoming HTTP/3 events
311         self.flush_h3().await?;
312 
313         // If the connection has entered draining state (the server is closing the connection),
314         // tell the status watcher not to use the connection. Besides, per Quiche document,
315         // the connection should not be dropped until is_closed() returns true.
316         // This tokio task will become unowned and get dropped when is_closed() returns true.
317         self.driver.handle_draining();
318 
319         // If the connection has closed, tear down
320         self.driver.handle_closed()
321     }
322 
handle_request(&mut self, request: Request) -> Result<()>323     fn handle_request(&mut self, request: Request) -> Result<()> {
324         debug!("Handling DNS request on network {}, stats={:?}, peer_streams_left_bidi={}, peer_streams_left_uni={}",
325                 self.driver.net_id, self.driver.quiche_conn.stats(), self.driver.quiche_conn.peer_streams_left_bidi(), self.driver.quiche_conn.peer_streams_left_uni());
326         // If the request has already timed out, don't issue it to the server.
327         if let Some(expiry) = request.expiry {
328             if BootTime::now() > expiry {
329                 warn!("Abandoning expired DNS request");
330                 return Ok(());
331             }
332         }
333         let stream_id =
334             // If h3_conn says the stream is blocked, this error is recoverable just by trying
335             // again once the stream has made progress. Buffer the request for a later retry.
336             match self.h3_conn.send_request(&mut self.driver.quiche_conn, &request.headers, true) {
337                 Err(h3::Error::StreamBlocked) | Err(h3::Error::TransportError(quiche::Error::StreamLimit)) => {
338                     // We only call handle_request on a value that has just come out of
339                     // buffered_request, or when buffered_request is empty. This assert just
340                     // validates that we don't break that assumption later, as it could result in
341                     // requests being dropped on the floor under high load.
342                     debug!("Stream has become blocked, buffering one request.");
343                     assert!(self.buffered_request.is_none());
344                     self.buffered_request = Some(request);
345                     return Ok(())
346                 }
347                 result => result?,
348             };
349         debug!(
350             "Handled DNS request: stream ID {}, network {}, stream_capacity={:?}",
351             stream_id,
352             self.driver.net_id,
353             self.driver.quiche_conn.stream_capacity(stream_id)
354         );
355         self.requests.insert(stream_id, request);
356         Ok(())
357     }
358 
recv_body(&mut self, stream_id: u64) -> Result<()>359     async fn recv_body(&mut self, stream_id: u64) -> Result<()> {
360         const STREAM_READ_CHUNK: usize = 4096;
361         if let Some(stream) = self.streams.get_mut(&stream_id) {
362             loop {
363                 let base_len = stream.data.len();
364                 stream.data.resize(base_len + STREAM_READ_CHUNK, 0);
365                 match self.h3_conn.recv_body(
366                     &mut self.driver.quiche_conn,
367                     stream_id,
368                     &mut stream.data[base_len..],
369                 ) {
370                     Err(h3::Error::Done) => {
371                         stream.data.truncate(base_len);
372                         return Ok(());
373                     }
374                     Err(e) => {
375                         debug!("recv_body: Error={:?}", e);
376                         stream.data.truncate(base_len);
377                         return Err(e.into());
378                     }
379                     Ok(recvd) => {
380                         stream.data.truncate(base_len + recvd);
381                         debug!(
382                             "Got {} bytes of response data from stream ID {} on network {}",
383                             recvd, stream_id, self.driver.net_id
384                         );
385                     }
386                 }
387             }
388         } else {
389             warn!("Received body for untracked stream ID {}", stream_id);
390         }
391         Ok(())
392     }
393 
discard_datagram(&mut self, _flow_id: u64) -> Result<()>394     fn discard_datagram(&mut self, _flow_id: u64) -> Result<()> {
395         loop {
396             match self.h3_conn.recv_dgram(&mut self.driver.quiche_conn, self.driver.buffer.as_mut())
397             {
398                 Err(h3::Error::Done) => return Ok(()),
399                 Err(e) => return Err(e.into()),
400                 _ => (),
401             }
402         }
403     }
404 
flush_h3(&mut self) -> Result<()>405     async fn flush_h3(&mut self) -> Result<()> {
406         loop {
407             match self.h3_conn.poll(&mut self.driver.quiche_conn) {
408                 Err(h3::Error::Done) => return Ok(()),
409                 Err(e) => return Err(e.into()),
410                 Ok((stream_id, event)) => self.process_h3_event(stream_id, event).await?,
411             }
412         }
413     }
414 
process_h3_event(&mut self, stream_id: u64, event: h3::Event) -> Result<()>415     async fn process_h3_event(&mut self, stream_id: u64, event: h3::Event) -> Result<()> {
416         if !self.requests.contains_key(&stream_id) {
417             warn!("Received event {:?} for stream_id {} without a request.", event, stream_id);
418         }
419         match event {
420             h3::Event::Headers { list, has_body } => {
421                 debug!(
422                     "process_h3_event: h3::Event::Headers on stream ID {}, network {}",
423                     stream_id, self.driver.net_id
424                 );
425                 let stream = Stream::new(list);
426                 if self.streams.insert(stream_id, stream).is_some() {
427                     warn!("Re-using stream ID {} before it was completed.", stream_id)
428                 }
429                 if !has_body {
430                     self.respond(stream_id);
431                 }
432             }
433             h3::Event::Data => {
434                 debug!(
435                     "process_h3_event: h3::Event::Data on stream ID {}, network {}",
436                     stream_id, self.driver.net_id
437                 );
438                 self.recv_body(stream_id).await?;
439             }
440             h3::Event::Finished => {
441                 debug!(
442                     "process_h3_event: h3::Event::Finished on stream ID {}, network {}",
443                     stream_id, self.driver.net_id
444                 );
445                 self.respond(stream_id)
446             }
447             // This clause is for quiche 0.10.x, we're still on 0.9.x
448             //h3::Event::Reset(e) => {
449             //    self.streams.get_mut(&stream_id).map(|stream| stream.error = Some(e));
450             //    self.respond(stream_id);
451             //}
452             h3::Event::Datagram => {
453                 warn!("Unexpected Datagram received");
454                 // We don't care if something went wrong with the datagram, we didn't
455                 // want it anyways.
456                 let _ = self.discard_datagram(stream_id);
457             }
458             h3::Event::GoAway => self.shutdown(false, b"SERVER GOAWAY").await?,
459         }
460         Ok(())
461     }
462 
shutdown(&mut self, send_goaway: bool, msg: &[u8]) -> Result<()>463     async fn shutdown(&mut self, send_goaway: bool, msg: &[u8]) -> Result<()> {
464         debug!(
465             "Closing connection {} on network {} with msg {:?}",
466             self.driver.quiche_conn.trace_id(),
467             self.driver.net_id,
468             msg
469         );
470         self.driver.request_rx.close();
471         while self.driver.request_rx.recv().await.is_some() {}
472         self.driver.closing = true;
473         if send_goaway {
474             self.h3_conn.send_goaway(&mut self.driver.quiche_conn, 0)?;
475         }
476         if self.driver.quiche_conn.close(true, 0, msg).is_err() {
477             warn!("Trying to close already closed QUIC connection");
478         }
479         Ok(())
480     }
481 
respond(&mut self, stream_id: u64)482     fn respond(&mut self, stream_id: u64) {
483         match (self.streams.remove(&stream_id), self.requests.remove(&stream_id)) {
484             (Some(stream), Some(request)) => {
485                 debug!(
486                     "Sending answer back to resolv, stream ID: {}, network {}",
487                     stream_id, self.driver.net_id
488                 );
489                 // We don't care about the error, because it means the requestor has left.
490                 let _ = request.response_tx.send(stream);
491             }
492             (None, _) => warn!("Tried to deliver untracked stream {}", stream_id),
493             (_, None) => warn!("Tried to deliver stream {} to untracked requestor", stream_id),
494         }
495     }
496 }
497