1# Copyright 2021-2023 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import logging 20import sys 21import os 22from bumble.device import ( 23 Device, 24 Connection, 25 AdvertisingParameters, 26 AdvertisingEventProperties, 27) 28from bumble.hci import ( 29 OwnAddressType, 30) 31 32from bumble.transport import open_transport_or_link 33 34 35# ----------------------------------------------------------------------------- 36async def main() -> None: 37 if len(sys.argv) < 3: 38 print( 39 'Usage: run_cig_setup.py <config-file>' 40 '<transport-spec-for-device-1> <transport-spec-for-device-2>' 41 ) 42 print( 43 'example: run_cig_setup.py device1.json' 44 'tcp-client:127.0.0.1:6402 tcp-client:127.0.0.1:6402' 45 ) 46 return 47 48 print('<<< connecting to HCI...') 49 hci_transports = await asyncio.gather( 50 open_transport_or_link(sys.argv[2]), open_transport_or_link(sys.argv[3]) 51 ) 52 print('<<< connected') 53 54 devices = [ 55 Device.from_config_file_with_hci( 56 sys.argv[1], hci_transport.source, hci_transport.sink 57 ) 58 for hci_transport in hci_transports 59 ] 60 61 devices[0].cis_enabled = True 62 devices[1].cis_enabled = True 63 64 await asyncio.gather(*[device.power_on() for device in devices]) 65 advertising_set = await devices[0].create_advertising_set() 66 67 connection = await devices[1].connect( 68 devices[0].public_address, own_address_type=OwnAddressType.PUBLIC 69 ) 70 71 cid_ids = [2, 3] 72 cis_handles = await devices[1].setup_cig( 73 cig_id=1, 74 cis_id=cid_ids, 75 sdu_interval=(10000, 0), 76 framing=0, 77 max_sdu=(120, 0), 78 retransmission_number=13, 79 max_transport_latency=(100, 0), 80 ) 81 82 def on_cis_request( 83 connection: Connection, cis_handle: int, _cig_id: int, _cis_id: int 84 ): 85 connection.abort_on('disconnection', devices[0].accept_cis_request(cis_handle)) 86 87 devices[0].on('cis_request', on_cis_request) 88 89 cis_links = await devices[1].create_cis( 90 [(cis, connection.handle) for cis in cis_handles] 91 ) 92 93 for cis_link in cis_links: 94 await cis_link.disconnect() 95 96 await asyncio.gather( 97 *[hci_transport.source.terminated for hci_transport in hci_transports] 98 ) 99 100 101# ----------------------------------------------------------------------------- 102logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) 103asyncio.run(main()) 104