1# Copyright 2015 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4"""This module provides the test utilities for audio tests using chameleon.""" 5 6# TODO (cychiang) Move test utilities from chameleon_audio_helpers 7# to this module. 8 9import logging 10import multiprocessing 11import os 12import pprint 13import re 14from contextlib import contextmanager 15 16from autotest_lib.client.common_lib import error 17from autotest_lib.client.cros import constants 18from autotest_lib.client.cros.audio import audio_analysis 19from autotest_lib.client.cros.audio import audio_spec 20from autotest_lib.client.cros.audio import audio_data 21from autotest_lib.client.cros.audio import audio_helper 22from autotest_lib.client.cros.audio import audio_quality_measurement 23from autotest_lib.client.cros.chameleon import chameleon_audio_ids 24 25CHAMELEON_AUDIO_IDS_TO_CRAS_NODE_TYPES = { 26 chameleon_audio_ids.CrosIds.HDMI: 'HDMI', 27 chameleon_audio_ids.CrosIds.HEADPHONE: 'HEADPHONE', 28 chameleon_audio_ids.CrosIds.EXTERNAL_MIC: 'MIC', 29 chameleon_audio_ids.CrosIds.SPEAKER: 'INTERNAL_SPEAKER', 30 chameleon_audio_ids.CrosIds.INTERNAL_MIC: 'INTERNAL_MIC', 31 chameleon_audio_ids.CrosIds.BLUETOOTH_HEADPHONE: 'BLUETOOTH', 32 chameleon_audio_ids.CrosIds.BLUETOOTH_MIC: 'BLUETOOTH', 33 chameleon_audio_ids.CrosIds.USBIN: 'USB', 34 chameleon_audio_ids.CrosIds.USBOUT: 'USB', 35} 36 37 38def cros_port_id_to_cras_node_type(port_id): 39 """Gets Cras node type from Cros port id. 40 41 @param port_id: A port id defined in chameleon_audio_ids.CrosIds. 42 43 @returns: A Cras node type defined in cras_utils.CRAS_NODE_TYPES. 44 45 """ 46 return CHAMELEON_AUDIO_IDS_TO_CRAS_NODE_TYPES[port_id] 47 48 49def check_output_port(audio_facade, port_id): 50 """Checks selected output node on Cros device is correct for a port. 51 52 @param port_id: A port id defined in chameleon_audio_ids.CrosIds. 53 54 """ 55 output_node_type = cros_port_id_to_cras_node_type(port_id) 56 check_audio_nodes(audio_facade, ([output_node_type], None)) 57 58 59def check_input_port(audio_facade, port_id): 60 """Checks selected input node on Cros device is correct for a port. 61 62 @param port_id: A port id defined in chameleon_audio_ids.CrosIds. 63 64 """ 65 input_node_type = cros_port_id_to_cras_node_type(port_id) 66 check_audio_nodes(audio_facade, (None, [input_node_type])) 67 68 69def check_audio_nodes(audio_facade, audio_nodes): 70 """Checks the node selected by Cros device is correct. 71 72 @param audio_facade: A RemoteAudioFacade to access audio functions on 73 Cros device. 74 75 @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing 76 expected selected output and input nodes. 77 78 @raises: error.TestFail if the nodes selected by Cros device are not expected. 79 80 """ 81 curr_out_nodes, curr_in_nodes = audio_facade.get_selected_node_types() 82 out_audio_nodes, in_audio_nodes = audio_nodes 83 if (in_audio_nodes != None 84 and sorted(curr_in_nodes) != sorted(in_audio_nodes)): 85 raise error.TestFail( 86 'Wrong input node(s) selected: %s ' 87 'expected: %s' % (str(curr_in_nodes), str(in_audio_nodes))) 88 89 # Treat line-out node as headphone node in Chameleon test since some 90 # Cros devices detect audio board as lineout. This actually makes sense 91 # because 3.5mm audio jack is connected to LineIn port on Chameleon. 92 if (out_audio_nodes == ['HEADPHONE'] and curr_out_nodes == ['LINEOUT']): 93 return 94 95 if (out_audio_nodes != None 96 and sorted(curr_out_nodes) != sorted(out_audio_nodes)): 97 raise error.TestFail( 98 'Wrong output node(s) selected %s ' 99 'expected: %s' % (str(curr_out_nodes), str(out_audio_nodes))) 100 101 102def check_plugged_nodes_contain(audio_facade, audio_nodes): 103 """Checks the nodes needed to be plugged on Cros device are plugged. 104 105 @param audio_facade: A RemoteAudioFacade to access audio functions on 106 Cros device. 107 108 @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing 109 expected plugged output and input nodes. 110 111 @raises: error.TestFail if the plugged nodes on Cros device are not plugged. 112 113 """ 114 curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types() 115 out_audio_nodes, in_audio_nodes = audio_nodes 116 if in_audio_nodes != None: 117 for node in in_audio_nodes: 118 if node not in curr_in_nodes: 119 raise error.TestFail('Wrong input node(s) plugged: %s ' 120 'expected %s to be plugged!' % 121 (str(curr_in_nodes), str(in_audio_nodes))) 122 if out_audio_nodes != None: 123 for node in out_audio_nodes: 124 if node not in curr_out_nodes: 125 raise error.TestFail( 126 'Wrong output node(s) plugged: %s ' 127 'expected %s to be plugged!' % (str(curr_out_nodes), 128 str(out_audio_nodes))) 129 130 131def check_plugged_nodes(audio_facade, audio_nodes): 132 """Checks the nodes that are currently plugged on Cros device are correct. 133 134 @param audio_facade: A RemoteAudioFacade to access audio functions on 135 Cros device. 136 137 @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing 138 expected plugged output and input nodes. 139 140 @raises: error.TestFail if the plugged nodes on Cros device are not expected. 141 142 """ 143 curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types() 144 out_audio_nodes, in_audio_nodes = audio_nodes 145 if (in_audio_nodes != None 146 and sorted(curr_in_nodes) != sorted(in_audio_nodes)): 147 raise error.TestFail('Wrong input node(s) plugged: %s ' 148 'expected: %s!' % (str(sorted(curr_in_nodes)), 149 str(sorted(in_audio_nodes)))) 150 if (out_audio_nodes != None 151 and sorted(curr_out_nodes) != sorted(out_audio_nodes)): 152 raise error.TestFail('Wrong output node(s) plugged: %s ' 153 'expected: %s!' % (str(sorted(curr_out_nodes)), 154 str(sorted(out_audio_nodes)))) 155 156 157def bluetooth_nodes_plugged(audio_facade): 158 """Checks bluetooth nodes are plugged. 159 160 @param audio_facade: A RemoteAudioFacade to access audio functions on 161 Cros device. 162 163 @raises: error.TestFail if either input or output bluetooth node is 164 not plugged. 165 166 """ 167 curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types() 168 return 'BLUETOOTH' in curr_out_nodes and 'BLUETOOTH' in curr_in_nodes 169 170 171def get_board_name(host): 172 """Gets the board name. 173 174 @param host: The CrosHost object. 175 176 @returns: The board name. 177 178 """ 179 return host.get_board().split(':')[1] 180 181 182def has_internal_speaker(host): 183 """Checks if the Cros device has speaker. 184 185 @param host: The CrosHost object. 186 187 @returns: True if Cros device has internal speaker. False otherwise. 188 189 """ 190 board_name = get_board_name(host) 191 if not audio_spec.has_internal_speaker(host.get_board_type(), board_name): 192 logging.info('Board %s does not have speaker.', board_name) 193 return False 194 return True 195 196 197def has_internal_microphone(host): 198 """Checks if the Cros device has internal microphone. 199 200 @param host: The CrosHost object. 201 202 @returns: True if Cros device has internal microphone. False otherwise. 203 204 """ 205 board_name = get_board_name(host) 206 if not audio_spec.has_internal_microphone(host.get_board_type()): 207 logging.info('Board %s does not have internal microphone.', board_name) 208 return False 209 return True 210 211 212def has_audio_jack(host): 213 """Checks if the Cros device has a 3.5mm audio jack. 214 215 @param host: The CrosHost object. 216 217 @returns: True if Cros device has it. False otherwise. 218 219 """ 220 board_name = get_board_name(host) 221 if not audio_spec.has_audio_jack(board_name, host.get_board_type()): 222 logging.info('Board %s does not have a audio jack.', board_name) 223 return False 224 return True 225 226 227def has_hotwording(host): 228 """Checks if the Cros device has hotwording. 229 230 @param host: The CrosHost object. 231 232 @returns: True if the board has hotwording. False otherwise. 233 234 """ 235 board_name = get_board_name(host) 236 model_name = host.get_platform() 237 238 return audio_spec.has_hotwording(board_name, model_name) 239 240 241def has_echo_reference(host): 242 """Checks if the Cros device has echo reference. 243 244 @param host: The CrosHost object. 245 246 @returns: True if the board has echo reference. False otherwise. 247 248 """ 249 return audio_spec.has_echo_reference(get_board_name(host)) 250 251def suspend_resume(host, suspend_time_secs, resume_network_timeout_secs=50): 252 """Performs the suspend/resume on Cros device. 253 254 @param suspend_time_secs: Time in seconds to let Cros device suspend. 255 @resume_network_timeout_secs: Time in seconds to let Cros device resume and 256 obtain network. 257 """ 258 259 def action_suspend(): 260 """Calls the host method suspend.""" 261 host.suspend(suspend_time=suspend_time_secs) 262 263 boot_id = host.get_boot_id() 264 proc = multiprocessing.Process(target=action_suspend) 265 logging.info("Suspending...") 266 proc.daemon = True 267 proc.start() 268 host.test_wait_for_sleep(suspend_time_secs) 269 logging.info("DUT suspended! Waiting to resume...") 270 host.test_wait_for_resume(boot_id, 271 suspend_time_secs + resume_network_timeout_secs) 272 logging.info("DUT resumed!") 273 274 275def dump_cros_audio_logs(host, 276 audio_facade, 277 directory, 278 suffix='', 279 fail_if_warnings=False): 280 """Dumps logs for audio debugging from Cros device. 281 282 @param host: The CrosHost object. 283 @param audio_facade: A RemoteAudioFacade to access audio functions on 284 Cros device. 285 @directory: The directory to dump logs. 286 287 """ 288 289 def get_file_path(name): 290 """Gets file path to dump logs. 291 292 @param name: The file name. 293 294 @returns: The file path with an optional suffix. 295 296 """ 297 file_name = '%s.%s' % (name, suffix) if suffix else name 298 file_path = os.path.join(directory, file_name) 299 return file_path 300 301 audio_facade.dump_diagnostics(get_file_path('audio_diagnostics.txt')) 302 303 host.get_file('/var/log/messages', get_file_path('messages')) 304 305 host.get_file(constants.MULTIMEDIA_XMLRPC_SERVER_LOG_FILE, 306 get_file_path('multimedia_xmlrpc_server.log')) 307 308 # Raising error if any warning messages in the audio diagnostics 309 if fail_if_warnings: 310 audio_logs = examine_audio_diagnostics( 311 get_file_path('audio_diagnostics.txt')) 312 if audio_logs != '': 313 raise error.TestFail(audio_logs) 314 315 316def examine_audio_diagnostics(path): 317 """Examines audio diagnostic content. 318 319 @param path: Path to audio diagnostic file. 320 321 @returns: Warning messages or ''. 322 323 """ 324 warning_msgs = [] 325 line_number = 1 326 327 underrun_pattern = re.compile('num_underruns: (\d*)') 328 329 with open(path) as f: 330 for line in f.readlines(): 331 332 # Check for number of underruns. 333 search_result = underrun_pattern.search(line) 334 if search_result: 335 num_underruns = int(search_result.group(1)) 336 if num_underruns != 0: 337 warning_msgs.append('Found %d underrun at line %d: %s' % 338 (num_underruns, line_number, line)) 339 340 # TODO(cychiang) add other check like maximum client reply delay. 341 line_number = line_number + 1 342 343 if warning_msgs: 344 return ('Found issue in audio diganostics result : %s' % 345 '\n'.join(warning_msgs)) 346 347 logging.info('audio_diagnostic result looks fine') 348 return '' 349 350 351@contextmanager 352def monitor_no_nodes_changed(audio_facade, callback=None): 353 """Context manager to monitor nodes changed signal on Cros device. 354 355 Starts the counter in the beginning. Stops the counter in the end to make 356 sure there is no NodesChanged signal during the try block. 357 358 E.g. with monitor_no_nodes_changed(audio_facade): 359 do something on playback/recording 360 361 @param audio_facade: A RemoteAudioFacade to access audio functions on 362 Cros device. 363 @param fail_callback: The callback to call before raising TestFail 364 when there is unexpected NodesChanged signals. 365 366 @raises: error.TestFail if there is NodesChanged signal on 367 Cros device during the context. 368 369 """ 370 try: 371 audio_facade.start_counting_signal('NodesChanged') 372 yield 373 finally: 374 count = audio_facade.stop_counting_signal() 375 if count: 376 message = 'Got %d unexpected NodesChanged signal' % count 377 logging.error(message) 378 if callback: 379 callback() 380 raise error.TestFail(message) 381 382 383# The second dominant frequency should have energy less than -26dB of the 384# first dominant frequency in the spectrum. 385_DEFAULT_SECOND_PEAK_RATIO = 0.05 386 387# Tolerate more noise for bluetooth audio using HSP. 388_HSP_SECOND_PEAK_RATIO = 0.2 389 390# Tolerate more noise for speaker. 391_SPEAKER_SECOND_PEAK_RATIO = 0.1 392 393# Tolerate more noise for internal microphone. 394_INTERNAL_MIC_SECOND_PEAK_RATIO = 0.2 395 396# maximum tolerant noise level 397DEFAULT_TOLERANT_NOISE_LEVEL = 0.01 398 399# If relative error of two durations is less than 0.2, 400# they will be considered equivalent. 401DEFAULT_EQUIVALENT_THRESHOLD = 0.2 402 403# The frequency at lower than _DC_FREQ_THRESHOLD should have coefficient 404# smaller than _DC_COEFF_THRESHOLD. 405_DC_FREQ_THRESHOLD = 0.001 406_DC_COEFF_THRESHOLD = 0.01 407 408 409def get_second_peak_ratio(source_id, recorder_id, is_hsp=False): 410 """Gets the second peak ratio suitable for use case. 411 412 @param source_id: ID defined in chameleon_audio_ids for source widget. 413 @param recorder_id: ID defined in chameleon_audio_ids for recorder widget. 414 @param is_hsp: For bluetooth HSP use case. 415 416 @returns: A float for proper second peak ratio to be used in 417 check_recorded_frequency. 418 """ 419 if is_hsp: 420 return _HSP_SECOND_PEAK_RATIO 421 elif source_id == chameleon_audio_ids.CrosIds.SPEAKER: 422 return _SPEAKER_SECOND_PEAK_RATIO 423 elif recorder_id == chameleon_audio_ids.CrosIds.INTERNAL_MIC: 424 return _INTERNAL_MIC_SECOND_PEAK_RATIO 425 else: 426 return _DEFAULT_SECOND_PEAK_RATIO 427 428 429# The deviation of estimated dominant frequency from golden frequency. 430DEFAULT_FREQUENCY_DIFF_THRESHOLD = 5 431 432 433def check_recorded_frequency( 434 golden_file, 435 recorder, 436 second_peak_ratio=_DEFAULT_SECOND_PEAK_RATIO, 437 frequency_diff_threshold=DEFAULT_FREQUENCY_DIFF_THRESHOLD, 438 ignore_frequencies=None, 439 check_anomaly=False, 440 check_artifacts=False, 441 mute_durations=None, 442 volume_changes=None, 443 tolerant_noise_level=DEFAULT_TOLERANT_NOISE_LEVEL): 444 """Checks if the recorded data contains sine tone of golden frequency. 445 446 @param golden_file: An AudioTestData object that serves as golden data. 447 @param recorder: An AudioWidget used in the test to record data. 448 @param second_peak_ratio: The test fails when the second dominant 449 frequency has coefficient larger than this 450 ratio of the coefficient of first dominant 451 frequency. 452 @param frequency_diff_threshold: The maximum difference between estimated 453 frequency of test signal and golden 454 frequency. This value should be small for 455 signal passed through line. 456 @param ignore_frequencies: A list of frequencies to be ignored. The 457 component in the spectral with frequency too 458 close to the frequency in the list will be 459 ignored. The comparison of frequencies uses 460 frequency_diff_threshold as well. 461 @param check_anomaly: True to check anomaly in the signal. 462 @param check_artifacts: True to check artifacts in the signal. 463 @param mute_durations: Each duration of mute in seconds in the signal. 464 @param volume_changes: A list containing alternative -1 for decreasing 465 volume and +1 for increasing volume. 466 @param tolerant_noise_level: The maximum noise level can be tolerated 467 468 @returns: A list containing tuples of (dominant_frequency, coefficient) for 469 valid channels. Coefficient can be a measure of signal magnitude 470 on that dominant frequency. Invalid channels where golden_channel 471 is None are ignored. 472 473 @raises error.TestFail if the recorded data does not contain sine tone of 474 golden frequency. 475 476 """ 477 if not ignore_frequencies: 478 ignore_frequencies = [] 479 480 # Also ignore harmonics of ignore frequencies. 481 ignore_frequencies_harmonics = [] 482 for ignore_freq in ignore_frequencies: 483 ignore_frequencies_harmonics += [ignore_freq * n for n in xrange(1, 4)] 484 485 data_format = recorder.data_format 486 recorded_data = audio_data.AudioRawData( 487 binary=recorder.get_binary(), 488 channel=data_format['channel'], 489 sample_format=data_format['sample_format']) 490 491 errors = [] 492 dominant_spectrals = [] 493 494 for test_channel, golden_channel in enumerate(recorder.channel_map): 495 if golden_channel is None: 496 logging.info('Skipped channel %d', test_channel) 497 continue 498 499 signal = recorded_data.channel_data[test_channel] 500 saturate_value = audio_data.get_maximum_value_from_sample_format( 501 data_format['sample_format']) 502 logging.debug('Channel %d max signal: %f', test_channel, max(signal)) 503 normalized_signal = audio_analysis.normalize_signal( 504 signal, saturate_value) 505 logging.debug('saturate_value: %f', saturate_value) 506 logging.debug('max signal after normalized: %f', 507 max(normalized_signal)) 508 spectral = audio_analysis.spectral_analysis(normalized_signal, 509 data_format['rate']) 510 logging.debug('spectral: %s', spectral) 511 512 if not spectral: 513 errors.append('Channel %d: Can not find dominant frequency.' % 514 test_channel) 515 516 golden_frequency = golden_file.frequencies[golden_channel] 517 logging.debug('Checking channel %s spectral %s against frequency %s', 518 test_channel, spectral, golden_frequency) 519 520 dominant_frequency = spectral[0][0] 521 522 if (abs(dominant_frequency - golden_frequency) > 523 frequency_diff_threshold): 524 errors.append( 525 'Channel %d: Dominant frequency %s is away from golden %s' 526 % (test_channel, dominant_frequency, golden_frequency)) 527 528 if check_anomaly: 529 detected_anomaly = audio_analysis.anomaly_detection( 530 signal=normalized_signal, 531 rate=data_format['rate'], 532 freq=golden_frequency) 533 if detected_anomaly: 534 errors.append('Channel %d: Detect anomaly near these time: %s' 535 % (test_channel, detected_anomaly)) 536 else: 537 logging.info( 538 'Channel %d: Quality is good as there is no anomaly', 539 test_channel) 540 541 if check_artifacts or mute_durations or volume_changes: 542 result = audio_quality_measurement.quality_measurement( 543 normalized_signal, 544 data_format['rate'], 545 dominant_frequency=dominant_frequency) 546 logging.debug('Quality measurement result:\n%s', 547 pprint.pformat(result)) 548 if check_artifacts: 549 if len(result['artifacts']['noise_before_playback']) > 0: 550 errors.append( 551 'Channel %d: Detects artifacts before playing near' 552 ' these time and duration: %s' % 553 (test_channel, 554 str(result['artifacts']['noise_before_playback'])) 555 ) 556 557 if len(result['artifacts']['noise_after_playback']) > 0: 558 errors.append( 559 'Channel %d: Detects artifacts after playing near' 560 ' these time and duration: %s' % 561 (test_channel, 562 str(result['artifacts']['noise_after_playback']))) 563 564 if mute_durations: 565 delays = result['artifacts']['delay_during_playback'] 566 delay_durations = [] 567 for x in delays: 568 delay_durations.append(x[1]) 569 mute_matched, delay_matched = longest_common_subsequence( 570 mute_durations, delay_durations, 571 DEFAULT_EQUIVALENT_THRESHOLD) 572 573 # updated delay list 574 new_delays = [ 575 delays[i] for i in delay_matched 576 if not delay_matched[i] 577 ] 578 579 result['artifacts']['delay_during_playback'] = new_delays 580 581 unmatched_mutes = [ 582 mute_durations[i] for i in mute_matched 583 if not mute_matched[i] 584 ] 585 586 if len(unmatched_mutes) > 0: 587 errors.append('Channel %d: Unmatched mute duration: %s' % 588 (test_channel, unmatched_mutes)) 589 590 if check_artifacts: 591 if len(result['artifacts']['delay_during_playback']) > 0: 592 errors.append( 593 'Channel %d: Detects delay during playing near' 594 ' these time and duration: %s' % 595 (test_channel, 596 result['artifacts']['delay_during_playback'])) 597 598 if len(result['artifacts']['burst_during_playback']) > 0: 599 errors.append( 600 'Channel %d: Detects burst/pop near these time: %s' 601 % (test_channel, 602 result['artifacts']['burst_during_playback'])) 603 604 if result['equivalent_noise_level'] > tolerant_noise_level: 605 errors.append( 606 'Channel %d: noise level is higher than tolerant' 607 ' noise level: %f > %f' % 608 (test_channel, result['equivalent_noise_level'], 609 tolerant_noise_level)) 610 611 if volume_changes: 612 matched = True 613 volume_changing = result['volume_changes'] 614 if len(volume_changing) != len(volume_changes): 615 matched = False 616 else: 617 for i in xrange(len(volume_changing)): 618 if volume_changing[i][1] != volume_changes[i]: 619 matched = False 620 break 621 if not matched: 622 errors.append( 623 'Channel %d: volume changing is not as expected, ' 624 'found changing time and events are: %s while ' 625 'expected changing events are %s' % 626 (test_channel, volume_changing, volume_changes)) 627 628 # Filter out the harmonics resulted from imperfect sin wave. 629 # This list is different for different channels. 630 harmonics = [dominant_frequency * n for n in xrange(2, 10)] 631 632 def should_be_ignored(frequency): 633 """Checks if frequency is close to any frequency in ignore list. 634 635 The ignore list is harmonics of frequency to be ignored 636 (like power noise), plus harmonics of dominant frequencies, 637 plus DC. 638 639 @param frequency: The frequency to be tested. 640 641 @returns: True if the frequency should be ignored. False otherwise. 642 643 """ 644 for ignore_frequency in ( 645 ignore_frequencies_harmonics + harmonics + [0.0]): 646 if (abs(frequency - ignore_frequency) < 647 frequency_diff_threshold): 648 logging.debug('Ignore frequency: %s', frequency) 649 return True 650 651 # Checks DC is small enough. 652 for freq, coeff in spectral: 653 if freq < _DC_FREQ_THRESHOLD and coeff > _DC_COEFF_THRESHOLD: 654 errors.append('Channel %d: Found large DC coefficient: ' 655 '(%f Hz, %f)' % (test_channel, freq, coeff)) 656 657 # Filter out the frequencies to be ignored. 658 spectral_post_ignore = [ 659 x for x in spectral if not should_be_ignored(x[0]) 660 ] 661 662 if len(spectral_post_ignore) > 1: 663 first_coeff = spectral_post_ignore[0][1] 664 second_coeff = spectral_post_ignore[1][1] 665 if second_coeff > first_coeff * second_peak_ratio: 666 errors.append( 667 'Channel %d: Found large second dominant frequencies: ' 668 '%s' % (test_channel, spectral_post_ignore)) 669 670 if not spectral_post_ignore: 671 errors.append( 672 'Channel %d: No frequency left after removing unwanted ' 673 'frequencies. Spectral: %s; After removing unwanted ' 674 'frequencies: %s' % (test_channel, spectral, 675 spectral_post_ignore)) 676 677 else: 678 dominant_spectrals.append(spectral_post_ignore[0]) 679 680 if errors: 681 raise error.TestFail(', '.join(errors)) 682 683 return dominant_spectrals 684 685 686def longest_common_subsequence(list1, list2, equivalent_threshold): 687 """Finds longest common subsequence of list1 and list2 688 689 Such as list1: [0.3, 0.4], 690 list2: [0.001, 0.299, 0.002, 0.401, 0.001] 691 equivalent_threshold: 0.001 692 it will return matched1: [True, True], 693 matched2: [False, True, False, True, False] 694 695 @param list1: a list of integer or float value 696 @param list2: a list of integer or float value 697 @param equivalent_threshold: two values are considered equivalent if their 698 relative error is less than 699 equivalent_threshold. 700 701 @returns: a tuple of list (matched_1, matched_2) indicating each item 702 of list1 and list2 are matched or not. 703 704 """ 705 length1, length2 = len(list1), len(list2) 706 matching = [[0] * (length2 + 1)] * (length1 + 1) 707 # matching[i][j] is the maximum number of matched pairs for first i items 708 # in list1 and first j items in list2. 709 for i in xrange(length1): 710 for j in xrange(length2): 711 # Maximum matched pairs may be obtained without 712 # i-th item in list1 or without j-th item in list2 713 matching[i + 1][j + 1] = max(matching[i + 1][j], 714 matching[i][j + 1]) 715 diff = abs(list1[i] - list2[j]) 716 relative_error = diff / list1[i] 717 # If i-th item in list1 can be matched to j-th item in list2 718 if relative_error < equivalent_threshold: 719 matching[i + 1][j + 1] = matching[i][j] + 1 720 721 # Backtracking which item in list1 and list2 are matched 722 matched1 = [False] * length1 723 matched2 = [False] * length2 724 i, j = length1, length2 725 while i > 0 and j > 0: 726 # Maximum number is obtained by matching i-th item in list1 727 # and j-th one in list2. 728 if matching[i][j] == matching[i - 1][j - 1] + 1: 729 matched1[i - 1] = True 730 matched2[j - 1] = True 731 i, j = i - 1, j - 1 732 elif matching[i][j] == matching[i - 1][j]: 733 i -= 1 734 else: 735 j -= 1 736 return (matched1, matched2) 737 738 739def switch_to_hsp(audio_facade): 740 """Switches to HSP profile. 741 742 Selects bluetooth microphone and runs a recording process on Cros device. 743 This triggers bluetooth profile be switched from A2DP to HSP. 744 Note the user can call stop_recording on audio facade to stop the recording 745 process, or let multimedia_xmlrpc_server terminates it in its cleanup. 746 747 """ 748 audio_facade.set_chrome_active_node_type(None, 'BLUETOOTH') 749 check_audio_nodes(audio_facade, (None, ['BLUETOOTH'])) 750 audio_facade.start_recording( 751 dict(file_type='raw', 752 sample_format='S16_LE', 753 channel=2, 754 rate=48000)) 755 756 757def compare_recorded_correlation(golden_file, recorder, parameters=None): 758 """Checks recorded audio in an AudioInputWidget against a golden file. 759 760 Compares recorded data with golden data by cross correlation method. 761 Refer to audio_helper.compare_data for details of comparison. 762 763 @param golden_file: An AudioTestData object that serves as golden data. 764 @param recorder: An AudioInputWidget that has recorded some audio data. 765 @param parameters: A dict containing parameters for method. 766 767 """ 768 logging.info('Comparing recorded data with golden file %s ...', 769 golden_file.path) 770 audio_helper.compare_data_correlation( 771 golden_file.get_binary(), golden_file.data_format, 772 recorder.get_binary(), recorder.data_format, recorder.channel_map, 773 parameters) 774 775 776def check_and_set_chrome_active_node_types(audio_facade, 777 output_type=None, 778 input_type=None): 779 """Check the target types are available, and set them to be active nodes. 780 781 @param audio_facade: An AudioFacadeNative or AudioFacadeAdapter object. 782 @output_type: An output node type defined in cras_utils.CRAS_NODE_TYPES. 783 None to skip. 784 @input_type: An input node type defined in cras_utils.CRAS_NODE_TYPES. 785 None to skip. 786 787 @raises: error.TestError if the expected node type is missing. We use 788 error.TestError here because usually this step is not the main 789 purpose of the test, but a setup step. 790 791 """ 792 output_types, input_types = audio_facade.get_plugged_node_types() 793 if output_type and output_type not in output_types: 794 raise error.TestError('Target output type %s not present in %r' % 795 (output_type, output_types)) 796 if input_type and input_type not in input_types: 797 raise error.TestError('Target input type %s not present in %r' % 798 (input_type, input_types)) 799 audio_facade.set_chrome_active_node_type(output_type, input_type) 800 801 802def check_hp_or_lineout_plugged(audio_facade): 803 """Checks whether line-out or headphone is plugged. 804 805 @param audio_facade: A RemoteAudioFacade to access audio functions on 806 Cros device. 807 808 @returns: 'LINEOUT' if line-out node is plugged. 809 'HEADPHONE' if headphone node is plugged. 810 811 @raises: error.TestFail if the plugged nodes does not contain one of 812 'LINEOUT' and 'HEADPHONE'. 813 814 """ 815 # Checks whether line-out or headphone is detected. 816 output_nodes, _ = audio_facade.get_plugged_node_types() 817 if 'LINEOUT' in output_nodes: 818 return 'LINEOUT' 819 if 'HEADPHONE' in output_nodes: 820 return 'HEADPHONE' 821 raise error.TestFail( 822 'No line-out or headphone in plugged nodes:%r.' 823 'Please check the audio cable or jack plugger.' % output_nodes) 824 825 826def get_internal_mic_node(host): 827 """Return the expected internal microphone node. 828 829 @param host: The CrosHost object. 830 831 @returns: The name of the expected internal microphone nodes. 832 """ 833 board = get_board_name(host) 834 model = host.get_platform() 835 sku = host.host_info_store.get().device_sku 836 837 return audio_spec.get_internal_mic_node(board, model, sku) 838 839 840def get_plugged_internal_mics(host): 841 """Return a list of all the plugged internal microphone nodes. 842 843 @param host: The CrosHost object. 844 845 @returns: A list of all the plugged internal microphone nodes. 846 """ 847 board = get_board_name(host) 848 model = host.get_platform() 849 sku = host.host_info_store.get().device_sku 850 851 return audio_spec.get_plugged_internal_mics(board, model, sku) 852 853def get_headphone_node(host): 854 """Return the expected headphone node. 855 856 @param host: The CrosHost object. 857 858 @returns: The name of the expected headphone nodes. 859 """ 860 return audio_spec.get_headphone_node(get_board_name(host)) 861