• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
//! This module handles an individual connection on the ATT fixed channel.
//! It handles ATT transactions and unacknowledged operations, backed by an
//! AttDatabase (which may in turn be backed by an upper-layer protocol).
4 
5 use pdl_runtime::EncodeError;
6 use std::cell::Cell;
7 use std::future::Future;
8 
9 use anyhow::Result;
10 use log::{error, trace, warn};
11 use tokio::task::spawn_local;
12 
13 use crate::core::shared_box::{WeakBox, WeakBoxRef};
14 use crate::core::shared_mutex::SharedMutex;
15 use crate::gatt::ids::AttHandle;
16 use crate::gatt::mtu::{AttMtu, MtuEvent};
17 use crate::gatt::opcode_types::{classify_opcode, OperationType};
18 use crate::packets::att::{self, AttErrorCode};
19 use crate::utils::owned_handle::OwnedHandle;
20 
21 use super::att_database::AttDatabase;
22 use super::command_handler::AttCommandHandler;
23 use super::indication_handler::{ConfirmationWatcher, IndicationError, IndicationHandler};
24 use super::request_handler::AttRequestHandler;
25 
26 enum AttRequestState<T: AttDatabase> {
27     Idle(AttRequestHandler<T>),
28     Pending { _task: OwnedHandle<()> },
29     Replacing,
30 }
31 
32 /// The errors that can occur while trying to send a packet
33 #[derive(Debug)]
34 pub enum SendError {
35     /// The packet failed to serialize
36     SerializeError(EncodeError),
37     /// The connection no longer exists
38     ConnectionDropped,
39 }
40 
41 /// This represents a single ATT bearer (currently, always the unenhanced fixed
42 /// channel on LE) The AttRequestState ensures that only one transaction can
43 /// take place at a time
44 pub struct AttServerBearer<T: AttDatabase> {
45     // general
46     send_packet: Box<dyn Fn(att::Att) -> Result<(), EncodeError>>,
47     mtu: AttMtu,
48 
49     // request state
50     curr_request: Cell<AttRequestState<T>>,
51 
52     // indication state
53     indication_handler: SharedMutex<IndicationHandler<T>>,
54     pending_confirmation: ConfirmationWatcher,
55 
56     // command handler (across all bearers)
57     command_handler: AttCommandHandler<T>,
58 }
59 
60 impl<T: AttDatabase + Clone + 'static> AttServerBearer<T> {
61     /// Constructor, wrapping an ATT channel (for outgoing packets) and an
62     /// AttDatabase
new(db: T, send_packet: impl Fn(att::Att) -> Result<(), EncodeError> + 'static) -> Self63     pub fn new(db: T, send_packet: impl Fn(att::Att) -> Result<(), EncodeError> + 'static) -> Self {
64         let (indication_handler, pending_confirmation) = IndicationHandler::new(db.clone());
65         Self {
66             send_packet: Box::new(send_packet),
67             mtu: AttMtu::new(),
68 
69             curr_request: AttRequestState::Idle(AttRequestHandler::new(db.clone())).into(),
70 
71             indication_handler: SharedMutex::new(indication_handler),
72             pending_confirmation,
73 
74             command_handler: AttCommandHandler::new(db),
75         }
76     }
77 
send_packet(&self, packet: att::Att) -> Result<(), EncodeError>78     fn send_packet(&self, packet: att::Att) -> Result<(), EncodeError> {
79         (self.send_packet)(packet)
80     }
81 }
82 
83 impl<T: AttDatabase + Clone + 'static> WeakBoxRef<'_, AttServerBearer<T>> {
84     /// Handle an incoming packet, and send outgoing packets as appropriate
85     /// using the owned ATT channel.
handle_packet(&self, packet: att::Att)86     pub fn handle_packet(&self, packet: att::Att) {
87         match classify_opcode(packet.opcode) {
88             OperationType::Command => {
89                 self.command_handler.process_packet(packet);
90             }
91             OperationType::Request => {
92                 self.handle_request(packet);
93             }
94             OperationType::Confirmation => self.pending_confirmation.on_confirmation(),
95             OperationType::Response | OperationType::Notification | OperationType::Indication => {
96                 unreachable!("the arbiter should not let us receive these packet types")
97             }
98         }
99     }
100 
101     /// Send an indication, wait for the peer confirmation, and return the
102     /// appropriate status If multiple calls are outstanding, they are
103     /// executed in FIFO order.
send_indication( &self, handle: AttHandle, data: Vec<u8>, ) -> impl Future<Output = Result<(), IndicationError>>104     pub fn send_indication(
105         &self,
106         handle: AttHandle,
107         data: Vec<u8>,
108     ) -> impl Future<Output = Result<(), IndicationError>> {
109         trace!("sending indication for handle {handle:?}");
110 
111         let locked_indication_handler = self.indication_handler.lock();
112         let pending_mtu = self.mtu.snapshot();
113         let this = self.downgrade();
114 
115         async move {
116             // first wait until we are at the head of the queue and are ready to send
117             // indications
118             let mut indication_handler = locked_indication_handler
119                 .await
120                 .ok_or_else(|| {
121                     warn!("indication for handle {handle:?} cancelled while queued since the connection dropped");
122                     IndicationError::SendError(SendError::ConnectionDropped)
123                 })?;
124             // then, if MTU negotiation is taking place, wait for it to complete
125             let mtu = pending_mtu
126                 .await
127                 .ok_or_else(|| {
128                     warn!("indication for handle {handle:?} cancelled while waiting for MTU exchange to complete since the connection dropped");
129                     IndicationError::SendError(SendError::ConnectionDropped)
130                 })?;
131             // finally, send, and wait for a response
132             indication_handler.send(handle, &data, mtu, |packet| this.try_send_packet(packet)).await
133         }
134     }
135 
136     /// Handle a snooped MTU event, to update the MTU we use for our various
137     /// operations
handle_mtu_event(&self, mtu_event: MtuEvent) -> Result<()>138     pub fn handle_mtu_event(&self, mtu_event: MtuEvent) -> Result<()> {
139         self.mtu.handle_event(mtu_event)
140     }
141 
handle_request(&self, packet: att::Att)142     fn handle_request(&self, packet: att::Att) {
143         let curr_request = self.curr_request.replace(AttRequestState::Replacing);
144         self.curr_request.replace(match curr_request {
145             AttRequestState::Idle(mut request_handler) => {
146                 // even if the MTU is updated afterwards, 5.3 3F 3.4.2.2 states that the
147                 // request-time MTU should be used
148                 let mtu = self.mtu.snapshot_or_default();
149                 let this = self.downgrade();
150                 let opcode = packet.opcode;
151                 let task = spawn_local(async move {
152                     trace!("starting ATT transaction");
153                     let reply = request_handler.process_packet(packet, mtu).await;
154                     this.with(|this| {
155                         this.map(|this| {
156                             match this.send_packet(reply) {
157                                 Ok(_) => {
158                                     trace!("reply packet sent")
159                                 }
160                                 Err(err) => {
161                                     error!("serializer failure {err:?}, dropping packet and sending failed reply");
162                                     // if this also fails, we're stuck
163                                     if let Err(err) = this.send_packet(att::AttErrorResponse {
164                                         opcode_in_error: opcode,
165                                         handle_in_error: AttHandle(0).into(),
166                                         error_code: AttErrorCode::UnlikelyError,
167                                     }.try_into().unwrap()) {
168                                         panic!("unexpected serialize error for known-good packet {err:?}")
169                                     }
170                                 }
171                             };
172                             // ready for next transaction
173                             this.curr_request.replace(AttRequestState::Idle(request_handler));
174                         })
175                     });
176                 });
177                 AttRequestState::Pending { _task: task.into() }
178             }
179             AttRequestState::Pending { .. } => {
180                 warn!("multiple ATT operations cannot simultaneously take place, dropping one");
181                 // TODO(aryarahul) - disconnect connection here;
182                 curr_request
183             }
184             AttRequestState::Replacing => {
185               panic!("Replacing is an ephemeral state");
186             }
187         });
188     }
189 }
190 
191 impl<T: AttDatabase + Clone + 'static> WeakBox<AttServerBearer<T>> {
try_send_packet(&self, packet: att::Att) -> Result<(), SendError>192     fn try_send_packet(&self, packet: att::Att) -> Result<(), SendError> {
193         self.with(|this| {
194             this.ok_or_else(|| {
195                 warn!("connection dropped before packet sent");
196                 SendError::ConnectionDropped
197             })?
198             .send_packet(packet)
199             .map_err(SendError::SerializeError)
200         })
201     }
202 }
203 
204 #[cfg(test)]
205 mod test {
206     use std::rc::Rc;
207 
208     use tokio::sync::mpsc::error::TryRecvError;
209     use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
210 
211     use super::*;
212 
213     use crate::core::shared_box::SharedBox;
214     use crate::core::uuid::Uuid;
215     use crate::gatt::ffi::AttributeBackingType;
216     use crate::gatt::ids::TransportIndex;
217     use crate::gatt::mocks::mock_datastore::{MockDatastore, MockDatastoreEvents};
218     use crate::gatt::server::att_database::{AttAttribute, AttPermissions};
219     use crate::gatt::server::gatt_database::{
220         GattCharacteristicWithHandle, GattDatabase, GattServiceWithHandle,
221     };
222     use crate::gatt::server::test::test_att_db::TestAttDatabase;
223     use crate::packets::att;
224     use crate::utils::task::{block_on_locally, try_await};
225 
226     const VALID_HANDLE: AttHandle = AttHandle(3);
227     const INVALID_HANDLE: AttHandle = AttHandle(4);
228     const ANOTHER_VALID_HANDLE: AttHandle = AttHandle(10);
229 
230     const TCB_IDX: TransportIndex = TransportIndex(1);
231 
open_connection( ) -> (SharedBox<AttServerBearer<TestAttDatabase>>, UnboundedReceiver<att::Att>)232     fn open_connection(
233     ) -> (SharedBox<AttServerBearer<TestAttDatabase>>, UnboundedReceiver<att::Att>) {
234         let db = TestAttDatabase::new(vec![
235             (
236                 AttAttribute {
237                     handle: VALID_HANDLE,
238                     type_: Uuid::new(0x1234),
239                     permissions: AttPermissions::READABLE | AttPermissions::INDICATE,
240                 },
241                 vec![5, 6],
242             ),
243             (
244                 AttAttribute {
245                     handle: ANOTHER_VALID_HANDLE,
246                     type_: Uuid::new(0x5678),
247                     permissions: AttPermissions::READABLE | AttPermissions::INDICATE,
248                 },
249                 vec![5, 6],
250             ),
251         ]);
252         let (tx, rx) = unbounded_channel();
253         let conn = AttServerBearer::new(db, move |packet| {
254             tx.send(packet).unwrap();
255             Ok(())
256         })
257         .into();
258         (conn, rx)
259     }
260 
261     #[test]
test_single_transaction()262     fn test_single_transaction() {
263         block_on_locally(async {
264             let (conn, mut rx) = open_connection();
265             conn.as_ref().handle_packet(
266                 att::AttReadRequest { attribute_handle: VALID_HANDLE.into() }.try_into().unwrap(),
267             );
268             assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::ReadResponse);
269             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
270         });
271     }
272 
273     #[test]
test_sequential_transactions()274     fn test_sequential_transactions() {
275         block_on_locally(async {
276             let (conn, mut rx) = open_connection();
277             conn.as_ref().handle_packet(
278                 att::AttReadRequest { attribute_handle: INVALID_HANDLE.into() }.try_into().unwrap(),
279             );
280             assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::ErrorResponse);
281             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
282 
283             conn.as_ref().handle_packet(
284                 att::AttReadRequest { attribute_handle: VALID_HANDLE.into() }.try_into().unwrap(),
285             );
286             assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::ReadResponse);
287             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
288         });
289     }
290 
291     #[test]
test_concurrent_transaction_failure()292     fn test_concurrent_transaction_failure() {
293         // arrange: AttServerBearer linked to a backing datastore and packet queue, with
294         // two characteristics in the database
295         let (datastore, mut data_rx) = MockDatastore::new();
296         let datastore = Rc::new(datastore);
297         let db = SharedBox::new(GattDatabase::new());
298         db.add_service_with_handles(
299             GattServiceWithHandle {
300                 handle: AttHandle(1),
301                 type_: Uuid::new(1),
302                 characteristics: vec![
303                     GattCharacteristicWithHandle {
304                         handle: VALID_HANDLE,
305                         type_: Uuid::new(2),
306                         permissions: AttPermissions::READABLE,
307                         descriptors: vec![],
308                     },
309                     GattCharacteristicWithHandle {
310                         handle: ANOTHER_VALID_HANDLE,
311                         type_: Uuid::new(2),
312                         permissions: AttPermissions::READABLE,
313                         descriptors: vec![],
314                     },
315                 ],
316             },
317             datastore,
318         )
319         .unwrap();
320         let (tx, mut rx) = unbounded_channel();
321         let send_packet = move |packet| {
322             tx.send(packet).unwrap();
323             Ok(())
324         };
325         let conn = SharedBox::new(AttServerBearer::new(db.get_att_database(TCB_IDX), send_packet));
326         let data = [1, 2];
327 
328         // act: send two read requests before replying to either read
329         // first request
330         block_on_locally(async {
331             let req1 = att::AttReadRequest { attribute_handle: VALID_HANDLE.into() };
332             conn.as_ref().handle_packet(req1.try_into().unwrap());
333             // second request
334             let req2 = att::AttReadRequest { attribute_handle: ANOTHER_VALID_HANDLE.into() };
335             conn.as_ref().handle_packet(req2.try_into().unwrap());
336             // handle first reply
337             let MockDatastoreEvents::Read(
338                 TCB_IDX,
339                 VALID_HANDLE,
340                 AttributeBackingType::Characteristic,
341                 data_resp,
342             ) = data_rx.recv().await.unwrap()
343             else {
344                 unreachable!();
345             };
346             data_resp.send(Ok(data.to_vec())).unwrap();
347             trace!("reply sent from upper tester");
348 
349             // assert: that the first reply was made
350             let resp = rx.recv().await.unwrap();
351             assert_eq!(resp, att::AttReadResponse { value: data.to_vec() }.try_into().unwrap());
352             // assert no other replies were made
353             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
354             // assert no callbacks are pending
355             assert_eq!(data_rx.try_recv().unwrap_err(), TryRecvError::Empty);
356         });
357     }
358 
359     #[test]
test_indication_confirmation()360     fn test_indication_confirmation() {
361         block_on_locally(async {
362             // arrange
363             let (conn, mut rx) = open_connection();
364 
365             // act: send an indication
366             let pending_send =
367                 spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3]));
368             assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::HandleValueIndication);
369             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
370             // and the confirmation
371             conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap());
372 
373             // assert: the indication was correctly sent
374             assert!(matches!(pending_send.await.unwrap(), Ok(())));
375         });
376     }
377 
378     #[test]
test_sequential_indications()379     fn test_sequential_indications() {
380         block_on_locally(async {
381             // arrange
382             let (conn, mut rx) = open_connection();
383 
384             // act: send the first indication
385             let pending_send1 =
386                 spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3]));
387             // wait for/capture the outgoing packet
388             let sent1 = rx.recv().await.unwrap();
389             // send the response
390             conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap());
391             // send the second indication
392             let pending_send2 =
393                 spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3]));
394             // wait for/capture the outgoing packet
395             let sent2 = rx.recv().await.unwrap();
396             // and the response
397             conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap());
398 
399             // assert: exactly two indications were sent
400             assert_eq!(sent1.opcode, att::AttOpcode::HandleValueIndication);
401             assert_eq!(sent2.opcode, att::AttOpcode::HandleValueIndication);
402             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
403             // and that both got successful responses
404             assert!(matches!(pending_send1.await.unwrap(), Ok(())));
405             assert!(matches!(pending_send2.await.unwrap(), Ok(())));
406         });
407     }
408 
409     #[test]
test_queued_indications_only_one_sent()410     fn test_queued_indications_only_one_sent() {
411         block_on_locally(async {
412             // arrange
413             let (conn, mut rx) = open_connection();
414 
415             // act: send two indications simultaneously
416             let pending_send1 =
417                 spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3]));
418             let pending_send2 =
419                 spawn_local(conn.as_ref().send_indication(ANOTHER_VALID_HANDLE, vec![1, 2, 3]));
420             // assert: only one was initially sent
421             assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::HandleValueIndication);
422             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
423             // and both are still pending
424             assert!(!pending_send1.is_finished());
425             assert!(!pending_send2.is_finished());
426         });
427     }
428 
429     #[test]
test_queued_indications_dequeue_second()430     fn test_queued_indications_dequeue_second() {
431         block_on_locally(async {
432             // arrange
433             let (conn, mut rx) = open_connection();
434 
435             // act: send two indications simultaneously
436             let pending_send1 =
437                 spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3]));
438             let pending_send2 =
439                 spawn_local(conn.as_ref().send_indication(ANOTHER_VALID_HANDLE, vec![1, 2, 3]));
440             // wait for/capture the outgoing packet
441             let sent1 = rx.recv().await.unwrap();
442             // send response for the first one
443             conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap());
444             // wait for/capture the outgoing packet
445             let sent2 = rx.recv().await.unwrap();
446 
447             // assert: the first future has completed successfully, the second one is
448             // pending
449             assert!(matches!(pending_send1.await.unwrap(), Ok(())));
450             assert!(!pending_send2.is_finished());
451             // and that both indications have been sent
452             assert_eq!(sent1.opcode, att::AttOpcode::HandleValueIndication);
453             assert_eq!(sent2.opcode, att::AttOpcode::HandleValueIndication);
454             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
455         });
456     }
457 
458     #[test]
test_queued_indications_complete_both()459     fn test_queued_indications_complete_both() {
460         block_on_locally(async {
461             // arrange
462             let (conn, mut rx) = open_connection();
463 
464             // act: send two indications simultaneously
465             let pending_send1 =
466                 spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3]));
467             let pending_send2 =
468                 spawn_local(conn.as_ref().send_indication(ANOTHER_VALID_HANDLE, vec![1, 2, 3]));
469             // wait for/capture the outgoing packet
470             let sent1 = rx.recv().await.unwrap();
471             // send response for the first one
472             conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap());
473             // wait for/capture the outgoing packet
474             let sent2 = rx.recv().await.unwrap();
475             // and now the second
476             conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap());
477 
478             // assert: both futures have completed successfully
479             assert!(matches!(pending_send1.await.unwrap(), Ok(())));
480             assert!(matches!(pending_send2.await.unwrap(), Ok(())));
481             // and both indications have been sent
482             assert_eq!(sent1.opcode, att::AttOpcode::HandleValueIndication);
483             assert_eq!(sent2.opcode, att::AttOpcode::HandleValueIndication);
484             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
485         });
486     }
487 
488     #[test]
test_indication_connection_drop()489     fn test_indication_connection_drop() {
490         block_on_locally(async {
491             // arrange: a pending indication
492             let (conn, mut rx) = open_connection();
493             let pending_send =
494                 spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3]));
495 
496             // act: drop the connection after the indication is sent
497             rx.recv().await.unwrap();
498             drop(conn);
499 
500             // assert: the pending indication fails with the appropriate error
501             assert!(matches!(
502                 pending_send.await.unwrap(),
503                 Err(IndicationError::ConnectionDroppedWhileWaitingForConfirmation)
504             ));
505         });
506     }
507 
508     #[test]
test_single_indication_pending_mtu()509     fn test_single_indication_pending_mtu() {
510         block_on_locally(async {
511             // arrange: pending MTU negotiation
512             let (conn, mut rx) = open_connection();
513             conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();
514 
515             // act: try to send an indication with a large payload size
516             let _ = try_await(conn.as_ref().send_indication(VALID_HANDLE, (1..50).collect())).await;
517             // then resolve the MTU negotiation with a large MTU
518             conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(100)).unwrap();
519 
520             // assert: the indication was sent
521             assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::HandleValueIndication);
522         });
523     }
524 
525     #[test]
test_single_indication_pending_mtu_fail()526     fn test_single_indication_pending_mtu_fail() {
527         block_on_locally(async {
528             // arrange: pending MTU negotiation
529             let (conn, _) = open_connection();
530             conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();
531 
532             // act: try to send an indication with a large payload size
533             let pending_mtu =
534                 try_await(conn.as_ref().send_indication(VALID_HANDLE, (1..50).collect()))
535                     .await
536                     .unwrap_err();
537             // then resolve the MTU negotiation with a small MTU
538             conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(32)).unwrap();
539 
540             // assert: the indication failed to send
541             assert!(matches!(pending_mtu.await, Err(IndicationError::DataExceedsMtu { .. })));
542         });
543     }
544 
545     #[test]
test_server_transaction_pending_mtu()546     fn test_server_transaction_pending_mtu() {
547         block_on_locally(async {
548             // arrange: pending MTU negotiation
549             let (conn, mut rx) = open_connection();
550             conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();
551 
552             // act: send server packet
553             conn.as_ref().handle_packet(
554                 att::AttReadRequest { attribute_handle: VALID_HANDLE.into() }.try_into().unwrap(),
555             );
556 
557             // assert: that we reply even while the MTU req is outstanding
558             assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::ReadResponse);
559         });
560     }
561 
562     #[test]
test_queued_indication_pending_mtu_uses_mtu_on_dequeue()563     fn test_queued_indication_pending_mtu_uses_mtu_on_dequeue() {
564         block_on_locally(async {
565             // arrange: an outstanding indication
566             let (conn, mut rx) = open_connection();
567             let _ = try_await(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3])).await;
568             rx.recv().await.unwrap(); // flush rx_queue
569 
570             // act: enqueue an indication with a large payload
571             let _ = try_await(conn.as_ref().send_indication(VALID_HANDLE, (1..50).collect())).await;
572             // then perform MTU negotiation to upgrade to a large MTU
573             conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();
574             conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(512)).unwrap();
575             // finally resolve the first indication, so the second indication can be sent
576             conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap());
577 
578             // assert: the second indication successfully sent (so it used the new MTU)
579             assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::HandleValueIndication);
580         });
581     }
582 }
583