1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import logging 20import os 21import struct 22import pytest 23 24from bumble.controller import Controller 25from bumble.gatt_client import CharacteristicProxy 26from bumble.link import LocalLink 27from bumble.device import Device, Peer 28from bumble.host import Host 29from bumble.gatt import ( 30 GATT_BATTERY_LEVEL_CHARACTERISTIC, 31 GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR, 32 CharacteristicAdapter, 33 DelegatedCharacteristicAdapter, 34 PackedCharacteristicAdapter, 35 MappedCharacteristicAdapter, 36 UTF8CharacteristicAdapter, 37 Service, 38 Characteristic, 39 CharacteristicValue, 40 Descriptor, 41) 42from bumble.transport import AsyncPipeSink 43from bumble.core import UUID 44from bumble.att import ( 45 Attribute, 46 ATT_EXCHANGE_MTU_REQUEST, 47 ATT_ATTRIBUTE_NOT_FOUND_ERROR, 48 ATT_PDU, 49 ATT_Error_Response, 50 ATT_Read_By_Group_Type_Request, 51) 52 53 54# ----------------------------------------------------------------------------- 55def basic_check(x): 56 pdu = x.to_bytes() 57 parsed = ATT_PDU.from_bytes(pdu) 58 x_str = str(x) 59 parsed_str = str(parsed) 60 assert x_str == parsed_str 61 62 63# ----------------------------------------------------------------------------- 64def test_UUID(): 65 u = UUID.from_16_bits(0x7788) 66 assert str(u) == 'UUID-16:7788' 67 u = UUID.from_32_bits(0x11223344) 68 assert str(u) == 'UUID-32:11223344' 69 u = UUID('61A3512C-09BE-4DDC-A6A6-0B03667AAFC6') 70 assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' 71 v = UUID(str(u)) 72 assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' 73 w = UUID.from_bytes(v.to_bytes()) 74 assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' 75 76 u1 = UUID.from_16_bits(0x1234) 77 b1 = u1.to_bytes(force_128=True) 78 u2 = UUID.from_bytes(b1) 79 assert u1 == u2 80 81 u3 = UUID.from_16_bits(0x180A) 82 assert str(u3) == 'UUID-16:180A (Device Information)' 83 84 85# ----------------------------------------------------------------------------- 86def test_ATT_Error_Response(): 87 pdu = ATT_Error_Response( 88 request_opcode_in_error=ATT_EXCHANGE_MTU_REQUEST, 89 attribute_handle_in_error=0x0000, 90 error_code=ATT_ATTRIBUTE_NOT_FOUND_ERROR, 91 ) 92 basic_check(pdu) 93 94 95# ----------------------------------------------------------------------------- 96def test_ATT_Read_By_Group_Type_Request(): 97 pdu = ATT_Read_By_Group_Type_Request( 98 starting_handle=0x0001, 99 ending_handle=0xFFFF, 100 attribute_group_type=UUID.from_16_bits(0x2800), 101 ) 102 basic_check(pdu) 103 104 105# ----------------------------------------------------------------------------- 106@pytest.mark.asyncio 107async def test_characteristic_encoding(): 108 class Foo(Characteristic): 109 def encode_value(self, value): 110 return bytes([value]) 111 112 def decode_value(self, value_bytes): 113 return value_bytes[0] 114 115 c = Foo( 116 GATT_BATTERY_LEVEL_CHARACTERISTIC, 117 Characteristic.READ, 118 Characteristic.READABLE, 119 123, 120 ) 121 x = c.read_value(None) 122 assert x == bytes([123]) 123 c.write_value(None, bytes([122])) 124 assert c.value == 122 125 126 class FooProxy(CharacteristicProxy): 127 def __init__(self, characteristic): 128 super().__init__( 129 characteristic.client, 130 characteristic.handle, 131 characteristic.end_group_handle, 132 characteristic.uuid, 133 characteristic.properties, 134 ) 135 136 def encode_value(self, value): 137 return bytes([value]) 138 139 def decode_value(self, value_bytes): 140 return value_bytes[0] 141 142 [client, server] = LinkedDevices().devices[:2] 143 144 characteristic = Characteristic( 145 'FDB159DB-036C-49E3-B3DB-6325AC750806', 146 Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY, 147 Characteristic.READABLE | Characteristic.WRITEABLE, 148 bytes([123]), 149 ) 150 151 service = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic]) 152 server.add_service(service) 153 154 await client.power_on() 155 await server.power_on() 156 connection = await client.connect(server.random_address) 157 peer = Peer(connection) 158 159 await peer.discover_services() 160 await peer.discover_characteristics() 161 c = peer.get_characteristics_by_uuid(characteristic.uuid) 162 assert len(c) == 1 163 c = c[0] 164 cp = FooProxy(c) 165 166 v = await cp.read_value() 167 assert v == 123 168 await cp.write_value(124) 169 await async_barrier() 170 assert characteristic.value == bytes([124]) 171 172 v = await cp.read_value() 173 assert v == 124 174 await cp.write_value(125, with_response=True) 175 await async_barrier() 176 assert characteristic.value == bytes([125]) 177 178 cd = DelegatedCharacteristicAdapter(c, encode=lambda x: bytes([x // 2])) 179 await cd.write_value(100, with_response=True) 180 await async_barrier() 181 assert characteristic.value == bytes([50]) 182 183 last_change = None 184 185 def on_change(value): 186 nonlocal last_change 187 last_change = value 188 189 await c.subscribe(on_change) 190 await server.notify_subscribers(characteristic) 191 await async_barrier() 192 assert last_change == characteristic.value 193 last_change = None 194 195 await server.notify_subscribers(characteristic, value=bytes([125])) 196 await async_barrier() 197 assert last_change == bytes([125]) 198 last_change = None 199 200 await c.unsubscribe(on_change) 201 await server.notify_subscribers(characteristic) 202 await async_barrier() 203 assert last_change is None 204 205 await cp.subscribe(on_change) 206 await server.notify_subscribers(characteristic) 207 await async_barrier() 208 assert last_change == characteristic.value[0] 209 last_change = None 210 211 await server.notify_subscribers(characteristic, value=bytes([126])) 212 await async_barrier() 213 assert last_change == 126 214 last_change = None 215 216 await cp.unsubscribe(on_change) 217 await server.notify_subscribers(characteristic) 218 await async_barrier() 219 assert last_change is None 220 221 cd = DelegatedCharacteristicAdapter(c, decode=lambda x: x[0]) 222 await cd.subscribe(on_change) 223 await server.notify_subscribers(characteristic) 224 await async_barrier() 225 assert last_change == characteristic.value[0] 226 last_change = None 227 228 await cd.unsubscribe(on_change) 229 await server.notify_subscribers(characteristic) 230 await async_barrier() 231 assert last_change is None 232 233 234# ----------------------------------------------------------------------------- 235@pytest.mark.asyncio 236async def test_attribute_getters(): 237 [client, server] = LinkedDevices().devices[:2] 238 239 characteristic_uuid = UUID('FDB159DB-036C-49E3-B3DB-6325AC750806') 240 characteristic = Characteristic( 241 characteristic_uuid, 242 Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY, 243 Characteristic.READABLE | Characteristic.WRITEABLE, 244 bytes([123]), 245 ) 246 247 service_uuid = UUID('3A657F47-D34F-46B3-B1EC-698E29B6B829') 248 service = Service(service_uuid, [characteristic]) 249 server.add_service(service) 250 251 service_attr = server.gatt_server.get_service_attribute(service_uuid) 252 assert service_attr 253 254 ( 255 char_decl_attr, 256 char_value_attr, 257 ) = server.gatt_server.get_characteristic_attributes( 258 service_uuid, characteristic_uuid 259 ) 260 assert char_decl_attr and char_value_attr 261 262 desc_attr = server.gatt_server.get_descriptor_attribute( 263 service_uuid, 264 characteristic_uuid, 265 GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR, 266 ) 267 assert desc_attr 268 269 # assert all handles are in expected order 270 assert ( 271 service_attr.handle 272 < char_decl_attr.handle 273 < char_value_attr.handle 274 < desc_attr.handle 275 == service_attr.end_group_handle 276 ) 277 # assert characteristic declarations attribute is followed by characteristic value attribute 278 assert char_decl_attr.handle + 1 == char_value_attr.handle 279 280 281# ----------------------------------------------------------------------------- 282def test_CharacteristicAdapter(): 283 # Check that the CharacteristicAdapter base class is transparent 284 v = bytes([1, 2, 3]) 285 c = Characteristic( 286 GATT_BATTERY_LEVEL_CHARACTERISTIC, 287 Characteristic.READ, 288 Characteristic.READABLE, 289 v, 290 ) 291 a = CharacteristicAdapter(c) 292 293 value = a.read_value(None) 294 assert value == v 295 296 v = bytes([3, 4, 5]) 297 a.write_value(None, v) 298 assert c.value == v 299 300 # Simple delegated adapter 301 a = DelegatedCharacteristicAdapter( 302 c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x)) 303 ) 304 305 value = a.read_value(None) 306 assert value == bytes(reversed(v)) 307 308 v = bytes([3, 4, 5]) 309 a.write_value(None, v) 310 assert a.value == bytes(reversed(v)) 311 312 # Packed adapter with single element format 313 v = 1234 314 pv = struct.pack('>H', v) 315 c.value = v 316 a = PackedCharacteristicAdapter(c, '>H') 317 318 value = a.read_value(None) 319 assert value == pv 320 c.value = None 321 a.write_value(None, pv) 322 assert a.value == v 323 324 # Packed adapter with multi-element format 325 v1 = 1234 326 v2 = 5678 327 pv = struct.pack('>HH', v1, v2) 328 c.value = (v1, v2) 329 a = PackedCharacteristicAdapter(c, '>HH') 330 331 value = a.read_value(None) 332 assert value == pv 333 c.value = None 334 a.write_value(None, pv) 335 assert a.value == (v1, v2) 336 337 # Mapped adapter 338 v1 = 1234 339 v2 = 5678 340 pv = struct.pack('>HH', v1, v2) 341 mapped = {'v1': v1, 'v2': v2} 342 c.value = mapped 343 a = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2')) 344 345 value = a.read_value(None) 346 assert value == pv 347 c.value = None 348 a.write_value(None, pv) 349 assert a.value == mapped 350 351 # UTF-8 adapter 352 v = 'Hello π' 353 ev = v.encode('utf-8') 354 c.value = v 355 a = UTF8CharacteristicAdapter(c) 356 357 value = a.read_value(None) 358 assert value == ev 359 c.value = None 360 a.write_value(None, ev) 361 assert a.value == v 362 363 364# ----------------------------------------------------------------------------- 365def test_CharacteristicValue(): 366 b = bytes([1, 2, 3]) 367 c = CharacteristicValue(read=lambda _: b) 368 x = c.read(None) 369 assert x == b 370 371 result = [] 372 c = CharacteristicValue( 373 write=lambda connection, value: result.append((connection, value)) 374 ) 375 z = object() 376 c.write(z, b) 377 assert result == [(z, b)] 378 379 380# ----------------------------------------------------------------------------- 381class LinkedDevices: 382 def __init__(self): 383 self.connections = [None, None, None] 384 385 self.link = LocalLink() 386 self.controllers = [ 387 Controller('C1', link=self.link), 388 Controller('C2', link=self.link), 389 Controller('C3', link=self.link), 390 ] 391 self.devices = [ 392 Device( 393 address='F0:F1:F2:F3:F4:F5', 394 host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])), 395 ), 396 Device( 397 address='F1:F2:F3:F4:F5:F6', 398 host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])), 399 ), 400 Device( 401 address='F2:F3:F4:F5:F6:F7', 402 host=Host(self.controllers[2], AsyncPipeSink(self.controllers[2])), 403 ), 404 ] 405 406 self.paired = [None, None, None] 407 408 409# ----------------------------------------------------------------------------- 410async def async_barrier(): 411 ready = asyncio.get_running_loop().create_future() 412 asyncio.get_running_loop().call_soon(ready.set_result, None) 413 await ready 414 415 416# ----------------------------------------------------------------------------- 417@pytest.mark.asyncio 418async def test_read_write(): 419 [client, server] = LinkedDevices().devices[:2] 420 421 characteristic1 = Characteristic( 422 'FDB159DB-036C-49E3-B3DB-6325AC750806', 423 Characteristic.READ | Characteristic.WRITE, 424 Characteristic.READABLE | Characteristic.WRITEABLE, 425 ) 426 427 def on_characteristic1_write(connection, value): 428 characteristic1._last_value = (connection, value) 429 430 characteristic1.on('write', on_characteristic1_write) 431 432 def on_characteristic2_read(connection): 433 return bytes(str(connection.peer_address)) 434 435 def on_characteristic2_write(connection, value): 436 characteristic2._last_value = (connection, value) 437 438 characteristic2 = Characteristic( 439 '66DE9057-C848-4ACA-B993-D675644EBB85', 440 Characteristic.READ | Characteristic.WRITE, 441 Characteristic.READABLE | Characteristic.WRITEABLE, 442 CharacteristicValue( 443 read=on_characteristic2_read, write=on_characteristic2_write 444 ), 445 ) 446 447 service1 = Service( 448 '3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1, characteristic2] 449 ) 450 server.add_services([service1]) 451 452 await client.power_on() 453 await server.power_on() 454 connection = await client.connect(server.random_address) 455 peer = Peer(connection) 456 457 await peer.discover_services() 458 await peer.discover_characteristics() 459 c = peer.get_characteristics_by_uuid(characteristic1.uuid) 460 assert len(c) == 1 461 c1 = c[0] 462 c = peer.get_characteristics_by_uuid(characteristic2.uuid) 463 assert len(c) == 1 464 c2 = c[0] 465 466 v1 = await peer.read_value(c1) 467 assert v1 == b'' 468 b = bytes([1, 2, 3]) 469 await peer.write_value(c1, b) 470 await async_barrier() 471 assert characteristic1.value == b 472 v1 = await peer.read_value(c1) 473 assert v1 == b 474 assert type(characteristic1._last_value is tuple) 475 assert len(characteristic1._last_value) == 2 476 assert str(characteristic1._last_value[0].peer_address) == str( 477 client.random_address 478 ) 479 assert characteristic1._last_value[1] == b 480 bb = bytes([3, 4, 5, 6]) 481 characteristic1.value = bb 482 v1 = await peer.read_value(c1) 483 assert v1 == bb 484 485 await peer.write_value(c2, b) 486 await async_barrier() 487 assert type(characteristic2._last_value is tuple) 488 assert len(characteristic2._last_value) == 2 489 assert str(characteristic2._last_value[0].peer_address) == str( 490 client.random_address 491 ) 492 assert characteristic2._last_value[1] == b 493 494 495# ----------------------------------------------------------------------------- 496@pytest.mark.asyncio 497async def test_read_write2(): 498 [client, server] = LinkedDevices().devices[:2] 499 500 v = bytes([0x11, 0x22, 0x33, 0x44]) 501 characteristic1 = Characteristic( 502 'FDB159DB-036C-49E3-B3DB-6325AC750806', 503 Characteristic.READ | Characteristic.WRITE, 504 Characteristic.READABLE | Characteristic.WRITEABLE, 505 value=v, 506 ) 507 508 service1 = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1]) 509 server.add_services([service1]) 510 511 await client.power_on() 512 await server.power_on() 513 connection = await client.connect(server.random_address) 514 peer = Peer(connection) 515 516 await peer.discover_services() 517 c = peer.get_services_by_uuid(service1.uuid) 518 assert len(c) == 1 519 s = c[0] 520 await s.discover_characteristics() 521 c = s.get_characteristics_by_uuid(characteristic1.uuid) 522 assert len(c) == 1 523 c1 = c[0] 524 525 v1 = await c1.read_value() 526 assert v1 == v 527 528 a1 = PackedCharacteristicAdapter(c1, '>I') 529 v1 = await a1.read_value() 530 assert v1 == struct.unpack('>I', v)[0] 531 532 b = bytes([0x55, 0x66, 0x77, 0x88]) 533 await a1.write_value(struct.unpack('>I', b)[0]) 534 await async_barrier() 535 assert characteristic1.value == b 536 v1 = await a1.read_value() 537 assert v1 == struct.unpack('>I', b)[0] 538 539 540# ----------------------------------------------------------------------------- 541@pytest.mark.asyncio 542async def test_subscribe_notify(): 543 [client, server] = LinkedDevices().devices[:2] 544 545 characteristic1 = Characteristic( 546 'FDB159DB-036C-49E3-B3DB-6325AC750806', 547 Characteristic.READ | Characteristic.NOTIFY, 548 Characteristic.READABLE, 549 bytes([1, 2, 3]), 550 ) 551 552 def on_characteristic1_subscription(connection, notify_enabled, indicate_enabled): 553 characteristic1._last_subscription = ( 554 connection, 555 notify_enabled, 556 indicate_enabled, 557 ) 558 559 characteristic1.on('subscription', on_characteristic1_subscription) 560 561 characteristic2 = Characteristic( 562 '66DE9057-C848-4ACA-B993-D675644EBB85', 563 Characteristic.READ | Characteristic.INDICATE, 564 Characteristic.READABLE, 565 bytes([4, 5, 6]), 566 ) 567 568 def on_characteristic2_subscription(connection, notify_enabled, indicate_enabled): 569 characteristic2._last_subscription = ( 570 connection, 571 notify_enabled, 572 indicate_enabled, 573 ) 574 575 characteristic2.on('subscription', on_characteristic2_subscription) 576 577 characteristic3 = Characteristic( 578 'AB5E639C-40C1-4238-B9CB-AF41F8B806E4', 579 Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE, 580 Characteristic.READABLE, 581 bytes([7, 8, 9]), 582 ) 583 584 def on_characteristic3_subscription(connection, notify_enabled, indicate_enabled): 585 characteristic3._last_subscription = ( 586 connection, 587 notify_enabled, 588 indicate_enabled, 589 ) 590 591 characteristic3.on('subscription', on_characteristic3_subscription) 592 593 service1 = Service( 594 '3A657F47-D34F-46B3-B1EC-698E29B6B829', 595 [characteristic1, characteristic2, characteristic3], 596 ) 597 server.add_services([service1]) 598 599 def on_characteristic_subscription( 600 connection, characteristic, notify_enabled, indicate_enabled 601 ): 602 server._last_subscription = ( 603 connection, 604 characteristic, 605 notify_enabled, 606 indicate_enabled, 607 ) 608 609 server.on('characteristic_subscription', on_characteristic_subscription) 610 611 await client.power_on() 612 await server.power_on() 613 connection = await client.connect(server.random_address) 614 peer = Peer(connection) 615 616 await peer.discover_services() 617 await peer.discover_characteristics() 618 c = peer.get_characteristics_by_uuid(characteristic1.uuid) 619 assert len(c) == 1 620 c1 = c[0] 621 c = peer.get_characteristics_by_uuid(characteristic2.uuid) 622 assert len(c) == 1 623 c2 = c[0] 624 c = peer.get_characteristics_by_uuid(characteristic3.uuid) 625 assert len(c) == 1 626 c3 = c[0] 627 628 c1._called = False 629 c1._last_update = None 630 631 def on_c1_update(value): 632 c1._called = True 633 c1._last_update = value 634 635 c1.on('update', on_c1_update) 636 await peer.subscribe(c1) 637 await async_barrier() 638 assert server._last_subscription[1] == characteristic1 639 assert server._last_subscription[2] 640 assert not server._last_subscription[3] 641 assert characteristic1._last_subscription[1] 642 assert not characteristic1._last_subscription[2] 643 await server.indicate_subscribers(characteristic1) 644 await async_barrier() 645 assert not c1._called 646 await server.notify_subscribers(characteristic1) 647 await async_barrier() 648 assert c1._called 649 assert c1._last_update == characteristic1.value 650 651 c1._called = False 652 c1._last_update = None 653 c1_value = characteristic1.value 654 await server.notify_subscribers(characteristic1, bytes([0, 1, 2])) 655 await async_barrier() 656 assert c1._called 657 assert c1._last_update == bytes([0, 1, 2]) 658 assert characteristic1.value == c1_value 659 660 c1._called = False 661 await peer.unsubscribe(c1) 662 await server.notify_subscribers(characteristic1) 663 assert not c1._called 664 665 c2._called = False 666 c2._last_update = None 667 668 def on_c2_update(value): 669 c2._called = True 670 c2._last_update = value 671 672 await peer.subscribe(c2, on_c2_update) 673 await async_barrier() 674 await server.notify_subscriber( 675 characteristic2._last_subscription[0], characteristic2 676 ) 677 await async_barrier() 678 assert not c2._called 679 await server.indicate_subscriber( 680 characteristic2._last_subscription[0], characteristic2 681 ) 682 await async_barrier() 683 assert c2._called 684 assert c2._last_update == characteristic2.value 685 686 c2._called = False 687 await peer.unsubscribe(c2, on_c2_update) 688 await server.indicate_subscriber( 689 characteristic2._last_subscription[0], characteristic2 690 ) 691 await async_barrier() 692 assert not c2._called 693 694 c3._called = False 695 c3._called_2 = False 696 c3._called_3 = False 697 c3._last_update = None 698 c3._last_update_2 = None 699 c3._last_update_3 = None 700 701 def on_c3_update(value): 702 c3._called = True 703 c3._last_update = value 704 705 def on_c3_update_2(value): # for notify 706 c3._called_2 = True 707 c3._last_update_2 = value 708 709 def on_c3_update_3(value): # for indicate 710 c3._called_3 = True 711 c3._last_update_3 = value 712 713 c3.on('update', on_c3_update) 714 await peer.subscribe(c3, on_c3_update_2) 715 await async_barrier() 716 await server.notify_subscriber( 717 characteristic3._last_subscription[0], characteristic3 718 ) 719 await async_barrier() 720 assert c3._called 721 assert c3._last_update == characteristic3.value 722 assert c3._called_2 723 assert c3._last_update_2 == characteristic3.value 724 assert not c3._called_3 725 726 c3._called = False 727 c3._called_2 = False 728 c3._called_3 = False 729 await peer.unsubscribe(c3) 730 await peer.subscribe(c3, on_c3_update_3, prefer_notify=False) 731 await async_barrier() 732 characteristic3.value = bytes([1, 2, 3]) 733 await server.indicate_subscriber( 734 characteristic3._last_subscription[0], characteristic3 735 ) 736 await async_barrier() 737 assert c3._called 738 assert c3._last_update == characteristic3.value 739 assert not c3._called_2 740 assert c3._called_3 741 assert c3._last_update_3 == characteristic3.value 742 743 c3._called = False 744 c3._called_2 = False 745 c3._called_3 = False 746 await peer.unsubscribe(c3) 747 await server.notify_subscriber( 748 characteristic3._last_subscription[0], characteristic3 749 ) 750 await server.indicate_subscriber( 751 characteristic3._last_subscription[0], characteristic3 752 ) 753 await async_barrier() 754 assert not c3._called 755 assert not c3._called_2 756 assert not c3._called_3 757 758 759# ----------------------------------------------------------------------------- 760@pytest.mark.asyncio 761async def test_mtu_exchange(): 762 [d1, d2, d3] = LinkedDevices().devices[:3] 763 764 d3.gatt_server.max_mtu = 100 765 766 d3_connections = [] 767 768 @d3.on('connection') 769 def on_d3_connection(connection): 770 d3_connections.append(connection) 771 772 await d1.power_on() 773 await d2.power_on() 774 await d3.power_on() 775 776 d1_connection = await d1.connect(d3.random_address) 777 assert len(d3_connections) == 1 778 assert d3_connections[0] is not None 779 780 d2_connection = await d2.connect(d3.random_address) 781 assert len(d3_connections) == 2 782 assert d3_connections[1] is not None 783 784 d1_peer = Peer(d1_connection) 785 d2_peer = Peer(d2_connection) 786 787 d1_client_mtu = await d1_peer.request_mtu(220) 788 assert d1_client_mtu == 100 789 assert d1_connection.att_mtu == 100 790 791 d2_client_mtu = await d2_peer.request_mtu(50) 792 assert d2_client_mtu == 50 793 assert d2_connection.att_mtu == 50 794 795 796# ----------------------------------------------------------------------------- 797def test_char_property_to_string(): 798 # single 799 assert Characteristic.property_name(0x01) == "BROADCAST" 800 assert Characteristic.property_name(Characteristic.BROADCAST) == "BROADCAST" 801 802 # double 803 assert Characteristic.properties_as_string(0x03) == "BROADCAST,READ" 804 assert ( 805 Characteristic.properties_as_string( 806 Characteristic.BROADCAST | Characteristic.READ 807 ) 808 == "BROADCAST,READ" 809 ) 810 811 812# ----------------------------------------------------------------------------- 813def test_char_property_string_to_type(): 814 # single 815 assert Characteristic.string_to_properties("BROADCAST") == Characteristic.BROADCAST 816 817 # double 818 assert ( 819 Characteristic.string_to_properties("BROADCAST,READ") 820 == Characteristic.BROADCAST | Characteristic.READ 821 ) 822 assert ( 823 Characteristic.string_to_properties("READ,BROADCAST") 824 == Characteristic.BROADCAST | Characteristic.READ 825 ) 826 827 828# ----------------------------------------------------------------------------- 829@pytest.mark.asyncio 830async def test_server_string(): 831 [_, server] = LinkedDevices().devices[:2] 832 833 characteristic = Characteristic( 834 'FDB159DB-036C-49E3-B3DB-6325AC750806', 835 Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY, 836 Characteristic.READABLE | Characteristic.WRITEABLE, 837 bytes([123]), 838 ) 839 840 service = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic]) 841 server.add_service(service) 842 843 assert ( 844 str(server.gatt_server) 845 == """Service(handle=0x0001, end=0x0005, uuid=UUID-16:1800 (Generic Access)) 846CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ) 847Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ) 848CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ) 849Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ) 850Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829) 851CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY) 852Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY) 853Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)""" 854 ) 855 856 857# ----------------------------------------------------------------------------- 858async def async_main(): 859 await test_read_write() 860 await test_read_write2() 861 await test_subscribe_notify() 862 await test_characteristic_encoding() 863 await test_mtu_exchange() 864 865 866# ----------------------------------------------------------------------------- 867def test_attribute_string_to_permissions(): 868 assert Attribute.string_to_permissions('READABLE') == 1 869 assert Attribute.string_to_permissions('WRITEABLE') == 2 870 assert Attribute.string_to_permissions('READABLE,WRITEABLE') == 3 871 872 873# ----------------------------------------------------------------------------- 874def test_charracteristic_permissions(): 875 characteristic = Characteristic( 876 'FDB159DB-036C-49E3-B3DB-6325AC750806', 877 Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY, 878 'READABLE,WRITEABLE', 879 ) 880 assert characteristic.permissions == 3 881 882 883# ----------------------------------------------------------------------------- 884def test_descriptor_permissions(): 885 descriptor = Descriptor('2902', 'READABLE,WRITEABLE') 886 assert descriptor.permissions == 3 887 888 889# ----------------------------------------------------------------------------- 890if __name__ == '__main__': 891 logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) 892 test_UUID() 893 test_ATT_Error_Response() 894 test_ATT_Read_By_Group_Type_Request() 895 test_CharacteristicValue() 896 test_CharacteristicAdapter() 897 asyncio.run(async_main()) 898