1#!/usr/bin/python 2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6 7import logging 8import numpy 9import os 10import re 11import subprocess 12import tempfile 13import threading 14import time 15 16from glob import glob 17from autotest_lib.client.bin import test, utils 18from autotest_lib.client.bin.input.input_device import * 19from autotest_lib.client.common_lib import error 20from autotest_lib.client.cros.audio import alsa_utils 21from autotest_lib.client.cros.audio import audio_data 22from autotest_lib.client.cros.audio import cmd_utils 23from autotest_lib.client.cros.audio import cras_utils 24from autotest_lib.client.cros.audio import sox_utils 25 26LD_LIBRARY_PATH = 'LD_LIBRARY_PATH' 27 28_AUDIO_DIAGNOSTICS_PATH = '/usr/bin/audio_diagnostics' 29 30_DEFAULT_NUM_CHANNELS = 2 31_DEFAULT_REC_COMMAND = 'arecord -D hw:0,0 -d 10 -f dat' 32_DEFAULT_SOX_FORMAT = '-t raw -b 16 -e signed -r 48000 -L' 33_DEFAULT_PLAYBACK_VOLUME = 100 34_DEFAULT_CAPTURE_GAIN = 2500 35_DEFAULT_ALSA_MAX_VOLUME = '100%' 36_DEFAULT_ALSA_CAPTURE_GAIN = '25dB' 37 38# Minimum RMS value to pass when checking recorded file. 39_DEFAULT_SOX_RMS_THRESHOLD = 0.08 40 41_JACK_VALUE_ON_RE = re.compile(r'.*values=on') 42_HP_JACK_CONTROL_RE = re.compile(r'numid=(\d+).*Headphone\sJack') 43_MIC_JACK_CONTROL_RE = re.compile(r'numid=(\d+).*Mic\sJack') 44 45_SOX_RMS_AMPLITUDE_RE = re.compile(r'RMS\s+amplitude:\s+(.+)') 46_SOX_ROUGH_FREQ_RE = re.compile(r'Rough\s+frequency:\s+(.+)') 47 48_AUDIO_NOT_FOUND_RE = r'Audio\snot\sdetected' 49_MEASURED_LATENCY_RE = r'Measured\sLatency:\s(\d+)\suS' 50_REPORTED_LATENCY_RE = r'Reported\sLatency:\s(\d+)\suS' 51 52# Tools from platform/audiotest 53AUDIOFUNTEST_PATH = 'audiofuntest' 54AUDIOLOOP_PATH = 'looptest' 55LOOPBACK_LATENCY_PATH = 'loopback_latency' 56SOX_PATH = 'sox' 57TEST_TONES_PATH = 'test_tones' 58 59_MINIMUM_NORM = 0.001 60_CORRELATION_INDEX_THRESHOLD = 0.999 61# The minimum difference of estimated frequencies between two sine waves. 62_FREQUENCY_DIFF_THRESHOLD = 20 63# The minimum RMS value of meaningful audio data. 64_MEANINGFUL_RMS_THRESHOLD = 0.001 65 66def set_mixer_controls(mixer_settings={}, card='0'): 67 """Sets all mixer controls listed in the mixer settings on card. 68 69 @param mixer_settings: Mixer settings to set. 70 @param card: Index of audio card to set mixer settings for. 71 """ 72 logging.info('Setting mixer control values on %s', card) 73 for item in mixer_settings: 74 logging.info('Setting %s to %s on card %s', 75 item['name'], item['value'], card) 76 cmd = 'amixer -c %s cset name=%s %s' 77 cmd = cmd % (card, item['name'], item['value']) 78 try: 79 utils.system(cmd) 80 except error.CmdError: 81 # A card is allowed not to support all the controls, so don't 82 # fail the test here if we get an error. 83 logging.info('amixer command failed: %s', cmd) 84 85def set_volume_levels(volume, capture): 86 """Sets the volume and capture gain through cras_test_client. 87 88 @param volume: The playback volume to set. 89 @param capture: The capture gain to set. 90 """ 91 logging.info('Setting volume level to %d', volume) 92 try: 93 utils.system('/usr/bin/cras_test_client --volume %d' % volume) 94 logging.info('Setting capture gain to %d', capture) 95 utils.system('/usr/bin/cras_test_client --capture_gain %d' % capture) 96 utils.system('/usr/bin/cras_test_client --dump_server_info') 97 utils.system('/usr/bin/cras_test_client --mute 0') 98 utils.system('amixer -c 0 contents') 99 except error.CmdError, e: 100 raise error.TestError( 101 '*** Can not tune volume through CRAS. *** (' + str(e) + ')') 102 103def loopback_latency_check(**args): 104 """Checks loopback latency. 105 106 @param args: additional arguments for loopback_latency. 107 108 @return A tuple containing measured and reported latency in uS. 109 Return None if no audio detected. 110 """ 111 noise_threshold = str(args['n']) if args.has_key('n') else '400' 112 113 cmd = '%s -n %s' % (LOOPBACK_LATENCY_PATH, noise_threshold) 114 115 output = utils.system_output(cmd, retain_output=True) 116 117 # Sleep for a short while to make sure device is not busy anymore 118 # after called loopback_latency. 119 time.sleep(.1) 120 121 measured_latency = None 122 reported_latency = None 123 for line in output.split('\n'): 124 match = re.search(_MEASURED_LATENCY_RE, line, re.I) 125 if match: 126 measured_latency = int(match.group(1)) 127 continue 128 match = re.search(_REPORTED_LATENCY_RE, line, re.I) 129 if match: 130 reported_latency = int(match.group(1)) 131 continue 132 if re.search(_AUDIO_NOT_FOUND_RE, line, re.I): 133 return None 134 if measured_latency and reported_latency: 135 return (measured_latency, reported_latency) 136 else: 137 # Should not reach here, just in case. 138 return None 139 140def get_mixer_jack_status(jack_reg_exp): 141 """Gets the mixer jack status. 142 143 @param jack_reg_exp: The regular expression to match jack control name. 144 145 @return None if the control does not exist, return True if jack control 146 is detected plugged, return False otherwise. 147 """ 148 output = utils.system_output('amixer -c0 controls', retain_output=True) 149 numid = None 150 for line in output.split('\n'): 151 m = jack_reg_exp.match(line) 152 if m: 153 numid = m.group(1) 154 break 155 156 # Proceed only when matched numid is not empty. 157 if numid: 158 output = utils.system_output('amixer -c0 cget numid=%s' % numid) 159 for line in output.split('\n'): 160 if _JACK_VALUE_ON_RE.match(line): 161 return True 162 return False 163 else: 164 return None 165 166def get_hp_jack_status(): 167 """Gets the status of headphone jack.""" 168 status = get_mixer_jack_status(_HP_JACK_CONTROL_RE) 169 if status is not None: 170 return status 171 172 # When headphone jack is not found in amixer, lookup input devices 173 # instead. 174 # 175 # TODO(hychao): Check hp/mic jack status dynamically from evdev. And 176 # possibly replace the existing check using amixer. 177 for evdev in glob('/dev/input/event*'): 178 device = InputDevice(evdev) 179 if device.is_hp_jack(): 180 return device.get_headphone_insert() 181 else: 182 return None 183 184def get_mic_jack_status(): 185 """Gets the status of mic jack.""" 186 status = get_mixer_jack_status(_MIC_JACK_CONTROL_RE) 187 if status is not None: 188 return status 189 190 # When mic jack is not found in amixer, lookup input devices instead. 191 for evdev in glob('/dev/input/event*'): 192 device = InputDevice(evdev) 193 if device.is_mic_jack(): 194 return device.get_microphone_insert() 195 else: 196 return None 197 198def log_loopback_dongle_status(): 199 """Log the status of the loopback dongle to make sure it is equipped.""" 200 dongle_status_ok = True 201 202 # Check Mic Jack 203 mic_jack_status = get_mic_jack_status() 204 logging.info('Mic jack status: %s', mic_jack_status) 205 dongle_status_ok &= bool(mic_jack_status) 206 207 # Check Headphone Jack 208 hp_jack_status = get_hp_jack_status() 209 logging.info('Headphone jack status: %s', hp_jack_status) 210 dongle_status_ok &= bool(hp_jack_status) 211 212 # Use latency check to test if audio can be captured through dongle. 213 # We only want to know the basic function of dongle, so no need to 214 # assert the latency accuracy here. 215 latency = loopback_latency_check(n=4000) 216 if latency: 217 logging.info('Got latency measured %d, reported %d', 218 latency[0], latency[1]) 219 else: 220 logging.info('Latency check fail.') 221 dongle_status_ok = False 222 223 logging.info('audio loopback dongle test: %s', 224 'PASS' if dongle_status_ok else 'FAIL') 225 226# Functions to test audio palyback. 227def play_sound(duration_seconds=None, audio_file_path=None): 228 """Plays a sound file found at |audio_file_path| for |duration_seconds|. 229 230 If |audio_file_path|=None, plays a default audio file. 231 If |duration_seconds|=None, plays audio file in its entirety. 232 233 @param duration_seconds: Duration to play sound. 234 @param audio_file_path: Path to the audio file. 235 """ 236 if not audio_file_path: 237 audio_file_path = '/usr/local/autotest/cros/audio/sine440.wav' 238 duration_arg = ('-d %d' % duration_seconds) if duration_seconds else '' 239 utils.system('aplay %s %s' % (duration_arg, audio_file_path)) 240 241def get_play_sine_args(channel, odev='default', freq=1000, duration=10, 242 sample_size=16): 243 """Gets the command args to generate a sine wav to play to odev. 244 245 @param channel: 0 for left, 1 for right; otherwize, mono. 246 @param odev: alsa output device. 247 @param freq: frequency of the generated sine tone. 248 @param duration: duration of the generated sine tone. 249 @param sample_size: output audio sample size. Default to 16. 250 """ 251 cmdargs = [SOX_PATH, '-b', str(sample_size), '-n', '-t', 'alsa', 252 odev, 'synth', str(duration)] 253 if channel == 0: 254 cmdargs += ['sine', str(freq), 'sine', '0'] 255 elif channel == 1: 256 cmdargs += ['sine', '0', 'sine', str(freq)] 257 else: 258 cmdargs += ['sine', str(freq)] 259 260 return cmdargs 261 262def play_sine(channel, odev='default', freq=1000, duration=10, 263 sample_size=16): 264 """Generates a sine wave and plays to odev. 265 266 @param channel: 0 for left, 1 for right; otherwize, mono. 267 @param odev: alsa output device. 268 @param freq: frequency of the generated sine tone. 269 @param duration: duration of the generated sine tone. 270 @param sample_size: output audio sample size. Default to 16. 271 """ 272 cmdargs = get_play_sine_args(channel, odev, freq, duration, sample_size) 273 utils.system(' '.join(cmdargs)) 274 275# Functions to compose customized sox command, execute it and process the 276# output of sox command. 277def get_sox_mixer_cmd(infile, channel, 278 num_channels=_DEFAULT_NUM_CHANNELS, 279 sox_format=_DEFAULT_SOX_FORMAT): 280 """Gets sox mixer command to reduce channel. 281 282 @param infile: Input file name. 283 @param channel: The selected channel to take effect. 284 @param num_channels: The number of total channels to test. 285 @param sox_format: Format to generate sox command. 286 """ 287 # Build up a pan value string for the sox command. 288 if channel == 0: 289 pan_values = '1' 290 else: 291 pan_values = '0' 292 for pan_index in range(1, num_channels): 293 if channel == pan_index: 294 pan_values = '%s%s' % (pan_values, ',1') 295 else: 296 pan_values = '%s%s' % (pan_values, ',0') 297 298 return '%s -c 2 %s %s -c 1 %s - mixer %s' % (SOX_PATH, 299 sox_format, infile, sox_format, pan_values) 300 301def sox_stat_output(infile, channel, 302 num_channels=_DEFAULT_NUM_CHANNELS, 303 sox_format=_DEFAULT_SOX_FORMAT): 304 """Executes sox stat command. 305 306 @param infile: Input file name. 307 @param channel: The selected channel. 308 @param num_channels: The number of total channels to test. 309 @param sox_format: Format to generate sox command. 310 311 @return The output of sox stat command 312 """ 313 sox_mixer_cmd = get_sox_mixer_cmd(infile, channel, 314 num_channels, sox_format) 315 stat_cmd = '%s -c 1 %s - -n stat 2>&1' % (SOX_PATH, sox_format) 316 sox_cmd = '%s | %s' % (sox_mixer_cmd, stat_cmd) 317 return utils.system_output(sox_cmd, retain_output=True) 318 319def get_audio_rms(sox_output): 320 """Gets the audio RMS value from sox stat output 321 322 @param sox_output: Output of sox stat command. 323 324 @return The RMS value parsed from sox stat output. 325 """ 326 for rms_line in sox_output.split('\n'): 327 m = _SOX_RMS_AMPLITUDE_RE.match(rms_line) 328 if m is not None: 329 return float(m.group(1)) 330 331def get_rough_freq(sox_output): 332 """Gets the rough audio frequency from sox stat output 333 334 @param sox_output: Output of sox stat command. 335 336 @return The rough frequency value parsed from sox stat output. 337 """ 338 for rms_line in sox_output.split('\n'): 339 m = _SOX_ROUGH_FREQ_RE.match(rms_line) 340 if m is not None: 341 return int(m.group(1)) 342 343def check_audio_rms(sox_output, sox_threshold=_DEFAULT_SOX_RMS_THRESHOLD): 344 """Checks if the calculated RMS value is expected. 345 346 @param sox_output: The output from sox stat command. 347 @param sox_threshold: The threshold to test RMS value against. 348 349 @raises error.TestError if RMS amplitude can't be parsed. 350 @raises error.TestFail if the RMS amplitude of the recording isn't above 351 the threshold. 352 """ 353 rms_val = get_audio_rms(sox_output) 354 355 # In case we don't get a valid RMS value. 356 if rms_val is None: 357 raise error.TestError( 358 'Failed to generate an audio RMS value from playback.') 359 360 logging.info('Got audio RMS value of %f. Minimum pass is %f.', 361 rms_val, sox_threshold) 362 if rms_val < sox_threshold: 363 raise error.TestFail( 364 'Audio RMS value %f too low. Minimum pass is %f.' % 365 (rms_val, sox_threshold)) 366 367def noise_reduce_file(in_file, noise_file, out_file, 368 sox_format=_DEFAULT_SOX_FORMAT): 369 """Runs the sox command to reduce noise. 370 371 Runs the sox command to noise-reduce in_file using the noise 372 profile from noise_file. 373 374 @param in_file: The file to noise reduce. 375 @param noise_file: The file containing the noise profile. 376 This can be created by recording silence. 377 @param out_file: The file contains the noise reduced sound. 378 @param sox_format: The sox format to generate sox command. 379 """ 380 prof_cmd = '%s -c 2 %s %s -n noiseprof' % (SOX_PATH, 381 sox_format, noise_file) 382 reduce_cmd = ('%s -c 2 %s %s -c 2 %s %s noisered' % 383 (SOX_PATH, sox_format, in_file, sox_format, out_file)) 384 utils.system('%s | %s' % (prof_cmd, reduce_cmd)) 385 386def record_sample(tmpfile, record_command=_DEFAULT_REC_COMMAND): 387 """Records a sample from the default input device. 388 389 @param tmpfile: The file to record to. 390 @param record_command: The command to record audio. 391 """ 392 utils.system('%s %s' % (record_command, tmpfile)) 393 394def create_wav_file(wav_dir, prefix=""): 395 """Creates a unique name for wav file. 396 397 The created file name will be preserved in autotest result directory 398 for future analysis. 399 400 @param wav_dir: The directory of created wav file. 401 @param prefix: specified file name prefix. 402 """ 403 filename = "%s-%s.wav" % (prefix, time.time()) 404 return os.path.join(wav_dir, filename) 405 406def run_in_parallel(*funs): 407 """Runs methods in parallel. 408 409 @param funs: methods to run. 410 """ 411 threads = [] 412 for f in funs: 413 t = threading.Thread(target=f) 414 t.start() 415 threads.append(t) 416 417 for t in threads: 418 t.join() 419 420def loopback_test_channels(noise_file_name, wav_dir, 421 playback_callback=None, 422 check_recorded_callback=check_audio_rms, 423 preserve_test_file=True, 424 num_channels = _DEFAULT_NUM_CHANNELS, 425 record_callback=record_sample, 426 mix_callback=None): 427 """Tests loopback on all channels. 428 429 @param noise_file_name: Name of the file contains pre-recorded noise. 430 @param wav_dir: The directory of created wav file. 431 @param playback_callback: The callback to do the playback for 432 one channel. 433 @param record_callback: The callback to do the recording. 434 @param check_recorded_callback: The callback to check recorded file. 435 @param preserve_test_file: Retain the recorded files for future debugging. 436 @param num_channels: The number of total channels to test. 437 @param mix_callback: The callback to do on the one-channel file. 438 """ 439 for channel in xrange(num_channels): 440 record_file_name = create_wav_file(wav_dir, 441 "record-%d" % channel) 442 functions = [lambda: record_callback(record_file_name)] 443 444 if playback_callback: 445 functions.append(lambda: playback_callback(channel)) 446 447 if mix_callback: 448 mix_file_name = create_wav_file(wav_dir, "mix-%d" % channel) 449 functions.append(lambda: mix_callback(mix_file_name)) 450 451 run_in_parallel(*functions) 452 453 if mix_callback: 454 sox_output_mix = sox_stat_output(mix_file_name, channel) 455 rms_val_mix = get_audio_rms(sox_output_mix) 456 logging.info('Got mixed audio RMS value of %f.', rms_val_mix) 457 458 sox_output_record = sox_stat_output(record_file_name, channel) 459 rms_val_record = get_audio_rms(sox_output_record) 460 logging.info('Got recorded audio RMS value of %f.', rms_val_record) 461 462 reduced_file_name = create_wav_file(wav_dir, 463 "reduced-%d" % channel) 464 noise_reduce_file(record_file_name, noise_file_name, 465 reduced_file_name) 466 467 sox_output_reduced = sox_stat_output(reduced_file_name, channel) 468 469 if not preserve_test_file: 470 os.unlink(reduced_file_name) 471 os.unlink(record_file_name) 472 if mix_callback: 473 os.unlink(mix_file_name) 474 475 check_recorded_callback(sox_output_reduced) 476 477 478def get_channel_sox_stat( 479 input_audio, channel_index, channels=2, bits=16, rate=48000): 480 """Gets the sox stat info of the selected channel in the input audio file. 481 482 @param input_audio: The input audio file to be analyzed. 483 @param channel_index: The index of the channel to be analyzed. 484 (1 for the first channel). 485 @param channels: The number of channels in the input audio. 486 @param bits: The number of bits of each audio sample. 487 @param rate: The sampling rate. 488 """ 489 if channel_index <= 0 or channel_index > channels: 490 raise ValueError('incorrect channel_indexi: %d' % channel_index) 491 492 if channels == 1: 493 return sox_utils.get_stat( 494 input_audio, channels=channels, bits=bits, rate=rate) 495 496 p1 = cmd_utils.popen( 497 sox_utils.extract_channel_cmd( 498 input_audio, '-', channel_index, 499 channels=channels, bits=bits, rate=rate), 500 stdout=subprocess.PIPE) 501 p2 = cmd_utils.popen( 502 sox_utils.stat_cmd('-', channels=1, bits=bits, rate=rate), 503 stdin=p1.stdout, stderr=subprocess.PIPE) 504 stat_output = p2.stderr.read() 505 cmd_utils.wait_and_check_returncode(p1, p2) 506 return sox_utils.parse_stat_output(stat_output) 507 508 509def get_rms(input_audio, channels=1, bits=16, rate=48000): 510 """Gets the RMS values of all channels of the input audio. 511 512 @param input_audio: The input audio file to be checked. 513 @param channels: The number of channels in the input audio. 514 @param bits: The number of bits of each audio sample. 515 @param rate: The sampling rate. 516 """ 517 stats = [get_channel_sox_stat( 518 input_audio, i + 1, channels=channels, bits=bits, 519 rate=rate) for i in xrange(channels)] 520 521 logging.info('sox stat: %s', [str(s) for s in stats]) 522 return [s.rms for s in stats] 523 524 525def reduce_noise_and_get_rms( 526 input_audio, noise_file, channels=1, bits=16, rate=48000): 527 """Reduces noise in the input audio by the given noise file and then gets 528 the RMS values of all channels of the input audio. 529 530 @param input_audio: The input audio file to be analyzed. 531 @param noise_file: The noise file used to reduce noise in the input audio. 532 @param channels: The number of channels in the input audio. 533 @param bits: The number of bits of each audio sample. 534 @param rate: The sampling rate. 535 """ 536 with tempfile.NamedTemporaryFile() as reduced_file: 537 p1 = cmd_utils.popen( 538 sox_utils.noise_profile_cmd( 539 noise_file, '-', channels=channels, bits=bits, 540 rate=rate), 541 stdout=subprocess.PIPE) 542 p2 = cmd_utils.popen( 543 sox_utils.noise_reduce_cmd( 544 input_audio, reduced_file.name, '-', 545 channels=channels, bits=bits, rate=rate), 546 stdin=p1.stdout) 547 cmd_utils.wait_and_check_returncode(p1, p2) 548 return get_rms(reduced_file.name, channels, bits, rate) 549 550 551def skip_devices_to_test(*boards): 552 """Devices to skip due to hardware or test compatibility issues. 553 554 @param boards: the boards to skip testing. 555 """ 556 # TODO(scottz): Remove this when crbug.com/220147 is fixed. 557 dut_board = utils.get_current_board() 558 if dut_board in boards: 559 raise error.TestNAError('This test is not available on %s' % dut_board) 560 561 562def cras_rms_test_setup(): 563 """Setups for the cras_rms_tests. 564 565 To make sure the line_out-to-mic_in path is all green. 566 """ 567 # TODO(owenlin): Now, the nodes are choosed by chrome. 568 # We should do it here. 569 cras_utils.set_system_volume(_DEFAULT_PLAYBACK_VOLUME) 570 cras_utils.set_selected_output_node_volume(_DEFAULT_PLAYBACK_VOLUME) 571 572 cras_utils.set_capture_gain(_DEFAULT_CAPTURE_GAIN) 573 574 cras_utils.set_system_mute(False) 575 cras_utils.set_capture_mute(False) 576 577 578def generate_rms_postmortem(): 579 """Generates postmortem for rms tests.""" 580 try: 581 logging.info('audio postmortem report') 582 log_loopback_dongle_status() 583 logging.info(get_audio_diagnostics()) 584 except Exception: 585 logging.exception('Error while generating postmortem report') 586 587 588def get_audio_diagnostics(): 589 """Gets audio diagnostic results. 590 591 @returns: a string containing diagnostic results. 592 593 """ 594 return cmd_utils.execute([_AUDIO_DIAGNOSTICS_PATH], stdout=subprocess.PIPE) 595 596 597def get_max_cross_correlation(signal_a, signal_b): 598 """Gets max cross-correlation and best time delay of two signals. 599 600 Computes cross-correlation function between two 601 signals and gets the maximum value and time delay. 602 The steps includes: 603 1. Compute cross-correlation function of X and Y and get Cxy. 604 The correlation function Cxy is an array where Cxy[k] is the 605 cross product of X and Y when Y is delayed by k. 606 Refer to manual of numpy.correlate for detail of correlation. 607 2. Find the maximum value C_max and index C_index in Cxy. 608 3. Compute L2 norm of X and Y to get norm(X) and norm(Y). 609 4. Divide C_max by norm(X)*norm(Y) to get max cross-correlation. 610 611 Max cross-correlation indicates the similarity of X and Y. The value 612 is 1 if X equals Y multiplied by a positive scalar. 613 The value is -1 if X equals Y multiplied by a negative scaler. 614 Any constant level shift will be regarded as distortion and will make 615 max cross-correlation value deviated from 1. 616 C_index is the best time delay of Y that make Y looks similar to X. 617 Refer to http://en.wikipedia.org/wiki/Cross-correlation. 618 619 @param signal_a: A list of numbers which contains the first signal. 620 @param signal_b: A list of numbers which contains the second signal. 621 622 @raises: ValueError if any number in signal_a or signal_b is not a float. 623 ValueError if norm of any array is less than _MINIMUM_NORM. 624 625 @returns: A tuple (correlation index, best delay). If there are more than 626 one best delay, just return the first one. 627 """ 628 def check_list_contains_float(numbers): 629 """Checks the elements in a list are all float. 630 631 @param numbers: A list of numbers. 632 633 @raises: ValueError if there is any element which is not a float 634 in the list. 635 """ 636 if any(not isinstance(x, float) for x in numbers): 637 raise ValueError('List contains number which is not a float') 638 639 check_list_contains_float(signal_a) 640 check_list_contains_float(signal_b) 641 642 norm_a = numpy.linalg.norm(signal_a) 643 norm_b = numpy.linalg.norm(signal_b) 644 logging.debug('norm_a: %f', norm_a) 645 logging.debug('norm_b: %f', norm_b) 646 if norm_a <= _MINIMUM_NORM or norm_b <= _MINIMUM_NORM: 647 raise ValueError('No meaningful data as norm is too small.') 648 649 correlation = numpy.correlate(signal_a, signal_b, 'full') 650 max_correlation = max(correlation) 651 best_delays = [i for i, j in enumerate(correlation) if j == max_correlation] 652 if len(best_delays) > 1: 653 logging.warning('There are more than one best delay: %r', best_delays) 654 return max_correlation / (norm_a * norm_b), best_delays[0] 655 656 657def trim_data(data, threshold=0): 658 """Trims a data by removing value that is too small in head and tail. 659 660 Removes elements in head and tail whose absolute value is smaller than 661 or equal to threshold. 662 E.g. trim_data([0.0, 0.1, 0.2, 0.3, 0.2, 0.1, 0.0], 0.2) = 663 ([0.2, 0.3, 0.2], 2) 664 665 @param data: A list of numbers. 666 @param threshold: The threshold to compare against. 667 668 @returns: A tuple (trimmed_data, end_trimmed_length), where 669 end_trimmed_length is the length of original data being trimmed 670 from the end. 671 Returns ([], None) if there is no valid data. 672 """ 673 indice_valid = [ 674 i for i, j in enumerate(data) if abs(j) > threshold] 675 if not indice_valid: 676 logging.warning( 677 'There is no element with absolute value greater ' 678 'than threshold %f', threshold) 679 return [], None 680 logging.debug('Start and end of indice_valid: %d, %d', 681 indice_valid[0], indice_valid[-1]) 682 end_trimmed_length = len(data) - indice_valid[-1] - 1 683 logging.debug('Trimmed length in the end: %d', end_trimmed_length) 684 return (data[indice_valid[0] : indice_valid[-1] + 1], end_trimmed_length) 685 686 687def get_one_channel_correlation(test_data, golden_data): 688 """Gets max cross-correlation of test_data and golden_data. 689 690 Trims test data and compute the max cross-correlation against golden_data. 691 Signal can be trimmed because those zero values in the head and tail of 692 a signal will not affect correlation computation. 693 694 @param test_data: A list containing the data to compare against golden data. 695 @param golden_data: A list containing the golden data. 696 697 @returns: A tuple (max cross-correlation, best_delay) if data is valid. 698 Otherwise returns (None, None). Refer to docstring of 699 get_max_cross_correlation. 700 """ 701 trimmed_test_data, end_trimmed_length = trim_data(test_data) 702 703 def to_float(samples): 704 """Casts elements in the list to float. 705 706 @param samples: A list of numbers. 707 708 @returns: A list of original numbers casted to float. 709 """ 710 samples_float = [float(x) for x in samples] 711 return samples_float 712 713 max_cross_correlation, best_delay = get_max_cross_correlation( 714 to_float(golden_data), 715 to_float(trimmed_test_data)) 716 717 # The reason to add back the trimmed length in the end. 718 # E.g.: 719 # golden data: 720 # 721 # |-----------vvvv----------------| vvvv is the signal of interest. 722 # a b 723 # 724 # test data: 725 # 726 # |---x----vvvv--------x----------------| x is the place to trim. 727 # c d e f 728 # 729 # trimmed test data: 730 # 731 # |----vvvv--------| 732 # d e 733 # 734 # The first output of cross correlation computation : 735 # 736 # |-----------vvvv----------------| 737 # a b 738 # 739 # |----vvvv--------| 740 # d e 741 # 742 # The largest output of cross correlation computation happens at 743 # delay a + e. 744 # 745 # |-----------vvvv----------------| 746 # a b 747 # 748 # |----vvvv--------| 749 # d e 750 # 751 # Cross correlation starts computing by aligning the last sample 752 # of the trimmed test data to the first sample of golden data. 753 # The best delay calculated from trimmed test data and golden data 754 # cross correlation is e + a. But the real best delay that should be 755 # identical on two channel should be e + a + f. 756 # So we need to add back the length being trimmed in the end. 757 758 if max_cross_correlation: 759 return max_cross_correlation, best_delay + end_trimmed_length 760 else: 761 return None, None 762 763 764def compare_one_channel_correlation(test_data, golden_data, parameters): 765 """Compares two one-channel data by correlation. 766 767 @param test_data: A list containing the data to compare against golden data. 768 @param golden_data: A list containing the golden data. 769 @param parameters: A dict containing parameters for method. 770 771 @returns: A dict containing: 772 index: The index of similarity where 1 means they are different 773 only by a positive scale. 774 best_delay: The best delay of test data in relative to golden 775 data. 776 equal: A bool containing comparing result. 777 """ 778 if 'correlation_threshold' in parameters: 779 threshold = parameters['correlation_threshold'] 780 else: 781 threshold = _CORRELATION_INDEX_THRESHOLD 782 783 result_dict = dict() 784 max_cross_correlation, best_delay = get_one_channel_correlation( 785 test_data, golden_data) 786 result_dict['index'] = max_cross_correlation 787 result_dict['best_delay'] = best_delay 788 result_dict['equal'] = True if ( 789 max_cross_correlation and 790 max_cross_correlation > threshold) else False 791 logging.debug('result_dict: %r', result_dict) 792 return result_dict 793 794 795def compare_data_correlation(golden_data_binary, golden_data_format, 796 test_data_binary, test_data_format, 797 channel_map, parameters=None): 798 """Compares two raw data using correlation. 799 800 @param golden_data_binary: The binary containing golden data. 801 @param golden_data_format: The data format of golden data. 802 @param test_data_binary: The binary containing test data. 803 @param test_data_format: The data format of test data. 804 @param channel_map: A list containing channel mapping. 805 E.g. [1, 0, None, None, None, None, None, None] means 806 channel 0 of test data should map to channel 1 of 807 golden data. Channel 1 of test data should map to 808 channel 0 of golden data. Channel 2 to 7 of test data 809 should be skipped. 810 @param parameters: A dict containing parameters for method, if needed. 811 812 @raises: NotImplementedError if file type is not raw. 813 NotImplementedError if sampling rates of two data are not the same. 814 error.TestFail if golden data and test data are not equal. 815 """ 816 if parameters is None: 817 parameters = dict() 818 819 if (golden_data_format['file_type'] != 'raw' or 820 test_data_format['file_type'] != 'raw'): 821 raise NotImplementedError('Only support raw data in compare_data.') 822 if (golden_data_format['rate'] != test_data_format['rate']): 823 raise NotImplementedError( 824 'Only support comparing data with the same sampling rate') 825 golden_data = audio_data.AudioRawData( 826 binary=golden_data_binary, 827 channel=golden_data_format['channel'], 828 sample_format=golden_data_format['sample_format']) 829 test_data = audio_data.AudioRawData( 830 binary=test_data_binary, 831 channel=test_data_format['channel'], 832 sample_format=test_data_format['sample_format']) 833 compare_results = [] 834 for test_channel, golden_channel in enumerate(channel_map): 835 if golden_channel is None: 836 logging.info('Skipped channel %d', test_channel) 837 continue 838 test_data_one_channel = test_data.channel_data[test_channel] 839 golden_data_one_channel = golden_data.channel_data[golden_channel] 840 result_dict = dict(test_channel=test_channel, 841 golden_channel=golden_channel) 842 result_dict.update( 843 compare_one_channel_correlation( 844 test_data_one_channel, golden_data_one_channel, 845 parameters)) 846 compare_results.append(result_dict) 847 logging.info('compare_results: %r', compare_results) 848 for result in compare_results: 849 if not result['equal']: 850 error_msg = ('Failed on test channel %d and golden channel %d with ' 851 'index %f') % ( 852 result['test_channel'], 853 result['golden_channel'], 854 result['index']) 855 logging.error(error_msg) 856 raise error.TestFail(error_msg) 857 # Also checks best delay are exactly the same. 858 best_delays = set([result['best_delay'] for result in compare_results]) 859 if len(best_delays) > 1: 860 error_msg = 'There are more than one best delay: %s' % best_delays 861 logging.error(error_msg) 862 raise error.TestFail(error_msg) 863 864 865class _base_rms_test(test.test): 866 """Base class for all rms_test """ 867 868 def postprocess(self): 869 super(_base_rms_test, self).postprocess() 870 871 # Sum up the number of failed constraints in each iteration 872 if sum(len(x) for x in self.failed_constraints): 873 generate_rms_postmortem() 874 875 876class chrome_rms_test(_base_rms_test): 877 """Base test class for audio RMS test with Chrome. 878 879 The chrome instance can be accessed by self.chrome. 880 """ 881 def warmup(self): 882 skip_devices_to_test('x86-mario') 883 super(chrome_rms_test, self).warmup() 884 885 # Not all client of this file using telemetry. 886 # Just do the import here for those who really need it. 887 from autotest_lib.client.common_lib.cros import chrome 888 889 self.chrome = chrome.Chrome(init_network_controller=True) 890 891 # The audio configuration could be changed when we 892 # restart chrome. 893 try: 894 cras_rms_test_setup() 895 except Exception: 896 self.chrome.browser.Close() 897 raise 898 899 900 def cleanup(self, *args): 901 try: 902 self.chrome.close() 903 finally: 904 super(chrome_rms_test, self).cleanup() 905 906class cras_rms_test(_base_rms_test): 907 """Base test class for CRAS audio RMS test.""" 908 909 def warmup(self): 910 skip_devices_to_test('x86-mario') 911 super(cras_rms_test, self).warmup() 912 cras_rms_test_setup() 913 914 915def alsa_rms_test_setup(): 916 """Setup for alsa_rms_test. 917 918 Different boards/chipsets have different set of mixer controls. Even 919 controls that have the same name on different boards might have different 920 capabilities. The following is a general idea to setup a given class of 921 boards, and some specialized setup for certain boards. 922 """ 923 card_id = alsa_utils.get_first_soundcard_with_control('Mic Jack', 'Mic') 924 arch = utils.get_arch() 925 board = utils.get_board() 926 uses_max98090 = os.path.exists('/sys/module/snd_soc_max98090') 927 if board in ['daisy_spring', 'daisy_skate']: 928 # The MIC controls of the boards do not support dB syntax. 929 alsa_utils.mixer_cmd(card_id, 930 ['sset', 'Headphone', _DEFAULT_ALSA_MAX_VOLUME]) 931 alsa_utils.mixer_cmd(card_id, ['sset', 'MIC1', 932 _DEFAULT_ALSA_MAX_VOLUME]) 933 alsa_utils.mixer_cmd(card_id, ['sset', 'MIC2', 934 _DEFAULT_ALSA_MAX_VOLUME]) 935 elif arch in ['armv7l', 'aarch64'] or uses_max98090: 936 # ARM platforms or Intel platforms that uses max98090 codec driver. 937 alsa_utils.mixer_cmd(card_id, 938 ['sset', 'Headphone', _DEFAULT_ALSA_MAX_VOLUME]) 939 alsa_utils.mixer_cmd(card_id, ['sset', 'MIC1', 940 _DEFAULT_ALSA_CAPTURE_GAIN]) 941 alsa_utils.mixer_cmd(card_id, ['sset', 'MIC2', 942 _DEFAULT_ALSA_CAPTURE_GAIN]) 943 else: 944 # The rest of Intel platforms. 945 alsa_utils.mixer_cmd(card_id, ['sset', 'Master', 946 _DEFAULT_ALSA_MAX_VOLUME]) 947 alsa_utils.mixer_cmd(card_id, 948 ['sset', 'Capture', _DEFAULT_ALSA_CAPTURE_GAIN]) 949 950 951class alsa_rms_test(_base_rms_test): 952 """Base test class for ALSA audio RMS test.""" 953 954 def warmup(self): 955 skip_devices_to_test('x86-mario') 956 super(alsa_rms_test, self).warmup() 957 958 alsa_rms_test_setup() 959