• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import logging
20import os
21import struct
22import pytest
23
24from bumble.controller import Controller
25from bumble.link import LocalLink
26from bumble.device import Device, Peer
27from bumble.host import Host
28from bumble.gatt import (
29    GATT_BATTERY_LEVEL_CHARACTERISTIC,
30    CharacteristicAdapter,
31    DelegatedCharacteristicAdapter,
32    PackedCharacteristicAdapter,
33    MappedCharacteristicAdapter,
34    UTF8CharacteristicAdapter,
35    Service,
36    Characteristic,
37    CharacteristicValue
38)
39from bumble.transport import AsyncPipeSink
40from bumble.core import UUID
41from bumble.att import (
42    ATT_EXCHANGE_MTU_REQUEST,
43    ATT_ATTRIBUTE_NOT_FOUND_ERROR,
44    ATT_PDU,
45    ATT_Error_Response,
46    ATT_Read_By_Group_Type_Request
47)
48
49
50# -----------------------------------------------------------------------------
def basic_check(x):
    """Round-trip a PDU through its byte form and compare string renderings.

    `x` is serialized with `to_bytes()`, parsed back with
    `ATT_PDU.from_bytes()`, and the `str()` of the parsed object must equal
    the `str()` of `x`.
    """
    serialized = x.to_bytes()
    round_tripped = ATT_PDU.from_bytes(serialized)
    original_text = str(x)
    assert original_text == str(round_tripped)
57
58
59# -----------------------------------------------------------------------------
def test_UUID():
    """Check UUID string rendering and string/byte round-trips."""
    # Short forms render with their bit-width prefix
    assert str(UUID.from_16_bits(0x7788)) == 'UUID-16:7788'
    assert str(UUID.from_32_bits(0x11223344)) == 'UUID-32:11223344'

    # A 128-bit UUID renders identically after string and byte round-trips
    from_text = UUID('61A3512C-09BE-4DDC-A6A6-0B03667AAFC6')
    assert str(from_text) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
    from_str = UUID(str(from_text))
    assert str(from_str) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
    from_raw = UUID.from_bytes(from_str.to_bytes())
    assert str(from_raw) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'

    # A 16-bit UUID serialized as 128 bits parses back to an equal UUID
    short = UUID.from_16_bits(0x1234)
    widened = UUID.from_bytes(short.to_bytes(force_128=True))
    assert short == widened

    # This 16-bit value renders with a descriptive name appended
    assert str(UUID.from_16_bits(0x180a)) == 'UUID-16:180A (Device Information)'
79
80
81# -----------------------------------------------------------------------------
def test_ATT_Error_Response():
    """Run `basic_check` on an ATT_Error_Response built from known fields."""
    fields = {
        'request_opcode_in_error': ATT_EXCHANGE_MTU_REQUEST,
        'attribute_handle_in_error': 0x0000,
        'error_code': ATT_ATTRIBUTE_NOT_FOUND_ERROR,
    }
    basic_check(ATT_Error_Response(**fields))
89
90
91# -----------------------------------------------------------------------------
def test_ATT_Read_By_Group_Type_Request():
    """Run `basic_check` on an ATT_Read_By_Group_Type_Request over all handles."""
    group_type = UUID.from_16_bits(0x2800)
    request = ATT_Read_By_Group_Type_Request(
        starting_handle=0x0001,
        ending_handle=0xFFFF,
        attribute_group_type=group_type,
    )
    basic_check(request)
99
100
101# -----------------------------------------------------------------------------
def test_CharacteristicAdapter():
    """Exercise read/write value conversion for each characteristic adapter.

    One `Characteristic` is reused throughout; each section wraps it in a
    different adapter and checks both `read_value` and `write_value`.
    """
    # The CharacteristicAdapter base class must pass values through untouched
    raw = bytes([1, 2, 3])
    characteristic = Characteristic(
        GATT_BATTERY_LEVEL_CHARACTERISTIC,
        Characteristic.READ,
        Characteristic.READABLE,
        raw
    )
    adapter = CharacteristicAdapter(characteristic)
    assert adapter.read_value(None) == raw

    raw = bytes([3, 4, 5])
    adapter.write_value(None, raw)
    assert characteristic.value == raw

    # Delegated adapter whose encode and decode functions both reverse the bytes
    def flip(data):
        return bytes(reversed(data))

    adapter = DelegatedCharacteristicAdapter(characteristic, flip, flip)
    assert adapter.read_value(None) == flip(raw)

    raw = bytes([3, 4, 5])
    adapter.write_value(None, raw)
    assert adapter.value == flip(raw)

    # Packed adapter, format with a single element
    number = 1234
    packed = struct.pack('>H', number)
    characteristic.value = number
    adapter = PackedCharacteristicAdapter(characteristic, '>H')
    assert adapter.read_value(None) == packed

    characteristic.value = None
    adapter.write_value(None, packed)
    assert adapter.value == number

    # Packed adapter, format with several elements
    pair = (1234, 5678)
    packed = struct.pack('>HH', *pair)
    characteristic.value = pair
    adapter = PackedCharacteristicAdapter(characteristic, '>HH')
    assert adapter.read_value(None) == packed

    characteristic.value = None
    adapter.write_value(None, packed)
    assert adapter.value == pair

    # Mapped adapter: packed fields are exposed under the given keys
    first, second = 1234, 5678
    packed = struct.pack('>HH', first, second)
    mapped = {'v1': first, 'v2': second}
    characteristic.value = mapped
    adapter = MappedCharacteristicAdapter(characteristic, '>HH', ('v1', 'v2'))
    assert adapter.read_value(None) == packed

    characteristic.value = None
    adapter.write_value(None, packed)
    assert adapter.value == mapped

    # UTF-8 adapter, using a string with a non-ASCII character
    text = 'Hello π'
    encoded = text.encode('utf-8')
    characteristic.value = text
    adapter = UTF8CharacteristicAdapter(characteristic)
    assert adapter.read_value(None) == encoded

    characteristic.value = None
    adapter.write_value(None, encoded)
    assert adapter.value == text
175
176
177# -----------------------------------------------------------------------------
def test_CharacteristicValue():
    """Check that CharacteristicValue forwards read and write to its callbacks."""
    payload = bytes([1, 2, 3])

    # read() returns whatever the read callback returns
    def read_handler(_):
        return payload

    reader = CharacteristicValue(read=read_handler)
    assert reader.read(None) == payload

    # write() hands both of its arguments to the write callback
    recorded = []

    def write_handler(connection, value):
        recorded.append((connection, value))

    writer = CharacteristicValue(write=write_handler)
    marker = object()
    writer.write(marker, payload)
    assert recorded == [(marker, payload)]
189
190
191# -----------------------------------------------------------------------------
class TwoDevices:
    """Test fixture: two devices whose controllers share a single LocalLink.

    `__init__` sets the attributes `connections`, `link`, `controllers`,
    `devices` and `paired`; `connections` and `paired` start as
    `[None, None]`.
    """

    def __init__(self):
        self.connections = [None, None]

        # Both controllers are attached to the same link
        self.link = LocalLink()
        self.controllers = [
            Controller(name, link=self.link) for name in ('C1', 'C2')
        ]

        # One device per controller, each with a fixed address
        addresses = ('F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0')
        self.devices = [
            Device(
                address=address,
                host=Host(controller, AsyncPipeSink(controller))
            )
            for address, controller in zip(addresses, self.controllers)
        ]

        self.paired = [None, None]
213
214
215# -----------------------------------------------------------------------------
async def async_barrier():
    """Wait until a callback scheduled with `call_soon` has run.

    A future is created on the running loop, its completion is scheduled
    with `call_soon`, and the coroutine awaits that future.
    """
    loop = asyncio.get_running_loop()
    ready = loop.create_future()
    loop.call_soon(ready.set_result, None)
    await ready
220
221
222# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write():
    """Read and write two characteristics from a client to a server.

    characteristic1 stores its value directly and reports writes through a
    'write' event. characteristic2 delegates reads and writes to callbacks
    through a `CharacteristicValue`; only its write path is exercised here.
    """
    [client, server] = TwoDevices().devices

    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE
    )

    def on_characteristic1_write(connection, value):
        # Remember the arguments of the latest 'write' event
        characteristic1._last_value = (connection, value)

    characteristic1.on('write', on_characteristic1_write)

    def on_characteristic2_read(connection):
        # bytes() needs an explicit encoding for a str argument;
        # bytes(str(...)) alone raises TypeError.
        return str(connection.peer_address).encode('utf-8')

    def on_characteristic2_write(connection, value):
        # Remember the arguments of the latest write callback
        characteristic2._last_value = (connection, value)

    characteristic2 = Characteristic(
        '66DE9057-C848-4ACA-B993-D675644EBB85',
        Characteristic.READ | Characteristic.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        CharacteristicValue(read=on_characteristic2_read, write=on_characteristic2_write)
    )

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829',
        [
            characteristic1,
            characteristic2
        ]
    )
    server.add_services([service1])

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    # Discover the remote proxies for both characteristics
    await peer.discover_services()
    await peer.discover_characteristics()
    c = peer.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(c) == 1
    c1 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic2.uuid)
    assert len(c) == 1
    c2 = c[0]

    # characteristic1 was created without a value, so it reads back empty
    v1 = await peer.read_value(c1)
    assert v1 == b''

    # A write must update the server-side value and fire the 'write' event
    b = bytes([1, 2, 3])
    await peer.write_value(c1, b)
    await async_barrier()
    assert characteristic1.value == b
    v1 = await peer.read_value(c1)
    assert v1 == b
    assert isinstance(characteristic1._last_value, tuple)
    assert len(characteristic1._last_value) == 2
    assert str(characteristic1._last_value[0].peer_address) == str(client.random_address)
    assert characteristic1._last_value[1] == b

    # A value changed on the server side is visible to the next read
    bb = bytes([3, 4, 5, 6])
    characteristic1.value = bb
    v1 = await peer.read_value(c1)
    assert v1 == bb

    # Writes to characteristic2 are routed to its write callback
    await peer.write_value(c2, b)
    await async_barrier()
    assert isinstance(characteristic2._last_value, tuple)
    assert len(characteristic2._last_value) == 2
    assert str(characteristic2._last_value[0].peer_address) == str(client.random_address)
    assert characteristic2._last_value[1] == b
297
298
299# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write2():
    """Read and write one characteristic through service and adapter proxies.

    The characteristic is discovered via its service proxy, read raw, then
    read and written through a `PackedCharacteristicAdapter` with a '>I'
    format.
    """
    [client, server] = TwoDevices().devices

    initial = bytes([0x11, 0x22, 0x33, 0x44])
    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        value=initial
    )
    service1 = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1])
    server.add_services([service1])

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    # Locate the service first, then discover characteristics through it
    await peer.discover_services()
    services = peer.get_services_by_uuid(service1.uuid)
    assert len(services) == 1
    remote_service = services[0]
    await remote_service.discover_characteristics()
    characteristics = remote_service.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(characteristics) == 1
    remote_characteristic = characteristics[0]

    # Raw read returns the bytes the characteristic was created with
    assert await remote_characteristic.read_value() == initial

    # Adapted read returns the same bytes unpacked as a '>I' integer
    packed = PackedCharacteristicAdapter(remote_characteristic, '>I')
    (initial_number,) = struct.unpack('>I', initial)
    assert await packed.read_value() == initial_number

    # Adapted write packs the integer; the server then holds the raw bytes
    updated = bytes([0x55, 0x66, 0x77, 0x88])
    (updated_number,) = struct.unpack('>I', updated)
    await packed.write_value(updated_number)
    await async_barrier()
    assert characteristic1.value == updated
    assert await packed.read_value() == updated_number
347
348
349# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_subscribe_notify():
    """Subscribe to three characteristics and check update delivery.

    characteristic1 has the NOTIFY property, characteristic2 has INDICATE,
    and characteristic3 has both. For each one the client subscribes, the
    server sends notifications and/or indications, and the test checks
    which of them reached the client-side update handler.
    """
    [client, server] = TwoDevices().devices

    def record_subscriptions(characteristic):
        # Store the latest 'subscription' event arguments on the characteristic
        def on_subscription(connection, notify_enabled, indicate_enabled):
            characteristic._last_subscription = (connection, notify_enabled, indicate_enabled)

        characteristic.on('subscription', on_subscription)

    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.NOTIFY,
        Characteristic.READABLE,
        bytes([1, 2, 3])
    )
    record_subscriptions(characteristic1)

    characteristic2 = Characteristic(
        '66DE9057-C848-4ACA-B993-D675644EBB85',
        Characteristic.READ | Characteristic.INDICATE,
        Characteristic.READABLE,
        bytes([4, 5, 6])
    )
    record_subscriptions(characteristic2)

    characteristic3 = Characteristic(
        'AB5E639C-40C1-4238-B9CB-AF41F8B806E4',
        Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE,
        Characteristic.READABLE,
        bytes([7, 8, 9])
    )
    record_subscriptions(characteristic3)

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829',
        [characteristic1, characteristic2, characteristic3]
    )
    server.add_services([service1])

    # The server-level event also says which characteristic was subscribed
    def on_characteristic_subscription(connection, characteristic, notify_enabled, indicate_enabled):
        server._last_subscription = (connection, characteristic, notify_enabled, indicate_enabled)

    server.on('characteristic_subscription', on_characteristic_subscription)

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()

    def only_characteristic(uuid):
        # Exactly one remote characteristic is expected per UUID
        found = peer.get_characteristics_by_uuid(uuid)
        assert len(found) == 1
        return found[0]

    c1 = only_characteristic(characteristic1.uuid)
    c2 = only_characteristic(characteristic2.uuid)
    c3 = only_characteristic(characteristic3.uuid)

    # --- characteristic1: subscribe, listening through the 'update' event
    c1._last_update = None

    def on_c1_update(connection, value):
        c1._last_update = (connection, value)

    c1.on('update', on_c1_update)
    await peer.subscribe(c1)
    await async_barrier()
    assert server._last_subscription[1] == characteristic1
    assert server._last_subscription[2]
    assert not server._last_subscription[3]
    assert characteristic1._last_subscription[1]
    assert not characteristic1._last_subscription[2]

    # An indication must not produce an update on c1
    await server.indicate_subscribers(characteristic1)
    await async_barrier()
    assert c1._last_update is None

    # A notification must
    await server.notify_subscribers(characteristic1)
    await async_barrier()
    assert c1._last_update is not None
    assert c1._last_update[1] == characteristic1.value

    # --- characteristic2: subscribe with a subscriber callback
    c2._last_update = None

    def on_c2_update(value):
        # `connection` here is the client connection from the enclosing scope;
        # this callback only receives the value
        c2._last_update = (connection, value)

    await peer.subscribe(c2, on_c2_update)
    await async_barrier()

    # A notification must not produce an update on c2
    await server.notify_subscriber(characteristic2._last_subscription[0], characteristic2)
    await async_barrier()
    assert c2._last_update is None

    # An indication must
    await server.indicate_subscriber(characteristic2._last_subscription[0], characteristic2)
    await async_barrier()
    assert c2._last_update is not None
    assert c2._last_update[1] == characteristic2.value

    # --- characteristic3: both a notification and an indication are checked
    c3._last_update = None

    def on_c3_update(connection, value):
        c3._last_update = (connection, value)

    c3.on('update', on_c3_update)
    await peer.subscribe(c3)
    await async_barrier()
    await server.notify_subscriber(characteristic3._last_subscription[0], characteristic3)
    await async_barrier()
    assert c3._last_update is not None
    assert c3._last_update[1] == characteristic3.value

    # Change the value so the indication can be told apart from the notification
    characteristic3.value = bytes([1, 2, 3])
    await server.indicate_subscriber(characteristic3._last_subscription[0], characteristic3)
    await async_barrier()
    assert c3._last_update is not None
    assert c3._last_update[1] == characteristic3.value
475
476
477# -----------------------------------------------------------------------------
async def async_main():
    """Run the asynchronous tests one after another."""
    for async_test in (test_read_write, test_read_write2, test_subscribe_notify):
        await async_test()
482
483# -----------------------------------------------------------------------------
if __name__ == '__main__':
    # Log level comes from BUMBLE_LOGLEVEL, defaulting to INFO
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=log_level)

    # Synchronous tests run first, then the asynchronous ones on an event loop
    for sync_test in (
        test_UUID,
        test_ATT_Error_Response,
        test_ATT_Read_By_Group_Type_Request,
        test_CharacteristicValue,
        test_CharacteristicAdapter,
    ):
        sync_test()

    asyncio.run(async_main())
492