# Copyright 2021-2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- import asyncio import logging import os import struct import pytest from bumble.controller import Controller from bumble.gatt_client import CharacteristicProxy from bumble.link import LocalLink from bumble.device import Device, Peer from bumble.host import Host from bumble.gatt import ( GATT_BATTERY_LEVEL_CHARACTERISTIC, GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR, CharacteristicAdapter, DelegatedCharacteristicAdapter, PackedCharacteristicAdapter, MappedCharacteristicAdapter, UTF8CharacteristicAdapter, Service, Characteristic, CharacteristicValue, Descriptor, ) from bumble.transport import AsyncPipeSink from bumble.core import UUID from bumble.att import ( Attribute, ATT_EXCHANGE_MTU_REQUEST, ATT_ATTRIBUTE_NOT_FOUND_ERROR, ATT_PDU, ATT_Error_Response, ATT_Read_By_Group_Type_Request, ) # ----------------------------------------------------------------------------- def basic_check(x): pdu = x.to_bytes() parsed = ATT_PDU.from_bytes(pdu) x_str = str(x) parsed_str = str(parsed) assert x_str == parsed_str # ----------------------------------------------------------------------------- def test_UUID(): u = UUID.from_16_bits(0x7788) assert str(u) == 'UUID-16:7788' u = UUID.from_32_bits(0x11223344) assert str(u) == 'UUID-32:11223344' u = UUID('61A3512C-09BE-4DDC-A6A6-0B03667AAFC6') assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' v = UUID(str(u)) assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' w = UUID.from_bytes(v.to_bytes()) assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' u1 = UUID.from_16_bits(0x1234) b1 = u1.to_bytes(force_128=True) u2 = UUID.from_bytes(b1) assert u1 == u2 u3 = UUID.from_16_bits(0x180A) assert str(u3) == 'UUID-16:180A (Device Information)' # ----------------------------------------------------------------------------- def test_ATT_Error_Response(): pdu = ATT_Error_Response( request_opcode_in_error=ATT_EXCHANGE_MTU_REQUEST, attribute_handle_in_error=0x0000, error_code=ATT_ATTRIBUTE_NOT_FOUND_ERROR, ) basic_check(pdu) # ----------------------------------------------------------------------------- def test_ATT_Read_By_Group_Type_Request(): pdu = ATT_Read_By_Group_Type_Request( starting_handle=0x0001, ending_handle=0xFFFF, attribute_group_type=UUID.from_16_bits(0x2800), ) basic_check(pdu) # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_characteristic_encoding(): class Foo(Characteristic): def encode_value(self, value): return bytes([value]) def decode_value(self, value_bytes): return value_bytes[0] c = Foo( GATT_BATTERY_LEVEL_CHARACTERISTIC, Characteristic.READ, Characteristic.READABLE, 123, ) x = c.read_value(None) assert x == bytes([123]) c.write_value(None, bytes([122])) assert c.value == 122 class FooProxy(CharacteristicProxy): def __init__(self, characteristic): super().__init__( characteristic.client, characteristic.handle, characteristic.end_group_handle, characteristic.uuid, characteristic.properties, ) def encode_value(self, value): return bytes([value]) def decode_value(self, value_bytes): return value_bytes[0] [client, server] = LinkedDevices().devices[:2] characteristic = Characteristic( 'FDB159DB-036C-49E3-B3DB-6325AC750806', Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY, Characteristic.READABLE | Characteristic.WRITEABLE, bytes([123]), ) service = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic]) server.add_service(service) await client.power_on() await server.power_on() connection = await client.connect(server.random_address) peer = Peer(connection) await peer.discover_services() await peer.discover_characteristics() c = peer.get_characteristics_by_uuid(characteristic.uuid) assert len(c) == 1 c = c[0] cp = FooProxy(c) v = await cp.read_value() assert v == 123 await cp.write_value(124) await async_barrier() assert characteristic.value == bytes([124]) v = await cp.read_value() assert v == 124 await cp.write_value(125, with_response=True) await async_barrier() assert characteristic.value == bytes([125]) cd = DelegatedCharacteristicAdapter(c, encode=lambda x: bytes([x // 2])) await cd.write_value(100, with_response=True) await async_barrier() assert characteristic.value == bytes([50]) last_change = None def on_change(value): nonlocal last_change last_change = value await c.subscribe(on_change) await server.notify_subscribers(characteristic) await async_barrier() assert last_change == characteristic.value last_change = None await server.notify_subscribers(characteristic, value=bytes([125])) await async_barrier() assert last_change == bytes([125]) last_change = None await c.unsubscribe(on_change) await server.notify_subscribers(characteristic) await async_barrier() assert last_change is None await cp.subscribe(on_change) await server.notify_subscribers(characteristic) await async_barrier() assert last_change == characteristic.value[0] last_change = None await server.notify_subscribers(characteristic, value=bytes([126])) await async_barrier() assert last_change == 126 last_change = None await cp.unsubscribe(on_change) await server.notify_subscribers(characteristic) await async_barrier() assert last_change is None cd = DelegatedCharacteristicAdapter(c, decode=lambda x: x[0]) await cd.subscribe(on_change) await server.notify_subscribers(characteristic) await async_barrier() assert last_change == characteristic.value[0] last_change = None await cd.unsubscribe(on_change) await server.notify_subscribers(characteristic) await async_barrier() assert last_change is None # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_attribute_getters(): [client, server] = LinkedDevices().devices[:2] characteristic_uuid = UUID('FDB159DB-036C-49E3-B3DB-6325AC750806') characteristic = Characteristic( characteristic_uuid, Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY, Characteristic.READABLE | Characteristic.WRITEABLE, bytes([123]), ) service_uuid = UUID('3A657F47-D34F-46B3-B1EC-698E29B6B829') service = Service(service_uuid, [characteristic]) server.add_service(service) service_attr = server.gatt_server.get_service_attribute(service_uuid) assert service_attr ( char_decl_attr, char_value_attr, ) = server.gatt_server.get_characteristic_attributes( service_uuid, characteristic_uuid ) assert char_decl_attr and char_value_attr desc_attr = server.gatt_server.get_descriptor_attribute( service_uuid, characteristic_uuid, GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR, ) assert desc_attr # assert all handles are in expected order assert ( service_attr.handle < char_decl_attr.handle < char_value_attr.handle < desc_attr.handle == service_attr.end_group_handle ) # assert characteristic declarations attribute is followed by characteristic value attribute assert char_decl_attr.handle + 1 == char_value_attr.handle # ----------------------------------------------------------------------------- def test_CharacteristicAdapter(): # Check that the CharacteristicAdapter base class is transparent v = bytes([1, 2, 3]) c = Characteristic( GATT_BATTERY_LEVEL_CHARACTERISTIC, Characteristic.READ, Characteristic.READABLE, v, ) a = CharacteristicAdapter(c) value = a.read_value(None) assert value == v v = bytes([3, 4, 5]) a.write_value(None, v) assert c.value == v # Simple delegated adapter a = DelegatedCharacteristicAdapter( c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x)) ) value = a.read_value(None) assert value == bytes(reversed(v)) v = bytes([3, 4, 5]) a.write_value(None, v) assert a.value == bytes(reversed(v)) # Packed adapter with single element format v = 1234 pv = struct.pack('>H', v) c.value = v a = PackedCharacteristicAdapter(c, '>H') value = a.read_value(None) assert value == pv c.value = None a.write_value(None, pv) assert a.value == v # Packed adapter with multi-element format v1 = 1234 v2 = 5678 pv = struct.pack('>HH', v1, v2) c.value = (v1, v2) a = PackedCharacteristicAdapter(c, '>HH') value = a.read_value(None) assert value == pv c.value = None a.write_value(None, pv) assert a.value == (v1, v2) # Mapped adapter v1 = 1234 v2 = 5678 pv = struct.pack('>HH', v1, v2) mapped = {'v1': v1, 'v2': v2} c.value = mapped a = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2')) value = a.read_value(None) assert value == pv c.value = None a.write_value(None, pv) assert a.value == mapped # UTF-8 adapter v = 'Hello π' ev = v.encode('utf-8') c.value = v a = UTF8CharacteristicAdapter(c) value = a.read_value(None) assert value == ev c.value = None a.write_value(None, ev) assert a.value == v # ----------------------------------------------------------------------------- def test_CharacteristicValue(): b = bytes([1, 2, 3]) c = CharacteristicValue(read=lambda _: b) x = c.read(None) assert x == b result = [] c = CharacteristicValue( write=lambda connection, value: result.append((connection, value)) ) z = object() c.write(z, b) assert result == [(z, b)] # ----------------------------------------------------------------------------- class LinkedDevices: def __init__(self): self.connections = [None, None, None] self.link = LocalLink() self.controllers = [ Controller('C1', link=self.link), Controller('C2', link=self.link), Controller('C3', link=self.link), ] self.devices = [ Device( address='F0:F1:F2:F3:F4:F5', host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])), ), Device( address='F1:F2:F3:F4:F5:F6', host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])), ), Device( address='F2:F3:F4:F5:F6:F7', host=Host(self.controllers[2], AsyncPipeSink(self.controllers[2])), ), ] self.paired = [None, None, None] # ----------------------------------------------------------------------------- async def async_barrier(): ready = asyncio.get_running_loop().create_future() asyncio.get_running_loop().call_soon(ready.set_result, None) await ready # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_read_write(): [client, server] = LinkedDevices().devices[:2] characteristic1 = Characteristic( 'FDB159DB-036C-49E3-B3DB-6325AC750806', Characteristic.READ | Characteristic.WRITE, Characteristic.READABLE | Characteristic.WRITEABLE, ) def on_characteristic1_write(connection, value): characteristic1._last_value = (connection, value) characteristic1.on('write', on_characteristic1_write) def on_characteristic2_read(connection): return bytes(str(connection.peer_address)) def on_characteristic2_write(connection, value): characteristic2._last_value = (connection, value) characteristic2 = Characteristic( '66DE9057-C848-4ACA-B993-D675644EBB85', Characteristic.READ | Characteristic.WRITE, Characteristic.READABLE | Characteristic.WRITEABLE, CharacteristicValue( read=on_characteristic2_read, write=on_characteristic2_write ), ) service1 = Service( '3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1, characteristic2] ) server.add_services([service1]) await client.power_on() await server.power_on() connection = await client.connect(server.random_address) peer = Peer(connection) await peer.discover_services() await peer.discover_characteristics() c = peer.get_characteristics_by_uuid(characteristic1.uuid) assert len(c) == 1 c1 = c[0] c = peer.get_characteristics_by_uuid(characteristic2.uuid) assert len(c) == 1 c2 = c[0] v1 = await peer.read_value(c1) assert v1 == b'' b = bytes([1, 2, 3]) await peer.write_value(c1, b) await async_barrier() assert characteristic1.value == b v1 = await peer.read_value(c1) assert v1 == b assert type(characteristic1._last_value is tuple) assert len(characteristic1._last_value) == 2 assert str(characteristic1._last_value[0].peer_address) == str( client.random_address ) assert characteristic1._last_value[1] == b bb = bytes([3, 4, 5, 6]) characteristic1.value = bb v1 = await peer.read_value(c1) assert v1 == bb await peer.write_value(c2, b) await async_barrier() assert type(characteristic2._last_value is tuple) assert len(characteristic2._last_value) == 2 assert str(characteristic2._last_value[0].peer_address) == str( client.random_address ) assert characteristic2._last_value[1] == b # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_read_write2(): [client, server] = LinkedDevices().devices[:2] v = bytes([0x11, 0x22, 0x33, 0x44]) characteristic1 = Characteristic( 'FDB159DB-036C-49E3-B3DB-6325AC750806', Characteristic.READ | Characteristic.WRITE, Characteristic.READABLE | Characteristic.WRITEABLE, value=v, ) service1 = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1]) server.add_services([service1]) await client.power_on() await server.power_on() connection = await client.connect(server.random_address) peer = Peer(connection) await peer.discover_services() c = peer.get_services_by_uuid(service1.uuid) assert len(c) == 1 s = c[0] await s.discover_characteristics() c = s.get_characteristics_by_uuid(characteristic1.uuid) assert len(c) == 1 c1 = c[0] v1 = await c1.read_value() assert v1 == v a1 = PackedCharacteristicAdapter(c1, '>I') v1 = await a1.read_value() assert v1 == struct.unpack('>I', v)[0] b = bytes([0x55, 0x66, 0x77, 0x88]) await a1.write_value(struct.unpack('>I', b)[0]) await async_barrier() assert characteristic1.value == b v1 = await a1.read_value() assert v1 == struct.unpack('>I', b)[0] # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_subscribe_notify(): [client, server] = LinkedDevices().devices[:2] characteristic1 = Characteristic( 'FDB159DB-036C-49E3-B3DB-6325AC750806', Characteristic.READ | Characteristic.NOTIFY, Characteristic.READABLE, bytes([1, 2, 3]), ) def on_characteristic1_subscription(connection, notify_enabled, indicate_enabled): characteristic1._last_subscription = ( connection, notify_enabled, indicate_enabled, ) characteristic1.on('subscription', on_characteristic1_subscription) characteristic2 = Characteristic( '66DE9057-C848-4ACA-B993-D675644EBB85', Characteristic.READ | Characteristic.INDICATE, Characteristic.READABLE, bytes([4, 5, 6]), ) def on_characteristic2_subscription(connection, notify_enabled, indicate_enabled): characteristic2._last_subscription = ( connection, notify_enabled, indicate_enabled, ) characteristic2.on('subscription', on_characteristic2_subscription) characteristic3 = Characteristic( 'AB5E639C-40C1-4238-B9CB-AF41F8B806E4', Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE, Characteristic.READABLE, bytes([7, 8, 9]), ) def on_characteristic3_subscription(connection, notify_enabled, indicate_enabled): characteristic3._last_subscription = ( connection, notify_enabled, indicate_enabled, ) characteristic3.on('subscription', on_characteristic3_subscription) service1 = Service( '3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1, characteristic2, characteristic3], ) server.add_services([service1]) def on_characteristic_subscription( connection, characteristic, notify_enabled, indicate_enabled ): server._last_subscription = ( connection, characteristic, notify_enabled, indicate_enabled, ) server.on('characteristic_subscription', on_characteristic_subscription) await client.power_on() await server.power_on() connection = await client.connect(server.random_address) peer = Peer(connection) await peer.discover_services() await peer.discover_characteristics() c = peer.get_characteristics_by_uuid(characteristic1.uuid) assert len(c) == 1 c1 = c[0] c = peer.get_characteristics_by_uuid(characteristic2.uuid) assert len(c) == 1 c2 = c[0] c = peer.get_characteristics_by_uuid(characteristic3.uuid) assert len(c) == 1 c3 = c[0] c1._called = False c1._last_update = None def on_c1_update(value): c1._called = True c1._last_update = value c1.on('update', on_c1_update) await peer.subscribe(c1) await async_barrier() assert server._last_subscription[1] == characteristic1 assert server._last_subscription[2] assert not server._last_subscription[3] assert characteristic1._last_subscription[1] assert not characteristic1._last_subscription[2] await server.indicate_subscribers(characteristic1) await async_barrier() assert not c1._called await server.notify_subscribers(characteristic1) await async_barrier() assert c1._called assert c1._last_update == characteristic1.value c1._called = False c1._last_update = None c1_value = characteristic1.value await server.notify_subscribers(characteristic1, bytes([0, 1, 2])) await async_barrier() assert c1._called assert c1._last_update == bytes([0, 1, 2]) assert characteristic1.value == c1_value c1._called = False await peer.unsubscribe(c1) await server.notify_subscribers(characteristic1) assert not c1._called c2._called = False c2._last_update = None def on_c2_update(value): c2._called = True c2._last_update = value await peer.subscribe(c2, on_c2_update) await async_barrier() await server.notify_subscriber( characteristic2._last_subscription[0], characteristic2 ) await async_barrier() assert not c2._called await server.indicate_subscriber( characteristic2._last_subscription[0], characteristic2 ) await async_barrier() assert c2._called assert c2._last_update == characteristic2.value c2._called = False await peer.unsubscribe(c2, on_c2_update) await server.indicate_subscriber( characteristic2._last_subscription[0], characteristic2 ) await async_barrier() assert not c2._called c3._called = False c3._called_2 = False c3._called_3 = False c3._last_update = None c3._last_update_2 = None c3._last_update_3 = None def on_c3_update(value): c3._called = True c3._last_update = value def on_c3_update_2(value): # for notify c3._called_2 = True c3._last_update_2 = value def on_c3_update_3(value): # for indicate c3._called_3 = True c3._last_update_3 = value c3.on('update', on_c3_update) await peer.subscribe(c3, on_c3_update_2) await async_barrier() await server.notify_subscriber( characteristic3._last_subscription[0], characteristic3 ) await async_barrier() assert c3._called assert c3._last_update == characteristic3.value assert c3._called_2 assert c3._last_update_2 == characteristic3.value assert not c3._called_3 c3._called = False c3._called_2 = False c3._called_3 = False await peer.unsubscribe(c3) await peer.subscribe(c3, on_c3_update_3, prefer_notify=False) await async_barrier() characteristic3.value = bytes([1, 2, 3]) await server.indicate_subscriber( characteristic3._last_subscription[0], characteristic3 ) await async_barrier() assert c3._called assert c3._last_update == characteristic3.value assert not c3._called_2 assert c3._called_3 assert c3._last_update_3 == characteristic3.value c3._called = False c3._called_2 = False c3._called_3 = False await peer.unsubscribe(c3) await server.notify_subscriber( characteristic3._last_subscription[0], characteristic3 ) await server.indicate_subscriber( characteristic3._last_subscription[0], characteristic3 ) await async_barrier() assert not c3._called assert not c3._called_2 assert not c3._called_3 # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_mtu_exchange(): [d1, d2, d3] = LinkedDevices().devices[:3] d3.gatt_server.max_mtu = 100 d3_connections = [] @d3.on('connection') def on_d3_connection(connection): d3_connections.append(connection) await d1.power_on() await d2.power_on() await d3.power_on() d1_connection = await d1.connect(d3.random_address) assert len(d3_connections) == 1 assert d3_connections[0] is not None d2_connection = await d2.connect(d3.random_address) assert len(d3_connections) == 2 assert d3_connections[1] is not None d1_peer = Peer(d1_connection) d2_peer = Peer(d2_connection) d1_client_mtu = await d1_peer.request_mtu(220) assert d1_client_mtu == 100 assert d1_connection.att_mtu == 100 d2_client_mtu = await d2_peer.request_mtu(50) assert d2_client_mtu == 50 assert d2_connection.att_mtu == 50 # ----------------------------------------------------------------------------- def test_char_property_to_string(): # single assert Characteristic.property_name(0x01) == "BROADCAST" assert Characteristic.property_name(Characteristic.BROADCAST) == "BROADCAST" # double assert Characteristic.properties_as_string(0x03) == "BROADCAST,READ" assert ( Characteristic.properties_as_string( Characteristic.BROADCAST | Characteristic.READ ) == "BROADCAST,READ" ) # ----------------------------------------------------------------------------- def test_char_property_string_to_type(): # single assert Characteristic.string_to_properties("BROADCAST") == Characteristic.BROADCAST # double assert ( Characteristic.string_to_properties("BROADCAST,READ") == Characteristic.BROADCAST | Characteristic.READ ) assert ( Characteristic.string_to_properties("READ,BROADCAST") == Characteristic.BROADCAST | Characteristic.READ ) # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_server_string(): [_, server] = LinkedDevices().devices[:2] characteristic = Characteristic( 'FDB159DB-036C-49E3-B3DB-6325AC750806', Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY, Characteristic.READABLE | Characteristic.WRITEABLE, bytes([123]), ) service = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic]) server.add_service(service) assert ( str(server.gatt_server) == """Service(handle=0x0001, end=0x0005, uuid=UUID-16:1800 (Generic Access)) CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ) Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ) CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ) Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ) Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829) CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY) Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY) Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)""" ) # ----------------------------------------------------------------------------- async def async_main(): await test_read_write() await test_read_write2() await test_subscribe_notify() await test_characteristic_encoding() await test_mtu_exchange() # ----------------------------------------------------------------------------- def test_attribute_string_to_permissions(): assert Attribute.string_to_permissions('READABLE') == 1 assert Attribute.string_to_permissions('WRITEABLE') == 2 assert Attribute.string_to_permissions('READABLE,WRITEABLE') == 3 # ----------------------------------------------------------------------------- def test_charracteristic_permissions(): characteristic = Characteristic( 'FDB159DB-036C-49E3-B3DB-6325AC750806', Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY, 'READABLE,WRITEABLE', ) assert characteristic.permissions == 3 # ----------------------------------------------------------------------------- def test_descriptor_permissions(): descriptor = Descriptor('2902', 'READABLE,WRITEABLE') assert descriptor.permissions == 3 # ----------------------------------------------------------------------------- if __name__ == '__main__': logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) test_UUID() test_ATT_Error_Response() test_ATT_Read_By_Group_Type_Request() test_CharacteristicValue() test_CharacteristicAdapter() asyncio.run(async_main())