1 //! This module handles an individual connection on the ATT fixed channel. 2 //! It handles ATT transactions and unacknowledged operations, backed by an 3 //! AttDatabase (that may in turn be backed by an upper-layer protocol) 4 5 use pdl_runtime::EncodeError; 6 use std::cell::Cell; 7 use std::future::Future; 8 9 use anyhow::Result; 10 use log::{error, trace, warn}; 11 use tokio::task::spawn_local; 12 13 use crate::core::shared_box::{WeakBox, WeakBoxRef}; 14 use crate::core::shared_mutex::SharedMutex; 15 use crate::gatt::ids::AttHandle; 16 use crate::gatt::mtu::{AttMtu, MtuEvent}; 17 use crate::gatt::opcode_types::{classify_opcode, OperationType}; 18 use crate::packets::att::{self, AttErrorCode}; 19 use crate::utils::owned_handle::OwnedHandle; 20 21 use super::att_database::AttDatabase; 22 use super::command_handler::AttCommandHandler; 23 use super::indication_handler::{ConfirmationWatcher, IndicationError, IndicationHandler}; 24 use super::request_handler::AttRequestHandler; 25 26 enum AttRequestState<T: AttDatabase> { 27 Idle(AttRequestHandler<T>), 28 Pending { _task: OwnedHandle<()> }, 29 Replacing, 30 } 31 32 /// The errors that can occur while trying to send a packet 33 #[derive(Debug)] 34 pub enum SendError { 35 /// The packet failed to serialize 36 SerializeError(EncodeError), 37 /// The connection no longer exists 38 ConnectionDropped, 39 } 40 41 /// This represents a single ATT bearer (currently, always the unenhanced fixed 42 /// channel on LE) The AttRequestState ensures that only one transaction can 43 /// take place at a time 44 pub struct AttServerBearer<T: AttDatabase> { 45 // general 46 send_packet: Box<dyn Fn(att::Att) -> Result<(), EncodeError>>, 47 mtu: AttMtu, 48 49 // request state 50 curr_request: Cell<AttRequestState<T>>, 51 52 // indication state 53 indication_handler: SharedMutex<IndicationHandler<T>>, 54 pending_confirmation: ConfirmationWatcher, 55 56 // command handler (across all bearers) 57 command_handler: AttCommandHandler<T>, 58 } 59 60 impl<T: AttDatabase + Clone + 'static> AttServerBearer<T> { 61 /// Constructor, wrapping an ATT channel (for outgoing packets) and an 62 /// AttDatabase new(db: T, send_packet: impl Fn(att::Att) -> Result<(), EncodeError> + 'static) -> Self63 pub fn new(db: T, send_packet: impl Fn(att::Att) -> Result<(), EncodeError> + 'static) -> Self { 64 let (indication_handler, pending_confirmation) = IndicationHandler::new(db.clone()); 65 Self { 66 send_packet: Box::new(send_packet), 67 mtu: AttMtu::new(), 68 69 curr_request: AttRequestState::Idle(AttRequestHandler::new(db.clone())).into(), 70 71 indication_handler: SharedMutex::new(indication_handler), 72 pending_confirmation, 73 74 command_handler: AttCommandHandler::new(db), 75 } 76 } 77 send_packet(&self, packet: att::Att) -> Result<(), EncodeError>78 fn send_packet(&self, packet: att::Att) -> Result<(), EncodeError> { 79 (self.send_packet)(packet) 80 } 81 } 82 83 impl<T: AttDatabase + Clone + 'static> WeakBoxRef<'_, AttServerBearer<T>> { 84 /// Handle an incoming packet, and send outgoing packets as appropriate 85 /// using the owned ATT channel. handle_packet(&self, packet: att::Att)86 pub fn handle_packet(&self, packet: att::Att) { 87 match classify_opcode(packet.opcode) { 88 OperationType::Command => { 89 self.command_handler.process_packet(packet); 90 } 91 OperationType::Request => { 92 self.handle_request(packet); 93 } 94 OperationType::Confirmation => self.pending_confirmation.on_confirmation(), 95 OperationType::Response | OperationType::Notification | OperationType::Indication => { 96 unreachable!("the arbiter should not let us receive these packet types") 97 } 98 } 99 } 100 101 /// Send an indication, wait for the peer confirmation, and return the 102 /// appropriate status If multiple calls are outstanding, they are 103 /// executed in FIFO order. send_indication( &self, handle: AttHandle, data: Vec<u8>, ) -> impl Future<Output = Result<(), IndicationError>>104 pub fn send_indication( 105 &self, 106 handle: AttHandle, 107 data: Vec<u8>, 108 ) -> impl Future<Output = Result<(), IndicationError>> { 109 trace!("sending indication for handle {handle:?}"); 110 111 let locked_indication_handler = self.indication_handler.lock(); 112 let pending_mtu = self.mtu.snapshot(); 113 let this = self.downgrade(); 114 115 async move { 116 // first wait until we are at the head of the queue and are ready to send 117 // indications 118 let mut indication_handler = locked_indication_handler 119 .await 120 .ok_or_else(|| { 121 warn!("indication for handle {handle:?} cancelled while queued since the connection dropped"); 122 IndicationError::SendError(SendError::ConnectionDropped) 123 })?; 124 // then, if MTU negotiation is taking place, wait for it to complete 125 let mtu = pending_mtu 126 .await 127 .ok_or_else(|| { 128 warn!("indication for handle {handle:?} cancelled while waiting for MTU exchange to complete since the connection dropped"); 129 IndicationError::SendError(SendError::ConnectionDropped) 130 })?; 131 // finally, send, and wait for a response 132 indication_handler.send(handle, &data, mtu, |packet| this.try_send_packet(packet)).await 133 } 134 } 135 136 /// Handle a snooped MTU event, to update the MTU we use for our various 137 /// operations handle_mtu_event(&self, mtu_event: MtuEvent) -> Result<()>138 pub fn handle_mtu_event(&self, mtu_event: MtuEvent) -> Result<()> { 139 self.mtu.handle_event(mtu_event) 140 } 141 handle_request(&self, packet: att::Att)142 fn handle_request(&self, packet: att::Att) { 143 let curr_request = self.curr_request.replace(AttRequestState::Replacing); 144 self.curr_request.replace(match curr_request { 145 AttRequestState::Idle(mut request_handler) => { 146 // even if the MTU is updated afterwards, 5.3 3F 3.4.2.2 states that the 147 // request-time MTU should be used 148 let mtu = self.mtu.snapshot_or_default(); 149 let this = self.downgrade(); 150 let opcode = packet.opcode; 151 let task = spawn_local(async move { 152 trace!("starting ATT transaction"); 153 let reply = request_handler.process_packet(packet, mtu).await; 154 this.with(|this| { 155 this.map(|this| { 156 match this.send_packet(reply) { 157 Ok(_) => { 158 trace!("reply packet sent") 159 } 160 Err(err) => { 161 error!("serializer failure {err:?}, dropping packet and sending failed reply"); 162 // if this also fails, we're stuck 163 if let Err(err) = this.send_packet(att::AttErrorResponse { 164 opcode_in_error: opcode, 165 handle_in_error: AttHandle(0).into(), 166 error_code: AttErrorCode::UnlikelyError, 167 }.try_into().unwrap()) { 168 panic!("unexpected serialize error for known-good packet {err:?}") 169 } 170 } 171 }; 172 // ready for next transaction 173 this.curr_request.replace(AttRequestState::Idle(request_handler)); 174 }) 175 }); 176 }); 177 AttRequestState::Pending { _task: task.into() } 178 } 179 AttRequestState::Pending { .. } => { 180 warn!("multiple ATT operations cannot simultaneously take place, dropping one"); 181 // TODO(aryarahul) - disconnect connection here; 182 curr_request 183 } 184 AttRequestState::Replacing => { 185 panic!("Replacing is an ephemeral state"); 186 } 187 }); 188 } 189 } 190 191 impl<T: AttDatabase + Clone + 'static> WeakBox<AttServerBearer<T>> { try_send_packet(&self, packet: att::Att) -> Result<(), SendError>192 fn try_send_packet(&self, packet: att::Att) -> Result<(), SendError> { 193 self.with(|this| { 194 this.ok_or_else(|| { 195 warn!("connection dropped before packet sent"); 196 SendError::ConnectionDropped 197 })? 198 .send_packet(packet) 199 .map_err(SendError::SerializeError) 200 }) 201 } 202 } 203 204 #[cfg(test)] 205 mod test { 206 use std::rc::Rc; 207 208 use tokio::sync::mpsc::error::TryRecvError; 209 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; 210 211 use super::*; 212 213 use crate::core::shared_box::SharedBox; 214 use crate::core::uuid::Uuid; 215 use crate::gatt::ffi::AttributeBackingType; 216 use crate::gatt::ids::TransportIndex; 217 use crate::gatt::mocks::mock_datastore::{MockDatastore, MockDatastoreEvents}; 218 use crate::gatt::server::att_database::{AttAttribute, AttPermissions}; 219 use crate::gatt::server::gatt_database::{ 220 GattCharacteristicWithHandle, GattDatabase, GattServiceWithHandle, 221 }; 222 use crate::gatt::server::test::test_att_db::TestAttDatabase; 223 use crate::packets::att; 224 use crate::utils::task::{block_on_locally, try_await}; 225 226 const VALID_HANDLE: AttHandle = AttHandle(3); 227 const INVALID_HANDLE: AttHandle = AttHandle(4); 228 const ANOTHER_VALID_HANDLE: AttHandle = AttHandle(10); 229 230 const TCB_IDX: TransportIndex = TransportIndex(1); 231 open_connection( ) -> (SharedBox<AttServerBearer<TestAttDatabase>>, UnboundedReceiver<att::Att>)232 fn open_connection( 233 ) -> (SharedBox<AttServerBearer<TestAttDatabase>>, UnboundedReceiver<att::Att>) { 234 let db = TestAttDatabase::new(vec![ 235 ( 236 AttAttribute { 237 handle: VALID_HANDLE, 238 type_: Uuid::new(0x1234), 239 permissions: AttPermissions::READABLE | AttPermissions::INDICATE, 240 }, 241 vec![5, 6], 242 ), 243 ( 244 AttAttribute { 245 handle: ANOTHER_VALID_HANDLE, 246 type_: Uuid::new(0x5678), 247 permissions: AttPermissions::READABLE | AttPermissions::INDICATE, 248 }, 249 vec![5, 6], 250 ), 251 ]); 252 let (tx, rx) = unbounded_channel(); 253 let conn = AttServerBearer::new(db, move |packet| { 254 tx.send(packet).unwrap(); 255 Ok(()) 256 }) 257 .into(); 258 (conn, rx) 259 } 260 261 #[test] test_single_transaction()262 fn test_single_transaction() { 263 block_on_locally(async { 264 let (conn, mut rx) = open_connection(); 265 conn.as_ref().handle_packet( 266 att::AttReadRequest { attribute_handle: VALID_HANDLE.into() }.try_into().unwrap(), 267 ); 268 assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::ReadResponse); 269 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 270 }); 271 } 272 273 #[test] test_sequential_transactions()274 fn test_sequential_transactions() { 275 block_on_locally(async { 276 let (conn, mut rx) = open_connection(); 277 conn.as_ref().handle_packet( 278 att::AttReadRequest { attribute_handle: INVALID_HANDLE.into() }.try_into().unwrap(), 279 ); 280 assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::ErrorResponse); 281 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 282 283 conn.as_ref().handle_packet( 284 att::AttReadRequest { attribute_handle: VALID_HANDLE.into() }.try_into().unwrap(), 285 ); 286 assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::ReadResponse); 287 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 288 }); 289 } 290 291 #[test] test_concurrent_transaction_failure()292 fn test_concurrent_transaction_failure() { 293 // arrange: AttServerBearer linked to a backing datastore and packet queue, with 294 // two characteristics in the database 295 let (datastore, mut data_rx) = MockDatastore::new(); 296 let datastore = Rc::new(datastore); 297 let db = SharedBox::new(GattDatabase::new()); 298 db.add_service_with_handles( 299 GattServiceWithHandle { 300 handle: AttHandle(1), 301 type_: Uuid::new(1), 302 characteristics: vec![ 303 GattCharacteristicWithHandle { 304 handle: VALID_HANDLE, 305 type_: Uuid::new(2), 306 permissions: AttPermissions::READABLE, 307 descriptors: vec![], 308 }, 309 GattCharacteristicWithHandle { 310 handle: ANOTHER_VALID_HANDLE, 311 type_: Uuid::new(2), 312 permissions: AttPermissions::READABLE, 313 descriptors: vec![], 314 }, 315 ], 316 }, 317 datastore, 318 ) 319 .unwrap(); 320 let (tx, mut rx) = unbounded_channel(); 321 let send_packet = move |packet| { 322 tx.send(packet).unwrap(); 323 Ok(()) 324 }; 325 let conn = SharedBox::new(AttServerBearer::new(db.get_att_database(TCB_IDX), send_packet)); 326 let data = [1, 2]; 327 328 // act: send two read requests before replying to either read 329 // first request 330 block_on_locally(async { 331 let req1 = att::AttReadRequest { attribute_handle: VALID_HANDLE.into() }; 332 conn.as_ref().handle_packet(req1.try_into().unwrap()); 333 // second request 334 let req2 = att::AttReadRequest { attribute_handle: ANOTHER_VALID_HANDLE.into() }; 335 conn.as_ref().handle_packet(req2.try_into().unwrap()); 336 // handle first reply 337 let MockDatastoreEvents::Read( 338 TCB_IDX, 339 VALID_HANDLE, 340 AttributeBackingType::Characteristic, 341 data_resp, 342 ) = data_rx.recv().await.unwrap() 343 else { 344 unreachable!(); 345 }; 346 data_resp.send(Ok(data.to_vec())).unwrap(); 347 trace!("reply sent from upper tester"); 348 349 // assert: that the first reply was made 350 let resp = rx.recv().await.unwrap(); 351 assert_eq!(resp, att::AttReadResponse { value: data.to_vec() }.try_into().unwrap()); 352 // assert no other replies were made 353 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 354 // assert no callbacks are pending 355 assert_eq!(data_rx.try_recv().unwrap_err(), TryRecvError::Empty); 356 }); 357 } 358 359 #[test] test_indication_confirmation()360 fn test_indication_confirmation() { 361 block_on_locally(async { 362 // arrange 363 let (conn, mut rx) = open_connection(); 364 365 // act: send an indication 366 let pending_send = 367 spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3])); 368 assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::HandleValueIndication); 369 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 370 // and the confirmation 371 conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap()); 372 373 // assert: the indication was correctly sent 374 assert!(matches!(pending_send.await.unwrap(), Ok(()))); 375 }); 376 } 377 378 #[test] test_sequential_indications()379 fn test_sequential_indications() { 380 block_on_locally(async { 381 // arrange 382 let (conn, mut rx) = open_connection(); 383 384 // act: send the first indication 385 let pending_send1 = 386 spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3])); 387 // wait for/capture the outgoing packet 388 let sent1 = rx.recv().await.unwrap(); 389 // send the response 390 conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap()); 391 // send the second indication 392 let pending_send2 = 393 spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3])); 394 // wait for/capture the outgoing packet 395 let sent2 = rx.recv().await.unwrap(); 396 // and the response 397 conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap()); 398 399 // assert: exactly two indications were sent 400 assert_eq!(sent1.opcode, att::AttOpcode::HandleValueIndication); 401 assert_eq!(sent2.opcode, att::AttOpcode::HandleValueIndication); 402 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 403 // and that both got successful responses 404 assert!(matches!(pending_send1.await.unwrap(), Ok(()))); 405 assert!(matches!(pending_send2.await.unwrap(), Ok(()))); 406 }); 407 } 408 409 #[test] test_queued_indications_only_one_sent()410 fn test_queued_indications_only_one_sent() { 411 block_on_locally(async { 412 // arrange 413 let (conn, mut rx) = open_connection(); 414 415 // act: send two indications simultaneously 416 let pending_send1 = 417 spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3])); 418 let pending_send2 = 419 spawn_local(conn.as_ref().send_indication(ANOTHER_VALID_HANDLE, vec![1, 2, 3])); 420 // assert: only one was initially sent 421 assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::HandleValueIndication); 422 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 423 // and both are still pending 424 assert!(!pending_send1.is_finished()); 425 assert!(!pending_send2.is_finished()); 426 }); 427 } 428 429 #[test] test_queued_indications_dequeue_second()430 fn test_queued_indications_dequeue_second() { 431 block_on_locally(async { 432 // arrange 433 let (conn, mut rx) = open_connection(); 434 435 // act: send two indications simultaneously 436 let pending_send1 = 437 spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3])); 438 let pending_send2 = 439 spawn_local(conn.as_ref().send_indication(ANOTHER_VALID_HANDLE, vec![1, 2, 3])); 440 // wait for/capture the outgoing packet 441 let sent1 = rx.recv().await.unwrap(); 442 // send response for the first one 443 conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap()); 444 // wait for/capture the outgoing packet 445 let sent2 = rx.recv().await.unwrap(); 446 447 // assert: the first future has completed successfully, the second one is 448 // pending 449 assert!(matches!(pending_send1.await.unwrap(), Ok(()))); 450 assert!(!pending_send2.is_finished()); 451 // and that both indications have been sent 452 assert_eq!(sent1.opcode, att::AttOpcode::HandleValueIndication); 453 assert_eq!(sent2.opcode, att::AttOpcode::HandleValueIndication); 454 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 455 }); 456 } 457 458 #[test] test_queued_indications_complete_both()459 fn test_queued_indications_complete_both() { 460 block_on_locally(async { 461 // arrange 462 let (conn, mut rx) = open_connection(); 463 464 // act: send two indications simultaneously 465 let pending_send1 = 466 spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3])); 467 let pending_send2 = 468 spawn_local(conn.as_ref().send_indication(ANOTHER_VALID_HANDLE, vec![1, 2, 3])); 469 // wait for/capture the outgoing packet 470 let sent1 = rx.recv().await.unwrap(); 471 // send response for the first one 472 conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap()); 473 // wait for/capture the outgoing packet 474 let sent2 = rx.recv().await.unwrap(); 475 // and now the second 476 conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap()); 477 478 // assert: both futures have completed successfully 479 assert!(matches!(pending_send1.await.unwrap(), Ok(()))); 480 assert!(matches!(pending_send2.await.unwrap(), Ok(()))); 481 // and both indications have been sent 482 assert_eq!(sent1.opcode, att::AttOpcode::HandleValueIndication); 483 assert_eq!(sent2.opcode, att::AttOpcode::HandleValueIndication); 484 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 485 }); 486 } 487 488 #[test] test_indication_connection_drop()489 fn test_indication_connection_drop() { 490 block_on_locally(async { 491 // arrange: a pending indication 492 let (conn, mut rx) = open_connection(); 493 let pending_send = 494 spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3])); 495 496 // act: drop the connection after the indication is sent 497 rx.recv().await.unwrap(); 498 drop(conn); 499 500 // assert: the pending indication fails with the appropriate error 501 assert!(matches!( 502 pending_send.await.unwrap(), 503 Err(IndicationError::ConnectionDroppedWhileWaitingForConfirmation) 504 )); 505 }); 506 } 507 508 #[test] test_single_indication_pending_mtu()509 fn test_single_indication_pending_mtu() { 510 block_on_locally(async { 511 // arrange: pending MTU negotiation 512 let (conn, mut rx) = open_connection(); 513 conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap(); 514 515 // act: try to send an indication with a large payload size 516 let _ = try_await(conn.as_ref().send_indication(VALID_HANDLE, (1..50).collect())).await; 517 // then resolve the MTU negotiation with a large MTU 518 conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(100)).unwrap(); 519 520 // assert: the indication was sent 521 assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::HandleValueIndication); 522 }); 523 } 524 525 #[test] test_single_indication_pending_mtu_fail()526 fn test_single_indication_pending_mtu_fail() { 527 block_on_locally(async { 528 // arrange: pending MTU negotiation 529 let (conn, _) = open_connection(); 530 conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap(); 531 532 // act: try to send an indication with a large payload size 533 let pending_mtu = 534 try_await(conn.as_ref().send_indication(VALID_HANDLE, (1..50).collect())) 535 .await 536 .unwrap_err(); 537 // then resolve the MTU negotiation with a small MTU 538 conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(32)).unwrap(); 539 540 // assert: the indication failed to send 541 assert!(matches!(pending_mtu.await, Err(IndicationError::DataExceedsMtu { .. }))); 542 }); 543 } 544 545 #[test] test_server_transaction_pending_mtu()546 fn test_server_transaction_pending_mtu() { 547 block_on_locally(async { 548 // arrange: pending MTU negotiation 549 let (conn, mut rx) = open_connection(); 550 conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap(); 551 552 // act: send server packet 553 conn.as_ref().handle_packet( 554 att::AttReadRequest { attribute_handle: VALID_HANDLE.into() }.try_into().unwrap(), 555 ); 556 557 // assert: that we reply even while the MTU req is outstanding 558 assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::ReadResponse); 559 }); 560 } 561 562 #[test] test_queued_indication_pending_mtu_uses_mtu_on_dequeue()563 fn test_queued_indication_pending_mtu_uses_mtu_on_dequeue() { 564 block_on_locally(async { 565 // arrange: an outstanding indication 566 let (conn, mut rx) = open_connection(); 567 let _ = try_await(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3])).await; 568 rx.recv().await.unwrap(); // flush rx_queue 569 570 // act: enqueue an indication with a large payload 571 let _ = try_await(conn.as_ref().send_indication(VALID_HANDLE, (1..50).collect())).await; 572 // then perform MTU negotiation to upgrade to a large MTU 573 conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap(); 574 conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(512)).unwrap(); 575 // finally resolve the first indication, so the second indication can be sent 576 conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap()); 577 578 // assert: the second indication successfully sent (so it used the new MTU) 579 assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::HandleValueIndication); 580 }); 581 } 582 } 583