• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import logging
20import os
21import struct
22import pytest
23
24from bumble.controller import Controller
25from bumble.gatt_client import CharacteristicProxy
26from bumble.link import LocalLink
27from bumble.device import Device, Peer
28from bumble.host import Host
29from bumble.gatt import (
30    GATT_BATTERY_LEVEL_CHARACTERISTIC,
31    GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
32    CharacteristicAdapter,
33    DelegatedCharacteristicAdapter,
34    PackedCharacteristicAdapter,
35    MappedCharacteristicAdapter,
36    UTF8CharacteristicAdapter,
37    Service,
38    Characteristic,
39    CharacteristicValue,
40    Descriptor,
41)
42from bumble.transport import AsyncPipeSink
43from bumble.core import UUID
44from bumble.att import (
45    Attribute,
46    ATT_EXCHANGE_MTU_REQUEST,
47    ATT_ATTRIBUTE_NOT_FOUND_ERROR,
48    ATT_PDU,
49    ATT_Error_Response,
50    ATT_Read_By_Group_Type_Request,
51)
52
53
54# -----------------------------------------------------------------------------
def basic_check(x):
    """Assert that an ATT PDU survives a serialize/parse round trip.

    The PDU is serialized with ``to_bytes()``, parsed back with
    ``ATT_PDU.from_bytes()``, and the string forms of the original and of the
    parsed object are compared.
    """
    serialized = x.to_bytes()
    round_tripped = ATT_PDU.from_bytes(serialized)
    assert str(x) == str(round_tripped)
61
62
63# -----------------------------------------------------------------------------
def test_UUID():
    """Check UUID construction, string rendering and byte round trips."""
    full_text = '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'

    # Short forms render with a size prefix
    short16 = UUID.from_16_bits(0x7788)
    assert str(short16) == 'UUID-16:7788'
    short32 = UUID.from_32_bits(0x11223344)
    assert str(short32) == 'UUID-32:11223344'

    # A 128-bit UUID renders as the text it was built from, including after
    # being rebuilt from its own string or from its own bytes
    from_text = UUID(full_text)
    assert str(from_text) == full_text
    from_str = UUID(str(from_text))
    assert str(from_str) == full_text
    from_raw = UUID.from_bytes(from_str.to_bytes())
    assert str(from_raw) == full_text

    # A 16-bit UUID serialized with force_128=True compares equal once parsed
    original = UUID.from_16_bits(0x1234)
    widened = original.to_bytes(force_128=True)
    reparsed = UUID.from_bytes(widened)
    assert original == reparsed

    # This 16-bit UUID is expected to render with a descriptive name
    named = UUID.from_16_bits(0x180A)
    assert str(named) == 'UUID-16:180A (Device Information)'
83
84
85# -----------------------------------------------------------------------------
def test_ATT_Error_Response():
    """Round-trip an ATT Error Response PDU through basic_check."""
    error_response = ATT_Error_Response(
        request_opcode_in_error=ATT_EXCHANGE_MTU_REQUEST,
        attribute_handle_in_error=0x0000,
        error_code=ATT_ATTRIBUTE_NOT_FOUND_ERROR,
    )
    basic_check(error_response)
93
94
95# -----------------------------------------------------------------------------
def test_ATT_Read_By_Group_Type_Request():
    """Round-trip an ATT Read By Group Type Request PDU through basic_check."""
    group_type = UUID.from_16_bits(0x2800)
    request = ATT_Read_By_Group_Type_Request(
        starting_handle=0x0001,
        ending_handle=0xFFFF,
        attribute_group_type=group_type,
    )
    basic_check(request)
103
104
105# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_characteristic_encoding():
    """Exercise value encoding/decoding on characteristics, proxies and adapters.

    The test first checks a local ``Characteristic`` subclass with custom
    ``encode_value``/``decode_value``, then connects a client to a server and
    checks reads, writes and notification subscriptions through a
    ``CharacteristicProxy`` subclass and through ``DelegatedCharacteristicAdapter``.
    """

    # Characteristic subclass that stores an int and exposes it as one byte
    class Foo(Characteristic):
        def encode_value(self, value):
            return bytes([value])

        def decode_value(self, value_bytes):
            return value_bytes[0]

    c = Foo(
        GATT_BATTERY_LEVEL_CHARACTERISTIC,
        Characteristic.READ,
        Characteristic.READABLE,
        123,
    )
    # Reading is expected to yield the encoded form, writing to store the
    # decoded form
    x = c.read_value(None)
    assert x == bytes([123])
    c.write_value(None, bytes([122]))
    assert c.value == 122

    # Proxy subclass that wraps an already-discovered characteristic proxy and
    # applies the same one-byte int encoding
    class FooProxy(CharacteristicProxy):
        def __init__(self, characteristic):
            super().__init__(
                characteristic.client,
                characteristic.handle,
                characteristic.end_group_handle,
                characteristic.uuid,
                characteristic.properties,
            )

        def encode_value(self, value):
            return bytes([value])

        def decode_value(self, value_bytes):
            return value_bytes[0]

    [client, server] = LinkedDevices().devices[:2]

    # Plain (bytes-valued) characteristic hosted by the server
    characteristic = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([123]),
    )

    service = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic])
    server.add_service(service)

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    # Discover the remote characteristic and wrap it in the encoding proxy
    await peer.discover_services()
    await peer.discover_characteristics()
    c = peer.get_characteristics_by_uuid(characteristic.uuid)
    assert len(c) == 1
    c = c[0]
    cp = FooProxy(c)

    # Reads through the proxy are expected to return ints; writes take ints
    # and should land on the server as single bytes
    v = await cp.read_value()
    assert v == 123
    await cp.write_value(124)
    await async_barrier()
    assert characteristic.value == bytes([124])

    v = await cp.read_value()
    assert v == 124
    await cp.write_value(125, with_response=True)
    await async_barrier()
    assert characteristic.value == bytes([125])

    # Adapter with only an encode function: 100 is expected to be stored as 50
    cd = DelegatedCharacteristicAdapter(c, encode=lambda x: bytes([x // 2]))
    await cd.write_value(100, with_response=True)
    await async_barrier()
    assert characteristic.value == bytes([50])

    # Records the most recent value delivered to a subscriber
    last_change = None

    def on_change(value):
        nonlocal last_change
        last_change = value

    # Subscription on the raw proxy: raw bytes are expected
    await c.subscribe(on_change)
    await server.notify_subscribers(characteristic)
    await async_barrier()
    assert last_change == characteristic.value
    last_change = None

    await server.notify_subscribers(characteristic, value=bytes([125]))
    await async_barrier()
    assert last_change == bytes([125])
    last_change = None

    # After unsubscribing, no value should be delivered
    await c.unsubscribe(on_change)
    await server.notify_subscribers(characteristic)
    await async_barrier()
    assert last_change is None

    # Subscription on the encoding proxy: decoded ints are expected
    await cp.subscribe(on_change)
    await server.notify_subscribers(characteristic)
    await async_barrier()
    assert last_change == characteristic.value[0]
    last_change = None

    await server.notify_subscribers(characteristic, value=bytes([126]))
    await async_barrier()
    assert last_change == 126
    last_change = None

    await cp.unsubscribe(on_change)
    await server.notify_subscribers(characteristic)
    await async_barrier()
    assert last_change is None

    # Subscription on an adapter with only a decode function
    cd = DelegatedCharacteristicAdapter(c, decode=lambda x: x[0])
    await cd.subscribe(on_change)
    await server.notify_subscribers(characteristic)
    await async_barrier()
    assert last_change == characteristic.value[0]
    last_change = None

    await cd.unsubscribe(on_change)
    await server.notify_subscribers(characteristic)
    await async_barrier()
    assert last_change is None
232
233
234# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_attribute_getters():
    """Look up service, characteristic and descriptor attributes by UUID.

    After adding one service with one characteristic to a server, the GATT
    server getters must return truthy attributes whose handles are in the
    expected order.
    """
    [client, server] = LinkedDevices().devices[:2]

    char_uuid = UUID('FDB159DB-036C-49E3-B3DB-6325AC750806')
    svc_uuid = UUID('3A657F47-D34F-46B3-B1EC-698E29B6B829')

    hosted_characteristic = Characteristic(
        char_uuid,
        Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([123]),
    )
    server.add_service(Service(svc_uuid, [hosted_characteristic]))

    gatt_server = server.gatt_server

    service_attribute = gatt_server.get_service_attribute(svc_uuid)
    assert service_attribute

    declaration, value_attribute = gatt_server.get_characteristic_attributes(
        svc_uuid, char_uuid
    )
    assert declaration and value_attribute

    cccd = gatt_server.get_descriptor_attribute(
        svc_uuid,
        char_uuid,
        GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
    )
    assert cccd

    # Handles must increase from service to descriptor, and the descriptor
    # must be the last attribute of the service group
    assert (
        service_attribute.handle
        < declaration.handle
        < value_attribute.handle
        < cccd.handle
        == service_attribute.end_group_handle
    )
    # The value attribute must immediately follow the declaration attribute
    assert declaration.handle + 1 == value_attribute.handle
279
280
281# -----------------------------------------------------------------------------
def test_CharacteristicAdapter():
    """Check each characteristic adapter type against one local characteristic.

    For every adapter, ``read_value`` is compared with the expected encoded
    bytes and ``write_value`` with the expected decoded value. The same
    characteristic ``c`` is reused and its value reassigned between sections.
    """
    # Check that the CharacteristicAdapter base class is transparent
    v = bytes([1, 2, 3])
    c = Characteristic(
        GATT_BATTERY_LEVEL_CHARACTERISTIC,
        Characteristic.READ,
        Characteristic.READABLE,
        v,
    )
    a = CharacteristicAdapter(c)

    value = a.read_value(None)
    assert value == v

    v = bytes([3, 4, 5])
    a.write_value(None, v)
    assert c.value == v

    # Simple delegated adapter
    # (both delegate functions reverse the byte order)
    a = DelegatedCharacteristicAdapter(
        c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x))
    )

    value = a.read_value(None)
    assert value == bytes(reversed(v))

    v = bytes([3, 4, 5])
    a.write_value(None, v)
    assert a.value == bytes(reversed(v))

    # Packed adapter with single element format
    v = 1234
    pv = struct.pack('>H', v)
    c.value = v
    a = PackedCharacteristicAdapter(c, '>H')

    value = a.read_value(None)
    assert value == pv
    # Clear the value so the following assertion can only pass via the write
    c.value = None
    a.write_value(None, pv)
    assert a.value == v

    # Packed adapter with multi-element format
    v1 = 1234
    v2 = 5678
    pv = struct.pack('>HH', v1, v2)
    c.value = (v1, v2)
    a = PackedCharacteristicAdapter(c, '>HH')

    value = a.read_value(None)
    assert value == pv
    c.value = None
    a.write_value(None, pv)
    assert a.value == (v1, v2)

    # Mapped adapter
    # (the packed fields are named by the key tuple passed to the adapter)
    v1 = 1234
    v2 = 5678
    pv = struct.pack('>HH', v1, v2)
    mapped = {'v1': v1, 'v2': v2}
    c.value = mapped
    a = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2'))

    value = a.read_value(None)
    assert value == pv
    c.value = None
    a.write_value(None, pv)
    assert a.value == mapped

    # UTF-8 adapter
    # (the sample text includes a non-ASCII character)
    v = 'Hello π'
    ev = v.encode('utf-8')
    c.value = v
    a = UTF8CharacteristicAdapter(c)

    value = a.read_value(None)
    assert value == ev
    c.value = None
    a.write_value(None, ev)
    assert a.value == v
362
363
364# -----------------------------------------------------------------------------
def test_CharacteristicValue():
    """Check that CharacteristicValue forwards reads and writes to its callbacks."""
    payload = bytes([1, 2, 3])

    # Read path: the callback result is returned
    readable = CharacteristicValue(read=lambda _: payload)
    assert readable.read(None) == payload

    # Write path: the callback receives the connection and the value
    recorded = []

    def record_write(connection, value):
        recorded.append((connection, value))

    writable = CharacteristicValue(write=record_write)
    fake_connection = object()
    writable.write(fake_connection, payload)
    assert recorded == [(fake_connection, payload)]
378
379
380# -----------------------------------------------------------------------------
class LinkedDevices:
    """Test fixture: three devices, each with its own controller on one LocalLink."""

    def __init__(self):
        self.connections = [None, None, None]

        # All controllers share the same link
        self.link = LocalLink()
        self.controllers = [
            Controller(name, link=self.link) for name in ('C1', 'C2', 'C3')
        ]

        # One device per controller; the controller is passed to Host both
        # directly and wrapped in an AsyncPipeSink
        addresses = (
            'F0:F1:F2:F3:F4:F5',
            'F1:F2:F3:F4:F5:F6',
            'F2:F3:F4:F5:F6:F7',
        )
        self.devices = [
            Device(
                address=address,
                host=Host(controller, AsyncPipeSink(controller)),
            )
            for address, controller in zip(addresses, self.controllers)
        ]

        self.paired = [None, None, None]
407
408
409# -----------------------------------------------------------------------------
async def async_barrier():
    """Suspend until a callback scheduled with ``call_soon`` has run.

    A future is created on the running loop and resolved by a ``call_soon``
    callback; awaiting it yields to the event loop until that callback fires.
    """
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    loop.call_soon(done.set_result, None)
    await done
414
415
416# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write():
    """Read and write two server characteristics from a connected client.

    ``characteristic1`` holds a plain value and reports writes through its
    'write' event; ``characteristic2`` is backed by a ``CharacteristicValue``
    with read/write callbacks. Each write handler records the
    ``(connection, value)`` pair it received so the test can inspect it.
    """
    [client, server] = LinkedDevices().devices[:2]

    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
    )

    def on_characteristic1_write(connection, value):
        characteristic1._last_value = (connection, value)

    characteristic1.on('write', on_characteristic1_write)

    def on_characteristic2_read(connection):
        # bytes() cannot convert a str without an encoding, so encode explicitly
        return str(connection.peer_address).encode('utf-8')

    def on_characteristic2_write(connection, value):
        characteristic2._last_value = (connection, value)

    characteristic2 = Characteristic(
        '66DE9057-C848-4ACA-B993-D675644EBB85',
        Characteristic.READ | Characteristic.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        CharacteristicValue(
            read=on_characteristic2_read, write=on_characteristic2_write
        ),
    )

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1, characteristic2]
    )
    server.add_services([service1])

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    # Discover the remote proxies for both characteristics
    await peer.discover_services()
    await peer.discover_characteristics()
    c = peer.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(c) == 1
    c1 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic2.uuid)
    assert len(c) == 1
    c2 = c[0]

    # characteristic1 was created without a value: an empty read is expected
    v1 = await peer.read_value(c1)
    assert v1 == b''
    b = bytes([1, 2, 3])
    await peer.write_value(c1, b)
    await async_barrier()
    assert characteristic1.value == b
    v1 = await peer.read_value(c1)
    assert v1 == b
    # The write handler must have recorded a (connection, value) tuple
    assert isinstance(characteristic1._last_value, tuple)
    assert len(characteristic1._last_value) == 2
    assert str(characteristic1._last_value[0].peer_address) == str(
        client.random_address
    )
    assert characteristic1._last_value[1] == b
    # A value changed locally on the server must be visible to the client
    bb = bytes([3, 4, 5, 6])
    characteristic1.value = bb
    v1 = await peer.read_value(c1)
    assert v1 == bb

    # Writes to characteristic2 go through its write callback
    await peer.write_value(c2, b)
    await async_barrier()
    assert isinstance(characteristic2._last_value, tuple)
    assert len(characteristic2._last_value) == 2
    assert str(characteristic2._last_value[0].peer_address) == str(
        client.random_address
    )
    assert characteristic2._last_value[1] == b
493
494
495# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write2():
    """Read and write a 4-byte characteristic raw and through a '>I' packed adapter.

    Characteristics are discovered per service here, through the service
    proxy, rather than for the whole peer.
    """
    client, server = LinkedDevices().devices[:2]

    initial_bytes = bytes([0x11, 0x22, 0x33, 0x44])
    hosted = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        value=initial_bytes,
    )

    hosted_service = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [hosted])
    server.add_services([hosted_service])

    await client.power_on()
    await server.power_on()
    peer = Peer(await client.connect(server.random_address))

    # Locate the service proxy, then the characteristic proxy inside it
    await peer.discover_services()
    matching_services = peer.get_services_by_uuid(hosted_service.uuid)
    assert len(matching_services) == 1
    service_proxy = matching_services[0]
    await service_proxy.discover_characteristics()
    matching_characteristics = service_proxy.get_characteristics_by_uuid(hosted.uuid)
    assert len(matching_characteristics) == 1
    remote = matching_characteristics[0]

    # Raw read returns the bytes as stored
    raw = await remote.read_value()
    assert raw == initial_bytes

    # Packed read returns the big-endian unsigned integer
    packed = PackedCharacteristicAdapter(remote, '>I')
    (initial_number,) = struct.unpack('>I', initial_bytes)
    number = await packed.read_value()
    assert number == initial_number

    # Packed write stores the packed bytes on the server
    updated_bytes = bytes([0x55, 0x66, 0x77, 0x88])
    (updated_number,) = struct.unpack('>I', updated_bytes)
    await packed.write_value(updated_number)
    await async_barrier()
    assert hosted.value == updated_bytes
    number = await packed.read_value()
    assert number == updated_number
538
539
540# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_subscribe_notify():
    """Check subscription events and notification/indication delivery.

    Three server characteristics are used: one with NOTIFY only, one with
    INDICATE only, and one with both. For each, the test subscribes from the
    client, triggers notifications and/or indications on the server, and
    checks which client callbacks were invoked, including that nothing is
    delivered after unsubscribing.
    """
    [client, server] = LinkedDevices().devices[:2]

    # Characteristic 1: NOTIFY only
    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.NOTIFY,
        Characteristic.READABLE,
        bytes([1, 2, 3]),
    )

    def on_characteristic1_subscription(connection, notify_enabled, indicate_enabled):
        characteristic1._last_subscription = (
            connection,
            notify_enabled,
            indicate_enabled,
        )

    characteristic1.on('subscription', on_characteristic1_subscription)

    # Characteristic 2: INDICATE only
    characteristic2 = Characteristic(
        '66DE9057-C848-4ACA-B993-D675644EBB85',
        Characteristic.READ | Characteristic.INDICATE,
        Characteristic.READABLE,
        bytes([4, 5, 6]),
    )

    def on_characteristic2_subscription(connection, notify_enabled, indicate_enabled):
        characteristic2._last_subscription = (
            connection,
            notify_enabled,
            indicate_enabled,
        )

    characteristic2.on('subscription', on_characteristic2_subscription)

    # Characteristic 3: both NOTIFY and INDICATE
    characteristic3 = Characteristic(
        'AB5E639C-40C1-4238-B9CB-AF41F8B806E4',
        Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE,
        Characteristic.READABLE,
        bytes([7, 8, 9]),
    )

    def on_characteristic3_subscription(connection, notify_enabled, indicate_enabled):
        characteristic3._last_subscription = (
            connection,
            notify_enabled,
            indicate_enabled,
        )

    characteristic3.on('subscription', on_characteristic3_subscription)

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829',
        [characteristic1, characteristic2, characteristic3],
    )
    server.add_services([service1])

    # Device-level subscription event, recorded on the server object
    def on_characteristic_subscription(
        connection, characteristic, notify_enabled, indicate_enabled
    ):
        server._last_subscription = (
            connection,
            characteristic,
            notify_enabled,
            indicate_enabled,
        )

    server.on('characteristic_subscription', on_characteristic_subscription)

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    # Discover the three remote characteristic proxies
    await peer.discover_services()
    await peer.discover_characteristics()
    c = peer.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(c) == 1
    c1 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic2.uuid)
    assert len(c) == 1
    c2 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic3.uuid)
    assert len(c) == 1
    c3 = c[0]

    # --- Characteristic 1: notifications via the 'update' event ---
    c1._called = False
    c1._last_update = None

    def on_c1_update(value):
        c1._called = True
        c1._last_update = value

    c1.on('update', on_c1_update)
    await peer.subscribe(c1)
    await async_barrier()
    # Both the device-level and characteristic-level events must report
    # notify enabled and indicate disabled
    assert server._last_subscription[1] == characteristic1
    assert server._last_subscription[2]
    assert not server._last_subscription[3]
    assert characteristic1._last_subscription[1]
    assert not characteristic1._last_subscription[2]
    # An indication must not reach a notify-only subscription
    await server.indicate_subscribers(characteristic1)
    await async_barrier()
    assert not c1._called
    await server.notify_subscribers(characteristic1)
    await async_barrier()
    assert c1._called
    assert c1._last_update == characteristic1.value

    # Notifying with an explicit value must not change the stored value
    c1._called = False
    c1._last_update = None
    c1_value = characteristic1.value
    await server.notify_subscribers(characteristic1, bytes([0, 1, 2]))
    await async_barrier()
    assert c1._called
    assert c1._last_update == bytes([0, 1, 2])
    assert characteristic1.value == c1_value

    c1._called = False
    await peer.unsubscribe(c1)
    await server.notify_subscribers(characteristic1)
    # Let any pending delivery run before checking that nothing arrived;
    # without this the negative assertion could pass vacuously
    await async_barrier()
    assert not c1._called

    # --- Characteristic 2: indications via a subscriber callback ---
    c2._called = False
    c2._last_update = None

    def on_c2_update(value):
        c2._called = True
        c2._last_update = value

    await peer.subscribe(c2, on_c2_update)
    await async_barrier()
    # A notification must not reach an indicate-only subscription
    await server.notify_subscriber(
        characteristic2._last_subscription[0], characteristic2
    )
    await async_barrier()
    assert not c2._called
    await server.indicate_subscriber(
        characteristic2._last_subscription[0], characteristic2
    )
    await async_barrier()
    assert c2._called
    assert c2._last_update == characteristic2.value

    c2._called = False
    await peer.unsubscribe(c2, on_c2_update)
    await server.indicate_subscriber(
        characteristic2._last_subscription[0], characteristic2
    )
    await async_barrier()
    assert not c2._called

    # --- Characteristic 3: an 'update' listener plus per-mode subscribers ---
    c3._called = False
    c3._called_2 = False
    c3._called_3 = False
    c3._last_update = None
    c3._last_update_2 = None
    c3._last_update_3 = None

    def on_c3_update(value):
        c3._called = True
        c3._last_update = value

    def on_c3_update_2(value):  # for notify
        c3._called_2 = True
        c3._last_update_2 = value

    def on_c3_update_3(value):  # for indicate
        c3._called_3 = True
        c3._last_update_3 = value

    c3.on('update', on_c3_update)
    await peer.subscribe(c3, on_c3_update_2)
    await async_barrier()
    await server.notify_subscriber(
        characteristic3._last_subscription[0], characteristic3
    )
    await async_barrier()
    assert c3._called
    assert c3._last_update == characteristic3.value
    assert c3._called_2
    assert c3._last_update_2 == characteristic3.value
    assert not c3._called_3

    # Re-subscribe for indications with a different callback
    c3._called = False
    c3._called_2 = False
    c3._called_3 = False
    await peer.unsubscribe(c3)
    await peer.subscribe(c3, on_c3_update_3, prefer_notify=False)
    await async_barrier()
    characteristic3.value = bytes([1, 2, 3])
    await server.indicate_subscriber(
        characteristic3._last_subscription[0], characteristic3
    )
    await async_barrier()
    assert c3._called
    assert c3._last_update == characteristic3.value
    assert not c3._called_2
    assert c3._called_3
    assert c3._last_update_3 == characteristic3.value

    # After unsubscribing, neither mode may deliver anything
    c3._called = False
    c3._called_2 = False
    c3._called_3 = False
    await peer.unsubscribe(c3)
    await server.notify_subscriber(
        characteristic3._last_subscription[0], characteristic3
    )
    await server.indicate_subscriber(
        characteristic3._last_subscription[0], characteristic3
    )
    await async_barrier()
    assert not c3._called
    assert not c3._called_2
    assert not c3._called_3
757
758
759# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_mtu_exchange():
    """Request ATT MTUs from two clients against a server whose max_mtu is 100.

    A request above the server maximum is expected to yield 100; a request
    below it is expected to yield the requested value.
    """
    first_client, second_client, server = LinkedDevices().devices[:3]

    server.gatt_server.max_mtu = 100

    # Connections as seen from the server side
    incoming = []

    @server.on('connection')
    def record_incoming(connection):
        incoming.append(connection)

    for device in (first_client, second_client, server):
        await device.power_on()

    first_connection = await first_client.connect(server.random_address)
    assert len(incoming) == 1
    assert incoming[0] is not None

    second_connection = await second_client.connect(server.random_address)
    assert len(incoming) == 2
    assert incoming[1] is not None

    first_peer = Peer(first_connection)
    second_peer = Peer(second_connection)

    # 220 requested, 100 expected
    first_mtu = await first_peer.request_mtu(220)
    assert first_mtu == 100
    assert first_connection.att_mtu == 100

    # 50 requested, 50 expected
    second_mtu = await second_peer.request_mtu(50)
    assert second_mtu == 50
    assert second_connection.att_mtu == 50
794
795
796# -----------------------------------------------------------------------------
def test_char_property_to_string():
    """Check the string names produced for characteristic property bits."""
    # A single property, given as a literal and as the class constant
    for single in (0x01, Characteristic.BROADCAST):
        assert Characteristic.property_name(single) == "BROADCAST"

    # Two properties combined, given as a literal and as OR-ed constants
    assert Characteristic.properties_as_string(0x03) == "BROADCAST,READ"
    combined = Characteristic.BROADCAST | Characteristic.READ
    assert Characteristic.properties_as_string(combined) == "BROADCAST,READ"
810
811
812# -----------------------------------------------------------------------------
def test_char_property_string_to_type():
    """Check parsing of comma-separated property names into property bits."""
    # A single name
    parsed_single = Characteristic.string_to_properties("BROADCAST")
    assert parsed_single == Characteristic.BROADCAST

    # Two names, in either order, must give the same combined value
    for text in ("BROADCAST,READ", "READ,BROADCAST"):
        assert (
            Characteristic.string_to_properties(text)
            == Characteristic.BROADCAST | Characteristic.READ
        )
826
827
828# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_server_string():
    """Compare the string form of a GATT server with an expected attribute listing.

    The expected text lists a Generic Access service followed by the service
    added by this test.
    """
    server = LinkedDevices().devices[:2][1]

    added_characteristic = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([123]),
    )
    server.add_service(
        Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [added_characteristic])
    )

    expected_listing = """Service(handle=0x0001, end=0x0005, uuid=UUID-16:1800 (Generic Access))
CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ)
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ)
CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ)
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ)
Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829)
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY)
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY)
Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)"""

    assert str(server.gatt_server) == expected_listing
855
856
857# -----------------------------------------------------------------------------
async def async_main():
    """Run the asynchronous tests of this module in sequence.

    Used when the module is executed directly rather than through pytest.
    """
    await test_read_write()
    await test_read_write2()
    await test_subscribe_notify()
    await test_characteristic_encoding()
    await test_mtu_exchange()
    await test_attribute_getters()
    await test_server_string()
864
865
866# -----------------------------------------------------------------------------
def test_attribute_string_to_permissions():
    """Check parsing of permission names into their numeric values."""
    expected_values = (
        ('READABLE', 1),
        ('WRITEABLE', 2),
        ('READABLE,WRITEABLE', 3),
    )
    for text, numeric in expected_values:
        assert Attribute.string_to_permissions(text) == numeric
871
872
873# -----------------------------------------------------------------------------
def test_charracteristic_permissions():
    """Check that a Characteristic accepts its permissions as a string."""
    all_properties = (
        Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY
    )
    built = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        all_properties,
        'READABLE,WRITEABLE',
    )
    assert built.permissions == 3
881
882
883# -----------------------------------------------------------------------------
def test_descriptor_permissions():
    """Check that a Descriptor accepts its permissions as a string."""
    cccd_uuid_text = '2902'
    built = Descriptor(cccd_uuid_text, 'READABLE,WRITEABLE')
    assert built.permissions == 3
887
888
889# -----------------------------------------------------------------------------
if __name__ == '__main__':
    # Direct execution (without pytest): configure logging from the
    # BUMBLE_LOGLEVEL environment variable (default INFO), then run the
    # synchronous tests followed by the asynchronous ones.
    logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
    test_UUID()
    test_ATT_Error_Response()
    test_ATT_Read_By_Group_Type_Request()
    test_CharacteristicValue()
    test_CharacteristicAdapter()
    test_char_property_to_string()
    test_char_property_string_to_type()
    test_attribute_string_to_permissions()
    test_charracteristic_permissions()
    test_descriptor_permissions()
    asyncio.run(async_main())
898