1#!/usr/bin/python2 2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6 7import logging 8import numpy 9import os 10import re 11import subprocess 12import tempfile 13import threading 14import time 15 16from glob import glob 17from autotest_lib.client.bin import test, utils 18from autotest_lib.client.bin.input.input_device import * 19from autotest_lib.client.common_lib import error 20from autotest_lib.client.cros.audio import audio_data 21from autotest_lib.client.cros.audio import cmd_utils 22from autotest_lib.client.cros.audio import cras_utils 23from autotest_lib.client.cros.audio import sox_utils 24 25LD_LIBRARY_PATH = 'LD_LIBRARY_PATH' 26 27_AUDIO_DIAGNOSTICS_PATH = '/usr/bin/audio_diagnostics' 28 29_DEFAULT_NUM_CHANNELS = 2 30_DEFAULT_REC_COMMAND = 'arecord -D hw:0,0 -d 10 -f dat' 31_DEFAULT_SOX_FORMAT = '-t raw -b 16 -e signed -r 48000 -L' 32_DEFAULT_PLAYBACK_VOLUME = 100 33_DEFAULT_CAPTURE_GAIN = 2500 34_DEFAULT_ALSA_MAX_VOLUME = '100%' 35_DEFAULT_ALSA_CAPTURE_GAIN = '25dB' 36 37# Minimum RMS value to pass when checking recorded file. 38_DEFAULT_SOX_RMS_THRESHOLD = 0.08 39 40_JACK_VALUE_ON_RE = re.compile(r'.*values=on') 41_HP_JACK_CONTROL_RE = re.compile(r'numid=(\d+).*Headphone\sJack') 42_MIC_JACK_CONTROL_RE = re.compile(r'numid=(\d+).*Mic\sJack') 43 44_SOX_RMS_AMPLITUDE_RE = re.compile(r'RMS\s+amplitude:\s+(.+)') 45_SOX_ROUGH_FREQ_RE = re.compile(r'Rough\s+frequency:\s+(.+)') 46 47_AUDIO_NOT_FOUND_RE = r'Audio\snot\sdetected' 48_MEASURED_LATENCY_RE = r'Measured\sLatency:\s(\d+)\suS' 49_REPORTED_LATENCY_RE = r'Reported\sLatency:\s(\d+)\suS' 50 51# Tools from platform/audiotest 52AUDIOFUNTEST_PATH = 'audiofuntest' 53AUDIOLOOP_PATH = 'looptest' 54LOOPBACK_LATENCY_PATH = 'loopback_latency' 55SOX_PATH = 'sox' 56TEST_TONES_PATH = 'test_tones' 57 58_MINIMUM_NORM = 0.001 59_CORRELATION_INDEX_THRESHOLD = 0.999 60# The minimum difference of estimated frequencies between two sine waves. 61_FREQUENCY_DIFF_THRESHOLD = 20 62# The minimum RMS value of meaningful audio data. 63_MEANINGFUL_RMS_THRESHOLD = 0.001 64 65def set_mixer_controls(mixer_settings={}, card='0'): 66 """Sets all mixer controls listed in the mixer settings on card. 67 68 @param mixer_settings: Mixer settings to set. 69 @param card: Index of audio card to set mixer settings for. 70 """ 71 logging.info('Setting mixer control values on %s', card) 72 for item in mixer_settings: 73 logging.info('Setting %s to %s on card %s', 74 item['name'], item['value'], card) 75 cmd = 'amixer -c %s cset name=%s %s' 76 cmd = cmd % (card, item['name'], item['value']) 77 try: 78 utils.system(cmd) 79 except error.CmdError: 80 # A card is allowed not to support all the controls, so don't 81 # fail the test here if we get an error. 82 logging.info('amixer command failed: %s', cmd) 83 84def set_volume_levels(volume, capture): 85 """Sets the volume and capture gain through cras_test_client. 86 87 @param volume: The playback volume to set. 88 @param capture: The capture gain to set. 89 """ 90 logging.info('Setting volume level to %d', volume) 91 try: 92 utils.system('/usr/bin/cras_test_client --volume %d' % volume) 93 logging.info('Setting capture gain to %d', capture) 94 utils.system('/usr/bin/cras_test_client --capture_gain %d' % capture) 95 utils.system('/usr/bin/cras_test_client --dump_server_info') 96 utils.system('/usr/bin/cras_test_client --mute 0') 97 except error.CmdError as e: 98 raise error.TestError( 99 '*** Can not tune volume through CRAS. *** (' + str(e) + ')') 100 101 try: 102 utils.system('amixer -c 0 contents') 103 except error.CmdError as e: 104 logging.info('amixer command failed: %s', str(e)) 105 106def loopback_latency_check(**args): 107 """Checks loopback latency. 108 109 @param args: additional arguments for loopback_latency. 110 111 @return A tuple containing measured and reported latency in uS. 112 Return None if no audio detected. 113 """ 114 noise_threshold = str(args['n']) if args.has_key('n') else '400' 115 116 cmd = '%s -n %s -c' % (LOOPBACK_LATENCY_PATH, noise_threshold) 117 118 output = utils.system_output(cmd, retain_output=True) 119 120 # Sleep for a short while to make sure device is not busy anymore 121 # after called loopback_latency. 122 time.sleep(.1) 123 124 measured_latency = None 125 reported_latency = None 126 for line in output.split('\n'): 127 match = re.search(_MEASURED_LATENCY_RE, line, re.I) 128 if match: 129 measured_latency = int(match.group(1)) 130 continue 131 match = re.search(_REPORTED_LATENCY_RE, line, re.I) 132 if match: 133 reported_latency = int(match.group(1)) 134 continue 135 if re.search(_AUDIO_NOT_FOUND_RE, line, re.I): 136 return None 137 if measured_latency and reported_latency: 138 return (measured_latency, reported_latency) 139 else: 140 # Should not reach here, just in case. 141 return None 142 143def get_mixer_jack_status(jack_reg_exp): 144 """Gets the mixer jack status. 145 146 @param jack_reg_exp: The regular expression to match jack control name. 147 148 @return None if the control does not exist, return True if jack control 149 is detected plugged, return False otherwise. 150 """ 151 output = utils.system_output('amixer -c0 controls', retain_output=True) 152 numid = None 153 for line in output.split('\n'): 154 m = jack_reg_exp.match(line) 155 if m: 156 numid = m.group(1) 157 break 158 159 # Proceed only when matched numid is not empty. 160 if numid: 161 output = utils.system_output('amixer -c0 cget numid=%s' % numid) 162 for line in output.split('\n'): 163 if _JACK_VALUE_ON_RE.match(line): 164 return True 165 return False 166 else: 167 return None 168 169def get_hp_jack_status(): 170 """Gets the status of headphone jack.""" 171 status = get_mixer_jack_status(_HP_JACK_CONTROL_RE) 172 if status is not None: 173 return status 174 175 # When headphone jack is not found in amixer, lookup input devices 176 # instead. 177 # 178 # TODO(hychao): Check hp/mic jack status dynamically from evdev. And 179 # possibly replace the existing check using amixer. 180 for evdev in glob('/dev/input/event*'): 181 device = InputDevice(evdev) 182 if device.is_hp_jack(): 183 return device.get_headphone_insert() 184 else: 185 return None 186 187def get_mic_jack_status(): 188 """Gets the status of mic jack.""" 189 status = get_mixer_jack_status(_MIC_JACK_CONTROL_RE) 190 if status is not None: 191 return status 192 193 # When mic jack is not found in amixer, lookup input devices instead. 194 for evdev in glob('/dev/input/event*'): 195 device = InputDevice(evdev) 196 if device.is_mic_jack(): 197 return device.get_microphone_insert() 198 else: 199 return None 200 201# Functions to test audio palyback. 202def play_sound(duration_seconds=None, audio_file_path=None): 203 """Plays a sound file found at |audio_file_path| for |duration_seconds|. 204 205 If |audio_file_path|=None, plays a default audio file. 206 If |duration_seconds|=None, plays audio file in its entirety. 207 208 @param duration_seconds: Duration to play sound. 209 @param audio_file_path: Path to the audio file. 210 """ 211 if not audio_file_path: 212 audio_file_path = '/usr/local/autotest/cros/audio/sine440.wav' 213 duration_arg = ('-d %d' % duration_seconds) if duration_seconds else '' 214 utils.system('aplay %s %s' % (duration_arg, audio_file_path)) 215 216def get_play_sine_args(channel, odev='default', freq=1000, duration=10, 217 sample_size=16): 218 """Gets the command args to generate a sine wav to play to odev. 219 220 @param channel: 0 for left, 1 for right; otherwize, mono. 221 @param odev: alsa output device. 222 @param freq: frequency of the generated sine tone. 223 @param duration: duration of the generated sine tone. 224 @param sample_size: output audio sample size. Default to 16. 225 """ 226 cmdargs = [SOX_PATH, '-b', str(sample_size), '-n', '-t', 'alsa', 227 odev, 'synth', str(duration)] 228 if channel == 0: 229 cmdargs += ['sine', str(freq), 'sine', '0'] 230 elif channel == 1: 231 cmdargs += ['sine', '0', 'sine', str(freq)] 232 else: 233 cmdargs += ['sine', str(freq)] 234 235 return cmdargs 236 237def play_sine(channel, odev='default', freq=1000, duration=10, 238 sample_size=16): 239 """Generates a sine wave and plays to odev. 240 241 @param channel: 0 for left, 1 for right; otherwize, mono. 242 @param odev: alsa output device. 243 @param freq: frequency of the generated sine tone. 244 @param duration: duration of the generated sine tone. 245 @param sample_size: output audio sample size. Default to 16. 246 """ 247 cmdargs = get_play_sine_args(channel, odev, freq, duration, sample_size) 248 utils.system(' '.join(cmdargs)) 249 250def get_audio_rms(sox_output): 251 """Gets the audio RMS value from sox stat output 252 253 @param sox_output: Output of sox stat command. 254 255 @return The RMS value parsed from sox stat output. 256 """ 257 for rms_line in sox_output.split('\n'): 258 m = _SOX_RMS_AMPLITUDE_RE.match(rms_line) 259 if m is not None: 260 return float(m.group(1)) 261 262def get_rough_freq(sox_output): 263 """Gets the rough audio frequency from sox stat output 264 265 @param sox_output: Output of sox stat command. 266 267 @return The rough frequency value parsed from sox stat output. 268 """ 269 for rms_line in sox_output.split('\n'): 270 m = _SOX_ROUGH_FREQ_RE.match(rms_line) 271 if m is not None: 272 return int(m.group(1)) 273 274def check_audio_rms(sox_output, sox_threshold=_DEFAULT_SOX_RMS_THRESHOLD): 275 """Checks if the calculated RMS value is expected. 276 277 @param sox_output: The output from sox stat command. 278 @param sox_threshold: The threshold to test RMS value against. 279 280 @raises error.TestError if RMS amplitude can't be parsed. 281 @raises error.TestFail if the RMS amplitude of the recording isn't above 282 the threshold. 283 """ 284 rms_val = get_audio_rms(sox_output) 285 286 # In case we don't get a valid RMS value. 287 if rms_val is None: 288 raise error.TestError( 289 'Failed to generate an audio RMS value from playback.') 290 291 logging.info('Got audio RMS value of %f. Minimum pass is %f.', 292 rms_val, sox_threshold) 293 if rms_val < sox_threshold: 294 raise error.TestFail( 295 'Audio RMS value %f too low. Minimum pass is %f.' % 296 (rms_val, sox_threshold)) 297 298def noise_reduce_file(in_file, noise_file, out_file, 299 sox_format=_DEFAULT_SOX_FORMAT): 300 """Runs the sox command to reduce noise. 301 302 Runs the sox command to noise-reduce in_file using the noise 303 profile from noise_file. 304 305 @param in_file: The file to noise reduce. 306 @param noise_file: The file containing the noise profile. 307 This can be created by recording silence. 308 @param out_file: The file contains the noise reduced sound. 309 @param sox_format: The sox format to generate sox command. 310 """ 311 prof_cmd = '%s -c 2 %s %s -n noiseprof' % (SOX_PATH, 312 sox_format, noise_file) 313 reduce_cmd = ('%s -c 2 %s %s -c 2 %s %s noisered' % 314 (SOX_PATH, sox_format, in_file, sox_format, out_file)) 315 utils.system('%s | %s' % (prof_cmd, reduce_cmd)) 316 317def record_sample(tmpfile, record_command=_DEFAULT_REC_COMMAND): 318 """Records a sample from the default input device. 319 320 @param tmpfile: The file to record to. 321 @param record_command: The command to record audio. 322 """ 323 utils.system('%s %s' % (record_command, tmpfile)) 324 325def create_wav_file(wav_dir, prefix=""): 326 """Creates a unique name for wav file. 327 328 The created file name will be preserved in autotest result directory 329 for future analysis. 330 331 @param wav_dir: The directory of created wav file. 332 @param prefix: specified file name prefix. 333 """ 334 filename = "%s-%s.wav" % (prefix, time.time()) 335 return os.path.join(wav_dir, filename) 336 337def run_in_parallel(*funs): 338 """Runs methods in parallel. 339 340 @param funs: methods to run. 341 """ 342 threads = [] 343 for f in funs: 344 t = threading.Thread(target=f) 345 t.start() 346 threads.append(t) 347 348 for t in threads: 349 t.join() 350 351def get_channel_sox_stat( 352 input_audio, channel_index, channels=2, bits=16, rate=48000): 353 """Gets the sox stat info of the selected channel in the input audio file. 354 355 @param input_audio: The input audio file to be analyzed. 356 @param channel_index: The index of the channel to be analyzed. 357 (1 for the first channel). 358 @param channels: The number of channels in the input audio. 359 @param bits: The number of bits of each audio sample. 360 @param rate: The sampling rate. 361 """ 362 if channel_index <= 0 or channel_index > channels: 363 raise ValueError('incorrect channel_indexi: %d' % channel_index) 364 365 if channels == 1: 366 return sox_utils.get_stat( 367 input_audio, channels=channels, bits=bits, rate=rate) 368 369 p1 = cmd_utils.popen( 370 sox_utils.extract_channel_cmd( 371 input_audio, '-', channel_index, 372 channels=channels, bits=bits, rate=rate), 373 stdout=subprocess.PIPE) 374 p2 = cmd_utils.popen( 375 sox_utils.stat_cmd('-', channels=1, bits=bits, rate=rate), 376 stdin=p1.stdout, stderr=subprocess.PIPE) 377 stat_output = p2.stderr.read() 378 cmd_utils.wait_and_check_returncode(p1, p2) 379 return sox_utils.parse_stat_output(stat_output) 380 381 382def get_rms(input_audio, channels=1, bits=16, rate=48000): 383 """Gets the RMS values of all channels of the input audio. 384 385 @param input_audio: The input audio file to be checked. 386 @param channels: The number of channels in the input audio. 387 @param bits: The number of bits of each audio sample. 388 @param rate: The sampling rate. 389 """ 390 stats = [get_channel_sox_stat( 391 input_audio, i + 1, channels=channels, bits=bits, 392 rate=rate) for i in xrange(channels)] 393 394 logging.info('sox stat: %s', [str(s) for s in stats]) 395 return [s.rms for s in stats] 396 397 398def reduce_noise_and_get_rms( 399 input_audio, noise_file, channels=1, bits=16, rate=48000): 400 """Reduces noise in the input audio by the given noise file and then gets 401 the RMS values of all channels of the input audio. 402 403 @param input_audio: The input audio file to be analyzed. 404 @param noise_file: The noise file used to reduce noise in the input audio. 405 @param channels: The number of channels in the input audio. 406 @param bits: The number of bits of each audio sample. 407 @param rate: The sampling rate. 408 """ 409 with tempfile.NamedTemporaryFile() as reduced_file: 410 p1 = cmd_utils.popen( 411 sox_utils.noise_profile_cmd( 412 noise_file, '-', channels=channels, bits=bits, 413 rate=rate), 414 stdout=subprocess.PIPE) 415 p2 = cmd_utils.popen( 416 sox_utils.noise_reduce_cmd( 417 input_audio, reduced_file.name, '-', 418 channels=channels, bits=bits, rate=rate), 419 stdin=p1.stdout) 420 cmd_utils.wait_and_check_returncode(p1, p2) 421 return get_rms(reduced_file.name, channels, bits, rate) 422 423 424def cras_rms_test_setup(): 425 """Setups for the cras_rms_tests. 426 427 To make sure the line_out-to-mic_in path is all green. 428 """ 429 # TODO(owenlin): Now, the nodes are choosed by chrome. 430 # We should do it here. 431 cras_utils.set_system_volume(_DEFAULT_PLAYBACK_VOLUME) 432 cras_utils.set_selected_output_node_volume(_DEFAULT_PLAYBACK_VOLUME) 433 434 cras_utils.set_system_mute(False) 435 cras_utils.set_capture_mute(False) 436 437 438def dump_rms_postmortem(result_dir): 439 """Dumps postmortem for rms tests.""" 440 try: 441 dump_audio_diagnostics( 442 os.path.join(result_dir, "audio_diagnostics.txt")) 443 except Exception: 444 logging.exception('Error while generating postmortem report') 445 446 447def dump_audio_diagnostics(file_path=None): 448 """Dumps audio diagnostics results to a file 449 450 Dumps the result of audio_diagnostics to a file. Returns a string 451 containing the result if the file_path is not specified. 452 453 @returns: None if 'file_path' is specified, otherwise, a string containing 454 the audio diagnostic results. 455 """ 456 if file_path: 457 with open(file_path, 'w') as f: 458 return cmd_utils.execute([_AUDIO_DIAGNOSTICS_PATH], stdout=f) 459 460 return cmd_utils.execute([_AUDIO_DIAGNOSTICS_PATH], stdout=subprocess.PIPE) 461 462 463def get_max_cross_correlation(signal_a, signal_b): 464 """Gets max cross-correlation and best time delay of two signals. 465 466 Computes cross-correlation function between two 467 signals and gets the maximum value and time delay. 468 The steps includes: 469 1. Compute cross-correlation function of X and Y and get Cxy. 470 The correlation function Cxy is an array where Cxy[k] is the 471 cross product of X and Y when Y is delayed by k. 472 Refer to manual of numpy.correlate for detail of correlation. 473 2. Find the maximum value C_max and index C_index in Cxy. 474 3. Compute L2 norm of X and Y to get norm(X) and norm(Y). 475 4. Divide C_max by norm(X)*norm(Y) to get max cross-correlation. 476 477 Max cross-correlation indicates the similarity of X and Y. The value 478 is 1 if X equals Y multiplied by a positive scalar. 479 The value is -1 if X equals Y multiplied by a negative scaler. 480 Any constant level shift will be regarded as distortion and will make 481 max cross-correlation value deviated from 1. 482 C_index is the best time delay of Y that make Y looks similar to X. 483 Refer to http://en.wikipedia.org/wiki/Cross-correlation. 484 485 @param signal_a: A list of numbers which contains the first signal. 486 @param signal_b: A list of numbers which contains the second signal. 487 488 @raises: ValueError if any number in signal_a or signal_b is not a float. 489 ValueError if norm of any array is less than _MINIMUM_NORM. 490 491 @returns: A tuple (correlation index, best delay). If there are more than 492 one best delay, just return the first one. 493 """ 494 def check_list_contains_float(numbers): 495 """Checks the elements in a list are all float. 496 497 @param numbers: A list of numbers. 498 499 @raises: ValueError if there is any element which is not a float 500 in the list. 501 """ 502 if any(not isinstance(x, float) for x in numbers): 503 raise ValueError('List contains number which is not a float') 504 505 check_list_contains_float(signal_a) 506 check_list_contains_float(signal_b) 507 508 norm_a = numpy.linalg.norm(signal_a) 509 norm_b = numpy.linalg.norm(signal_b) 510 logging.debug('norm_a: %f', norm_a) 511 logging.debug('norm_b: %f', norm_b) 512 if norm_a <= _MINIMUM_NORM or norm_b <= _MINIMUM_NORM: 513 raise ValueError('No meaningful data as norm is too small.') 514 515 correlation = numpy.correlate(signal_a, signal_b, 'full') 516 max_correlation = max(correlation) 517 best_delays = [i for i, j in enumerate(correlation) if j == max_correlation] 518 if len(best_delays) > 1: 519 logging.warning('There are more than one best delay: %r', best_delays) 520 return max_correlation / (norm_a * norm_b), best_delays[0] 521 522 523def trim_data(data, threshold=0): 524 """Trims a data by removing value that is too small in head and tail. 525 526 Removes elements in head and tail whose absolute value is smaller than 527 or equal to threshold. 528 E.g. trim_data([0.0, 0.1, 0.2, 0.3, 0.2, 0.1, 0.0], 0.2) = 529 ([0.2, 0.3, 0.2], 2) 530 531 @param data: A list of numbers. 532 @param threshold: The threshold to compare against. 533 534 @returns: A tuple (trimmed_data, end_trimmed_length), where 535 end_trimmed_length is the length of original data being trimmed 536 from the end. 537 Returns ([], None) if there is no valid data. 538 """ 539 indice_valid = [ 540 i for i, j in enumerate(data) if abs(j) > threshold] 541 if not indice_valid: 542 logging.warning( 543 'There is no element with absolute value greater ' 544 'than threshold %f', threshold) 545 return [], None 546 logging.debug('Start and end of indice_valid: %d, %d', 547 indice_valid[0], indice_valid[-1]) 548 end_trimmed_length = len(data) - indice_valid[-1] - 1 549 logging.debug('Trimmed length in the end: %d', end_trimmed_length) 550 return (data[indice_valid[0] : indice_valid[-1] + 1], end_trimmed_length) 551 552 553def get_one_channel_correlation(test_data, golden_data): 554 """Gets max cross-correlation of test_data and golden_data. 555 556 Trims test data and compute the max cross-correlation against golden_data. 557 Signal can be trimmed because those zero values in the head and tail of 558 a signal will not affect correlation computation. 559 560 @param test_data: A list containing the data to compare against golden data. 561 @param golden_data: A list containing the golden data. 562 563 @returns: A tuple (max cross-correlation, best_delay) if data is valid. 564 Otherwise returns (None, None). Refer to docstring of 565 get_max_cross_correlation. 566 """ 567 trimmed_test_data, end_trimmed_length = trim_data(test_data) 568 569 def to_float(samples): 570 """Casts elements in the list to float. 571 572 @param samples: A list of numbers. 573 574 @returns: A list of original numbers casted to float. 575 """ 576 samples_float = [float(x) for x in samples] 577 return samples_float 578 579 max_cross_correlation, best_delay = get_max_cross_correlation( 580 to_float(golden_data), 581 to_float(trimmed_test_data)) 582 583 # The reason to add back the trimmed length in the end. 584 # E.g.: 585 # golden data: 586 # 587 # |-----------vvvv----------------| vvvv is the signal of interest. 588 # a b 589 # 590 # test data: 591 # 592 # |---x----vvvv--------x----------------| x is the place to trim. 593 # c d e f 594 # 595 # trimmed test data: 596 # 597 # |----vvvv--------| 598 # d e 599 # 600 # The first output of cross correlation computation : 601 # 602 # |-----------vvvv----------------| 603 # a b 604 # 605 # |----vvvv--------| 606 # d e 607 # 608 # The largest output of cross correlation computation happens at 609 # delay a + e. 610 # 611 # |-----------vvvv----------------| 612 # a b 613 # 614 # |----vvvv--------| 615 # d e 616 # 617 # Cross correlation starts computing by aligning the last sample 618 # of the trimmed test data to the first sample of golden data. 619 # The best delay calculated from trimmed test data and golden data 620 # cross correlation is e + a. But the real best delay that should be 621 # identical on two channel should be e + a + f. 622 # So we need to add back the length being trimmed in the end. 623 624 if max_cross_correlation: 625 return max_cross_correlation, best_delay + end_trimmed_length 626 else: 627 return None, None 628 629 630def compare_one_channel_correlation(test_data, golden_data, parameters): 631 """Compares two one-channel data by correlation. 632 633 @param test_data: A list containing the data to compare against golden data. 634 @param golden_data: A list containing the golden data. 635 @param parameters: A dict containing parameters for method. 636 637 @returns: A dict containing: 638 index: The index of similarity where 1 means they are different 639 only by a positive scale. 640 best_delay: The best delay of test data in relative to golden 641 data. 642 equal: A bool containing comparing result. 643 """ 644 if 'correlation_threshold' in parameters: 645 threshold = parameters['correlation_threshold'] 646 else: 647 threshold = _CORRELATION_INDEX_THRESHOLD 648 649 result_dict = dict() 650 max_cross_correlation, best_delay = get_one_channel_correlation( 651 test_data, golden_data) 652 result_dict['index'] = max_cross_correlation 653 result_dict['best_delay'] = best_delay 654 result_dict['equal'] = True if ( 655 max_cross_correlation and 656 max_cross_correlation > threshold) else False 657 logging.debug('result_dict: %r', result_dict) 658 return result_dict 659 660 661def compare_data_correlation(golden_data_binary, golden_data_format, 662 test_data_binary, test_data_format, 663 channel_map, parameters=None): 664 """Compares two raw data using correlation. 665 666 @param golden_data_binary: The binary containing golden data. 667 @param golden_data_format: The data format of golden data. 668 @param test_data_binary: The binary containing test data. 669 @param test_data_format: The data format of test data. 670 @param channel_map: A list containing channel mapping. 671 E.g. [1, 0, None, None, None, None, None, None] means 672 channel 0 of test data should map to channel 1 of 673 golden data. Channel 1 of test data should map to 674 channel 0 of golden data. Channel 2 to 7 of test data 675 should be skipped. 676 @param parameters: A dict containing parameters for method, if needed. 677 678 @raises: NotImplementedError if file type is not raw. 679 NotImplementedError if sampling rates of two data are not the same. 680 error.TestFail if golden data and test data are not equal. 681 """ 682 if parameters is None: 683 parameters = dict() 684 685 if (golden_data_format['file_type'] != 'raw' or 686 test_data_format['file_type'] != 'raw'): 687 raise NotImplementedError('Only support raw data in compare_data.') 688 if (golden_data_format['rate'] != test_data_format['rate']): 689 raise NotImplementedError( 690 'Only support comparing data with the same sampling rate') 691 golden_data = audio_data.AudioRawData( 692 binary=golden_data_binary, 693 channel=golden_data_format['channel'], 694 sample_format=golden_data_format['sample_format']) 695 test_data = audio_data.AudioRawData( 696 binary=test_data_binary, 697 channel=test_data_format['channel'], 698 sample_format=test_data_format['sample_format']) 699 compare_results = [] 700 for test_channel, golden_channel in enumerate(channel_map): 701 if golden_channel is None: 702 logging.info('Skipped channel %d', test_channel) 703 continue 704 test_data_one_channel = test_data.channel_data[test_channel] 705 golden_data_one_channel = golden_data.channel_data[golden_channel] 706 result_dict = dict(test_channel=test_channel, 707 golden_channel=golden_channel) 708 result_dict.update( 709 compare_one_channel_correlation( 710 test_data_one_channel, golden_data_one_channel, 711 parameters)) 712 compare_results.append(result_dict) 713 logging.info('compare_results: %r', compare_results) 714 for result in compare_results: 715 if not result['equal']: 716 error_msg = ('Failed on test channel %d and golden channel %d with ' 717 'index %f') % ( 718 result['test_channel'], 719 result['golden_channel'], 720 result['index']) 721 logging.error(error_msg) 722 raise error.TestFail(error_msg) 723 # Also checks best delay are exactly the same. 724 best_delays = set([result['best_delay'] for result in compare_results]) 725 if len(best_delays) > 1: 726 error_msg = 'There are more than one best delay: %s' % best_delays 727 logging.error(error_msg) 728 raise error.TestFail(error_msg) 729 730 731def recorded_filesize_check(filesize, 732 duration, 733 channels=1, 734 rate=48000, 735 bits=16, 736 tolerant_ratio=0.1): 737 """Checks the recorded file size is correct. The check will pass if the 738 file size is larger than expected and smaller than our tolerant size. We 739 can tolerate size larger than expected as the way cras_test_client stop 740 recording base on duration is not always accurate but should record longer 741 than expected audio. 742 743 @param filesize: Actually file size. 744 @param duration: Expected seconds to record. 745 @param channels: Number of channels to record. 746 @param rate: Recording sample rate. 747 @param bits: The sample bit width. 748 @param tolerant_ratio: The larger than expected file size ratio that we 749 regard as pass. 750 751 @raises: TestFail if the size is less or larger than expect. 752 """ 753 expected = duration * channels * (bits / 8) * rate 754 ratio = abs(float(filesize) / expected) 755 if ratio < 1 or ratio > 1 + tolerant_ratio: 756 raise error.TestFail( 757 'File size not correct: %d expect: %d' % (filesize, expected)) 758 759 760class _base_rms_test(test.test): 761 """Base class for all rms_test """ 762 763 def postprocess(self): 764 super(_base_rms_test, self).postprocess() 765 766 # Sum up the number of failed constraints in each iteration 767 if sum(len(x) for x in self.failed_constraints): 768 dump_audio_diagnostics(test.resultsdir) 769 770 771class chrome_rms_test(_base_rms_test): 772 """Base test class for audio RMS test with Chrome. 773 774 The chrome instance can be accessed by self.chrome. 775 """ 776 def warmup(self): 777 super(chrome_rms_test, self).warmup() 778 779 # Not all client of this file using telemetry. 780 # Just do the import here for those who really need it. 781 from autotest_lib.client.common_lib.cros import chrome 782 783 self.chrome = chrome.Chrome(init_network_controller=True) 784 785 # The audio configuration could be changed when we 786 # restart chrome. 787 try: 788 cras_rms_test_setup() 789 except Exception: 790 self.chrome.browser.Close() 791 raise 792 793 794 def cleanup(self, *args): 795 try: 796 self.chrome.close() 797 finally: 798 super(chrome_rms_test, self).cleanup() 799 800class cras_rms_test(_base_rms_test): 801 """Base test class for CRAS audio RMS test.""" 802 803 def warmup(self): 804 super(cras_rms_test, self).warmup() 805 # Stop ui to make sure there are not other streams. 806 utils.stop_service('ui', ignore_status=True) 807 cras_rms_test_setup() 808 809 def cleanup(self, *args): 810 # Restart ui. 811 utils.start_service('ui', ignore_status=True) 812 813 814class alsa_rms_test(_base_rms_test): 815 """Base test class for ALSA audio RMS test. 816 817 Note the warmup will take 10 seconds and the device cannot be used before it 818 returns. 819 """ 820 def warmup(self): 821 super(alsa_rms_test, self).warmup() 822 823 cras_rms_test_setup() 824 # We need CRAS to initialize the volume and gain. 825 cras_utils.playback(playback_file="/dev/zero", duration=1) 826 # CRAS will release the device after 10 seconds. 827 time.sleep(11) 828