• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2 * Copyright (C) 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 *
7 *      http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 
16 //! Module providing an async abstraction around a quiche HTTP/3 connection
17 
18 use crate::boot_time::BootTime;
19 use crate::network::SocketTagger;
20 use log::{debug, error, warn};
21 use quiche::h3;
22 use std::future::Future;
23 use std::io;
24 use std::net::SocketAddr;
25 use thiserror::Error;
26 use tokio::net::UdpSocket;
27 use tokio::sync::{mpsc, oneshot, watch};
28 use tokio::task;
29 
30 mod driver;
31 
32 pub use driver::Stream;
33 use driver::{drive, Request};
34 
/// Lifecycle state of a connection, published over the watch channel that
/// `Connection` holds the receiving end of.
#[derive(Clone, Debug)]
pub enum Status {
    /// Initial state: this is the value the status channel is created with.
    QUIC,
    /// State in which `Connection::wait_for_live` reports the connection as live.
    H3,
    /// The connection has been closed.
    Dead {
        /// The session of the closed connection.
        session: Option<Vec<u8>>,
    },
}
44 
45 /// Quiche HTTP/3 connection
46 pub struct Connection {
47     request_tx: mpsc::Sender<Request>,
48     status_rx: watch::Receiver<Status>,
49 }
50 
new_scid() -> [u8; quiche::MAX_CONN_ID_LEN]51 fn new_scid() -> [u8; quiche::MAX_CONN_ID_LEN] {
52     use ring::rand::{SecureRandom, SystemRandom};
53     let mut scid = [0; quiche::MAX_CONN_ID_LEN];
54     SystemRandom::new().fill(&mut scid).unwrap();
55     scid
56 }
57 
mark_socket(socket: &std::net::UdpSocket, socket_mark: u32) -> io::Result<()>58 fn mark_socket(socket: &std::net::UdpSocket, socket_mark: u32) -> io::Result<()> {
59     use std::os::unix::io::AsRawFd;
60     let fd = socket.as_raw_fd();
61     // libc::setsockopt is a wrapper function calling into bionic setsockopt.
62     // The only pointer being passed in is &socket_mark, which is valid by virtue of being a
63     // reference, and the foreign function doesn't take ownership or a reference to that memory
64     // after completion.
65     if unsafe {
66         libc::setsockopt(
67             fd,
68             libc::SOL_SOCKET,
69             libc::SO_MARK,
70             &socket_mark as *const _ as *const libc::c_void,
71             std::mem::size_of::<u32>() as libc::socklen_t,
72         )
73     } == 0
74     {
75         Ok(())
76     } else {
77         Err(io::Error::last_os_error())
78     }
79 }
80 
build_socket( peer_addr: SocketAddr, socket_mark: u32, tag_socket: &SocketTagger, ) -> io::Result<UdpSocket>81 async fn build_socket(
82     peer_addr: SocketAddr,
83     socket_mark: u32,
84     tag_socket: &SocketTagger,
85 ) -> io::Result<UdpSocket> {
86     let bind_addr = match peer_addr {
87         SocketAddr::V4(_) => "0.0.0.0:0",
88         SocketAddr::V6(_) => "[::]:0",
89     };
90 
91     let socket = UdpSocket::bind(bind_addr).await?;
92     let std_socket = socket.into_std()?;
93     mark_socket(&std_socket, socket_mark)
94         .unwrap_or_else(|e| error!("Unable to mark socket : {:?}", e));
95     tag_socket(&std_socket).await;
96     let socket = UdpSocket::from_std(std_socket)?;
97     socket.connect(peer_addr).await?;
98     Ok(socket)
99 }
100 
101 /// Error type for HTTP/3 connection
102 #[derive(Debug, Error)]
103 pub enum Error {
104     /// QUIC protocol error
105     #[error("QUIC error: {0}")]
106     Quic(#[from] quiche::Error),
107     /// HTTP/3 protocol error
108     #[error("HTTP/3 error: {0}")]
109     H3(#[from] h3::Error),
110     /// Unable to send the request to the driver. This likely means the
111     /// backing task has died.
112     #[error("Unable to send request")]
113     SendRequest(#[from] mpsc::error::SendError<Request>),
114     /// IO failed. This is most likely to occur while trying to set up the
115     /// UDP socket for use by the connection.
116     #[error("IO error: {0}")]
117     Io(#[from] io::Error),
118     /// The request is no longer being serviced. This could mean that the
119     /// request was dropped for an unspecified reason, or that the connection
120     /// was closed prematurely and it can no longer be serviced.
121     #[error("Driver dropped request")]
122     RecvResponse(#[from] oneshot::error::RecvError),
123 }
124 
125 /// Common result type for working with a HTTP/3 connection
126 pub type Result<T> = std::result::Result<T, Error>;
127 
128 impl Connection {
129     const MAX_PENDING_REQUESTS: usize = 10;
130     /// Create a new connection with a background task handling IO.
new( server_name: Option<&str>, to: SocketAddr, socket_mark: u32, net_id: u32, tag_socket: &SocketTagger, config: &mut quiche::Config, session: Option<Vec<u8>>, ) -> Result<Self>131     pub async fn new(
132         server_name: Option<&str>,
133         to: SocketAddr,
134         socket_mark: u32,
135         net_id: u32,
136         tag_socket: &SocketTagger,
137         config: &mut quiche::Config,
138         session: Option<Vec<u8>>,
139     ) -> Result<Self> {
140         let (request_tx, request_rx) = mpsc::channel(Self::MAX_PENDING_REQUESTS);
141         let (status_tx, status_rx) = watch::channel(Status::QUIC);
142         let scid = new_scid();
143         let mut quiche_conn =
144             quiche::connect(server_name, &quiche::ConnectionId::from_ref(&scid), to, config)?;
145         if let Some(session) = session {
146             debug!("Setting session");
147             quiche_conn.set_session(&session)?;
148         }
149 
150         let socket = build_socket(to, socket_mark, tag_socket).await?;
151         let driver = async move {
152             let result = drive(request_rx, status_tx, quiche_conn, socket, net_id).await;
153             if let Err(ref e) = result {
154                 warn!("Connection driver returns some Err: {:?}", e);
155             }
156             result
157         };
158         task::spawn(driver);
159         Ok(Self { request_tx, status_rx })
160     }
161 
162     /// Waits until we're either fully alive or dead
wait_for_live(&mut self) -> bool163     pub async fn wait_for_live(&mut self) -> bool {
164         // Once sc-mainline-prod updates to modern tokio, use
165         // borrow_and_update here.
166         match &*self.status_rx.borrow() {
167             Status::H3 => return true,
168             Status::Dead { .. } => return false,
169             Status::QUIC => (),
170         }
171         if self.status_rx.changed().await.is_err() {
172             // status_tx is gone, we're dead
173             return false;
174         }
175         if matches!(*self.status_rx.borrow(), Status::H3) {
176             return true;
177         }
178         // Since we're stuck on legacy tokio due to mainline, we need to try one more time in case there was an outstanding change notification. Using borrow_and_update avoids this.
179         match self.status_rx.changed().await {
180             // status_tx is gone, we're dead
181             Err(_) => false,
182             // If there's an HTTP/3 connection now we're alive, otherwise we're stuck/dead
183             _ => matches!(*self.status_rx.borrow(), Status::H3),
184         }
185     }
186 
session(&self) -> Option<Vec<u8>>187     pub fn session(&self) -> Option<Vec<u8>> {
188         match &*self.status_rx.borrow() {
189             Status::Dead { session } => session.clone(),
190             _ => None,
191         }
192     }
193 
194     /// Send a query, produce a future which will provide a response.
195     /// The future is separately returned rather than awaited to allow it to be waited on without
196     /// keeping the `Connection` itself borrowed.
query( &self, headers: Vec<h3::Header>, expiry: Option<BootTime>, ) -> Result<impl Future<Output = Option<Stream>>>197     pub async fn query(
198         &self,
199         headers: Vec<h3::Header>,
200         expiry: Option<BootTime>,
201     ) -> Result<impl Future<Output = Option<Stream>>> {
202         let (response_tx, response_rx) = oneshot::channel();
203         self.request_tx.send(Request { headers, response_tx, expiry }).await?;
204         Ok(async move { response_rx.await.ok() })
205     }
206 }
207