• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6
7import logging
8import numpy
9import os
10import re
11import tempfile
12import threading
13import time
14
15from glob import glob
16from autotest_lib.client.bin import test, utils
17from autotest_lib.client.bin.input.input_device import *
18from autotest_lib.client.common_lib import error
19from autotest_lib.client.cros.audio import alsa_utils
20from autotest_lib.client.cros.audio import audio_data
21from autotest_lib.client.cros.audio import cmd_utils
22from autotest_lib.client.cros.audio import cras_utils
23from autotest_lib.client.cros.audio import sox_utils
24
# Name of the environment variable holding the dynamic linker search path.
LD_LIBRARY_PATH = 'LD_LIBRARY_PATH'

# Executable run by get_audio_diagnostics().
_AUDIO_DIAGNOSTICS_PATH = '/usr/bin/audio_diagnostics'

_DEFAULT_NUM_CHANNELS = 2
_DEFAULT_REC_COMMAND = 'arecord -D hw:0,0 -d 10 -f dat'
_DEFAULT_SOX_FORMAT = '-t raw -b 16 -e signed -r 48000 -L'
_DEFAULT_PLAYBACK_VOLUME = 100
_DEFAULT_CAPTURE_GAIN = 2500
_DEFAULT_ALSA_MAX_VOLUME = '100%'
_DEFAULT_ALSA_CAPTURE_GAIN = '25dB'

# Minimum RMS value to pass when checking recorded file.
_DEFAULT_SOX_RMS_THRESHOLD = 0.08

# Patterns applied line by line to amixer output. Raw strings keep the
# regex escapes (\d, \s) from being read as string escapes.
_JACK_VALUE_ON_RE = re.compile(r'.*values=on')
_HP_JACK_CONTROL_RE = re.compile(r'numid=(\d+).*Headphone\sJack')
_MIC_JACK_CONTROL_RE = re.compile(r'numid=(\d+).*Mic\sJack')

# Patterns applied line by line to the output of the sox stat effect.
_SOX_RMS_AMPLITUDE_RE = re.compile(r'RMS\s+amplitude:\s+(.+)')
_SOX_ROUGH_FREQ_RE = re.compile(r'Rough\s+frequency:\s+(.+)')

# Patterns applied line by line to the output of loopback_latency.
_AUDIO_NOT_FOUND_RE = r'Audio\snot\sdetected'
_MEASURED_LATENCY_RE = r'Measured\sLatency:\s(\d+)\suS'
_REPORTED_LATENCY_RE = r'Reported\sLatency:\s(\d+)\suS'

# Tools from platform/audiotest
AUDIOFUNTEST_PATH = 'audiofuntest'
AUDIOLOOP_PATH = 'looptest'
LOOPBACK_LATENCY_PATH = 'loopback_latency'
SOX_PATH = 'sox'
TEST_TONES_PATH = 'test_tones'

# Signals whose L2 norm is not above this value are rejected by
# get_max_cross_correlation.
_MINIMUM_NORM = 0.001
_CORRELATION_INDEX_THRESHOLD = 0.999
# The minimum difference of estimated frequencies between two sine waves.
_FREQUENCY_DIFF_THRESHOLD = 20
# The minimum RMS value of meaningful audio data.
_MEANINGFUL_RMS_THRESHOLD = 0.001
64
def set_mixer_controls(mixer_settings=None, card='0'):
    """Sets all mixer controls listed in the mixer settings on card.

    Each setting is applied with 'amixer -c <card> cset name=<name> <value>'.
    A failing amixer command is logged and skipped.

    @param mixer_settings: Iterable of dicts, each with a 'name' and a
        'value' key. None (the default) or an empty iterable sets nothing.
    @param card: Index of audio card to set mixer settings for.
    """
    logging.info('Setting mixer control values on %s', card)
    # None replaces the former mutable default argument ({}).
    for item in mixer_settings or ():
        logging.info('Setting %s to %s on card %s',
                     item['name'], item['value'], card)
        cmd = 'amixer -c %s cset name=%s %s'
        cmd = cmd % (card, item['name'], item['value'])
        try:
            utils.system(cmd)
        except error.CmdError:
            # A card is allowed not to support all the controls, so don't
            # fail the test here if we get an error.
            logging.info('amixer command failed: %s', cmd)
83
def set_volume_levels(volume, capture):
    """Applies playback volume and capture gain via cras_test_client.

    After both values are set, the server info is dumped, the output is
    unmuted and the amixer contents of card 0 are printed.

    @param volume: The playback volume to set.
    @param capture: The capture gain to set.
    """
    logging.info('Setting volume level to %d', volume)
    volume_cmd = '/usr/bin/cras_test_client --volume %d' % volume
    utils.system(volume_cmd)

    logging.info('Setting capture gain to %d', capture)
    gain_cmd = '/usr/bin/cras_test_client --capture_gain %d' % capture
    utils.system(gain_cmd)

    # Remaining commands take no arguments; run them in the fixed order.
    for fixed_cmd in ('/usr/bin/cras_test_client --dump_server_info',
                      '/usr/bin/cras_test_client --mute 0',
                      'amixer -c 0 contents'):
        utils.system(fixed_cmd)
97
def loopback_latency_check(**args):
    """Checks loopback latency.

    Runs the loopback_latency tool and parses its output line by line.

    @param args: additional arguments for loopback_latency. Only 'n' (the
        noise threshold, default '400') is read.

    @return A tuple containing measured and reported latency in uS.
        Return None if no audio detected, or if either latency value is
        missing or zero in the tool output.
    """
    # dict.get instead of the deprecated dict.has_key.
    noise_threshold = str(args.get('n', '400'))

    cmd = '%s -n %s' % (LOOPBACK_LATENCY_PATH, noise_threshold)

    output = utils.system_output(cmd, retain_output=True)

    # Sleep for a short while to make sure device is not busy anymore
    # after called loopback_latency.
    time.sleep(.1)

    measured_latency = None
    reported_latency = None
    for line in output.split('\n'):
        match = re.search(_MEASURED_LATENCY_RE, line, re.I)
        if match:
            measured_latency = int(match.group(1))
            continue
        match = re.search(_REPORTED_LATENCY_RE, line, re.I)
        if match:
            reported_latency = int(match.group(1))
            continue
        if re.search(_AUDIO_NOT_FOUND_RE, line, re.I):
            return None
    if measured_latency and reported_latency:
        return (measured_latency, reported_latency)
    # Should not reach here, just in case.
    return None
134
def get_mixer_jack_status(jack_reg_exp):
    """Reads the plugged state of a jack control from amixer on card 0.

    @param jack_reg_exp: Compiled regular expression matched against each
        line of 'amixer -c0 controls'; group 1 must capture the numid.

    @return None if no control line matches, True if the control reports
        'values=on', False otherwise.
    """
    controls = utils.system_output('amixer -c0 controls', retain_output=True)
    numid = None
    for control_line in controls.split('\n'):
        found = jack_reg_exp.match(control_line)
        if found:
            numid = found.group(1)
            break

    # Without a matched numid there is nothing to query.
    if not numid:
        return None

    values = utils.system_output('amixer -c0 cget numid=%s' % numid)
    return any(_JACK_VALUE_ON_RE.match(value_line)
               for value_line in values.split('\n'))
160
def get_hp_jack_status():
    """Gets the status of headphone jack.

    @return The amixer jack status when the control exists; otherwise the
        insert state of the first input device that is a headphone jack,
        or None if there is no such device.
    """
    mixer_status = get_mixer_jack_status(_HP_JACK_CONTROL_RE)
    if mixer_status is not None:
        return mixer_status

    # When headphone jack is not found in amixer, lookup input devices
    # instead.
    #
    # TODO(hychao): Check hp/mic jack status dynamically from evdev. And
    # possibly replace the existing check using amixer.
    for event_path in glob('/dev/input/event*'):
        input_dev = InputDevice(event_path)
        if input_dev.is_hp_jack():
            return input_dev.get_headphone_insert()
    return None
178
def get_mic_jack_status():
    """Gets the status of mic jack.

    @return The amixer jack status when the control exists; otherwise the
        insert state of the first input device that is a mic jack, or None
        if there is no such device.
    """
    mixer_status = get_mixer_jack_status(_MIC_JACK_CONTROL_RE)
    if mixer_status is not None:
        return mixer_status

    # When mic jack is not found in amixer, lookup input devices instead.
    for event_path in glob('/dev/input/event*'):
        input_dev = InputDevice(event_path)
        if input_dev.is_mic_jack():
            return input_dev.get_microphone_insert()
    return None
192
def log_loopback_dongle_status():
    """Logs whether the loopback dongle appears to be plugged and working.

    Logs the mic jack status, the headphone jack status and the result of a
    latency check, then a final PASS/FAIL line. PASS requires all three to
    be truthy.
    """
    # Check Mic Jack
    mic_jack_status = get_mic_jack_status()
    logging.info('Mic jack status: %s', mic_jack_status)

    # Check Headphone Jack
    hp_jack_status = get_hp_jack_status()
    logging.info('Headphone jack status: %s', hp_jack_status)

    # Use latency check to test if audio can be captured through dongle.
    # We only want to know the basic function of dongle, so no need to
    # assert the latency accuracy here.
    latency = loopback_latency_check(n=4000)
    if latency:
        logging.info('Got latency measured %d, reported %d',
                latency[0], latency[1])
    else:
        logging.info('Latency check fail.')

    dongle_status_ok = (bool(mic_jack_status) and bool(hp_jack_status)
                        and bool(latency))
    if dongle_status_ok:
        verdict = 'PASS'
    else:
        verdict = 'FAIL'
    logging.info('audio loopback dongle test: %s', verdict)
220
# Functions to test audio playback.
def play_sound(duration_seconds=None, audio_file_path=None):
    """Plays |audio_file_path| with aplay for |duration_seconds|.

    A falsy |audio_file_path| selects the default sine440.wav file.
    A falsy |duration_seconds| plays the file in its entirety.

    @param duration_seconds: Duration to play sound.
    @param audio_file_path: Path to the audio file.
    """
    if duration_seconds:
        duration_arg = '-d %d' % duration_seconds
    else:
        duration_arg = ''
    sound_path = (audio_file_path or
                  '/usr/local/autotest/cros/audio/sine440.wav')
    utils.system('aplay %s %s' % (duration_arg, sound_path))
235
def get_play_sine_args(channel, odev='default', freq=1000, duration=10,
                       sample_size=16):
    """Builds the sox argument list that plays a generated sine tone to odev.

    @param channel: 0 for left, 1 for right; otherwise, mono.
    @param odev: alsa output device.
    @param freq: frequency of the generated sine tone.
    @param duration: duration of the generated sine tone.
    @param sample_size: output audio sample size. Default to 16.

    @return A list of strings: the sox command and its arguments.
    """
    tone = str(freq)
    if channel == 0:
        # Tone on the first channel, a 0 Hz sine on the second.
        synth_args = ['sine', tone, 'sine', '0']
    elif channel == 1:
        synth_args = ['sine', '0', 'sine', tone]
    else:
        synth_args = ['sine', tone]

    base_args = [SOX_PATH, '-b', str(sample_size), '-n', '-t', 'alsa',
                 odev, 'synth', str(duration)]
    return base_args + synth_args
256
def play_sine(channel, odev='default', freq=1000, duration=10,
              sample_size=16):
    """Plays a generated sine wave to odev by running sox.

    @param channel: 0 for left, 1 for right; otherwise, mono.
    @param odev: alsa output device.
    @param freq: frequency of the generated sine tone.
    @param duration: duration of the generated sine tone.
    @param sample_size: output audio sample size. Default to 16.
    """
    sox_args = get_play_sine_args(channel, odev, freq, duration, sample_size)
    command_line = ' '.join(sox_args)
    utils.system(command_line)
269
270# Functions to compose customized sox command, execute it and process the
271# output of sox command.
def get_sox_mixer_cmd(infile, channel,
                      num_channels=_DEFAULT_NUM_CHANNELS,
                      sox_format=_DEFAULT_SOX_FORMAT):
    """Gets sox mixer command to reduce channel.

    The command reads infile as two-channel audio and writes one channel to
    stdout, using a pan list with '1' at the selected channel and '0'
    elsewhere.

    @param infile: Input file name.
    @param channel: The selected channel to take effect.
    @param num_channels: The number of total channels to test.
    @param sox_format: Format to generate sox command.

    @return The sox command line as a string.
    """
    # The pan list always has an entry for channel 0, then one entry for
    # each of channels 1..num_channels-1.
    pan_flags = ['1' if channel == 0 else '0']
    pan_flags.extend('1' if channel == pan_index else '0'
                     for pan_index in range(1, num_channels))
    pan_values = ','.join(pan_flags)

    return '%s -c 2 %s %s -c 1 %s - mixer %s' % (SOX_PATH,
            sox_format, infile, sox_format, pan_values)
295
def sox_stat_output(infile, channel,
                    num_channels=_DEFAULT_NUM_CHANNELS,
                    sox_format=_DEFAULT_SOX_FORMAT):
    """Runs the sox stat effect on one channel of infile.

    The mixer command from get_sox_mixer_cmd is piped into a second sox
    process that runs the stat effect; stderr is redirected to stdout.

    @param infile: Input file name.
    @param channel: The selected channel.
    @param num_channels: The number of total channels to test.
    @param sox_format: Format to generate sox command.

    @return The output of sox stat command
    """
    mixer_cmd = get_sox_mixer_cmd(infile, channel, num_channels, sox_format)
    stat_cmd = '%s -c 1 %s - -n stat 2>&1' % (SOX_PATH, sox_format)
    pipeline = '%s | %s' % (mixer_cmd, stat_cmd)
    return utils.system_output(pipeline, retain_output=True)
313
def get_audio_rms(sox_output):
    """Parses the RMS amplitude out of sox stat output.

    @param sox_output: Output of sox stat command.

    @return The RMS value of the first matching line as a float, or None
        if no line matches.
    """
    for stat_line in sox_output.split('\n'):
        found = _SOX_RMS_AMPLITUDE_RE.match(stat_line)
        if found is None:
            continue
        return float(found.group(1))
    return None
325
def get_rough_freq(sox_output):
    """Parses the rough frequency out of sox stat output.

    @param sox_output: Output of sox stat command.

    @return The rough frequency of the first matching line as an int, or
        None if no line matches.
    """
    line_matches = (_SOX_ROUGH_FREQ_RE.match(stat_line)
                    for stat_line in sox_output.split('\n'))
    for found in line_matches:
        if found is not None:
            return int(found.group(1))
    return None
337
def check_audio_rms(sox_output, sox_threshold=_DEFAULT_SOX_RMS_THRESHOLD):
    """Verifies the RMS value in sox stat output reaches the threshold.

    @param sox_output: The output from sox stat command.
    @param sox_threshold: The threshold to test RMS value against.

    @raises error.TestError if RMS amplitude can't be parsed.
    @raises error.TestFail if the RMS amplitude of the recording is below
            the threshold.
    """
    measured_rms = get_audio_rms(sox_output)

    # No RMS line could be parsed from the output.
    if measured_rms is None:
        raise error.TestError(
            'Failed to generate an audio RMS value from playback.')

    logging.info('Got audio RMS value of %f. Minimum pass is %f.',
                 measured_rms, sox_threshold)
    if not measured_rms < sox_threshold:
        return
    raise error.TestFail(
        'Audio RMS value %f too low. Minimum pass is %f.' %
        (measured_rms, sox_threshold))
361
def noise_reduce_file(in_file, noise_file, out_file,
                      sox_format=_DEFAULT_SOX_FORMAT):
    """Noise-reduces in_file with sox, using noise_file as noise profile.

    A sox 'noiseprof' command over noise_file is piped into a sox
    'noisered' command that reads in_file and writes out_file.

    @param in_file: The file to noise reduce.
    @param noise_file: The file containing the noise profile.
        This can be created by recording silence.
    @param out_file: The file that receives the noise reduced sound.
    @param sox_format: The sox format to generate sox command.
    """
    profile_stage = '%s -c 2 %s %s -n noiseprof' % (
            SOX_PATH, sox_format, noise_file)
    reduce_stage = '%s -c 2 %s %s -c 2 %s %s noisered' % (
            SOX_PATH, sox_format, in_file, sox_format, out_file)
    pipeline = '%s | %s' % (profile_stage, reduce_stage)
    utils.system(pipeline)
380
def record_sample(tmpfile, record_command=_DEFAULT_REC_COMMAND):
    """Records a sample by running record_command with tmpfile appended.

    @param tmpfile: The file to record to.
    @param record_command: The command to record audio.
    """
    full_command = '%s %s' % (record_command, tmpfile)
    utils.system(full_command)
388
def create_wav_file(wav_dir, prefix=""):
    """Builds a timestamped wav file path; no file is created on disk.

    The name is '<prefix>-<current time>.wav'. Files written to this path
    are preserved in the autotest result directory for future analysis.

    @param wav_dir: The directory of created wav file.
    @param prefix: specified file name prefix.

    @return The joined path of wav_dir and the generated file name.
    """
    timestamp = time.time()
    wav_name = "%s-%s.wav" % (prefix, timestamp)
    return os.path.join(wav_dir, wav_name)
400
def run_in_parallel(*funs):
    """Runs each callable in its own thread and waits for all to finish.

    @param funs: methods to run.
    """
    workers = [threading.Thread(target=fun) for fun in funs]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
414
def loopback_test_channels(noise_file_name, wav_dir,
                           playback_callback=None,
                           check_recorded_callback=check_audio_rms,
                           preserve_test_file=True,
                           num_channels = _DEFAULT_NUM_CHANNELS,
                           record_callback=record_sample,
                           mix_callback=None):
    """Tests loopback on all channels.

    For each channel: records (and optionally plays back and mixes) in
    parallel, logs the RMS of the mixed and recorded files, noise-reduces
    the recording and passes the sox stat output of the reduced file to
    check_recorded_callback.

    @param noise_file_name: Name of the file that contains pre-recorded
        noise.
    @param wav_dir: The directory of created wav file.
    @param playback_callback: The callback to do the playback for
        one channel.
    @param record_callback: The callback to do the recording.
    @param check_recorded_callback: The callback to check recorded file.
    @param preserve_test_file: Retain the recorded files for future debugging.
    @param num_channels: The number of total channels to test.
    @param mix_callback: The callback to do on the one-channel file.
    """
    for channel in range(num_channels):
        record_file_name = create_wav_file(wav_dir,
                                           "record-%d" % channel)

        def do_record():
            """Records into this channel's record file."""
            record_callback(record_file_name)

        tasks = [do_record]

        if playback_callback:
            def do_playback():
                """Plays back on this channel."""
                playback_callback(channel)

            tasks.append(do_playback)

        if mix_callback:
            mix_file_name = create_wav_file(wav_dir, "mix-%d" % channel)

            def do_mix():
                """Runs the mix callback on this channel's mix file."""
                mix_callback(mix_file_name)

            tasks.append(do_mix)

        # All tasks are joined before the loop variables change, so the
        # closures above always see this iteration's values.
        run_in_parallel(*tasks)

        if mix_callback:
            mix_stat = sox_stat_output(mix_file_name, channel)
            mix_rms = get_audio_rms(mix_stat)
            logging.info('Got mixed audio RMS value of %f.', mix_rms)

        record_stat = sox_stat_output(record_file_name, channel)
        record_rms = get_audio_rms(record_stat)
        logging.info('Got recorded audio RMS value of %f.', record_rms)

        reduced_file_name = create_wav_file(wav_dir,
                                            "reduced-%d" % channel)
        noise_reduce_file(record_file_name, noise_file_name,
                          reduced_file_name)

        reduced_stat = sox_stat_output(reduced_file_name, channel)

        # Files are removed before the check so that a failing check does
        # not leave them behind when they are not to be preserved.
        if not preserve_test_file:
            os.unlink(reduced_file_name)
            os.unlink(record_file_name)
            if mix_callback:
                os.unlink(mix_file_name)

        check_recorded_callback(reduced_stat)
471
472
def get_channel_sox_stat(
        input_audio, channel_index, channels=2, bits=16, rate=48000):
    """Gets the sox stat info of the selected channel in the input audio file.

    For single-channel input the stat is taken on the file directly.
    Otherwise one sox process extracts the channel to its stdout and a
    second sox process runs the stat effect on that stream.

    @param input_audio: The input audio file to be analyzed.
    @param channel_index: The index of the channel to be analyzed.
                          (1 for the first channel).
    @param channels: The number of channels in the input audio.
    @param bits: The number of bits of each audio sample.
    @param rate: The sampling rate.

    @return The result of sox_utils.get_stat or
            sox_utils.parse_stat_output.

    @raises ValueError if channel_index is not in [1, channels].
    """
    if channel_index <= 0 or channel_index > channels:
        raise ValueError('incorrect channel_index: %d' % channel_index)

    if channels == 1:
        return sox_utils.get_stat(
                input_audio, channels=channels, bits=bits, rate=rate)

    p1 = cmd_utils.popen(
            sox_utils.extract_channel_cmd(
                    input_audio, '-', channel_index,
                    channels=channels, bits=bits, rate=rate),
            stdout=cmd_utils.PIPE)
    p2 = cmd_utils.popen(
            sox_utils.stat_cmd('-', channels=1, bits=bits, rate=rate),
            stdin=p1.stdout, stderr=cmd_utils.PIPE)
    # The stat text is read from the stderr pipe of the second process.
    stat_output = p2.stderr.read()
    cmd_utils.wait_and_check_returncode(p1, p2)
    return sox_utils.parse_stat_output(stat_output)
502
503
def get_rms(input_audio, channels=1, bits=16, rate=48000):
    """Gets the RMS values of all channels of the input audio.

    @param input_audio: The input audio file to be checked.
    @param channels: The number of channels in the input audio.
    @param bits: The number of bits of each audio sample.
    @param rate: The sampling rate.

    @return A list with the rms attribute of each channel's stat, in
            channel order.
    """
    stats = []
    # Channel indices passed to get_channel_sox_stat are 1-based.
    for channel_index in range(1, channels + 1):
        stats.append(get_channel_sox_stat(
                input_audio, channel_index, channels=channels, bits=bits,
                rate=rate))

    logging.info('sox stat: %s', [str(s) for s in stats])
    return [s.rms for s in stats]
518
519
def reduce_noise_and_get_rms(
        input_audio, noise_file, channels=1, bits=16, rate=48000):
    """Noise-reduces the input audio and gets the RMS of every channel.

    A noise-profile command over noise_file is piped into a noise-reduce
    command that writes to a temporary file. The RMS values are read from
    that file before it is removed.

    @param input_audio: The input audio file to be analyzed.
    @param noise_file: The noise file used to reduce noise in the input audio.
    @param channels: The number of channels in the input audio.
    @param bits: The number of bits of each audio sample.
    @param rate: The sampling rate.

    @return The list returned by get_rms for the noise-reduced audio.
    """
    with tempfile.NamedTemporaryFile() as reduced_file:
        profile_cmd = sox_utils.noise_profile_cmd(
                noise_file, '-', channels=channels, bits=bits, rate=rate)
        profiler = cmd_utils.popen(profile_cmd, stdout=cmd_utils.PIPE)

        reduce_cmd = sox_utils.noise_reduce_cmd(
                input_audio, reduced_file.name, '-',
                channels=channels, bits=bits, rate=rate)
        reducer = cmd_utils.popen(reduce_cmd, stdin=profiler.stdout)

        cmd_utils.wait_and_check_returncode(profiler, reducer)
        return get_rms(reduced_file.name, channels, bits, rate)
544
545
def skip_devices_to_test(*boards):
    """Skips the test when the current board is one of the given boards.

    @param boards: the boards to skip testing.

    @raises error.TestNAError if the current board is in boards.
    """
    # TODO(scottz): Remove this when crbug.com/220147 is fixed.
    dut_board = utils.get_current_board()
    if dut_board not in boards:
        return
    raise error.TestNAError('This test is not available on %s' % dut_board)
555
556
def cras_rms_test_setup():
    """Sets up the cras_rms_tests.

    Sets the default playback volume and capture gain and unmutes both
    directions, to make sure the line_out-to-mic_in path is all green.
    """
    # TODO(owenlin): Now, the nodes are chosen by chrome.
    #                We should do it here.
    playback_volume = _DEFAULT_PLAYBACK_VOLUME
    cras_utils.set_system_volume(playback_volume)
    cras_utils.set_selected_output_node_volume(playback_volume)

    capture_gain = _DEFAULT_CAPTURE_GAIN
    cras_utils.set_capture_gain(capture_gain)

    # Unmute output first, then capture.
    muted = False
    cras_utils.set_system_mute(muted)
    cras_utils.set_capture_mute(muted)
571
572
def generate_rms_postmortem():
    """Logs a postmortem report for rms tests.

    Logs the loopback dongle status and the audio diagnostics. Any
    exception raised while doing so is logged and not propagated.
    """
    try:
        logging.info('audio postmortem report')
        log_loopback_dongle_status()
        diagnostics = get_audio_diagnostics()
        logging.info(diagnostics)
    except Exception:
        logging.exception('Error while generating postmortem report')
581
582
def get_audio_diagnostics():
    """Runs the audio diagnostics executable and returns its result.

    @returns: the value returned by cmd_utils.execute with stdout piped;
              presumably a string containing diagnostic results.

    """
    diagnostics_cmd = [_AUDIO_DIAGNOSTICS_PATH]
    return cmd_utils.execute(diagnostics_cmd, stdout=cmd_utils.PIPE)
590
591
def get_max_cross_correlation(signal_a, signal_b):
    """Gets max cross-correlation and best time delay of two signals.

    Computes the cross-correlation function between two signals and gets
    the maximum value and time delay. The steps are:
      1. Compute cross-correlation function of X and Y and get Cxy.
         The correlation function Cxy is an array where Cxy[k] is the
         cross product of X and Y when Y is delayed by k.
         Refer to manual of numpy.correlate for detail of correlation.
      2. Find the maximum value C_max and index C_index in Cxy.
      3. Compute L2 norm of X and Y to get norm(X) and norm(Y).
      4. Divide C_max by norm(X)*norm(Y) to get max cross-correlation.

    Max cross-correlation indicates the similarity of X and Y. The value
    is 1 if X equals Y multiplied by a positive scalar.
    The value is -1 if X equals Y multiplied by a negative scalar.
    Any constant level shift will be regarded as distortion and will make
    max cross-correlation value deviate from 1.
    C_index is the best time delay of Y that makes Y look similar to X.
    Refer to http://en.wikipedia.org/wiki/Cross-correlation.

    @param signal_a: A list of numbers which contains the first signal.
    @param signal_b: A list of numbers which contains the second signal.

    @raises: ValueError if any number in signal_a or signal_b is not a float.
             ValueError if norm of any array is not greater than
             _MINIMUM_NORM.

    @returns: A tuple (correlation index, best delay). If there is more than
              one best delay, just return the first one.
    """
    # Every element of both signals must be a float; signal_a is checked
    # first.
    for signal in (signal_a, signal_b):
        for sample in signal:
            if not isinstance(sample, float):
                raise ValueError('List contains number which is not a float')

    norm_a = numpy.linalg.norm(signal_a)
    norm_b = numpy.linalg.norm(signal_b)
    logging.debug('norm_a: %f', norm_a)
    logging.debug('norm_b: %f', norm_b)
    if norm_a <= _MINIMUM_NORM or norm_b <= _MINIMUM_NORM:
        raise ValueError('No meaningful data as norm is too small.')

    correlation = numpy.correlate(signal_a, signal_b, 'full')
    max_correlation = max(correlation)

    # Collect every delay that reaches the maximum so that ties are logged.
    best_delays = []
    for delay, value in enumerate(correlation):
        if value == max_correlation:
            best_delays.append(delay)
    if len(best_delays) > 1:
        logging.warning('There are more than one best delay: %r', best_delays)

    normalized = max_correlation / (norm_a * norm_b)
    return normalized, best_delays[0]
650
651
def trim_data(data, threshold=0):
    """Trims a data by removing value that is too small in head and tail.

    Removes elements in head and tail whose absolute value is smaller than
    or equal to threshold. Elements between the first and the last valid
    element are kept regardless of their value.
    E.g. trim_data([0.0, 0.1, 0.2, 0.3, 0.2, 0.1, 0.0], 0.1) =
    ([0.2, 0.3, 0.2], 2)

    @param data: A list of numbers.
    @param threshold: The threshold to compare against.

    @returns: A tuple (trimmed_data, end_trimmed_length), where
              end_trimmed_length is the length of original data being trimmed
              from the end.
              Returns ([], None) if there is no valid data.
    """
    # Scan from the head for the first element above the threshold.
    first_valid = next(
            (i for i, value in enumerate(data) if abs(value) > threshold),
            None)
    if first_valid is None:
        logging.warning(
                'There is no element with absolute value greater '
                'than threshold %f', threshold)
        return [], None

    # Scan from the tail; this stops at first_valid at the latest.
    last_valid = len(data) - 1
    while not abs(data[last_valid]) > threshold:
        last_valid -= 1

    logging.debug('Start and end of indice_valid: %d, %d',
                  first_valid, last_valid)
    end_trimmed_length = len(data) - last_valid - 1
    logging.debug('Trimmed length in the end: %d', end_trimmed_length)
    return (data[first_valid : last_valid + 1], end_trimmed_length)
680
681
def get_one_channel_correlation(test_data, golden_data):
    """Gets max cross-correlation of test_data and golden_data.

    Trims test data and computes the max cross-correlation against
    golden_data. Signal can be trimmed because those zero values in the head
    and tail of a signal will not affect correlation computation.

    @param test_data: A list containing the data to compare against golden data.
    @param golden_data: A list containing the golden data.

    @returns: A tuple (max cross-correlation, best_delay) if the max
              cross-correlation is truthy. Otherwise returns (None, None).
              Refer to docstring of get_max_cross_correlation.

    @raises: ValueError propagated from get_max_cross_correlation, e.g.
             when the trimmed test data is empty so its norm is too small.
    """
    trimmed_test_data, end_trimmed_length = trim_data(test_data)

    # get_max_cross_correlation only accepts floats, so cast both inputs.
    golden_float = [float(sample) for sample in golden_data]
    trimmed_float = [float(sample) for sample in trimmed_test_data]

    max_cross_correlation, best_delay = get_max_cross_correlation(
            golden_float, trimmed_float)

    # The reason to add back the trimmed length in the end.
    # E.g.:
    # golden data:
    #
    # |-----------vvvv----------------|  vvvv is the signal of interest.
    #       a                 b
    #
    # test data:
    #
    # |---x----vvvv--------x----------------|  x is the place to trim.
    #   c   d         e            f
    #
    # trimmed test data:
    #
    # |----vvvv--------|
    #   d         e
    #
    # The first output of cross correlation computation :
    #
    #                  |-----------vvvv----------------|
    #                       a                 b
    #
    # |----vvvv--------|
    #   d         e
    #
    # The largest output of cross correlation computation happens at
    # delay a + e.
    #
    #                  |-----------vvvv----------------|
    #                       a                 b
    #
    #                         |----vvvv--------|
    #                           d         e
    #
    # Cross correlation starts computing by aligning the last sample
    # of the trimmed test data to the first sample of golden data.
    # The best delay calculated from trimmed test data and golden data
    # cross correlation is e + a. But the real best delay that should be
    # identical on two channel should be e + a + f.
    # So we need to add back the length being trimmed in the end.

    if not max_cross_correlation:
        return None, None
    return max_cross_correlation, best_delay + end_trimmed_length
757
758
def compare_one_channel_correlation(test_data, golden_data, parameters):
    """Compares two one-channel data by correlation.

    @param test_data: A list containing the data to compare against golden data.
    @param golden_data: A list containing the golden data.
    @param parameters: A dict containing parameters for method. The key
        'correlation_threshold' overrides _CORRELATION_INDEX_THRESHOLD.

    @returns: A dict containing:
              index: The index of similarity where 1 means they are different
                  only by a positive scale.
              best_delay: The best delay of test data in relative to golden
                  data.
              equal: A bool containing comparing result.
    """
    threshold = parameters.get('correlation_threshold',
                               _CORRELATION_INDEX_THRESHOLD)

    max_cross_correlation, best_delay = get_one_channel_correlation(
            test_data, golden_data)

    # A missing (None) or zero correlation never counts as equal.
    is_equal = bool(max_cross_correlation and
                    max_cross_correlation > threshold)

    result_dict = dict()
    result_dict['index'] = max_cross_correlation
    result_dict['best_delay'] = best_delay
    result_dict['equal'] = is_equal
    logging.debug('result_dict: %r', result_dict)
    return result_dict
788
789
def get_one_channel_stat(data, data_format):
    """Gets statistic information of data.

    The data is written as one-channel raw audio to a temporary file and
    sox stat is run on that file.

    @param data: A list containing one channel data.
    @param data_format: A dict containing data format of data. The keys
        'sample_format' and 'rate' are read.

    @return: The sox stat parsed result. An object containing
             sample_count: An int. Samples read.
             length: A float. Length in seconds.
             rms: A float. RMS amplitude.
             rough_frequency: A float. Rough frequency.

    @raises: ValueError if data is empty.
    """
    if not data:
        raise ValueError('Data is empty. Can not get stat')

    sample_format = data_format['sample_format']
    raw_data = audio_data.AudioRawData(
            binary=None, channel=1, sample_format=sample_format)
    raw_data.copy_channel_data([data])

    with tempfile.NamedTemporaryFile() as raw_data_file:
        raw_data_path = raw_data_file.name
        raw_data.write_to_file(raw_data_path)

        # Sample size in bits, derived from the per-format byte size.
        size_bytes = audio_data.SAMPLE_FORMATS[sample_format]['size_bytes']
        bits = 8 * size_bytes
        return sox_utils.get_stat(raw_data_path, channels=1, bits=bits,
                                  rate=data_format['rate'])
817
818
def compare_one_channel_frequency(test_data, test_data_format,
                                  golden_data, golden_data_format):
    """Compares two one-channel data by frequency.

    The rough frequency of each data is obtained through
    get_one_channel_stat. The two are considered equal when their absolute
    difference is below _FREQUENCY_DIFF_THRESHOLD.

    @param test_data: A list containing the data to compare against golden data.
    @param test_data_format: A dict containing data format of test data.
    @param golden_data: A list containing the golden data.
    @param golden_data_format: A dict containing data format of golden data.

    @returns: A dict containing:
              test_data_frequency: test data frequency.
              golden_data_frequency: golden data frequency.
              equal: A bool containing comparing result.

    @raises: ValueError if the test data RMS is below
             _MEANINGFUL_RMS_THRESHOLD, i.e. too small to be meaningful.

    """
    golden_data_stat = get_one_channel_stat(golden_data, golden_data_format)
    logging.info('Get golden data one channel stat: %s', golden_data_stat)
    test_data_stat = get_one_channel_stat(test_data, test_data_format)
    logging.info('Get test data one channel stat: %s', test_data_stat)

    golden_frequency = golden_data_stat.rough_frequency
    test_frequency = test_data_stat.rough_frequency
    frequency_diff = abs(test_frequency - golden_frequency)

    result_dict = dict()
    result_dict['golden_data_frequency'] = golden_frequency
    result_dict['test_data_frequency'] = test_frequency
    result_dict['equal'] = bool(frequency_diff < _FREQUENCY_DIFF_THRESHOLD)
    logging.debug('result_dict: %r', result_dict)

    # The result is logged before this check so the measured frequencies
    # are still visible when the recording turns out to be too quiet.
    if test_data_stat.rms < _MEANINGFUL_RMS_THRESHOLD:
        # Format the message explicitly: unlike logging calls, exceptions
        # do not interpolate extra arguments into the message.
        raise ValueError(
                'Recorded RMS %f is too small to be meaningful.' %
                test_data_stat.rms)
    return result_dict
853
854
def compare_one_channel_data(test_data, test_data_format,
                             golden_data, golden_data_format, method,
                             parameters):
    """Dispatches a one-channel comparison to the requested method.

    @param test_data: A list containing the data to compare against golden data.
    @param test_data_format: The data format of test data. Only used by the
                             'frequency' method.
    @param golden_data: A list containing the golden data.
    @param golden_data_format: The data format of golden data. Only used by
                               the 'frequency' method.
    @param method: The comparing method, either 'correlation' or
                   'frequency'.
    @param parameters: A dict containing parameters for method. Only passed
                       to the 'correlation' method.

    @returns: The dict returned by the selected comparison. It always
              contains:
              equal: A bool containing comparing result.
              For 'correlation' it also contains index and best_delay; for
              'frequency' it also contains test_data_frequency and
              golden_data_frequency.

    @raises: NotImplementedError if method is not supported.
    """
    if method not in ('correlation', 'frequency'):
        raise NotImplementedError('method %s is not implemented' % method)

    if method == 'frequency':
        return compare_one_channel_frequency(
                test_data, test_data_format, golden_data, golden_data_format)

    return compare_one_channel_correlation(
            test_data, golden_data, parameters)
884
885
def compare_data(golden_data_binary, golden_data_format,
                 test_data_binary, test_data_format,
                 channel_map, method, parameters=None):
    """Compares test raw data against golden raw data channel by channel.

    @param golden_data_binary: The binary containing golden data.
    @param golden_data_format: The data format of golden data.
    @param test_data_binary: The binary containing test data.
    @param test_data_format: The data format of test data.
    @param channel_map: A list containing channel mapping. The list index is
                        the test data channel and the value is the golden
                        data channel it maps to, or None to skip it.
                        E.g. [1, 0, None, None, None, None, None, None] means
                        channel 0 of test data should map to channel 1 of
                        golden data. Channel 1 of test data should map to
                        channel 0 of golden data. Channel 2 to 7 of test data
                        should be skipped.
    @param method: The method to compare data. Use 'correlation' to compare
                   general data. Use 'frequency' to compare data containing
                   sine wave.

    @param parameters: A dict containing parameters for method, if needed.

    @returns: A boolean for compare result. It is False when no channel was
              compared, when any compared channel is not equal, or, for the
              'correlation' method, when the channels do not all share the
              same best delay.

    @raises: NotImplementedError if file type is not raw.
             NotImplementedError if sampling rates of two data are not the same.
    """
    if parameters is None:
        parameters = dict()

    both_raw = (golden_data_format['file_type'] == 'raw' and
                test_data_format['file_type'] == 'raw')
    if not both_raw:
        raise NotImplementedError('Only support raw data in compare_data.')
    if golden_data_format['rate'] != test_data_format['rate']:
        raise NotImplementedError(
                'Only support comparing data with the same sampling rate')

    golden_raw = audio_data.AudioRawData(
            binary=golden_data_binary,
            channel=golden_data_format['channel'],
            sample_format=golden_data_format['sample_format'])
    test_raw = audio_data.AudioRawData(
            binary=test_data_binary,
            channel=test_data_format['channel'],
            sample_format=test_data_format['sample_format'])

    # Compare every mapped channel and remember which pair each result
    # belongs to.
    compare_results = []
    for test_channel, golden_channel in enumerate(channel_map):
        if golden_channel is None:
            logging.info('Skipped channel %d', test_channel)
            continue
        test_samples = test_raw.channel_data[test_channel]
        golden_samples = golden_raw.channel_data[golden_channel]
        channel_result = dict(test_channel=test_channel,
                              golden_channel=golden_channel)
        channel_result.update(
                compare_one_channel_data(
                        test_samples, test_data_format,
                        golden_samples, golden_data_format, method,
                        parameters))
        compare_results.append(channel_result)
    logging.info('compare_results: %r', compare_results)

    # Nothing compared means nothing verified, so start from False then.
    passed = bool(compare_results)
    for channel_result in compare_results:
        if channel_result['equal']:
            continue
        logging.error(
                'Failed on test channel %d and golden channel %d',
                channel_result['test_channel'],
                channel_result['golden_channel'])
        passed = False

    # Also checks best delay are exactly the same.
    if method == 'correlation':
        distinct_delays = set(
                channel_result['best_delay']
                for channel_result in compare_results)
        if len(distinct_delays) > 1:
            logging.error('There are more than one best delay.')
            passed = False
    return passed
959
960
class _base_rms_test(test.test):
    """Common base class shared by all the rms_test classes."""

    def postprocess(self):
        """Runs the parent postprocess and collects a postmortem on failure.

        generate_rms_postmortem() is called only when at least one failed
        constraint was recorded.
        """
        super(_base_rms_test, self).postprocess()

        # Add up the failed constraints recorded for each iteration.
        failure_count = sum(map(len, self.failed_constraints))
        if failure_count:
            generate_rms_postmortem()
970
971
class chrome_rms_test(_base_rms_test):
    """Base test class for audio RMS test with Chrome.

    The chrome instance can be accessed by self.chrome.
    """
    def warmup(self):
        """Starts Chrome and then applies the CRAS RMS test setup.

        If cras_rms_test_setup() raises, the browser is closed before the
        exception is re-raised.
        """
        skip_devices_to_test('x86-mario')
        super(chrome_rms_test, self).warmup()

        # Not all clients of this file use telemetry.
        # Just do the import here for those who really need it.
        from autotest_lib.client.common_lib.cros import chrome

        self.chrome = chrome.Chrome()

        # The audio configuration could be changed when we
        # restart chrome.
        try:
            cras_rms_test_setup()
        except Exception:
            self.chrome.browser.Close()
            raise


    def cleanup(self, *args):
        """Closes the browser, if one was started, then runs parent cleanup.

        @param args: Accepted for call compatibility; not used.
        """
        # warmup() can raise before self.chrome is assigned. In that case
        # there is no browser to close, and cleanup must not fail with an
        # AttributeError of its own.
        chrome_instance = getattr(self, 'chrome', None)
        try:
            if chrome_instance is not None:
                chrome_instance.browser.Close()
        finally:
            # Always give the parent class its chance to clean up.
            super(chrome_rms_test, self).cleanup()
1001
class cras_rms_test(_base_rms_test):
    """Base test class for CRAS audio RMS test."""

    def warmup(self):
        """Prepares the device for a CRAS RMS test.

        Calls skip_devices_to_test('x86-mario') first (presumably to skip
        the test on that board; confirm against its definition), then the
        parent warmup, and finally cras_rms_test_setup().
        """
        skip_devices_to_test('x86-mario')
        super(cras_rms_test, self).warmup()
        cras_rms_test_setup()
1009
1010
def alsa_rms_test_setup():
    """Sets the ALSA mixer controls used by alsa_rms_test.

    Different boards/chipsets have different set of mixer controls.  Even
    controls that have the same name on different boards might have different
    capabilities.  The controls to set are therefore picked from the board
    name, the architecture, and whether the max98090 codec driver module is
    present, with a generic fallback for everything else.
    """
    card_id = alsa_utils.get_first_soundcard_with_control('Mic Jack', 'Mic')
    arch = utils.get_arch()
    board = utils.get_board()
    uses_max98090 = os.path.exists('/sys/module/snd_soc_max98090')

    # Each entry is (mixer command prefix, value); applied in order below.
    if board in ('daisy_spring', 'daisy_skate'):
        # The MIC controls of the boards do not support dB syntax.
        mixer_settings = [
            ('sset Headphone ', _DEFAULT_ALSA_MAX_VOLUME),
            ('sset MIC1 ', _DEFAULT_ALSA_MAX_VOLUME),
            ('sset MIC2 ', _DEFAULT_ALSA_MAX_VOLUME),
        ]
    elif arch in ('armv7l', 'aarch64') or uses_max98090:
        # ARM platforms or Intel platforms that uses max98090 codec driver.
        mixer_settings = [
            ('sset Headphone ', _DEFAULT_ALSA_MAX_VOLUME),
            ('sset MIC1 ', _DEFAULT_ALSA_CAPTURE_GAIN),
            ('sset MIC2 ', _DEFAULT_ALSA_CAPTURE_GAIN),
        ]
    else:
        # The rest of Intel platforms.
        mixer_settings = [
            ('sset Master ', _DEFAULT_ALSA_MAX_VOLUME),
            ('sset Capture ', _DEFAULT_ALSA_CAPTURE_GAIN),
        ]

    for command_prefix, value in mixer_settings:
        alsa_utils.mixer_cmd(card_id, command_prefix + value)
1040
1041
class alsa_rms_test(_base_rms_test):
    """Base test class for ALSA audio RMS test."""

    def warmup(self):
        """Prepares the device for an ALSA RMS test.

        Calls skip_devices_to_test('x86-mario') first (presumably to skip
        the test on that board; confirm against its definition), then the
        parent warmup, and finally alsa_rms_test_setup() to configure the
        mixer controls.
        """
        skip_devices_to_test('x86-mario')
        super(alsa_rms_test, self).warmup()

        alsa_rms_test_setup()
1050