1# Copyright 2015 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""This module provides the test utilities for audio tests using chameleon.""" 6 7# TODO (cychiang) Move test utilities from chameleon_audio_helpers 8# to this module. 9 10import logging 11import multiprocessing 12import os 13import pprint 14import re 15from contextlib import contextmanager 16 17from autotest_lib.client.common_lib import error 18from autotest_lib.client.cros import constants 19from autotest_lib.client.cros.audio import audio_analysis 20from autotest_lib.client.cros.audio import audio_spec 21from autotest_lib.client.cros.audio import audio_data 22from autotest_lib.client.cros.audio import audio_helper 23from autotest_lib.client.cros.audio import audio_quality_measurement 24from autotest_lib.client.cros.chameleon import chameleon_audio_ids 25 26CHAMELEON_AUDIO_IDS_TO_CRAS_NODE_TYPES = { 27 chameleon_audio_ids.CrosIds.HDMI: 'HDMI', 28 chameleon_audio_ids.CrosIds.HEADPHONE: 'HEADPHONE', 29 chameleon_audio_ids.CrosIds.EXTERNAL_MIC: 'MIC', 30 chameleon_audio_ids.CrosIds.SPEAKER: 'INTERNAL_SPEAKER', 31 chameleon_audio_ids.CrosIds.INTERNAL_MIC: 'INTERNAL_MIC', 32 chameleon_audio_ids.CrosIds.BLUETOOTH_HEADPHONE: 'BLUETOOTH', 33 chameleon_audio_ids.CrosIds.BLUETOOTH_MIC: 'BLUETOOTH', 34 chameleon_audio_ids.CrosIds.USBIN: 'USB', 35 chameleon_audio_ids.CrosIds.USBOUT: 'USB', 36} 37 38 39def cros_port_id_to_cras_node_type(port_id): 40 """Gets Cras node type from Cros port id. 41 42 @param port_id: A port id defined in chameleon_audio_ids.CrosIds. 43 44 @returns: A Cras node type defined in cras_utils.CRAS_NODE_TYPES. 45 46 """ 47 return CHAMELEON_AUDIO_IDS_TO_CRAS_NODE_TYPES[port_id] 48 49 50def check_output_port(audio_facade, port_id): 51 """Checks selected output node on Cros device is correct for a port. 52 53 @param port_id: A port id defined in chameleon_audio_ids.CrosIds. 54 55 """ 56 output_node_type = cros_port_id_to_cras_node_type(port_id) 57 check_audio_nodes(audio_facade, ([output_node_type], None)) 58 59 60def check_input_port(audio_facade, port_id): 61 """Checks selected input node on Cros device is correct for a port. 62 63 @param port_id: A port id defined in chameleon_audio_ids.CrosIds. 64 65 """ 66 input_node_type = cros_port_id_to_cras_node_type(port_id) 67 check_audio_nodes(audio_facade, (None, [input_node_type])) 68 69 70def check_audio_nodes(audio_facade, audio_nodes): 71 """Checks the node selected by Cros device is correct. 72 73 @param audio_facade: A RemoteAudioFacade to access audio functions on 74 Cros device. 75 76 @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing 77 expected selected output and input nodes. 78 79 @raises: error.TestFail if the nodes selected by Cros device are not expected. 80 81 """ 82 curr_out_nodes, curr_in_nodes = audio_facade.get_selected_node_types() 83 out_audio_nodes, in_audio_nodes = audio_nodes 84 if (in_audio_nodes != None and 85 sorted(curr_in_nodes) != sorted(in_audio_nodes)): 86 raise error.TestFail('Wrong input node(s) selected: %s ' 87 'expected: %s' % (str(curr_in_nodes), str(in_audio_nodes))) 88 89 # Treat line-out node as headphone node in Chameleon test since some 90 # Cros devices detect audio board as lineout. This actually makes sense 91 # because 3.5mm audio jack is connected to LineIn port on Chameleon. 92 if (out_audio_nodes == ['HEADPHONE'] and curr_out_nodes == ['LINEOUT']): 93 return 94 95 if (out_audio_nodes != None and 96 sorted(curr_out_nodes) != sorted(out_audio_nodes)): 97 raise error.TestFail('Wrong output node(s) selected %s ' 98 'expected: %s' % (str(curr_out_nodes), str(out_audio_nodes))) 99 100 101def check_plugged_nodes(audio_facade, audio_nodes): 102 """Checks the nodes that are currently plugged on Cros device are correct. 103 104 @param audio_facade: A RemoteAudioFacade to access audio functions on 105 Cros device. 106 107 @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing 108 expected plugged output and input nodes. 109 110 @raises: error.TestFail if the plugged nodes on Cros device are not expected. 111 112 """ 113 curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types() 114 out_audio_nodes, in_audio_nodes = audio_nodes 115 if (in_audio_nodes != None and 116 sorted(curr_in_nodes) != sorted(in_audio_nodes)): 117 raise error.TestFail('Wrong input node(s) plugged: %s ' 118 'expected: %s!' % (str(curr_in_nodes), str(in_audio_nodes))) 119 if (out_audio_nodes != None and 120 sorted(curr_out_nodes) != sorted(out_audio_nodes)): 121 raise error.TestFail('Wrong output node(s) plugged: %s ' 122 'expected: %s!' % (str(curr_out_nodes), str(out_audio_nodes))) 123 124 125def bluetooth_nodes_plugged(audio_facade): 126 """Checks bluetooth nodes are plugged. 127 128 @param audio_facade: A RemoteAudioFacade to access audio functions on 129 Cros device. 130 131 @raises: error.TestFail if either input or output bluetooth node is 132 not plugged. 133 134 """ 135 curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types() 136 return 'BLUETOOTH' in curr_out_nodes and 'BLUETOOTH' in curr_in_nodes 137 138 139def get_board_name(host): 140 """Gets the board name. 141 142 @param host: The CrosHost object. 143 144 @returns: The board name. 145 146 """ 147 return host.get_board().split(':')[1] 148 149 150def has_internal_speaker(host): 151 """Checks if the Cros device has speaker. 152 153 @param host: The CrosHost object. 154 155 @returns: True if Cros device has internal speaker. False otherwise. 156 157 """ 158 board_name = get_board_name(host) 159 if not audio_spec.has_internal_speaker(host.get_board_type(), board_name): 160 logging.info('Board %s does not have speaker.', board_name) 161 return False 162 return True 163 164 165def has_internal_microphone(host): 166 """Checks if the Cros device has internal microphone. 167 168 @param host: The CrosHost object. 169 170 @returns: True if Cros device has internal microphone. False otherwise. 171 172 """ 173 board_name = get_board_name(host) 174 if not audio_spec.has_internal_microphone(host.get_board_type()): 175 logging.info('Board %s does not have internal microphone.', board_name) 176 return False 177 return True 178 179 180def has_headphone(host): 181 """Checks if the Cros device has headphone. 182 183 @param host: The CrosHost object. 184 185 @returns: True if Cros device has headphone. False otherwise. 186 187 """ 188 board_name = get_board_name(host) 189 if not audio_spec.has_headphone(host.get_board_type()): 190 logging.info('Board %s does not have headphone.', board_name) 191 return False 192 return True 193 194 195def has_hotwording(host): 196 """Checks if the Cros device has hotwording. 197 198 @param host: The CrosHost object. 199 200 @returns: True if the board has hotwording. False otherwise. 201 202 """ 203 board_name = get_board_name(host) 204 model_name = host.get_platform() 205 206 return audio_spec.has_hotwording(board_name, model_name) 207 208 209def suspend_resume(host, suspend_time_secs, resume_network_timeout_secs=50): 210 """Performs the suspend/resume on Cros device. 211 212 @param suspend_time_secs: Time in seconds to let Cros device suspend. 213 @resume_network_timeout_secs: Time in seconds to let Cros device resume and 214 obtain network. 215 """ 216 def action_suspend(): 217 """Calls the host method suspend.""" 218 host.suspend(suspend_time=suspend_time_secs) 219 220 boot_id = host.get_boot_id() 221 proc = multiprocessing.Process(target=action_suspend) 222 logging.info("Suspending...") 223 proc.daemon = True 224 proc.start() 225 host.test_wait_for_sleep(suspend_time_secs / 3) 226 logging.info("DUT suspended! Waiting to resume...") 227 host.test_wait_for_resume( 228 boot_id, suspend_time_secs + resume_network_timeout_secs) 229 logging.info("DUT resumed!") 230 231 232def dump_cros_audio_logs(host, audio_facade, directory, suffix='', 233 fail_if_warnings=False): 234 """Dumps logs for audio debugging from Cros device. 235 236 @param host: The CrosHost object. 237 @param audio_facade: A RemoteAudioFacade to access audio functions on 238 Cros device. 239 @directory: The directory to dump logs. 240 241 """ 242 def get_file_path(name): 243 """Gets file path to dump logs. 244 245 @param name: The file name. 246 247 @returns: The file path with an optional suffix. 248 249 """ 250 file_name = '%s.%s' % (name, suffix) if suffix else name 251 file_path = os.path.join(directory, file_name) 252 return file_path 253 254 audio_facade.dump_diagnostics(get_file_path('audio_diagnostics.txt')) 255 256 host.get_file('/var/log/messages', get_file_path('messages')) 257 258 host.get_file(constants.MULTIMEDIA_XMLRPC_SERVER_LOG_FILE, 259 get_file_path('multimedia_xmlrpc_server.log')) 260 261 # Raising error if any warning messages in the audio diagnostics 262 if fail_if_warnings: 263 audio_logs = examine_audio_diagnostics(get_file_path( 264 'audio_diagnostics.txt')) 265 if audio_logs != '': 266 raise error.TestFail(audio_logs) 267 268 269def examine_audio_diagnostics(path): 270 """Examines audio diagnostic content. 271 272 @param path: Path to audio diagnostic file. 273 274 @returns: Warning messages or ''. 275 276 """ 277 warning_msgs = [] 278 line_number = 1 279 280 underrun_pattern = re.compile('num_underruns: (\d*)') 281 282 with open(path) as f: 283 for line in f.readlines(): 284 285 # Check for number of underruns. 286 search_result = underrun_pattern.search(line) 287 if search_result: 288 num_underruns = int(search_result.group(1)) 289 if num_underruns != 0: 290 warning_msgs.append( 291 'Found %d underrun at line %d: %s' % ( 292 num_underruns, line_number, line)) 293 294 # TODO(cychiang) add other check like maximum client reply delay. 295 line_number = line_number + 1 296 297 if warning_msgs: 298 return ('Found issue in audio diganostics result : %s' % 299 '\n'.join(warning_msgs)) 300 301 logging.info('audio_diagnostic result looks fine') 302 return '' 303 304 305@contextmanager 306def monitor_no_nodes_changed(audio_facade, callback=None): 307 """Context manager to monitor nodes changed signal on Cros device. 308 309 Starts the counter in the beginning. Stops the counter in the end to make 310 sure there is no NodesChanged signal during the try block. 311 312 E.g. with monitor_no_nodes_changed(audio_facade): 313 do something on playback/recording 314 315 @param audio_facade: A RemoteAudioFacade to access audio functions on 316 Cros device. 317 @param fail_callback: The callback to call before raising TestFail 318 when there is unexpected NodesChanged signals. 319 320 @raises: error.TestFail if there is NodesChanged signal on 321 Cros device during the context. 322 323 """ 324 try: 325 audio_facade.start_counting_signal('NodesChanged') 326 yield 327 finally: 328 count = audio_facade.stop_counting_signal() 329 if count: 330 message = 'Got %d unexpected NodesChanged signal' % count 331 logging.error(message) 332 if callback: 333 callback() 334 raise error.TestFail(message) 335 336 337# The second dominant frequency should have energy less than -26dB of the 338# first dominant frequency in the spectrum. 339_DEFAULT_SECOND_PEAK_RATIO = 0.05 340 341# Tolerate more noise for bluetooth audio using HSP. 342_HSP_SECOND_PEAK_RATIO = 0.2 343 344# Tolerate more noise for speaker. 345_SPEAKER_SECOND_PEAK_RATIO = 0.1 346 347# Tolerate more noise for internal microphone. 348_INTERNAL_MIC_SECOND_PEAK_RATIO = 0.2 349 350# maximum tolerant noise level 351DEFAULT_TOLERANT_NOISE_LEVEL = 0.01 352 353# If relative error of two durations is less than 0.2, 354# they will be considered equivalent. 355DEFAULT_EQUIVALENT_THRESHOLD = 0.2 356 357# The frequency at lower than _DC_FREQ_THRESHOLD should have coefficient 358# smaller than _DC_COEFF_THRESHOLD. 359_DC_FREQ_THRESHOLD = 0.001 360_DC_COEFF_THRESHOLD = 0.01 361 362def get_second_peak_ratio(source_id, recorder_id, is_hsp=False): 363 """Gets the second peak ratio suitable for use case. 364 365 @param source_id: ID defined in chameleon_audio_ids for source widget. 366 @param recorder_id: ID defined in chameleon_audio_ids for recorder widget. 367 @param is_hsp: For bluetooth HSP use case. 368 369 @returns: A float for proper second peak ratio to be used in 370 check_recorded_frequency. 371 """ 372 if is_hsp: 373 return _HSP_SECOND_PEAK_RATIO 374 elif source_id == chameleon_audio_ids.CrosIds.SPEAKER: 375 return _SPEAKER_SECOND_PEAK_RATIO 376 elif recorder_id == chameleon_audio_ids.CrosIds.INTERNAL_MIC: 377 return _INTERNAL_MIC_SECOND_PEAK_RATIO 378 else: 379 return _DEFAULT_SECOND_PEAK_RATIO 380 381 382# The deviation of estimated dominant frequency from golden frequency. 383DEFAULT_FREQUENCY_DIFF_THRESHOLD = 5 384 385def check_recorded_frequency( 386 golden_file, recorder, 387 second_peak_ratio=_DEFAULT_SECOND_PEAK_RATIO, 388 frequency_diff_threshold=DEFAULT_FREQUENCY_DIFF_THRESHOLD, 389 ignore_frequencies=None, check_anomaly=False, check_artifacts=False, 390 mute_durations=None, volume_changes=None, 391 tolerant_noise_level=DEFAULT_TOLERANT_NOISE_LEVEL): 392 """Checks if the recorded data contains sine tone of golden frequency. 393 394 @param golden_file: An AudioTestData object that serves as golden data. 395 @param recorder: An AudioWidget used in the test to record data. 396 @param second_peak_ratio: The test fails when the second dominant 397 frequency has coefficient larger than this 398 ratio of the coefficient of first dominant 399 frequency. 400 @param frequency_diff_threshold: The maximum difference between estimated 401 frequency of test signal and golden 402 frequency. This value should be small for 403 signal passed through line. 404 @param ignore_frequencies: A list of frequencies to be ignored. The 405 component in the spectral with frequency too 406 close to the frequency in the list will be 407 ignored. The comparison of frequencies uses 408 frequency_diff_threshold as well. 409 @param check_anomaly: True to check anomaly in the signal. 410 @param check_artifacts: True to check artifacts in the signal. 411 @param mute_durations: Each duration of mute in seconds in the signal. 412 @param volume_changes: A list containing alternative -1 for decreasing 413 volume and +1 for increasing volume. 414 @param tolerant_noise_level: The maximum noise level can be tolerated 415 416 @returns: A list containing tuples of (dominant_frequency, coefficient) for 417 valid channels. Coefficient can be a measure of signal magnitude 418 on that dominant frequency. Invalid channels where golden_channel 419 is None are ignored. 420 421 @raises error.TestFail if the recorded data does not contain sine tone of 422 golden frequency. 423 424 """ 425 if not ignore_frequencies: 426 ignore_frequencies = [] 427 428 # Also ignore harmonics of ignore frequencies. 429 ignore_frequencies_harmonics = [] 430 for ignore_freq in ignore_frequencies: 431 ignore_frequencies_harmonics += [ignore_freq * n for n in xrange(1, 4)] 432 433 data_format = recorder.data_format 434 recorded_data = audio_data.AudioRawData( 435 binary=recorder.get_binary(), 436 channel=data_format['channel'], 437 sample_format=data_format['sample_format']) 438 439 errors = [] 440 dominant_spectrals = [] 441 442 for test_channel, golden_channel in enumerate(recorder.channel_map): 443 if golden_channel is None: 444 logging.info('Skipped channel %d', test_channel) 445 continue 446 447 signal = recorded_data.channel_data[test_channel] 448 saturate_value = audio_data.get_maximum_value_from_sample_format( 449 data_format['sample_format']) 450 logging.debug('Channel %d max signal: %f', test_channel, max(signal)) 451 normalized_signal = audio_analysis.normalize_signal( 452 signal, saturate_value) 453 logging.debug('saturate_value: %f', saturate_value) 454 logging.debug('max signal after normalized: %f', max(normalized_signal)) 455 spectral = audio_analysis.spectral_analysis( 456 normalized_signal, data_format['rate']) 457 logging.debug('spectral: %s', spectral) 458 459 if not spectral: 460 errors.append( 461 'Channel %d: Can not find dominant frequency.' % 462 test_channel) 463 464 golden_frequency = golden_file.frequencies[golden_channel] 465 logging.debug('Checking channel %s spectral %s against frequency %s', 466 test_channel, spectral, golden_frequency) 467 468 dominant_frequency = spectral[0][0] 469 470 if (abs(dominant_frequency - golden_frequency) > 471 frequency_diff_threshold): 472 errors.append( 473 'Channel %d: Dominant frequency %s is away from golden %s' % 474 (test_channel, dominant_frequency, golden_frequency)) 475 476 if check_anomaly: 477 detected_anomaly = audio_analysis.anomaly_detection( 478 signal=normalized_signal, 479 rate=data_format['rate'], 480 freq=golden_frequency) 481 if detected_anomaly: 482 errors.append( 483 'Channel %d: Detect anomaly near these time: %s' % 484 (test_channel, detected_anomaly)) 485 else: 486 logging.info( 487 'Channel %d: Quality is good as there is no anomaly', 488 test_channel) 489 490 if check_artifacts or mute_durations or volume_changes: 491 result = audio_quality_measurement.quality_measurement( 492 normalized_signal, 493 data_format['rate'], 494 dominant_frequency=dominant_frequency) 495 logging.debug('Quality measurement result:\n%s', pprint.pformat(result)) 496 if check_artifacts: 497 if len(result['artifacts']['noise_before_playback']) > 0: 498 errors.append( 499 'Channel %d: Detects artifacts before playing near' 500 ' these time and duration: %s' % 501 (test_channel, 502 str(result['artifacts']['noise_before_playback']))) 503 504 if len(result['artifacts']['noise_after_playback']) > 0: 505 errors.append( 506 'Channel %d: Detects artifacts after playing near' 507 ' these time and duration: %s' % 508 (test_channel, 509 str(result['artifacts']['noise_after_playback']))) 510 511 if mute_durations: 512 delays = result['artifacts']['delay_during_playback'] 513 delay_durations = [] 514 for x in delays: 515 delay_durations.append(x[1]) 516 mute_matched, delay_matched = longest_common_subsequence( 517 mute_durations, 518 delay_durations, 519 DEFAULT_EQUIVALENT_THRESHOLD) 520 521 # updated delay list 522 new_delays = [delays[i] 523 for i in delay_matched if not delay_matched[i]] 524 525 result['artifacts']['delay_during_playback'] = new_delays 526 527 unmatched_mutes = [mute_durations[i] 528 for i in mute_matched if not mute_matched[i]] 529 530 if len(unmatched_mutes) > 0: 531 errors.append( 532 'Channel %d: Unmatched mute duration: %s' % 533 (test_channel, unmatched_mutes)) 534 535 if check_artifacts: 536 if len(result['artifacts']['delay_during_playback']) > 0: 537 errors.append( 538 'Channel %d: Detects delay during playing near' 539 ' these time and duration: %s' % 540 (test_channel, 541 result['artifacts']['delay_during_playback'])) 542 543 if len(result['artifacts']['burst_during_playback']) > 0: 544 errors.append( 545 'Channel %d: Detects burst/pop near these time: %s' % 546 (test_channel, 547 result['artifacts']['burst_during_playback'])) 548 549 if result['equivalent_noise_level'] > tolerant_noise_level: 550 errors.append( 551 'Channel %d: noise level is higher than tolerant' 552 ' noise level: %f > %f' % 553 (test_channel, 554 result['equivalent_noise_level'], 555 tolerant_noise_level)) 556 557 if volume_changes: 558 matched = True 559 volume_changing = result['volume_changes'] 560 if len(volume_changing) != len(volume_changes): 561 matched = False 562 else: 563 for i in xrange(len(volume_changing)): 564 if volume_changing[i][1] != volume_changes[i]: 565 matched = False 566 break 567 if not matched: 568 errors.append( 569 'Channel %d: volume changing is not as expected, ' 570 'found changing time and events are: %s while ' 571 'expected changing events are %s'% 572 (test_channel, 573 volume_changing, 574 volume_changes)) 575 576 # Filter out the harmonics resulted from imperfect sin wave. 577 # This list is different for different channels. 578 harmonics = [dominant_frequency * n for n in xrange(2, 10)] 579 580 def should_be_ignored(frequency): 581 """Checks if frequency is close to any frequency in ignore list. 582 583 The ignore list is harmonics of frequency to be ignored 584 (like power noise), plus harmonics of dominant frequencies, 585 plus DC. 586 587 @param frequency: The frequency to be tested. 588 589 @returns: True if the frequency should be ignored. False otherwise. 590 591 """ 592 for ignore_frequency in (ignore_frequencies_harmonics + harmonics 593 + [0.0]): 594 if (abs(frequency - ignore_frequency) < 595 frequency_diff_threshold): 596 logging.debug('Ignore frequency: %s', frequency) 597 return True 598 599 # Checks DC is small enough. 600 for freq, coeff in spectral: 601 if freq < _DC_FREQ_THRESHOLD and coeff > _DC_COEFF_THRESHOLD: 602 errors.append( 603 'Channel %d: Found large DC coefficient: ' 604 '(%f Hz, %f)' % (test_channel, freq, coeff)) 605 606 # Filter out the frequencies to be ignored. 607 spectral_post_ignore = [ 608 x for x in spectral if not should_be_ignored(x[0])] 609 610 if len(spectral_post_ignore) > 1: 611 first_coeff = spectral_post_ignore[0][1] 612 second_coeff = spectral_post_ignore[1][1] 613 if second_coeff > first_coeff * second_peak_ratio: 614 errors.append( 615 'Channel %d: Found large second dominant frequencies: ' 616 '%s' % (test_channel, spectral_post_ignore)) 617 618 if not spectral_post_ignore: 619 errors.append( 620 'Channel %d: No frequency left after removing unwanted ' 621 'frequencies. Spectral: %s; After removing unwanted ' 622 'frequencies: %s' % 623 (test_channel, spectral, spectral_post_ignore)) 624 625 else: 626 dominant_spectrals.append(spectral_post_ignore[0]) 627 628 if errors: 629 raise error.TestFail(', '.join(errors)) 630 631 return dominant_spectrals 632 633 634def longest_common_subsequence(list1, list2, equivalent_threshold): 635 """Finds longest common subsequence of list1 and list2 636 637 Such as list1: [0.3, 0.4], 638 list2: [0.001, 0.299, 0.002, 0.401, 0.001] 639 equivalent_threshold: 0.001 640 it will return matched1: [True, True], 641 matched2: [False, True, False, True, False] 642 643 @param list1: a list of integer or float value 644 @param list2: a list of integer or float value 645 @param equivalent_threshold: two values are considered equivalent if their 646 relative error is less than 647 equivalent_threshold. 648 649 @returns: a tuple of list (matched_1, matched_2) indicating each item 650 of list1 and list2 are matched or not. 651 652 """ 653 length1, length2 = len(list1), len(list2) 654 matching = [[0] * (length2 + 1)] * (length1 + 1) 655 # matching[i][j] is the maximum number of matched pairs for first i items 656 # in list1 and first j items in list2. 657 for i in xrange(length1): 658 for j in xrange(length2): 659 # Maximum matched pairs may be obtained without 660 # i-th item in list1 or without j-th item in list2 661 matching[i + 1][j + 1] = max(matching[i + 1][j], 662 matching[i][j + 1]) 663 diff = abs(list1[i] - list2[j]) 664 relative_error = diff / list1[i] 665 # If i-th item in list1 can be matched to j-th item in list2 666 if relative_error < equivalent_threshold: 667 matching[i + 1][j + 1] = matching[i][j] + 1 668 669 # Backtracking which item in list1 and list2 are matched 670 matched1 = [False] * length1 671 matched2 = [False] * length2 672 i, j = length1, length2 673 while i > 0 and j > 0: 674 # Maximum number is obtained by matching i-th item in list1 675 # and j-th one in list2. 676 if matching[i][j] == matching[i - 1][j - 1] + 1: 677 matched1[i - 1] = True 678 matched2[j - 1] = True 679 i, j = i - 1, j - 1 680 elif matching[i][j] == matching[i - 1][j]: 681 i -= 1 682 else: 683 j -= 1 684 return (matched1, matched2) 685 686 687def switch_to_hsp(audio_facade): 688 """Switches to HSP profile. 689 690 Selects bluetooth microphone and runs a recording process on Cros device. 691 This triggers bluetooth profile be switched from A2DP to HSP. 692 Note the user can call stop_recording on audio facade to stop the recording 693 process, or let multimedia_xmlrpc_server terminates it in its cleanup. 694 695 """ 696 audio_facade.set_chrome_active_node_type(None, 'BLUETOOTH') 697 check_audio_nodes(audio_facade, (None, ['BLUETOOTH'])) 698 audio_facade.start_recording( 699 dict(file_type='raw', sample_format='S16_LE', channel=2, 700 rate=48000)) 701 702 703def compare_recorded_correlation(golden_file, recorder, parameters=None): 704 """Checks recorded audio in an AudioInputWidget against a golden file. 705 706 Compares recorded data with golden data by cross correlation method. 707 Refer to audio_helper.compare_data for details of comparison. 708 709 @param golden_file: An AudioTestData object that serves as golden data. 710 @param recorder: An AudioInputWidget that has recorded some audio data. 711 @param parameters: A dict containing parameters for method. 712 713 """ 714 logging.info('Comparing recorded data with golden file %s ...', 715 golden_file.path) 716 audio_helper.compare_data_correlation( 717 golden_file.get_binary(), golden_file.data_format, 718 recorder.get_binary(), recorder.data_format, recorder.channel_map, 719 parameters) 720 721 722def check_and_set_chrome_active_node_types(audio_facade, output_type=None, 723 input_type=None): 724 """Check the target types are available, and set them to be active nodes. 725 726 @param audio_facade: An AudioFacadeNative or AudioFacadeAdapter object. 727 @output_type: An output node type defined in cras_utils.CRAS_NODE_TYPES. 728 None to skip. 729 @input_type: An input node type defined in cras_utils.CRAS_NODE_TYPES. 730 None to skip. 731 732 @raises: error.TestError if the expected node type is missing. We use 733 error.TestError here because usually this step is not the main 734 purpose of the test, but a setup step. 735 736 """ 737 output_types, input_types = audio_facade.get_plugged_node_types() 738 logging.debug('Plugged types: output: %r, input: %r', 739 output_types, input_types) 740 if output_type and output_type not in output_types: 741 raise error.TestError( 742 'Target output type %s not present' % output_type) 743 if input_type and input_type not in input_types: 744 raise error.TestError( 745 'Target input type %s not present' % input_type) 746 audio_facade.set_chrome_active_node_type(output_type, input_type) 747 748 749def check_hp_or_lineout_plugged(audio_facade): 750 """Checks whether line-out or headphone is plugged. 751 752 @param audio_facade: A RemoteAudioFacade to access audio functions on 753 Cros device. 754 755 @returns: 'LINEOUT' if line-out node is plugged. 756 'HEADPHONE' if headphone node is plugged. 757 758 @raises: error.TestFail if the plugged nodes does not contain one of 759 'LINEOUT' and 'HEADPHONE'. 760 761 """ 762 # Checks whether line-out or headphone is detected. 763 output_nodes, _ = audio_facade.get_plugged_node_types() 764 if 'LINEOUT' in output_nodes: 765 return 'LINEOUT' 766 if 'HEADPHONE' in output_nodes: 767 return 'HEADPHONE' 768 raise error.TestFail('Can not detect line-out or headphone') 769