#!/usr/bin/python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.


import logging
import numpy
import os
import re
import subprocess
import tempfile
import threading
import time

from glob import glob
from autotest_lib.client.bin import test, utils
from autotest_lib.client.bin.input.input_device import *
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.audio import audio_data
from autotest_lib.client.cros.audio import cmd_utils
from autotest_lib.client.cros.audio import cras_utils
from autotest_lib.client.cros.audio import sox_utils

LD_LIBRARY_PATH = 'LD_LIBRARY_PATH'

_AUDIO_DIAGNOSTICS_PATH = '/usr/bin/audio_diagnostics'

_DEFAULT_NUM_CHANNELS = 2
_DEFAULT_REC_COMMAND = 'arecord -D hw:0,0 -d 10 -f dat'
_DEFAULT_SOX_FORMAT = '-t raw -b 16 -e signed -r 48000 -L'
_DEFAULT_PLAYBACK_VOLUME = 100
_DEFAULT_CAPTURE_GAIN = 2500
_DEFAULT_ALSA_MAX_VOLUME = '100%'
_DEFAULT_ALSA_CAPTURE_GAIN = '25dB'

# Minimum RMS value to pass when checking recorded file.
_DEFAULT_SOX_RMS_THRESHOLD = 0.08

_JACK_VALUE_ON_RE = re.compile(r'.*values=on')
_HP_JACK_CONTROL_RE = re.compile(r'numid=(\d+).*Headphone\sJack')
_MIC_JACK_CONTROL_RE = re.compile(r'numid=(\d+).*Mic\sJack')

_SOX_RMS_AMPLITUDE_RE = re.compile(r'RMS\s+amplitude:\s+(.+)')
_SOX_ROUGH_FREQ_RE = re.compile(r'Rough\s+frequency:\s+(.+)')

_AUDIO_NOT_FOUND_RE = r'Audio\snot\sdetected'
_MEASURED_LATENCY_RE = r'Measured\sLatency:\s(\d+)\suS'
_REPORTED_LATENCY_RE = r'Reported\sLatency:\s(\d+)\suS'

# Tools from platform/audiotest
AUDIOFUNTEST_PATH = 'audiofuntest'
AUDIOLOOP_PATH = 'looptest'
LOOPBACK_LATENCY_PATH = 'loopback_latency'
SOX_PATH = 'sox'
TEST_TONES_PATH = 'test_tones'

_MINIMUM_NORM = 0.001
_CORRELATION_INDEX_THRESHOLD = 0.999
# The minimum difference of estimated frequencies between two sine waves.
_FREQUENCY_DIFF_THRESHOLD = 20
# The minimum RMS value of meaningful audio data.
_MEANINGFUL_RMS_THRESHOLD = 0.001


def set_mixer_controls(mixer_settings=None, card='0'):
    """Sets all mixer controls listed in the mixer settings on card.

    @param mixer_settings: An iterable of dicts, each providing the 'name'
                           and 'value' of one control to set. None (the
                           default) or an empty iterable sets nothing.
    @param card: Index of audio card to set mixer settings for.
    """
    logging.info('Setting mixer control values on %s', card)
    for item in mixer_settings or ():
        logging.info('Setting %s to %s on card %s',
                     item['name'], item['value'], card)
        cmd = 'amixer -c %s cset name=%s %s'
        cmd = cmd % (card, item['name'], item['value'])
        try:
            utils.system(cmd)
        except error.CmdError:
            # A card is allowed not to support all the controls, so don't
            # fail the test here if we get an error.
            logging.info('amixer command failed: %s', cmd)


def set_volume_levels(volume, capture):
    """Sets the volume and capture gain through cras_test_client.

    After the CRAS settings are applied, the amixer contents of card 0 are
    dumped on a best-effort basis; a failure there is only logged.

    @param volume: The playback volume to set.
    @param capture: The capture gain to set.

    @raises error.TestError if any cras_test_client command fails.
    """
    logging.info('Setting volume level to %d', volume)
    try:
        utils.system('/usr/bin/cras_test_client --volume %d' % volume)
        logging.info('Setting capture gain to %d', capture)
        utils.system('/usr/bin/cras_test_client --capture_gain %d' % capture)
        utils.system('/usr/bin/cras_test_client --dump_server_info')
        utils.system('/usr/bin/cras_test_client --mute 0')
    except error.CmdError as e:
        raise error.TestError(
                '*** Can not tune volume through CRAS. *** (' + str(e) + ')')

    try:
        utils.system('amixer -c 0 contents')
    except error.CmdError as e:
        logging.info('amixer command failed: %s', str(e))


def loopback_latency_check(**args):
    """Checks loopback latency.

    @param args: additional arguments for loopback_latency. Only 'n' (the
                 noise threshold, default 400) is used.

    @return A tuple containing measured and reported latency in uS.
            Return None if no audio detected.
    """
    noise_threshold = str(args.get('n', '400'))

    cmd = '%s -n %s -c' % (LOOPBACK_LATENCY_PATH, noise_threshold)

    output = utils.system_output(cmd, retain_output=True)

    # Sleep for a short while to make sure device is not busy anymore
    # after called loopback_latency.
    time.sleep(.1)

    measured_latency = None
    reported_latency = None
    for line in output.split('\n'):
        match = re.search(_MEASURED_LATENCY_RE, line, re.I)
        if match:
            measured_latency = int(match.group(1))
            continue
        match = re.search(_REPORTED_LATENCY_RE, line, re.I)
        if match:
            reported_latency = int(match.group(1))
            continue
        if re.search(_AUDIO_NOT_FOUND_RE, line, re.I):
            return None
    if measured_latency and reported_latency:
        return (measured_latency, reported_latency)
    else:
        # Should not reach here, just in case.
        return None


def get_mixer_jack_status(jack_reg_exp):
    """Gets the mixer jack status.

    @param jack_reg_exp: The compiled regular expression to match jack
                         control name; its first group must capture the
                         control numid.

    @return None if the control does not exist, return True if jack control
            is detected plugged, return False otherwise.
    """
    output = utils.system_output('amixer -c0 controls', retain_output=True)
    numid = None
    for line in output.split('\n'):
        m = jack_reg_exp.match(line)
        if m:
            numid = m.group(1)
            break

    # Proceed only when matched numid is not empty.
    if not numid:
        return None

    output = utils.system_output('amixer -c0 cget numid=%s' % numid)
    for line in output.split('\n'):
        if _JACK_VALUE_ON_RE.match(line):
            return True
    return False


def get_hp_jack_status():
    """Gets the status of headphone jack.

    @return The amixer jack status when the control exists; otherwise the
            insert state reported by the first input device that is a
            headphone jack, or None if there is no such device.
    """
    status = get_mixer_jack_status(_HP_JACK_CONTROL_RE)
    if status is not None:
        return status

    # When headphone jack is not found in amixer, lookup input devices
    # instead.
    #
    # TODO(hychao): Check hp/mic jack status dynamically from evdev. And
    # possibly replace the existing check using amixer.
    for evdev in glob('/dev/input/event*'):
        device = InputDevice(evdev)
        if device.is_hp_jack():
            return device.get_headphone_insert()
    return None


def get_mic_jack_status():
    """Gets the status of mic jack.

    @return The amixer jack status when the control exists; otherwise the
            insert state reported by the first input device that is a
            mic jack, or None if there is no such device.
    """
    status = get_mixer_jack_status(_MIC_JACK_CONTROL_RE)
    if status is not None:
        return status

    # When mic jack is not found in amixer, lookup input devices instead.
    for evdev in glob('/dev/input/event*'):
        device = InputDevice(evdev)
        if device.is_mic_jack():
            return device.get_microphone_insert()
    return None


def log_loopback_dongle_status():
    """Log the status of the loopback dongle to make sure it is equipped."""
    dongle_status_ok = True

    # Check Mic Jack
    mic_jack_status = get_mic_jack_status()
    logging.info('Mic jack status: %s', mic_jack_status)
    dongle_status_ok &= bool(mic_jack_status)

    # Check Headphone Jack
    hp_jack_status = get_hp_jack_status()
    logging.info('Headphone jack status: %s', hp_jack_status)
    dongle_status_ok &= bool(hp_jack_status)

    # Use latency check to test if audio can be captured through dongle.
    # We only want to know the basic function of dongle, so no need to
    # assert the latency accuracy here.
    latency = loopback_latency_check(n=4000)
    if latency:
        logging.info('Got latency measured %d, reported %d',
                     latency[0], latency[1])
    else:
        logging.info('Latency check fail.')
        dongle_status_ok = False

    logging.info('audio loopback dongle test: %s',
                 'PASS' if dongle_status_ok else 'FAIL')


# Functions to test audio playback.
def play_sound(duration_seconds=None, audio_file_path=None):
    """Plays a sound file found at |audio_file_path| for |duration_seconds|.

    If |audio_file_path|=None, plays a default audio file.
    If |duration_seconds|=None, plays audio file in its entirety.

    @param duration_seconds: Duration to play sound.
    @param audio_file_path: Path to the audio file.
    """
    if not audio_file_path:
        audio_file_path = '/usr/local/autotest/cros/audio/sine440.wav'
    duration_arg = ('-d %d' % duration_seconds) if duration_seconds else ''
    utils.system('aplay %s %s' % (duration_arg, audio_file_path))


def get_play_sine_args(channel, odev='default', freq=1000, duration=10,
                       sample_size=16):
    """Gets the command args to generate a sine wav to play to odev.

    @param channel: 0 for left, 1 for right; otherwise, mono.
    @param odev: alsa output device.
    @param freq: frequency of the generated sine tone.
    @param duration: duration of the generated sine tone.
    @param sample_size: output audio sample size. Default to 16.

    @return The sox command as a list of argument strings.
    """
    cmdargs = [SOX_PATH, '-b', str(sample_size), '-n', '-t', 'alsa',
               odev, 'synth', str(duration)]
    if channel == 0:
        cmdargs += ['sine', str(freq), 'sine', '0']
    elif channel == 1:
        cmdargs += ['sine', '0', 'sine', str(freq)]
    else:
        cmdargs += ['sine', str(freq)]

    return cmdargs


def play_sine(channel, odev='default', freq=1000, duration=10,
              sample_size=16):
    """Generates a sine wave and plays to odev.

    @param channel: 0 for left, 1 for right; otherwise, mono.
    @param odev: alsa output device.
    @param freq: frequency of the generated sine tone.
    @param duration: duration of the generated sine tone.
    @param sample_size: output audio sample size. Default to 16.
    """
    cmdargs = get_play_sine_args(channel, odev, freq, duration, sample_size)
    utils.system(' '.join(cmdargs))


# Functions to compose customized sox command, execute it and process the
# output of sox command.
def get_sox_mixer_cmd(infile, channel,
                      num_channels=_DEFAULT_NUM_CHANNELS,
                      sox_format=_DEFAULT_SOX_FORMAT):
    """Gets sox mixer command to reduce channel.

    @param infile: Input file name.
    @param channel: The selected channel to take effect.
    @param num_channels: The number of total channels to test.
    @param sox_format: Format to generate sox command.

    @return The sox command line as a string.
    """
    # Build up a pan value string for the sox command: '1' for the selected
    # channel and '0' for every other one. Index 0 is always present, even
    # when num_channels is smaller than 1.
    pan_indices = [0] + list(range(1, num_channels))
    pan_values = ','.join(
            '1' if channel == pan_index else '0'
            for pan_index in pan_indices)

    # NOTE(review): the input channel count is hard-coded to 2 here and does
    # not follow num_channels -- confirm this is intended.
    return '%s -c 2 %s %s -c 1 %s - mixer %s' % (
            SOX_PATH, sox_format, infile, sox_format, pan_values)


def sox_stat_output(infile, channel,
                    num_channels=_DEFAULT_NUM_CHANNELS,
                    sox_format=_DEFAULT_SOX_FORMAT):
    """Executes sox stat command.

    @param infile: Input file name.
    @param channel: The selected channel.
    @param num_channels: The number of total channels to test.
    @param sox_format: Format to generate sox command.

    @return The output of sox stat command
    """
    sox_mixer_cmd = get_sox_mixer_cmd(infile, channel,
                                      num_channels, sox_format)
    # sox writes the stat report to stderr, hence the redirection.
    stat_cmd = '%s -c 1 %s - -n stat 2>&1' % (SOX_PATH, sox_format)
    sox_cmd = '%s | %s' % (sox_mixer_cmd, stat_cmd)
    return utils.system_output(sox_cmd, retain_output=True)


def get_audio_rms(sox_output):
    """Gets the audio RMS value from sox stat output

    @param sox_output: Output of sox stat command.

    @return The RMS value parsed from sox stat output, or None if no RMS
            amplitude line is found.
    """
    for rms_line in sox_output.split('\n'):
        m = _SOX_RMS_AMPLITUDE_RE.match(rms_line)
        if m is not None:
            return float(m.group(1))
    return None


def get_rough_freq(sox_output):
    """Gets the rough audio frequency from sox stat output

    @param sox_output: Output of sox stat command.

    @return The rough frequency value parsed from sox stat output, or None
            if no rough frequency line is found.
    """
    for freq_line in sox_output.split('\n'):
        m = _SOX_ROUGH_FREQ_RE.match(freq_line)
        if m is not None:
            return int(m.group(1))
    return None


def check_audio_rms(sox_output, sox_threshold=_DEFAULT_SOX_RMS_THRESHOLD):
    """Checks if the calculated RMS value is expected.

    @param sox_output: The output from sox stat command.
    @param sox_threshold: The threshold to test RMS value against.

    @raises error.TestError if RMS amplitude can't be parsed.
    @raises error.TestFail if the RMS amplitude of the recording isn't above
            the threshold.
    """
    rms_val = get_audio_rms(sox_output)

    # In case we don't get a valid RMS value.
    if rms_val is None:
        raise error.TestError(
                'Failed to generate an audio RMS value from playback.')

    logging.info('Got audio RMS value of %f. Minimum pass is %f.',
                 rms_val, sox_threshold)
    if rms_val < sox_threshold:
        raise error.TestFail(
                'Audio RMS value %f too low. Minimum pass is %f.' %
                (rms_val, sox_threshold))


def noise_reduce_file(in_file, noise_file, out_file,
                      sox_format=_DEFAULT_SOX_FORMAT):
    """Runs the sox command to reduce noise.

    Runs the sox command to noise-reduce in_file using the noise
    profile from noise_file.

    @param in_file: The file to noise reduce.
    @param noise_file: The file containing the noise profile.
                       This can be created by recording silence.
    @param out_file: The file contains the noise reduced sound.
    @param sox_format: The sox format to generate sox command.
    """
    prof_cmd = '%s -c 2 %s %s -n noiseprof' % (
            SOX_PATH, sox_format, noise_file)
    reduce_cmd = ('%s -c 2 %s %s -c 2 %s %s noisered' %
                  (SOX_PATH, sox_format, in_file, sox_format, out_file))
    utils.system('%s | %s' % (prof_cmd, reduce_cmd))


def record_sample(tmpfile, record_command=_DEFAULT_REC_COMMAND):
    """Records a sample from the default input device.

    @param tmpfile: The file to record to.
    @param record_command: The command to record audio.
    """
    utils.system('%s %s' % (record_command, tmpfile))


def create_wav_file(wav_dir, prefix=""):
    """Creates a unique name for wav file.

    The created file name will be preserved in autotest result directory
    for future analysis. Only the name is built; no file is created here.

    @param wav_dir: The directory of created wav file.
    @param prefix: specified file name prefix.

    @return The path '<wav_dir>/<prefix>-<current time>.wav'.
    """
    filename = "%s-%s.wav" % (prefix, time.time())
    return os.path.join(wav_dir, filename)


def run_in_parallel(*funs):
    """Runs methods in parallel.

    Each callable runs in its own thread; this function returns once all of
    them have finished.

    @param funs: methods to run.
    """
    threads = []
    for f in funs:
        t = threading.Thread(target=f)
        t.start()
        threads.append(t)

    for t in threads:
        t.join()


def loopback_test_channels(noise_file_name, wav_dir,
                           playback_callback=None,
                           check_recorded_callback=check_audio_rms,
                           preserve_test_file=True,
                           num_channels=_DEFAULT_NUM_CHANNELS,
                           record_callback=record_sample,
                           mix_callback=None):
    """Tests loopback on all channels.

    @param noise_file_name: Name of the file contains pre-recorded noise.
    @param wav_dir: The directory of created wav file.
    @param playback_callback: The callback to do the playback for
                              one channel.
    @param record_callback: The callback to do the recording.
    @param check_recorded_callback: The callback to check recorded file.
    @param preserve_test_file: Retain the recorded files for future debugging.
    @param num_channels: The number of total channels to test.
    @param mix_callback: The callback to do on the one-channel file.
    """
    for channel in range(num_channels):
        record_file_name = create_wav_file(wav_dir,
                                           "record-%d" % channel)
        # The lambdas below capture loop variables by reference. That is safe
        # here because run_in_parallel() joins every thread before the next
        # iteration rebinds them.
        functions = [lambda: record_callback(record_file_name)]

        if playback_callback:
            functions.append(lambda: playback_callback(channel))

        if mix_callback:
            mix_file_name = create_wav_file(wav_dir, "mix-%d" % channel)
            functions.append(lambda: mix_callback(mix_file_name))

        run_in_parallel(*functions)

        if mix_callback:
            sox_output_mix = sox_stat_output(mix_file_name, channel)
            rms_val_mix = get_audio_rms(sox_output_mix)
            logging.info('Got mixed audio RMS value of %f.', rms_val_mix)

        sox_output_record = sox_stat_output(record_file_name, channel)
        rms_val_record = get_audio_rms(sox_output_record)
        logging.info('Got recorded audio RMS value of %f.', rms_val_record)

        reduced_file_name = create_wav_file(wav_dir,
                                            "reduced-%d" % channel)
        noise_reduce_file(record_file_name, noise_file_name,
                          reduced_file_name)

        sox_output_reduced = sox_stat_output(reduced_file_name, channel)

        if not preserve_test_file:
            os.unlink(reduced_file_name)
            os.unlink(record_file_name)
            if mix_callback:
                os.unlink(mix_file_name)

        check_recorded_callback(sox_output_reduced)


def get_channel_sox_stat(
        input_audio, channel_index, channels=2, bits=16, rate=48000):
    """Gets the sox stat info of the selected channel in the input audio file.

    @param input_audio: The input audio file to be analyzed.
    @param channel_index: The index of the channel to be analyzed.
                          (1 for the first channel).
    @param channels: The number of channels in the input audio.
    @param bits: The number of bits of each audio sample.
    @param rate: The sampling rate.

    @return The parsed sox stat result of the selected channel.

    @raises ValueError if channel_index is not within [1, channels].
    """
    if channel_index <= 0 or channel_index > channels:
        raise ValueError('incorrect channel_indexi: %d' % channel_index)

    if channels == 1:
        return sox_utils.get_stat(
                input_audio, channels=channels, bits=bits, rate=rate)

    # Pipe the extracted channel straight into 'sox stat'; the stat report
    # is read from the stderr of the second process.
    p1 = cmd_utils.popen(
            sox_utils.extract_channel_cmd(
                    input_audio, '-', channel_index,
                    channels=channels, bits=bits, rate=rate),
            stdout=subprocess.PIPE)
    p2 = cmd_utils.popen(
            sox_utils.stat_cmd('-', channels=1, bits=bits, rate=rate),
            stdin=p1.stdout, stderr=subprocess.PIPE)
    stat_output = p2.stderr.read()
    cmd_utils.wait_and_check_returncode(p1, p2)
    return sox_utils.parse_stat_output(stat_output)


def get_rms(input_audio, channels=1, bits=16, rate=48000):
    """Gets the RMS values of all channels of the input audio.

    @param input_audio: The input audio file to be checked.
    @param channels: The number of channels in the input audio.
    @param bits: The number of bits of each audio sample.
    @param rate: The sampling rate.

    @return A list with the RMS value of each channel, in channel order.
    """
    stats = [get_channel_sox_stat(
            input_audio, i + 1, channels=channels, bits=bits,
            rate=rate) for i in range(channels)]

    logging.info('sox stat: %s', [str(s) for s in stats])
    return [s.rms for s in stats]


def reduce_noise_and_get_rms(
        input_audio, noise_file, channels=1, bits=16, rate=48000):
    """Reduces noise in the input audio by the given noise file and then gets
    the RMS values of all channels of the input audio.

    @param input_audio: The input audio file to be analyzed.
    @param noise_file: The noise file used to reduce noise in the input audio.
    @param channels: The number of channels in the input audio.
    @param bits: The number of bits of each audio sample.
    @param rate: The sampling rate.

    @return A list with the RMS value of each channel of the noise-reduced
            audio.
    """
    with tempfile.NamedTemporaryFile() as reduced_file:
        p1 = cmd_utils.popen(
                sox_utils.noise_profile_cmd(
                        noise_file, '-', channels=channels, bits=bits,
                        rate=rate),
                stdout=subprocess.PIPE)
        p2 = cmd_utils.popen(
                sox_utils.noise_reduce_cmd(
                        input_audio, reduced_file.name, '-',
                        channels=channels, bits=bits, rate=rate),
                stdin=p1.stdout)
        cmd_utils.wait_and_check_returncode(p1, p2)
        return get_rms(reduced_file.name, channels, bits, rate)


def cras_rms_test_setup():
    """Setups for the cras_rms_tests.

    To make sure the line_out-to-mic_in path is all green.
    """
    # TODO(owenlin): Now, the nodes are chosen by chrome.
    #                We should do it here.
    cras_utils.set_system_volume(_DEFAULT_PLAYBACK_VOLUME)
    cras_utils.set_selected_output_node_volume(_DEFAULT_PLAYBACK_VOLUME)

    cras_utils.set_capture_gain(_DEFAULT_CAPTURE_GAIN)

    cras_utils.set_system_mute(False)
    cras_utils.set_capture_mute(False)


def generate_rms_postmortem():
    """Generates postmortem for rms tests.

    Any exception raised while collecting the report is logged and
    swallowed so that it cannot mask the original test result.
    """
    try:
        logging.info('audio postmortem report')
        log_loopback_dongle_status()
        logging.info(get_audio_diagnostics())
    except Exception:
        logging.exception('Error while generating postmortem report')


def get_audio_diagnostics():
    """Gets audio diagnostic results.

    @returns: a string containing diagnostic results.

    """
    return cmd_utils.execute([_AUDIO_DIAGNOSTICS_PATH], stdout=subprocess.PIPE)


def get_max_cross_correlation(signal_a, signal_b):
    """Gets max cross-correlation and best time delay of two signals.

    Computes cross-correlation function between two
    signals and gets the maximum value and time delay.
    The steps includes:
      1. Compute cross-correlation function of X and Y and get Cxy.
         The correlation function Cxy is an array where Cxy[k] is the
         cross product of X and Y when Y is delayed by k.
         Refer to manual of numpy.correlate for detail of correlation.
      2. Find the maximum value C_max and index C_index in Cxy.
      3. Compute L2 norm of X and Y to get norm(X) and norm(Y).
      4. Divide C_max by norm(X)*norm(Y) to get max cross-correlation.

    Max cross-correlation indicates the similarity of X and Y. The value
    is 1 if X equals Y multiplied by a positive scalar.
    The value is -1 if X equals Y multiplied by a negative scalar.
    Any constant level shift will be regarded as distortion and will make
    max cross-correlation value deviated from 1.
    C_index is the best time delay of Y that make Y looks similar to X.
    Refer to http://en.wikipedia.org/wiki/Cross-correlation.

    @param signal_a: A list of numbers which contains the first signal.
    @param signal_b: A list of numbers which contains the second signal.

    @raises: ValueError if any number in signal_a or signal_b is not a float.
             ValueError if norm of any array is less than _MINIMUM_NORM.

    @returns: A tuple (correlation index, best delay). If there are more than
              one best delay, just return the first one.
    """
    def check_list_contains_float(numbers):
        """Checks the elements in a list are all float.

        @param numbers: A list of numbers.

        @raises: ValueError if there is any element which is not a float
                 in the list.
        """
        if any(not isinstance(x, float) for x in numbers):
            raise ValueError('List contains number which is not a float')

    check_list_contains_float(signal_a)
    check_list_contains_float(signal_b)

    norm_a = numpy.linalg.norm(signal_a)
    norm_b = numpy.linalg.norm(signal_b)
    logging.debug('norm_a: %f', norm_a)
    logging.debug('norm_b: %f', norm_b)
    if norm_a <= _MINIMUM_NORM or norm_b <= _MINIMUM_NORM:
        raise ValueError('No meaningful data as norm is too small.')

    correlation = numpy.correlate(signal_a, signal_b, 'full')
    # Let numpy scan the array instead of iterating over it in Python.
    max_correlation = correlation.max()
    best_delays = numpy.flatnonzero(correlation == max_correlation).tolist()
    if len(best_delays) > 1:
        logging.warning('There are more than one best delay: %r', best_delays)
    return max_correlation / (norm_a * norm_b), best_delays[0]


def trim_data(data, threshold=0):
    """Trims a data by removing value that is too small in head and tail.

    Removes elements in head and tail whose absolute value is smaller than
    or equal to threshold.
    E.g. trim_data([0.0, 0.1, 0.2, 0.3, 0.2, 0.1, 0.0], 0.1) =
    ([0.2, 0.3, 0.2], 2)

    @param data: A list of numbers.
    @param threshold: The threshold to compare against.

    @returns: A tuple (trimmed_data, end_trimmed_length), where
              end_trimmed_length is the length of original data being trimmed
              from the end.
              Returns ([], None) if there is no valid data.
    """
    indice_valid = [
            i for i, j in enumerate(data) if abs(j) > threshold]
    if not indice_valid:
        logging.warning(
                'There is no element with absolute value greater '
                'than threshold %f', threshold)
        return [], None
    logging.debug('Start and end of indice_valid: %d, %d',
                  indice_valid[0], indice_valid[-1])
    end_trimmed_length = len(data) - indice_valid[-1] - 1
    logging.debug('Trimmed length in the end: %d', end_trimmed_length)
    return (data[indice_valid[0] : indice_valid[-1] + 1], end_trimmed_length)


def get_one_channel_correlation(test_data, golden_data):
    """Gets max cross-correlation of test_data and golden_data.

    Trims test data and compute the max cross-correlation against golden_data.
    Signal can be trimmed because those zero values in the head and tail of
    a signal will not affect correlation computation.

    @param test_data: A list containing the data to compare against golden data.
    @param golden_data: A list containing the golden data.

    @returns: A tuple (max cross-correlation, best_delay) if data is valid.
              Otherwise returns (None, None). Refer to docstring of
              get_max_cross_correlation.
    """
    trimmed_test_data, end_trimmed_length = trim_data(test_data)

    # trim_data() reports "no valid data" with a None trimmed length. There
    # is nothing to correlate in that case, so return the documented
    # (None, None) instead of failing later on the norm check.
    if end_trimmed_length is None:
        return None, None

    def to_float(samples):
        """Casts elements in the list to float.

        @param samples: A list of numbers.

        @returns: A list of original numbers casted to float.
        """
        samples_float = [float(x) for x in samples]
        return samples_float

    max_cross_correlation, best_delay = get_max_cross_correlation(
            to_float(golden_data),
            to_float(trimmed_test_data))

    # The reason to add back the trimmed length in the end.
    # E.g.:
    # golden data:
    #
    # |-----------vvvv----------------|  vvvv is the signal of interest.
    #       a                 b
    #
    # test data:
    #
    # |---x----vvvv--------x----------------|  x is the place to trim.
    #   c   d         e            f
    #
    # trimmed test data:
    #
    # |----vvvv--------|
    #   d         e
    #
    # The first output of cross correlation computation :
    #
    #                  |-----------vvvv----------------|
    #                       a                 b
    #
    # |----vvvv--------|
    #   d         e
    #
    # The largest output of cross correlation computation happens at
    # delay a + e.
    #
    #                  |-----------vvvv----------------|
    #                       a                 b
    #
    #                         |----vvvv--------|
    #                           d         e
    #
    # Cross correlation starts computing by aligning the last sample
    # of the trimmed test data to the first sample of golden data.
    # The best delay calculated from trimmed test data and golden data
    # cross correlation is e + a. But the real best delay that should be
    # identical on two channel should be e + a + f.
    # So we need to add back the length being trimmed in the end.

    if max_cross_correlation:
        return max_cross_correlation, best_delay + end_trimmed_length
    else:
        return None, None


def compare_one_channel_correlation(test_data, golden_data, parameters):
    """Compares two one-channel data by correlation.

    @param test_data: A list containing the data to compare against golden data.
    @param golden_data: A list containing the golden data.
    @param parameters: A dict containing parameters for method. The optional
                       key 'correlation_threshold' overrides
                       _CORRELATION_INDEX_THRESHOLD.

    @returns: A dict containing:
              index: The index of similarity where 1 means they are different
                  only by a positive scale.
              best_delay: The best delay of test data in relative to golden
                  data.
              equal: A bool containing comparing result.
    """
    threshold = parameters.get('correlation_threshold',
                               _CORRELATION_INDEX_THRESHOLD)

    result_dict = dict()
    max_cross_correlation, best_delay = get_one_channel_correlation(
            test_data, golden_data)
    result_dict['index'] = max_cross_correlation
    result_dict['best_delay'] = best_delay
    # A missing (None) or zero correlation index never counts as equal.
    result_dict['equal'] = bool(
            max_cross_correlation and
            max_cross_correlation > threshold)
    logging.debug('result_dict: %r', result_dict)
    return result_dict


def compare_data_correlation(golden_data_binary, golden_data_format,
                             test_data_binary, test_data_format,
                             channel_map, parameters=None):
    """Compares two raw data using correlation.

    @param golden_data_binary: The binary containing golden data.
    @param golden_data_format: The data format of golden data.
    @param test_data_binary: The binary containing test data.
    @param test_data_format: The data format of test data.
    @param channel_map: A list containing channel mapping.
                        E.g. [1, 0, None, None, None, None, None, None] means
                        channel 0 of test data should map to channel 1 of
                        golden data. Channel 1 of test data should map to
                        channel 0 of golden data. Channel 2 to 7 of test data
                        should be skipped.
    @param parameters: A dict containing parameters for method, if needed.

    @raises: NotImplementedError if file type is not raw.
             NotImplementedError if sampling rates of two data are not the same.
             error.TestFail if golden data and test data are not equal.
    """
    if parameters is None:
        parameters = dict()

    if (golden_data_format['file_type'] != 'raw' or
        test_data_format['file_type'] != 'raw'):
        raise NotImplementedError('Only support raw data in compare_data.')
    if (golden_data_format['rate'] != test_data_format['rate']):
        raise NotImplementedError(
                'Only support comparing data with the same sampling rate')
    golden_data = audio_data.AudioRawData(
            binary=golden_data_binary,
            channel=golden_data_format['channel'],
            sample_format=golden_data_format['sample_format'])
    test_data = audio_data.AudioRawData(
            binary=test_data_binary,
            channel=test_data_format['channel'],
            sample_format=test_data_format['sample_format'])
    compare_results = []
    for test_channel, golden_channel in enumerate(channel_map):
        if golden_channel is None:
            logging.info('Skipped channel %d', test_channel)
            continue
        test_data_one_channel = test_data.channel_data[test_channel]
        golden_data_one_channel = golden_data.channel_data[golden_channel]
        result_dict = dict(test_channel=test_channel,
                           golden_channel=golden_channel)
        result_dict.update(
                compare_one_channel_correlation(
                        test_data_one_channel, golden_data_one_channel,
                        parameters))
        compare_results.append(result_dict)
    logging.info('compare_results: %r', compare_results)
    for result in compare_results:
        if not result['equal']:
            # The index is None when no valid correlation was obtained;
            # formatting None with %f would raise TypeError and hide the
            # real test failure.
            index = result['index']
            index_text = 'None' if index is None else '%f' % index
            error_msg = ('Failed on test channel %d and golden channel %d with '
                         'index %s') % (
                                 result['test_channel'],
                                 result['golden_channel'],
                                 index_text)
            logging.error(error_msg)
            raise error.TestFail(error_msg)
    # Also checks best delay are exactly the same.
    best_delays = set(result['best_delay'] for result in compare_results)
    if len(best_delays) > 1:
        error_msg = 'There are more than one best delay: %s' % best_delays
        logging.error(error_msg)
        raise error.TestFail(error_msg)


class _base_rms_test(test.test):
    """Base class for all rms_test """

    def postprocess(self):
        """Generates the audio postmortem if any constraint has failed."""
        super(_base_rms_test, self).postprocess()

        # Sum up the number of failed constraints in each iteration
        if sum(len(x) for x in self.failed_constraints):
            generate_rms_postmortem()


class chrome_rms_test(_base_rms_test):
    """Base test class for audio RMS test with Chrome.

    The chrome instance can be accessed by self.chrome.
    """
    def warmup(self):
        """Starts Chrome and then applies the CRAS rms test setup."""
        super(chrome_rms_test, self).warmup()

        # Not all client of this file using telemetry.
        # Just do the import here for those who really need it.
        from autotest_lib.client.common_lib.cros import chrome

        self.chrome = chrome.Chrome(init_network_controller=True)

        # The audio configuration could be changed when we
        # restart chrome.
        try:
            cras_rms_test_setup()
        except Exception:
            # Do not leave the browser running when the setup fails.
            self.chrome.browser.Close()
            raise

    def cleanup(self, *args):
        """Closes Chrome and always runs the parent cleanup afterwards."""
        try:
            self.chrome.close()
        finally:
            super(chrome_rms_test, self).cleanup()


class cras_rms_test(_base_rms_test):
    """Base test class for CRAS audio RMS test."""

    def warmup(self):
        """Applies the CRAS rms test setup."""
        super(cras_rms_test, self).warmup()
        cras_rms_test_setup()


class alsa_rms_test(_base_rms_test):
    """Base test class for ALSA audio RMS test.

    Note the warmup will take 10 seconds and the device cannot be used before it
    returns.
    """
    def warmup(self):
        """Initializes volume and gain via CRAS, then waits for release."""
        super(alsa_rms_test, self).warmup()

        cras_rms_test_setup()
        # We need CRAS to initialize the volume and gain.
        cras_utils.playback(playback_file="/dev/zero", duration=1)
        # CRAS will release the device after 10 seconds.
        time.sleep(11)