• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import logging
20import os
21import struct
22import pytest
23from unittest.mock import AsyncMock, Mock, ANY
24
25from bumble.controller import Controller
26from bumble.gatt_client import CharacteristicProxy
27from bumble.link import LocalLink
28from bumble.device import Device, Peer
29from bumble.host import Host
30from bumble.gatt import (
31    GATT_BATTERY_LEVEL_CHARACTERISTIC,
32    GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
33    CharacteristicAdapter,
34    DelegatedCharacteristicAdapter,
35    PackedCharacteristicAdapter,
36    MappedCharacteristicAdapter,
37    UTF8CharacteristicAdapter,
38    Service,
39    Characteristic,
40    CharacteristicValue,
41    Descriptor,
42)
43from bumble.transport import AsyncPipeSink
44from bumble.core import UUID
45from bumble.att import (
46    Attribute,
47    ATT_EXCHANGE_MTU_REQUEST,
48    ATT_ATTRIBUTE_NOT_FOUND_ERROR,
49    ATT_PDU,
50    ATT_Error_Response,
51    ATT_Read_By_Group_Type_Request,
52)
53from .test_utils import async_barrier
54
55
56# -----------------------------------------------------------------------------
def basic_check(x):
    """Check that a PDU survives a serialize/parse round trip.

    The object is serialized with ``to_bytes()``, parsed back with
    ``ATT_PDU.from_bytes()``, and the string forms of the original and the
    parsed object are compared.
    """
    serialized = x.to_bytes()
    round_tripped = ATT_PDU.from_bytes(serialized)
    assert str(x) == str(round_tripped)
63
64
65# -----------------------------------------------------------------------------
def test_UUID():
    """Exercise UUID construction, string formatting and byte round trips."""
    full_uuid_text = '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'

    # Short UUIDs are rendered with a size prefix
    assert str(UUID.from_16_bits(0x7788)) == 'UUID-16:7788'
    assert str(UUID.from_32_bits(0x11223344)) == 'UUID-32:11223344'

    # A 128-bit UUID round-trips through its string and byte representations
    from_text = UUID(full_uuid_text)
    assert str(from_text) == full_uuid_text
    from_str = UUID(str(from_text))
    assert str(from_str) == full_uuid_text
    from_raw = UUID.from_bytes(from_str.to_bytes())
    assert str(from_raw) == full_uuid_text

    # A 16-bit UUID serialized as 128 bits compares equal to the original
    short = UUID.from_16_bits(0x1234)
    expanded = UUID.from_bytes(short.to_bytes(force_128=True))
    assert short == expanded

    # This 16-bit UUID is rendered together with a descriptive name
    device_information = UUID.from_16_bits(0x180A)
    assert str(device_information) == 'UUID-16:180A (Device Information)'
85
86
87# -----------------------------------------------------------------------------
def test_ATT_Error_Response():
    """Round-trip an ATT_Error_Response through basic_check()."""
    fields = {
        'request_opcode_in_error': ATT_EXCHANGE_MTU_REQUEST,
        'attribute_handle_in_error': 0x0000,
        'error_code': ATT_ATTRIBUTE_NOT_FOUND_ERROR,
    }
    basic_check(ATT_Error_Response(**fields))
95
96
97# -----------------------------------------------------------------------------
def test_ATT_Read_By_Group_Type_Request():
    """Round-trip an ATT_Read_By_Group_Type_Request through basic_check()."""
    fields = {
        'starting_handle': 0x0001,
        'ending_handle': 0xFFFF,
        'attribute_group_type': UUID.from_16_bits(0x2800),
    }
    basic_check(ATT_Read_By_Group_Type_Request(**fields))
105
106
107# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_characteristic_encoding():
    """Check value encoding/decoding on the server and client sides.

    Covers a Characteristic subclass with custom encode/decode hooks, a
    CharacteristicProxy subclass with the same hooks, the delegated and
    packed adapters, and subscription callbacks attached to the plain proxy,
    the encoding proxy and a decoding adapter.
    """

    # --- Local characteristic whose value is a single byte ------------------
    class SingleByteCharacteristic(Characteristic):
        def encode_value(self, value):
            return bytes([value])

        def decode_value(self, value_bytes):
            return value_bytes[0]

    local = SingleByteCharacteristic(
        GATT_BATTERY_LEVEL_CHARACTERISTIC,
        Characteristic.Properties.READ,
        Characteristic.READABLE,
        123,
    )
    assert await local.read_value(None) == bytes([123])
    await local.write_value(None, bytes([122]))
    assert local.value == 122

    # --- Client-side proxy using the same single-byte encoding --------------
    class SingleByteProxy(CharacteristicProxy):
        def __init__(self, characteristic):
            super().__init__(
                characteristic.client,
                characteristic.handle,
                characteristic.end_group_handle,
                characteristic.uuid,
                characteristic.properties,
            )

        def encode_value(self, value):
            return bytes([value])

        def decode_value(self, value_bytes):
            return value_bytes[0]

    client, server = LinkedDevices().devices[:2]

    characteristic = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        (
            Characteristic.Properties.READ
            | Characteristic.Properties.WRITE
            | Characteristic.Properties.NOTIFY
        ),
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([123]),
    )

    async def async_read(connection):
        return 0x05060708

    async_characteristic = PackedCharacteristicAdapter(
        Characteristic(
            '2AB7E91B-43E8-4F73-AC3B-80C1683B47F9',
            Characteristic.Properties.READ,
            Characteristic.READABLE,
            CharacteristicValue(read=async_read),
        ),
        '>I',
    )

    server.add_service(
        Service(
            '3A657F47-D34F-46B3-B1EC-698E29B6B829',
            [characteristic, async_characteristic],
        )
    )

    # --- Connect and discover ------------------------------------------------
    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()
    matches = peer.get_characteristics_by_uuid(characteristic.uuid)
    assert len(matches) == 1
    remote = matches[0]
    proxy = SingleByteProxy(remote)

    # --- Reads and writes through the encoding proxy --------------------------
    assert await proxy.read_value() == 123
    await proxy.write_value(124)
    await async_barrier()
    assert characteristic.value == bytes([124])

    assert await proxy.read_value() == 124
    await proxy.write_value(125, with_response=True)
    await async_barrier()
    assert characteristic.value == bytes([125])

    # --- Write through an adapter that only supplies an encoder --------------
    halving = DelegatedCharacteristicAdapter(remote, encode=lambda x: bytes([x // 2]))
    await halving.write_value(100, with_response=True)
    await async_barrier()
    assert characteristic.value == bytes([50])

    # --- Read the async-backed characteristic through a packed adapter -------
    packed_matches = peer.get_characteristics_by_uuid(async_characteristic.uuid)
    assert len(packed_matches) == 1
    packed_remote = PackedCharacteristicAdapter(packed_matches[0], ">I")
    assert await packed_remote.read_value() == 0x05060708

    # --- Subscriptions --------------------------------------------------------
    latest = None

    def record_change(value):
        nonlocal latest
        latest = value

    async def notify_and_settle(**kwargs):
        # Send a notification for `characteristic`, then wait at the barrier
        await server.notify_subscribers(characteristic, **kwargs)
        await async_barrier()

    # Plain proxy: the callback receives the raw bytes
    await remote.subscribe(record_change)
    await notify_and_settle()
    assert latest == characteristic.value
    latest = None

    await notify_and_settle(value=bytes([125]))
    assert latest == bytes([125])
    latest = None

    await remote.unsubscribe(record_change)
    await notify_and_settle()
    assert latest is None

    # Encoding proxy: the callback receives the decoded value
    await proxy.subscribe(record_change)
    await notify_and_settle()
    assert latest == characteristic.value[0]
    latest = None

    await notify_and_settle(value=bytes([126]))
    assert latest == 126
    latest = None

    await proxy.unsubscribe(record_change)
    await notify_and_settle()
    assert latest is None

    # Adapter that only supplies a decoder
    decoding = DelegatedCharacteristicAdapter(remote, decode=lambda x: x[0])
    await decoding.subscribe(record_change)
    await notify_and_settle()
    assert latest == characteristic.value[0]
    latest = None

    await decoding.unsubscribe(record_change)
    await notify_and_settle()
    assert latest is None
258
259
260# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_attribute_getters():
    """Look up service, characteristic and descriptor attributes by UUID."""
    _, server = LinkedDevices().devices[:2]

    characteristic_uuid = UUID('FDB159DB-036C-49E3-B3DB-6325AC750806')
    characteristic = Characteristic(
        characteristic_uuid,
        (
            Characteristic.Properties.READ
            | Characteristic.Properties.WRITE
            | Characteristic.Properties.NOTIFY
        ),
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([123]),
    )

    service_uuid = UUID('3A657F47-D34F-46B3-B1EC-698E29B6B829')
    server.add_service(Service(service_uuid, [characteristic]))

    service_attr = server.gatt_server.get_service_attribute(service_uuid)
    assert service_attr

    characteristic_attrs = server.gatt_server.get_characteristic_attributes(
        service_uuid, characteristic_uuid
    )
    char_decl_attr, char_value_attr = characteristic_attrs
    assert char_decl_attr and char_value_attr

    desc_attr = server.gatt_server.get_descriptor_attribute(
        service_uuid,
        characteristic_uuid,
        GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
    )
    assert desc_attr

    # Handles increase from service to declaration to value to descriptor,
    # and the descriptor is the last attribute of the service group
    assert service_attr.handle < char_decl_attr.handle
    assert char_decl_attr.handle < char_value_attr.handle
    assert char_value_attr.handle < desc_attr.handle
    assert desc_attr.handle == service_attr.end_group_handle

    # The value attribute immediately follows the declaration attribute
    assert char_value_attr.handle == char_decl_attr.handle + 1
307
308
309# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_CharacteristicAdapter():
    """Check the read and write paths of each characteristic adapter type."""
    # The CharacteristicAdapter base class passes values through unchanged
    initial = bytes([1, 2, 3])
    characteristic = Characteristic(
        GATT_BATTERY_LEVEL_CHARACTERISTIC,
        Characteristic.Properties.READ,
        Characteristic.READABLE,
        initial,
    )
    adapter = CharacteristicAdapter(characteristic)
    assert await adapter.read_value(None) == initial

    raw = bytes([3, 4, 5])
    await adapter.write_value(None, raw)
    assert characteristic.value == raw

    # Simple delegated adapter: reverse the bytes in both directions
    def flip(data):
        return bytes(reversed(data))

    adapter = DelegatedCharacteristicAdapter(characteristic, flip, flip)
    assert await adapter.read_value(None) == flip(raw)

    raw = bytes([3, 4, 5])
    await adapter.write_value(None, raw)
    assert adapter.value == flip(raw)

    # Packed adapter with single element format
    number = 1234
    packed = struct.pack('>H', number)
    characteristic.value = number
    adapter = PackedCharacteristicAdapter(characteristic, '>H')
    assert await adapter.read_value(None) == packed

    characteristic.value = None
    await adapter.write_value(None, packed)
    assert adapter.value == number

    # Packed adapter with multi-element format
    pair = (1234, 5678)
    packed = struct.pack('>HH', *pair)
    characteristic.value = pair
    adapter = PackedCharacteristicAdapter(characteristic, '>HH')
    assert await adapter.read_value(None) == packed

    characteristic.value = None
    await adapter.write_value(None, packed)
    assert adapter.value == pair

    # Mapped adapter
    keys = ('v1', 'v2')
    mapped = dict(zip(keys, (1234, 5678)))
    packed = struct.pack('>HH', mapped['v1'], mapped['v2'])
    characteristic.value = mapped
    adapter = MappedCharacteristicAdapter(characteristic, '>HH', keys)
    assert await adapter.read_value(None) == packed

    characteristic.value = None
    await adapter.write_value(None, packed)
    assert adapter.value == mapped

    # UTF-8 adapter
    text = 'Hello π'
    encoded_text = text.encode('utf-8')
    characteristic.value = text
    adapter = UTF8CharacteristicAdapter(characteristic)
    assert await adapter.read_value(None) == encoded_text

    characteristic.value = None
    await adapter.write_value(None, encoded_text)
    assert adapter.value == text
391
392
393# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_CharacteristicValue():
    """Check CharacteristicValue with an async reader and a synchronous writer."""
    payload = bytes([1, 2, 3])

    async def read_value(connection):
        return payload

    readable = CharacteristicValue(read=read_value)
    assert await readable.read(None) == payload

    # The write callback is a plain Mock, so write() is not awaited here
    writer = Mock()
    writable = CharacteristicValue(write=writer)
    connection = object()
    writable.write(connection, payload)
    writer.assert_called_once_with(connection, payload)
410
411
412# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_CharacteristicValue_async():
    """Check CharacteristicValue with an async reader and an async writer."""
    payload = bytes([1, 2, 3])

    async def read_value(connection):
        return payload

    readable = CharacteristicValue(read=read_value)
    assert await readable.read(None) == payload

    # The write callback is an AsyncMock, so the result of write() is awaited
    writer = AsyncMock()
    writable = CharacteristicValue(write=writer)
    connection = object()
    await writable.write(connection, payload)
    writer.assert_called_once_with(connection, payload)
429
430
431# -----------------------------------------------------------------------------
class LinkedDevices:
    """Test fixture: three devices whose controllers share one LocalLink.

    Attributes set by the constructor:
      link: the LocalLink shared by all controllers.
      controllers: three Controller objects named 'C1', 'C2' and 'C3'.
      devices: three Device objects, one per controller.
      connections / paired: three-element lists initialized to None.
    """

    def __init__(self):
        addresses = (
            'F0:F1:F2:F3:F4:F5',
            'F1:F2:F3:F4:F5:F6',
            'F2:F3:F4:F5:F6:F7',
        )

        self.connections = [None, None, None]

        self.link = LocalLink()
        self.controllers = [
            Controller(name, link=self.link) for name in ('C1', 'C2', 'C3')
        ]

        # Each device gets a host wired to its own controller
        self.devices = [
            Device(
                address=address,
                host=Host(controller, AsyncPipeSink(controller)),
            )
            for address, controller in zip(addresses, self.controllers)
        ]

        self.paired = [None, None, None]
458
459
460# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write():
    """Read and write characteristics over a client/server connection.

    ``characteristic1`` is created without an initial value and records
    writes through its 'write' event. ``characteristic2`` is backed by a
    CharacteristicValue with explicit read and write callbacks.
    """
    [client, server] = LinkedDevices().devices[:2]

    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.Properties.READ | Characteristic.Properties.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
    )

    def on_characteristic1_write(connection, value):
        characteristic1._last_value = (connection, value)

    characteristic1.on('write', on_characteristic1_write)

    def on_characteristic2_read(connection):
        # bytes(str) without an encoding raises TypeError, so encode explicitly
        return str(connection.peer_address).encode('utf-8')

    def on_characteristic2_write(connection, value):
        characteristic2._last_value = (connection, value)

    characteristic2 = Characteristic(
        '66DE9057-C848-4ACA-B993-D675644EBB85',
        Characteristic.Properties.READ | Characteristic.Properties.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        CharacteristicValue(
            read=on_characteristic2_read, write=on_characteristic2_write
        ),
    )

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1, characteristic2]
    )
    server.add_services([service1])

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()
    c = peer.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(c) == 1
    c1 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic2.uuid)
    assert len(c) == 1
    c2 = c[0]

    # characteristic1: reads as empty until written
    v1 = await peer.read_value(c1)
    assert v1 == b''
    b = bytes([1, 2, 3])
    await peer.write_value(c1, b)
    await async_barrier()
    assert characteristic1.value == b
    v1 = await peer.read_value(c1)
    assert v1 == b

    # The 'write' event handler recorded a (connection, value) pair
    assert isinstance(characteristic1._last_value, tuple)
    assert len(characteristic1._last_value) == 2
    assert str(characteristic1._last_value[0].peer_address) == str(
        client.random_address
    )
    assert characteristic1._last_value[1] == b

    # A value changed on the server side is visible to the client
    bb = bytes([3, 4, 5, 6])
    characteristic1.value = bb
    v1 = await peer.read_value(c1)
    assert v1 == bb

    # characteristic2: the write callback recorded a (connection, value) pair
    await peer.write_value(c2, b)
    await async_barrier()
    assert isinstance(characteristic2._last_value, tuple)
    assert len(characteristic2._last_value) == 2
    assert str(characteristic2._last_value[0].peer_address) == str(
        client.random_address
    )
    assert characteristic2._last_value[1] == b
537
538
539# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write2():
    """Read and write one characteristic via service-level discovery.

    The value is accessed both as raw bytes and as a big-endian unsigned
    32-bit integer through a PackedCharacteristicAdapter.
    """
    client, server = LinkedDevices().devices[:2]

    initial = bytes([0x11, 0x22, 0x33, 0x44])
    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.Properties.READ | Characteristic.Properties.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        value=initial,
    )

    service1 = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1])
    server.add_services([service1])

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    # Discover the service first, then its characteristics
    await peer.discover_services()
    services = peer.get_services_by_uuid(service1.uuid)
    assert len(services) == 1
    remote_service = services[0]
    await remote_service.discover_characteristics()
    characteristics = remote_service.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(characteristics) == 1
    remote = characteristics[0]

    # Raw read
    assert await remote.read_value() == initial

    # Read through the packed adapter
    packed = PackedCharacteristicAdapter(remote, '>I')
    assert await packed.read_value() == struct.unpack('>I', initial)[0]

    # Write through the packed adapter, then read back
    updated = bytes([0x55, 0x66, 0x77, 0x88])
    await packed.write_value(struct.unpack('>I', updated)[0])
    await async_barrier()
    assert characteristic1.value == updated
    assert await packed.read_value() == struct.unpack('>I', updated)[0]
582
583
584# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_subscribe_notify():
    """Check subscriptions, notifications and indications end to end.

    Three characteristics are served: one with NOTIFY, one with INDICATE and
    one with both. For each, the test subscribes from the client, triggers
    notifications/indications from the server and checks which client
    callbacks fire, then unsubscribes and checks that nothing fires.
    """
    [client, server] = LinkedDevices().devices[:2]

    def record_subscriptions(characteristic):
        # Store the arguments of the latest 'subscription' event on the
        # characteristic itself, as `_last_subscription`
        def on_subscription(connection, notify_enabled, indicate_enabled):
            characteristic._last_subscription = (
                connection,
                notify_enabled,
                indicate_enabled,
            )

        characteristic.on('subscription', on_subscription)

    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
        Characteristic.READABLE,
        bytes([1, 2, 3]),
    )
    record_subscriptions(characteristic1)

    characteristic2 = Characteristic(
        '66DE9057-C848-4ACA-B993-D675644EBB85',
        Characteristic.Properties.READ | Characteristic.Properties.INDICATE,
        Characteristic.READABLE,
        bytes([4, 5, 6]),
    )
    record_subscriptions(characteristic2)

    characteristic3 = Characteristic(
        'AB5E639C-40C1-4238-B9CB-AF41F8B806E4',
        Characteristic.Properties.READ
        | Characteristic.Properties.NOTIFY
        | Characteristic.Properties.INDICATE,
        Characteristic.READABLE,
        bytes([7, 8, 9]),
    )
    record_subscriptions(characteristic3)

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829',
        [characteristic1, characteristic2, characteristic3],
    )
    server.add_services([service1])

    def on_characteristic_subscription(
        connection, characteristic, notify_enabled, indicate_enabled
    ):
        server._last_subscription = (
            connection,
            characteristic,
            notify_enabled,
            indicate_enabled,
        )

    server.on('characteristic_subscription', on_characteristic_subscription)

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()
    c = peer.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(c) == 1
    c1 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic2.uuid)
    assert len(c) == 1
    c2 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic3.uuid)
    assert len(c) == 1
    c3 = c[0]

    # ---- characteristic1 (NOTIFY only), observed via the 'update' event ----
    c1._called = False
    c1._last_update = None

    def on_c1_update(value):
        c1._called = True
        c1._last_update = value

    c1.on('update', on_c1_update)
    await peer.subscribe(c1)
    await async_barrier()
    assert server._last_subscription[1] == characteristic1
    assert server._last_subscription[2]
    assert not server._last_subscription[3]
    assert characteristic1._last_subscription[1]
    assert not characteristic1._last_subscription[2]

    # An indication does not reach the handler...
    await server.indicate_subscribers(characteristic1)
    await async_barrier()
    assert not c1._called

    # ...but a notification does
    await server.notify_subscribers(characteristic1)
    await async_barrier()
    assert c1._called
    assert c1._last_update == characteristic1.value

    # Notifying with an explicit value leaves the stored value unchanged
    c1._called = False
    c1._last_update = None
    c1_value = characteristic1.value
    await server.notify_subscribers(characteristic1, bytes([0, 1, 2]))
    await async_barrier()
    assert c1._called
    assert c1._last_update == bytes([0, 1, 2])
    assert characteristic1.value == c1_value

    c1._called = False
    await peer.unsubscribe(c1)
    await server.notify_subscribers(characteristic1)
    # Wait at the barrier before checking, as the other negative checks in
    # this test do; otherwise the assertion runs before anything could arrive
    await async_barrier()
    assert not c1._called

    # ---- characteristic2 (INDICATE only), observed via a callback ----------
    c2._called = False
    c2._last_update = None

    def on_c2_update(value):
        c2._called = True
        c2._last_update = value

    await peer.subscribe(c2, on_c2_update)
    await async_barrier()
    await server.notify_subscriber(
        characteristic2._last_subscription[0], characteristic2
    )
    await async_barrier()
    assert not c2._called
    await server.indicate_subscriber(
        characteristic2._last_subscription[0], characteristic2
    )
    await async_barrier()
    assert c2._called
    assert c2._last_update == characteristic2.value

    c2._called = False
    await peer.unsubscribe(c2, on_c2_update)
    await server.indicate_subscriber(
        characteristic2._last_subscription[0], characteristic2
    )
    await async_barrier()
    assert not c2._called

    # ---- characteristic3 (NOTIFY and INDICATE) -------------------------------
    c3._called = False
    c3._called_2 = False
    c3._called_3 = False
    c3._last_update = None
    c3._last_update_2 = None
    c3._last_update_3 = None

    def on_c3_update(value):
        c3._called = True
        c3._last_update = value

    def on_c3_update_2(value):  # for notify
        c3._called_2 = True
        c3._last_update_2 = value

    def on_c3_update_3(value):  # for indicate
        c3._called_3 = True
        c3._last_update_3 = value

    c3.on('update', on_c3_update)
    await peer.subscribe(c3, on_c3_update_2)
    await async_barrier()
    await server.notify_subscriber(
        characteristic3._last_subscription[0], characteristic3
    )
    await async_barrier()
    assert c3._called
    assert c3._last_update == characteristic3.value
    assert c3._called_2
    assert c3._last_update_2 == characteristic3.value
    assert not c3._called_3

    # Re-subscribe with prefer_notify=False and trigger an indication
    c3._called = False
    c3._called_2 = False
    c3._called_3 = False
    await peer.unsubscribe(c3)
    await peer.subscribe(c3, on_c3_update_3, prefer_notify=False)
    await async_barrier()
    characteristic3.value = bytes([1, 2, 3])
    await server.indicate_subscriber(
        characteristic3._last_subscription[0], characteristic3
    )
    await async_barrier()
    assert c3._called
    assert c3._last_update == characteristic3.value
    assert not c3._called_2
    assert c3._called_3
    assert c3._last_update_3 == characteristic3.value

    # After unsubscribing, neither notifications nor indications are delivered
    c3._called = False
    c3._called_2 = False
    c3._called_3 = False
    await peer.unsubscribe(c3)
    await server.notify_subscriber(
        characteristic3._last_subscription[0], characteristic3
    )
    await server.indicate_subscriber(
        characteristic3._last_subscription[0], characteristic3
    )
    await async_barrier()
    assert not c3._called
    assert not c3._called_2
    assert not c3._called_3
803
804
805# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_unsubscribe():
    """Check the 'subscription' events produced by subscribe/unsubscribe.

    A repeated unsubscribe is expected to produce no event, while an
    unsubscribe with ``force=True`` is expected to produce one.
    """
    client, server = LinkedDevices().devices[:2]

    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
        Characteristic.READABLE,
        bytes([1, 2, 3]),
    )
    characteristic2 = Characteristic(
        '3234C4F4-3F34-4616-8935-45A50EE05DEB',
        Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
        Characteristic.READABLE,
        bytes([1, 2, 3]),
    )

    server.add_services(
        [
            Service(
                '3A657F47-D34F-46B3-B1EC-698E29B6B829',
                [characteristic1, characteristic2],
            )
        ]
    )

    events1 = Mock()
    characteristic1.on('subscription', events1)
    events2 = Mock()
    characteristic2.on('subscription', events2)

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()
    found = peer.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(found) == 1
    remote1 = found[0]
    found = peer.get_characteristics_by_uuid(characteristic2.uuid)
    assert len(found) == 1
    remote2 = found[0]

    async def settle(operation):
        # Await the given operation, then wait at the barrier
        await operation
        await async_barrier()

    # Subscribing reports notifications enabled
    await settle(remote1.subscribe())
    events1.assert_called_once_with(ANY, True, False)

    await settle(remote2.subscribe())
    events2.assert_called_once_with(ANY, True, False)

    # First unsubscribe reports notifications disabled
    events1.reset_mock()
    await settle(remote1.unsubscribe())
    events1.assert_called_once_with(ANY, False, False)

    events2.reset_mock()
    await settle(remote2.unsubscribe())
    events2.assert_called_once_with(ANY, False, False)

    # Second unsubscribe produces no event
    events1.reset_mock()
    await settle(remote1.unsubscribe())
    events1.assert_not_called()

    events2.reset_mock()
    await settle(remote2.unsubscribe())
    events2.assert_not_called()

    # A forced unsubscribe produces an event again
    events1.reset_mock()
    await settle(remote1.unsubscribe(force=True))
    events1.assert_called_once_with(ANY, False, False)
875
876    mock1.reset_mock()
877    await c1.unsubscribe(force=True)
878    await async_barrier()
879    mock1.assert_called_once_with(ANY, False, False)
880
881
882# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_mtu_exchange():
    """Check MTU requests from two clients connected to the same server.

    The server's GATT max MTU is set to 100. A request for 220 is expected
    to yield 100, and a request for 50 is expected to yield 50.
    """
    first, second, hub = LinkedDevices().devices[:3]

    hub.gatt_server.max_mtu = 100

    # Collect the connections seen by the shared server device
    hub_connections = []

    def remember_connection(connection):
        hub_connections.append(connection)

    hub.on('connection', remember_connection)

    await first.power_on()
    await second.power_on()
    await hub.power_on()

    first_connection = await first.connect(hub.random_address)
    assert len(hub_connections) == 1
    assert hub_connections[0] is not None

    second_connection = await second.connect(hub.random_address)
    assert len(hub_connections) == 2
    assert hub_connections[1] is not None

    first_peer = Peer(first_connection)
    second_peer = Peer(second_connection)

    assert await first_peer.request_mtu(220) == 100
    assert first_connection.att_mtu == 100

    assert await second_peer.request_mtu(50) == 50
    assert second_connection.att_mtu == 50
917
918
919# -----------------------------------------------------------------------------
def test_char_property_to_string():
    """Check the string form of single and combined characteristic properties."""
    properties = Characteristic.Properties

    # single
    assert str(properties(0x01)) == "BROADCAST"
    assert str(properties.BROADCAST) == "BROADCAST"

    # double
    assert str(properties(0x03)) == "BROADCAST|READ"
    combined = properties.BROADCAST | properties.READ
    assert str(combined) == "BROADCAST|READ"
931
932
933# -----------------------------------------------------------------------------
def test_characteristic_property_from_string():
    """Check parsing of property names separated by ',' or '|'."""
    properties = Characteristic.Properties

    # single
    assert properties.from_string("BROADCAST") == properties.BROADCAST

    # double: either separator, in either order
    assert (
        properties.from_string("BROADCAST,READ")
        == properties.BROADCAST | properties.READ
    )
    assert (
        properties.from_string("READ,BROADCAST")
        == properties.BROADCAST | properties.READ
    )
    assert (
        properties.from_string("BROADCAST|READ")
        == properties.BROADCAST | properties.READ
    )
954
955
956# -----------------------------------------------------------------------------
def test_characteristic_property_from_string_assert():
    """Check the TypeError message raised for an unknown property name."""
    expected_message = '\n'.join(
        [
            'Characteristic.Properties::from_string() error:',
            'Expected a string containing any of the keys, separated by , or |: BROADCAST,READ,WRITE_WITHOUT_RESPONSE,WRITE,NOTIFY,INDICATE,AUTHENTICATED_SIGNED_WRITES,EXTENDED_PROPERTIES',
            'Got: BROADCAST,HELLO',
        ]
    )

    with pytest.raises(TypeError) as e_info:
        Characteristic.Properties.from_string("BROADCAST,HELLO")

    assert str(e_info.value) == expected_message
967
968
969# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_server_string():
    """Check the string dump of a GATT server after one custom service is added."""
    linked = LinkedDevices()
    _, server = linked.devices[:2]

    properties = (
        Characteristic.Properties.READ
        | Characteristic.Properties.WRITE
        | Characteristic.Properties.NOTIFY
    )
    permissions = Characteristic.READABLE | Characteristic.WRITEABLE
    characteristic = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        properties,
        permissions,
        bytes([123]),
    )
    server.add_service(
        Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic])
    )

    # The dump lists the Generic Access service first, then the custom service
    # with its characteristic and a Client Characteristic Configuration descriptor
    expected_dump = """Service(handle=0x0001, end=0x0005, uuid=UUID-16:1800 (Generic Access))
CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), READ)
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), READ)
CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), READ)
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), READ)
Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829)
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY)
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY)
Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)"""
    assert str(server.gatt_server) == expected_dump
998
999
1000# -----------------------------------------------------------------------------
async def async_main():
    """Run a fixed selection of this module's tests in order, without pytest."""
    # Plain (synchronous) tests are simply called
    synchronous_tests = (
        test_UUID,
        test_ATT_Error_Response,
        test_ATT_Read_By_Group_Type_Request,
    )
    for run_test in synchronous_tests:
        run_test()

    # Coroutine tests are awaited one after the other
    asynchronous_tests = (
        test_read_write,
        test_read_write2,
        test_subscribe_notify,
        test_unsubscribe,
        test_characteristic_encoding,
        test_mtu_exchange,
        test_CharacteristicValue,
        test_CharacteristicValue_async,
        test_CharacteristicAdapter,
    )
    for run_test in asynchronous_tests:
        await run_test()
1014
1015
1016# -----------------------------------------------------------------------------
def test_permissions_from_string():
    """Check the numeric value obtained when parsing permission names."""
    expectations = (
        ('READABLE', 1),
        ('WRITEABLE', 2),
        ('READABLE,WRITEABLE', 3),
    )
    for text, expected_value in expectations:
        assert Attribute.Permissions.from_string(text) == expected_value
1021
1022
1023# -----------------------------------------------------------------------------
def test_characteristic_permissions():
    """Check that a characteristic built with a permissions string reports value 3."""
    properties = (
        Characteristic.Properties.READ
        | Characteristic.Properties.WRITE
        | Characteristic.Properties.NOTIFY
    )
    # Permissions are passed as a comma-separated string rather than as flags
    characteristic = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806', properties, 'READABLE,WRITEABLE'
    )
    assert characteristic.permissions == 3
1033
1034
1035# -----------------------------------------------------------------------------
def test_characteristic_has_properties():
    """Check has_properties() for property sets that are and are not fully present."""
    props = Characteristic.Properties
    characteristic = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        props.READ | props.WRITE | props.NOTIFY,
        'READABLE,WRITEABLE',
    )

    # Sets made only of properties the characteristic was created with
    for present in (props.READ, props.READ | props.WRITE):
        assert characteristic.has_properties(present)

    # Sets that include INDICATE, which the characteristic was not created with
    for absent in (props.READ | props.WRITE | props.INDICATE, props.INDICATE):
        assert not characteristic.has_properties(absent)
1054
1055
1056# -----------------------------------------------------------------------------
def test_descriptor_permissions():
    """Check that a descriptor built with a permissions string reports value 3."""
    permissions = Descriptor('2902', 'READABLE,WRITEABLE').permissions
    assert permissions == 3
1060
1061
1062# -----------------------------------------------------------------------------
def test_get_attribute_group():
    """Check that attribute handles map back to their service and characteristic."""
    device = Device()
    gatt_server = device.gatt_server

    # add some services / characteristics to the gatt server
    properties = Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY
    permissions = Characteristic.READABLE | Characteristic.WRITEABLE
    characteristic1 = Characteristic('1111', properties, permissions, bytes([123]))
    characteristic2 = Characteristic('2222', properties, permissions, bytes([123]))
    gatt_server.add_services(
        [Service('1212', [characteristic1]), Service('3233', [characteristic2])]
    )

    # (service UUID, characteristic UUID) pairs that were just registered
    targets = (('1212', '1111'), ('3233', '2222'))

    # get the handles from gatt server: characteristic attributes first ...
    characteristic_attributes = []
    for service_uuid, characteristic_uuid in targets:
        found = gatt_server.get_characteristic_attributes(
            UUID(service_uuid), UUID(characteristic_uuid)
        )
        assert found is not None
        characteristic_attributes.append(found)

    # ... then the 2902 descriptor of each characteristic
    descriptors = []
    for service_uuid, characteristic_uuid in targets:
        found = gatt_server.get_descriptor_attribute(
            UUID(service_uuid), UUID(characteristic_uuid), UUID('2902')
        )
        assert found is not None
        descriptors.append(found)

    # confirm the handles map back to the service
    for (service_uuid, _), attributes, descriptor in zip(
        targets, characteristic_attributes, descriptors
    ):
        for attribute in (attributes[0], attributes[1], descriptor):
            group = gatt_server.get_attribute_group(attribute.handle, Service)
            assert UUID(service_uuid) == group.uuid

    # confirm the handles map back to the characteristic
    for (_, characteristic_uuid), descriptor in zip(targets, descriptors):
        group = gatt_server.get_attribute_group(descriptor.handle, Characteristic)
        assert UUID(characteristic_uuid) == group.uuid
1147
1148
1149# -----------------------------------------------------------------------------
if __name__ == '__main__':
    # Log level comes from BUMBLE_LOGLEVEL (case-insensitive), defaulting to INFO
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=log_level)
    asyncio.run(async_main())
1153