1# Lint as: python2, python3 2# Copyright 2015 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5"""This module provides the test utilities for audio tests using chameleon.""" 6 7# TODO (cychiang) Move test utilities from chameleon_audio_helpers 8# to this module. 9 10from __future__ import absolute_import 11from __future__ import division 12from __future__ import print_function 13 14import logging 15import multiprocessing 16import os 17import pprint 18import re 19from contextlib import contextmanager 20 21from autotest_lib.client.common_lib import error 22from autotest_lib.client.bin import utils 23from autotest_lib.client.cros import constants 24from autotest_lib.client.cros.audio import audio_analysis 25from autotest_lib.client.cros.audio import audio_spec 26from autotest_lib.client.cros.audio import audio_data 27from autotest_lib.client.cros.audio import audio_helper 28from autotest_lib.client.cros.audio import audio_quality_measurement 29from autotest_lib.client.cros.chameleon import chameleon_audio_ids 30from six.moves import range 31 32CHAMELEON_AUDIO_IDS_TO_CRAS_NODE_TYPES = { 33 chameleon_audio_ids.CrosIds.HDMI: 'HDMI', 34 chameleon_audio_ids.CrosIds.HEADPHONE: 'HEADPHONE', 35 chameleon_audio_ids.CrosIds.EXTERNAL_MIC: 'MIC', 36 chameleon_audio_ids.CrosIds.SPEAKER: 'INTERNAL_SPEAKER', 37 chameleon_audio_ids.CrosIds.INTERNAL_MIC: 'INTERNAL_MIC', 38 chameleon_audio_ids.CrosIds.BLUETOOTH_HEADPHONE: 'BLUETOOTH', 39 chameleon_audio_ids.CrosIds.BLUETOOTH_MIC: 'BLUETOOTH', 40 chameleon_audio_ids.CrosIds.USBIN: 'USB', 41 chameleon_audio_ids.CrosIds.USBOUT: 'USB', 42} 43 44 45def cros_port_id_to_cras_node_type(port_id): 46 """Gets Cras node type from Cros port id. 47 48 @param port_id: A port id defined in chameleon_audio_ids.CrosIds. 49 50 @returns: A Cras node type defined in cras_utils.CRAS_NODE_TYPES. 51 52 """ 53 return CHAMELEON_AUDIO_IDS_TO_CRAS_NODE_TYPES[port_id] 54 55 56def check_output_port(audio_facade, port_id): 57 """Checks selected output node on Cros device is correct for a port. 58 59 @param port_id: A port id defined in chameleon_audio_ids.CrosIds. 60 61 """ 62 output_node_type = cros_port_id_to_cras_node_type(port_id) 63 check_audio_nodes(audio_facade, ([output_node_type], None)) 64 65 66def check_input_port(audio_facade, port_id): 67 """Checks selected input node on Cros device is correct for a port. 68 69 @param port_id: A port id defined in chameleon_audio_ids.CrosIds. 70 71 """ 72 input_node_type = cros_port_id_to_cras_node_type(port_id) 73 check_audio_nodes(audio_facade, (None, [input_node_type])) 74 75 76def check_audio_nodes(audio_facade, audio_nodes): 77 """Checks the node selected by Cros device is correct. 78 79 @param audio_facade: A RemoteAudioFacade to access audio functions on 80 Cros device. 81 82 @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing 83 expected selected output and input nodes. 84 85 @raises: error.TestFail if the nodes selected by Cros device are not expected. 86 87 """ 88 curr_out_nodes, curr_in_nodes = audio_facade.get_selected_node_types() 89 out_audio_nodes, in_audio_nodes = audio_nodes 90 if (in_audio_nodes != None 91 and sorted(curr_in_nodes) != sorted(in_audio_nodes)): 92 raise error.TestFail( 93 'Wrong input node(s) selected: %s ' 94 'expected: %s' % (str(curr_in_nodes), str(in_audio_nodes))) 95 96 # Treat line-out node as headphone node in Chameleon test since some 97 # Cros devices detect audio board as lineout. This actually makes sense 98 # because 3.5mm audio jack is connected to LineIn port on Chameleon. 99 if (out_audio_nodes == ['HEADPHONE'] and curr_out_nodes == ['LINEOUT']): 100 return 101 102 if (out_audio_nodes != None 103 and sorted(curr_out_nodes) != sorted(out_audio_nodes)): 104 raise error.TestFail( 105 'Wrong output node(s) selected %s ' 106 'expected: %s' % (str(curr_out_nodes), str(out_audio_nodes))) 107 108 109def check_plugged_nodes_contain(audio_facade, audio_nodes): 110 """Checks the nodes needed to be plugged on Cros device are plugged. 111 112 @param audio_facade: A RemoteAudioFacade to access audio functions on 113 Cros device. 114 115 @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing 116 expected plugged output and input nodes. 117 118 @raises: error.TestFail if the plugged nodes on Cros device are not plugged. 119 120 """ 121 curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types() 122 out_audio_nodes, in_audio_nodes = audio_nodes 123 if in_audio_nodes != None: 124 for node in in_audio_nodes: 125 if node not in curr_in_nodes: 126 raise error.TestFail('Wrong input node(s) plugged: %s ' 127 'expected %s to be plugged!' % 128 (str(curr_in_nodes), str(in_audio_nodes))) 129 if out_audio_nodes != None: 130 for node in out_audio_nodes: 131 if node not in curr_out_nodes: 132 raise error.TestFail( 133 'Wrong output node(s) plugged: %s ' 134 'expected %s to be plugged!' % (str(curr_out_nodes), 135 str(out_audio_nodes))) 136 137 138def check_plugged_nodes(audio_facade, audio_nodes): 139 """Checks the nodes that are currently plugged on Cros device are correct. 140 141 @param audio_facade: A RemoteAudioFacade to access audio functions on 142 Cros device. 143 144 @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing 145 expected plugged output and input nodes. 146 147 @raises: error.TestFail if the plugged nodes on Cros device are not expected. 148 149 """ 150 curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types() 151 out_audio_nodes, in_audio_nodes = audio_nodes 152 if (in_audio_nodes != None 153 and sorted(curr_in_nodes) != sorted(in_audio_nodes)): 154 raise error.TestFail('Wrong input node(s) plugged: %s ' 155 'expected: %s!' % (str(sorted(curr_in_nodes)), 156 str(sorted(in_audio_nodes)))) 157 if (out_audio_nodes != None 158 and sorted(curr_out_nodes) != sorted(out_audio_nodes)): 159 raise error.TestFail('Wrong output node(s) plugged: %s ' 160 'expected: %s!' % (str(sorted(curr_out_nodes)), 161 str(sorted(out_audio_nodes)))) 162 163 164def bluetooth_nodes_plugged(audio_facade): 165 """Checks bluetooth nodes are plugged. 166 167 @param audio_facade: A RemoteAudioFacade to access audio functions on 168 Cros device. 169 170 @raises: error.TestFail if either input or output bluetooth node is 171 not plugged. 172 173 """ 174 curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types() 175 return 'BLUETOOTH' in curr_out_nodes and 'BLUETOOTH' in curr_in_nodes 176 177 178def get_board_name(host): 179 """Gets the board name. 180 181 @param host: The CrosHost object. 182 183 @returns: The board name. 184 185 """ 186 return host.get_board().split(':')[1] 187 188 189def has_internal_speaker(host): 190 """Checks if the Cros device has speaker. 191 192 @param host: The CrosHost object. 193 194 @returns: True if Cros device has internal speaker. False otherwise. 195 196 """ 197 board_name = get_board_name(host) 198 if not audio_spec.has_internal_speaker(host.get_board_type(), board_name): 199 logging.info('Board %s does not have speaker.', board_name) 200 return False 201 return True 202 203 204def has_internal_microphone(host): 205 """Checks if the Cros device has internal microphone. 206 207 @param host: The CrosHost object. 208 209 @returns: True if Cros device has internal microphone. False otherwise. 210 211 """ 212 board_name = get_board_name(host) 213 if not audio_spec.has_internal_microphone(host.get_board_type()): 214 logging.info('Board %s does not have internal microphone.', board_name) 215 return False 216 return True 217 218 219def has_audio_jack(host): 220 """Checks if the Cros device has a 3.5mm audio jack. 221 222 @param host: The CrosHost object. 223 224 @returns: True if Cros device has it. False otherwise. 225 226 """ 227 board_name = get_board_name(host) 228 if not audio_spec.has_audio_jack(board_name, host.get_board_type()): 229 logging.info('Board %s does not have a audio jack.', board_name) 230 return False 231 return True 232 233 234def has_hotwording(host): 235 """Checks if the Cros device has hotwording. 236 237 @param host: The CrosHost object. 238 239 @returns: True if the board has hotwording. False otherwise. 240 241 """ 242 board_name = get_board_name(host) 243 model_name = host.get_platform() 244 245 return audio_spec.has_hotwording(board_name, model_name) 246 247 248def has_echo_reference(host): 249 """Checks if the Cros device has echo reference. 250 251 @param host: The CrosHost object. 252 253 @returns: True if the board has echo reference. False otherwise. 254 255 """ 256 return audio_spec.has_echo_reference(get_board_name(host)) 257 258def suspend_resume(host, suspend_time_secs=30, resume_network_timeout_secs=60): 259 """Performs the suspend/resume on Cros device. 260 261 @param suspend_time_secs: Time in seconds to let Cros device suspend. 262 @resume_network_timeout_secs: Time in seconds to let Cros device resume and 263 obtain network. 264 """ 265 266 def action_suspend(): 267 """Calls the host method suspend.""" 268 host.suspend(suspend_time=suspend_time_secs) 269 270 boot_id = host.get_boot_id() 271 proc = multiprocessing.Process(target=action_suspend) 272 logging.info("Suspending...") 273 proc.daemon = True 274 proc.start() 275 host.test_wait_for_sleep(suspend_time_secs) 276 logging.info("DUT suspended! Waiting to resume...") 277 host.test_wait_for_resume(boot_id, 278 suspend_time_secs + resume_network_timeout_secs) 279 logging.info("DUT resumed!") 280 281 282def suspend_resume_and_verify(host, 283 factory, 284 suspend_time_secs=30, 285 resume_network_timeout_secs=50, 286 rpc_reconnect_timeout=60): 287 """Performs the suspend/resume on Cros device and verify it. 288 289 @param suspend_time_secs: Time in seconds to let Cros device suspend. 290 @resume_network_timeout_secs: Time in seconds to let Cros device resume and 291 obtain network. 292 @rpc_reconnect_timeout=60: Time in seconds to wait for multimedia server to 293 reconnect. 294 """ 295 296 suspend_resume(host, 297 suspend_time_secs=suspend_time_secs, 298 resume_network_timeout_secs=resume_network_timeout_secs) 299 utils.poll_for_condition(condition=factory.ready, 300 timeout=rpc_reconnect_timeout, 301 desc='multimedia server reconnect') 302 303 304def dump_cros_audio_logs(host, 305 audio_facade, 306 directory, 307 suffix='', 308 fail_if_warnings=False): 309 """Dumps logs for audio debugging from Cros device. 310 311 @param host: The CrosHost object. 312 @param audio_facade: A RemoteAudioFacade to access audio functions on 313 Cros device. 314 @directory: The directory to dump logs. 315 316 """ 317 318 def get_file_path(name): 319 """Gets file path to dump logs. 320 321 @param name: The file name. 322 323 @returns: The file path with an optional suffix. 324 325 """ 326 file_name = '%s.%s' % (name, suffix) if suffix else name 327 file_path = os.path.join(directory, file_name) 328 return file_path 329 330 audio_facade.dump_diagnostics(get_file_path('audio_diagnostics.txt')) 331 332 host.get_file('/var/log/messages', get_file_path('messages')) 333 334 host.get_file(constants.MULTIMEDIA_XMLRPC_SERVER_LOG_FILE, 335 get_file_path('multimedia_xmlrpc_server.log')) 336 337 # Raising error if any warning messages in the audio diagnostics 338 if fail_if_warnings: 339 audio_logs = examine_audio_diagnostics( 340 get_file_path('audio_diagnostics.txt')) 341 if audio_logs != '': 342 raise error.TestFail(audio_logs) 343 344 345def examine_audio_diagnostics(path): 346 """Examines audio diagnostic content. 347 348 @param path: Path to audio diagnostic file. 349 350 @returns: Warning messages or ''. 351 352 """ 353 warning_msgs = [] 354 line_number = 1 355 356 underrun_pattern = re.compile('num_underruns: (\d*)') 357 358 with open(path) as f: 359 for line in f.readlines(): 360 361 # Check for number of underruns. 362 search_result = underrun_pattern.search(line) 363 if search_result: 364 num_underruns = int(search_result.group(1)) 365 if num_underruns != 0: 366 warning_msgs.append('Found %d underrun at line %d: %s' % 367 (num_underruns, line_number, line)) 368 369 # TODO(cychiang) add other check like maximum client reply delay. 370 line_number = line_number + 1 371 372 if warning_msgs: 373 return ('Found issue in audio diganostics result : %s' % 374 '\n'.join(warning_msgs)) 375 376 logging.info('audio_diagnostic result looks fine') 377 return '' 378 379 380@contextmanager 381def monitor_no_nodes_changed(audio_facade, callback=None): 382 """Context manager to monitor nodes changed signal on Cros device. 383 384 Starts the counter in the beginning. Stops the counter in the end to make 385 sure there is no NodesChanged signal during the try block. 386 387 E.g. with monitor_no_nodes_changed(audio_facade): 388 do something on playback/recording 389 390 @param audio_facade: A RemoteAudioFacade to access audio functions on 391 Cros device. 392 @param fail_callback: The callback to call before raising TestFail 393 when there is unexpected NodesChanged signals. 394 395 @raises: error.TestFail if there is NodesChanged signal on 396 Cros device during the context. 397 398 """ 399 try: 400 audio_facade.start_counting_signal('NodesChanged') 401 yield 402 finally: 403 count = audio_facade.stop_counting_signal() 404 if count: 405 message = 'Got %d unexpected NodesChanged signal' % count 406 logging.error(message) 407 if callback: 408 callback() 409 raise error.TestFail(message) 410 411 412# The second dominant frequency should have energy less than -26dB of the 413# first dominant frequency in the spectrum. 414_DEFAULT_SECOND_PEAK_RATIO = 0.05 415 416# Tolerate more noise for bluetooth audio using HSP. 417_HSP_SECOND_PEAK_RATIO = 0.2 418 419# Tolerate more noise for speaker. 420_SPEAKER_SECOND_PEAK_RATIO = 0.1 421 422# Tolerate more noise for internal microphone. 423_INTERNAL_MIC_SECOND_PEAK_RATIO = 0.2 424 425# maximum tolerant noise level 426DEFAULT_TOLERANT_NOISE_LEVEL = 0.01 427 428# If relative error of two durations is less than 0.2, 429# they will be considered equivalent. 430DEFAULT_EQUIVALENT_THRESHOLD = 0.2 431 432# The frequency at lower than _DC_FREQ_THRESHOLD should have coefficient 433# smaller than _DC_COEFF_THRESHOLD. 434_DC_FREQ_THRESHOLD = 0.001 435_DC_COEFF_THRESHOLD = 0.01 436 437 438def get_second_peak_ratio(source_id, recorder_id, is_hsp=False): 439 """Gets the second peak ratio suitable for use case. 440 441 @param source_id: ID defined in chameleon_audio_ids for source widget. 442 @param recorder_id: ID defined in chameleon_audio_ids for recorder widget. 443 @param is_hsp: For bluetooth HSP use case. 444 445 @returns: A float for proper second peak ratio to be used in 446 check_recorded_frequency. 447 """ 448 if is_hsp: 449 return _HSP_SECOND_PEAK_RATIO 450 elif source_id == chameleon_audio_ids.CrosIds.SPEAKER: 451 return _SPEAKER_SECOND_PEAK_RATIO 452 elif recorder_id == chameleon_audio_ids.CrosIds.INTERNAL_MIC: 453 return _INTERNAL_MIC_SECOND_PEAK_RATIO 454 else: 455 return _DEFAULT_SECOND_PEAK_RATIO 456 457 458# The deviation of estimated dominant frequency from golden frequency. 459DEFAULT_FREQUENCY_DIFF_THRESHOLD = 5 460 461 462def check_recorded_frequency( 463 golden_file, 464 recorder, 465 second_peak_ratio=_DEFAULT_SECOND_PEAK_RATIO, 466 frequency_diff_threshold=DEFAULT_FREQUENCY_DIFF_THRESHOLD, 467 ignore_frequencies=None, 468 check_anomaly=False, 469 check_artifacts=False, 470 mute_durations=None, 471 volume_changes=None, 472 tolerant_noise_level=DEFAULT_TOLERANT_NOISE_LEVEL): 473 """Checks if the recorded data contains sine tone of golden frequency. 474 475 @param golden_file: An AudioTestData object that serves as golden data. 476 @param recorder: An AudioWidget used in the test to record data. 477 @param second_peak_ratio: The test fails when the second dominant 478 frequency has coefficient larger than this 479 ratio of the coefficient of first dominant 480 frequency. 481 @param frequency_diff_threshold: The maximum difference between estimated 482 frequency of test signal and golden 483 frequency. This value should be small for 484 signal passed through line. 485 @param ignore_frequencies: A list of frequencies to be ignored. The 486 component in the spectral with frequency too 487 close to the frequency in the list will be 488 ignored. The comparison of frequencies uses 489 frequency_diff_threshold as well. 490 @param check_anomaly: True to check anomaly in the signal. 491 @param check_artifacts: True to check artifacts in the signal. 492 @param mute_durations: Each duration of mute in seconds in the signal. 493 @param volume_changes: A list containing alternative -1 for decreasing 494 volume and +1 for increasing volume. 495 @param tolerant_noise_level: The maximum noise level can be tolerated 496 497 @returns: A list containing tuples of (dominant_frequency, coefficient) for 498 valid channels. Coefficient can be a measure of signal magnitude 499 on that dominant frequency. Invalid channels where golden_channel 500 is None are ignored. 501 502 @raises error.TestFail if the recorded data does not contain sine tone of 503 golden frequency. 504 505 """ 506 if not ignore_frequencies: 507 ignore_frequencies = [] 508 509 # Also ignore harmonics of ignore frequencies. 510 ignore_frequencies_harmonics = [] 511 for ignore_freq in ignore_frequencies: 512 ignore_frequencies_harmonics += [ignore_freq * n for n in range(1, 4)] 513 514 data_format = recorder.data_format 515 recorded_data = audio_data.AudioRawData( 516 binary=recorder.get_binary(), 517 channel=data_format['channel'], 518 sample_format=data_format['sample_format']) 519 520 errors = [] 521 dominant_spectrals = [] 522 523 for test_channel, golden_channel in enumerate(recorder.channel_map): 524 if golden_channel is None: 525 logging.info('Skipped channel %d', test_channel) 526 continue 527 528 signal = recorded_data.channel_data[test_channel] 529 saturate_value = audio_data.get_maximum_value_from_sample_format( 530 data_format['sample_format']) 531 logging.debug('Channel %d max signal: %f', test_channel, max(signal)) 532 normalized_signal = audio_analysis.normalize_signal( 533 signal, saturate_value) 534 logging.debug('saturate_value: %f', saturate_value) 535 logging.debug('max signal after normalized: %f', 536 max(normalized_signal)) 537 spectral = audio_analysis.spectral_analysis(normalized_signal, 538 data_format['rate']) 539 logging.debug('spectral: %s', spectral) 540 541 if not spectral: 542 errors.append('Channel %d: Can not find dominant frequency.' % 543 test_channel) 544 545 golden_frequency = golden_file.frequencies[golden_channel] 546 logging.debug('Checking channel %s spectral %s against frequency %s', 547 test_channel, spectral, golden_frequency) 548 549 dominant_frequency = spectral[0][0] 550 551 if (abs(dominant_frequency - golden_frequency) > 552 frequency_diff_threshold): 553 errors.append( 554 'Channel %d: Dominant frequency %s is away from golden %s' 555 % (test_channel, dominant_frequency, golden_frequency)) 556 557 if check_anomaly: 558 detected_anomaly = audio_analysis.anomaly_detection( 559 signal=normalized_signal, 560 rate=data_format['rate'], 561 freq=golden_frequency) 562 if detected_anomaly: 563 errors.append('Channel %d: Detect anomaly near these time: %s' 564 % (test_channel, detected_anomaly)) 565 else: 566 logging.info( 567 'Channel %d: Quality is good as there is no anomaly', 568 test_channel) 569 570 if check_artifacts or mute_durations or volume_changes: 571 result = audio_quality_measurement.quality_measurement( 572 normalized_signal, 573 data_format['rate'], 574 dominant_frequency=dominant_frequency) 575 logging.debug('Quality measurement result:\n%s', 576 pprint.pformat(result)) 577 if check_artifacts: 578 if len(result['artifacts']['noise_before_playback']) > 0: 579 errors.append( 580 'Channel %d: Detects artifacts before playing near' 581 ' these time and duration: %s' % 582 (test_channel, 583 str(result['artifacts']['noise_before_playback'])) 584 ) 585 586 if len(result['artifacts']['noise_after_playback']) > 0: 587 errors.append( 588 'Channel %d: Detects artifacts after playing near' 589 ' these time and duration: %s' % 590 (test_channel, 591 str(result['artifacts']['noise_after_playback']))) 592 593 if mute_durations: 594 delays = result['artifacts']['delay_during_playback'] 595 delay_durations = [] 596 for x in delays: 597 delay_durations.append(x[1]) 598 mute_matched, delay_matched = longest_common_subsequence( 599 mute_durations, delay_durations, 600 DEFAULT_EQUIVALENT_THRESHOLD) 601 602 # updated delay list 603 new_delays = [ 604 delays[i] for i in delay_matched 605 if not delay_matched[i] 606 ] 607 608 result['artifacts']['delay_during_playback'] = new_delays 609 610 unmatched_mutes = [ 611 mute_durations[i] for i in mute_matched 612 if not mute_matched[i] 613 ] 614 615 if len(unmatched_mutes) > 0: 616 errors.append('Channel %d: Unmatched mute duration: %s' % 617 (test_channel, unmatched_mutes)) 618 619 if check_artifacts: 620 if len(result['artifacts']['delay_during_playback']) > 0: 621 errors.append( 622 'Channel %d: Detects delay during playing near' 623 ' these time and duration: %s' % 624 (test_channel, 625 result['artifacts']['delay_during_playback'])) 626 627 if len(result['artifacts']['burst_during_playback']) > 0: 628 errors.append( 629 'Channel %d: Detects burst/pop near these time: %s' 630 % (test_channel, 631 result['artifacts']['burst_during_playback'])) 632 633 if result['equivalent_noise_level'] > tolerant_noise_level: 634 errors.append( 635 'Channel %d: noise level is higher than tolerant' 636 ' noise level: %f > %f' % 637 (test_channel, result['equivalent_noise_level'], 638 tolerant_noise_level)) 639 640 if volume_changes: 641 matched = True 642 volume_changing = result['volume_changes'] 643 if len(volume_changing) != len(volume_changes): 644 matched = False 645 else: 646 for i in range(len(volume_changing)): 647 if volume_changing[i][1] != volume_changes[i]: 648 matched = False 649 break 650 if not matched: 651 errors.append( 652 'Channel %d: volume changing is not as expected, ' 653 'found changing time and events are: %s while ' 654 'expected changing events are %s' % 655 (test_channel, volume_changing, volume_changes)) 656 657 # Filter out the harmonics resulted from imperfect sin wave. 658 # This list is different for different channels. 659 harmonics = [dominant_frequency * n for n in range(2, 10)] 660 661 def should_be_ignored(frequency): 662 """Checks if frequency is close to any frequency in ignore list. 663 664 The ignore list is harmonics of frequency to be ignored 665 (like power noise), plus harmonics of dominant frequencies, 666 plus DC. 667 668 @param frequency: The frequency to be tested. 669 670 @returns: True if the frequency should be ignored. False otherwise. 671 672 """ 673 for ignore_frequency in ( 674 ignore_frequencies_harmonics + harmonics + [0.0]): 675 if (abs(frequency - ignore_frequency) < 676 frequency_diff_threshold): 677 logging.debug('Ignore frequency: %s', frequency) 678 return True 679 680 # Checks DC is small enough. 681 for freq, coeff in spectral: 682 if freq < _DC_FREQ_THRESHOLD and coeff > _DC_COEFF_THRESHOLD: 683 errors.append('Channel %d: Found large DC coefficient: ' 684 '(%f Hz, %f)' % (test_channel, freq, coeff)) 685 686 # Filter out the frequencies to be ignored. 687 spectral_post_ignore = [ 688 x for x in spectral if not should_be_ignored(x[0]) 689 ] 690 691 if len(spectral_post_ignore) > 1: 692 first_coeff = spectral_post_ignore[0][1] 693 second_coeff = spectral_post_ignore[1][1] 694 if second_coeff > first_coeff * second_peak_ratio: 695 errors.append( 696 'Channel %d: Found large second dominant frequencies: ' 697 '%s' % (test_channel, spectral_post_ignore)) 698 699 if not spectral_post_ignore: 700 errors.append( 701 'Channel %d: No frequency left after removing unwanted ' 702 'frequencies. Spectral: %s; After removing unwanted ' 703 'frequencies: %s' % (test_channel, spectral, 704 spectral_post_ignore)) 705 706 else: 707 dominant_spectrals.append(spectral_post_ignore[0]) 708 709 if errors: 710 raise error.TestFail(', '.join(errors)) 711 712 return dominant_spectrals 713 714 715def longest_common_subsequence(list1, list2, equivalent_threshold): 716 """Finds longest common subsequence of list1 and list2 717 718 Such as list1: [0.3, 0.4], 719 list2: [0.001, 0.299, 0.002, 0.401, 0.001] 720 equivalent_threshold: 0.001 721 it will return matched1: [True, True], 722 matched2: [False, True, False, True, False] 723 724 @param list1: a list of integer or float value 725 @param list2: a list of integer or float value 726 @param equivalent_threshold: two values are considered equivalent if their 727 relative error is less than 728 equivalent_threshold. 729 730 @returns: a tuple of list (matched_1, matched_2) indicating each item 731 of list1 and list2 are matched or not. 732 733 """ 734 length1, length2 = len(list1), len(list2) 735 matching = [[0] * (length2 + 1)] * (length1 + 1) 736 # matching[i][j] is the maximum number of matched pairs for first i items 737 # in list1 and first j items in list2. 738 for i in range(length1): 739 for j in range(length2): 740 # Maximum matched pairs may be obtained without 741 # i-th item in list1 or without j-th item in list2 742 matching[i + 1][j + 1] = max(matching[i + 1][j], 743 matching[i][j + 1]) 744 diff = abs(list1[i] - list2[j]) 745 relative_error = diff / list1[i] 746 # If i-th item in list1 can be matched to j-th item in list2 747 if relative_error < equivalent_threshold: 748 matching[i + 1][j + 1] = matching[i][j] + 1 749 750 # Backtracking which item in list1 and list2 are matched 751 matched1 = [False] * length1 752 matched2 = [False] * length2 753 i, j = length1, length2 754 while i > 0 and j > 0: 755 # Maximum number is obtained by matching i-th item in list1 756 # and j-th one in list2. 757 if matching[i][j] == matching[i - 1][j - 1] + 1: 758 matched1[i - 1] = True 759 matched2[j - 1] = True 760 i, j = i - 1, j - 1 761 elif matching[i][j] == matching[i - 1][j]: 762 i -= 1 763 else: 764 j -= 1 765 return (matched1, matched2) 766 767 768def switch_to_hsp(audio_facade): 769 """Switches to HSP profile. 770 771 Selects bluetooth microphone and runs a recording process on Cros device. 772 This triggers bluetooth profile be switched from A2DP to HSP. 773 Note the user can call stop_recording on audio facade to stop the recording 774 process, or let multimedia_xmlrpc_server terminates it in its cleanup. 775 776 """ 777 audio_facade.set_chrome_active_node_type(None, 'BLUETOOTH') 778 check_audio_nodes(audio_facade, (None, ['BLUETOOTH'])) 779 audio_facade.start_recording( 780 dict(file_type='raw', 781 sample_format='S16_LE', 782 channel=2, 783 rate=48000)) 784 785 786def compare_recorded_correlation(golden_file, recorder, parameters=None): 787 """Checks recorded audio in an AudioInputWidget against a golden file. 788 789 Compares recorded data with golden data by cross correlation method. 790 Refer to audio_helper.compare_data for details of comparison. 791 792 @param golden_file: An AudioTestData object that serves as golden data. 793 @param recorder: An AudioInputWidget that has recorded some audio data. 794 @param parameters: A dict containing parameters for method. 795 796 """ 797 logging.info('Comparing recorded data with golden file %s ...', 798 golden_file.path) 799 audio_helper.compare_data_correlation( 800 golden_file.get_binary(), golden_file.data_format, 801 recorder.get_binary(), recorder.data_format, recorder.channel_map, 802 parameters) 803 804 805def check_and_set_chrome_active_node_types(audio_facade, 806 output_type=None, 807 input_type=None): 808 """Check the target types are available, and set them to be active nodes. 809 810 @param audio_facade: An AudioFacadeNative or AudioFacadeAdapter object. 811 @output_type: An output node type defined in cras_utils.CRAS_NODE_TYPES. 812 None to skip. 813 @input_type: An input node type defined in cras_utils.CRAS_NODE_TYPES. 814 None to skip. 815 816 @raises: error.TestError if the expected node type is missing. We use 817 error.TestError here because usually this step is not the main 818 purpose of the test, but a setup step. 819 820 """ 821 output_types, input_types = audio_facade.get_plugged_node_types() 822 if output_type and output_type not in output_types: 823 raise error.TestError('Target output type %s not present in %r' % 824 (output_type, output_types)) 825 if input_type and input_type not in input_types: 826 raise error.TestError('Target input type %s not present in %r' % 827 (input_type, input_types)) 828 audio_facade.set_chrome_active_node_type(output_type, input_type) 829 830 831def check_hp_or_lineout_plugged(audio_facade): 832 """Checks whether line-out or headphone is plugged. 833 834 @param audio_facade: A RemoteAudioFacade to access audio functions on 835 Cros device. 836 837 @returns: 'LINEOUT' if line-out node is plugged. 838 'HEADPHONE' if headphone node is plugged. 839 840 @raises: error.TestFail if the plugged nodes does not contain one of 841 'LINEOUT' and 'HEADPHONE'. 842 843 """ 844 # Checks whether line-out or headphone is detected. 845 output_nodes, _ = audio_facade.get_plugged_node_types() 846 if 'LINEOUT' in output_nodes: 847 return 'LINEOUT' 848 if 'HEADPHONE' in output_nodes: 849 return 'HEADPHONE' 850 raise error.TestFail( 851 'No line-out or headphone in plugged nodes:%r.' 852 'Please check the audio cable or jack plugger.' % output_nodes) 853 854 855def get_internal_mic_node(host): 856 """Return the expected internal microphone node. 857 858 @param host: The CrosHost object. 859 860 @returns: The name of the expected internal microphone nodes. 861 """ 862 board_type = host.get_board_type() 863 board = get_board_name(host) 864 model = host.get_platform() 865 sku = host.host_info_store.get().device_sku 866 867 return audio_spec.get_internal_mic_node(board_type, board, model, sku) 868 869 870def get_plugged_internal_mics(host): 871 """Return a list of all the plugged internal microphone nodes. 872 873 @param host: The CrosHost object. 874 875 @returns: A list of all the plugged internal microphone nodes. 876 """ 877 board_type = host.get_board_type() 878 board = get_board_name(host) 879 model = host.get_platform() 880 sku = host.host_info_store.get().device_sku 881 882 return audio_spec.get_plugged_internal_mics(board_type, board, model, sku) 883 884def get_headphone_node(host): 885 """Return the expected headphone node. 886 887 @param host: The CrosHost object. 888 889 @returns: The name of the expected headphone nodes. 890 """ 891 return audio_spec.get_headphone_node(get_board_name(host)) 892