• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use std::{cell::RefCell, collections::HashMap, rc::Rc, time::Duration};
2 
3 use async_trait::async_trait;
4 use log::{trace, warn};
5 use tokio::{sync::oneshot, time::timeout};
6 
7 use crate::{
8     gatt::{
9         ids::{AttHandle, ConnectionId, ServerId, TransactionId, TransportIndex},
10         GattCallbacks,
11     },
12     packets::{AttAttributeDataChild, AttAttributeDataView, AttErrorCode},
13 };
14 
15 use super::{
16     AttributeBackingType, GattWriteRequestType, GattWriteType, RawGattDatastore,
17     TransactionDecision,
18 };
19 
20 struct PendingTransaction {
21     response: oneshot::Sender<Result<AttAttributeDataChild, AttErrorCode>>,
22 }
23 
24 #[derive(Debug)]
25 struct PendingTransactionWatcher {
26     conn_id: ConnectionId,
27     trans_id: TransactionId,
28     rx: oneshot::Receiver<Result<AttAttributeDataChild, AttErrorCode>>,
29 }
30 
31 /// This struct converts the asynchronus read/write operations of GattDatastore
32 /// into the callback-based interface expected by JNI
33 pub struct CallbackTransactionManager {
34     callbacks: Rc<dyn GattCallbacks>,
35     pending_transactions: RefCell<PendingTransactionsState>,
36 }
37 
38 struct PendingTransactionsState {
39     pending_transactions: HashMap<(ConnectionId, TransactionId), PendingTransaction>,
40     next_transaction_id: u32,
41 }
42 
43 /// We expect all responses to be provided within this timeout
44 /// It should be less than 30s, as that is the ATT timeout that causes
45 /// the client to disconnect.
46 const TIMEOUT: Duration = Duration::from_secs(15);
47 
48 /// The cause of a failure to dispatch a call to send_response()
49 #[derive(Debug, PartialEq, Eq)]
50 pub enum CallbackResponseError {
51     /// The TransactionId supplied was invalid for the specified connection
52     NonExistentTransaction(TransactionId),
53     /// The TransactionId was valid but has since terminated
54     ListenerHungUp(TransactionId),
55 }
56 
57 impl CallbackTransactionManager {
58     /// Constructor, wrapping a GattCallbacks instance with the GattDatastore
59     /// interface
new(callbacks: Rc<dyn GattCallbacks>) -> Self60     pub fn new(callbacks: Rc<dyn GattCallbacks>) -> Self {
61         Self {
62             callbacks,
63             pending_transactions: RefCell::new(PendingTransactionsState {
64                 pending_transactions: HashMap::new(),
65                 next_transaction_id: 1,
66             }),
67         }
68     }
69 
70     /// Invoked from server implementations in response to read/write requests
send_response( &self, conn_id: ConnectionId, trans_id: TransactionId, value: Result<AttAttributeDataChild, AttErrorCode>, ) -> Result<(), CallbackResponseError>71     pub fn send_response(
72         &self,
73         conn_id: ConnectionId,
74         trans_id: TransactionId,
75         value: Result<AttAttributeDataChild, AttErrorCode>,
76     ) -> Result<(), CallbackResponseError> {
77         let mut pending = self.pending_transactions.borrow_mut();
78         if let Some(transaction) = pending.pending_transactions.remove(&(conn_id, trans_id)) {
79             if transaction.response.send(value).is_err() {
80                 Err(CallbackResponseError::ListenerHungUp(trans_id))
81             } else {
82                 trace!("got expected response for transaction {trans_id:?}");
83                 Ok(())
84             }
85         } else {
86             Err(CallbackResponseError::NonExistentTransaction(trans_id))
87         }
88     }
89 
90     /// Get an impl GattDatastore tied to a particular server
get_datastore(self: &Rc<Self>, server_id: ServerId) -> impl RawGattDatastore91     pub fn get_datastore(self: &Rc<Self>, server_id: ServerId) -> impl RawGattDatastore {
92         GattDatastoreImpl { callback_transaction_manager: self.clone(), server_id }
93     }
94 }
95 
96 impl PendingTransactionsState {
alloc_transaction_id(&mut self) -> TransactionId97     fn alloc_transaction_id(&mut self) -> TransactionId {
98         let trans_id = TransactionId(self.next_transaction_id);
99         self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
100         trans_id
101     }
102 
start_new_transaction(&mut self, conn_id: ConnectionId) -> PendingTransactionWatcher103     fn start_new_transaction(&mut self, conn_id: ConnectionId) -> PendingTransactionWatcher {
104         let trans_id = self.alloc_transaction_id();
105         let (tx, rx) = oneshot::channel();
106         self.pending_transactions.insert((conn_id, trans_id), PendingTransaction { response: tx });
107         PendingTransactionWatcher { conn_id, trans_id, rx }
108     }
109 }
110 
111 impl PendingTransactionWatcher {
112     /// Wait for the transaction to resolve, or to hit the timeout. If the
113     /// timeout is reached, clean up state related to transaction watching.
wait( self, manager: &CallbackTransactionManager, ) -> Result<AttAttributeDataChild, AttErrorCode>114     async fn wait(
115         self,
116         manager: &CallbackTransactionManager,
117     ) -> Result<AttAttributeDataChild, AttErrorCode> {
118         if let Ok(Ok(result)) = timeout(TIMEOUT, self.rx).await {
119             result
120         } else {
121             manager
122                 .pending_transactions
123                 .borrow_mut()
124                 .pending_transactions
125                 .remove(&(self.conn_id, self.trans_id));
126             warn!("no response received from Java after timeout - returning UNLIKELY_ERROR");
127             Err(AttErrorCode::UNLIKELY_ERROR)
128         }
129     }
130 }
131 
132 struct GattDatastoreImpl {
133     callback_transaction_manager: Rc<CallbackTransactionManager>,
134     server_id: ServerId,
135 }
136 
137 #[async_trait(?Send)]
138 impl RawGattDatastore for GattDatastoreImpl {
read( &self, tcb_idx: TransportIndex, handle: AttHandle, offset: u32, attr_type: AttributeBackingType, ) -> Result<AttAttributeDataChild, AttErrorCode>139     async fn read(
140         &self,
141         tcb_idx: TransportIndex,
142         handle: AttHandle,
143         offset: u32,
144         attr_type: AttributeBackingType,
145     ) -> Result<AttAttributeDataChild, AttErrorCode> {
146         let conn_id = ConnectionId::new(tcb_idx, self.server_id);
147 
148         let pending_transaction = self
149             .callback_transaction_manager
150             .pending_transactions
151             .borrow_mut()
152             .start_new_transaction(conn_id);
153         let trans_id = pending_transaction.trans_id;
154 
155         self.callback_transaction_manager.callbacks.on_server_read(
156             ConnectionId::new(tcb_idx, self.server_id),
157             trans_id,
158             handle,
159             attr_type,
160             offset,
161         );
162 
163         pending_transaction.wait(&self.callback_transaction_manager).await
164     }
165 
write( &self, tcb_idx: TransportIndex, handle: AttHandle, attr_type: AttributeBackingType, write_type: GattWriteRequestType, data: AttAttributeDataView<'_>, ) -> Result<(), AttErrorCode>166     async fn write(
167         &self,
168         tcb_idx: TransportIndex,
169         handle: AttHandle,
170         attr_type: AttributeBackingType,
171         write_type: GattWriteRequestType,
172         data: AttAttributeDataView<'_>,
173     ) -> Result<(), AttErrorCode> {
174         let conn_id = ConnectionId::new(tcb_idx, self.server_id);
175 
176         let pending_transaction = self
177             .callback_transaction_manager
178             .pending_transactions
179             .borrow_mut()
180             .start_new_transaction(conn_id);
181         let trans_id = pending_transaction.trans_id;
182 
183         self.callback_transaction_manager.callbacks.on_server_write(
184             conn_id,
185             trans_id,
186             handle,
187             attr_type,
188             GattWriteType::Request(write_type),
189             data,
190         );
191 
192         // the data passed back is irrelevant for write requests
193         pending_transaction.wait(&self.callback_transaction_manager).await.map(|_| ())
194     }
195 
write_no_response( &self, tcb_idx: TransportIndex, handle: AttHandle, attr_type: AttributeBackingType, data: AttAttributeDataView<'_>, )196     fn write_no_response(
197         &self,
198         tcb_idx: TransportIndex,
199         handle: AttHandle,
200         attr_type: AttributeBackingType,
201         data: AttAttributeDataView<'_>,
202     ) {
203         let conn_id = ConnectionId::new(tcb_idx, self.server_id);
204 
205         let trans_id = self
206             .callback_transaction_manager
207             .pending_transactions
208             .borrow_mut()
209             .alloc_transaction_id();
210         self.callback_transaction_manager.callbacks.on_server_write(
211             conn_id,
212             trans_id,
213             handle,
214             attr_type,
215             GattWriteType::Command,
216             data,
217         );
218     }
219 
execute( &self, tcb_idx: TransportIndex, decision: TransactionDecision, ) -> Result<(), AttErrorCode>220     async fn execute(
221         &self,
222         tcb_idx: TransportIndex,
223         decision: TransactionDecision,
224     ) -> Result<(), AttErrorCode> {
225         let conn_id = ConnectionId::new(tcb_idx, self.server_id);
226 
227         let pending_transaction = self
228             .callback_transaction_manager
229             .pending_transactions
230             .borrow_mut()
231             .start_new_transaction(conn_id);
232         let trans_id = pending_transaction.trans_id;
233 
234         self.callback_transaction_manager.callbacks.on_execute(conn_id, trans_id, decision);
235 
236         // the data passed back is irrelevant for execute requests
237         pending_transaction.wait(&self.callback_transaction_manager).await.map(|_| ())
238     }
239 }
240