• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Classic ACL manager
2 
3 use crate::acl::core;
4 use bt_common::Bluetooth;
5 use bt_hci::{Address, CommandSender, EventRegistry};
6 use bt_packets::hci::EventChild::{
7     AuthenticationComplete, ConnectionComplete, DisconnectionComplete,
8 };
9 use bt_packets::hci::{
10     AcceptConnectionRequestBuilder, AcceptConnectionRequestRole, ClockOffsetValid,
11     CreateConnectionBuilder, CreateConnectionCancelBuilder, CreateConnectionRoleSwitch,
12     DisconnectBuilder, DisconnectReason, ErrorCode, EventChild, EventCode, EventPacket,
13     PageScanRepetitionMode, RejectConnectionReason, RejectConnectionRequestBuilder, Role,
14 };
15 use bytes::Bytes;
16 use gddi::{module, provides, Stoppable};
17 use log::warn;
18 use std::collections::HashMap;
19 use std::sync::Arc;
20 use tokio::runtime::Runtime;
21 use tokio::select;
22 use tokio::sync::mpsc::{channel, Receiver, Sender};
23 use tokio::sync::{oneshot, Mutex};
24 
25 module! {
26     classic_acl_module,
27     providers {
28         AclManager => provide_acl_manager,
29     },
30 }
31 
32 /// Classic ACL manager
33 #[derive(Clone, Stoppable)]
34 pub struct AclManager {
35     req_tx: Sender<Request>,
36     /// High level events from AclManager
37     pub evt_rx: Arc<Mutex<Receiver<Event>>>,
38 }
39 
40 /// Events generated by AclManager
41 #[derive(Debug)]
42 pub enum Event {
43     /// Connection was successful - provides the newly created connection
44     ConnectSuccess(Connection),
45     /// Locally initialted connection was not successful - indicates address & reason
46     ConnectFail {
47         /// Address of the failed connection
48         addr: Address,
49         /// Reason of the failed connection
50         reason: ErrorCode,
51     },
52 }
53 
54 /// A classic ACL connection
55 #[derive(Debug)]
56 pub struct Connection {
57     addr: Address,
58     rx: Receiver<Bytes>,
59     tx: Sender<Bytes>,
60     shared: Arc<Mutex<ConnectionShared>>,
61     requests: Sender<ConnectionRequest>,
62     evt_rx: Receiver<ConnectionEvent>,
63 }
64 
65 /// Events generated by Connection
66 #[derive(Debug)]
67 pub enum ConnectionEvent {
68     /// Connection was disconnected with the specified code.
69     Disconnected(ErrorCode),
70     /// Connection authentication was completed
71     AuthenticationComplete,
72 }
73 
74 impl Connection {
75     /// Disconnect the connection with the specified reason.
disconnect(&mut self, reason: DisconnectReason)76     pub async fn disconnect(&mut self, reason: DisconnectReason) {
77         let (tx, rx) = oneshot::channel();
78         self.requests.send(ConnectionRequest::Disconnect { reason, fut: tx }).await.unwrap();
79         rx.await.unwrap()
80     }
81 }
82 
83 #[derive(Debug)]
84 enum ConnectionRequest {
85     Disconnect { reason: DisconnectReason, fut: oneshot::Sender<()> },
86 }
87 
88 struct ConnectionInternal {
89     addr: Address,
90     #[allow(dead_code)]
91     shared: Arc<Mutex<ConnectionShared>>,
92     hci_evt_tx: Sender<EventPacket>,
93 }
94 
95 #[derive(Debug)]
96 struct ConnectionShared {
97     role: Role,
98 }
99 
100 impl AclManager {
101     /// Connect to the specified address, or queue it if a connection is already pending
connect(&mut self, addr: Address)102     pub async fn connect(&mut self, addr: Address) {
103         self.req_tx.send(Request::Connect { addr }).await.unwrap();
104     }
105 
106     /// Cancel the connection to the specified address, if it is pending
cancel_connect(&mut self, addr: Address)107     pub async fn cancel_connect(&mut self, addr: Address) {
108         let (tx, rx) = oneshot::channel();
109         self.req_tx.send(Request::CancelConnect { addr, fut: tx }).await.unwrap();
110         rx.await.unwrap();
111     }
112 }
113 
114 #[derive(Debug)]
115 enum Request {
116     Connect { addr: Address },
117     CancelConnect { addr: Address, fut: oneshot::Sender<()> },
118 }
119 
120 #[derive(Eq, PartialEq)]
121 enum PendingConnect {
122     Outgoing(Address),
123     Incoming(Address),
124     None,
125 }
126 
127 impl PendingConnect {
take(&mut self) -> Self128     fn take(&mut self) -> Self {
129         std::mem::replace(self, PendingConnect::None)
130     }
131 }
132 
133 #[provides]
provide_acl_manager( mut hci: CommandSender, mut events: EventRegistry, mut dispatch: core::AclDispatch, rt: Arc<Runtime>, ) -> AclManager134 async fn provide_acl_manager(
135     mut hci: CommandSender,
136     mut events: EventRegistry,
137     mut dispatch: core::AclDispatch,
138     rt: Arc<Runtime>,
139 ) -> AclManager {
140     let (req_tx, mut req_rx) = channel::<Request>(10);
141     let (conn_evt_tx, conn_evt_rx) = channel::<Event>(10);
142     let local_rt = rt.clone();
143 
144     local_rt.spawn(async move {
145         let connections: Arc<Mutex<HashMap<u16, ConnectionInternal>>> = Arc::new(Mutex::new(HashMap::new()));
146         let mut connect_queue: Vec<Address> = Vec::new();
147         let mut pending = PendingConnect::None;
148 
149         let (evt_tx, mut evt_rx) = channel(3);
150         events.register(EventCode::ConnectionComplete, evt_tx.clone()).await;
151         events.register(EventCode::ConnectionRequest, evt_tx.clone()).await;
152         events.register(EventCode::AuthenticationComplete, evt_tx).await;
153 
154         loop {
155             select! {
156                 Some(req) = req_rx.recv() => {
157                     match req {
158                         Request::Connect { addr } => {
159                             if connections.lock().await.values().any(|c| c.addr == addr) {
160                                 warn!("already connected: {}", addr);
161                                 return;
162                             }
163                             if let PendingConnect::None = pending {
164                                 pending = PendingConnect::Outgoing(addr);
165                                 hci.send(build_create_connection(addr)).await;
166                             } else {
167                                 connect_queue.insert(0, addr);
168                             }
169                         },
170                         Request::CancelConnect { addr, fut } => {
171                             connect_queue.retain(|p| *p != addr);
172                             if pending == PendingConnect::Outgoing(addr) {
173                                 hci.send(CreateConnectionCancelBuilder { bd_addr: addr }).await;
174                             }
175                             fut.send(()).unwrap();
176                         }
177                     }
178                 }
179                 Some(evt) = evt_rx.recv() => {
180                     match evt.specialize() {
181                         ConnectionComplete(evt) => {
182                             let addr = evt.get_bd_addr();
183                             let status = evt.get_status();
184                             let handle = evt.get_connection_handle();
185                             let role = match pending.take() {
186                                 PendingConnect::Outgoing(a) if a == addr => Role::Central,
187                                 PendingConnect::Incoming(a) if a == addr => Role::Peripheral,
188                                 _ => panic!("No prior connection request for {}", addr),
189                             };
190 
191                             match status {
192                                 ErrorCode::Success => {
193                                     let mut core_conn = dispatch.register(handle, Bluetooth::Classic).await;
194                                     let shared = Arc::new(Mutex::new(ConnectionShared { role }));
195                                     let (evt_tx, evt_rx) = channel(10);
196                                     let (req_tx, req_rx) = channel(10);
197                                     let connection = Connection {
198                                         addr,
199                                         shared: shared.clone(),
200                                         rx: core_conn.rx.take().unwrap(),
201                                         tx: core_conn.tx.take().unwrap(),
202                                         requests: req_tx,
203                                         evt_rx,
204                                     };
205                                     let connection_internal = ConnectionInternal {
206                                         addr,
207                                         shared,
208                                         hci_evt_tx: core_conn.evt_tx.clone(),
209                                     };
210 
211                                     assert!(connections.lock().await.insert(handle, connection_internal).is_none());
212                                     rt.spawn(run_connection(handle, evt_tx, req_rx, core_conn, connections.clone(), hci.clone()));
213                                     conn_evt_tx.send(Event::ConnectSuccess(connection)).await.unwrap();
214                                 },
215                                 _ => conn_evt_tx.send(Event::ConnectFail { addr, reason: status }).await.unwrap(),
216                             }
217                         },
218                         EventChild::ConnectionRequest(evt) => {
219                             let addr = evt.get_bd_addr();
220                             pending = PendingConnect::Incoming(addr);
221                             if connections.lock().await.values().any(|c| c.addr == addr) {
222                                 hci.send(RejectConnectionRequestBuilder {
223                                     bd_addr: addr,
224                                     reason: RejectConnectionReason::UnacceptableBdAddr
225                                 }).await;
226                             } else {
227                                 hci.send(AcceptConnectionRequestBuilder {
228                                     bd_addr: addr,
229                                     role: AcceptConnectionRequestRole::BecomeCentral
230                                 }).await;
231                             }
232                         },
233                         AuthenticationComplete(e) => dispatch_to(e.get_connection_handle(), &connections, evt).await,
234                         _ => unimplemented!(),
235                     }
236                 }
237             }
238         }
239     });
240 
241     AclManager { req_tx, evt_rx: Arc::new(Mutex::new(conn_evt_rx)) }
242 }
243 
build_create_connection(bd_addr: Address) -> CreateConnectionBuilder244 fn build_create_connection(bd_addr: Address) -> CreateConnectionBuilder {
245     CreateConnectionBuilder {
246         bd_addr,
247         packet_type: 0x4408 /* DM 1,3,5 */ | 0x8810, /*DH 1,3,5 */
248         page_scan_repetition_mode: PageScanRepetitionMode::R1,
249         clock_offset: 0,
250         clock_offset_valid: ClockOffsetValid::Invalid,
251         allow_role_switch: CreateConnectionRoleSwitch::AllowRoleSwitch,
252     }
253 }
254 
dispatch_to( handle: u16, connections: &Arc<Mutex<HashMap<u16, ConnectionInternal>>>, event: EventPacket, )255 async fn dispatch_to(
256     handle: u16,
257     connections: &Arc<Mutex<HashMap<u16, ConnectionInternal>>>,
258     event: EventPacket,
259 ) {
260     if let Some(c) = connections.lock().await.get_mut(&handle) {
261         c.hci_evt_tx.send(event).await.unwrap();
262     }
263 }
264 
run_connection( handle: u16, evt_tx: Sender<ConnectionEvent>, mut req_rx: Receiver<ConnectionRequest>, mut core: core::Connection, connections: Arc<Mutex<HashMap<u16, ConnectionInternal>>>, mut hci: CommandSender, )265 async fn run_connection(
266     handle: u16,
267     evt_tx: Sender<ConnectionEvent>,
268     mut req_rx: Receiver<ConnectionRequest>,
269     mut core: core::Connection,
270     connections: Arc<Mutex<HashMap<u16, ConnectionInternal>>>,
271     mut hci: CommandSender,
272 ) {
273     loop {
274         select! {
275             Some(evt) = core.evt_rx.recv() => {
276                 match evt.specialize() {
277                     DisconnectionComplete(evt) => {
278                         connections.lock().await.remove(&handle);
279                         evt_tx.send(ConnectionEvent::Disconnected(evt.get_reason())).await.unwrap();
280                         return; // At this point, there is nothing more to run on the connection.
281                     },
282                     AuthenticationComplete(_) => evt_tx.send(ConnectionEvent::AuthenticationComplete).await.unwrap(),
283                     _ => unimplemented!(),
284                 }
285             },
286             Some(req) = req_rx.recv() => {
287                 match req {
288                     ConnectionRequest::Disconnect{reason, fut} => {
289                         hci.send(DisconnectBuilder { connection_handle: handle, reason }).await;
290                         fut.send(()).unwrap();
291                     }
292                 }
293             },
294         }
295     }
296 }
297