• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import os
20import functools
21import pytest
22import logging
23
24from bumble import device
25from bumble.hci import CodecID, CodingFormat
26from bumble.profiles.bap import (
27    AudioLocation,
28    AseStateMachine,
29    ASE_Operation,
30    ASE_Config_Codec,
31    ASE_Config_QOS,
32    ASE_Disable,
33    ASE_Enable,
34    ASE_Receiver_Start_Ready,
35    ASE_Receiver_Stop_Ready,
36    ASE_Release,
37    ASE_Update_Metadata,
38    SupportedFrameDuration,
39    SupportedSamplingFrequency,
40    SamplingFrequency,
41    FrameDuration,
42    CodecSpecificCapabilities,
43    CodecSpecificConfiguration,
44    ContextType,
45    PacRecord,
46    AudioStreamControlService,
47    AudioStreamControlServiceProxy,
48    PublishedAudioCapabilitiesService,
49    PublishedAudioCapabilitiesServiceProxy,
50)
51from tests.test_utils import TwoDevices
52
53
54# -----------------------------------------------------------------------------
55# Logging
56# -----------------------------------------------------------------------------
57logger = logging.getLogger(__name__)
58
59
60# -----------------------------------------------------------------------------
def basic_check(operation: ASE_Operation):
    """Assert that an ASE operation survives a serialize/parse round trip.

    The operation is converted to bytes, parsed back with
    ``ASE_Operation.from_bytes`` and the re-serialized result must match the
    first serialization byte for byte.
    """
    wire_format = bytes(operation)
    round_tripped = ASE_Operation.from_bytes(wire_format)
    assert bytes(round_tripped) == wire_format
65
66
67# -----------------------------------------------------------------------------
def test_codec_specific_capabilities() -> None:
    """Check CodecSpecificCapabilities equality after a bytes round trip."""
    capabilities = CodecSpecificCapabilities(
        supported_sampling_frequencies=SupportedSamplingFrequency.FREQ_16000,
        supported_frame_durations=(
            SupportedFrameDuration.DURATION_10000_US_SUPPORTED
        ),
        supported_audio_channel_count=[1],
        min_octets_per_codec_frame=40,
        max_octets_per_codec_frame=40,
        supported_max_codec_frames_per_sdu=1,
    )
    encoded = bytes(capabilities)
    assert CodecSpecificCapabilities.from_bytes(encoded) == capabilities
81
82
83# -----------------------------------------------------------------------------
def test_pac_record() -> None:
    """Check that an LC3 PacRecord compares equal after a bytes round trip."""
    capabilities = CodecSpecificCapabilities(
        supported_sampling_frequencies=SupportedSamplingFrequency.FREQ_16000,
        supported_frame_durations=(
            SupportedFrameDuration.DURATION_10000_US_SUPPORTED
        ),
        supported_audio_channel_count=[1],
        min_octets_per_codec_frame=40,
        max_octets_per_codec_frame=40,
        supported_max_codec_frames_per_sdu=1,
    )
    record = PacRecord(
        coding_format=CodingFormat(CodecID.LC3),
        codec_specific_capabilities=capabilities,
        metadata=b'',
    )

    encoded = bytes(record)
    assert PacRecord.from_bytes(encoded) == record
103
104
105# -----------------------------------------------------------------------------
def test_vendor_specific_pac_record() -> None:
    """Check that a raw vendor-specific PAC record re-serializes unchanged."""
    # Vendor-Specific codec, Google, ID=0xFFFF. No capabilities and metadata.
    raw_record = bytes.fromhex('ffe000ffff0000')
    parsed_record = PacRecord.from_bytes(raw_record)
    assert bytes(parsed_record) == raw_record
110
111
112# -----------------------------------------------------------------------------
def test_ASE_Config_Codec() -> None:
    """Round-trip a two-ASE Config Codec operation through basic_check."""
    basic_check(
        ASE_Config_Codec(
            ase_id=[1, 2],
            target_latency=[3, 4],
            target_phy=[5, 6],
            codec_id=[CodingFormat(CodecID.LC3), CodingFormat(CodecID.LC3)],
            codec_specific_configuration=[b'foo', b'bar'],
        )
    )
122
123
124# -----------------------------------------------------------------------------
def test_ASE_Config_QOS() -> None:
    """Round-trip a two-ASE Config QoS operation through basic_check."""
    basic_check(
        ASE_Config_QOS(
            ase_id=[1, 2],
            cig_id=[1, 2],
            cis_id=[3, 4],
            sdu_interval=[5, 6],
            framing=[0, 1],
            phy=[2, 3],
            max_sdu=[4, 5],
            retransmission_number=[6, 7],
            max_transport_latency=[8, 9],
            presentation_delay=[10, 11],
        )
    )
139
140
141# -----------------------------------------------------------------------------
def test_ASE_Enable() -> None:
    """Round-trip a two-ASE Enable operation through basic_check."""
    basic_check(ASE_Enable(ase_id=[1, 2], metadata=[b'foo', b'bar']))
148
149
150# -----------------------------------------------------------------------------
def test_ASE_Update_Metadata() -> None:
    """Round-trip a two-ASE Update Metadata operation through basic_check."""
    basic_check(ASE_Update_Metadata(ase_id=[1, 2], metadata=[b'foo', b'bar']))
157
158
159# -----------------------------------------------------------------------------
def test_ASE_Disable() -> None:
    """Round-trip a two-ASE Disable operation through basic_check."""
    basic_check(ASE_Disable(ase_id=[1, 2]))
163
164
165# -----------------------------------------------------------------------------
def test_ASE_Release() -> None:
    """Round-trip a two-ASE Release operation through basic_check."""
    basic_check(ASE_Release(ase_id=[1, 2]))
169
170
171# -----------------------------------------------------------------------------
def test_ASE_Receiver_Start_Ready() -> None:
    """Round-trip a two-ASE Receiver Start Ready operation via basic_check."""
    basic_check(ASE_Receiver_Start_Ready(ase_id=[1, 2]))
175
176
177# -----------------------------------------------------------------------------
def test_ASE_Receiver_Stop_Ready() -> None:
    """Round-trip a two-ASE Receiver Stop Ready operation via basic_check."""
    basic_check(ASE_Receiver_Stop_Ready(ase_id=[1, 2]))
181
182
183# -----------------------------------------------------------------------------
def test_codec_specific_configuration() -> None:
    """Check CodecSpecificConfiguration equality after a bytes round trip."""
    configuration = CodecSpecificConfiguration(
        sampling_frequency=SamplingFrequency.FREQ_16000,
        frame_duration=FrameDuration.DURATION_10000_US,
        audio_channel_allocation=AudioLocation.FRONT_LEFT,
        octets_per_codec_frame=60,
        codec_frames_per_sdu=1,
    )
    encoded = bytes(configuration)
    assert CodecSpecificConfiguration.from_bytes(encoded) == configuration
196
197
198# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_pacs():
    """Publish a PACS with two LC3 sink PAC records and discover it remotely.

    The service is registered on the first device; after the two devices are
    connected, the peer built from the second connection discovers the service
    and creates a ``PublishedAudioCapabilitiesServiceProxy`` for it.
    """

    def lc3_sink_pac(sampling_frequency, octets_per_codec_frame) -> PacRecord:
        # Both records differ only in sampling frequency and frame size.
        return PacRecord(
            coding_format=CodingFormat(CodecID.LC3),
            codec_specific_capabilities=CodecSpecificCapabilities(
                supported_sampling_frequencies=sampling_frequency,
                supported_frame_durations=(
                    SupportedFrameDuration.DURATION_10000_US_SUPPORTED
                ),
                supported_audio_channel_count=[1],
                min_octets_per_codec_frame=octets_per_codec_frame,
                max_octets_per_codec_frame=octets_per_codec_frame,
                supported_max_codec_frames_per_sdu=1,
            ),
        )

    devices = TwoDevices()
    devices[0].add_service(
        PublishedAudioCapabilitiesService(
            supported_sink_context=ContextType.MEDIA,
            available_sink_context=ContextType.MEDIA,
            supported_source_context=0,
            available_source_context=0,
            sink_pac=[
                # Codec Capability Setting 16_2
                lc3_sink_pac(SupportedSamplingFrequency.FREQ_16000, 40),
                # Codec Capability Setting 24_2
                lc3_sink_pac(SupportedSamplingFrequency.FREQ_24000, 60),
            ],
            sink_audio_locations=AudioLocation.FRONT_LEFT | AudioLocation.FRONT_RIGHT,
        )
    )

    await devices.setup_connection()
    peer = device.Peer(devices.connections[1])
    pacs_client = await peer.discover_service_and_create_proxy(
        PublishedAudioCapabilitiesServiceProxy
    )

    # Without this check the test only fails if discovery raises; make sure a
    # proxy object was actually handed back.
    assert pacs_client is not None
251
252
253# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ascs():
    """Drive two sink ASEs through the ASCS control point and check each state.

    An ``AudioStreamControlService`` exposing sink ASEs 1 and 2 is registered
    on the first device. The client proxy then reads the initial state,
    subscribes to both ASE characteristics, and issues Config Codec, Config
    QoS, Enable and Release operations. After every step the first two bytes
    of the notification for each ASE must be ``[ase_id, expected_state]``.
    """
    ase_ids = (1, 2)

    devices = TwoDevices()
    devices[0].add_service(
        AudioStreamControlService(device=devices[0], sink_ase_id=[1, 2])
    )

    await devices.setup_connection()
    peer = device.Peer(devices.connections[1])
    ascs_client = await peer.discover_service_and_create_proxy(
        AudioStreamControlServiceProxy
    )

    # One queue per ASE so notifications can be awaited independently.
    notifications = {1: asyncio.Queue(), 2: asyncio.Queue()}

    def on_notification(data: bytes, ase_id: int):
        notifications[ase_id].put_nowait(data)

    async def expect_state(state) -> None:
        # Consume one notification per ASE, in ASE order, and compare the
        # leading [ase_id, state] pair.
        for ase_id in ase_ids:
            notification = await notifications[ase_id].get()
            assert notification[:2] == bytes([ase_id, state])

    # Should be idle
    for index, ase_id in enumerate(ase_ids):
        assert await ascs_client.sink_ase[index].read_value() == bytes(
            [ase_id, AseStateMachine.State.IDLE]
        )

    # Subscribe
    for index, ase_id in enumerate(ase_ids):
        await ascs_client.sink_ase[index].subscribe(
            functools.partial(on_notification, ase_id=ase_id)
        )

    # Config Codec
    config = CodecSpecificConfiguration(
        sampling_frequency=SamplingFrequency.FREQ_48000,
        frame_duration=FrameDuration.DURATION_10000_US,
        audio_channel_allocation=AudioLocation.FRONT_LEFT,
        octets_per_codec_frame=120,
        codec_frames_per_sdu=1,
    )
    await ascs_client.ase_control_point.write_value(
        ASE_Config_Codec(
            ase_id=[1, 2],
            target_latency=[3, 4],
            target_phy=[5, 6],
            codec_id=[CodingFormat(CodecID.LC3), CodingFormat(CodecID.LC3)],
            codec_specific_configuration=[config, config],
        )
    )
    await expect_state(AseStateMachine.State.CODEC_CONFIGURED)

    # Config QOS
    await ascs_client.ase_control_point.write_value(
        ASE_Config_QOS(
            ase_id=[1, 2],
            cig_id=[1, 2],
            cis_id=[3, 4],
            sdu_interval=[5, 6],
            framing=[0, 1],
            phy=[2, 3],
            max_sdu=[4, 5],
            retransmission_number=[6, 7],
            max_transport_latency=[8, 9],
            presentation_delay=[10, 11],
        )
    )
    await expect_state(AseStateMachine.State.QOS_CONFIGURED)

    # Enable
    await ascs_client.ase_control_point.write_value(
        ASE_Enable(
            ase_id=[1, 2],
            metadata=[b'foo', b'bar'],
        )
    )
    await expect_state(AseStateMachine.State.ENABLING)

    # CIS establishment: the (cig_id, cis_id) pairs match the values sent in
    # the Config QOS step for ASE 1 and ASE 2 respectively.
    for handle, cis_id, cig_id in ((5, 3, 1), (6, 4, 2)):
        devices[0].emit(
            'cis_establishment',
            device.CisLink(
                device=devices[0],
                acl_connection=devices.connections[0],
                handle=handle,
                cis_id=cis_id,
                cig_id=cig_id,
            ),
        )
    await expect_state(AseStateMachine.State.STREAMING)

    # Release: only ASE IDs are passed, matching test_ASE_Release.
    await ascs_client.ase_control_point.write_value(ASE_Release(ase_id=[1, 2]))
    await expect_state(AseStateMachine.State.RELEASING)
    await expect_state(AseStateMachine.State.IDLE)

    # NOTE(review): presumably gives pending background work a chance to
    # finish before the test returns -- confirm.
    await asyncio.sleep(0.001)
393
394
395# -----------------------------------------------------------------------------
async def run() -> None:
    """Script entry point: execute the PACS test outside of pytest."""
    await test_pacs()
398
399
400# -----------------------------------------------------------------------------
if __name__ == '__main__':
    # Log level comes from BUMBLE_LOGLEVEL (case-insensitive), default INFO.
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=log_level)
    asyncio.run(run())
404