• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import logging
20import sys
21import os
22from bumble.device import (
23    Device,
24    Connection,
25    AdvertisingParameters,
26    AdvertisingEventProperties,
27)
28from bumble.hci import (
29    OwnAddressType,
30)
31
32from bumble.transport import open_transport_or_link
33
34
35# -----------------------------------------------------------------------------
36async def main() -> None:
37    if len(sys.argv) < 3:
38        print(
39            'Usage: run_cig_setup.py <config-file>'
40            '<transport-spec-for-device-1> <transport-spec-for-device-2>'
41        )
42        print(
43            'example: run_cig_setup.py device1.json'
44            'tcp-client:127.0.0.1:6402 tcp-client:127.0.0.1:6402'
45        )
46        return
47
48    print('<<< connecting to HCI...')
49    hci_transports = await asyncio.gather(
50        open_transport_or_link(sys.argv[2]), open_transport_or_link(sys.argv[3])
51    )
52    print('<<< connected')
53
54    devices = [
55        Device.from_config_file_with_hci(
56            sys.argv[1], hci_transport.source, hci_transport.sink
57        )
58        for hci_transport in hci_transports
59    ]
60
61    devices[0].cis_enabled = True
62    devices[1].cis_enabled = True
63
64    await asyncio.gather(*[device.power_on() for device in devices])
65    advertising_set = await devices[0].create_advertising_set()
66
67    connection = await devices[1].connect(
68        devices[0].public_address, own_address_type=OwnAddressType.PUBLIC
69    )
70
71    cid_ids = [2, 3]
72    cis_handles = await devices[1].setup_cig(
73        cig_id=1,
74        cis_id=cid_ids,
75        sdu_interval=(10000, 0),
76        framing=0,
77        max_sdu=(120, 0),
78        retransmission_number=13,
79        max_transport_latency=(100, 0),
80    )
81
82    def on_cis_request(
83        connection: Connection, cis_handle: int, _cig_id: int, _cis_id: int
84    ):
85        connection.abort_on('disconnection', devices[0].accept_cis_request(cis_handle))
86
87    devices[0].on('cis_request', on_cis_request)
88
89    cis_links = await devices[1].create_cis(
90        [(cis, connection.handle) for cis in cis_handles]
91    )
92
93    for cis_link in cis_links:
94        await cis_link.disconnect()
95
96    await asyncio.gather(
97        *[hci_transport.source.terminated for hci_transport in hci_transports]
98    )
99
100
101# -----------------------------------------------------------------------------
102logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
103asyncio.run(main())
104