# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import struct
import pytest

from bumble.controller import Controller
from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
from bumble.gatt import (
    GATT_BATTERY_LEVEL_CHARACTERISTIC,
    CharacteristicAdapter,
    DelegatedCharacteristicAdapter,
    PackedCharacteristicAdapter,
    MappedCharacteristicAdapter,
    UTF8CharacteristicAdapter,
    Service,
    Characteristic,
    CharacteristicValue
)
from bumble.transport import AsyncPipeSink
from bumble.core import UUID
from bumble.att import (
    ATT_EXCHANGE_MTU_REQUEST,
    ATT_ATTRIBUTE_NOT_FOUND_ERROR,
    ATT_PDU,
    ATT_Error_Response,
    ATT_Read_By_Group_Type_Request
)


# -----------------------------------------------------------------------------
def basic_check(x):
    """Round-trip a PDU through its byte form and compare string renderings.

    `x` is serialized with `to_bytes()`, parsed back with
    `ATT_PDU.from_bytes()`, and the `str()` of both objects must match.
    """
    pdu = x.to_bytes()
    parsed = ATT_PDU.from_bytes(pdu)
    x_str = str(x)
    parsed_str = str(parsed)
    assert x_str == parsed_str


# -----------------------------------------------------------------------------
def test_UUID():
    """Check UUID construction, string rendering and byte round-trips."""
    u = UUID.from_16_bits(0x7788)
    assert str(u) == 'UUID-16:7788'
    u = UUID.from_32_bits(0x11223344)
    assert str(u) == 'UUID-32:11223344'
    u = UUID('61A3512C-09BE-4DDC-A6A6-0B03667AAFC6')
    assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
    v = UUID(str(u))
    assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
    w = UUID.from_bytes(v.to_bytes())
    assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'

    # A 16-bit UUID expanded to its 128-bit byte form must still compare
    # equal to the original once parsed back.
    u1 = UUID.from_16_bits(0x1234)
    b1 = u1.to_bytes(force_128=True)
    u2 = UUID.from_bytes(b1)
    assert u1 == u2

    u3 = UUID.from_16_bits(0x180a)
    assert str(u3) == 'UUID-16:180A (Device Information)'


# -----------------------------------------------------------------------------
def test_ATT_Error_Response():
    """Round-trip an ATT_Error_Response PDU through `basic_check`."""
    pdu = ATT_Error_Response(
        request_opcode_in_error=ATT_EXCHANGE_MTU_REQUEST,
        attribute_handle_in_error=0x0000,
        error_code=ATT_ATTRIBUTE_NOT_FOUND_ERROR
    )
    basic_check(pdu)


# -----------------------------------------------------------------------------
def test_ATT_Read_By_Group_Type_Request():
    """Round-trip an ATT_Read_By_Group_Type_Request PDU through `basic_check`."""
    pdu = ATT_Read_By_Group_Type_Request(
        starting_handle=0x0001,
        ending_handle=0xFFFF,
        attribute_group_type=UUID.from_16_bits(0x2800)
    )
    basic_check(pdu)


# -----------------------------------------------------------------------------
def test_CharacteristicAdapter():
    """Exercise each characteristic adapter's read/write value conversion.

    Every section wraps the same `Characteristic` in a different adapter and
    checks both directions: what `read_value` returns for a given stored
    value, and what ends up stored after `write_value`.
    """
    # Check that the CharacteristicAdapter base class is transparent
    v = bytes([1, 2, 3])
    c = Characteristic(
        GATT_BATTERY_LEVEL_CHARACTERISTIC,
        Characteristic.READ,
        Characteristic.READABLE,
        v
    )
    a = CharacteristicAdapter(c)

    value = a.read_value(None)
    assert value == v

    v = bytes([3, 4, 5])
    a.write_value(None, v)
    assert c.value == v

    # Simple delegated adapter
    a = DelegatedCharacteristicAdapter(
        c,
        lambda x: bytes(reversed(x)),
        lambda x: bytes(reversed(x))
    )

    value = a.read_value(None)
    assert value == bytes(reversed(v))

    v = bytes([3, 4, 5])
    a.write_value(None, v)
    assert a.value == bytes(reversed(v))

    # Packed adapter with single element format
    v = 1234
    pv = struct.pack('>H', v)
    c.value = v
    a = PackedCharacteristicAdapter(c, '>H')

    value = a.read_value(None)
    assert value == pv
    c.value = None
    a.write_value(None, pv)
    assert a.value == v

    # Packed adapter with multi-element format
    v1 = 1234
    v2 = 5678
    pv = struct.pack('>HH', v1, v2)
    c.value = (v1, v2)
    a = PackedCharacteristicAdapter(c, '>HH')

    value = a.read_value(None)
    assert value == pv
    c.value = None
    a.write_value(None, pv)
    assert a.value == (v1, v2)

    # Mapped adapter
    v1 = 1234
    v2 = 5678
    pv = struct.pack('>HH', v1, v2)
    mapped = {'v1': v1, 'v2': v2}
    c.value = mapped
    a = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2'))

    value = a.read_value(None)
    assert value == pv
    c.value = None
    a.write_value(None, pv)
    assert a.value == mapped

    # UTF-8 adapter
    v = 'Hello π'
    ev = v.encode('utf-8')
    c.value = v
    a = UTF8CharacteristicAdapter(c)

    value = a.read_value(None)
    assert value == ev
    c.value = None
    a.write_value(None, ev)
    assert a.value == v


# -----------------------------------------------------------------------------
def test_CharacteristicValue():
    """Check that CharacteristicValue forwards reads and writes to callbacks."""
    b = bytes([1, 2, 3])
    c = CharacteristicValue(read=lambda _: b)
    x = c.read(None)
    assert x == b

    result = []
    c = CharacteristicValue(
        write=lambda connection, value: result.append((connection, value))
    )
    z = object()  # Opaque stand-in for a connection
    c.write(z, b)
    assert result == [(z, b)]


# -----------------------------------------------------------------------------
class TwoDevices:
    """Test fixture: two devices whose controllers share one LocalLink.

    `devices[0]` is backed by `controllers[0]` and `devices[1]` by
    `controllers[1]`; both controllers are attached to `link`.
    """

    def __init__(self):
        self.connections = [None, None]

        self.link = LocalLink()
        self.controllers = [
            Controller('C1', link=self.link),
            Controller('C2', link=self.link)
        ]
        self.devices = [
            Device(
                address='F0:F1:F2:F3:F4:F5',
                host=Host(
                    self.controllers[0],
                    AsyncPipeSink(self.controllers[0])
                )
            ),
            Device(
                address='F5:F4:F3:F2:F1:F0',
                host=Host(
                    self.controllers[1],
                    AsyncPipeSink(self.controllers[1])
                )
            )
        ]

        self.paired = [None, None]


# -----------------------------------------------------------------------------
async def async_barrier():
    """Yield to the event loop until a callback scheduled now has run.

    A future is resolved from a `call_soon` callback and then awaited, so
    callbacks that were already queued on the loop get to run before the
    caller resumes.
    """
    loop = asyncio.get_running_loop()
    ready = loop.create_future()
    loop.call_soon(ready.set_result, None)
    await ready


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write():
    """Read and write two characteristics from a client Peer.

    characteristic1 stores its value directly and reports writes through a
    'write' event; characteristic2 delegates reads and writes to callbacks
    via a `CharacteristicValue`.
    """
    [client, server] = TwoDevices().devices

    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE
    )

    def on_characteristic1_write(connection, value):
        characteristic1._last_value = (connection, value)

    characteristic1.on('write', on_characteristic1_write)

    def on_characteristic2_read(connection):
        # bytes(str) without an encoding raises TypeError in Python 3, so the
        # address string is encoded explicitly.
        return str(connection.peer_address).encode('utf-8')

    def on_characteristic2_write(connection, value):
        characteristic2._last_value = (connection, value)

    characteristic2 = Characteristic(
        '66DE9057-C848-4ACA-B993-D675644EBB85',
        Characteristic.READ | Characteristic.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        CharacteristicValue(
            read=on_characteristic2_read,
            write=on_characteristic2_write
        )
    )

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829',
        [
            characteristic1,
            characteristic2
        ]
    )
    server.add_services([service1])

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()
    c = peer.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(c) == 1
    c1 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic2.uuid)
    assert len(c) == 1
    c2 = c[0]

    # characteristic1 was created without a value: it reads back as empty
    v1 = await peer.read_value(c1)
    assert v1 == b''
    b = bytes([1, 2, 3])
    await peer.write_value(c1, b)
    await async_barrier()
    assert characteristic1.value == b
    v1 = await peer.read_value(c1)
    assert v1 == b
    assert isinstance(characteristic1._last_value, tuple)
    assert len(characteristic1._last_value) == 2
    assert (
        str(characteristic1._last_value[0].peer_address) ==
        str(client.random_address)
    )
    assert characteristic1._last_value[1] == b

    # A value changed on the server side is visible to the next client read
    bb = bytes([3, 4, 5, 6])
    characteristic1.value = bb
    v1 = await peer.read_value(c1)
    assert v1 == bb

    await peer.write_value(c2, b)
    await async_barrier()
    assert isinstance(characteristic2._last_value, tuple)
    assert len(characteristic2._last_value) == 2
    assert (
        str(characteristic2._last_value[0].peer_address) ==
        str(client.random_address)
    )
    assert characteristic2._last_value[1] == b


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write2():
    """Read and write through service/characteristic proxies and an adapter.

    Discovery goes through the service proxy rather than the peer, and the
    characteristic proxy is then wrapped in a `PackedCharacteristicAdapter`
    using the '>I' format.
    """
    [client, server] = TwoDevices().devices

    v = bytes([0x11, 0x22, 0x33, 0x44])
    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        value=v
    )

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829',
        [
            characteristic1
        ]
    )
    server.add_services([service1])

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    c = peer.get_services_by_uuid(service1.uuid)
    assert len(c) == 1
    s = c[0]
    await s.discover_characteristics()
    c = s.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(c) == 1
    c1 = c[0]

    v1 = await c1.read_value()
    assert v1 == v

    a1 = PackedCharacteristicAdapter(c1, '>I')
    v1 = await a1.read_value()
    assert v1 == struct.unpack('>I', v)[0]

    b = bytes([0x55, 0x66, 0x77, 0x88])
    await a1.write_value(struct.unpack('>I', b)[0])
    await async_barrier()
    assert characteristic1.value == b
    v1 = await a1.read_value()
    assert v1 == struct.unpack('>I', b)[0]


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_subscribe_notify():
    """Subscribe to three characteristics and check update delivery.

    characteristic1 supports NOTIFY only, characteristic2 INDICATE only and
    characteristic3 both. For each one the test subscribes from the client,
    then triggers notifications and indications from the server and checks
    which of them reach the client-side update handler.
    """
    [client, server] = TwoDevices().devices

    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.NOTIFY,
        Characteristic.READABLE,
        bytes([1, 2, 3])
    )

    def on_characteristic1_subscription(
        connection, notify_enabled, indicate_enabled
    ):
        characteristic1._last_subscription = (
            connection, notify_enabled, indicate_enabled
        )

    characteristic1.on('subscription', on_characteristic1_subscription)

    characteristic2 = Characteristic(
        '66DE9057-C848-4ACA-B993-D675644EBB85',
        Characteristic.READ | Characteristic.INDICATE,
        Characteristic.READABLE,
        bytes([4, 5, 6])
    )

    def on_characteristic2_subscription(
        connection, notify_enabled, indicate_enabled
    ):
        characteristic2._last_subscription = (
            connection, notify_enabled, indicate_enabled
        )

    characteristic2.on('subscription', on_characteristic2_subscription)

    characteristic3 = Characteristic(
        'AB5E639C-40C1-4238-B9CB-AF41F8B806E4',
        Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE,
        Characteristic.READABLE,
        bytes([7, 8, 9])
    )

    def on_characteristic3_subscription(
        connection, notify_enabled, indicate_enabled
    ):
        characteristic3._last_subscription = (
            connection, notify_enabled, indicate_enabled
        )

    characteristic3.on('subscription', on_characteristic3_subscription)

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829',
        [
            characteristic1,
            characteristic2,
            characteristic3
        ]
    )
    server.add_services([service1])

    def on_characteristic_subscription(
        connection, characteristic, notify_enabled, indicate_enabled
    ):
        server._last_subscription = (
            connection, characteristic, notify_enabled, indicate_enabled
        )

    server.on('characteristic_subscription', on_characteristic_subscription)

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()
    c = peer.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(c) == 1
    c1 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic2.uuid)
    assert len(c) == 1
    c2 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic3.uuid)
    assert len(c) == 1
    c3 = c[0]

    # --- characteristic1: an indication must not produce an update, a
    # notification must
    c1._last_update = None

    def on_c1_update(connection, value):
        c1._last_update = (connection, value)

    c1.on('update', on_c1_update)
    await peer.subscribe(c1)
    await async_barrier()
    assert server._last_subscription[1] == characteristic1
    assert server._last_subscription[2]
    assert not server._last_subscription[3]
    assert characteristic1._last_subscription[1]
    assert not characteristic1._last_subscription[2]
    await server.indicate_subscribers(characteristic1)
    await async_barrier()
    assert c1._last_update is None
    await server.notify_subscribers(characteristic1)
    await async_barrier()
    assert c1._last_update is not None
    assert c1._last_update[1] == characteristic1.value

    # --- characteristic2: a notification must not produce an update, an
    # indication must
    c2._last_update = None

    def on_c2_update(value):
        # This handler only receives the value; `connection` is the client
        # connection captured from the enclosing test.
        c2._last_update = (connection, value)

    await peer.subscribe(c2, on_c2_update)
    await async_barrier()
    await server.notify_subscriber(
        characteristic2._last_subscription[0], characteristic2
    )
    await async_barrier()
    assert c2._last_update is None
    await server.indicate_subscriber(
        characteristic2._last_subscription[0], characteristic2
    )
    await async_barrier()
    assert c2._last_update is not None
    assert c2._last_update[1] == characteristic2.value

    # --- characteristic3: both a notification and an indication produce an
    # update
    c3._last_update = None

    def on_c3_update(connection, value):
        c3._last_update = (connection, value)

    c3.on('update', on_c3_update)
    await peer.subscribe(c3)
    await async_barrier()
    await server.notify_subscriber(
        characteristic3._last_subscription[0], characteristic3
    )
    await async_barrier()
    assert c3._last_update is not None
    assert c3._last_update[1] == characteristic3.value
    characteristic3.value = bytes([1, 2, 3])
    await server.indicate_subscriber(
        characteristic3._last_subscription[0], characteristic3
    )
    await async_barrier()
    assert c3._last_update is not None
    assert c3._last_update[1] == characteristic3.value


# -----------------------------------------------------------------------------
async def async_main():
    """Run the asynchronous tests in sequence (used when run as a script)."""
    await test_read_write()
    await test_read_write2()
    await test_subscribe_notify()


# -----------------------------------------------------------------------------
if __name__ == '__main__':
    logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
    test_UUID()
    test_ATT_Error_Response()
    test_ATT_Read_By_Group_Type_Request()
    test_CharacteristicValue()
    test_CharacteristicAdapter()
    asyncio.run(async_main())