1 // Copyright 2024, The Android Open Source Project 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 use bluetooth_offload_hci::{IsoData, Module}; 16 use std::collections::{HashMap, VecDeque}; 17 use std::sync::{Arc, Condvar, Mutex, MutexGuard}; 18 use std::thread::{self, JoinHandle}; 19 20 pub struct Arbiter { 21 state_cvar: Arc<(Mutex<State>, Condvar)>, 22 thread: Option<JoinHandle<()>>, 23 max_buf_len: usize, 24 } 25 26 #[derive(Default)] 27 struct State { 28 /// Halt indication of the sender thread 29 halt: bool, 30 31 /// Software transmission queues for each `Origin`. 32 /// A queue is pair of connection handle, and packet raw ISO data. 33 queues: [VecDeque<(u16, Vec<u8>)>; 2], 34 35 /// Count of packets sent to the controller and not yet acknowledged, 36 /// by connection handle stored on `u16`. 37 in_transit: HashMap<u16, usize>, 38 } 39 40 enum Origin { 41 Audio, 42 Incoming, 43 } 44 45 impl Arbiter { new(sink: Arc<dyn Module>, max_buf_len: usize, max_buf_count: usize) -> Self46 pub fn new(sink: Arc<dyn Module>, max_buf_len: usize, max_buf_count: usize) -> Self { 47 let state_cvar = Arc::new((Mutex::<State>::new(Default::default()), Condvar::new())); 48 let thread = { 49 let state_cvar = state_cvar.clone(); 50 thread::spawn(move || Self::thread_loop(state_cvar.clone(), sink, max_buf_count)) 51 }; 52 53 Self { state_cvar, thread: Some(thread), max_buf_len } 54 } 55 max_buf_len(&self) -> usize56 pub fn max_buf_len(&self) -> usize { 57 self.max_buf_len 58 } 59 add_connection(&self, handle: u16)60 pub fn add_connection(&self, handle: u16) { 61 let (state, _) = &*self.state_cvar; 62 if state.lock().unwrap().in_transit.insert(handle, 0).is_some() { 63 panic!("Connection with handle 0x{:03x} already exists", handle); 64 } 65 } 66 remove_connection(&self, handle: u16)67 pub fn remove_connection(&self, handle: u16) { 68 let (state, cvar) = &*self.state_cvar; 69 let mut state = state.lock().unwrap(); 70 for q in state.queues.iter_mut() { 71 while let Some(idx) = q.iter().position(|&(h, _)| h == handle) { 72 q.remove(idx); 73 } 74 } 75 if state.in_transit.remove(&handle).is_some() { 76 cvar.notify_one(); 77 } 78 } 79 push_incoming(&self, iso_data: &IsoData)80 pub fn push_incoming(&self, iso_data: &IsoData) { 81 self.push(Origin::Incoming, iso_data); 82 } 83 push_audio(&self, iso_data: &IsoData)84 pub fn push_audio(&self, iso_data: &IsoData) { 85 self.push(Origin::Audio, iso_data); 86 } 87 set_completed(&self, handle: u16, num: usize)88 pub fn set_completed(&self, handle: u16, num: usize) { 89 let (state, cvar) = &*self.state_cvar; 90 if let Some(buf_usage) = state.lock().unwrap().in_transit.get_mut(&handle) { 91 *buf_usage -= num; 92 cvar.notify_one(); 93 } 94 } 95 push(&self, origin: Origin, iso_data: &IsoData)96 fn push(&self, origin: Origin, iso_data: &IsoData) { 97 let handle = iso_data.connection_handle; 98 let data = iso_data.to_bytes(); 99 assert!(data.len() <= self.max_buf_len + 4); 100 101 let (state, cvar) = &*self.state_cvar; 102 let mut state = state.lock().unwrap(); 103 if state.in_transit.contains_key(&handle) { 104 state.queues[origin as usize].push_back((handle, data)); 105 cvar.notify_one(); 106 } 107 } 108 thread_loop( state_cvar: Arc<(Mutex<State>, Condvar)>, sink: Arc<dyn Module>, max_buf_count: usize, )109 fn thread_loop( 110 state_cvar: Arc<(Mutex<State>, Condvar)>, 111 sink: Arc<dyn Module>, 112 max_buf_count: usize, 113 ) { 114 let (state, cvar) = &*state_cvar; 115 'main: loop { 116 let packet = { 117 let mut state = state.lock().unwrap(); 118 let mut packet = None; 119 while !state.halt && { 120 packet = Self::pull(&mut state, max_buf_count); 121 packet.is_none() 122 } { 123 state = cvar.wait(state).unwrap(); 124 } 125 if state.halt { 126 break 'main; 127 } 128 packet.unwrap() 129 }; 130 sink.out_iso(&packet); 131 } 132 } 133 pull(state: &mut MutexGuard<'_, State>, max_buf_count: usize) -> Option<Vec<u8>>134 fn pull(state: &mut MutexGuard<'_, State>, max_buf_count: usize) -> Option<Vec<u8>> { 135 for idx in 0..state.queues.len() { 136 if state.queues[idx].is_empty() || max_buf_count <= state.in_transit.values().sum() { 137 continue; 138 } 139 let (handle, vec) = state.queues[idx].pop_front().unwrap(); 140 *state.in_transit.get_mut(&handle).unwrap() += 1; 141 return Some(vec); 142 } 143 None 144 } 145 } 146 147 impl Drop for Arbiter { drop(&mut self)148 fn drop(&mut self) { 149 let (state, cvar) = &*self.state_cvar; 150 { 151 let mut state = state.lock().unwrap(); 152 state.halt = true; 153 cvar.notify_one(); 154 } 155 let thread = self.thread.take().unwrap(); 156 thread.join().expect("End of thread loop"); 157 } 158 } 159