• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import logging
20import os
21import pytest
22
23from bumble.controller import Controller
24from bumble.link import LocalLink
25from bumble.device import Device
26from bumble.host import Host
27from bumble.transport import AsyncPipeSink
28from bumble.avdtp import (
29    AVDTP_IDLE_STATE,
30    AVDTP_STREAMING_STATE,
31    MediaPacketPump,
32    Protocol,
33    Listener,
34    MediaCodecCapabilities,
35    MediaPacket,
36    AVDTP_AUDIO_MEDIA_TYPE,
37    AVDTP_TSEP_SNK,
38    A2DP_SBC_CODEC_TYPE
39)
40from bumble.a2dp import (
41    SbcMediaCodecInformation,
42    SBC_MONO_CHANNEL_MODE,
43    SBC_DUAL_CHANNEL_MODE,
44    SBC_STEREO_CHANNEL_MODE,
45    SBC_JOINT_STEREO_CHANNEL_MODE,
46    SBC_LOUDNESS_ALLOCATION_METHOD,
47    SBC_SNR_ALLOCATION_METHOD
48)
49
50# -----------------------------------------------------------------------------
51# Logging
52# -----------------------------------------------------------------------------
53logger = logging.getLogger(__name__)
54
55
56# -----------------------------------------------------------------------------
class TwoDevices:
    """Test fixture holding two Device objects, each backed by its own
    Controller, with both controllers attached to a single LocalLink."""

    def __init__(self):
        # One slot per device, filled in by on_connection()
        self.connections = [None, None]

        # Both controllers are created on the same link
        self.link = LocalLink()
        self.controllers = [
            Controller(name, link=self.link) for name in ('C1', 'C2')
        ]

        # Each device gets a Host bound to its own controller
        addresses = ('F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0')
        self.devices = [
            Device(
                address=address,
                host=Host(controller, AsyncPipeSink(controller)),
            )
            for address, controller in zip(addresses, self.controllers)
        ]

        self.paired = [None, None]

    def on_connection(self, which, connection):
        """Store `connection` in the slot of the device at index `which`."""
        self.connections[which] = connection
81
82
83# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_self_connection():
    """Connect the first device to the second and check that both devices
    reported a connection."""
    # Two devices, each with a controller, attached to the same link
    pair = TwoDevices()
    first, second = pair.devices

    # Record the connection reported by each device under its index
    first.on('connection', lambda connection: pair.on_connection(0, connection))
    second.on('connection', lambda connection: pair.on_connection(1, connection))

    # Power on both devices
    await first.power_on()
    await second.power_on()

    # Initiate the connection from the first device
    await first.connect(second.random_address)

    # Both slots must have been filled in by the listeners
    assert pair.connections[0] is not None
    assert pair.connections[1] is not None
103
104
105# -----------------------------------------------------------------------------
def source_codec_capabilities():
    """Return the MediaCodecCapabilities used for the source endpoint in
    these tests: SBC audio described by a single set of discrete values."""
    sbc_information = SbcMediaCodecInformation.from_discrete_values(
        sampling_frequency=44100,
        channel_mode=SBC_JOINT_STEREO_CHANNEL_MODE,
        block_length=16,
        subbands=8,
        allocation_method=SBC_LOUDNESS_ALLOCATION_METHOD,
        minimum_bitpool_value=2,
        maximum_bitpool_value=53,
    )
    return MediaCodecCapabilities(
        media_type=AVDTP_AUDIO_MEDIA_TYPE,
        media_codec_type=A2DP_SBC_CODEC_TYPE,
        media_codec_information=sbc_information,
    )
120
121
122# -----------------------------------------------------------------------------
def sink_codec_capabilities():
    """Return the MediaCodecCapabilities used for the sink endpoint in
    these tests: SBC audio described by lists of accepted values."""
    accepted_channel_modes = [
        SBC_MONO_CHANNEL_MODE,
        SBC_DUAL_CHANNEL_MODE,
        SBC_STEREO_CHANNEL_MODE,
        SBC_JOINT_STEREO_CHANNEL_MODE,
    ]
    accepted_allocation_methods = [
        SBC_LOUDNESS_ALLOCATION_METHOD,
        SBC_SNR_ALLOCATION_METHOD,
    ]
    sbc_information = SbcMediaCodecInformation.from_lists(
        sampling_frequencies=[48000, 44100, 32000, 16000],
        channel_modes=accepted_channel_modes,
        block_lengths=[4, 8, 12, 16],
        subbands=[4, 8],
        allocation_methods=accepted_allocation_methods,
        minimum_bitpool_value=2,
        maximum_bitpool_value=53,
    )
    return MediaCodecCapabilities(
        media_type=AVDTP_AUDIO_MEDIA_TYPE,
        media_codec_type=A2DP_SBC_CODEC_TYPE,
        media_codec_information=sbc_information,
    )
142
143
144# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_source_sink_1():
    """Stream media packets from an AVDTP source on the first device to a
    sink on the second device, and check stream/endpoint state before and
    after.

    The stream is exercised twice: once fed by a MediaPacketPump, and once
    by calling send_media_packet() on the stream directly.
    """
    pair = TwoDevices()
    source_device, sink_device = pair.devices
    await source_device.power_on()
    await sink_device.power_on()

    def on_rtp_packet(packet):
        # `received`, `expected_count` and `all_received` are bound further
        # down in the enclosing function, before any packet is sent.
        received.append(packet)
        if len(received) == expected_count:
            all_received.set_result(None)

    sink = None

    def on_avdtp_connection(server):
        # Add a sink endpoint to the incoming connection and listen for packets
        nonlocal sink
        sink = server.add_sink(sink_codec_capabilities())
        sink.on('rtp_packet', on_rtp_packet)

    def assert_streaming(active_stream):
        # Both ends of the stream must be in use and in the streaming state
        assert active_stream.state == AVDTP_STREAMING_STATE
        assert active_stream.local_endpoint.in_use == 1
        assert active_stream.rtp_channel is not None
        assert sink.in_use == 1
        assert sink.stream is not None
        assert sink.stream.state == AVDTP_STREAMING_STATE

    def assert_endpoints_idle(local_source):
        # After a close, both endpoints are released and their streams idle
        assert local_source.in_use == 0
        assert local_source.stream.state == AVDTP_IDLE_STATE
        assert sink.in_use == 0
        assert sink.stream.state == AVDTP_IDLE_STATE

    # Create a listener to wait for AVDTP connections on the second device
    listener = Listener(Listener.create_registrar(sink_device))
    listener.on('connection', on_avdtp_connection)

    # Connect from the first device and discover the remote endpoints
    connection = await source_device.connect(sink_device.random_address)
    client = await Protocol.connect(connection)
    endpoints = await client.discover_remote_endpoints()
    assert len(endpoints) == 1
    remote_sink = next(iter(endpoints))
    assert remote_sink.in_use == 0
    assert remote_sink.media_type == AVDTP_AUDIO_MEDIA_TYPE
    assert remote_sink.tsep == AVDTP_TSEP_SNK

    async def generate_packets(packet_count):
        # Yield `packet_count` packets; the timestamp advances by 10 per packet
        for sequence_number in range(packet_count):
            timestamp = sequence_number * 10
            payload = bytes([sequence_number % 256])
            packet = MediaPacket(
                2, 0, 0, 0, sequence_number, timestamp, 0, [], 96, payload
            )
            packet.timestamp_seconds = timestamp / 44100
            yield packet

    loop = asyncio.get_running_loop()

    # Send packets using a pump object
    all_received = loop.create_future()
    expected_count = 3
    received = []
    pump = MediaPacketPump(generate_packets(3))
    source = client.add_source(source_codec_capabilities(), pump)
    stream = await client.create_stream(source, remote_sink)
    await stream.start()
    assert_streaming(stream)
    await all_received

    await stream.close()
    assert stream.rtp_channel is None
    assert_endpoints_idle(source)

    # Send packets manually
    all_received = loop.create_future()
    expected_count = 3
    received = []
    source_packets = []
    for index in range(3):
        source_packets.append(
            MediaPacket(2, 0, 0, 0, index, index * 10, 0, [], 96, bytes([index]))
        )
    source = client.add_source(source_codec_capabilities(), None)
    stream = await client.create_stream(source, remote_sink)
    await stream.start()
    assert_streaming(stream)

    for packet in source_packets:
        stream.send_media_packet(packet)

    await stream.close()
    assert stream.rtp_channel is None
    assert len(received) == 3
    assert_endpoints_idle(source)
239
240
241# -----------------------------------------------------------------------------
async def run_test_self():
    """Await this module's tests one after the other, in definition order."""
    for test in (test_self_connection, test_source_sink_1):
        await test()
245
246
247# -----------------------------------------------------------------------------
if __name__ == '__main__':
    # The log level is read from BUMBLE_LOGLEVEL (default 'INFO') and upper-cased
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=log_level)
    asyncio.run(run_test_self())
251