1 /*
2 * Copyright (C) 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 //! Module providing an async abstraction around a quiche HTTP/3 connection
17
18 use crate::boot_time::BootTime;
19 use crate::network::SocketTagger;
20 use log::{debug, error, warn};
21 use quiche::h3;
22 use std::future::Future;
23 use std::io;
24 use std::net::SocketAddr;
25 use thiserror::Error;
26 use tokio::net::UdpSocket;
27 use tokio::sync::{mpsc, oneshot, watch};
28 use tokio::task;
29
30 mod driver;
31
32 pub use driver::Stream;
33 use driver::{drive, Request};
34
/// State of a [`Connection`], as observed through its status watch channel.
///
/// A freshly created connection starts out as [`Status::QUIC`]; the driver task owns the
/// sending half of the channel and is the only party that changes the value.
#[derive(Debug, Clone)]
pub enum Status {
    /// Initial state set by `Connection::new`. `Connection::wait_for_live` treats this as
    /// "not decided yet" and keeps waiting.
    // NOTE(review): presumably means the QUIC handshake is still in progress — confirm in driver.
    QUIC,
    /// `Connection::wait_for_live` reports the connection as live in this state.
    H3,
    /// `Connection::wait_for_live` reports the connection as not live in this state.
    Dead {
        /// The session of the closed connection.
        session: Option<Vec<u8>>,
    },
}
44
/// Quiche HTTP/3 connection
///
/// This is a handle onto a spawned driver task: requests go to the driver over a bounded
/// channel, and the driver reports connection state back over a watch channel.
pub struct Connection {
    /// Sending half of the request channel whose receiving half is handed to the driver.
    request_tx: mpsc::Sender<Request>,
    /// Receiving half of the status channel whose sending half is handed to the driver.
    status_rx: watch::Receiver<Status>,
}
50
new_scid() -> [u8; quiche::MAX_CONN_ID_LEN]51 fn new_scid() -> [u8; quiche::MAX_CONN_ID_LEN] {
52 use ring::rand::{SecureRandom, SystemRandom};
53 let mut scid = [0; quiche::MAX_CONN_ID_LEN];
54 SystemRandom::new().fill(&mut scid).unwrap();
55 scid
56 }
57
mark_socket(socket: &std::net::UdpSocket, socket_mark: u32) -> io::Result<()>58 fn mark_socket(socket: &std::net::UdpSocket, socket_mark: u32) -> io::Result<()> {
59 use std::os::unix::io::AsRawFd;
60 let fd = socket.as_raw_fd();
61 // libc::setsockopt is a wrapper function calling into bionic setsockopt.
62 // The only pointer being passed in is &socket_mark, which is valid by virtue of being a
63 // reference, and the foreign function doesn't take ownership or a reference to that memory
64 // after completion.
65 if unsafe {
66 libc::setsockopt(
67 fd,
68 libc::SOL_SOCKET,
69 libc::SO_MARK,
70 &socket_mark as *const _ as *const libc::c_void,
71 std::mem::size_of::<u32>() as libc::socklen_t,
72 )
73 } == 0
74 {
75 Ok(())
76 } else {
77 Err(io::Error::last_os_error())
78 }
79 }
80
build_socket( peer_addr: SocketAddr, socket_mark: u32, tag_socket: &SocketTagger, ) -> io::Result<UdpSocket>81 async fn build_socket(
82 peer_addr: SocketAddr,
83 socket_mark: u32,
84 tag_socket: &SocketTagger,
85 ) -> io::Result<UdpSocket> {
86 let bind_addr = match peer_addr {
87 SocketAddr::V4(_) => "0.0.0.0:0",
88 SocketAddr::V6(_) => "[::]:0",
89 };
90
91 let socket = UdpSocket::bind(bind_addr).await?;
92 let std_socket = socket.into_std()?;
93 mark_socket(&std_socket, socket_mark)
94 .unwrap_or_else(|e| error!("Unable to mark socket : {:?}", e));
95 tag_socket(&std_socket).await;
96 let socket = UdpSocket::from_std(std_socket)?;
97 socket.connect(peer_addr).await?;
98 Ok(socket)
99 }
100
/// Error type for HTTP/3 connection
///
/// Every variant wraps its underlying error through `#[from]`, so the `?` operator converts
/// those source errors into this type automatically.
#[derive(Debug, Error)]
pub enum Error {
    /// QUIC protocol error
    #[error("QUIC error: {0}")]
    Quic(#[from] quiche::Error),
    /// HTTP/3 protocol error
    #[error("HTTP/3 error: {0}")]
    H3(#[from] h3::Error),
    /// Unable to send the request to the driver. This likely means the
    /// backing task has died.
    #[error("Unable to send request")]
    SendRequest(#[from] mpsc::error::SendError<Request>),
    /// IO failed. This is most likely to occur while trying to set up the
    /// UDP socket for use by the connection.
    #[error("IO error: {0}")]
    Io(#[from] io::Error),
    /// The request is no longer being serviced. This could mean that the
    /// request was dropped for an unspecified reason, or that the connection
    /// was closed prematurely and it can no longer be serviced.
    #[error("Driver dropped request")]
    RecvResponse(#[from] oneshot::error::RecvError),
}
124
/// Common result type for working with an HTTP/3 connection; the error side is always
/// this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
127
128 impl Connection {
129 const MAX_PENDING_REQUESTS: usize = 10;
130 /// Create a new connection with a background task handling IO.
new( server_name: Option<&str>, to: SocketAddr, socket_mark: u32, net_id: u32, tag_socket: &SocketTagger, config: &mut quiche::Config, session: Option<Vec<u8>>, ) -> Result<Self>131 pub async fn new(
132 server_name: Option<&str>,
133 to: SocketAddr,
134 socket_mark: u32,
135 net_id: u32,
136 tag_socket: &SocketTagger,
137 config: &mut quiche::Config,
138 session: Option<Vec<u8>>,
139 ) -> Result<Self> {
140 let (request_tx, request_rx) = mpsc::channel(Self::MAX_PENDING_REQUESTS);
141 let (status_tx, status_rx) = watch::channel(Status::QUIC);
142 let scid = new_scid();
143 let mut quiche_conn =
144 quiche::connect(server_name, &quiche::ConnectionId::from_ref(&scid), to, config)?;
145 if let Some(session) = session {
146 debug!("Setting session");
147 quiche_conn.set_session(&session)?;
148 }
149
150 let socket = build_socket(to, socket_mark, tag_socket).await?;
151 let driver = async move {
152 let result = drive(request_rx, status_tx, quiche_conn, socket, net_id).await;
153 if let Err(ref e) = result {
154 warn!("Connection driver returns some Err: {:?}", e);
155 }
156 result
157 };
158 task::spawn(driver);
159 Ok(Self { request_tx, status_rx })
160 }
161
162 /// Waits until we're either fully alive or dead
wait_for_live(&mut self) -> bool163 pub async fn wait_for_live(&mut self) -> bool {
164 // Once sc-mainline-prod updates to modern tokio, use
165 // borrow_and_update here.
166 match &*self.status_rx.borrow() {
167 Status::H3 => return true,
168 Status::Dead { .. } => return false,
169 Status::QUIC => (),
170 }
171 if self.status_rx.changed().await.is_err() {
172 // status_tx is gone, we're dead
173 return false;
174 }
175 if matches!(*self.status_rx.borrow(), Status::H3) {
176 return true;
177 }
178 // Since we're stuck on legacy tokio due to mainline, we need to try one more time in case there was an outstanding change notification. Using borrow_and_update avoids this.
179 match self.status_rx.changed().await {
180 // status_tx is gone, we're dead
181 Err(_) => false,
182 // If there's an HTTP/3 connection now we're alive, otherwise we're stuck/dead
183 _ => matches!(*self.status_rx.borrow(), Status::H3),
184 }
185 }
186
session(&self) -> Option<Vec<u8>>187 pub fn session(&self) -> Option<Vec<u8>> {
188 match &*self.status_rx.borrow() {
189 Status::Dead { session } => session.clone(),
190 _ => None,
191 }
192 }
193
194 /// Send a query, produce a future which will provide a response.
195 /// The future is separately returned rather than awaited to allow it to be waited on without
196 /// keeping the `Connection` itself borrowed.
query( &self, headers: Vec<h3::Header>, expiry: Option<BootTime>, ) -> Result<impl Future<Output = Option<Stream>>>197 pub async fn query(
198 &self,
199 headers: Vec<h3::Header>,
200 expiry: Option<BootTime>,
201 ) -> Result<impl Future<Output = Option<Stream>>> {
202 let (response_tx, response_rx) = oneshot::channel();
203 self.request_tx.send(Request { headers, response_tx, expiry }).await?;
204 Ok(async move { response_rx.await.ok() })
205 }
206 }
207