# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import pytest

from bumble.controller import Controller
from bumble.link import LocalLink
from bumble.device import Device
from bumble.host import Host
from bumble.transport import AsyncPipeSink
from bumble.avdtp import (
    AVDTP_IDLE_STATE,
    AVDTP_STREAMING_STATE,
    MediaPacketPump,
    Protocol,
    Listener,
    MediaCodecCapabilities,
    MediaPacket,
    AVDTP_AUDIO_MEDIA_TYPE,
    AVDTP_TSEP_SNK,
    A2DP_SBC_CODEC_TYPE
)
from bumble.a2dp import (
    SbcMediaCodecInformation,
    SBC_MONO_CHANNEL_MODE,
    SBC_DUAL_CHANNEL_MODE,
    SBC_STEREO_CHANNEL_MODE,
    SBC_JOINT_STEREO_CHANNEL_MODE,
    SBC_LOUDNESS_ALLOCATION_METHOD,
    SBC_SNR_ALLOCATION_METHOD
)

# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)

# Upper bound, in seconds, on how long a test waits for the expected RTP
# packets. It only exists so that a lost packet fails the test instead of
# hanging it.
_RTP_RECEIVE_TIMEOUT_SECONDS = 5.0


# -----------------------------------------------------------------------------
class TwoDevices:
    """Two devices, each backed by its own controller, on one shared link."""

    def __init__(self):
        # One slot per device, filled in by on_connection()
        self.connections = [None, None]

        self.link = LocalLink()
        self.controllers = [
            Controller('C1', link=self.link),
            Controller('C2', link=self.link),
        ]
        self.devices = [
            Device(
                address='F0:F1:F2:F3:F4:F5',
                host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])),
            ),
            Device(
                address='F5:F4:F3:F2:F1:F0',
                host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])),
            ),
        ]

        # Not read anywhere in this file; kept for callers that may use it
        self.paired = [None, None]

    def on_connection(self, which, connection):
        """Record `connection` as the connection of device number `which`."""
        self.connections[which] = connection


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_self_connection():
    """Connect device 0 to device 1 and check both sides see a connection."""
    # Create two devices, each with a controller, attached to the same link
    two_devices = TwoDevices()

    # Attach listeners
    two_devices.devices[0].on(
        'connection',
        lambda connection: two_devices.on_connection(0, connection)
    )
    two_devices.devices[1].on(
        'connection',
        lambda connection: two_devices.on_connection(1, connection)
    )

    # Start
    await two_devices.devices[0].power_on()
    await two_devices.devices[1].power_on()

    # Connect the two devices
    await two_devices.devices[0].connect(two_devices.devices[1].random_address)

    # Check the post conditions
    assert two_devices.connections[0] is not None
    assert two_devices.connections[1] is not None


# -----------------------------------------------------------------------------
def source_codec_capabilities():
    """Return the SBC capabilities used for the source endpoint.

    A single discrete configuration: 44100 Hz, joint stereo, 16 blocks,
    8 subbands, loudness allocation, bitpool range 2..53.
    """
    return MediaCodecCapabilities(
        media_type=AVDTP_AUDIO_MEDIA_TYPE,
        media_codec_type=A2DP_SBC_CODEC_TYPE,
        media_codec_information=SbcMediaCodecInformation.from_discrete_values(
            sampling_frequency=44100,
            channel_mode=SBC_JOINT_STEREO_CHANNEL_MODE,
            block_length=16,
            subbands=8,
            allocation_method=SBC_LOUDNESS_ALLOCATION_METHOD,
            minimum_bitpool_value=2,
            maximum_bitpool_value=53
        )
    )


# -----------------------------------------------------------------------------
def sink_codec_capabilities():
    """Return the SBC capabilities used for the sink endpoint.

    Built from lists of values for each parameter (several sampling
    frequencies, all four channel modes, both allocation methods), with a
    bitpool range of 2..53.
    """
    return MediaCodecCapabilities(
        media_type=AVDTP_AUDIO_MEDIA_TYPE,
        media_codec_type=A2DP_SBC_CODEC_TYPE,
        media_codec_information=SbcMediaCodecInformation.from_lists(
            sampling_frequencies=[48000, 44100, 32000, 16000],
            channel_modes=[
                SBC_MONO_CHANNEL_MODE,
                SBC_DUAL_CHANNEL_MODE,
                SBC_STEREO_CHANNEL_MODE,
                SBC_JOINT_STEREO_CHANNEL_MODE
            ],
            block_lengths=[4, 8, 12, 16],
            subbands=[4, 8],
            allocation_methods=[
                SBC_LOUDNESS_ALLOCATION_METHOD,
                SBC_SNR_ALLOCATION_METHOD
            ],
            minimum_bitpool_value=2,
            maximum_bitpool_value=53
        )
    )


# -----------------------------------------------------------------------------
def _assert_streaming(stream, sink):
    """Check that both ends of a started stream report the streaming state."""
    assert stream.state == AVDTP_STREAMING_STATE
    assert stream.local_endpoint.in_use == 1
    assert stream.rtp_channel is not None
    assert sink.in_use == 1
    assert sink.stream is not None
    assert sink.stream.state == AVDTP_STREAMING_STATE


# -----------------------------------------------------------------------------
def _assert_closed(stream, source, sink):
    """Check that both ends of a closed stream are idle and released."""
    assert stream.rtp_channel is None
    assert source.in_use == 0
    assert source.stream.state == AVDTP_IDLE_STATE
    assert sink.in_use == 0
    assert sink.stream.state == AVDTP_IDLE_STATE


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_source_sink_1():
    """Stream RTP packets from a source on device 0 to a sink on device 1.

    The stream is exercised twice against the same remote sink: once with the
    packets fed by a MediaPacketPump, and once with the packets sent one by
    one through the stream. Each time, the test waits until the sink has
    reported the expected number of packets, then closes the stream and checks
    that both endpoints are back to idle.
    """
    two_devices = TwoDevices()
    await two_devices.devices[0].power_on()
    await two_devices.devices[1].power_on()

    loop = asyncio.get_running_loop()

    # Reception state shared with on_rtp_packet(); reset before each phase
    rtp_packets_expected = 3
    rtp_packets = []
    rtp_packets_fully_received = loop.create_future()

    def on_rtp_packet(packet):
        rtp_packets.append(packet)
        if len(rtp_packets) == rtp_packets_expected:
            rtp_packets_fully_received.set_result(None)

    sink = None

    def on_avdtp_connection(server):
        nonlocal sink
        sink = server.add_sink(sink_codec_capabilities())
        sink.on('rtp_packet', on_rtp_packet)

    # Create a listener to wait for AVDTP connections
    listener = Listener(Listener.create_registrar(two_devices.devices[1]))
    listener.on('connection', on_avdtp_connection)

    connection = await two_devices.devices[0].connect(
        two_devices.devices[1].random_address
    )
    client = await Protocol.connect(connection)
    endpoints = await client.discover_remote_endpoints()
    assert len(endpoints) == 1
    remote_sink = list(endpoints)[0]
    assert remote_sink.in_use == 0
    assert remote_sink.media_type == AVDTP_AUDIO_MEDIA_TYPE
    assert remote_sink.tsep == AVDTP_TSEP_SNK

    async def generate_packets(packet_count):
        sequence_number = 0
        timestamp = 0
        for _ in range(packet_count):
            payload = bytes([sequence_number % 256])
            packet = MediaPacket(
                2, 0, 0, 0, sequence_number, timestamp, 0, [], 96, payload
            )
            # NOTE(review): presumably what the pump paces on; confirm against
            # MediaPacketPump
            packet.timestamp_seconds = timestamp / 44100
            timestamp += 10
            sequence_number += 1
            yield packet

    # Send packets using a pump object
    pump = MediaPacketPump(generate_packets(rtp_packets_expected))
    source = client.add_source(source_codec_capabilities(), pump)
    stream = await client.create_stream(source, remote_sink)
    await stream.start()
    _assert_streaming(stream, sink)
    await asyncio.wait_for(
        rtp_packets_fully_received, _RTP_RECEIVE_TIMEOUT_SECONDS
    )

    await stream.close()
    _assert_closed(stream, source, sink)

    # Send packets manually
    rtp_packets = []
    rtp_packets_fully_received = loop.create_future()
    source_packets = [
        MediaPacket(2, 0, 0, 0, i, i * 10, 0, [], 96, bytes([i]))
        for i in range(rtp_packets_expected)
    ]
    source = client.add_source(source_codec_capabilities(), None)
    stream = await client.create_stream(source, remote_sink)
    await stream.start()
    _assert_streaming(stream, sink)

    for source_packet in source_packets:
        stream.send_media_packet(source_packet)

    # Wait for the sink to report every packet before closing, so that the
    # count checked below does not depend on close() flushing the delivery.
    await asyncio.wait_for(
        rtp_packets_fully_received, _RTP_RECEIVE_TIMEOUT_SECONDS
    )

    await stream.close()
    assert len(rtp_packets) == rtp_packets_expected
    _assert_closed(stream, source, sink)


# -----------------------------------------------------------------------------
async def run_test_self():
    """Run the tests of this file in sequence, outside of pytest."""
    await test_self_connection()
    await test_source_sink_1()


# -----------------------------------------------------------------------------
if __name__ == '__main__':
    logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
    asyncio.run(run_test_self())