1# Copyright 2021-2023 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import os 20import functools 21import pytest 22import logging 23 24from bumble import device 25from bumble.hci import CodecID, CodingFormat 26from bumble.profiles.bap import ( 27 AudioLocation, 28 AseStateMachine, 29 ASE_Operation, 30 ASE_Config_Codec, 31 ASE_Config_QOS, 32 ASE_Disable, 33 ASE_Enable, 34 ASE_Receiver_Start_Ready, 35 ASE_Receiver_Stop_Ready, 36 ASE_Release, 37 ASE_Update_Metadata, 38 SupportedFrameDuration, 39 SupportedSamplingFrequency, 40 SamplingFrequency, 41 FrameDuration, 42 CodecSpecificCapabilities, 43 CodecSpecificConfiguration, 44 ContextType, 45 PacRecord, 46 AudioStreamControlService, 47 AudioStreamControlServiceProxy, 48 PublishedAudioCapabilitiesService, 49 PublishedAudioCapabilitiesServiceProxy, 50) 51from tests.test_utils import TwoDevices 52 53 54# ----------------------------------------------------------------------------- 55# Logging 56# ----------------------------------------------------------------------------- 57logger = logging.getLogger(__name__) 58 59 60# ----------------------------------------------------------------------------- 61def basic_check(operation: ASE_Operation): 62 serialized = bytes(operation) 63 parsed = ASE_Operation.from_bytes(serialized) 64 assert bytes(parsed) == serialized 65 66 67# ----------------------------------------------------------------------------- 68def test_codec_specific_capabilities() -> None: 69 SAMPLE_FREQUENCY = SupportedSamplingFrequency.FREQ_16000 70 FRAME_SURATION = SupportedFrameDuration.DURATION_10000_US_SUPPORTED 71 AUDIO_CHANNEL_COUNTS = [1] 72 cap = CodecSpecificCapabilities( 73 supported_sampling_frequencies=SAMPLE_FREQUENCY, 74 supported_frame_durations=FRAME_SURATION, 75 supported_audio_channel_count=AUDIO_CHANNEL_COUNTS, 76 min_octets_per_codec_frame=40, 77 max_octets_per_codec_frame=40, 78 supported_max_codec_frames_per_sdu=1, 79 ) 80 assert CodecSpecificCapabilities.from_bytes(bytes(cap)) == cap 81 82 83# ----------------------------------------------------------------------------- 84def test_pac_record() -> None: 85 SAMPLE_FREQUENCY = SupportedSamplingFrequency.FREQ_16000 86 FRAME_SURATION = SupportedFrameDuration.DURATION_10000_US_SUPPORTED 87 AUDIO_CHANNEL_COUNTS = [1] 88 cap = CodecSpecificCapabilities( 89 supported_sampling_frequencies=SAMPLE_FREQUENCY, 90 supported_frame_durations=FRAME_SURATION, 91 supported_audio_channel_count=AUDIO_CHANNEL_COUNTS, 92 min_octets_per_codec_frame=40, 93 max_octets_per_codec_frame=40, 94 supported_max_codec_frames_per_sdu=1, 95 ) 96 97 pac_record = PacRecord( 98 coding_format=CodingFormat(CodecID.LC3), 99 codec_specific_capabilities=cap, 100 metadata=b'', 101 ) 102 assert PacRecord.from_bytes(bytes(pac_record)) == pac_record 103 104 105# ----------------------------------------------------------------------------- 106def test_vendor_specific_pac_record() -> None: 107 # Vendor-Specific codec, Google, ID=0xFFFF. No capabilities and metadata. 108 RAW_DATA = bytes.fromhex('ffe000ffff0000') 109 assert bytes(PacRecord.from_bytes(RAW_DATA)) == RAW_DATA 110 111 112# ----------------------------------------------------------------------------- 113def test_ASE_Config_Codec() -> None: 114 operation = ASE_Config_Codec( 115 ase_id=[1, 2], 116 target_latency=[3, 4], 117 target_phy=[5, 6], 118 codec_id=[CodingFormat(CodecID.LC3), CodingFormat(CodecID.LC3)], 119 codec_specific_configuration=[b'foo', b'bar'], 120 ) 121 basic_check(operation) 122 123 124# ----------------------------------------------------------------------------- 125def test_ASE_Config_QOS() -> None: 126 operation = ASE_Config_QOS( 127 ase_id=[1, 2], 128 cig_id=[1, 2], 129 cis_id=[3, 4], 130 sdu_interval=[5, 6], 131 framing=[0, 1], 132 phy=[2, 3], 133 max_sdu=[4, 5], 134 retransmission_number=[6, 7], 135 max_transport_latency=[8, 9], 136 presentation_delay=[10, 11], 137 ) 138 basic_check(operation) 139 140 141# ----------------------------------------------------------------------------- 142def test_ASE_Enable() -> None: 143 operation = ASE_Enable( 144 ase_id=[1, 2], 145 metadata=[b'foo', b'bar'], 146 ) 147 basic_check(operation) 148 149 150# ----------------------------------------------------------------------------- 151def test_ASE_Update_Metadata() -> None: 152 operation = ASE_Update_Metadata( 153 ase_id=[1, 2], 154 metadata=[b'foo', b'bar'], 155 ) 156 basic_check(operation) 157 158 159# ----------------------------------------------------------------------------- 160def test_ASE_Disable() -> None: 161 operation = ASE_Disable(ase_id=[1, 2]) 162 basic_check(operation) 163 164 165# ----------------------------------------------------------------------------- 166def test_ASE_Release() -> None: 167 operation = ASE_Release(ase_id=[1, 2]) 168 basic_check(operation) 169 170 171# ----------------------------------------------------------------------------- 172def test_ASE_Receiver_Start_Ready() -> None: 173 operation = ASE_Receiver_Start_Ready(ase_id=[1, 2]) 174 basic_check(operation) 175 176 177# ----------------------------------------------------------------------------- 178def test_ASE_Receiver_Stop_Ready() -> None: 179 operation = ASE_Receiver_Stop_Ready(ase_id=[1, 2]) 180 basic_check(operation) 181 182 183# ----------------------------------------------------------------------------- 184def test_codec_specific_configuration() -> None: 185 SAMPLE_FREQUENCY = SamplingFrequency.FREQ_16000 186 FRAME_SURATION = FrameDuration.DURATION_10000_US 187 AUDIO_LOCATION = AudioLocation.FRONT_LEFT 188 config = CodecSpecificConfiguration( 189 sampling_frequency=SAMPLE_FREQUENCY, 190 frame_duration=FRAME_SURATION, 191 audio_channel_allocation=AUDIO_LOCATION, 192 octets_per_codec_frame=60, 193 codec_frames_per_sdu=1, 194 ) 195 assert CodecSpecificConfiguration.from_bytes(bytes(config)) == config 196 197 198# ----------------------------------------------------------------------------- 199@pytest.mark.asyncio 200async def test_pacs(): 201 devices = TwoDevices() 202 devices[0].add_service( 203 PublishedAudioCapabilitiesService( 204 supported_sink_context=ContextType.MEDIA, 205 available_sink_context=ContextType.MEDIA, 206 supported_source_context=0, 207 available_source_context=0, 208 sink_pac=[ 209 # Codec Capability Setting 16_2 210 PacRecord( 211 coding_format=CodingFormat(CodecID.LC3), 212 codec_specific_capabilities=CodecSpecificCapabilities( 213 supported_sampling_frequencies=( 214 SupportedSamplingFrequency.FREQ_16000 215 ), 216 supported_frame_durations=( 217 SupportedFrameDuration.DURATION_10000_US_SUPPORTED 218 ), 219 supported_audio_channel_count=[1], 220 min_octets_per_codec_frame=40, 221 max_octets_per_codec_frame=40, 222 supported_max_codec_frames_per_sdu=1, 223 ), 224 ), 225 # Codec Capability Setting 24_2 226 PacRecord( 227 coding_format=CodingFormat(CodecID.LC3), 228 codec_specific_capabilities=CodecSpecificCapabilities( 229 supported_sampling_frequencies=( 230 SupportedSamplingFrequency.FREQ_24000 231 ), 232 supported_frame_durations=( 233 SupportedFrameDuration.DURATION_10000_US_SUPPORTED 234 ), 235 supported_audio_channel_count=[1], 236 min_octets_per_codec_frame=60, 237 max_octets_per_codec_frame=60, 238 supported_max_codec_frames_per_sdu=1, 239 ), 240 ), 241 ], 242 sink_audio_locations=AudioLocation.FRONT_LEFT | AudioLocation.FRONT_RIGHT, 243 ) 244 ) 245 246 await devices.setup_connection() 247 peer = device.Peer(devices.connections[1]) 248 pacs_client = await peer.discover_service_and_create_proxy( 249 PublishedAudioCapabilitiesServiceProxy 250 ) 251 252 253# ----------------------------------------------------------------------------- 254@pytest.mark.asyncio 255async def test_ascs(): 256 devices = TwoDevices() 257 devices[0].add_service( 258 AudioStreamControlService(device=devices[0], sink_ase_id=[1, 2]) 259 ) 260 261 await devices.setup_connection() 262 peer = device.Peer(devices.connections[1]) 263 ascs_client = await peer.discover_service_and_create_proxy( 264 AudioStreamControlServiceProxy 265 ) 266 267 notifications = {1: asyncio.Queue(), 2: asyncio.Queue()} 268 269 def on_notification(data: bytes, ase_id: int): 270 notifications[ase_id].put_nowait(data) 271 272 # Should be idle 273 assert await ascs_client.sink_ase[0].read_value() == bytes( 274 [1, AseStateMachine.State.IDLE] 275 ) 276 assert await ascs_client.sink_ase[1].read_value() == bytes( 277 [2, AseStateMachine.State.IDLE] 278 ) 279 280 # Subscribe 281 await ascs_client.sink_ase[0].subscribe( 282 functools.partial(on_notification, ase_id=1) 283 ) 284 await ascs_client.sink_ase[1].subscribe( 285 functools.partial(on_notification, ase_id=2) 286 ) 287 288 # Config Codec 289 config = CodecSpecificConfiguration( 290 sampling_frequency=SamplingFrequency.FREQ_48000, 291 frame_duration=FrameDuration.DURATION_10000_US, 292 audio_channel_allocation=AudioLocation.FRONT_LEFT, 293 octets_per_codec_frame=120, 294 codec_frames_per_sdu=1, 295 ) 296 await ascs_client.ase_control_point.write_value( 297 ASE_Config_Codec( 298 ase_id=[1, 2], 299 target_latency=[3, 4], 300 target_phy=[5, 6], 301 codec_id=[CodingFormat(CodecID.LC3), CodingFormat(CodecID.LC3)], 302 codec_specific_configuration=[config, config], 303 ) 304 ) 305 assert (await notifications[1].get())[:2] == bytes( 306 [1, AseStateMachine.State.CODEC_CONFIGURED] 307 ) 308 assert (await notifications[2].get())[:2] == bytes( 309 [2, AseStateMachine.State.CODEC_CONFIGURED] 310 ) 311 312 # Config QOS 313 await ascs_client.ase_control_point.write_value( 314 ASE_Config_QOS( 315 ase_id=[1, 2], 316 cig_id=[1, 2], 317 cis_id=[3, 4], 318 sdu_interval=[5, 6], 319 framing=[0, 1], 320 phy=[2, 3], 321 max_sdu=[4, 5], 322 retransmission_number=[6, 7], 323 max_transport_latency=[8, 9], 324 presentation_delay=[10, 11], 325 ) 326 ) 327 assert (await notifications[1].get())[:2] == bytes( 328 [1, AseStateMachine.State.QOS_CONFIGURED] 329 ) 330 assert (await notifications[2].get())[:2] == bytes( 331 [2, AseStateMachine.State.QOS_CONFIGURED] 332 ) 333 334 # Enable 335 await ascs_client.ase_control_point.write_value( 336 ASE_Enable( 337 ase_id=[1, 2], 338 metadata=[b'foo', b'bar'], 339 ) 340 ) 341 assert (await notifications[1].get())[:2] == bytes( 342 [1, AseStateMachine.State.ENABLING] 343 ) 344 assert (await notifications[2].get())[:2] == bytes( 345 [2, AseStateMachine.State.ENABLING] 346 ) 347 348 # CIS establishment 349 devices[0].emit( 350 'cis_establishment', 351 device.CisLink( 352 device=devices[0], 353 acl_connection=devices.connections[0], 354 handle=5, 355 cis_id=3, 356 cig_id=1, 357 ), 358 ) 359 devices[0].emit( 360 'cis_establishment', 361 device.CisLink( 362 device=devices[0], 363 acl_connection=devices.connections[0], 364 handle=6, 365 cis_id=4, 366 cig_id=2, 367 ), 368 ) 369 assert (await notifications[1].get())[:2] == bytes( 370 [1, AseStateMachine.State.STREAMING] 371 ) 372 assert (await notifications[2].get())[:2] == bytes( 373 [2, AseStateMachine.State.STREAMING] 374 ) 375 376 # Release 377 await ascs_client.ase_control_point.write_value( 378 ASE_Release( 379 ase_id=[1, 2], 380 metadata=[b'foo', b'bar'], 381 ) 382 ) 383 assert (await notifications[1].get())[:2] == bytes( 384 [1, AseStateMachine.State.RELEASING] 385 ) 386 assert (await notifications[2].get())[:2] == bytes( 387 [2, AseStateMachine.State.RELEASING] 388 ) 389 assert (await notifications[1].get())[:2] == bytes([1, AseStateMachine.State.IDLE]) 390 assert (await notifications[2].get())[:2] == bytes([2, AseStateMachine.State.IDLE]) 391 392 await asyncio.sleep(0.001) 393 394 395# ----------------------------------------------------------------------------- 396async def run(): 397 await test_pacs() 398 399 400# ----------------------------------------------------------------------------- 401if __name__ == '__main__': 402 logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) 403 asyncio.run(run()) 404