• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024, The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 use bluetooth_offload_hci::{IsoData, Module};
16 use std::collections::{HashMap, VecDeque};
17 use std::sync::{Arc, Condvar, Mutex, MutexGuard};
18 use std::thread::{self, JoinHandle};
19 
20 pub struct Arbiter {
21     state_cvar: Arc<(Mutex<State>, Condvar)>,
22     thread: Option<JoinHandle<()>>,
23     max_buf_len: usize,
24 }
25 
/// Mutable state shared between the `Arbiter` API and its sender thread.
#[derive(Default)]
struct State {
    /// Set to request the sender thread to stop.
    halt: bool,

    /// Software transmission queues, one per `Origin` (indexed by the
    /// origin's discriminant). Each entry is a pair of the connection handle
    /// and the raw bytes of the ISO packet.
    queues: [VecDeque<(u16, Vec<u8>)>; 2],

    /// Number of packets sent to the controller and not yet acknowledged,
    /// keyed by connection handle. A handle is present in this map for as
    /// long as its connection is registered.
    in_transit: HashMap<u16, usize>,
}
39 
/// Source of a packet pushed to the arbiter.
///
/// The discriminant is used as the index into `State::queues`; the sender
/// thread scans the queues in index order, so a lower value is served first.
enum Origin {
    /// Packet pushed through `push_audio`.
    Audio = 0,
    /// Packet pushed through `push_incoming`.
    Incoming = 1,
}
44 
45 impl Arbiter {
new(sink: Arc<dyn Module>, max_buf_len: usize, max_buf_count: usize) -> Self46     pub fn new(sink: Arc<dyn Module>, max_buf_len: usize, max_buf_count: usize) -> Self {
47         let state_cvar = Arc::new((Mutex::<State>::new(Default::default()), Condvar::new()));
48         let thread = {
49             let state_cvar = state_cvar.clone();
50             thread::spawn(move || Self::thread_loop(state_cvar.clone(), sink, max_buf_count))
51         };
52 
53         Self { state_cvar, thread: Some(thread), max_buf_len }
54     }
55 
max_buf_len(&self) -> usize56     pub fn max_buf_len(&self) -> usize {
57         self.max_buf_len
58     }
59 
add_connection(&self, handle: u16)60     pub fn add_connection(&self, handle: u16) {
61         let (state, _) = &*self.state_cvar;
62         if state.lock().unwrap().in_transit.insert(handle, 0).is_some() {
63             panic!("Connection with handle 0x{:03x} already exists", handle);
64         }
65     }
66 
remove_connection(&self, handle: u16)67     pub fn remove_connection(&self, handle: u16) {
68         let (state, cvar) = &*self.state_cvar;
69         let mut state = state.lock().unwrap();
70         for q in state.queues.iter_mut() {
71             while let Some(idx) = q.iter().position(|&(h, _)| h == handle) {
72                 q.remove(idx);
73             }
74         }
75         if state.in_transit.remove(&handle).is_some() {
76             cvar.notify_one();
77         }
78     }
79 
push_incoming(&self, iso_data: &IsoData)80     pub fn push_incoming(&self, iso_data: &IsoData) {
81         self.push(Origin::Incoming, iso_data);
82     }
83 
push_audio(&self, iso_data: &IsoData)84     pub fn push_audio(&self, iso_data: &IsoData) {
85         self.push(Origin::Audio, iso_data);
86     }
87 
set_completed(&self, handle: u16, num: usize)88     pub fn set_completed(&self, handle: u16, num: usize) {
89         let (state, cvar) = &*self.state_cvar;
90         if let Some(buf_usage) = state.lock().unwrap().in_transit.get_mut(&handle) {
91             *buf_usage -= num;
92             cvar.notify_one();
93         }
94     }
95 
push(&self, origin: Origin, iso_data: &IsoData)96     fn push(&self, origin: Origin, iso_data: &IsoData) {
97         let handle = iso_data.connection_handle;
98         let data = iso_data.to_bytes();
99         assert!(data.len() <= self.max_buf_len + 4);
100 
101         let (state, cvar) = &*self.state_cvar;
102         let mut state = state.lock().unwrap();
103         if state.in_transit.contains_key(&handle) {
104             state.queues[origin as usize].push_back((handle, data));
105             cvar.notify_one();
106         }
107     }
108 
thread_loop( state_cvar: Arc<(Mutex<State>, Condvar)>, sink: Arc<dyn Module>, max_buf_count: usize, )109     fn thread_loop(
110         state_cvar: Arc<(Mutex<State>, Condvar)>,
111         sink: Arc<dyn Module>,
112         max_buf_count: usize,
113     ) {
114         let (state, cvar) = &*state_cvar;
115         'main: loop {
116             let packet = {
117                 let mut state = state.lock().unwrap();
118                 let mut packet = None;
119                 while !state.halt && {
120                     packet = Self::pull(&mut state, max_buf_count);
121                     packet.is_none()
122                 } {
123                     state = cvar.wait(state).unwrap();
124                 }
125                 if state.halt {
126                     break 'main;
127                 }
128                 packet.unwrap()
129             };
130             sink.out_iso(&packet);
131         }
132     }
133 
pull(state: &mut MutexGuard<'_, State>, max_buf_count: usize) -> Option<Vec<u8>>134     fn pull(state: &mut MutexGuard<'_, State>, max_buf_count: usize) -> Option<Vec<u8>> {
135         for idx in 0..state.queues.len() {
136             if state.queues[idx].is_empty() || max_buf_count <= state.in_transit.values().sum() {
137                 continue;
138             }
139             let (handle, vec) = state.queues[idx].pop_front().unwrap();
140             *state.in_transit.get_mut(&handle).unwrap() += 1;
141             return Some(vec);
142         }
143         None
144     }
145 }
146 
147 impl Drop for Arbiter {
drop(&mut self)148     fn drop(&mut self) {
149         let (state, cvar) = &*self.state_cvar;
150         {
151             let mut state = state.lock().unwrap();
152             state.halt = true;
153             cvar.notify_one();
154         }
155         let thread = self.thread.take().unwrap();
156         thread.join().expect("End of thread loop");
157     }
158 }
159