• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
//! This module handles an individual connection on the ATT fixed channel.
//! It handles ATT transactions and unacknowledged operations, backed by an
//! AttDatabase (that may in turn be backed by an upper-layer protocol).
4 
5 use std::{cell::Cell, future::Future};
6 
7 use anyhow::Result;
8 use log::{error, trace, warn};
9 use tokio::task::spawn_local;
10 
11 use crate::{
12     core::{
13         shared_box::{WeakBox, WeakBoxRef},
14         shared_mutex::SharedMutex,
15     },
16     gatt::{
17         ids::AttHandle,
18         mtu::{AttMtu, MtuEvent},
19         opcode_types::{classify_opcode, OperationType},
20     },
21     packets::{
22         AttAttributeDataChild, AttBuilder, AttChild, AttErrorCode, AttErrorResponseBuilder,
23         AttView, Packet, SerializeError,
24     },
25     utils::{owned_handle::OwnedHandle, packet::HACK_child_to_opcode},
26 };
27 
28 use super::{
29     att_database::AttDatabase,
30     command_handler::AttCommandHandler,
31     indication_handler::{ConfirmationWatcher, IndicationError, IndicationHandler},
32     request_handler::AttRequestHandler,
33 };
34 
35 enum AttRequestState<T: AttDatabase> {
36     Idle(AttRequestHandler<T>),
37     Pending(Option<OwnedHandle<()>>),
38 }
39 
40 /// The errors that can occur while trying to send a packet
41 #[derive(Debug)]
42 pub enum SendError {
43     /// The packet failed to serialize
44     SerializeError(SerializeError),
45     /// The connection no longer exists
46     ConnectionDropped,
47 }
48 
49 /// This represents a single ATT bearer (currently, always the unenhanced fixed
50 /// channel on LE) The AttRequestState ensures that only one transaction can
51 /// take place at a time
52 pub struct AttServerBearer<T: AttDatabase> {
53     // general
54     send_packet: Box<dyn Fn(AttBuilder) -> Result<(), SerializeError>>,
55     mtu: AttMtu,
56 
57     // request state
58     curr_request: Cell<AttRequestState<T>>,
59 
60     // indication state
61     indication_handler: SharedMutex<IndicationHandler<T>>,
62     pending_confirmation: ConfirmationWatcher,
63 
64     // command handler (across all bearers)
65     command_handler: AttCommandHandler<T>,
66 }
67 
68 impl<T: AttDatabase + Clone + 'static> AttServerBearer<T> {
69     /// Constructor, wrapping an ATT channel (for outgoing packets) and an
70     /// AttDatabase
new( db: T, send_packet: impl Fn(AttBuilder) -> Result<(), SerializeError> + 'static, ) -> Self71     pub fn new(
72         db: T,
73         send_packet: impl Fn(AttBuilder) -> Result<(), SerializeError> + 'static,
74     ) -> Self {
75         let (indication_handler, pending_confirmation) = IndicationHandler::new(db.clone());
76         Self {
77             send_packet: Box::new(send_packet),
78             mtu: AttMtu::new(),
79 
80             curr_request: AttRequestState::Idle(AttRequestHandler::new(db.clone())).into(),
81 
82             indication_handler: SharedMutex::new(indication_handler),
83             pending_confirmation,
84 
85             command_handler: AttCommandHandler::new(db),
86         }
87     }
88 
send_packet(&self, packet: impl Into<AttChild>) -> Result<(), SerializeError>89     fn send_packet(&self, packet: impl Into<AttChild>) -> Result<(), SerializeError> {
90         let child = packet.into();
91         let packet = AttBuilder { opcode: HACK_child_to_opcode(&child), _child_: child };
92         (self.send_packet)(packet)
93     }
94 }
95 
96 impl<T: AttDatabase + Clone + 'static> WeakBoxRef<'_, AttServerBearer<T>> {
97     /// Handle an incoming packet, and send outgoing packets as appropriate
98     /// using the owned ATT channel.
handle_packet(&self, packet: AttView<'_>)99     pub fn handle_packet(&self, packet: AttView<'_>) {
100         match classify_opcode(packet.get_opcode()) {
101             OperationType::Command => {
102                 self.command_handler.process_packet(packet);
103             }
104             OperationType::Request => {
105                 self.handle_request(packet);
106             }
107             OperationType::Confirmation => self.pending_confirmation.on_confirmation(),
108             OperationType::Response | OperationType::Notification | OperationType::Indication => {
109                 unreachable!("the arbiter should not let us receive these packet types")
110             }
111         }
112     }
113 
114     /// Send an indication, wait for the peer confirmation, and return the
115     /// appropriate status If multiple calls are outstanding, they are
116     /// executed in FIFO order.
send_indication( &self, handle: AttHandle, data: AttAttributeDataChild, ) -> impl Future<Output = Result<(), IndicationError>>117     pub fn send_indication(
118         &self,
119         handle: AttHandle,
120         data: AttAttributeDataChild,
121     ) -> impl Future<Output = Result<(), IndicationError>> {
122         trace!("sending indication for handle {handle:?}");
123 
124         let locked_indication_handler = self.indication_handler.lock();
125         let pending_mtu = self.mtu.snapshot();
126         let this = self.downgrade();
127 
128         async move {
129             // first wait until we are at the head of the queue and are ready to send
130             // indications
131             let mut indication_handler = locked_indication_handler
132                 .await
133                 .ok_or_else(|| {
134                     warn!("indication for handle {handle:?} cancelled while queued since the connection dropped");
135                     IndicationError::SendError(SendError::ConnectionDropped)
136                 })?;
137             // then, if MTU negotiation is taking place, wait for it to complete
138             let mtu = pending_mtu
139                 .await
140                 .ok_or_else(|| {
141                     warn!("indication for handle {handle:?} cancelled while waiting for MTU exchange to complete since the connection dropped");
142                     IndicationError::SendError(SendError::ConnectionDropped)
143                 })?;
144             // finally, send, and wait for a response
145             indication_handler.send(handle, data, mtu, |packet| this.try_send_packet(packet)).await
146         }
147     }
148 
149     /// Handle a snooped MTU event, to update the MTU we use for our various
150     /// operations
handle_mtu_event(&self, mtu_event: MtuEvent) -> Result<()>151     pub fn handle_mtu_event(&self, mtu_event: MtuEvent) -> Result<()> {
152         self.mtu.handle_event(mtu_event)
153     }
154 
handle_request(&self, packet: AttView<'_>)155     fn handle_request(&self, packet: AttView<'_>) {
156         let curr_request = self.curr_request.replace(AttRequestState::Pending(None));
157         self.curr_request.replace(match curr_request {
158             AttRequestState::Idle(mut request_handler) => {
159                 // even if the MTU is updated afterwards, 5.3 3F 3.4.2.2 states that the
160                 // request-time MTU should be used
161                 let mtu = self.mtu.snapshot_or_default();
162                 let packet = packet.to_owned_packet();
163                 let this = self.downgrade();
164                 let task = spawn_local(async move {
165                     trace!("starting ATT transaction");
166                     let reply = request_handler.process_packet(packet.view(), mtu).await;
167                     this.with(|this| {
168                         this.map(|this| {
169                             match this.send_packet(reply) {
170                                 Ok(_) => {
171                                     trace!("reply packet sent")
172                                 }
173                                 Err(err) => {
174                                     error!("serializer failure {err:?}, dropping packet and sending failed reply");
175                                     // if this also fails, we're stuck
176                                     if let Err(err) = this.send_packet(AttErrorResponseBuilder {
177                                         opcode_in_error: packet.view().get_opcode(),
178                                         handle_in_error: AttHandle(0).into(),
179                                         error_code: AttErrorCode::UNLIKELY_ERROR,
180                                     }) {
181                                         panic!("unexpected serialize error for known-good packet {err:?}")
182                                     }
183                                 }
184                             };
185                             // ready for next transaction
186                             this.curr_request.replace(AttRequestState::Idle(request_handler));
187                         })
188                     });
189                 });
190                 AttRequestState::Pending(Some(task.into()))
191             }
192             AttRequestState::Pending(_) => {
193                 warn!("multiple ATT operations cannot simultaneously take place, dropping one");
194                 // TODO(aryarahul) - disconnect connection here;
195                 curr_request
196             }
197         });
198     }
199 }
200 
201 impl<T: AttDatabase + Clone + 'static> WeakBox<AttServerBearer<T>> {
try_send_packet(&self, packet: impl Into<AttChild>) -> Result<(), SendError>202     fn try_send_packet(&self, packet: impl Into<AttChild>) -> Result<(), SendError> {
203         self.with(|this| {
204             this.ok_or_else(|| {
205                 warn!("connection dropped before packet sent");
206                 SendError::ConnectionDropped
207             })?
208             .send_packet(packet)
209             .map_err(SendError::SerializeError)
210         })
211     }
212 }
213 
214 #[cfg(test)]
215 mod test {
216     use std::rc::Rc;
217 
218     use tokio::sync::mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver};
219 
220     use super::*;
221 
222     use crate::{
223         core::{shared_box::SharedBox, uuid::Uuid},
224         gatt::{
225             ffi::AttributeBackingType,
226             ids::TransportIndex,
227             mocks::mock_datastore::{MockDatastore, MockDatastoreEvents},
228             server::{
229                 att_database::{AttAttribute, AttPermissions},
230                 gatt_database::{
231                     GattCharacteristicWithHandle, GattDatabase, GattServiceWithHandle,
232                 },
233                 test::test_att_db::TestAttDatabase,
234             },
235         },
236         packets::{
237             AttAttributeDataChild, AttHandleValueConfirmationBuilder, AttOpcode,
238             AttReadRequestBuilder, AttReadResponseBuilder,
239         },
240         utils::{
241             packet::{build_att_data, build_att_view_or_crash},
242             task::{block_on_locally, try_await},
243         },
244     };
245 
246     const VALID_HANDLE: AttHandle = AttHandle(3);
247     const INVALID_HANDLE: AttHandle = AttHandle(4);
248     const ANOTHER_VALID_HANDLE: AttHandle = AttHandle(10);
249 
250     const TCB_IDX: TransportIndex = TransportIndex(1);
251 
open_connection( ) -> (SharedBox<AttServerBearer<TestAttDatabase>>, UnboundedReceiver<AttBuilder>)252     fn open_connection(
253     ) -> (SharedBox<AttServerBearer<TestAttDatabase>>, UnboundedReceiver<AttBuilder>) {
254         let db = TestAttDatabase::new(vec![
255             (
256                 AttAttribute {
257                     handle: VALID_HANDLE,
258                     type_: Uuid::new(0x1234),
259                     permissions: AttPermissions::READABLE | AttPermissions::INDICATE,
260                 },
261                 vec![5, 6],
262             ),
263             (
264                 AttAttribute {
265                     handle: ANOTHER_VALID_HANDLE,
266                     type_: Uuid::new(0x5678),
267                     permissions: AttPermissions::READABLE | AttPermissions::INDICATE,
268                 },
269                 vec![5, 6],
270             ),
271         ]);
272         let (tx, rx) = unbounded_channel();
273         let conn = AttServerBearer::new(db, move |packet| {
274             tx.send(packet).unwrap();
275             Ok(())
276         })
277         .into();
278         (conn, rx)
279     }
280 
281     #[test]
test_single_transaction()282     fn test_single_transaction() {
283         block_on_locally(async {
284             let (conn, mut rx) = open_connection();
285             conn.as_ref().handle_packet(
286                 build_att_view_or_crash(AttReadRequestBuilder {
287                     attribute_handle: VALID_HANDLE.into(),
288                 })
289                 .view(),
290             );
291             assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::READ_RESPONSE);
292             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
293         });
294     }
295 
296     #[test]
test_sequential_transactions()297     fn test_sequential_transactions() {
298         block_on_locally(async {
299             let (conn, mut rx) = open_connection();
300             conn.as_ref().handle_packet(
301                 build_att_view_or_crash(AttReadRequestBuilder {
302                     attribute_handle: INVALID_HANDLE.into(),
303                 })
304                 .view(),
305             );
306             assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::ERROR_RESPONSE);
307             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
308 
309             conn.as_ref().handle_packet(
310                 build_att_view_or_crash(AttReadRequestBuilder {
311                     attribute_handle: VALID_HANDLE.into(),
312                 })
313                 .view(),
314             );
315             assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::READ_RESPONSE);
316             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
317         });
318     }
319 
320     #[test]
test_concurrent_transaction_failure()321     fn test_concurrent_transaction_failure() {
322         // arrange: AttServerBearer linked to a backing datastore and packet queue, with
323         // two characteristics in the database
324         let (datastore, mut data_rx) = MockDatastore::new();
325         let datastore = Rc::new(datastore);
326         let db = SharedBox::new(GattDatabase::new());
327         db.add_service_with_handles(
328             GattServiceWithHandle {
329                 handle: AttHandle(1),
330                 type_: Uuid::new(1),
331                 characteristics: vec![
332                     GattCharacteristicWithHandle {
333                         handle: VALID_HANDLE,
334                         type_: Uuid::new(2),
335                         permissions: AttPermissions::READABLE,
336                         descriptors: vec![],
337                     },
338                     GattCharacteristicWithHandle {
339                         handle: ANOTHER_VALID_HANDLE,
340                         type_: Uuid::new(2),
341                         permissions: AttPermissions::READABLE,
342                         descriptors: vec![],
343                     },
344                 ],
345             },
346             datastore,
347         )
348         .unwrap();
349         let (tx, mut rx) = unbounded_channel();
350         let send_packet = move |packet| {
351             tx.send(packet).unwrap();
352             Ok(())
353         };
354         let conn = SharedBox::new(AttServerBearer::new(db.get_att_database(TCB_IDX), send_packet));
355         let data = AttAttributeDataChild::RawData([1, 2].into());
356 
357         // act: send two read requests before replying to either read
358         // first request
359         block_on_locally(async {
360             let req1 = build_att_view_or_crash(AttReadRequestBuilder {
361                 attribute_handle: VALID_HANDLE.into(),
362             });
363             conn.as_ref().handle_packet(req1.view());
364             // second request
365             let req2 = build_att_view_or_crash(AttReadRequestBuilder {
366                 attribute_handle: ANOTHER_VALID_HANDLE.into(),
367             });
368             conn.as_ref().handle_packet(req2.view());
369             // handle first reply
370             let MockDatastoreEvents::Read(TCB_IDX, VALID_HANDLE, AttributeBackingType::Characteristic, data_resp) =
371                 data_rx.recv().await.unwrap() else {
372                     unreachable!();
373             };
374             data_resp.send(Ok(data.clone())).unwrap();
375             trace!("reply sent from upper tester");
376 
377             // assert: that the first reply was made
378             let resp = rx.recv().await.unwrap();
379             assert_eq!(
380                 resp,
381                 AttBuilder {
382                     opcode: AttOpcode::READ_RESPONSE,
383                     _child_: AttReadResponseBuilder { value: build_att_data(data) }.into(),
384                 }
385             );
386             // assert no other replies were made
387             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
388             // assert no callbacks are pending
389             assert_eq!(data_rx.try_recv().unwrap_err(), TryRecvError::Empty);
390         });
391     }
392 
393     #[test]
test_indication_confirmation()394     fn test_indication_confirmation() {
395         block_on_locally(async {
396             // arrange
397             let (conn, mut rx) = open_connection();
398 
399             // act: send an indication
400             let pending_send =
401                 spawn_local(conn.as_ref().send_indication(
402                     VALID_HANDLE,
403                     AttAttributeDataChild::RawData([1, 2, 3].into()),
404                 ));
405             assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::HANDLE_VALUE_INDICATION);
406             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
407             // and the confirmation
408             conn.as_ref().handle_packet(
409                 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
410             );
411 
412             // assert: the indication was correctly sent
413             assert!(matches!(pending_send.await.unwrap(), Ok(())));
414         });
415     }
416 
417     #[test]
test_sequential_indications()418     fn test_sequential_indications() {
419         block_on_locally(async {
420             // arrange
421             let (conn, mut rx) = open_connection();
422 
423             // act: send the first indication
424             let pending_send1 =
425                 spawn_local(conn.as_ref().send_indication(
426                     VALID_HANDLE,
427                     AttAttributeDataChild::RawData([1, 2, 3].into()),
428                 ));
429             // wait for/capture the outgoing packet
430             let sent1 = rx.recv().await.unwrap();
431             // send the response
432             conn.as_ref().handle_packet(
433                 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
434             );
435             // send the second indication
436             let pending_send2 =
437                 spawn_local(conn.as_ref().send_indication(
438                     VALID_HANDLE,
439                     AttAttributeDataChild::RawData([1, 2, 3].into()),
440                 ));
441             // wait for/capture the outgoing packet
442             let sent2 = rx.recv().await.unwrap();
443             // and the response
444             conn.as_ref().handle_packet(
445                 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
446             );
447 
448             // assert: exactly two indications were sent
449             assert_eq!(sent1.opcode, AttOpcode::HANDLE_VALUE_INDICATION);
450             assert_eq!(sent2.opcode, AttOpcode::HANDLE_VALUE_INDICATION);
451             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
452             // and that both got successful responses
453             assert!(matches!(pending_send1.await.unwrap(), Ok(())));
454             assert!(matches!(pending_send2.await.unwrap(), Ok(())));
455         });
456     }
457 
458     #[test]
test_queued_indications_only_one_sent()459     fn test_queued_indications_only_one_sent() {
460         block_on_locally(async {
461             // arrange
462             let (conn, mut rx) = open_connection();
463 
464             // act: send two indications simultaneously
465             let pending_send1 =
466                 spawn_local(conn.as_ref().send_indication(
467                     VALID_HANDLE,
468                     AttAttributeDataChild::RawData([1, 2, 3].into()),
469                 ));
470             let pending_send2 = spawn_local(conn.as_ref().send_indication(
471                 ANOTHER_VALID_HANDLE,
472                 AttAttributeDataChild::RawData([1, 2, 3].into()),
473             ));
474             // assert: only one was initially sent
475             assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::HANDLE_VALUE_INDICATION);
476             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
477             // and both are still pending
478             assert!(!pending_send1.is_finished());
479             assert!(!pending_send2.is_finished());
480         });
481     }
482 
483     #[test]
test_queued_indications_dequeue_second()484     fn test_queued_indications_dequeue_second() {
485         block_on_locally(async {
486             // arrange
487             let (conn, mut rx) = open_connection();
488 
489             // act: send two indications simultaneously
490             let pending_send1 =
491                 spawn_local(conn.as_ref().send_indication(
492                     VALID_HANDLE,
493                     AttAttributeDataChild::RawData([1, 2, 3].into()),
494                 ));
495             let pending_send2 = spawn_local(conn.as_ref().send_indication(
496                 ANOTHER_VALID_HANDLE,
497                 AttAttributeDataChild::RawData([1, 2, 3].into()),
498             ));
499             // wait for/capture the outgoing packet
500             let sent1 = rx.recv().await.unwrap();
501             // send response for the first one
502             conn.as_ref().handle_packet(
503                 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
504             );
505             // wait for/capture the outgoing packet
506             let sent2 = rx.recv().await.unwrap();
507 
508             // assert: the first future has completed successfully, the second one is
509             // pending
510             assert!(matches!(pending_send1.await.unwrap(), Ok(())));
511             assert!(!pending_send2.is_finished());
512             // and that both indications have been sent
513             assert_eq!(sent1.opcode, AttOpcode::HANDLE_VALUE_INDICATION);
514             assert_eq!(sent2.opcode, AttOpcode::HANDLE_VALUE_INDICATION);
515             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
516         });
517     }
518 
519     #[test]
test_queued_indications_complete_both()520     fn test_queued_indications_complete_both() {
521         block_on_locally(async {
522             // arrange
523             let (conn, mut rx) = open_connection();
524 
525             // act: send two indications simultaneously
526             let pending_send1 =
527                 spawn_local(conn.as_ref().send_indication(
528                     VALID_HANDLE,
529                     AttAttributeDataChild::RawData([1, 2, 3].into()),
530                 ));
531             let pending_send2 = spawn_local(conn.as_ref().send_indication(
532                 ANOTHER_VALID_HANDLE,
533                 AttAttributeDataChild::RawData([1, 2, 3].into()),
534             ));
535             // wait for/capture the outgoing packet
536             let sent1 = rx.recv().await.unwrap();
537             // send response for the first one
538             conn.as_ref().handle_packet(
539                 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
540             );
541             // wait for/capture the outgoing packet
542             let sent2 = rx.recv().await.unwrap();
543             // and now the second
544             conn.as_ref().handle_packet(
545                 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
546             );
547 
548             // assert: both futures have completed successfully
549             assert!(matches!(pending_send1.await.unwrap(), Ok(())));
550             assert!(matches!(pending_send2.await.unwrap(), Ok(())));
551             // and both indications have been sent
552             assert_eq!(sent1.opcode, AttOpcode::HANDLE_VALUE_INDICATION);
553             assert_eq!(sent2.opcode, AttOpcode::HANDLE_VALUE_INDICATION);
554             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
555         });
556     }
557 
558     #[test]
test_indication_connection_drop()559     fn test_indication_connection_drop() {
560         block_on_locally(async {
561             // arrange: a pending indication
562             let (conn, mut rx) = open_connection();
563             let pending_send =
564                 spawn_local(conn.as_ref().send_indication(
565                     VALID_HANDLE,
566                     AttAttributeDataChild::RawData([1, 2, 3].into()),
567                 ));
568 
569             // act: drop the connection after the indication is sent
570             rx.recv().await.unwrap();
571             drop(conn);
572 
573             // assert: the pending indication fails with the appropriate error
574             assert!(matches!(
575                 pending_send.await.unwrap(),
576                 Err(IndicationError::ConnectionDroppedWhileWaitingForConfirmation)
577             ));
578         });
579     }
580 
581     #[test]
test_single_indication_pending_mtu()582     fn test_single_indication_pending_mtu() {
583         block_on_locally(async {
584             // arrange: pending MTU negotiation
585             let (conn, mut rx) = open_connection();
586             conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();
587 
588             // act: try to send an indication with a large payload size
589             let _ =
590                 try_await(conn.as_ref().send_indication(
591                     VALID_HANDLE,
592                     AttAttributeDataChild::RawData((1..50).collect()),
593                 ))
594                 .await;
595             // then resolve the MTU negotiation with a large MTU
596             conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(100)).unwrap();
597 
598             // assert: the indication was sent
599             assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::HANDLE_VALUE_INDICATION);
600         });
601     }
602 
603     #[test]
test_single_indication_pending_mtu_fail()604     fn test_single_indication_pending_mtu_fail() {
605         block_on_locally(async {
606             // arrange: pending MTU negotiation
607             let (conn, _) = open_connection();
608             conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();
609 
610             // act: try to send an indication with a large payload size
611             let pending_mtu =
612                 try_await(conn.as_ref().send_indication(
613                     VALID_HANDLE,
614                     AttAttributeDataChild::RawData((1..50).collect()),
615                 ))
616                 .await
617                 .unwrap_err();
618             // then resolve the MTU negotiation with a small MTU
619             conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(32)).unwrap();
620 
621             // assert: the indication failed to send
622             assert!(matches!(pending_mtu.await, Err(IndicationError::DataExceedsMtu { .. })));
623         });
624     }
625 
626     #[test]
test_server_transaction_pending_mtu()627     fn test_server_transaction_pending_mtu() {
628         block_on_locally(async {
629             // arrange: pending MTU negotiation
630             let (conn, mut rx) = open_connection();
631             conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();
632 
633             // act: send server packet
634             conn.as_ref().handle_packet(
635                 build_att_view_or_crash(AttReadRequestBuilder {
636                     attribute_handle: VALID_HANDLE.into(),
637                 })
638                 .view(),
639             );
640 
641             // assert: that we reply even while the MTU req is outstanding
642             assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::READ_RESPONSE);
643         });
644     }
645 
646     #[test]
test_queued_indication_pending_mtu_uses_mtu_on_dequeue()647     fn test_queued_indication_pending_mtu_uses_mtu_on_dequeue() {
648         block_on_locally(async {
649             // arrange: an outstanding indication
650             let (conn, mut rx) = open_connection();
651             let _ =
652                 try_await(conn.as_ref().send_indication(
653                     VALID_HANDLE,
654                     AttAttributeDataChild::RawData([1, 2, 3].into()),
655                 ))
656                 .await;
657             rx.recv().await.unwrap(); // flush rx_queue
658 
659             // act: enqueue an indication with a large payload
660             let _ =
661                 try_await(conn.as_ref().send_indication(
662                     VALID_HANDLE,
663                     AttAttributeDataChild::RawData((1..50).collect()),
664                 ))
665                 .await;
666             // then perform MTU negotiation to upgrade to a large MTU
667             conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();
668             conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(512)).unwrap();
669             // finally resolve the first indication, so the second indication can be sent
670             conn.as_ref().handle_packet(
671                 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
672             );
673 
674             // assert: the second indication successfully sent (so it used the new MTU)
675             assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::HANDLE_VALUE_INDICATION);
676         });
677     }
678 }
679