1 use std::{cell::RefCell, collections::HashMap, rc::Rc, time::Duration}; 2 3 use async_trait::async_trait; 4 use log::{trace, warn}; 5 use tokio::{sync::oneshot, time::timeout}; 6 7 use crate::{ 8 gatt::{ 9 ids::{AttHandle, ConnectionId, ServerId, TransactionId, TransportIndex}, 10 GattCallbacks, 11 }, 12 packets::{AttAttributeDataChild, AttAttributeDataView, AttErrorCode}, 13 }; 14 15 use super::{ 16 AttributeBackingType, GattWriteRequestType, GattWriteType, RawGattDatastore, 17 TransactionDecision, 18 }; 19 20 struct PendingTransaction { 21 response: oneshot::Sender<Result<AttAttributeDataChild, AttErrorCode>>, 22 } 23 24 #[derive(Debug)] 25 struct PendingTransactionWatcher { 26 conn_id: ConnectionId, 27 trans_id: TransactionId, 28 rx: oneshot::Receiver<Result<AttAttributeDataChild, AttErrorCode>>, 29 } 30 31 /// This struct converts the asynchronus read/write operations of GattDatastore 32 /// into the callback-based interface expected by JNI 33 pub struct CallbackTransactionManager { 34 callbacks: Rc<dyn GattCallbacks>, 35 pending_transactions: RefCell<PendingTransactionsState>, 36 } 37 38 struct PendingTransactionsState { 39 pending_transactions: HashMap<(ConnectionId, TransactionId), PendingTransaction>, 40 next_transaction_id: u32, 41 } 42 43 /// We expect all responses to be provided within this timeout 44 /// It should be less than 30s, as that is the ATT timeout that causes 45 /// the client to disconnect. 46 const TIMEOUT: Duration = Duration::from_secs(15); 47 48 /// The cause of a failure to dispatch a call to send_response() 49 #[derive(Debug, PartialEq, Eq)] 50 pub enum CallbackResponseError { 51 /// The TransactionId supplied was invalid for the specified connection 52 NonExistentTransaction(TransactionId), 53 /// The TransactionId was valid but has since terminated 54 ListenerHungUp(TransactionId), 55 } 56 57 impl CallbackTransactionManager { 58 /// Constructor, wrapping a GattCallbacks instance with the GattDatastore 59 /// interface new(callbacks: Rc<dyn GattCallbacks>) -> Self60 pub fn new(callbacks: Rc<dyn GattCallbacks>) -> Self { 61 Self { 62 callbacks, 63 pending_transactions: RefCell::new(PendingTransactionsState { 64 pending_transactions: HashMap::new(), 65 next_transaction_id: 1, 66 }), 67 } 68 } 69 70 /// Invoked from server implementations in response to read/write requests send_response( &self, conn_id: ConnectionId, trans_id: TransactionId, value: Result<AttAttributeDataChild, AttErrorCode>, ) -> Result<(), CallbackResponseError>71 pub fn send_response( 72 &self, 73 conn_id: ConnectionId, 74 trans_id: TransactionId, 75 value: Result<AttAttributeDataChild, AttErrorCode>, 76 ) -> Result<(), CallbackResponseError> { 77 let mut pending = self.pending_transactions.borrow_mut(); 78 if let Some(transaction) = pending.pending_transactions.remove(&(conn_id, trans_id)) { 79 if transaction.response.send(value).is_err() { 80 Err(CallbackResponseError::ListenerHungUp(trans_id)) 81 } else { 82 trace!("got expected response for transaction {trans_id:?}"); 83 Ok(()) 84 } 85 } else { 86 Err(CallbackResponseError::NonExistentTransaction(trans_id)) 87 } 88 } 89 90 /// Get an impl GattDatastore tied to a particular server get_datastore(self: &Rc<Self>, server_id: ServerId) -> impl RawGattDatastore91 pub fn get_datastore(self: &Rc<Self>, server_id: ServerId) -> impl RawGattDatastore { 92 GattDatastoreImpl { callback_transaction_manager: self.clone(), server_id } 93 } 94 } 95 96 impl PendingTransactionsState { alloc_transaction_id(&mut self) -> TransactionId97 fn alloc_transaction_id(&mut self) -> TransactionId { 98 let trans_id = TransactionId(self.next_transaction_id); 99 self.next_transaction_id = self.next_transaction_id.wrapping_add(1); 100 trans_id 101 } 102 start_new_transaction(&mut self, conn_id: ConnectionId) -> PendingTransactionWatcher103 fn start_new_transaction(&mut self, conn_id: ConnectionId) -> PendingTransactionWatcher { 104 let trans_id = self.alloc_transaction_id(); 105 let (tx, rx) = oneshot::channel(); 106 self.pending_transactions.insert((conn_id, trans_id), PendingTransaction { response: tx }); 107 PendingTransactionWatcher { conn_id, trans_id, rx } 108 } 109 } 110 111 impl PendingTransactionWatcher { 112 /// Wait for the transaction to resolve, or to hit the timeout. If the 113 /// timeout is reached, clean up state related to transaction watching. wait( self, manager: &CallbackTransactionManager, ) -> Result<AttAttributeDataChild, AttErrorCode>114 async fn wait( 115 self, 116 manager: &CallbackTransactionManager, 117 ) -> Result<AttAttributeDataChild, AttErrorCode> { 118 if let Ok(Ok(result)) = timeout(TIMEOUT, self.rx).await { 119 result 120 } else { 121 manager 122 .pending_transactions 123 .borrow_mut() 124 .pending_transactions 125 .remove(&(self.conn_id, self.trans_id)); 126 warn!("no response received from Java after timeout - returning UNLIKELY_ERROR"); 127 Err(AttErrorCode::UNLIKELY_ERROR) 128 } 129 } 130 } 131 132 struct GattDatastoreImpl { 133 callback_transaction_manager: Rc<CallbackTransactionManager>, 134 server_id: ServerId, 135 } 136 137 #[async_trait(?Send)] 138 impl RawGattDatastore for GattDatastoreImpl { read( &self, tcb_idx: TransportIndex, handle: AttHandle, offset: u32, attr_type: AttributeBackingType, ) -> Result<AttAttributeDataChild, AttErrorCode>139 async fn read( 140 &self, 141 tcb_idx: TransportIndex, 142 handle: AttHandle, 143 offset: u32, 144 attr_type: AttributeBackingType, 145 ) -> Result<AttAttributeDataChild, AttErrorCode> { 146 let conn_id = ConnectionId::new(tcb_idx, self.server_id); 147 148 let pending_transaction = self 149 .callback_transaction_manager 150 .pending_transactions 151 .borrow_mut() 152 .start_new_transaction(conn_id); 153 let trans_id = pending_transaction.trans_id; 154 155 self.callback_transaction_manager.callbacks.on_server_read( 156 ConnectionId::new(tcb_idx, self.server_id), 157 trans_id, 158 handle, 159 attr_type, 160 offset, 161 ); 162 163 pending_transaction.wait(&self.callback_transaction_manager).await 164 } 165 write( &self, tcb_idx: TransportIndex, handle: AttHandle, attr_type: AttributeBackingType, write_type: GattWriteRequestType, data: AttAttributeDataView<'_>, ) -> Result<(), AttErrorCode>166 async fn write( 167 &self, 168 tcb_idx: TransportIndex, 169 handle: AttHandle, 170 attr_type: AttributeBackingType, 171 write_type: GattWriteRequestType, 172 data: AttAttributeDataView<'_>, 173 ) -> Result<(), AttErrorCode> { 174 let conn_id = ConnectionId::new(tcb_idx, self.server_id); 175 176 let pending_transaction = self 177 .callback_transaction_manager 178 .pending_transactions 179 .borrow_mut() 180 .start_new_transaction(conn_id); 181 let trans_id = pending_transaction.trans_id; 182 183 self.callback_transaction_manager.callbacks.on_server_write( 184 conn_id, 185 trans_id, 186 handle, 187 attr_type, 188 GattWriteType::Request(write_type), 189 data, 190 ); 191 192 // the data passed back is irrelevant for write requests 193 pending_transaction.wait(&self.callback_transaction_manager).await.map(|_| ()) 194 } 195 write_no_response( &self, tcb_idx: TransportIndex, handle: AttHandle, attr_type: AttributeBackingType, data: AttAttributeDataView<'_>, )196 fn write_no_response( 197 &self, 198 tcb_idx: TransportIndex, 199 handle: AttHandle, 200 attr_type: AttributeBackingType, 201 data: AttAttributeDataView<'_>, 202 ) { 203 let conn_id = ConnectionId::new(tcb_idx, self.server_id); 204 205 let trans_id = self 206 .callback_transaction_manager 207 .pending_transactions 208 .borrow_mut() 209 .alloc_transaction_id(); 210 self.callback_transaction_manager.callbacks.on_server_write( 211 conn_id, 212 trans_id, 213 handle, 214 attr_type, 215 GattWriteType::Command, 216 data, 217 ); 218 } 219 execute( &self, tcb_idx: TransportIndex, decision: TransactionDecision, ) -> Result<(), AttErrorCode>220 async fn execute( 221 &self, 222 tcb_idx: TransportIndex, 223 decision: TransactionDecision, 224 ) -> Result<(), AttErrorCode> { 225 let conn_id = ConnectionId::new(tcb_idx, self.server_id); 226 227 let pending_transaction = self 228 .callback_transaction_manager 229 .pending_transactions 230 .borrow_mut() 231 .start_new_transaction(conn_id); 232 let trans_id = pending_transaction.trans_id; 233 234 self.callback_transaction_manager.callbacks.on_execute(conn_id, trans_id, decision); 235 236 // the data passed back is irrelevant for execute requests 237 pending_transaction.wait(&self.callback_transaction_manager).await.map(|_| ()) 238 } 239 } 240