1 //! This module handles an individual connection on the ATT fixed channel. 2 //! It handles ATT transactions and unacknowledged operations, backed by an 3 //! AttDatabase (that may in turn be backed by an upper-layer protocol) 4 5 use std::{cell::Cell, future::Future}; 6 7 use anyhow::Result; 8 use log::{error, trace, warn}; 9 use tokio::task::spawn_local; 10 11 use crate::{ 12 core::{ 13 shared_box::{WeakBox, WeakBoxRef}, 14 shared_mutex::SharedMutex, 15 }, 16 gatt::{ 17 ids::AttHandle, 18 mtu::{AttMtu, MtuEvent}, 19 opcode_types::{classify_opcode, OperationType}, 20 }, 21 packets::{ 22 AttAttributeDataChild, AttBuilder, AttChild, AttErrorCode, AttErrorResponseBuilder, 23 AttView, Packet, SerializeError, 24 }, 25 utils::{owned_handle::OwnedHandle, packet::HACK_child_to_opcode}, 26 }; 27 28 use super::{ 29 att_database::AttDatabase, 30 command_handler::AttCommandHandler, 31 indication_handler::{ConfirmationWatcher, IndicationError, IndicationHandler}, 32 request_handler::AttRequestHandler, 33 }; 34 35 enum AttRequestState<T: AttDatabase> { 36 Idle(AttRequestHandler<T>), 37 Pending(Option<OwnedHandle<()>>), 38 } 39 40 /// The errors that can occur while trying to send a packet 41 #[derive(Debug)] 42 pub enum SendError { 43 /// The packet failed to serialize 44 SerializeError(SerializeError), 45 /// The connection no longer exists 46 ConnectionDropped, 47 } 48 49 /// This represents a single ATT bearer (currently, always the unenhanced fixed 50 /// channel on LE) The AttRequestState ensures that only one transaction can 51 /// take place at a time 52 pub struct AttServerBearer<T: AttDatabase> { 53 // general 54 send_packet: Box<dyn Fn(AttBuilder) -> Result<(), SerializeError>>, 55 mtu: AttMtu, 56 57 // request state 58 curr_request: Cell<AttRequestState<T>>, 59 60 // indication state 61 indication_handler: SharedMutex<IndicationHandler<T>>, 62 pending_confirmation: ConfirmationWatcher, 63 64 // command handler (across all bearers) 65 command_handler: AttCommandHandler<T>, 66 } 67 68 impl<T: AttDatabase + Clone + 'static> AttServerBearer<T> { 69 /// Constructor, wrapping an ATT channel (for outgoing packets) and an 70 /// AttDatabase new( db: T, send_packet: impl Fn(AttBuilder) -> Result<(), SerializeError> + 'static, ) -> Self71 pub fn new( 72 db: T, 73 send_packet: impl Fn(AttBuilder) -> Result<(), SerializeError> + 'static, 74 ) -> Self { 75 let (indication_handler, pending_confirmation) = IndicationHandler::new(db.clone()); 76 Self { 77 send_packet: Box::new(send_packet), 78 mtu: AttMtu::new(), 79 80 curr_request: AttRequestState::Idle(AttRequestHandler::new(db.clone())).into(), 81 82 indication_handler: SharedMutex::new(indication_handler), 83 pending_confirmation, 84 85 command_handler: AttCommandHandler::new(db), 86 } 87 } 88 send_packet(&self, packet: impl Into<AttChild>) -> Result<(), SerializeError>89 fn send_packet(&self, packet: impl Into<AttChild>) -> Result<(), SerializeError> { 90 let child = packet.into(); 91 let packet = AttBuilder { opcode: HACK_child_to_opcode(&child), _child_: child }; 92 (self.send_packet)(packet) 93 } 94 } 95 96 impl<T: AttDatabase + Clone + 'static> WeakBoxRef<'_, AttServerBearer<T>> { 97 /// Handle an incoming packet, and send outgoing packets as appropriate 98 /// using the owned ATT channel. handle_packet(&self, packet: AttView<'_>)99 pub fn handle_packet(&self, packet: AttView<'_>) { 100 match classify_opcode(packet.get_opcode()) { 101 OperationType::Command => { 102 self.command_handler.process_packet(packet); 103 } 104 OperationType::Request => { 105 self.handle_request(packet); 106 } 107 OperationType::Confirmation => self.pending_confirmation.on_confirmation(), 108 OperationType::Response | OperationType::Notification | OperationType::Indication => { 109 unreachable!("the arbiter should not let us receive these packet types") 110 } 111 } 112 } 113 114 /// Send an indication, wait for the peer confirmation, and return the 115 /// appropriate status If multiple calls are outstanding, they are 116 /// executed in FIFO order. send_indication( &self, handle: AttHandle, data: AttAttributeDataChild, ) -> impl Future<Output = Result<(), IndicationError>>117 pub fn send_indication( 118 &self, 119 handle: AttHandle, 120 data: AttAttributeDataChild, 121 ) -> impl Future<Output = Result<(), IndicationError>> { 122 trace!("sending indication for handle {handle:?}"); 123 124 let locked_indication_handler = self.indication_handler.lock(); 125 let pending_mtu = self.mtu.snapshot(); 126 let this = self.downgrade(); 127 128 async move { 129 // first wait until we are at the head of the queue and are ready to send 130 // indications 131 let mut indication_handler = locked_indication_handler 132 .await 133 .ok_or_else(|| { 134 warn!("indication for handle {handle:?} cancelled while queued since the connection dropped"); 135 IndicationError::SendError(SendError::ConnectionDropped) 136 })?; 137 // then, if MTU negotiation is taking place, wait for it to complete 138 let mtu = pending_mtu 139 .await 140 .ok_or_else(|| { 141 warn!("indication for handle {handle:?} cancelled while waiting for MTU exchange to complete since the connection dropped"); 142 IndicationError::SendError(SendError::ConnectionDropped) 143 })?; 144 // finally, send, and wait for a response 145 indication_handler.send(handle, data, mtu, |packet| this.try_send_packet(packet)).await 146 } 147 } 148 149 /// Handle a snooped MTU event, to update the MTU we use for our various 150 /// operations handle_mtu_event(&self, mtu_event: MtuEvent) -> Result<()>151 pub fn handle_mtu_event(&self, mtu_event: MtuEvent) -> Result<()> { 152 self.mtu.handle_event(mtu_event) 153 } 154 handle_request(&self, packet: AttView<'_>)155 fn handle_request(&self, packet: AttView<'_>) { 156 let curr_request = self.curr_request.replace(AttRequestState::Pending(None)); 157 self.curr_request.replace(match curr_request { 158 AttRequestState::Idle(mut request_handler) => { 159 // even if the MTU is updated afterwards, 5.3 3F 3.4.2.2 states that the 160 // request-time MTU should be used 161 let mtu = self.mtu.snapshot_or_default(); 162 let packet = packet.to_owned_packet(); 163 let this = self.downgrade(); 164 let task = spawn_local(async move { 165 trace!("starting ATT transaction"); 166 let reply = request_handler.process_packet(packet.view(), mtu).await; 167 this.with(|this| { 168 this.map(|this| { 169 match this.send_packet(reply) { 170 Ok(_) => { 171 trace!("reply packet sent") 172 } 173 Err(err) => { 174 error!("serializer failure {err:?}, dropping packet and sending failed reply"); 175 // if this also fails, we're stuck 176 if let Err(err) = this.send_packet(AttErrorResponseBuilder { 177 opcode_in_error: packet.view().get_opcode(), 178 handle_in_error: AttHandle(0).into(), 179 error_code: AttErrorCode::UNLIKELY_ERROR, 180 }) { 181 panic!("unexpected serialize error for known-good packet {err:?}") 182 } 183 } 184 }; 185 // ready for next transaction 186 this.curr_request.replace(AttRequestState::Idle(request_handler)); 187 }) 188 }); 189 }); 190 AttRequestState::Pending(Some(task.into())) 191 } 192 AttRequestState::Pending(_) => { 193 warn!("multiple ATT operations cannot simultaneously take place, dropping one"); 194 // TODO(aryarahul) - disconnect connection here; 195 curr_request 196 } 197 }); 198 } 199 } 200 201 impl<T: AttDatabase + Clone + 'static> WeakBox<AttServerBearer<T>> { try_send_packet(&self, packet: impl Into<AttChild>) -> Result<(), SendError>202 fn try_send_packet(&self, packet: impl Into<AttChild>) -> Result<(), SendError> { 203 self.with(|this| { 204 this.ok_or_else(|| { 205 warn!("connection dropped before packet sent"); 206 SendError::ConnectionDropped 207 })? 208 .send_packet(packet) 209 .map_err(SendError::SerializeError) 210 }) 211 } 212 } 213 214 #[cfg(test)] 215 mod test { 216 use std::rc::Rc; 217 218 use tokio::sync::mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver}; 219 220 use super::*; 221 222 use crate::{ 223 core::{shared_box::SharedBox, uuid::Uuid}, 224 gatt::{ 225 ffi::AttributeBackingType, 226 ids::TransportIndex, 227 mocks::mock_datastore::{MockDatastore, MockDatastoreEvents}, 228 server::{ 229 att_database::{AttAttribute, AttPermissions}, 230 gatt_database::{ 231 GattCharacteristicWithHandle, GattDatabase, GattServiceWithHandle, 232 }, 233 test::test_att_db::TestAttDatabase, 234 }, 235 }, 236 packets::{ 237 AttAttributeDataChild, AttHandleValueConfirmationBuilder, AttOpcode, 238 AttReadRequestBuilder, AttReadResponseBuilder, 239 }, 240 utils::{ 241 packet::{build_att_data, build_att_view_or_crash}, 242 task::{block_on_locally, try_await}, 243 }, 244 }; 245 246 const VALID_HANDLE: AttHandle = AttHandle(3); 247 const INVALID_HANDLE: AttHandle = AttHandle(4); 248 const ANOTHER_VALID_HANDLE: AttHandle = AttHandle(10); 249 250 const TCB_IDX: TransportIndex = TransportIndex(1); 251 open_connection( ) -> (SharedBox<AttServerBearer<TestAttDatabase>>, UnboundedReceiver<AttBuilder>)252 fn open_connection( 253 ) -> (SharedBox<AttServerBearer<TestAttDatabase>>, UnboundedReceiver<AttBuilder>) { 254 let db = TestAttDatabase::new(vec![ 255 ( 256 AttAttribute { 257 handle: VALID_HANDLE, 258 type_: Uuid::new(0x1234), 259 permissions: AttPermissions::READABLE | AttPermissions::INDICATE, 260 }, 261 vec![5, 6], 262 ), 263 ( 264 AttAttribute { 265 handle: ANOTHER_VALID_HANDLE, 266 type_: Uuid::new(0x5678), 267 permissions: AttPermissions::READABLE | AttPermissions::INDICATE, 268 }, 269 vec![5, 6], 270 ), 271 ]); 272 let (tx, rx) = unbounded_channel(); 273 let conn = AttServerBearer::new(db, move |packet| { 274 tx.send(packet).unwrap(); 275 Ok(()) 276 }) 277 .into(); 278 (conn, rx) 279 } 280 281 #[test] test_single_transaction()282 fn test_single_transaction() { 283 block_on_locally(async { 284 let (conn, mut rx) = open_connection(); 285 conn.as_ref().handle_packet( 286 build_att_view_or_crash(AttReadRequestBuilder { 287 attribute_handle: VALID_HANDLE.into(), 288 }) 289 .view(), 290 ); 291 assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::READ_RESPONSE); 292 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 293 }); 294 } 295 296 #[test] test_sequential_transactions()297 fn test_sequential_transactions() { 298 block_on_locally(async { 299 let (conn, mut rx) = open_connection(); 300 conn.as_ref().handle_packet( 301 build_att_view_or_crash(AttReadRequestBuilder { 302 attribute_handle: INVALID_HANDLE.into(), 303 }) 304 .view(), 305 ); 306 assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::ERROR_RESPONSE); 307 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 308 309 conn.as_ref().handle_packet( 310 build_att_view_or_crash(AttReadRequestBuilder { 311 attribute_handle: VALID_HANDLE.into(), 312 }) 313 .view(), 314 ); 315 assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::READ_RESPONSE); 316 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 317 }); 318 } 319 320 #[test] test_concurrent_transaction_failure()321 fn test_concurrent_transaction_failure() { 322 // arrange: AttServerBearer linked to a backing datastore and packet queue, with 323 // two characteristics in the database 324 let (datastore, mut data_rx) = MockDatastore::new(); 325 let datastore = Rc::new(datastore); 326 let db = SharedBox::new(GattDatabase::new()); 327 db.add_service_with_handles( 328 GattServiceWithHandle { 329 handle: AttHandle(1), 330 type_: Uuid::new(1), 331 characteristics: vec![ 332 GattCharacteristicWithHandle { 333 handle: VALID_HANDLE, 334 type_: Uuid::new(2), 335 permissions: AttPermissions::READABLE, 336 descriptors: vec![], 337 }, 338 GattCharacteristicWithHandle { 339 handle: ANOTHER_VALID_HANDLE, 340 type_: Uuid::new(2), 341 permissions: AttPermissions::READABLE, 342 descriptors: vec![], 343 }, 344 ], 345 }, 346 datastore, 347 ) 348 .unwrap(); 349 let (tx, mut rx) = unbounded_channel(); 350 let send_packet = move |packet| { 351 tx.send(packet).unwrap(); 352 Ok(()) 353 }; 354 let conn = SharedBox::new(AttServerBearer::new(db.get_att_database(TCB_IDX), send_packet)); 355 let data = AttAttributeDataChild::RawData([1, 2].into()); 356 357 // act: send two read requests before replying to either read 358 // first request 359 block_on_locally(async { 360 let req1 = build_att_view_or_crash(AttReadRequestBuilder { 361 attribute_handle: VALID_HANDLE.into(), 362 }); 363 conn.as_ref().handle_packet(req1.view()); 364 // second request 365 let req2 = build_att_view_or_crash(AttReadRequestBuilder { 366 attribute_handle: ANOTHER_VALID_HANDLE.into(), 367 }); 368 conn.as_ref().handle_packet(req2.view()); 369 // handle first reply 370 let MockDatastoreEvents::Read(TCB_IDX, VALID_HANDLE, AttributeBackingType::Characteristic, data_resp) = 371 data_rx.recv().await.unwrap() else { 372 unreachable!(); 373 }; 374 data_resp.send(Ok(data.clone())).unwrap(); 375 trace!("reply sent from upper tester"); 376 377 // assert: that the first reply was made 378 let resp = rx.recv().await.unwrap(); 379 assert_eq!( 380 resp, 381 AttBuilder { 382 opcode: AttOpcode::READ_RESPONSE, 383 _child_: AttReadResponseBuilder { value: build_att_data(data) }.into(), 384 } 385 ); 386 // assert no other replies were made 387 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 388 // assert no callbacks are pending 389 assert_eq!(data_rx.try_recv().unwrap_err(), TryRecvError::Empty); 390 }); 391 } 392 393 #[test] test_indication_confirmation()394 fn test_indication_confirmation() { 395 block_on_locally(async { 396 // arrange 397 let (conn, mut rx) = open_connection(); 398 399 // act: send an indication 400 let pending_send = 401 spawn_local(conn.as_ref().send_indication( 402 VALID_HANDLE, 403 AttAttributeDataChild::RawData([1, 2, 3].into()), 404 )); 405 assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::HANDLE_VALUE_INDICATION); 406 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 407 // and the confirmation 408 conn.as_ref().handle_packet( 409 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(), 410 ); 411 412 // assert: the indication was correctly sent 413 assert!(matches!(pending_send.await.unwrap(), Ok(()))); 414 }); 415 } 416 417 #[test] test_sequential_indications()418 fn test_sequential_indications() { 419 block_on_locally(async { 420 // arrange 421 let (conn, mut rx) = open_connection(); 422 423 // act: send the first indication 424 let pending_send1 = 425 spawn_local(conn.as_ref().send_indication( 426 VALID_HANDLE, 427 AttAttributeDataChild::RawData([1, 2, 3].into()), 428 )); 429 // wait for/capture the outgoing packet 430 let sent1 = rx.recv().await.unwrap(); 431 // send the response 432 conn.as_ref().handle_packet( 433 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(), 434 ); 435 // send the second indication 436 let pending_send2 = 437 spawn_local(conn.as_ref().send_indication( 438 VALID_HANDLE, 439 AttAttributeDataChild::RawData([1, 2, 3].into()), 440 )); 441 // wait for/capture the outgoing packet 442 let sent2 = rx.recv().await.unwrap(); 443 // and the response 444 conn.as_ref().handle_packet( 445 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(), 446 ); 447 448 // assert: exactly two indications were sent 449 assert_eq!(sent1.opcode, AttOpcode::HANDLE_VALUE_INDICATION); 450 assert_eq!(sent2.opcode, AttOpcode::HANDLE_VALUE_INDICATION); 451 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 452 // and that both got successful responses 453 assert!(matches!(pending_send1.await.unwrap(), Ok(()))); 454 assert!(matches!(pending_send2.await.unwrap(), Ok(()))); 455 }); 456 } 457 458 #[test] test_queued_indications_only_one_sent()459 fn test_queued_indications_only_one_sent() { 460 block_on_locally(async { 461 // arrange 462 let (conn, mut rx) = open_connection(); 463 464 // act: send two indications simultaneously 465 let pending_send1 = 466 spawn_local(conn.as_ref().send_indication( 467 VALID_HANDLE, 468 AttAttributeDataChild::RawData([1, 2, 3].into()), 469 )); 470 let pending_send2 = spawn_local(conn.as_ref().send_indication( 471 ANOTHER_VALID_HANDLE, 472 AttAttributeDataChild::RawData([1, 2, 3].into()), 473 )); 474 // assert: only one was initially sent 475 assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::HANDLE_VALUE_INDICATION); 476 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 477 // and both are still pending 478 assert!(!pending_send1.is_finished()); 479 assert!(!pending_send2.is_finished()); 480 }); 481 } 482 483 #[test] test_queued_indications_dequeue_second()484 fn test_queued_indications_dequeue_second() { 485 block_on_locally(async { 486 // arrange 487 let (conn, mut rx) = open_connection(); 488 489 // act: send two indications simultaneously 490 let pending_send1 = 491 spawn_local(conn.as_ref().send_indication( 492 VALID_HANDLE, 493 AttAttributeDataChild::RawData([1, 2, 3].into()), 494 )); 495 let pending_send2 = spawn_local(conn.as_ref().send_indication( 496 ANOTHER_VALID_HANDLE, 497 AttAttributeDataChild::RawData([1, 2, 3].into()), 498 )); 499 // wait for/capture the outgoing packet 500 let sent1 = rx.recv().await.unwrap(); 501 // send response for the first one 502 conn.as_ref().handle_packet( 503 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(), 504 ); 505 // wait for/capture the outgoing packet 506 let sent2 = rx.recv().await.unwrap(); 507 508 // assert: the first future has completed successfully, the second one is 509 // pending 510 assert!(matches!(pending_send1.await.unwrap(), Ok(()))); 511 assert!(!pending_send2.is_finished()); 512 // and that both indications have been sent 513 assert_eq!(sent1.opcode, AttOpcode::HANDLE_VALUE_INDICATION); 514 assert_eq!(sent2.opcode, AttOpcode::HANDLE_VALUE_INDICATION); 515 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 516 }); 517 } 518 519 #[test] test_queued_indications_complete_both()520 fn test_queued_indications_complete_both() { 521 block_on_locally(async { 522 // arrange 523 let (conn, mut rx) = open_connection(); 524 525 // act: send two indications simultaneously 526 let pending_send1 = 527 spawn_local(conn.as_ref().send_indication( 528 VALID_HANDLE, 529 AttAttributeDataChild::RawData([1, 2, 3].into()), 530 )); 531 let pending_send2 = spawn_local(conn.as_ref().send_indication( 532 ANOTHER_VALID_HANDLE, 533 AttAttributeDataChild::RawData([1, 2, 3].into()), 534 )); 535 // wait for/capture the outgoing packet 536 let sent1 = rx.recv().await.unwrap(); 537 // send response for the first one 538 conn.as_ref().handle_packet( 539 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(), 540 ); 541 // wait for/capture the outgoing packet 542 let sent2 = rx.recv().await.unwrap(); 543 // and now the second 544 conn.as_ref().handle_packet( 545 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(), 546 ); 547 548 // assert: both futures have completed successfully 549 assert!(matches!(pending_send1.await.unwrap(), Ok(()))); 550 assert!(matches!(pending_send2.await.unwrap(), Ok(()))); 551 // and both indications have been sent 552 assert_eq!(sent1.opcode, AttOpcode::HANDLE_VALUE_INDICATION); 553 assert_eq!(sent2.opcode, AttOpcode::HANDLE_VALUE_INDICATION); 554 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 555 }); 556 } 557 558 #[test] test_indication_connection_drop()559 fn test_indication_connection_drop() { 560 block_on_locally(async { 561 // arrange: a pending indication 562 let (conn, mut rx) = open_connection(); 563 let pending_send = 564 spawn_local(conn.as_ref().send_indication( 565 VALID_HANDLE, 566 AttAttributeDataChild::RawData([1, 2, 3].into()), 567 )); 568 569 // act: drop the connection after the indication is sent 570 rx.recv().await.unwrap(); 571 drop(conn); 572 573 // assert: the pending indication fails with the appropriate error 574 assert!(matches!( 575 pending_send.await.unwrap(), 576 Err(IndicationError::ConnectionDroppedWhileWaitingForConfirmation) 577 )); 578 }); 579 } 580 581 #[test] test_single_indication_pending_mtu()582 fn test_single_indication_pending_mtu() { 583 block_on_locally(async { 584 // arrange: pending MTU negotiation 585 let (conn, mut rx) = open_connection(); 586 conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap(); 587 588 // act: try to send an indication with a large payload size 589 let _ = 590 try_await(conn.as_ref().send_indication( 591 VALID_HANDLE, 592 AttAttributeDataChild::RawData((1..50).collect()), 593 )) 594 .await; 595 // then resolve the MTU negotiation with a large MTU 596 conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(100)).unwrap(); 597 598 // assert: the indication was sent 599 assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::HANDLE_VALUE_INDICATION); 600 }); 601 } 602 603 #[test] test_single_indication_pending_mtu_fail()604 fn test_single_indication_pending_mtu_fail() { 605 block_on_locally(async { 606 // arrange: pending MTU negotiation 607 let (conn, _) = open_connection(); 608 conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap(); 609 610 // act: try to send an indication with a large payload size 611 let pending_mtu = 612 try_await(conn.as_ref().send_indication( 613 VALID_HANDLE, 614 AttAttributeDataChild::RawData((1..50).collect()), 615 )) 616 .await 617 .unwrap_err(); 618 // then resolve the MTU negotiation with a small MTU 619 conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(32)).unwrap(); 620 621 // assert: the indication failed to send 622 assert!(matches!(pending_mtu.await, Err(IndicationError::DataExceedsMtu { .. }))); 623 }); 624 } 625 626 #[test] test_server_transaction_pending_mtu()627 fn test_server_transaction_pending_mtu() { 628 block_on_locally(async { 629 // arrange: pending MTU negotiation 630 let (conn, mut rx) = open_connection(); 631 conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap(); 632 633 // act: send server packet 634 conn.as_ref().handle_packet( 635 build_att_view_or_crash(AttReadRequestBuilder { 636 attribute_handle: VALID_HANDLE.into(), 637 }) 638 .view(), 639 ); 640 641 // assert: that we reply even while the MTU req is outstanding 642 assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::READ_RESPONSE); 643 }); 644 } 645 646 #[test] test_queued_indication_pending_mtu_uses_mtu_on_dequeue()647 fn test_queued_indication_pending_mtu_uses_mtu_on_dequeue() { 648 block_on_locally(async { 649 // arrange: an outstanding indication 650 let (conn, mut rx) = open_connection(); 651 let _ = 652 try_await(conn.as_ref().send_indication( 653 VALID_HANDLE, 654 AttAttributeDataChild::RawData([1, 2, 3].into()), 655 )) 656 .await; 657 rx.recv().await.unwrap(); // flush rx_queue 658 659 // act: enqueue an indication with a large payload 660 let _ = 661 try_await(conn.as_ref().send_indication( 662 VALID_HANDLE, 663 AttAttributeDataChild::RawData((1..50).collect()), 664 )) 665 .await; 666 // then perform MTU negotiation to upgrade to a large MTU 667 conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap(); 668 conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(512)).unwrap(); 669 // finally resolve the first indication, so the second indication can be sent 670 conn.as_ref().handle_packet( 671 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(), 672 ); 673 674 // assert: the second indication successfully sent (so it used the new MTU) 675 assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::HANDLE_VALUE_INDICATION); 676 }); 677 } 678 } 679