• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import asyncio
16import bumble.device
17import grpc
18import grpc.aio
19import logging
20import struct
21
22from . import utils
23from bumble.core import (
24    BT_BR_EDR_TRANSPORT,
25    BT_LE_TRANSPORT,
26    BT_PERIPHERAL_ROLE,
27    UUID,
28    AdvertisingData,
29    ConnectionError,
30)
31from bumble.device import (
32    DEVICE_DEFAULT_SCAN_INTERVAL,
33    DEVICE_DEFAULT_SCAN_WINDOW,
34    Advertisement,
35    AdvertisingType,
36    Device,
37)
38from bumble.gatt import Service
39from bumble.hci import (
40    HCI_CONNECTION_ALREADY_EXISTS_ERROR,
41    HCI_PAGE_TIMEOUT_ERROR,
42    HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
43    Address,
44)
45from google.protobuf import any_pb2, empty_pb2  # pytype: disable=pyi-error
46from pandora.host_grpc_aio import HostServicer
47from pandora.host_pb2 import (
48    NOT_CONNECTABLE,
49    NOT_DISCOVERABLE,
50    PRIMARY_1M,
51    PRIMARY_CODED,
52    SECONDARY_1M,
53    SECONDARY_2M,
54    SECONDARY_CODED,
55    SECONDARY_NONE,
56    AdvertiseRequest,
57    AdvertiseResponse,
58    Connection,
59    ConnectLERequest,
60    ConnectLEResponse,
61    ConnectRequest,
62    ConnectResponse,
63    DataTypes,
64    DisconnectRequest,
65    InquiryResponse,
66    PrimaryPhy,
67    ReadLocalAddressResponse,
68    ScanningResponse,
69    ScanRequest,
70    SecondaryPhy,
71    SetConnectabilityModeRequest,
72    SetDiscoverabilityModeRequest,
73    WaitConnectionRequest,
74    WaitConnectionResponse,
75    WaitDisconnectionRequest,
76)
77from typing import AsyncGenerator, Dict, List, Optional, Set, Tuple, cast
78
# Lookup tables from the integer PHY values found on Bumble advertisements
# (`primary_phy` / `secondary_phy`) to the Pandora `PrimaryPhy` / `SecondaryPhy` values.
PRIMARY_PHY_MAP: Dict[int, PrimaryPhy] = {
    # 0 is the default value reported by Bumble for legacy Advertising reports; it is
    # mapped to the same PHY as 1.
    # FIXME(uael): `None` might be a better value, but Bumble needs to change accordingly.
    **dict.fromkeys((0, 1), PRIMARY_1M),
    3: PRIMARY_CODED,
}

# Keys are 0..3, in that order.
SECONDARY_PHY_MAP: Dict[int, SecondaryPhy] = dict(
    enumerate((SECONDARY_NONE, SECONDARY_1M, SECONDARY_2M, SECONDARY_CODED))
)
93
94
95class HostService(HostServicer):
96    grpc_server: grpc.aio.Server
97    device: Device
98    waited_connections: Set[int]
99
100    def __init__(self, grpc_server: grpc.aio.Server, device: Device) -> None:
101        super().__init__()
102        self.log = utils.BumbleServerLoggerAdapter(logging.getLogger(), {'service_name': 'Host', 'device': device})
103        self.grpc_server = grpc_server
104        self.device = device
105        self.waited_connections = set()
106
107    @utils.rpc
108    async def FactoryReset(self, request: empty_pb2.Empty, context: grpc.ServicerContext) -> empty_pb2.Empty:
109        self.log.info('FactoryReset')
110
111        # delete all bonds
112        if self.device.keystore is not None:
113            await self.device.keystore.delete_all()
114
115        # trigger gRCP server stop then return
116        asyncio.create_task(self.grpc_server.stop(None))
117        return empty_pb2.Empty()
118
119    @utils.rpc
120    async def Reset(self, request: empty_pb2.Empty, context: grpc.ServicerContext) -> empty_pb2.Empty:
121        self.log.info('Reset')
122
123        # clear service.
124        self.waited_connections.clear()
125
126        # (re) power device on
127        await self.device.power_on()
128        return empty_pb2.Empty()
129
130    @utils.rpc
131    async def ReadLocalAddress(
132        self, request: empty_pb2.Empty, context: grpc.ServicerContext
133    ) -> ReadLocalAddressResponse:
134        self.log.info('ReadLocalAddress')
135        return ReadLocalAddressResponse(address=bytes(reversed(bytes(self.device.public_address))))
136
137    @utils.rpc
138    async def Connect(self, request: ConnectRequest, context: grpc.ServicerContext) -> ConnectResponse:
139        # Need to reverse bytes order since Bumble Address is using MSB.
140        address = Address(bytes(reversed(request.address)), address_type=Address.PUBLIC_DEVICE_ADDRESS)
141        self.log.info(f"Connect to {address}")
142
143        try:
144            connection = await self.device.connect(address, transport=BT_BR_EDR_TRANSPORT)
145        except ConnectionError as e:
146            if e.error_code == HCI_PAGE_TIMEOUT_ERROR:
147                self.log.warning(f"Peer not found: {e}")
148                return ConnectResponse(peer_not_found=empty_pb2.Empty())
149            if e.error_code == HCI_CONNECTION_ALREADY_EXISTS_ERROR:
150                self.log.warning(f"Connection already exists: {e}")
151                return ConnectResponse(connection_already_exists=empty_pb2.Empty())
152            raise e
153
154        self.log.info(f"Connect to {address} done (handle={connection.handle})")
155
156        cookie = any_pb2.Any(value=connection.handle.to_bytes(4, 'big'))
157        return ConnectResponse(connection=Connection(cookie=cookie))
158
159    @utils.rpc
160    async def WaitConnection(
161        self, request: WaitConnectionRequest, context: grpc.ServicerContext
162    ) -> WaitConnectionResponse:
163        if not request.address:
164            raise ValueError('Request address field must be set')
165
166        # Need to reverse bytes order since Bumble Address is using MSB.
167        address = Address(bytes(reversed(request.address)), address_type=Address.PUBLIC_DEVICE_ADDRESS)
168        if address in (Address.NIL, Address.ANY):
169            raise ValueError('Invalid address')
170
171        self.log.info(f"WaitConnection from {address}...")
172
173        connection = self.device.find_connection_by_bd_addr(address, transport=BT_BR_EDR_TRANSPORT)
174        if connection and id(connection) in self.waited_connections:
175            # this connection was already returned: wait for a new one.
176            connection = None
177
178        if not connection:
179            connection = await self.device.accept(address)
180
181        # save connection has waited and respond.
182        self.waited_connections.add(id(connection))
183
184        self.log.info(f"WaitConnection from {address} done (handle={connection.handle})")
185
186        cookie = any_pb2.Any(value=connection.handle.to_bytes(4, 'big'))
187        return WaitConnectionResponse(connection=Connection(cookie=cookie))
188
189    @utils.rpc
190    async def ConnectLE(self, request: ConnectLERequest, context: grpc.ServicerContext) -> ConnectLEResponse:
191        address = utils.address_from_request(request, request.WhichOneof("address"))
192        if address in (Address.NIL, Address.ANY):
193            raise ValueError('Invalid address')
194
195        self.log.info(f"ConnectLE to {address}...")
196
197        try:
198            connection = await self.device.connect(
199                address, transport=BT_LE_TRANSPORT, own_address_type=request.own_address_type
200            )
201        except ConnectionError as e:
202            if e.error_code == HCI_PAGE_TIMEOUT_ERROR:
203                self.log.warning(f"Peer not found: {e}")
204                return ConnectLEResponse(peer_not_found=empty_pb2.Empty())
205            if e.error_code == HCI_CONNECTION_ALREADY_EXISTS_ERROR:
206                self.log.warning(f"Connection already exists: {e}")
207                return ConnectLEResponse(connection_already_exists=empty_pb2.Empty())
208            raise e
209
210        self.log.info(f"ConnectLE to {address} done (handle={connection.handle})")
211
212        cookie = any_pb2.Any(value=connection.handle.to_bytes(4, 'big'))
213        return ConnectLEResponse(connection=Connection(cookie=cookie))
214
215    @utils.rpc
216    async def Disconnect(self, request: DisconnectRequest, context: grpc.ServicerContext) -> empty_pb2.Empty:
217        connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
218        self.log.info(f"Disconnect: {connection_handle}")
219
220        self.log.info("Disconnecting...")
221        if connection := self.device.lookup_connection(connection_handle):
222            await connection.disconnect(HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR)
223        self.log.info("Disconnected")
224
225        return empty_pb2.Empty()
226
227    @utils.rpc
228    async def WaitDisconnection(
229        self, request: WaitDisconnectionRequest, context: grpc.ServicerContext
230    ) -> empty_pb2.Empty:
231        connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
232        self.log.info(f"WaitDisconnection: {connection_handle}")
233
234        if connection := self.device.lookup_connection(connection_handle):
235            disconnection_future: asyncio.Future[None] = asyncio.get_running_loop().create_future()
236
237            def on_disconnection(_: None) -> None:
238                disconnection_future.set_result(None)
239
240            connection.on('disconnection', on_disconnection)
241            try:
242                await disconnection_future
243                self.log.info("Disconnected")
244            finally:
245                connection.remove_listener('disconnection', on_disconnection)  # type: ignore
246
247        return empty_pb2.Empty()
248
249    @utils.rpc
250    async def Advertise(
251        self, request: AdvertiseRequest, context: grpc.ServicerContext
252    ) -> AsyncGenerator[AdvertiseResponse, None]:
253        if not request.legacy:
254            raise NotImplementedError("TODO: add support for extended advertising in Bumble")
255        if request.interval:
256            raise NotImplementedError("TODO: add support for `request.interval`")
257        if request.interval_range:
258            raise NotImplementedError("TODO: add support for `request.interval_range`")
259        if request.primary_phy:
260            raise NotImplementedError("TODO: add support for `request.primary_phy`")
261        if request.secondary_phy:
262            raise NotImplementedError("TODO: add support for `request.secondary_phy`")
263
264        if self.device.is_advertising:
265            raise NotImplementedError('TODO: add support for advertising sets')
266
267        if data := request.data:
268            self.device.advertising_data = bytes(self.unpack_data_types(data))
269
270            if scan_response_data := request.scan_response_data:
271                self.device.scan_response_data = bytes(self.unpack_data_types(scan_response_data))
272                scannable = True
273            else:
274                scannable = False
275
276            # Retrieve services data
277            for service in self.device.gatt_server.attributes:
278                if isinstance(service, Service) and (service_data := service.get_advertising_data()):
279                    service_uuid = service.uuid.to_hex_str()
280                    if (
281                        service_uuid in request.data.incomplete_service_class_uuids16
282                        or service_uuid in request.data.complete_service_class_uuids16
283                        or service_uuid in request.data.incomplete_service_class_uuids32
284                        or service_uuid in request.data.complete_service_class_uuids32
285                        or service_uuid in request.data.incomplete_service_class_uuids128
286                        or service_uuid in request.data.complete_service_class_uuids128
287                    ):
288                        self.device.advertising_data += service_data
289                    if (
290                        service_uuid in scan_response_data.incomplete_service_class_uuids16
291                        or service_uuid in scan_response_data.complete_service_class_uuids16
292                        or service_uuid in scan_response_data.incomplete_service_class_uuids32
293                        or service_uuid in scan_response_data.complete_service_class_uuids32
294                        or service_uuid in scan_response_data.incomplete_service_class_uuids128
295                        or service_uuid in scan_response_data.complete_service_class_uuids128
296                    ):
297                        self.device.scan_response_data += service_data
298
299            target = None
300            if request.connectable and scannable:
301                advertising_type = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE
302            elif scannable:
303                advertising_type = AdvertisingType.UNDIRECTED_SCANNABLE
304            else:
305                advertising_type = AdvertisingType.UNDIRECTED
306        else:
307            target = None
308            advertising_type = AdvertisingType.UNDIRECTED
309
310        if request.target:
311            # Need to reverse bytes order since Bumble Address is using MSB.
312            target_bytes = bytes(reversed(request.target))
313            if request.target_variant() == "public":
314                target = Address(target_bytes, Address.PUBLIC_DEVICE_ADDRESS)
315                advertising_type = AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY  # FIXME: HIGH_DUTY ?
316            else:
317                target = Address(target_bytes, Address.RANDOM_DEVICE_ADDRESS)
318                advertising_type = AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY  # FIXME: HIGH_DUTY ?
319
320        if request.connectable:
321
322            def on_connection(connection: bumble.device.Connection) -> None:
323                if connection.transport == BT_LE_TRANSPORT and connection.role == BT_PERIPHERAL_ROLE:
324                    pending_connection.set_result(connection)
325
326            self.device.on('connection', on_connection)
327
328        try:
329            while True:
330                if not self.device.is_advertising:
331                    self.log.info('Advertise')
332                    await self.device.start_advertising(
333                        target=target, advertising_type=advertising_type, own_address_type=request.own_address_type
334                    )
335
336                if not request.connectable:
337                    await asyncio.sleep(1)
338                    continue
339
340                pending_connection: asyncio.Future[
341                    bumble.device.Connection
342                ] = asyncio.get_running_loop().create_future()
343
344                self.log.info('Wait for LE connection...')
345                connection = await pending_connection
346
347                self.log.info(f"Advertise: Connected to {connection.peer_address} (handle={connection.handle})")
348
349                cookie = any_pb2.Any(value=connection.handle.to_bytes(4, 'big'))
350                yield AdvertiseResponse(connection=Connection(cookie=cookie))
351
352                # wait a small delay before restarting the advertisement.
353                await asyncio.sleep(1)
354        finally:
355            if request.connectable:
356                self.device.remove_listener('connection', on_connection)  # type: ignore
357
358            try:
359                self.log.info('Stop advertising')
360                await self.device.abort_on('flush', self.device.stop_advertising())
361            except:
362                pass
363
364    @utils.rpc
365    async def Scan(
366        self, request: ScanRequest, context: grpc.ServicerContext
367    ) -> AsyncGenerator[ScanningResponse, None]:
368        # TODO: modify `start_scanning` to accept floats instead of int for ms values
369        if request.phys:
370            raise NotImplementedError("TODO: add support for `request.phys`")
371
372        self.log.info('Scan')
373
374        scan_queue: asyncio.Queue[Advertisement] = asyncio.Queue()
375        handler = self.device.on('advertisement', scan_queue.put_nowait)
376        await self.device.start_scanning(
377            legacy=request.legacy,
378            active=not request.passive,
379            own_address_type=request.own_address_type,
380            scan_interval=int(request.interval) if request.interval else DEVICE_DEFAULT_SCAN_INTERVAL,
381            scan_window=int(request.window) if request.window else DEVICE_DEFAULT_SCAN_WINDOW,
382        )
383
384        try:
385            # TODO: add support for `direct_address` in Bumble
386            # TODO: add support for `periodic_advertising_interval` in Bumble
387            while adv := await scan_queue.get():
388                sr = ScanningResponse(
389                    legacy=adv.is_legacy,
390                    connectable=adv.is_connectable,
391                    scannable=adv.is_scannable,
392                    truncated=adv.is_truncated,
393                    sid=adv.sid,
394                    primary_phy=PRIMARY_PHY_MAP[adv.primary_phy],
395                    secondary_phy=SECONDARY_PHY_MAP[adv.secondary_phy],
396                    tx_power=adv.tx_power,
397                    rssi=adv.rssi,
398                    data=self.pack_data_types(adv.data),
399                )
400
401                if adv.address.address_type == Address.PUBLIC_DEVICE_ADDRESS:
402                    sr.public = bytes(reversed(bytes(adv.address)))
403                elif adv.address.address_type == Address.RANDOM_DEVICE_ADDRESS:
404                    sr.random = bytes(reversed(bytes(adv.address)))
405                elif adv.address.address_type == Address.PUBLIC_IDENTITY_ADDRESS:
406                    sr.public_identity = bytes(reversed(bytes(adv.address)))
407                else:
408                    sr.random_static_identity = bytes(reversed(bytes(adv.address)))
409
410                yield sr
411
412        finally:
413            self.device.remove_listener('advertisement', handler)  # type: ignore
414            try:
415                self.log.info('Stop scanning')
416                await self.device.abort_on('flush', self.device.stop_scanning())
417            except:
418                pass
419
420    @utils.rpc
421    async def Inquiry(
422        self, request: empty_pb2.Empty, context: grpc.ServicerContext
423    ) -> AsyncGenerator[InquiryResponse, None]:
424        self.log.info('Inquiry')
425
426        inquiry_queue: asyncio.Queue[Optional[Tuple[Address, int, AdvertisingData, int]]] = asyncio.Queue()
427        complete_handler = self.device.on('inquiry_complete', lambda: inquiry_queue.put_nowait(None))
428        result_handler = self.device.on(  # type: ignore
429            'inquiry_result',
430            lambda address, class_of_device, eir_data, rssi: inquiry_queue.put_nowait(  # type: ignore
431                (address, class_of_device, eir_data, rssi)  # type: ignore
432            ),
433        )
434
435        await self.device.start_discovery(auto_restart=False)
436        try:
437            while inquiry_result := await inquiry_queue.get():
438                (address, class_of_device, eir_data, rssi) = inquiry_result
439                # FIXME: if needed, add support for `page_scan_repetition_mode` and `clock_offset` in Bumble
440                yield InquiryResponse(
441                    address=bytes(reversed(bytes(address))),
442                    class_of_device=class_of_device,
443                    rssi=rssi,
444                    data=self.pack_data_types(eir_data),
445                )
446
447        finally:
448            self.device.remove_listener('inquiry_complete', complete_handler)  # type: ignore
449            self.device.remove_listener('inquiry_result', result_handler)  # type: ignore
450            try:
451                self.log.info('Stop inquiry')
452                await self.device.abort_on('flush', self.device.stop_discovery())
453            except:
454                pass
455
456    @utils.rpc
457    async def SetDiscoverabilityMode(
458        self, request: SetDiscoverabilityModeRequest, context: grpc.ServicerContext
459    ) -> empty_pb2.Empty:
460        self.log.info("SetDiscoverabilityMode")
461        await self.device.set_discoverable(request.mode != NOT_DISCOVERABLE)
462        return empty_pb2.Empty()
463
464    @utils.rpc
465    async def SetConnectabilityMode(
466        self, request: SetConnectabilityModeRequest, context: grpc.ServicerContext
467    ) -> empty_pb2.Empty:
468        self.log.info("SetConnectabilityMode")
469        await self.device.set_connectable(request.mode != NOT_CONNECTABLE)
470        return empty_pb2.Empty()
471
472    def unpack_data_types(self, dt: DataTypes) -> AdvertisingData:
473        ad_structures: List[Tuple[int, bytes]] = []
474
475        uuids: List[str]
476        datas: Dict[str, bytes]
477
478        def uuid128_from_str(uuid: str) -> bytes:
479            """Decode a 128-bit uuid encoded as XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
480            to byte format."""
481            return bytes(reversed(bytes.fromhex(uuid.replace('-', ''))))
482
483        def uuid32_from_str(uuid: str) -> bytes:
484            """Decode a 32-bit uuid encoded as XXXXXXXX to byte format."""
485            return bytes(reversed(bytes.fromhex(uuid)))
486
487        def uuid16_from_str(uuid: str) -> bytes:
488            """Decode a 16-bit uuid encoded as XXXX to byte format."""
489            return bytes(reversed(bytes.fromhex(uuid)))
490
491        if uuids := dt.incomplete_service_class_uuids16:
492            ad_structures.append(
493                (
494                    AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
495                    b''.join([uuid16_from_str(uuid) for uuid in uuids]),
496                )
497            )
498        if uuids := dt.complete_service_class_uuids16:
499            ad_structures.append(
500                (
501                    AdvertisingData.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
502                    b''.join([uuid16_from_str(uuid) for uuid in uuids]),
503                )
504            )
505        if uuids := dt.incomplete_service_class_uuids32:
506            ad_structures.append(
507                (
508                    AdvertisingData.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
509                    b''.join([uuid32_from_str(uuid) for uuid in uuids]),
510                )
511            )
512        if uuids := dt.complete_service_class_uuids32:
513            ad_structures.append(
514                (
515                    AdvertisingData.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
516                    b''.join([uuid32_from_str(uuid) for uuid in uuids]),
517                )
518            )
519        if uuids := dt.incomplete_service_class_uuids128:
520            ad_structures.append(
521                (
522                    AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
523                    b''.join([uuid128_from_str(uuid) for uuid in uuids]),
524                )
525            )
526        if uuids := dt.complete_service_class_uuids128:
527            ad_structures.append(
528                (
529                    AdvertisingData.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
530                    b''.join([uuid128_from_str(uuid) for uuid in uuids]),
531                )
532            )
533        if dt.HasField('include_shortened_local_name'):
534            ad_structures.append((AdvertisingData.SHORTENED_LOCAL_NAME, bytes(self.device.name[:8], 'utf-8')))
535        elif dt.shortened_local_name:
536            ad_structures.append((AdvertisingData.SHORTENED_LOCAL_NAME, bytes(dt.shortened_local_name, 'utf-8')))
537        if dt.HasField('include_complete_local_name'):
538            ad_structures.append((AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.device.name, 'utf-8')))
539        elif dt.complete_local_name:
540            ad_structures.append((AdvertisingData.COMPLETE_LOCAL_NAME, bytes(dt.complete_local_name, 'utf-8')))
541        if dt.HasField('include_tx_power_level'):
542            raise ValueError('unsupported data type')
543        elif dt.tx_power_level:
544            ad_structures.append((AdvertisingData.TX_POWER_LEVEL, bytes(struct.pack('<I', dt.tx_power_level)[:1])))
545        if dt.HasField('include_class_of_device'):
546            ad_structures.append(
547                (AdvertisingData.CLASS_OF_DEVICE, bytes(struct.pack('<I', self.device.class_of_device)[:-1]))
548            )
549        elif dt.class_of_device:
550            ad_structures.append((AdvertisingData.CLASS_OF_DEVICE, bytes(struct.pack('<I', dt.class_of_device)[:-1])))
551        if dt.peripheral_connection_interval_min:
552            ad_structures.append(
553                (
554                    AdvertisingData.PERIPHERAL_CONNECTION_INTERVAL_RANGE,
555                    bytes(
556                        [
557                            *struct.pack('<H', dt.peripheral_connection_interval_min),
558                            *struct.pack(
559                                '<H',
560                                dt.peripheral_connection_interval_max
561                                if dt.peripheral_connection_interval_max
562                                else dt.peripheral_connection_interval_min,
563                            ),
564                        ]
565                    ),
566                )
567            )
568        if uuids := dt.service_solicitation_uuids16:
569            ad_structures.append(
570                (
571                    AdvertisingData.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS,
572                    b''.join([uuid16_from_str(uuid) for uuid in uuids]),
573                )
574            )
575        if uuids := dt.service_solicitation_uuids32:
576            ad_structures.append(
577                (
578                    AdvertisingData.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS,
579                    b''.join([uuid32_from_str(uuid) for uuid in uuids]),
580                )
581            )
582        if uuids := dt.service_solicitation_uuids128:
583            ad_structures.append(
584                (
585                    AdvertisingData.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS,
586                    b''.join([uuid128_from_str(uuid) for uuid in uuids]),
587                )
588            )
589        if datas := dt.service_data_uuid16:
590            ad_structures.extend(
591                [
592                    (AdvertisingData.SERVICE_DATA_16_BIT_UUID, uuid16_from_str(uuid) + data)
593                    for uuid, data in datas.items()
594                ]
595            )
596        if datas := dt.service_data_uuid32:
597            ad_structures.extend(
598                [
599                    (AdvertisingData.SERVICE_DATA_32_BIT_UUID, uuid32_from_str(uuid) + data)
600                    for uuid, data in datas.items()
601                ]
602            )
603        if datas := dt.service_data_uuid128:
604            ad_structures.extend(
605                [
606                    (AdvertisingData.SERVICE_DATA_128_BIT_UUID, uuid128_from_str(uuid) + data)
607                    for uuid, data in datas.items()
608                ]
609            )
610        if dt.appearance:
611            ad_structures.append((AdvertisingData.APPEARANCE, struct.pack('<H', dt.appearance)))
612        if dt.advertising_interval:
613            ad_structures.append((AdvertisingData.ADVERTISING_INTERVAL, struct.pack('<H', dt.advertising_interval)))
614        if dt.uri:
615            ad_structures.append((AdvertisingData.URI, bytes(dt.uri, 'utf-8')))
616        if dt.le_supported_features:
617            ad_structures.append((AdvertisingData.LE_SUPPORTED_FEATURES, dt.le_supported_features))
618        if dt.manufacturer_specific_data:
619            ad_structures.append((AdvertisingData.MANUFACTURER_SPECIFIC_DATA, dt.manufacturer_specific_data))
620
621        return AdvertisingData(ad_structures)
622
623    def pack_data_types(self, ad: AdvertisingData) -> DataTypes:
624        dt = DataTypes()
625        uuids: List[UUID]
626        s: str
627        i: int
628        ij: Tuple[int, int]
629        uuid_data: Tuple[UUID, bytes]
630        data: bytes
631
632        if uuids := cast(List[UUID], ad.get(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS)):
633            dt.incomplete_service_class_uuids16.extend(list(map(lambda x: x.to_hex_str(), uuids)))
634        if uuids := cast(List[UUID], ad.get(AdvertisingData.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS)):
635            dt.complete_service_class_uuids16.extend(list(map(lambda x: x.to_hex_str(), uuids)))
636        if uuids := cast(List[UUID], ad.get(AdvertisingData.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS)):
637            dt.incomplete_service_class_uuids32.extend(list(map(lambda x: x.to_hex_str(), uuids)))
638        if uuids := cast(List[UUID], ad.get(AdvertisingData.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS)):
639            dt.complete_service_class_uuids32.extend(list(map(lambda x: x.to_hex_str(), uuids)))
640        if uuids := cast(List[UUID], ad.get(AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS)):
641            dt.incomplete_service_class_uuids128.extend(list(map(lambda x: x.to_hex_str(), uuids)))
642        if uuids := cast(List[UUID], ad.get(AdvertisingData.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS)):
643            dt.complete_service_class_uuids128.extend(list(map(lambda x: x.to_hex_str(), uuids)))
644        if s := cast(str, ad.get(AdvertisingData.SHORTENED_LOCAL_NAME)):
645            dt.shortened_local_name = s
646        if s := cast(str, ad.get(AdvertisingData.COMPLETE_LOCAL_NAME)):
647            dt.complete_local_name = s
648        if i := cast(int, ad.get(AdvertisingData.TX_POWER_LEVEL)):
649            dt.tx_power_level = i
650        if i := cast(int, ad.get(AdvertisingData.CLASS_OF_DEVICE)):
651            dt.class_of_device = i
652        if ij := cast(Tuple[int, int], ad.get(AdvertisingData.PERIPHERAL_CONNECTION_INTERVAL_RANGE)):
653            dt.peripheral_connection_interval_min = ij[0]
654            dt.peripheral_connection_interval_max = ij[1]
655        if uuids := cast(List[UUID], ad.get(AdvertisingData.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS)):
656            dt.service_solicitation_uuids16.extend(list(map(lambda x: x.to_hex_str(), uuids)))
657        if uuids := cast(List[UUID], ad.get(AdvertisingData.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS)):
658            dt.service_solicitation_uuids32.extend(list(map(lambda x: x.to_hex_str(), uuids)))
659        if uuids := cast(List[UUID], ad.get(AdvertisingData.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS)):
660            dt.service_solicitation_uuids128.extend(list(map(lambda x: x.to_hex_str(), uuids)))
661        if uuid_data := cast(Tuple[UUID, bytes], ad.get(AdvertisingData.SERVICE_DATA_16_BIT_UUID)):
662            dt.service_data_uuid16[uuid_data[0].to_hex_str()] = uuid_data[1]
663        if uuid_data := cast(Tuple[UUID, bytes], ad.get(AdvertisingData.SERVICE_DATA_32_BIT_UUID)):
664            dt.service_data_uuid32[uuid_data[0].to_hex_str()] = uuid_data[1]
665        if uuid_data := cast(Tuple[UUID, bytes], ad.get(AdvertisingData.SERVICE_DATA_128_BIT_UUID)):
666            dt.service_data_uuid128[uuid_data[0].to_hex_str()] = uuid_data[1]
667        if data := cast(bytes, ad.get(AdvertisingData.PUBLIC_TARGET_ADDRESS, raw=True)):
668            dt.public_target_addresses.extend([data[i * 6 :: i * 6 + 6] for i in range(int(len(data) / 6))])
669        if data := cast(bytes, ad.get(AdvertisingData.RANDOM_TARGET_ADDRESS, raw=True)):
670            dt.random_target_addresses.extend([data[i * 6 :: i * 6 + 6] for i in range(int(len(data) / 6))])
671        if i := cast(int, ad.get(AdvertisingData.APPEARANCE)):
672            dt.appearance = i
673        if i := cast(int, ad.get(AdvertisingData.ADVERTISING_INTERVAL)):
674            dt.advertising_interval = i
675        if s := cast(str, ad.get(AdvertisingData.URI)):
676            dt.uri = s
677        if data := cast(bytes, ad.get(AdvertisingData.LE_SUPPORTED_FEATURES, raw=True)):
678            dt.le_supported_features = data
679        if data := cast(bytes, ad.get(AdvertisingData.MANUFACTURER_SPECIFIC_DATA, raw=True)):
680            dt.manufacturer_specific_data = data
681
682        return dt
683