1# Lint as: python2, python3 2# Copyright 2020 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""Server side Bluetooth audio tests.""" 7 8from __future__ import absolute_import 9from __future__ import division 10from __future__ import print_function 11 12import logging 13import os 14import re 15import subprocess 16import time 17 18import common 19from autotest_lib.client.bin import utils 20from autotest_lib.client.common_lib import error 21from autotest_lib.client.cros.bluetooth.bluetooth_audio_test_data import ( 22 A2DP, HFP_NBS, HFP_WBS, AUDIO_DATA_TARBALL_PATH, VISQOL_BUFFER_LENGTH, 23 DATA_DIR, VISQOL_PATH, VISQOL_SIMILARITY_MODEL, VISQOL_TEST_DIR, 24 AUDIO_RECORD_DIR, audio_test_data, get_audio_test_data, 25 get_visqol_binary) 26from autotest_lib.server.cros.bluetooth.bluetooth_adapter_tests import ( 27 BluetoothAdapterTests, test_retry_and_log) 28from six.moves import range 29 30 31class BluetoothAdapterAudioTests(BluetoothAdapterTests): 32 """Server side Bluetooth adapter audio test class.""" 33 34 DEVICE_TYPE = 'BLUETOOTH_AUDIO' 35 FREQUENCY_TOLERANCE_RATIO = 0.01 36 WAIT_DAEMONS_READY_SECS = 1 37 DEFAULT_CHUNK_IN_SECS = 1 38 IGNORE_LAST_FEW_CHUNKS = 2 39 40 # Useful constant for upsampling NBS files for compatibility with ViSQOL 41 MIN_VISQOL_SAMPLE_RATE = 16000 42 43 # The node types of the bluetooth output nodes in cras are the same for both 44 # A2DP and HFP. 45 CRAS_BLUETOOTH_OUTPUT_NODE_TYPE = 'BLUETOOTH' 46 # The node types of the bluetooth input nodes in cras are different for WBS 47 # and NBS. 48 CRAS_HFP_BLUETOOTH_INPUT_NODE_TYPE = {HFP_WBS: 'BLUETOOTH', 49 HFP_NBS: 'BLUETOOTH_NB_MIC'} 50 51 def _get_pulseaudio_bluez_source(self, get_source_method, device, 52 test_profile): 53 """Get the specified bluez device number in the pulseaudio source list. 54 55 @param get_source_method: the method to get distinct bluez source 56 @param device: the bluetooth peer device 57 @param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS 58 59 @returns: True if the specified bluez source is derived 60 """ 61 sources = device.ListSources(test_profile) 62 logging.debug('ListSources()\n%s', sources) 63 self.bluez_source = get_source_method(test_profile) 64 result = bool(self.bluez_source) 65 if result: 66 logging.debug('bluez_source device number: %s', self.bluez_source) 67 else: 68 logging.debug('waiting for bluez_source ready in pulseaudio...') 69 return result 70 71 72 def _get_pulseaudio_bluez_sink(self, get_sink_method, device, test_profile): 73 """Get the specified bluez device number in the pulseaudio sink list. 74 75 @param get_sink_method: the method to get distinct bluez sink 76 @param device: the bluetooth peer device 77 @param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS 78 79 @returns: True if the specified bluez sink is derived 80 """ 81 sinks = device.ListSinks(test_profile) 82 logging.debug('ListSinks()\n%s', sinks) 83 self.bluez_sink = get_sink_method(test_profile) 84 result = bool(self.bluez_sink) 85 if result: 86 logging.debug('bluez_sink device number: %s', self.bluez_sink) 87 else: 88 logging.debug('waiting for bluez_sink ready in pulseaudio...') 89 return result 90 91 92 def _get_pulseaudio_bluez_source_a2dp(self, device, test_profile): 93 """Get the a2dp bluez source device number. 94 95 @param device: the bluetooth peer device 96 @param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS 97 98 @returns: True if the specified a2dp bluez source is derived 99 """ 100 return self._get_pulseaudio_bluez_source( 101 device.GetBluezSourceA2DPDevice, device, test_profile) 102 103 104 def _get_pulseaudio_bluez_source_hfp(self, device, test_profile): 105 """Get the hfp bluez source device number. 106 107 @param device: the bluetooth peer device 108 @param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS 109 110 @returns: True if the specified hfp bluez source is derived 111 """ 112 return self._get_pulseaudio_bluez_source( 113 device.GetBluezSourceHFPDevice, device, test_profile) 114 115 116 def _get_pulseaudio_bluez_sink_hfp(self, device, test_profile): 117 """Get the hfp bluez sink device number. 118 119 @param device: the bluetooth peer device 120 @param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS 121 122 @returns: True if the specified hfp bluez sink is derived 123 """ 124 return self._get_pulseaudio_bluez_sink( 125 device.GetBluezSinkHFPDevice, device, test_profile) 126 127 128 def _check_audio_frames_legitimacy(self, audio_test_data, recording_device, 129 recorded_file=None): 130 """Check if audio frames in the recorded file are legitimate. 131 132 For a wav file, a simple check is to make sure the recorded audio file 133 is not empty. 134 135 For a raw file, a simple check is to make sure the recorded audio file 136 are not all zeros. 137 138 @param audio_test_data: a dictionary about the audio test data 139 defined in client/cros/bluetooth/bluetooth_audio_test_data.py 140 @param recording_device: which device recorded the audio, 141 possible values are 'recorded_by_dut' or 'recorded_by_peer' 142 @param recorded_file: the recorded file name 143 144 @returns: True if audio frames are legitimate. 145 """ 146 result = self.bluetooth_facade.check_audio_frames_legitimacy( 147 audio_test_data, recording_device, recorded_file) 148 if not result: 149 self.results = {'audio_frames_legitimacy': 'empty or all zeros'} 150 logging.error('The recorded audio file is empty or all zeros.') 151 return result 152 153 154 def _check_frequency(self, test_profile, recorded_freq, expected_freq): 155 """Check if the recorded frequency is within tolerance. 156 157 @param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS 158 @param recorded_freq: the frequency of recorded audio 159 @param expected_freq: the expected frequency 160 161 @returns: True if the recoreded frequency falls within the tolerance of 162 the expected frequency 163 """ 164 tolerance = expected_freq * self.FREQUENCY_TOLERANCE_RATIO 165 return abs(expected_freq - recorded_freq) <= tolerance 166 167 168 def _check_primary_frequencies(self, test_profile, audio_test_data, 169 recording_device, recorded_file=None): 170 """Check if the recorded frequencies meet expectation. 171 172 @param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS 173 @param audio_test_data: a dictionary about the audio test data 174 defined in client/cros/bluetooth/bluetooth_audio_test_data.py 175 @param recording_device: which device recorded the audio, 176 possible values are 'recorded_by_dut' or 'recorded_by_peer' 177 @param recorded_file: the recorded file name 178 179 @returns: True if the recorded frequencies of all channels fall within 180 the tolerance of expected frequencies 181 """ 182 recorded_frequencies = self.bluetooth_facade.get_primary_frequencies( 183 audio_test_data, recording_device, recorded_file) 184 expected_frequencies = audio_test_data['frequencies'] 185 final_result = True 186 self.results = dict() 187 188 if len(recorded_frequencies) < len(expected_frequencies): 189 logging.error('recorded_frequencies: %s, expected_frequencies: %s', 190 str(recorded_frequencies), str(expected_frequencies)) 191 final_result = False 192 else: 193 for channel, expected_freq in enumerate(expected_frequencies): 194 recorded_freq = recorded_frequencies[channel] 195 ret_val = self._check_frequency( 196 test_profile, recorded_freq, expected_freq) 197 pass_fail_str = 'pass' if ret_val else 'fail' 198 result = ('primary frequency %d (expected %d): %s' % 199 (recorded_freq, expected_freq, pass_fail_str)) 200 self.results['Channel %d' % channel] = result 201 logging.info('Channel %d: %s', channel, result) 202 203 if not ret_val: 204 final_result = False 205 206 logging.debug(str(self.results)) 207 if not final_result: 208 logging.error('Failure at checking primary frequencies') 209 return final_result 210 211 212 def _poll_for_condition(self, condition, timeout=20, sleep_interval=1, 213 desc='waiting for condition'): 214 try: 215 utils.poll_for_condition(condition=condition, 216 timeout=timeout, 217 sleep_interval=sleep_interval, 218 desc=desc) 219 except Exception as e: 220 raise error.TestError('Exception occurred when %s (%s)' % (desc, e)) 221 222 223 def initialize_bluetooth_audio(self, device, test_profile): 224 """Initialize the Bluetooth audio task. 225 226 Note: pulseaudio is not stable. Need to restart it in the beginning. 227 228 @param device: the bluetooth peer device 229 @param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS 230 231 """ 232 if not self.bluetooth_facade.create_audio_record_directory( 233 AUDIO_RECORD_DIR): 234 raise error.TestError('Failed to create %s on the DUT' % 235 AUDIO_RECORD_DIR) 236 237 if not device.StartPulseaudio(test_profile): 238 raise error.TestError('Failed to start pulseaudio.') 239 logging.debug('pulseaudio is started.') 240 241 if test_profile in (HFP_WBS, HFP_NBS): 242 if device.StartOfono(): 243 logging.debug('ofono is started.') 244 else: 245 raise error.TestError('Failed to start ofono.') 246 elif device.StopOfono(): 247 logging.debug('ofono is stopped.') 248 else: 249 logging.warn('Failed to stop ofono. Ignored.') 250 251 # Need time to complete starting services. 252 time.sleep(self.WAIT_DAEMONS_READY_SECS) 253 254 255 def cleanup_bluetooth_audio(self, device, test_profile): 256 """Cleanup for Bluetooth audio. 257 258 @param device: the bluetooth peer device 259 @param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS 260 261 """ 262 if device.StopPulseaudio(): 263 logging.debug('pulseaudio is stopped.') 264 else: 265 logging.warn('Failed to stop pulseaudio. Ignored.') 266 267 if device.StopOfono(): 268 logging.debug('ofono is stopped.') 269 else: 270 logging.warn('Failed to stop ofono. Ignored.') 271 272 273 def initialize_bluetooth_player(self, device): 274 """Initialize the Bluetooth media player. 275 276 @param device: the Bluetooth peer device. 277 278 """ 279 if not device.ExportMediaPlayer(): 280 raise error.TestError('Failed to export media player.') 281 logging.debug('mpris-proxy is started.') 282 283 # Wait for player to show up and observed by playerctl. 284 desc='waiting for media player' 285 self._poll_for_condition( 286 lambda: bool(device.GetExportedMediaPlayer()), desc=desc) 287 288 289 def cleanup_bluetooth_player(self, device): 290 """Cleanup for Bluetooth media player. 291 292 @param device: the bluetooth peer device. 293 294 """ 295 device.UnexportMediaPlayer() 296 297 298 def select_audio_output_node(self): 299 """Select the audio output node through cras. 300 301 @raises: error.TestError if failed. 302 """ 303 def bluetooth_type_selected(node_type): 304 """Check if the bluetooth node type is selected.""" 305 selected = self.bluetooth_facade.get_selected_output_device_type() 306 logging.debug('active output node type: %s, expected %s', 307 selected, node_type) 308 return selected == node_type 309 310 node_type = self.CRAS_BLUETOOTH_OUTPUT_NODE_TYPE 311 if not self.bluetooth_facade.select_output_node(node_type): 312 raise error.TestError('select_output_node failed') 313 314 desc='waiting for %s as active cras audio output node type' % node_type 315 logging.debug(desc) 316 self._poll_for_condition(lambda: bluetooth_type_selected(node_type), 317 desc=desc) 318 319 320 def initialize_hfp(self, device, test_profile, test_data, 321 recording_device, bluez_function): 322 """Initial set up for hfp tests. 323 324 Setup that is required for all hfp tests where 325 dut is either source or sink. Selects input device, starts recording, 326 and lastly it waits for pulseaudio bluez source/sink. 327 328 @param device: the bluetooth peer device 329 @param test_profile: the test profile used, HFP_WBS or HFP_NBS 330 @param test_data: a dictionary about the audio test data defined in 331 client/cros/bluetooth/bluetooth_audio_test_data.py 332 @param recording_device: which device recorded the audio, possible 333 values are 'recorded_by_dut' or 'recorded_by_peer' 334 @param bluez_function: the appropriate bluez hfp function either 335 _get_pulseaudio_bluez_source_hfp or 336 _get_pulseaudio_bluez_sink_hfp depending on the role of the dut 337 """ 338 device_type = 'DUT' if recording_device == 'recorded_by_dut' else 'Peer' 339 dut_role = 'sink' if recording_device == 'recorded_by_dut' else 'source' 340 341 # Select audio input device. 342 desc = 'waiting for cras to select audio input device' 343 logging.debug(desc) 344 self._poll_for_condition( 345 lambda: self.bluetooth_facade.select_input_device(device.name), 346 desc=desc) 347 348 # Select audio output node so that we do not rely on chrome to do it. 349 self.select_audio_output_node() 350 351 # Enable HFP profile. 352 logging.debug('Start recording audio on {}'.format(device_type)) 353 if not self.bluetooth_facade.start_capturing_audio_subprocess( 354 test_data, recording_device): 355 desc = '{} failed to start capturing audio.'.format(device_type) 356 raise error.TestError(desc) 357 358 # Wait for pulseaudio bluez hfp source/sink 359 desc = 'waiting for pulseaudio bluez hfp {}'.format(dut_role) 360 logging.debug(desc) 361 self._poll_for_condition(lambda: bluez_function(device, test_profile), 362 desc=desc) 363 364 365 def hfp_record_on_dut(self, device, test_profile, test_data): 366 """Play audio from test_data dictionary from peer device to dut. 367 368 Play file described in test_data dictionary from peer device to dut 369 using test_profile, either HFP_WBS or HFP_NBS and record on dut. 370 371 @param device: the bluetooth peer device 372 @param test_profile: the test profile used, HFP_WBS or HFP_NBS 373 @param test_data: a dictionary about the audio test data defined in 374 client/cros/bluetooth/bluetooth_audio_test_data.py 375 376 @returns: True if the recorded audio frames are legitimate, False 377 if they are not, ie. it did not record. 378 """ 379 # Select audio input device. 380 logging.debug('Select input device') 381 if not self.bluetooth_facade.select_input_device(device.name): 382 raise error.TestError('DUT failed to select audio input device.') 383 384 # Start playing audio on chameleon. 385 logging.debug('Start playing audio on Pi') 386 if not device.StartPlayingAudioSubprocess(test_profile, test_data): 387 err = 'Failed to start playing audio file on the peer device' 388 raise error.TestError(err) 389 390 time.sleep(test_data['duration']) 391 392 # Stop playing audio on chameleon. 393 logging.debug('Stop playing audio on Pi') 394 if not device.StopPlayingAudioSubprocess(): 395 err = 'Failed to stop playing audio on the peer device' 396 raise error.TestError(err) 397 398 # Disable HFP profile. 399 logging.debug('Stop recording audio on DUT') 400 if not self.bluetooth_facade.stop_capturing_audio_subprocess(): 401 raise error.TestError('DUT failed to stop capturing audio.') 402 403 # Check if the audio frames in the recorded file are legitimate. 404 return self._check_audio_frames_legitimacy(test_data, 'recorded_by_dut') 405 406 407 def hfp_record_on_peer(self, device, test_profile, test_data): 408 """Play audio from test_data dictionary from dut to peer device. 409 410 Play file described in test_data dictionary from dut to peer device 411 using test_profile, either HFP_WBS or HFP_NBS and record on peer. 412 413 @param device: The bluetooth peer device. 414 @param test_profile: The test profile used, HFP_WBS or HFP_NBS. 415 @param test_data: A dictionary about the audio test data defined in 416 client/cros/bluetooth/bluetooth_audio_test_data.py. 417 418 @returns: True if the recorded audio frames are legitimate, False 419 if they are not, ie. it did not record. 420 """ 421 logging.debug('Start recording audio on Pi') 422 # Start recording audio on the peer Bluetooth audio device. 423 if not device.StartRecordingAudioSubprocess(test_profile, test_data): 424 raise error.TestError( 425 'Failed to record on the peer Bluetooth audio device.') 426 427 # Play audio on the DUT in a non-blocked way. 428 # If there are issues, cras_test_client playing back might be blocked 429 # forever. We would like to avoid the testing procedure from that. 430 logging.debug('Start playing audio') 431 if not self.bluetooth_facade.start_playing_audio_subprocess(test_data): 432 raise error.TestError('DUT failed to play audio.') 433 434 time.sleep(test_data['duration']) 435 436 logging.debug('Stop recording audio on Pi') 437 # Stop recording audio on the peer Bluetooth audio device. 438 if not device.StopRecordingingAudioSubprocess(): 439 msg = 'Failed to stop recording on the peer Bluetooth audio device' 440 logging.error(msg) 441 442 # Disable HFP profile. 443 logging.debug('Stop recording audio on DUT') 444 if not self.bluetooth_facade.stop_capturing_audio_subprocess(): 445 raise error.TestError('DUT failed to stop capturing audio.') 446 447 # Stop playing audio on DUT. 448 logging.debug('Stop playing audio on DUT') 449 if not self.bluetooth_facade.stop_playing_audio_subprocess(): 450 raise error.TestError('DUT failed to stop playing audio.') 451 452 # Copy the recorded audio file to the DUT for spectrum analysis. 453 logging.debug('Scp to DUT') 454 recorded_file = test_data['recorded_by_peer'] 455 device.ScpToDut(recorded_file, recorded_file, self.host.ip) 456 457 # Check if the audio frames in the recorded file are legitimate. 458 return self._check_audio_frames_legitimacy(test_data, 459 'recorded_by_peer') 460 461 462 def parse_visqol_output(self, stdout, stderr): 463 """ 464 Parse stdout and stderr string from VISQOL output and parse into 465 a float score. 466 467 On error, stderr will contain the error message, otherwise will be None. 468 On success, stdout will be a string, first line will be 469 VISQOL version, followed by indication of speech mode. Followed by 470 paths to reference and degraded file, and a float MOS-LQO score, which 471 is what we're interested in. Followed by more detailed charts about 472 specific scoring by segments of the files. Stdout is None on error. 473 474 @param stdout: The stdout bytes from commandline output of VISQOL. 475 @param stderr: The stderr bytes from commandline output of VISQOL. 476 477 @returns: A tuple of a float score and string representation of the 478 srderr or None if there was no error. 479 """ 480 string_out = stdout or '' 481 482 # Log verbose VISQOL output: 483 log_file = os.path.join(VISQOL_TEST_DIR, 'VISQOL_LOG.txt') 484 with open(log_file, 'w+') as f: 485 f.write('String Error:\n{}\n'.format(stderr)) 486 f.write('String Out:\n{}\n'.format(stdout)) 487 488 # pattern matches first float or int after 'MOS-LQO:' in stdout, 489 # e.g. it would match the line 'MOS-LQO 2.3' in the stdout 490 score_pattern = re.compile(r'.*MOS-LQO:\s*(\d+.?\d*)') 491 score_search = re.search(score_pattern, string_out) 492 493 # re.search returns None if no pattern match found, otherwise the score 494 # would be in the match object's group 1 matches just the float score 495 score = float(score_search.group(1)) if score_search else -1.0 496 return stderr, score 497 498 499 def get_visqol_score(self, ref_file, deg_file, speech_mode=True, 500 verbose=True): 501 """ 502 Runs VISQOL using the subprocess library on the provided reference file 503 and degraded file and returns the VISQOL score. 504 505 @param ref_file: File path to the reference wav file. 506 @param deg_file: File path to the degraded wav file. 507 @param speech_mode: [Optional] Defaults to True, accepts 16k sample 508 rate files and ignores frequencies > 8kHz for scoring. 509 @param verbose: [Optional] Defaults to True, outputs more details. 510 511 @returns: A float score for the tested file. 512 """ 513 visqol_cmd = [VISQOL_PATH] 514 visqol_cmd += ['--reference_file', ref_file] 515 visqol_cmd += ['--degraded_file', deg_file] 516 visqol_cmd += ['--similarity_to_quality_model', VISQOL_SIMILARITY_MODEL] 517 518 if speech_mode: 519 visqol_cmd.append('--use_speech_mode') 520 if verbose: 521 visqol_cmd.append('--verbose') 522 523 visqol_process = subprocess.Popen(visqol_cmd, stdout=subprocess.PIPE, 524 stderr=subprocess.PIPE) 525 stdout, stderr = visqol_process.communicate() 526 527 err, score = self.parse_visqol_output(stdout, stderr) 528 529 if err: 530 raise error.TestError(err) 531 elif score < 0.0: 532 raise error.TestError('Failed to parse score, got {}'.format(score)) 533 534 return score 535 536 537 def get_ref_and_deg_files(self, trimmed_file, test_profile, test_data): 538 """Return path for reference and degraded files to run visqol on. 539 540 @param trimmed_file: Path to the trimmed audio file on DUT. 541 @param test_profile: The test profile used HFP_WBS or HFP_NBS. 542 @param test_data: A dictionary about the audio test data defined in 543 client/cros/bluetooth/bluetooth_audio_test_data.py. 544 545 @returns: A tuple of path to the reference file and degraded file if 546 they exist, otherwise False for the files that aren't available. 547 """ 548 # Path in autotest server in ViSQOL folder to store degraded file from 549 # retrieved from the DUT 550 deg_file = os.path.join(VISQOL_TEST_DIR, os.path.split(trimmed_file)[1]) 551 played_file = test_data['file'] 552 # If profile is WBS, no resampling required 553 if test_profile == HFP_WBS: 554 self.host.get_file(trimmed_file, deg_file) 555 return played_file, deg_file 556 557 # On NBS, degraded and reference files need to be resampled to 16 kHz 558 # Build path for the upsampled (us) reference (ref) file on DUT 559 ref_file = '{}_us_ref{}'.format(*os.path.splitext(played_file)) 560 # If resampled ref file already exists, don't need to do it again 561 if not os.path.isfile(ref_file): 562 if not self.bluetooth_facade.convert_audio_sample_rate( 563 played_file, ref_file, test_data, 564 self.MIN_VISQOL_SAMPLE_RATE): 565 return False, False 566 # Move upsampled reference file to autotest server 567 self.host.get_file(ref_file, ref_file) 568 569 # Build path for resampled degraded file on DUT 570 deg_on_dut = '{}_us{}'.format(*os.path.splitext(trimmed_file)) 571 # Resample degraded file to 16 kHz and move to autotest server 572 if not self.bluetooth_facade.convert_audio_sample_rate( 573 trimmed_file, deg_on_dut, test_data, 574 self.MIN_VISQOL_SAMPLE_RATE): 575 return ref_file, False 576 577 self.host.get_file(deg_on_dut, deg_file) 578 579 return ref_file, deg_file 580 581 582 def format_recorded_file(self, test_data, test_profile, recording_device): 583 """Format recorded files to be compatible with ViSQOL. 584 585 Convert raw files to wav if recorded file is a raw file, trim file to 586 duration, if required, resample the file, then lastly return the paths 587 for the reference file and degraded file on the autotest server. 588 589 @param test_data: A dictionary about the audio test data defined in 590 client/cros/bluetooth/bluetooth_audio_test_data.py. 591 @param test_profile: The test profile used, HFP_WBS or HFP_NBS. 592 @param recording_device: Which device recorded the audio, either 593 'recorded_by_dut' or 'recorded_by_peer'. 594 595 @returns: A tuple of path to the reference file and degraded file if 596 they exist, otherwise False for the files that aren't available. 597 """ 598 # Path to recorded file either on DUT or BT peer 599 recorded_file = test_data[recording_device] 600 untrimmed_file = recorded_file 601 if recorded_file.endswith('.raw'): 602 # build path for file converted from raw to wav, i.e. change the ext 603 untrimmed_file = os.path.splitext(recorded_file)[0] + '.wav' 604 if not self.bluetooth_facade.convert_raw_to_wav( 605 recorded_file, untrimmed_file, test_data): 606 raise error.TestError('Could not convert raw file to wav') 607 608 # Compute the duration of played file without added buffer 609 new_duration = test_data['duration'] - VISQOL_BUFFER_LENGTH 610 # build path for file resulting from trimming to desired duration 611 trimmed_file = '{}_t{}'.format(*os.path.splitext(untrimmed_file)) 612 if not self.bluetooth_facade.trim_wav_file( 613 untrimmed_file, trimmed_file, new_duration, test_data): 614 raise error.TestError('Failed to trim recorded file') 615 616 return self.get_ref_and_deg_files(trimmed_file, test_profile, test_data) 617 618 619 def handle_chunks(self, device, test_profile, test_data, duration): 620 """Handle chunks of recorded streams and verify the primary frequencies. 621 622 @param device: the bluetooth peer device 623 @param test_profile: the a2dp test profile; 624 choices are A2DP and A2DP_LONG 625 @param test_data: the test data of the test profile 626 @param duration: the duration of the audio file to test 627 628 @returns: True if all chunks pass the frequencies check. 629 """ 630 chunk_in_secs = test_data['chunk_in_secs'] 631 if not bool(chunk_in_secs): 632 chunk_in_secs = self.DEFAULT_CHUNK_IN_SECS 633 nchunks = duration // chunk_in_secs 634 logging.info('Number of chunks: %d', nchunks) 635 636 all_chunks_test_result = True 637 for i in range(nchunks): 638 logging.info('Handle chunk %d', i) 639 recorded_file = device.HandleOneChunk(chunk_in_secs, i, 640 test_profile, self.host.ip) 641 if recorded_file is None: 642 raise error.TestError('Failed to handle chunk %d' % i) 643 644 # Check if the audio frames in the recorded file are legitimate. 645 if not self._check_audio_frames_legitimacy( 646 test_data, 'recorded_by_peer', recorded_file=recorded_file): 647 if (i > self.IGNORE_LAST_FEW_CHUNKS and 648 i >= nchunks - self.IGNORE_LAST_FEW_CHUNKS): 649 logging.info('empty chunk %d ignored for last %d chunks', 650 i, self.IGNORE_LAST_FEW_CHUNKS) 651 else: 652 all_chunks_test_result = False 653 break 654 655 # Check if the primary frequencies of the recorded file 656 # meet expectation. 657 if not self._check_primary_frequencies(A2DP, test_data, 658 'recorded_by_peer', 659 recorded_file=recorded_file): 660 if (i > self.IGNORE_LAST_FEW_CHUNKS and 661 i >= nchunks - self.IGNORE_LAST_FEW_CHUNKS): 662 msg = 'partially filled chunk %d ignored for last %d chunks' 663 logging.info(msg, i, self.IGNORE_LAST_FEW_CHUNKS) 664 else: 665 all_chunks_test_result = False 666 break 667 668 return all_chunks_test_result 669 670 671 # --------------------------------------------------------------- 672 # Definitions of all bluetooth audio test cases 673 # --------------------------------------------------------------- 674 675 676 @test_retry_and_log(False) 677 def test_hfp_dut_as_source_visqol_score(self, device, test_profile): 678 """Test Case: hfp test files streaming from peer device to dut 679 680 @param device: the bluetooth peer device 681 @param test_profile: which test profile is used, HFP_WBS or HFP_NBS 682 683 @returns: True if the all the test files score at or above their 684 source_passing_score value as defined in 685 bluetooth_audio_test_data.py 686 """ 687 # list of test wav files 688 hfp_test_data = audio_test_data[test_profile] 689 test_files = hfp_test_data['visqol_test_files'] 690 691 get_visqol_binary() 692 get_audio_test_data() 693 694 # Download test data to DUT 695 self.host.send_file(AUDIO_DATA_TARBALL_PATH, AUDIO_DATA_TARBALL_PATH) 696 if not self.bluetooth_facade.unzip_audio_test_data( 697 AUDIO_DATA_TARBALL_PATH, DATA_DIR): 698 logging.error('Audio data directory not found in DUT') 699 raise error.TestError('Failed to unzip audio test data to DUT') 700 701 # Result of visqol test on all files 702 visqol_results = dict() 703 704 for test_file in test_files: 705 filename = os.path.split(test_file['file'])[1] 706 logging.debug('Testing file: {}'.format(filename)) 707 708 # Set up hfp test to record on peer 709 self.initialize_hfp(device, test_profile, test_file, 710 'recorded_by_peer', 711 self._get_pulseaudio_bluez_source_hfp) 712 logging.debug('Initialized HFP') 713 714 if not self.hfp_record_on_peer(device, test_profile, test_file): 715 return False 716 logging.debug('Recorded {} successfully'.format(filename)) 717 718 ref_file, deg_file = self.format_recorded_file(test_file, 719 test_profile, 720 'recorded_by_peer') 721 if not ref_file or not deg_file: 722 desc = 'Failed to get ref and deg file: ref {}, deg {}'.format( 723 ref_file, deg_file) 724 raise error.TestError(desc) 725 726 score = self.get_visqol_score(ref_file, deg_file, 727 speech_mode=test_file['speech_mode']) 728 729 logging.info('{} scored {}, min passing score: {}'.format( 730 filename, score, test_file['source_passing_score'])) 731 passed = score >= test_file['source_passing_score'] 732 visqol_results[filename] = passed 733 734 if not passed: 735 logging.warning('Failed: {}'.format(filename)) 736 737 return all(visqol_results.values()) 738 739 740 @test_retry_and_log(False) 741 def test_hfp_dut_as_sink_visqol_score(self, device, test_profile): 742 """Test Case: hfp test files streaming from peer device to dut 743 744 @param device: the bluetooth peer device 745 @param test_profile: which test profile is used, HFP_WBS or HFP_NBS 746 747 @returns: True if the all the test files score at or above their 748 sink_passing_score value as defined in 749 bluetooth_audio_test_data.py 750 """ 751 # list of test wav files 752 hfp_test_data = audio_test_data[test_profile] 753 test_files = hfp_test_data['visqol_test_files'] 754 755 get_visqol_binary() 756 get_audio_test_data() 757 self.host.send_file(AUDIO_DATA_TARBALL_PATH, AUDIO_DATA_TARBALL_PATH) 758 if not self.bluetooth_facade.unzip_audio_test_data( 759 AUDIO_DATA_TARBALL_PATH, DATA_DIR): 760 logging.error('Audio data directory not found in DUT') 761 raise error.TestError('Failed to unzip audio test data to DUT') 762 763 # Result of visqol test on all files 764 visqol_results = dict() 765 766 for test_file in test_files: 767 filename = os.path.split(test_file['file'])[1] 768 logging.debug('Testing file: {}'.format(filename)) 769 770 # Set up hfp test to record on dut 771 self.initialize_hfp(device, test_profile, test_file, 772 'recorded_by_dut', 773 self._get_pulseaudio_bluez_sink_hfp) 774 logging.debug('Initialized HFP') 775 # Record audio on dut played from pi, returns true if anything 776 # was successfully recorded, false otherwise 777 if not self.hfp_record_on_dut(device, test_profile, test_file): 778 return False 779 logging.debug('Recorded {} successfully'.format(filename)) 780 781 ref_file, deg_file = self.format_recorded_file(test_file, 782 test_profile, 783 'recorded_by_dut') 784 if not ref_file or not deg_file: 785 desc = 'Failed to get ref and deg file: ref {}, deg {}'.format( 786 ref_file, deg_file) 787 raise error.TestError(desc) 788 789 score = self.get_visqol_score(ref_file, deg_file, 790 speech_mode=test_file['speech_mode']) 791 792 logging.info('{} scored {}, min passing score: {}'.format( 793 filename, score, test_file['sink_passing_score'])) 794 passed = score >= test_file['sink_passing_score'] 795 visqol_results[filename] = passed 796 797 if not passed: 798 logging.warning('Failed: {}'.format(filename)) 799 800 return all(visqol_results.values()) 801 802 @test_retry_and_log(False) 803 def test_device_a2dp_connected(self, device, timeout=15): 804 """ Tests a2dp profile is connected on device. """ 805 self.results = {} 806 check_connection = lambda: self._get_pulseaudio_bluez_source_a2dp( 807 device, A2DP) 808 is_connected = self._wait_for_condition(check_connection, 809 'test_device_a2dp_connected', 810 timeout=timeout) 811 self.results['peer a2dp connected'] = is_connected 812 813 return all(self.results.values()) 814 815 @test_retry_and_log(False) 816 def test_a2dp_sinewaves(self, device, test_profile, duration): 817 """Test Case: a2dp sinewaves 818 819 @param device: the bluetooth peer device 820 @param test_profile: the a2dp test profile; 821 choices are A2DP and A2DP_LONG 822 @param duration: the duration of the audio file to test 823 0 means to use the default value in the test profile 824 825 @returns: True if the recorded primary frequency is within the 826 tolerance of the playback sine wave frequency. 827 828 """ 829 # Make a copy since the test_data may be formatted with distinct 830 # arguments in the follow-up tests. 831 test_data = audio_test_data[test_profile].copy() 832 if bool(duration): 833 test_data['duration'] = duration 834 else: 835 duration = test_data['duration'] 836 837 test_data['file'] %= duration 838 logging.info('%s test for %d seconds.', test_profile, duration) 839 840 # Wait for pulseaudio a2dp bluez source 841 desc = 'waiting for pulseaudio a2dp bluez source' 842 logging.debug(desc) 843 self._poll_for_condition( 844 lambda: self._get_pulseaudio_bluez_source_a2dp(device, 845 test_profile), 846 desc=desc) 847 848 # Select audio output node so that we do not rely on chrome to do it. 849 self.select_audio_output_node() 850 851 # Start recording audio on the peer Bluetooth audio device. 852 logging.debug('Start recording a2dp') 853 if not device.StartRecordingAudioSubprocess(test_profile, test_data): 854 raise error.TestError( 855 'Failed to record on the peer Bluetooth audio device.') 856 857 # Play audio on the DUT in a non-blocked way and check the recorded 858 # audio stream in a real-time manner. 859 logging.debug('Start playing audio') 860 if not self.bluetooth_facade.start_playing_audio_subprocess(test_data): 861 raise error.TestError('DUT failed to play audio.') 862 863 # Handle chunks of recorded streams and verify the primary frequencies. 864 # This is a blocking call until all chunks are completed. 865 all_chunks_test_result = self.handle_chunks(device, test_profile, 866 test_data, duration) 867 868 # Stop recording audio on the peer Bluetooth audio device. 869 logging.debug('Stop recording a2dp') 870 if not device.StopRecordingingAudioSubprocess(): 871 msg = 'Failed to stop recording on the peer Bluetooth audio device' 872 logging.error(msg) 873 874 # Stop playing audio on DUT. 875 logging.debug('Stop playing audio on DUT') 876 if not self.bluetooth_facade.stop_playing_audio_subprocess(): 877 raise error.TestError('DUT failed to stop playing audio.') 878 879 return all_chunks_test_result 880 881 @test_retry_and_log(False) 882 def test_hfp_dut_as_source(self, device, test_profile): 883 """Test Case: hfp sinewave streaming from dut to peer device 884 885 @param device: the bluetooth peer device 886 @param test_profile: which test profile is used, HFP_WBS or HFP_NBS 887 888 @returns: True if the recorded primary frequency is within the 889 tolerance of the playback sine wave frequency. 890 """ 891 hfp_test_data = audio_test_data[test_profile] 892 893 self.initialize_hfp(device, test_profile, hfp_test_data, 894 'recorded_by_peer', 895 self._get_pulseaudio_bluez_source_hfp) 896 897 if not self.hfp_record_on_peer(device, test_profile, hfp_test_data): 898 return False 899 900 # Check if the primary frequencies of recorded file meet expectation. 901 check_freq_result = self._check_primary_frequencies( 902 test_profile, hfp_test_data, 'recorded_by_peer') 903 return check_freq_result 904 905 906 @test_retry_and_log(False) 907 def test_hfp_dut_as_sink(self, device, test_profile): 908 """Test Case: hfp sinewave streaming from peer device to dut 909 910 @param device: the bluetooth peer device 911 @param test_profile: which test profile is used, HFP_WBS or HFP_NBS 912 913 @returns: True if the recorded primary frequency is within the 914 tolerance of the playback sine wave frequency. 915 916 """ 917 hfp_test_data = audio_test_data[test_profile] 918 919 # Set up hfp test to record on dut 920 self.initialize_hfp(device, test_profile, hfp_test_data, 921 'recorded_by_dut', 922 self._get_pulseaudio_bluez_sink_hfp) 923 924 # Record audio on dut play from pi, returns true if anything recorded 925 if not self.hfp_record_on_dut(device, test_profile, hfp_test_data): 926 return False 927 928 # Check if the primary frequencies of recorded file meet expectation. 929 check_freq_result = self._check_primary_frequencies( 930 test_profile, hfp_test_data, 'recorded_by_dut') 931 return check_freq_result 932 933 934 @test_retry_and_log(False) 935 def test_avrcp_commands(self, device): 936 """Test Case: Test AVRCP commands issued by peer can be received at DUT 937 938 The very first AVRCP command (Linux evdev event) the DUT receives 939 contains extra information than just the AVRCP event, e.g. EV_REP 940 report used to specify delay settings. Send the first command before 941 the actual test starts to avoid dealing with them during test. 942 943 The peer device name is required to monitor the event reception on the 944 DUT. However, as the peer device itself already registered with the 945 kernel as an udev input device. The AVRCP profile will register as an 946 separate input device with the name pattern: name + (AVRCP), e.g. 947 RASPI_AUDIO (AVRCP). Using 'AVRCP' as device name to help search for 948 the device. 949 950 @param device: the Bluetooth peer device 951 952 @returns: True if the all AVRCP commands received by DUT, false 953 otherwise 954 955 """ 956 device.SendMediaPlayerCommand('play') 957 958 name = device.name 959 device.name = 'AVRCP' 960 961 result_pause = self.test_avrcp_event(device, 962 device.SendMediaPlayerCommand, 'pause') 963 result_play = self.test_avrcp_event(device, 964 device.SendMediaPlayerCommand, 'play') 965 result_stop = self.test_avrcp_event(device, 966 device.SendMediaPlayerCommand, 'stop') 967 result_next = self.test_avrcp_event(device, 968 device.SendMediaPlayerCommand, 'next') 969 result_previous = self.test_avrcp_event(device, 970 device.SendMediaPlayerCommand, 'previous') 971 972 device.name = name 973 self.results = {'pause': result_pause, 'play': result_play, 974 'stop': result_stop, 'next': result_next, 975 'previous': result_previous} 976 return all(self.results.values()) 977 978 979 @test_retry_and_log(False) 980 def test_avrcp_media_info(self, device): 981 """Test Case: Test AVRCP media info sent by DUT can be received by peer 982 983 The test update all media information twice to prevent previous 984 leftover data affect the current iteration of test. Then compare the 985 expected results against the information received on the peer device. 986 987 This test verifies media information including: playback status, 988 length, title, artist, and album. Position of the media is not 989 currently support as playerctl on the peer side cannot correctly 990 retrieve such information. 991 992 Length and position information are transmitted in the unit of 993 microsecond. However, BlueZ process those time data in the resolution 994 of millisecond. Discard microsecond detail when comparing those media 995 information. 996 997 @param device: the Bluetooth peer device 998 999 @returns: True if the all AVRCP media info received by DUT, false 1000 otherwise 1001 1002 """ 1003 # First round of updating media information to overwrite all leftovers. 1004 init_status = 'stopped' 1005 init_length = 20200414 1006 init_position = 8686868 1007 init_metadata = {'album': 'metadata_album_init', 1008 'artist': 'metadata_artist_init', 1009 'title': 'metadata_title_init'} 1010 self.bluetooth_facade.set_player_playback_status(init_status) 1011 self.bluetooth_facade.set_player_length(init_length) 1012 self.bluetooth_facade.set_player_position(init_position) 1013 self.bluetooth_facade.set_player_metadata(init_metadata) 1014 1015 # Second round of updating for actual testing. 1016 expected_status = 'playing' 1017 expected_length = 68686868 1018 expected_position = 20200414 1019 expected_metadata = {'album': 'metadata_album_expected', 1020 'artist': 'metadata_artist_expected', 1021 'title': 'metadata_title_expected'} 1022 self.bluetooth_facade.set_player_playback_status(expected_status) 1023 self.bluetooth_facade.set_player_length(expected_length) 1024 self.bluetooth_facade.set_player_position(expected_position) 1025 self.bluetooth_facade.set_player_metadata(expected_metadata) 1026 1027 received_media_info = device.GetMediaPlayerMediaInfo() 1028 logging.debug(received_media_info) 1029 1030 try: 1031 actual_length = int(received_media_info.get('length')) 1032 except: 1033 actual_length = 0 1034 1035 result_status = bool(expected_status == 1036 received_media_info.get('status').lower()) 1037 result_album = bool(expected_metadata['album'] == 1038 received_media_info.get('album')) 1039 result_artist = bool(expected_metadata['artist'] == 1040 received_media_info.get('artist')) 1041 result_title = bool(expected_metadata['title'] == 1042 received_media_info.get('title')) 1043 # The AVRCP time information is in the unit of microseconds but with 1044 # milliseconds resolution. Convert both send and received length into 1045 # milliseconds for comparison. 1046 result_length = bool(expected_length // 1000 == actual_length // 1000) 1047 1048 self.results = {'status': result_status, 'album': result_album, 1049 'artist': result_artist, 'title': result_title, 1050 'length': result_length} 1051 return all(self.results.values()) 1052