• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Classic ACL manager
2 
3 use crate::hci::{Address, CommandSender, EventRegistry};
4 use crate::link::acl::core;
5 use bt_common::Bluetooth;
6 use bt_packets::hci::EventChild::{
7     AuthenticationComplete, ConnectionComplete, DisconnectionComplete,
8 };
9 use bt_packets::hci::{
10     AcceptConnectionRequestBuilder, AcceptConnectionRequestRole, ClockOffsetValid,
11     CreateConnectionBuilder, CreateConnectionCancelBuilder, CreateConnectionRoleSwitch,
12     DisconnectBuilder, DisconnectReason, ErrorCode, EventChild, EventCode, EventPacket,
13     PageScanRepetitionMode, RejectConnectionReason, RejectConnectionRequestBuilder, Role,
14 };
15 use bytes::Bytes;
16 use gddi::{module, provides, Stoppable};
17 use log::warn;
18 use std::collections::HashMap;
19 use std::sync::Arc;
20 use tokio::runtime::Runtime;
21 use tokio::select;
22 use tokio::sync::mpsc::{channel, Receiver, Sender};
23 use tokio::sync::{oneshot, Mutex};
24 
25 module! {
26     classic_acl_module,
27     providers {
28         AclManager => provide_acl_manager,
29     },
30 }
31 
32 /// Classic ACL manager
33 #[derive(Clone, Stoppable)]
34 pub struct AclManager {
35     req_tx: Sender<Request>,
36     /// High level events from AclManager
37     pub evt_rx: Arc<Mutex<Receiver<Event>>>,
38 }
39 
40 /// Events generated by AclManager
41 #[derive(Debug)]
42 pub enum Event {
43     /// Connection was successful - provides the newly created connection
44     ConnectSuccess(Connection),
45     /// Locally initialted connection was not successful - indicates address & reason
46     ConnectFail {
47         /// Address of the failed connection
48         addr: Address,
49         /// Reason of the failed connection
50         reason: ErrorCode,
51     },
52 }
53 
54 /// A classic ACL connection
55 #[derive(Debug)]
56 pub struct Connection {
57     #[allow(dead_code)]
58     addr: Address,
59     #[allow(dead_code)]
60     rx: Receiver<Bytes>,
61     #[allow(dead_code)]
62     tx: Sender<Bytes>,
63     #[allow(dead_code)]
64     shared: Arc<Mutex<ConnectionShared>>,
65     requests: Sender<ConnectionRequest>,
66     #[allow(dead_code)]
67     evt_rx: Receiver<ConnectionEvent>,
68 }
69 
70 /// Events generated by Connection
71 #[derive(Debug)]
72 pub enum ConnectionEvent {
73     /// Connection was disconnected with the specified code.
74     Disconnected(ErrorCode),
75     /// Connection authentication was completed
76     AuthenticationComplete,
77 }
78 
79 impl Connection {
80     /// Disconnect the connection with the specified reason.
disconnect(&mut self, reason: DisconnectReason)81     pub async fn disconnect(&mut self, reason: DisconnectReason) {
82         let (tx, rx) = oneshot::channel();
83         self.requests.send(ConnectionRequest::Disconnect { reason, fut: tx }).await.unwrap();
84         rx.await.unwrap()
85     }
86 }
87 
88 #[derive(Debug)]
89 enum ConnectionRequest {
90     Disconnect { reason: DisconnectReason, fut: oneshot::Sender<()> },
91 }
92 
93 struct ConnectionInternal {
94     addr: Address,
95     #[allow(dead_code)]
96     shared: Arc<Mutex<ConnectionShared>>,
97     hci_evt_tx: Sender<EventPacket>,
98 }
99 
100 #[derive(Debug)]
101 struct ConnectionShared {
102     #[allow(dead_code)]
103     role: Role,
104 }
105 
106 impl AclManager {
107     /// Connect to the specified address, or queue it if a connection is already pending
connect(&mut self, addr: Address)108     pub async fn connect(&mut self, addr: Address) {
109         self.req_tx.send(Request::Connect { addr }).await.unwrap();
110     }
111 
112     /// Cancel the connection to the specified address, if it is pending
cancel_connect(&mut self, addr: Address)113     pub async fn cancel_connect(&mut self, addr: Address) {
114         let (tx, rx) = oneshot::channel();
115         self.req_tx.send(Request::CancelConnect { addr, fut: tx }).await.unwrap();
116         rx.await.unwrap();
117     }
118 }
119 
120 #[derive(Debug)]
121 enum Request {
122     Connect { addr: Address },
123     CancelConnect { addr: Address, fut: oneshot::Sender<()> },
124 }
125 
126 #[derive(Eq, PartialEq)]
127 enum PendingConnect {
128     Outgoing(Address),
129     Incoming(Address),
130     None,
131 }
132 
133 impl PendingConnect {
take(&mut self) -> Self134     fn take(&mut self) -> Self {
135         std::mem::replace(self, PendingConnect::None)
136     }
137 }
138 
139 #[provides]
provide_acl_manager( mut hci: CommandSender, mut events: EventRegistry, mut dispatch: core::AclDispatch, rt: Arc<Runtime>, ) -> AclManager140 async fn provide_acl_manager(
141     mut hci: CommandSender,
142     mut events: EventRegistry,
143     mut dispatch: core::AclDispatch,
144     rt: Arc<Runtime>,
145 ) -> AclManager {
146     let (req_tx, mut req_rx) = channel::<Request>(10);
147     let (conn_evt_tx, conn_evt_rx) = channel::<Event>(10);
148     let local_rt = rt.clone();
149 
150     local_rt.spawn(async move {
151         let connections: Arc<Mutex<HashMap<u16, ConnectionInternal>>> = Arc::new(Mutex::new(HashMap::new()));
152         let mut connect_queue: Vec<Address> = Vec::new();
153         let mut pending = PendingConnect::None;
154 
155         let (evt_tx, mut evt_rx) = channel(3);
156         events.register(EventCode::ConnectionComplete, evt_tx.clone()).await;
157         events.register(EventCode::ConnectionRequest, evt_tx.clone()).await;
158         events.register(EventCode::AuthenticationComplete, evt_tx).await;
159 
160         loop {
161             select! {
162                 Some(req) = req_rx.recv() => {
163                     match req {
164                         Request::Connect { addr } => {
165                             if connections.lock().await.values().any(|c| c.addr == addr) {
166                                 warn!("already connected: {}", addr);
167                                 return;
168                             }
169                             if let PendingConnect::None = pending {
170                                 pending = PendingConnect::Outgoing(addr);
171                                 hci.send(build_create_connection(addr)).await;
172                             } else {
173                                 connect_queue.insert(0, addr);
174                             }
175                         },
176                         Request::CancelConnect { addr, fut } => {
177                             connect_queue.retain(|p| *p != addr);
178                             if pending == PendingConnect::Outgoing(addr) {
179                                 hci.send(CreateConnectionCancelBuilder { bd_addr: addr }).await;
180                             }
181                             fut.send(()).unwrap();
182                         }
183                     }
184                 }
185                 Some(evt) = evt_rx.recv() => {
186                     match evt.specialize() {
187                         ConnectionComplete(evt) => {
188                             let addr = evt.get_bd_addr();
189                             let status = evt.get_status();
190                             let handle = evt.get_connection_handle();
191                             let role = match pending.take() {
192                                 PendingConnect::Outgoing(a) if a == addr => Role::Central,
193                                 PendingConnect::Incoming(a) if a == addr => Role::Peripheral,
194                                 _ => panic!("No prior connection request for {}", addr),
195                             };
196 
197                             match status {
198                                 ErrorCode::Success => {
199                                     let mut core_conn = dispatch.register(handle, Bluetooth::Classic).await;
200                                     let shared = Arc::new(Mutex::new(ConnectionShared { role }));
201                                     let (evt_tx, evt_rx) = channel(10);
202                                     let (req_tx, req_rx) = channel(10);
203                                     let connection = Connection {
204                                         addr,
205                                         shared: shared.clone(),
206                                         rx: core_conn.rx.take().unwrap(),
207                                         tx: core_conn.tx.take().unwrap(),
208                                         requests: req_tx,
209                                         evt_rx,
210                                     };
211                                     let connection_internal = ConnectionInternal {
212                                         addr,
213                                         shared,
214                                         hci_evt_tx: core_conn.evt_tx.clone(),
215                                     };
216 
217                                     assert!(connections.lock().await.insert(handle, connection_internal).is_none());
218                                     rt.spawn(run_connection(handle, evt_tx, req_rx, core_conn, connections.clone(), hci.clone()));
219                                     conn_evt_tx.send(Event::ConnectSuccess(connection)).await.unwrap();
220                                 },
221                                 _ => conn_evt_tx.send(Event::ConnectFail { addr, reason: status }).await.unwrap(),
222                             }
223                         },
224                         EventChild::ConnectionRequest(evt) => {
225                             let addr = evt.get_bd_addr();
226                             pending = PendingConnect::Incoming(addr);
227                             if connections.lock().await.values().any(|c| c.addr == addr) {
228                                 hci.send(RejectConnectionRequestBuilder {
229                                     bd_addr: addr,
230                                     reason: RejectConnectionReason::UnacceptableBdAddr
231                                 }).await;
232                             } else {
233                                 hci.send(AcceptConnectionRequestBuilder {
234                                     bd_addr: addr,
235                                     role: AcceptConnectionRequestRole::BecomeCentral
236                                 }).await;
237                             }
238                         },
239                         AuthenticationComplete(e) => dispatch_to(e.get_connection_handle(), &connections, evt).await,
240                         _ => unimplemented!(),
241                     }
242                 }
243             }
244         }
245     });
246 
247     AclManager { req_tx, evt_rx: Arc::new(Mutex::new(conn_evt_rx)) }
248 }
249 
build_create_connection(bd_addr: Address) -> CreateConnectionBuilder250 fn build_create_connection(bd_addr: Address) -> CreateConnectionBuilder {
251     CreateConnectionBuilder {
252         bd_addr,
253         packet_type: 0x4408 /* DM 1,3,5 */ | 0x8810, /*DH 1,3,5 */
254         page_scan_repetition_mode: PageScanRepetitionMode::R1,
255         clock_offset: 0,
256         clock_offset_valid: ClockOffsetValid::Invalid,
257         allow_role_switch: CreateConnectionRoleSwitch::AllowRoleSwitch,
258     }
259 }
260 
dispatch_to( handle: u16, connections: &Arc<Mutex<HashMap<u16, ConnectionInternal>>>, event: EventPacket, )261 async fn dispatch_to(
262     handle: u16,
263     connections: &Arc<Mutex<HashMap<u16, ConnectionInternal>>>,
264     event: EventPacket,
265 ) {
266     if let Some(c) = connections.lock().await.get_mut(&handle) {
267         c.hci_evt_tx.send(event).await.unwrap();
268     }
269 }
270 
run_connection( handle: u16, evt_tx: Sender<ConnectionEvent>, mut req_rx: Receiver<ConnectionRequest>, mut core: core::Connection, connections: Arc<Mutex<HashMap<u16, ConnectionInternal>>>, mut hci: CommandSender, )271 async fn run_connection(
272     handle: u16,
273     evt_tx: Sender<ConnectionEvent>,
274     mut req_rx: Receiver<ConnectionRequest>,
275     mut core: core::Connection,
276     connections: Arc<Mutex<HashMap<u16, ConnectionInternal>>>,
277     mut hci: CommandSender,
278 ) {
279     loop {
280         select! {
281             Some(evt) = core.evt_rx.recv() => {
282                 match evt.specialize() {
283                     DisconnectionComplete(evt) => {
284                         connections.lock().await.remove(&handle);
285                         evt_tx.send(ConnectionEvent::Disconnected(evt.get_reason())).await.unwrap();
286                         return; // At this point, there is nothing more to run on the connection.
287                     },
288                     AuthenticationComplete(_) => evt_tx.send(ConnectionEvent::AuthenticationComplete).await.unwrap(),
289                     _ => unimplemented!(),
290                 }
291             },
292             Some(req) = req_rx.recv() => {
293                 match req {
294                     ConnectionRequest::Disconnect{reason, fut} => {
295                         hci.send(DisconnectBuilder { connection_handle: handle, reason }).await;
296                         fut.send(()).unwrap();
297                     }
298                 }
299             },
300         }
301     }
302 }
303