# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import struct
import pytest
from unittest.mock import AsyncMock, Mock, ANY

from bumble.controller import Controller
from bumble.gatt_client import CharacteristicProxy
from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
from bumble.gatt import (
    GATT_BATTERY_LEVEL_CHARACTERISTIC,
    GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
    CharacteristicAdapter,
    DelegatedCharacteristicAdapter,
    PackedCharacteristicAdapter,
    MappedCharacteristicAdapter,
    UTF8CharacteristicAdapter,
    Service,
    Characteristic,
    CharacteristicValue,
    Descriptor,
)
from bumble.transport import AsyncPipeSink
from bumble.core import UUID
from bumble.att import (
    Attribute,
    ATT_EXCHANGE_MTU_REQUEST,
    ATT_ATTRIBUTE_NOT_FOUND_ERROR,
    ATT_PDU,
    ATT_Error_Response,
    ATT_Read_By_Group_Type_Request,
)
from .test_utils import async_barrier


# -----------------------------------------------------------------------------
def basic_check(x):
    """Assert that `x` survives a serialize/parse round trip.

    `x` is serialized with `to_bytes()`, the bytes are parsed back with
    `ATT_PDU.from_bytes()`, and the string forms of the original and the
    re-parsed object are required to be equal.
    """
    round_tripped = ATT_PDU.from_bytes(x.to_bytes())
    assert str(x) == str(round_tripped)


# -----------------------------------------------------------------------------
def test_UUID():
    """Check UUID string formatting, parsing and byte round trips."""
    u = UUID.from_16_bits(0x7788)
    assert str(u) == 'UUID-16:7788'
    u = UUID.from_32_bits(0x11223344)
    assert str(u) == 'UUID-32:11223344'
    u = UUID('61A3512C-09BE-4DDC-A6A6-0B03667AAFC6')
    assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
    v = UUID(str(u))
    assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
    w = UUID.from_bytes(v.to_bytes())
    assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'

    # A 16-bit UUID serialized with force_128=True must still compare equal
    # to the original after being parsed back.
    u1 = UUID.from_16_bits(0x1234)
    b1 = u1.to_bytes(force_128=True)
    u2 = UUID.from_bytes(b1)
    assert u1 == u2

    # The string form of this UUID carries a human-readable name suffix.
    u3 = UUID.from_16_bits(0x180A)
    assert str(u3) == 'UUID-16:180A (Device Information)'


# -----------------------------------------------------------------------------
def test_ATT_Error_Response():
    """Round-trip an ATT_Error_Response through `basic_check`."""
    pdu = ATT_Error_Response(
        request_opcode_in_error=ATT_EXCHANGE_MTU_REQUEST,
        attribute_handle_in_error=0x0000,
        error_code=ATT_ATTRIBUTE_NOT_FOUND_ERROR,
    )
    basic_check(pdu)


# -----------------------------------------------------------------------------
def test_ATT_Read_By_Group_Type_Request():
    """Round-trip an ATT_Read_By_Group_Type_Request through `basic_check`."""
    pdu = ATT_Read_By_Group_Type_Request(
        starting_handle=0x0001,
        ending_handle=0xFFFF,
        attribute_group_type=UUID.from_16_bits(0x2800),
    )
    basic_check(pdu)


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_characteristic_encoding():
    """Exercise value encoding/decoding hooks on characteristics and proxies.

    Covers a `Characteristic` subclass with custom encode/decode, a
    `CharacteristicProxy` subclass used over a client/server connection,
    delegated and packed adapters, and the values delivered to subscribers.
    """

    class Foo(Characteristic):
        def encode_value(self, value):
            return bytes([value])

        def decode_value(self, value_bytes):
            return value_bytes[0]

    c = Foo(
        GATT_BATTERY_LEVEL_CHARACTERISTIC,
        Characteristic.Properties.READ,
        Characteristic.READABLE,
        123,
    )
    # Reads return the encoded bytes; writes store the decoded value.
    x = await c.read_value(None)
    assert x == bytes([123])
    await c.write_value(None, bytes([122]))
    assert c.value == 122

    class FooProxy(CharacteristicProxy):
        def __init__(self, characteristic):
            super().__init__(
                characteristic.client,
                characteristic.handle,
                characteristic.end_group_handle,
                characteristic.uuid,
                characteristic.properties,
            )

        def encode_value(self, value):
            return bytes([value])

        def decode_value(self, value_bytes):
            return value_bytes[0]

    [client, server] = LinkedDevices().devices[:2]

    characteristic = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.Properties.READ
        | Characteristic.Properties.WRITE
        | Characteristic.Properties.NOTIFY,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([123]),
    )

    async def async_read(connection):
        return 0x05060708

    # Server-side characteristic whose value comes from an async read callback
    # and is packed as a big-endian unsigned 32-bit integer.
    async_characteristic = PackedCharacteristicAdapter(
        Characteristic(
            '2AB7E91B-43E8-4F73-AC3B-80C1683B47F9',
            Characteristic.Properties.READ,
            Characteristic.READABLE,
            CharacteristicValue(read=async_read),
        ),
        '>I',
    )

    service = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic, async_characteristic]
    )
    server.add_service(service)

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()
    c = peer.get_characteristics_by_uuid(characteristic.uuid)
    assert len(c) == 1
    c = c[0]
    cp = FooProxy(c)

    # Through the proxy subclass, values are exchanged as ints while the
    # server-side characteristic holds single-byte values.
    v = await cp.read_value()
    assert v == 123
    await cp.write_value(124)
    await async_barrier()
    assert characteristic.value == bytes([124])

    v = await cp.read_value()
    assert v == 124
    await cp.write_value(125, with_response=True)
    await async_barrier()
    assert characteristic.value == bytes([125])

    # Delegated adapter with only an encode function: 100 is written as 50.
    cd = DelegatedCharacteristicAdapter(c, encode=lambda x: bytes([x // 2]))
    await cd.write_value(100, with_response=True)
    await async_barrier()
    assert characteristic.value == bytes([50])

    c2 = peer.get_characteristics_by_uuid(async_characteristic.uuid)
    assert len(c2) == 1
    c2 = c2[0]
    cd2 = PackedCharacteristicAdapter(c2, ">I")
    cd2v = await cd2.read_value()
    assert cd2v == 0x05060708

    last_change = None

    def on_change(value):
        nonlocal last_change
        last_change = value

    # Subscribing on the plain proxy delivers raw bytes to the callback.
    await c.subscribe(on_change)
    await server.notify_subscribers(characteristic)
    await async_barrier()
    assert last_change == characteristic.value
    last_change = None

    await server.notify_subscribers(characteristic, value=bytes([125]))
    await async_barrier()
    assert last_change == bytes([125])
    last_change = None

    await c.unsubscribe(on_change)
    await server.notify_subscribers(characteristic)
    await async_barrier()
    assert last_change is None

    # Subscribing on the FooProxy delivers the decoded (int) value.
    await cp.subscribe(on_change)
    await server.notify_subscribers(characteristic)
    await async_barrier()
    assert last_change == characteristic.value[0]
    last_change = None

    await server.notify_subscribers(characteristic, value=bytes([126]))
    await async_barrier()
    assert last_change == 126
    last_change = None

    await cp.unsubscribe(on_change)
    await server.notify_subscribers(characteristic)
    await async_barrier()
    assert last_change is None

    # Subscribing on a delegated adapter applies its decode function.
    cd = DelegatedCharacteristicAdapter(c, decode=lambda x: x[0])
    await cd.subscribe(on_change)
    await server.notify_subscribers(characteristic)
    await async_barrier()
    assert last_change == characteristic.value[0]
    last_change = None

    await cd.unsubscribe(on_change)
    await server.notify_subscribers(characteristic)
    await async_barrier()
    assert last_change is None


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_attribute_getters():
    """Check the GATT server attribute lookup helpers and handle ordering."""
    [client, server] = LinkedDevices().devices[:2]

    characteristic_uuid = UUID('FDB159DB-036C-49E3-B3DB-6325AC750806')
    characteristic = Characteristic(
        characteristic_uuid,
        Characteristic.Properties.READ
        | Characteristic.Properties.WRITE
        | Characteristic.Properties.NOTIFY,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([123]),
    )

    service_uuid = UUID('3A657F47-D34F-46B3-B1EC-698E29B6B829')
    service = Service(service_uuid, [characteristic])
    server.add_service(service)

    service_attr = server.gatt_server.get_service_attribute(service_uuid)
    assert service_attr

    (
        char_decl_attr,
        char_value_attr,
    ) = server.gatt_server.get_characteristic_attributes(
        service_uuid, characteristic_uuid
    )
    assert char_decl_attr and char_value_attr

    desc_attr = server.gatt_server.get_descriptor_attribute(
        service_uuid,
        characteristic_uuid,
        GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
    )
    assert desc_attr

    # assert all handles are in expected order
    assert (
        service_attr.handle
        < char_decl_attr.handle
        < char_value_attr.handle
        < desc_attr.handle
        == service_attr.end_group_handle
    )
    # assert characteristic declarations attribute is followed by characteristic value attribute
    assert char_decl_attr.handle + 1 == char_value_attr.handle


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_CharacteristicAdapter():
    """Check each adapter type's read (encode) and write (decode) behavior."""
    # Check that the CharacteristicAdapter base class is transparent
    v = bytes([1, 2, 3])
    c = Characteristic(
        GATT_BATTERY_LEVEL_CHARACTERISTIC,
        Characteristic.Properties.READ,
        Characteristic.READABLE,
        v,
    )
    a = CharacteristicAdapter(c)

    value = await a.read_value(None)
    assert value == v

    v = bytes([3, 4, 5])
    await a.write_value(None, v)
    assert c.value == v

    # Simple delegated adapter
    a = DelegatedCharacteristicAdapter(
        c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x))
    )

    value = await a.read_value(None)
    assert value == bytes(reversed(v))

    v = bytes([3, 4, 5])
    await a.write_value(None, v)
    assert a.value == bytes(reversed(v))

    # Packed adapter with single element format
    v = 1234
    pv = struct.pack('>H', v)
    c.value = v
    a = PackedCharacteristicAdapter(c, '>H')

    value = await a.read_value(None)
    assert value == pv
    c.value = None
    await a.write_value(None, pv)
    assert a.value == v

    # Packed adapter with multi-element format
    v1 = 1234
    v2 = 5678
    pv = struct.pack('>HH', v1, v2)
    c.value = (v1, v2)
    a = PackedCharacteristicAdapter(c, '>HH')

    value = await a.read_value(None)
    assert value == pv
    c.value = None
    await a.write_value(None, pv)
    assert a.value == (v1, v2)

    # Mapped adapter
    v1 = 1234
    v2 = 5678
    pv = struct.pack('>HH', v1, v2)
    mapped = {'v1': v1, 'v2': v2}
    c.value = mapped
    a = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2'))

    value = await a.read_value(None)
    assert value == pv
    c.value = None
    await a.write_value(None, pv)
    assert a.value == mapped

    # UTF-8 adapter
    v = 'Hello π'
    ev = v.encode('utf-8')
    c.value = v
    a = UTF8CharacteristicAdapter(c)

    value = await a.read_value(None)
    assert value == ev
    c.value = None
    await a.write_value(None, ev)
    assert a.value == v


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_CharacteristicValue():
    """Check CharacteristicValue with an async read and a synchronous write."""
    b = bytes([1, 2, 3])

    async def read_value(connection):
        return b

    c = CharacteristicValue(read=read_value)
    x = await c.read(None)
    assert x == b

    # The write callback is a plain Mock here, so write() is not awaited.
    m = Mock()
    c = CharacteristicValue(write=m)
    z = object()
    c.write(z, b)
    m.assert_called_once_with(z, b)


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_CharacteristicValue_async():
    """Check CharacteristicValue with async read and async write callbacks."""
    b = bytes([1, 2, 3])

    async def read_value(connection):
        return b

    c = CharacteristicValue(read=read_value)
    x = await c.read(None)
    assert x == b

    # The write callback is an AsyncMock here, so write() must be awaited.
    m = AsyncMock()
    c = CharacteristicValue(write=m)
    z = object()
    await c.write(z, b)
    m.assert_called_once_with(z, b)


# -----------------------------------------------------------------------------
class LinkedDevices:
    """Three devices whose controllers share a single in-process LocalLink.

    Attributes:
        connections: three-slot list, initialized to None.
        link: the LocalLink shared by all controllers.
        controllers: three controllers ('C1', 'C2', 'C3') attached to `link`.
        devices: three devices, one per controller, with fixed addresses.
        paired: three-slot list, initialized to None.
    """

    def __init__(self):
        self.connections = [None, None, None]

        self.link = LocalLink()
        self.controllers = [
            Controller('C1', link=self.link),
            Controller('C2', link=self.link),
            Controller('C3', link=self.link),
        ]
        self.devices = [
            Device(
                address='F0:F1:F2:F3:F4:F5',
                host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])),
            ),
            Device(
                address='F1:F2:F3:F4:F5:F6',
                host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])),
            ),
            Device(
                address='F2:F3:F4:F5:F6:F7',
                host=Host(self.controllers[2], AsyncPipeSink(self.controllers[2])),
            ),
        ]

        self.paired = [None, None, None]


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write():
    """Read and write characteristics over a client/server connection.

    characteristic1 stores its value directly and reports writes through the
    'write' event; characteristic2 delegates reads and writes to callbacks via
    a CharacteristicValue.
    """
    [client, server] = LinkedDevices().devices[:2]

    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.Properties.READ | Characteristic.Properties.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
    )

    def on_characteristic1_write(connection, value):
        characteristic1._last_value = (connection, value)

    characteristic1.on('write', on_characteristic1_write)

    def on_characteristic2_read(connection):
        # bytes(str) without an encoding raises TypeError, so encode explicitly.
        return str(connection.peer_address).encode('utf-8')

    def on_characteristic2_write(connection, value):
        characteristic2._last_value = (connection, value)

    characteristic2 = Characteristic(
        '66DE9057-C848-4ACA-B993-D675644EBB85',
        Characteristic.Properties.READ | Characteristic.Properties.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        CharacteristicValue(
            read=on_characteristic2_read, write=on_characteristic2_write
        ),
    )

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1, characteristic2]
    )
    server.add_services([service1])

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()
    c = peer.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(c) == 1
    c1 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic2.uuid)
    assert len(c) == 1
    c2 = c[0]

    v1 = await peer.read_value(c1)
    assert v1 == b''
    b = bytes([1, 2, 3])
    await peer.write_value(c1, b)
    await async_barrier()
    assert characteristic1.value == b
    v1 = await peer.read_value(c1)
    assert v1 == b
    # Previously `type(x is tuple)`, which is always truthy; check the type.
    assert isinstance(characteristic1._last_value, tuple)
    assert len(characteristic1._last_value) == 2
    assert str(characteristic1._last_value[0].peer_address) == str(
        client.random_address
    )
    assert characteristic1._last_value[1] == b
    bb = bytes([3, 4, 5, 6])
    characteristic1.value = bb
    v1 = await peer.read_value(c1)
    assert v1 == bb

    await peer.write_value(c2, b)
    await async_barrier()
    # Previously `type(x is tuple)`, which is always truthy; check the type.
    assert isinstance(characteristic2._last_value, tuple)
    assert len(characteristic2._last_value) == 2
    assert str(characteristic2._last_value[0].peer_address) == str(
        client.random_address
    )
    assert characteristic2._last_value[1] == b


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write2():
    """Read and write through a service-scoped discovery and a packed adapter."""
    [client, server] = LinkedDevices().devices[:2]

    v = bytes([0x11, 0x22, 0x33, 0x44])
    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.Properties.READ | Characteristic.Properties.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        value=v,
    )

    service1 = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1])
    server.add_services([service1])

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    c = peer.get_services_by_uuid(service1.uuid)
    assert len(c) == 1
    s = c[0]
    await s.discover_characteristics()
    c = s.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(c) == 1
    c1 = c[0]

    v1 = await c1.read_value()
    assert v1 == v

    # The adapter exposes the 4 raw bytes as a big-endian unsigned integer.
    a1 = PackedCharacteristicAdapter(c1, '>I')
    v1 = await a1.read_value()
    assert v1 == struct.unpack('>I', v)[0]

    b = bytes([0x55, 0x66, 0x77, 0x88])
    await a1.write_value(struct.unpack('>I', b)[0])
    await async_barrier()
    assert characteristic1.value == b
    v1 = await a1.read_value()
    assert v1 == struct.unpack('>I', b)[0]


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_subscribe_notify():
    """Check subscription events and notification/indication delivery.

    characteristic1 supports NOTIFY only, characteristic2 INDICATE only, and
    characteristic3 both. For each, the test checks which server-side sends
    reach the client callbacks and that nothing is delivered after
    unsubscribing.
    """
    [client, server] = LinkedDevices().devices[:2]

    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
        Characteristic.READABLE,
        bytes([1, 2, 3]),
    )

    def on_characteristic1_subscription(connection, notify_enabled, indicate_enabled):
        characteristic1._last_subscription = (
            connection,
            notify_enabled,
            indicate_enabled,
        )

    characteristic1.on('subscription', on_characteristic1_subscription)

    characteristic2 = Characteristic(
        '66DE9057-C848-4ACA-B993-D675644EBB85',
        Characteristic.Properties.READ | Characteristic.Properties.INDICATE,
        Characteristic.READABLE,
        bytes([4, 5, 6]),
    )

    def on_characteristic2_subscription(connection, notify_enabled, indicate_enabled):
        characteristic2._last_subscription = (
            connection,
            notify_enabled,
            indicate_enabled,
        )

    characteristic2.on('subscription', on_characteristic2_subscription)

    characteristic3 = Characteristic(
        'AB5E639C-40C1-4238-B9CB-AF41F8B806E4',
        Characteristic.Properties.READ
        | Characteristic.Properties.NOTIFY
        | Characteristic.Properties.INDICATE,
        Characteristic.READABLE,
        bytes([7, 8, 9]),
    )

    def on_characteristic3_subscription(connection, notify_enabled, indicate_enabled):
        characteristic3._last_subscription = (
            connection,
            notify_enabled,
            indicate_enabled,
        )

    characteristic3.on('subscription', on_characteristic3_subscription)

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829',
        [characteristic1, characteristic2, characteristic3],
    )
    server.add_services([service1])

    def on_characteristic_subscription(
        connection, characteristic, notify_enabled, indicate_enabled
    ):
        server._last_subscription = (
            connection,
            characteristic,
            notify_enabled,
            indicate_enabled,
        )

    server.on('characteristic_subscription', on_characteristic_subscription)

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()
    c = peer.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(c) == 1
    c1 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic2.uuid)
    assert len(c) == 1
    c2 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic3.uuid)
    assert len(c) == 1
    c3 = c[0]

    # --- characteristic1: notify only ---
    c1._called = False
    c1._last_update = None

    def on_c1_update(value):
        c1._called = True
        c1._last_update = value

    c1.on('update', on_c1_update)
    await peer.subscribe(c1)
    await async_barrier()
    assert server._last_subscription[1] == characteristic1
    assert server._last_subscription[2]
    assert not server._last_subscription[3]
    assert characteristic1._last_subscription[1]
    assert not characteristic1._last_subscription[2]
    # An indication must not reach a notify-only subscription.
    await server.indicate_subscribers(characteristic1)
    await async_barrier()
    assert not c1._called
    await server.notify_subscribers(characteristic1)
    await async_barrier()
    assert c1._called
    assert c1._last_update == characteristic1.value

    # Notifying with an explicit value must not change the stored value.
    c1._called = False
    c1._last_update = None
    c1_value = characteristic1.value
    await server.notify_subscribers(characteristic1, bytes([0, 1, 2]))
    await async_barrier()
    assert c1._called
    assert c1._last_update == bytes([0, 1, 2])
    assert characteristic1.value == c1_value

    c1._called = False
    await peer.unsubscribe(c1)
    await server.notify_subscribers(characteristic1)
    # Let any pending delivery run first, as the other negative checks in
    # this test do; without this the assertion below cannot fail.
    await async_barrier()
    assert not c1._called

    # --- characteristic2: indicate only ---
    c2._called = False
    c2._last_update = None

    def on_c2_update(value):
        c2._called = True
        c2._last_update = value

    await peer.subscribe(c2, on_c2_update)
    await async_barrier()
    await server.notify_subscriber(
        characteristic2._last_subscription[0], characteristic2
    )
    await async_barrier()
    assert not c2._called
    await server.indicate_subscriber(
        characteristic2._last_subscription[0], characteristic2
    )
    await async_barrier()
    assert c2._called
    assert c2._last_update == characteristic2.value

    c2._called = False
    await peer.unsubscribe(c2, on_c2_update)
    await server.indicate_subscriber(
        characteristic2._last_subscription[0], characteristic2
    )
    await async_barrier()
    assert not c2._called

    # --- characteristic3: notify and indicate ---
    c3._called = False
    c3._called_2 = False
    c3._called_3 = False
    c3._last_update = None
    c3._last_update_2 = None
    c3._last_update_3 = None

    def on_c3_update(value):
        c3._called = True
        c3._last_update = value

    def on_c3_update_2(value):  # for notify
        c3._called_2 = True
        c3._last_update_2 = value

    def on_c3_update_3(value):  # for indicate
        c3._called_3 = True
        c3._last_update_3 = value

    c3.on('update', on_c3_update)
    await peer.subscribe(c3, on_c3_update_2)
    await async_barrier()
    await server.notify_subscriber(
        characteristic3._last_subscription[0], characteristic3
    )
    await async_barrier()
    assert c3._called
    assert c3._last_update == characteristic3.value
    assert c3._called_2
    assert c3._last_update_2 == characteristic3.value
    assert not c3._called_3

    c3._called = False
    c3._called_2 = False
    c3._called_3 = False
    await peer.unsubscribe(c3)
    await peer.subscribe(c3, on_c3_update_3, prefer_notify=False)
    await async_barrier()
    characteristic3.value = bytes([1, 2, 3])
    await server.indicate_subscriber(
        characteristic3._last_subscription[0], characteristic3
    )
    await async_barrier()
    assert c3._called
    assert c3._last_update == characteristic3.value
    assert not c3._called_2
    assert c3._called_3
    assert c3._last_update_3 == characteristic3.value

    c3._called = False
    c3._called_2 = False
    c3._called_3 = False
    await peer.unsubscribe(c3)
    await server.notify_subscriber(
        characteristic3._last_subscription[0], characteristic3
    )
    await server.indicate_subscriber(
        characteristic3._last_subscription[0], characteristic3
    )
    await async_barrier()
    assert not c3._called
    assert not c3._called_2
    assert not c3._called_3


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_unsubscribe():
    """Check the 'subscription' events produced by subscribe/unsubscribe.

    A repeated unsubscribe must not produce an event unless `force=True` is
    passed.
    """
    [client, server] = LinkedDevices().devices[:2]

    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
        Characteristic.READABLE,
        bytes([1, 2, 3]),
    )
    characteristic2 = Characteristic(
        '3234C4F4-3F34-4616-8935-45A50EE05DEB',
        Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
        Characteristic.READABLE,
        bytes([1, 2, 3]),
    )

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829',
        [characteristic1, characteristic2],
    )
    server.add_services([service1])

    mock1 = Mock()
    characteristic1.on('subscription', mock1)
    mock2 = Mock()
    characteristic2.on('subscription', mock2)

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()
    c = peer.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(c) == 1
    c1 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic2.uuid)
    assert len(c) == 1
    c2 = c[0]

    await c1.subscribe()
    await async_barrier()
    mock1.assert_called_once_with(ANY, True, False)

    await c2.subscribe()
    await async_barrier()
    mock2.assert_called_once_with(ANY, True, False)

    mock1.reset_mock()
    await c1.unsubscribe()
    await async_barrier()
    mock1.assert_called_once_with(ANY, False, False)

    mock2.reset_mock()
    await c2.unsubscribe()
    await async_barrier()
    mock2.assert_called_once_with(ANY, False, False)

    # Unsubscribing again must not trigger another subscription event.
    mock1.reset_mock()
    await c1.unsubscribe()
    await async_barrier()
    mock1.assert_not_called()

    mock2.reset_mock()
    await c2.unsubscribe()
    await async_barrier()
    mock2.assert_not_called()

    # With force=True the event is produced even though already unsubscribed.
    mock1.reset_mock()
    await c1.unsubscribe(force=True)
    await async_barrier()
    mock1.assert_called_once_with(ANY, False, False)


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_mtu_exchange():
    """Check that MTU requests are capped by the server's max_mtu."""
    [d1, d2, d3] = LinkedDevices().devices[:3]

    d3.gatt_server.max_mtu = 100

    d3_connections = []

    @d3.on('connection')
    def on_d3_connection(connection):
        d3_connections.append(connection)

    await d1.power_on()
    await d2.power_on()
    await d3.power_on()

    d1_connection = await d1.connect(d3.random_address)
    assert len(d3_connections) == 1
    assert d3_connections[0] is not None

    d2_connection = await d2.connect(d3.random_address)
    assert len(d3_connections) == 2
    assert d3_connections[1] is not None

    d1_peer = Peer(d1_connection)
    d2_peer = Peer(d2_connection)

    # Requesting more than the server's max_mtu yields the server's limit.
    d1_client_mtu = await d1_peer.request_mtu(220)
    assert d1_client_mtu == 100
    assert d1_connection.att_mtu == 100

    # Requesting less than the server's max_mtu yields the requested value.
    d2_client_mtu = await d2_peer.request_mtu(50)
    assert d2_client_mtu == 50
    assert d2_connection.att_mtu == 50


# -----------------------------------------------------------------------------
def test_char_property_to_string():
    """Check the string form of single and combined characteristic properties."""
    # single
    assert str(Characteristic.Properties(0x01)) == "BROADCAST"
    assert str(Characteristic.Properties.BROADCAST) == "BROADCAST"

    # double
    assert str(Characteristic.Properties(0x03)) == "BROADCAST|READ"
    assert (
        str(Characteristic.Properties.BROADCAST | Characteristic.Properties.READ)
        == "BROADCAST|READ"
    )


# -----------------------------------------------------------------------------
def test_characteristic_property_from_string():
    """Check parsing of properties from ','- and '|'-separated strings."""
    # single
    assert (
        Characteristic.Properties.from_string("BROADCAST")
        == Characteristic.Properties.BROADCAST
    )

    # double
    assert (
        Characteristic.Properties.from_string("BROADCAST,READ")
        == Characteristic.Properties.BROADCAST | Characteristic.Properties.READ
    )
    assert (
        Characteristic.Properties.from_string("READ,BROADCAST")
        == Characteristic.Properties.BROADCAST | Characteristic.Properties.READ
    )
    assert (
        Characteristic.Properties.from_string("BROADCAST|READ")
        == Characteristic.Properties.BROADCAST | Characteristic.Properties.READ
    )


# -----------------------------------------------------------------------------
def test_characteristic_property_from_string_assert():
    """Check the TypeError raised for an unknown property name."""
    with pytest.raises(TypeError) as e_info:
        Characteristic.Properties.from_string("BROADCAST,HELLO")

    assert (
        str(e_info.value)
        == """Characteristic.Properties::from_string() error:
Expected a string containing any of the keys, separated by , or |: BROADCAST,READ,WRITE_WITHOUT_RESPONSE,WRITE,NOTIFY,INDICATE,AUTHENTICATED_SIGNED_WRITES,EXTENDED_PROPERTIES
Got: BROADCAST,HELLO"""
    )


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_server_string():
    """Check the string dump of a GATT server after adding one service."""
    [_, server] = LinkedDevices().devices[:2]

    characteristic = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.Properties.READ
        | Characteristic.Properties.WRITE
        | Characteristic.Properties.NOTIFY,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([123]),
    )

    service = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic])
    server.add_service(service)

    assert (
        str(server.gatt_server)
        == """Service(handle=0x0001, end=0x0005, uuid=UUID-16:1800 (Generic Access))
CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), READ)
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), READ)
CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), READ)
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), READ)
Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829)
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY)
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY)
Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)"""
    )


# -----------------------------------------------------------------------------
async def async_main():
    """Run a selection of this module's tests when executed as a script.

    NOTE(review): this list is a subset of the tests in the file (for
    example, `test_server_string` is not called here).
    """
    test_UUID()
    test_ATT_Error_Response()
    test_ATT_Read_By_Group_Type_Request()
    await test_read_write()
    await test_read_write2()
    await test_subscribe_notify()
    await test_unsubscribe()
    await test_characteristic_encoding()
    await test_mtu_exchange()
    await test_CharacteristicValue()
    await test_CharacteristicValue_async()
    await test_CharacteristicAdapter()


# -----------------------------------------------------------------------------
def test_permissions_from_string():
    """Check the numeric values parsed from permission strings."""
    assert Attribute.Permissions.from_string('READABLE') == 1
    assert Attribute.Permissions.from_string('WRITEABLE') == 2
    assert Attribute.Permissions.from_string('READABLE,WRITEABLE') == 3


# -----------------------------------------------------------------------------
def test_characteristic_permissions():
    """Check that a Characteristic accepts its permissions as a string."""
    characteristic = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.Properties.READ
        | Characteristic.Properties.WRITE
        | Characteristic.Properties.NOTIFY,
        'READABLE,WRITEABLE',
    )
    assert characteristic.permissions == 3


# -----------------------------------------------------------------------------
def test_characteristic_has_properties():
    """Check has_properties() for single, combined and missing properties."""
    characteristic = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.Properties.READ
        | Characteristic.Properties.WRITE
        | Characteristic.Properties.NOTIFY,
        'READABLE,WRITEABLE',
    )
    assert characteristic.has_properties(Characteristic.Properties.READ)
    assert characteristic.has_properties(
        Characteristic.Properties.READ | Characteristic.Properties.WRITE
    )
    # A combination that includes a property the characteristic lacks fails.
    assert not characteristic.has_properties(
        Characteristic.Properties.READ
        | Characteristic.Properties.WRITE
        | Characteristic.Properties.INDICATE
    )
    assert not characteristic.has_properties(Characteristic.Properties.INDICATE)


# -----------------------------------------------------------------------------
def test_descriptor_permissions():
    """Check that a Descriptor accepts its permissions as a string."""
    descriptor = Descriptor('2902', 'READABLE,WRITEABLE')
    assert descriptor.permissions == 3


# -----------------------------------------------------------------------------
def test_get_attribute_group():
    """Check that attribute handles map back to their service/characteristic."""
    device = Device()

    # add some services / characteristics to the gatt server
    characteristic1 = Characteristic(
        '1111',
        Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([123]),
    )
    characteristic2 = Characteristic(
        '2222',
        Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([123]),
    )
    services = [Service('1212', [characteristic1]), Service('3233', [characteristic2])]
    device.gatt_server.add_services(services)

    # get the handles from gatt server
    characteristic_attributes1 = device.gatt_server.get_characteristic_attributes(
        UUID('1212'), UUID('1111')
    )
    assert characteristic_attributes1 is not None
    characteristic_attributes2 = device.gatt_server.get_characteristic_attributes(
        UUID('3233'), UUID('2222')
    )
    assert characteristic_attributes2 is not None
    descriptor1 = device.gatt_server.get_descriptor_attribute(
        UUID('1212'), UUID('1111'), UUID('2902')
    )
    assert descriptor1 is not None
    descriptor2 = device.gatt_server.get_descriptor_attribute(
        UUID('3233'), UUID('2222'), UUID('2902')
    )
    assert descriptor2 is not None

    # confirm the handles map back to the service
    assert (
        UUID('1212')
        == device.gatt_server.get_attribute_group(
            characteristic_attributes1[0].handle, Service
        ).uuid
    )
    assert (
        UUID('1212')
        == device.gatt_server.get_attribute_group(
            characteristic_attributes1[1].handle, Service
        ).uuid
    )
    assert (
        UUID('1212')
        == device.gatt_server.get_attribute_group(descriptor1.handle, Service).uuid
    )
    assert (
        UUID('3233')
        == device.gatt_server.get_attribute_group(
            characteristic_attributes2[0].handle, Service
        ).uuid
    )
    assert (
        UUID('3233')
        == device.gatt_server.get_attribute_group(
            characteristic_attributes2[1].handle, Service
        ).uuid
    )
    assert (
        UUID('3233')
        == device.gatt_server.get_attribute_group(descriptor2.handle, Service).uuid
    )

    # confirm the handles map back to the characteristic
    assert (
        UUID('1111')
        == device.gatt_server.get_attribute_group(
            descriptor1.handle, Characteristic
        ).uuid
    )
    assert (
        UUID('2222')
        == device.gatt_server.get_attribute_group(
            descriptor2.handle, Characteristic
        ).uuid
    )


# -----------------------------------------------------------------------------
if __name__ == '__main__':
    logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
    asyncio.run(async_main())