• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6
7import logging
8import numpy
9import os
10import re
11import subprocess
12import tempfile
13import threading
14import time
15
16from glob import glob
17from autotest_lib.client.bin import test, utils
18from autotest_lib.client.bin.input.input_device import *
19from autotest_lib.client.common_lib import error
20from autotest_lib.client.cros.audio import alsa_utils
21from autotest_lib.client.cros.audio import audio_data
22from autotest_lib.client.cros.audio import cmd_utils
23from autotest_lib.client.cros.audio import cras_utils
24from autotest_lib.client.cros.audio import sox_utils
25
# NOTE(review): presumably the name of the dynamic linker search path
# environment variable; it is not used in this part of the file.
LD_LIBRARY_PATH = 'LD_LIBRARY_PATH'

# Executable run by get_audio_diagnostics().
_AUDIO_DIAGNOSTICS_PATH = '/usr/bin/audio_diagnostics'

# Default channel count for the sox command helpers.
_DEFAULT_NUM_CHANNELS = 2
# Default command of record_sample(); the output file name is appended to it.
_DEFAULT_REC_COMMAND = 'arecord -D hw:0,0 -d 10 -f dat'
# Default sox format options for the raw files handled by the sox helpers.
_DEFAULT_SOX_FORMAT = '-t raw -b 16 -e signed -r 48000 -L'
# Values applied through cras_utils by cras_rms_test_setup().
# NOTE(review): the units expected by cras_utils are not visible here.
_DEFAULT_PLAYBACK_VOLUME = 100
_DEFAULT_CAPTURE_GAIN = 2500
# NOTE(review): the two ALSA defaults below are not used in this part of the
# file; presumably they are meant to be passed to amixer by callers.
_DEFAULT_ALSA_MAX_VOLUME = '100%'
_DEFAULT_ALSA_CAPTURE_GAIN = '25dB'

# Minimum RMS value to pass when checking recorded file.
_DEFAULT_SOX_RMS_THRESHOLD = 0.08

# Patterns applied by get_mixer_jack_status() to amixer output: the first one
# detects an 'on' value in 'amixer cget' output, the other two capture the
# numid of the headphone / mic jack control in 'amixer controls' output.
_JACK_VALUE_ON_RE = re.compile(r'.*values=on')
_HP_JACK_CONTROL_RE = re.compile(r'numid=(\d+).*Headphone\sJack')
_MIC_JACK_CONTROL_RE = re.compile(r'numid=(\d+).*Mic\sJack')

# Patterns capturing the value of the 'RMS amplitude' and 'Rough frequency'
# lines of sox stat output.
_SOX_RMS_AMPLITUDE_RE = re.compile(r'RMS\s+amplitude:\s+(.+)')
_SOX_ROUGH_FREQ_RE = re.compile(r'Rough\s+frequency:\s+(.+)')

# Uncompiled pattern strings; loopback_latency_check() passes them to
# re.search() together with re.I, so they match case-insensitively.
_AUDIO_NOT_FOUND_RE = r'Audio\snot\sdetected'
_MEASURED_LATENCY_RE = r'Measured\sLatency:\s(\d+)\suS'
_REPORTED_LATENCY_RE = r'Reported\sLatency:\s(\d+)\suS'

# Tools from platform/audiotest
# These are bare command names without a directory component.
AUDIOFUNTEST_PATH = 'audiofuntest'
AUDIOLOOP_PATH = 'looptest'
LOOPBACK_LATENCY_PATH = 'loopback_latency'
SOX_PATH = 'sox'
TEST_TONES_PATH = 'test_tones'

# get_max_cross_correlation() rejects signals whose L2 norm is not above
# this value.
_MINIMUM_NORM = 0.001
# Default threshold the correlation index must exceed in
# compare_one_channel_correlation() for two signals to count as equal.
_CORRELATION_INDEX_THRESHOLD = 0.999
# The minimum difference of estimated frequencies between two sine waves.
_FREQUENCY_DIFF_THRESHOLD = 20
# The minimum RMS value of meaningful audio data.
_MEANINGFUL_RMS_THRESHOLD = 0.001
65
def set_mixer_controls(mixer_settings=None, card='0'):
    """Sets all mixer controls listed in the mixer settings on card.

    Every control is applied with 'amixer cset'. A failing amixer command is
    only logged, because a card is allowed not to support all the controls.

    @param mixer_settings: An iterable of dicts, each holding the 'name' and
                           the 'value' of one control. None (the default)
                           or an empty iterable sets nothing.
    @param card: Index of audio card to set mixer settings for.
    """
    logging.info('Setting mixer control values on %s', card)
    # None stands in for the former mutable '{}' default, which was also of
    # the wrong type: the loop below expects a sequence of dicts.
    for item in mixer_settings or ():
        logging.info('Setting %s to %s on card %s',
                     item['name'], item['value'], card)
        cmd = 'amixer -c %s cset name=%s %s' % (
                card, item['name'], item['value'])
        try:
            utils.system(cmd)
        except error.CmdError:
            # A card is allowed not to support all the controls, so don't
            # fail the test here if we get an error.
            logging.info('amixer command failed: %s', cmd)
84
def set_volume_levels(volume, capture):
    """Sets the volume and capture gain through cras_test_client.

    After both levels are set, cras_test_client is run with
    --dump_server_info and --mute 0, followed by 'amixer -c 0 contents'.

    @param volume: The playback volume to set.
    @param capture: The capture gain to set.

    @raises error.TestError if any of the commands fails.
    """
    logging.info('Setting volume level to %d', volume)
    try:
        utils.system('/usr/bin/cras_test_client --volume %d' % volume)
        logging.info('Setting capture gain to %d', capture)
        utils.system('/usr/bin/cras_test_client --capture_gain %d' % capture)
        utils.system('/usr/bin/cras_test_client --dump_server_info')
        utils.system('/usr/bin/cras_test_client --mute 0')
        utils.system('amixer -c 0 contents')
    # The 'as' form is valid on Python 2.6+ and on Python 3, whereas the
    # old 'except X, e' spelling is a syntax error on Python 3.
    except error.CmdError as e:
        raise error.TestError(
                '*** Can not tune volume through CRAS. *** (' + str(e) + ')')
102
def loopback_latency_check(**args):
    """Checks loopback latency.

    Runs the loopback_latency tool and parses the measured and the reported
    latency from its output.

    @param args: additional arguments for loopback_latency. Only 'n', the
                 noise threshold passed as '-n' (default '400'), is used.

    @return A tuple containing measured and reported latency in uS.
        Return None if no audio detected, or if either latency is missing
        from the output or is zero.
    """
    # dict.get replaces the deprecated has_key(), which no longer exists on
    # Python 3.
    noise_threshold = str(args.get('n', '400'))

    cmd = '%s -n %s' % (LOOPBACK_LATENCY_PATH, noise_threshold)

    output = utils.system_output(cmd, retain_output=True)

    # Sleep for a short while to make sure device is not busy anymore
    # after called loopback_latency.
    time.sleep(.1)

    measured_latency = None
    reported_latency = None
    for line in output.split('\n'):
        match = re.search(_MEASURED_LATENCY_RE, line, re.I)
        if match:
            measured_latency = int(match.group(1))
            continue
        match = re.search(_REPORTED_LATENCY_RE, line, re.I)
        if match:
            reported_latency = int(match.group(1))
            continue
        if re.search(_AUDIO_NOT_FOUND_RE, line, re.I):
            return None

    # Truthiness is checked on purpose: a latency of 0 is handled like a
    # missing value.
    if measured_latency and reported_latency:
        return (measured_latency, reported_latency)
    # Should not reach here, just in case.
    return None
139
def get_mixer_jack_status(jack_reg_exp):
    """Reads the plugged state of a jack from the amixer controls of card 0.

    The first line of 'amixer -c0 controls' matched by jack_reg_exp selects
    the control; its numid is the first group of the match.

    @param jack_reg_exp: The compiled regular expression to match jack
                         control name.

    @return None if the control does not exist, return True if jack control
        is detected plugged, return False otherwise.
    """
    controls = utils.system_output('amixer -c0 controls', retain_output=True)
    hits = (jack_reg_exp.match(entry) for entry in controls.split('\n'))
    numid = next((hit.group(1) for hit in hits if hit), None)

    # Without a usable numid there is no control to query.
    if not numid:
        return None

    values = utils.system_output('amixer -c0 cget numid=%s' % numid)
    return any(_JACK_VALUE_ON_RE.match(entry)
               for entry in values.split('\n'))
165
def get_hp_jack_status():
    """Gets the status of headphone jack.

    @return The status from the amixer jack control when that control
        exists. Otherwise the value of get_headphone_insert() of the first
        input device that reports being a headphone jack, or None when no
        such device is found.
    """
    mixer_status = get_mixer_jack_status(_HP_JACK_CONTROL_RE)
    if mixer_status is None:
        # When headphone jack is not found in amixer, lookup input devices
        # instead.
        #
        # TODO(hychao): Check hp/mic jack status dynamically from evdev. And
        # possibly replace the existing check using amixer.
        for node in glob('/dev/input/event*'):
            candidate = InputDevice(node)
            if candidate.is_hp_jack():
                return candidate.get_headphone_insert()
        return None
    return mixer_status
183
def get_mic_jack_status():
    """Gets the status of mic jack.

    @return The status from the amixer jack control when that control
        exists. Otherwise the value of get_microphone_insert() of the first
        input device that reports being a mic jack, or None when no such
        device is found.
    """
    mixer_status = get_mixer_jack_status(_MIC_JACK_CONTROL_RE)
    if mixer_status is None:
        # When mic jack is not found in amixer, lookup input devices
        # instead.
        for node in glob('/dev/input/event*'):
            candidate = InputDevice(node)
            if candidate.is_mic_jack():
                return candidate.get_microphone_insert()
        return None
    return mixer_status
197
def log_loopback_dongle_status():
    """Logs whether the loopback dongle looks equipped and functional.

    The dongle passes only if the mic jack and the headphone jack are both
    reported as plugged and the loopback latency check returns a result.
    Nothing is returned or raised; the verdict is only logged.
    """
    mic_plugged = get_mic_jack_status()
    logging.info('Mic jack status: %s', mic_plugged)

    hp_plugged = get_hp_jack_status()
    logging.info('Headphone jack status: %s', hp_plugged)

    # The latency check is only used to find out whether audio can be
    # captured through the dongle at all; the latency accuracy itself is
    # not asserted here.
    latency = loopback_latency_check(n=4000)
    if latency:
        measured, reported = latency
        logging.info('Got latency measured %d, reported %d',
                measured, reported)
    else:
        logging.info('Latency check fail.')

    verdict_ok = bool(mic_plugged) and bool(hp_plugged) and bool(latency)
    if verdict_ok:
        verdict = 'PASS'
    else:
        verdict = 'FAIL'
    logging.info('audio loopback dongle test: %s', verdict)
225
# Functions to test audio playback.
def play_sound(duration_seconds=None, audio_file_path=None):
    """Plays an audio file with aplay.

    A default sine wave file is played when no |audio_file_path| is given.
    The file is played in its entirety unless |duration_seconds| is given.

    @param duration_seconds: Duration to play sound, passed to aplay as -d.
    @param audio_file_path: Path to the audio file.
    """
    path = audio_file_path or '/usr/local/autotest/cros/audio/sine440.wav'
    if duration_seconds:
        duration_arg = '-d %d' % duration_seconds
    else:
        duration_arg = ''
    utils.system('aplay %s %s' % (duration_arg, path))
240
def get_play_sine_args(channel, odev='default', freq=1000, duration=10,
                       sample_size=16):
    """Builds the sox argument list that synthesizes a sine tone to odev.

    For channel 0 or 1 a two-channel signal is described in which the other
    channel is a 0 Hz sine; any other value describes a single sine.

    @param channel: 0 for left, 1 for right; otherwise, mono.
    @param odev: alsa output device.
    @param freq: frequency of the generated sine tone.
    @param duration: duration of the generated sine tone.
    @param sample_size: output audio sample size. Default to 16.

    @return The command as a list of strings, starting with the sox binary.
    """
    tone = ['sine', str(freq)]
    silence = ['sine', '0']
    if channel == 0:
        synth_spec = tone + silence
    elif channel == 1:
        synth_spec = silence + tone
    else:
        synth_spec = tone

    output_spec = [SOX_PATH, '-b', str(sample_size), '-n', '-t', 'alsa',
                   odev, 'synth', str(duration)]
    return output_spec + synth_spec
261
def play_sine(channel, odev='default', freq=1000, duration=10,
              sample_size=16):
    """Synthesizes a sine tone with sox and plays it to odev.

    The command comes from get_play_sine_args() and is run as one
    space-joined command line.

    @param channel: 0 for left, 1 for right; otherwise, mono.
    @param odev: alsa output device.
    @param freq: frequency of the generated sine tone.
    @param duration: duration of the generated sine tone.
    @param sample_size: output audio sample size. Default to 16.
    """
    command_line = ' '.join(
            get_play_sine_args(channel, odev, freq, duration, sample_size))
    utils.system(command_line)
274
275# Functions to compose customized sox command, execute it and process the
276# output of sox command.
def get_sox_mixer_cmd(infile, channel,
                      num_channels=_DEFAULT_NUM_CHANNELS,
                      sox_format=_DEFAULT_SOX_FORMAT):
    """Builds the sox command that reduces infile to one selected channel.

    The command reads infile as a two-channel file and writes a one-channel
    stream to stdout using the sox 'mixer' effect.

    @param infile: Input file name.
    @param channel: The selected channel to take effect.
    @param num_channels: The number of total channels to test.
    @param sox_format: Format to generate sox command.

    @return The sox command line as a string.
    """
    # One pan value per channel: '1' for the selected channel, '0' for the
    # others. There is always at least one value, whatever num_channels is.
    pan_flags = ['1' if channel == 0 else '0']
    pan_flags.extend('1' if channel == index else '0'
                     for index in range(1, num_channels))
    pan_values = ','.join(pan_flags)

    return '%s -c 2 %s %s -c 1 %s - mixer %s' % (
            SOX_PATH, sox_format, infile, sox_format, pan_values)
300
def sox_stat_output(infile, channel,
                    num_channels=_DEFAULT_NUM_CHANNELS,
                    sox_format=_DEFAULT_SOX_FORMAT):
    """Runs sox stat on one channel of infile.

    The channel is extracted with the command from get_sox_mixer_cmd() and
    piped into 'sox ... -n stat', whose stderr is redirected to stdout.

    @param infile: Input file name.
    @param channel: The selected channel.
    @param num_channels: The number of total channels to test.
    @param sox_format: Format to generate sox command.

    @return The output of sox stat command
    """
    extract_cmd = get_sox_mixer_cmd(infile, channel, num_channels, sox_format)
    stat_cmd = '%s -c 1 %s - -n stat 2>&1' % (SOX_PATH, sox_format)
    pipeline = ' | '.join([extract_cmd, stat_cmd])
    return utils.system_output(pipeline, retain_output=True)
318
def get_audio_rms(sox_output):
    """Extracts the RMS amplitude from sox stat output.

    @param sox_output: Output of sox stat command.

    @return The RMS value of the first 'RMS amplitude' line as a float, or
        None if the output has no such line.
    """
    for stat_line in sox_output.split('\n'):
        found = _SOX_RMS_AMPLITUDE_RE.match(stat_line)
        if found is None:
            continue
        return float(found.group(1))
    return None
330
def get_rough_freq(sox_output):
    """Extracts the rough frequency from sox stat output.

    @param sox_output: Output of sox stat command.

    @return The value of the first 'Rough frequency' line as an int, or
        None if the output has no such line.
    """
    for stat_line in sox_output.split('\n'):
        found = _SOX_ROUGH_FREQ_RE.match(stat_line)
        if found is None:
            continue
        return int(found.group(1))
    return None
342
def check_audio_rms(sox_output, sox_threshold=_DEFAULT_SOX_RMS_THRESHOLD):
    """Verifies that the RMS amplitude in sox stat output is high enough.

    @param sox_output: The output from sox stat command.
    @param sox_threshold: The threshold to test RMS value against.

    @raises error.TestError if RMS amplitude can't be parsed.
    @raises error.TestFail if the RMS amplitude of the recording is below
            the threshold.
    """
    rms_val = get_audio_rms(sox_output)
    if rms_val is None:
        # The sox output carried no parsable RMS line.
        raise error.TestError(
            'Failed to generate an audio RMS value from playback.')

    logging.info('Got audio RMS value of %f. Minimum pass is %f.',
                 rms_val, sox_threshold)
    if sox_threshold > rms_val:
        failure = ('Audio RMS value %f too low. Minimum pass is %f.' %
                   (rms_val, sox_threshold))
        raise error.TestFail(failure)
366
def noise_reduce_file(in_file, noise_file, out_file,
                      sox_format=_DEFAULT_SOX_FORMAT):
    """Noise-reduces in_file with sox and writes the result to out_file.

    A 'noiseprof' sox command run on noise_file is piped into a 'noisered'
    sox command that processes in_file.

    @param in_file: The file to noise reduce.
    @param noise_file: The file containing the noise profile.
        This can be created by recording silence.
    @param out_file: The file that receives the noise reduced sound.
    @param sox_format: The sox format to generate sox command.
    """
    profile_stage = '%s -c 2 %s %s -n noiseprof' % (
            SOX_PATH, sox_format, noise_file)
    reduce_stage = '%s -c 2 %s %s -c 2 %s %s noisered' % (
            SOX_PATH, sox_format, in_file, sox_format, out_file)
    utils.system(' | '.join([profile_stage, reduce_stage]))
385
def record_sample(tmpfile, record_command=_DEFAULT_REC_COMMAND):
    """Records audio to a file by running the record command.

    @param tmpfile: The file to record to; it is appended to the command.
    @param record_command: The command to record audio.
    """
    full_command = '%s %s' % (record_command, tmpfile)
    utils.system(full_command)
393
def create_wav_file(wav_dir, prefix=""):
    """Builds a timestamped wav file path inside wav_dir.

    Only the path is built; no file is created on disk. The name has the
    form '<prefix>-<current time>.wav'. Files with these names are meant to
    be preserved in autotest result directory for future analysis.

    @param wav_dir: The directory of created wav file.
    @param prefix: specified file name prefix.

    @return The path of the wav file.
    """
    timestamp = time.time()
    return os.path.join(wav_dir, "%s-%s.wav" % (prefix, timestamp))
405
def run_in_parallel(*funs):
    """Runs every given callable in its own thread and waits for all.

    @param funs: methods to run; each one is called without arguments.
    """
    workers = [threading.Thread(target=fun) for fun in funs]
    for worker in workers:
        worker.start()
    # Return only after every thread has finished.
    for worker in workers:
        worker.join()
419
def loopback_test_channels(noise_file_name, wav_dir,
                           playback_callback=None,
                           check_recorded_callback=check_audio_rms,
                           preserve_test_file=True,
                           num_channels=_DEFAULT_NUM_CHANNELS,
                           record_callback=record_sample,
                           mix_callback=None):
    """Runs the loopback test once for every channel.

    For each channel, recording runs in parallel with the optional playback
    and mix callbacks. The recording is then noise-reduced and the sox stat
    output of the reduced file is handed to check_recorded_callback.

    @param noise_file_name: Name of the file that contains pre-recorded
        noise.
    @param wav_dir: The directory of created wav file.
    @param playback_callback: The callback to do the playback for
        one channel. It is called with the channel index.
    @param check_recorded_callback: The callback to check recorded file. It
        is called with the sox stat output of the noise-reduced recording.
    @param preserve_test_file: Retain the recorded files for future
        debugging.
    @param num_channels: The number of total channels to test.
    @param record_callback: The callback to do the recording. It is called
        with the name of the file to record to.
    @param mix_callback: The callback to do on the one-channel file. It is
        called with the name of the mix file.
    """
    for channel in xrange(num_channels):
        recorded_path = create_wav_file(wav_dir, "record-%d" % channel)

        # Jobs are started in this order: record, playback, mix.
        jobs = [lambda: record_callback(recorded_path)]
        if playback_callback:
            jobs.append(lambda: playback_callback(channel))
        if mix_callback:
            mixed_path = create_wav_file(wav_dir, "mix-%d" % channel)
            jobs.append(lambda: mix_callback(mixed_path))

        # All jobs have finished when this returns, so the closures above
        # never see the values of a later iteration.
        run_in_parallel(*jobs)

        if mix_callback:
            mixed_rms = get_audio_rms(sox_stat_output(mixed_path, channel))
            logging.info('Got mixed audio RMS value of %f.', mixed_rms)

        recorded_rms = get_audio_rms(sox_stat_output(recorded_path, channel))
        logging.info('Got recorded audio RMS value of %f.', recorded_rms)

        reduced_path = create_wav_file(wav_dir, "reduced-%d" % channel)
        noise_reduce_file(recorded_path, noise_file_name, reduced_path)
        reduced_stats = sox_stat_output(reduced_path, channel)

        # The files are removed before the check, so they are gone even
        # when the check below raises.
        if not preserve_test_file:
            leftovers = [reduced_path, recorded_path]
            if mix_callback:
                leftovers.append(mixed_path)
            for leftover in leftovers:
                os.unlink(leftover)

        check_recorded_callback(reduced_stats)
476
477
def get_channel_sox_stat(
        input_audio, channel_index, channels=2, bits=16, rate=48000):
    """Gets the sox stat info of the selected channel in the input audio file.

    For a one-channel input the stat is taken from the file directly.
    Otherwise the selected channel is extracted by one sox process and piped
    into a second sox process that computes the stat.

    @param input_audio: The input audio file to be analyzed.
    @param channel_index: The index of the channel to be analyzed.
                          (1 for the first channel).
    @param channels: The number of channels in the input audio.
    @param bits: The number of bits of each audio sample.
    @param rate: The sampling rate.

    @return The stat as returned by sox_utils.get_stat() for one-channel
            input, or by sox_utils.parse_stat_output() otherwise.

    @raises ValueError if channel_index is not within [1, channels].
    """
    if channel_index <= 0 or channel_index > channels:
        raise ValueError('incorrect channel_index: %d' % channel_index)

    if channels == 1:
        return sox_utils.get_stat(
                input_audio, channels=channels, bits=bits, rate=rate)

    p1 = cmd_utils.popen(
            sox_utils.extract_channel_cmd(
                    input_audio, '-', channel_index,
                    channels=channels, bits=bits, rate=rate),
            stdout=subprocess.PIPE)
    p2 = cmd_utils.popen(
            sox_utils.stat_cmd('-', channels=1, bits=bits, rate=rate),
            stdin=p1.stdout, stderr=subprocess.PIPE)
    # Close the parent's copy of the pipe so that p1 gets SIGPIPE instead of
    # blocking forever on a full pipe if p2 exits early.
    p1.stdout.close()
    # sox writes the stat report to stderr.
    stat_output = p2.stderr.read()
    cmd_utils.wait_and_check_returncode(p1, p2)
    return sox_utils.parse_stat_output(stat_output)
507
508
def get_rms(input_audio, channels=1, bits=16, rate=48000):
    """Collects the RMS value of every channel of the input audio.

    @param input_audio: The input audio file to be checked.
    @param channels: The number of channels in the input audio.
    @param bits: The number of bits of each audio sample.
    @param rate: The sampling rate.

    @return A list with the rms attribute of the sox stat of each channel,
            in channel order.
    """
    stats = []
    # Channel indices of get_channel_sox_stat() start at 1.
    for channel_index in xrange(1, channels + 1):
        stats.append(get_channel_sox_stat(
                input_audio, channel_index, channels=channels, bits=bits,
                rate=rate))

    logging.info('sox stat: %s', [str(stat) for stat in stats])
    return [stat.rms for stat in stats]
523
524
def reduce_noise_and_get_rms(
        input_audio, noise_file, channels=1, bits=16, rate=48000):
    """Reduces noise in the input audio by the given noise file and then gets
    the RMS values of all channels of the input audio.

    The noise profile of noise_file is piped into the noise reduction
    command, which writes the reduced audio to a temporary file. That file
    is deleted when the function returns.

    @param input_audio: The input audio file to be analyzed.
    @param noise_file: The noise file used to reduce noise in the input audio.
    @param channels: The number of channels in the input audio.
    @param bits: The number of bits of each audio sample.
    @param rate: The sampling rate.

    @return The result of get_rms() on the noise-reduced audio.
    """
    with tempfile.NamedTemporaryFile() as reduced_file:
        p1 = cmd_utils.popen(
                sox_utils.noise_profile_cmd(
                        noise_file, '-', channels=channels, bits=bits,
                        rate=rate),
                stdout=subprocess.PIPE)
        p2 = cmd_utils.popen(
                sox_utils.noise_reduce_cmd(
                        input_audio, reduced_file.name, '-',
                        channels=channels, bits=bits, rate=rate),
                stdin=p1.stdout)
        # Close the parent's copy of the pipe so that p1 gets SIGPIPE
        # instead of blocking forever on a full pipe if p2 exits early.
        p1.stdout.close()
        cmd_utils.wait_and_check_returncode(p1, p2)
        return get_rms(reduced_file.name, channels, bits, rate)
549
550
def skip_devices_to_test(*boards):
    """Aborts the test on boards with hardware or test compatibility issues.

    @param boards: the boards to skip testing.

    @raises error.TestNAError if the current board is one of boards.
    """
    # TODO(scottz): Remove this when crbug.com/220147 is fixed.
    current_board = utils.get_current_board()
    if current_board not in boards:
        return
    raise error.TestNAError(
            'This test is not available on %s' % current_board)
560
561
def cras_rms_test_setup():
    """Prepares CRAS volume, gain and mute state for the cras_rms_tests.

    To make sure the line_out-to-mic_in path is all green: the system and
    selected output node volumes and the capture gain are set to the module
    defaults, then system and capture are unmuted.
    """
    # TODO(owenlin): Now, the nodes are chosen by chrome.
    #                We should do it here.
    playback_volume = _DEFAULT_PLAYBACK_VOLUME
    cras_utils.set_system_volume(playback_volume)
    cras_utils.set_selected_output_node_volume(playback_volume)

    cras_utils.set_capture_gain(_DEFAULT_CAPTURE_GAIN)

    muted = False
    cras_utils.set_system_mute(muted)
    cras_utils.set_capture_mute(muted)
576
577
def generate_rms_postmortem():
    """Logs a postmortem report for rms tests.

    The report holds the loopback dongle status and the audio diagnostics.
    Any exception raised while the report is built is logged and swallowed,
    so this function itself does not fail the test.
    """
    try:
        logging.info('audio postmortem report')
        log_loopback_dongle_status()
        diagnostics = get_audio_diagnostics()
        logging.info(diagnostics)
    except Exception:
        logging.exception('Error while generating postmortem report')
586
587
def get_audio_diagnostics():
    """Runs the audio diagnostics tool.

    @returns: the result of cmd_utils.execute() for the tool with its
              stdout piped, i.e. a string containing diagnostic results.

    """
    diagnostics_cmd = [_AUDIO_DIAGNOSTICS_PATH]
    return cmd_utils.execute(diagnostics_cmd, stdout=subprocess.PIPE)
595
596
def get_max_cross_correlation(signal_a, signal_b):
    """Gets max cross-correlation and best time delay of two signals.

    With X = signal_a and Y = signal_b the steps are:
      1. Compute the full cross-correlation function Cxy of X and Y, where
         Cxy[k] is the cross product of X and Y when Y is delayed by k.
         Refer to manual of numpy.correlate for detail of correlation.
      2. Find the maximum value C_max of Cxy and its index C_index.
      3. Compute the L2 norms norm(X) and norm(Y).
      4. Divide C_max by norm(X)*norm(Y) to get max cross-correlation.

    Max cross-correlation indicates the similarity of X and Y. The value
    is 1 if X equals Y multiplied by a positive scalar.
    The value is -1 if X equals Y multiplied by a negative scalar.
    Any constant level shift will be regarded as distortion and will make
    max cross-correlation value deviate from 1.
    C_index is the best time delay of Y that makes Y look similar to X.
    Refer to http://en.wikipedia.org/wiki/Cross-correlation.

    @param signal_a: A list of numbers which contains the first signal.
    @param signal_b: A list of numbers which contains the second signal.

    @raises: ValueError if any number in signal_a or signal_b is not a float.
             ValueError if norm of any array is not above _MINIMUM_NORM.

    @returns: A tuple (correlation index, best delay). If there are more than
              one best delay, just return the first one.
    """
    # Both signals are validated before any computation starts.
    for signal in (signal_a, signal_b):
        if not all(isinstance(sample, float) for sample in signal):
            raise ValueError('List contains number which is not a float')

    norm_a = numpy.linalg.norm(signal_a)
    norm_b = numpy.linalg.norm(signal_b)
    logging.debug('norm_a: %f', norm_a)
    logging.debug('norm_b: %f', norm_b)
    if norm_a <= _MINIMUM_NORM or norm_b <= _MINIMUM_NORM:
        raise ValueError('No meaningful data as norm is too small.')

    correlation = numpy.correlate(signal_a, signal_b, 'full')
    peak = max(correlation)
    peak_delays = [
            delay for delay, value in enumerate(correlation) if value == peak]
    if len(peak_delays) > 1:
        logging.warning('There are more than one best delay: %r', peak_delays)

    normalized_peak = peak / (norm_a * norm_b)
    return normalized_peak, peak_delays[0]
655
656
def trim_data(data, threshold=0):
    """Trims a data by removing value that is too small in head and tail.

    Removes elements in head and tail whose absolute value is smaller than
    or equal to threshold. Elements between the first and the last kept
    element are never removed, whatever their value.
    E.g. trim_data([0.0, 0.1, 0.2, 0.3, 0.2, 0.1, 0.0], 0.2) =
    ([0.3], 3), because 0.2 itself is not greater than the threshold.

    @param data: A list of numbers.
    @param threshold: The threshold to compare against.

    @returns: A tuple (trimmed_data, end_trimmed_length), where
              end_trimmed_length is the length of original data being trimmed
              from the end.
              Returns ([], None) if there is no valid data.
    """
    # Indices of all elements that are strictly above the threshold.
    indice_valid = [
            i for i, j in enumerate(data) if abs(j) > threshold]
    if not indice_valid:
        logging.warning(
                'There is no element with absolute value greater '
                'than threshold %f', threshold)
        return [], None

    first_valid = indice_valid[0]
    last_valid = indice_valid[-1]
    logging.debug('Start and end of indice_valid: %d, %d',
                  first_valid, last_valid)
    end_trimmed_length = len(data) - last_valid - 1
    logging.debug('Trimmed length in the end: %d', end_trimmed_length)
    return (data[first_valid : last_valid + 1], end_trimmed_length)
685
686
def get_one_channel_correlation(test_data, golden_data):
    """Gets max cross-correlation of test_data and golden_data.

    The test data is trimmed first and then correlated against the golden
    data. Trimming is possible because zero values in the head and tail of
    a signal will not affect correlation computation. Both signals are
    converted to lists of floats before the computation.

    @param test_data: A list containing the data to compare against golden
                      data.
    @param golden_data: A list containing the golden data.

    @returns: A tuple (max cross-correlation, best_delay) if the max
              cross-correlation is non-zero. Otherwise returns
              (None, None). Refer to docstring of
              get_max_cross_correlation.

    @raises: ValueError from get_max_cross_correlation, which is not caught
             here.
    """
    trimmed_test_data, end_trimmed_length = trim_data(test_data)

    golden_samples = [float(sample) for sample in golden_data]
    test_samples = [float(sample) for sample in trimmed_test_data]
    max_cross_correlation, best_delay = get_max_cross_correlation(
            golden_samples, test_samples)

    if not max_cross_correlation:
        return None, None

    # Why the length trimmed from the end is added back to the delay.
    #
    # golden data ('vvvv' is the signal of interest):
    #
    # |-----------vvvv----------------|
    #       a                 b
    #
    # test data ('x' marks the places to trim):
    #
    # |---x----vvvv--------x----------------|
    #   c   d         e            f
    #
    # trimmed test data:
    #
    # |----vvvv--------|
    #   d         e
    #
    # Cross correlation starts by aligning the last sample of the trimmed
    # test data with the first sample of golden data:
    #
    #                  |-----------vvvv----------------|
    #                       a                 b
    #
    # |----vvvv--------|
    #   d         e
    #
    # and its largest output happens at delay a + e:
    #
    #                  |-----------vvvv----------------|
    #                       a                 b
    #
    #                         |----vvvv--------|
    #                           d         e
    #
    # So the best delay computed from the trimmed test data is e + a, while
    # the real best delay, which should be identical on two channels, is
    # e + a + f. Adding end_trimmed_length (f) restores it.
    return max_cross_correlation, best_delay + end_trimmed_length
762
763
def compare_one_channel_correlation(test_data, golden_data, parameters):
    """Decides by correlation whether two one-channel data are equal.

    @param test_data: A list containing the data to compare against golden
                      data.
    @param golden_data: A list containing the golden data.
    @param parameters: A dict containing parameters for method. The key
                       'correlation_threshold' overrides the default
                       threshold _CORRELATION_INDEX_THRESHOLD.

    @returns: A dict containing:
              index: The index of similarity where 1 means they are different
                  only by a positive scale.
              best_delay: The best delay of test data in relative to golden
                  data.
              equal: A bool containing comparing result. It is True only if
                  index is non-zero and greater than the threshold.
    """
    threshold = parameters.get(
            'correlation_threshold', _CORRELATION_INDEX_THRESHOLD)

    correlation_index, delay = get_one_channel_correlation(
            test_data, golden_data)

    result_dict = dict()
    result_dict['index'] = correlation_index
    result_dict['best_delay'] = delay
    # bool() turns a missing (None) or zero index into False as well.
    result_dict['equal'] = bool(
            correlation_index and correlation_index > threshold)
    logging.debug('result_dict: %r', result_dict)
    return result_dict
793
794
def compare_data_correlation(golden_data_binary, golden_data_format,
                             test_data_binary, test_data_format,
                             channel_map, parameters=None):
    """Compares two raw data using correlation.

    Every mapped channel of the test data is compared against the
    corresponding channel of the golden data. The comparison fails if any
    channel is reported as not equal, or if the mapped channels do not all
    share the same best delay.

    @param golden_data_binary: The binary containing golden data.
    @param golden_data_format: The data format of golden data.
    @param test_data_binary: The binary containing test data.
    @param test_data_format: The data format of test data.
    @param channel_map: A list containing channel mapping.
                        E.g. [1, 0, None, None, None, None, None, None] means
                        channel 0 of test data should map to channel 1 of
                        golden data. Channel 1 of test data should map to
                        channel 0 of golden data. Channel 2 to 7 of test data
                        should be skipped.
    @param parameters: A dict containing parameters for method, if needed.

    @raises: NotImplementedError if file type is not raw.
             NotImplementedError if sampling rates of two data are not the same.
             error.TestFail if golden data and test data are not equal.
    """
    if parameters is None:
        parameters = dict()

    if (golden_data_format['file_type'] != 'raw' or
        test_data_format['file_type'] != 'raw'):
        raise NotImplementedError('Only support raw data in compare_data.')
    if golden_data_format['rate'] != test_data_format['rate']:
        raise NotImplementedError(
                'Only support comparing data with the same sampling rate')

    golden_data = audio_data.AudioRawData(
            binary=golden_data_binary,
            channel=golden_data_format['channel'],
            sample_format=golden_data_format['sample_format'])
    test_data = audio_data.AudioRawData(
            binary=test_data_binary,
            channel=test_data_format['channel'],
            sample_format=test_data_format['sample_format'])

    compare_results = []
    for test_channel, golden_channel in enumerate(channel_map):
        if golden_channel is None:
            logging.info('Skipped channel %d', test_channel)
            continue
        result_dict = dict(test_channel=test_channel,
                           golden_channel=golden_channel)
        result_dict.update(
                compare_one_channel_correlation(
                        test_data.channel_data[test_channel],
                        golden_data.channel_data[golden_channel],
                        parameters))
        compare_results.append(result_dict)
    logging.info('compare_results: %r', compare_results)

    for result in compare_results:
        if not result['equal']:
            # The index may be None when no correlation could be computed.
            # Formatting None with '%f' raises TypeError and would hide the
            # real failure, so render it as text in that case.
            index = result['index']
            index_text = 'None' if index is None else '%f' % index
            error_msg = ('Failed on test channel %d and golden channel %d with '
                         'index %s') % (
                                 result['test_channel'],
                                 result['golden_channel'],
                                 index_text)
            logging.error(error_msg)
            raise error.TestFail(error_msg)

    # Also checks best delay are exactly the same.
    best_delays = set(result['best_delay'] for result in compare_results)
    if len(best_delays) > 1:
        error_msg = 'There are more than one best delay: %s' % best_delays
        logging.error(error_msg)
        raise error.TestFail(error_msg)
863
864
class _base_rms_test(test.test):
    """Common parent of the RMS test classes in this file."""

    def postprocess(self):
        """Runs the parent postprocess, then reacts to failed constraints.

        generate_rms_postmortem() is called when at least one failed
        constraint was recorded in self.failed_constraints.
        """
        super(_base_rms_test, self).postprocess()

        # Count the failed constraints recorded over all iterations.
        failed_total = 0
        for iteration_failures in self.failed_constraints:
            failed_total += len(iteration_failures)

        if failed_total:
            generate_rms_postmortem()
874
875
class chrome_rms_test(_base_rms_test):
    """Base test class for audio RMS test with Chrome.

    The chrome instance can be accessed by self.chrome.
    """
    def warmup(self):
        """Starts Chrome and runs the CRAS RMS test setup.

        If cras_rms_test_setup() raises, the browser is closed and the
        exception is re-raised.
        """
        skip_devices_to_test('x86-mario')
        super(chrome_rms_test, self).warmup()

        # Not all client of this file using telemetry.
        # Just do the import here for those who really need it.
        from autotest_lib.client.common_lib.cros import chrome

        self.chrome = chrome.Chrome(init_network_controller=True)

        # The audio configuration could be changed when we
        # restart chrome.
        try:
            cras_rms_test_setup()
        except Exception:
            self.chrome.browser.Close()
            raise


    def cleanup(self, *args):
        """Closes Chrome if it was started, then runs the parent cleanup.

        @param args: Ignored; accepted for compatibility with callers.
        """
        try:
            # warmup() can abort before self.chrome is assigned; in that
            # case there is nothing to close and touching self.chrome
            # directly would raise AttributeError.
            chrome_instance = getattr(self, 'chrome', None)
            if chrome_instance is not None:
                chrome_instance.close()
        finally:
            super(chrome_rms_test, self).cleanup()
905
class cras_rms_test(_base_rms_test):
    """Base test class for CRAS audio RMS test."""

    def warmup(self):
        """Runs the parent warmup followed by the CRAS RMS test setup.

        skip_devices_to_test() is called first with board 'x86-mario'.
        """
        excluded_board = 'x86-mario'
        skip_devices_to_test(excluded_board)

        super(cras_rms_test, self).warmup()

        cras_rms_test_setup()
913
914
def alsa_rms_test_setup():
    """Setup for alsa_rms_test.

    Mixer controls differ between boards and chipsets, and a control with
    the same name may have different capabilities on different boards.
    This picks a set of (control, value) pairs by board, architecture and
    codec driver, then applies each one with an 'sset' mixer command on the
    sound card found through the 'Mic Jack' / 'Mic' control lookup.
    """
    card_id = alsa_utils.get_first_soundcard_with_control('Mic Jack', 'Mic')
    arch = utils.get_arch()
    board = utils.get_board()
    uses_max98090 = os.path.exists('/sys/module/snd_soc_max98090')

    if board in ['daisy_spring', 'daisy_skate']:
        # The MIC controls of the boards do not support dB syntax.
        mixer_settings = [
            ('Headphone', _DEFAULT_ALSA_MAX_VOLUME),
            ('MIC1', _DEFAULT_ALSA_MAX_VOLUME),
            ('MIC2', _DEFAULT_ALSA_MAX_VOLUME),
        ]
    elif arch in ['armv7l', 'aarch64'] or uses_max98090:
        # ARM platforms or Intel platforms that uses max98090 codec driver.
        mixer_settings = [
            ('Headphone', _DEFAULT_ALSA_MAX_VOLUME),
            ('MIC1', _DEFAULT_ALSA_CAPTURE_GAIN),
            ('MIC2', _DEFAULT_ALSA_CAPTURE_GAIN),
        ]
    else:
        # The rest of Intel platforms.
        mixer_settings = [
            ('Master', _DEFAULT_ALSA_MAX_VOLUME),
            ('Capture', _DEFAULT_ALSA_CAPTURE_GAIN),
        ]

    # Apply the settings in the listed order.
    for control_name, control_value in mixer_settings:
        alsa_utils.mixer_cmd(card_id, ['sset', control_name, control_value])
949
950
class alsa_rms_test(_base_rms_test):
    """Base test class for ALSA audio RMS test."""

    def warmup(self):
        """Runs the parent warmup followed by the ALSA RMS test setup.

        skip_devices_to_test() is called first with board 'x86-mario'.
        """
        excluded_board = 'x86-mario'
        skip_devices_to_test(excluded_board)

        super(alsa_rms_test, self).warmup()

        alsa_rms_test_setup()
959