# Copyright 2021-2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- import asyncio import struct import sys import os import logging from bumble.core import AdvertisingData from bumble.device import Device from bumble.transport import open_transport_or_link from bumble.core import UUID from bumble.gatt import Service, Characteristic, CharacteristicValue # ----------------------------------------------------------------------------- # Constants # ----------------------------------------------------------------------------- ASHA_SERVICE = UUID.from_16_bits(0xFDF0, 'Audio Streaming for Hearing Aid') ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC = UUID( '6333651e-c481-4a3e-9169-7c902aad37bb', 'ReadOnlyProperties' ) ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC = UUID( 'f0d4de7e-4a88-476c-9d9f-1937b0996cc0', 'AudioControlPoint' ) ASHA_AUDIO_STATUS_CHARACTERISTIC = UUID( '38663f1a-e711-4cac-b641-326b56404837', 'AudioStatus' ) ASHA_VOLUME_CHARACTERISTIC = UUID('00e4ca9e-ab14-41e4-8823-f9e70c7e91df', 'Volume') ASHA_LE_PSM_OUT_CHARACTERISTIC = UUID( '2d410339-82b6-42aa-b34e-e2e01df8cc1a', 'LE_PSM_OUT' ) # ----------------------------------------------------------------------------- async def main(): if len(sys.argv) != 4: print( 'Usage: python run_asha_sink.py ' '' ) print('example: python run_asha_sink.py device1.json usb:0 audio_out.g722') return audio_out = open(sys.argv[3], 'wb') async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) # Handler for audio control commands def on_audio_control_point_write(_connection, value): print('--- AUDIO CONTROL POINT Write:', value.hex()) opcode = value[0] if opcode == 1: # Start audio_type = ('Unknown', 'Ringtone', 'Phone Call', 'Media')[value[2]] print( f'### START: codec={value[1]}, audio_type={audio_type}, ' f'volume={value[3]}, otherstate={value[4]}' ) elif opcode == 2: print('### STOP') elif opcode == 3: print(f'### STATUS: connected={value[1]}') # Respond with a status asyncio.create_task( device.notify_subscribers(audio_status_characteristic, force=True) ) # Handler for volume control def on_volume_write(_connection, value): print('--- VOLUME Write:', value[0]) # Register an L2CAP CoC server def on_coc(channel): def on_data(data): print('<<< Voice data received:', data.hex()) audio_out.write(data) channel.sink = on_data psm = device.register_l2cap_channel_server(0, on_coc, 8) print(f'### LE_PSM_OUT = {psm}') # Add the ASHA service to the GATT server read_only_properties_characteristic = Characteristic( ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC, Characteristic.READ, Characteristic.READABLE, bytes( [ 0x01, # Version 0x00, # Device Capabilities [Left, Monaural] 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, # HiSyncId 0x01, # Feature Map [LE CoC audio output streaming supported] 0x00, 0x00, # Render Delay 0x00, 0x00, # RFU 0x02, 0x00, # Codec IDs [G.722 at 16 kHz] ] ), ) audio_control_point_characteristic = Characteristic( ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC, Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE, Characteristic.WRITEABLE, CharacteristicValue(write=on_audio_control_point_write), ) audio_status_characteristic = Characteristic( ASHA_AUDIO_STATUS_CHARACTERISTIC, Characteristic.READ | Characteristic.NOTIFY, Characteristic.READABLE, bytes([0]), ) volume_characteristic = Characteristic( ASHA_VOLUME_CHARACTERISTIC, Characteristic.WRITE_WITHOUT_RESPONSE, Characteristic.WRITEABLE, CharacteristicValue(write=on_volume_write), ) le_psm_out_characteristic = Characteristic( ASHA_LE_PSM_OUT_CHARACTERISTIC, Characteristic.READ, Characteristic.READABLE, struct.pack('