# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Feedback implementation for audio with closed-loop cable."""

import logging
import numpy
import os
import tempfile
import wave

import common
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import site_utils
from autotest_lib.client.common_lib.feedback import client
from autotest_lib.server.brillo import host_utils


# Constants used for updating the audio policy.
#
_DUT_AUDIO_POLICY_PATH = 'system/etc/audio_policy.conf'
_AUDIO_POLICY_ATTACHED_INPUT_DEVICES = 'attached_input_devices'
_AUDIO_POLICY_ATTACHED_OUTPUT_DEVICES = 'attached_output_devices'
_AUDIO_POLICY_DEFAULT_OUTPUT_DEVICE = 'default_output_device'
_WIRED_HEADSET_IN = 'AUDIO_DEVICE_IN_WIRED_HEADSET'
_WIRED_HEADSET_OUT = 'AUDIO_DEVICE_OUT_WIRED_HEADSET'

# Constants used when recording playback.
#
_REC_FILENAME = 'rec_file.wav'
_REC_DURATION = 10
# Number of channels to record.
_DEFAULT_NUM_CHANNELS = 1
# Recording sample rate (48kHz).
_DEFAULT_SAMPLE_RATE = 48000
# Recording sample format is signed 16-bit PCM (two bytes).
_DEFAULT_SAMPLE_WIDTH = 2
# The peak when recording silence may be at most this fraction (5%) of the max
# volume.
_SILENCE_THRESHOLD = 0.05

# Thresholds used when comparing files.
#
# The frequency threshold used when comparing files. The frequency of the
# recorded audio has to be within this fraction (0.01 means 1%) of the
# frequency of the original audio.
_FREQUENCY_THRESHOLD = 0.01
# Noise threshold controls how much noise is allowed as a fraction of the
# magnitude of the peak frequency after taking an FFT. The magnitude of all the
# other frequencies in the signal should be below this fraction (0.05 means 5%)
# of the magnitude of the main frequency.
_FFT_NOISE_THRESHOLD = 0.05


def _max_volume(sample_width):
    """Returns the maximum possible volume.

    This is the highest absolute value of an integer of a given width.
    If the sample width is one, then we assume an unsigned integer. For all
    other sample sizes, we assume that the format is signed.

    @param sample_width: The sample width in bytes.

    @return The maximum volume as an integer.
    """
    return (1 << 8) if sample_width == 1 else (1 << (sample_width * 8 - 1))


def _make_temp_file(**kwargs):
    """Creates an empty temporary file and returns its path.

    tempfile.mkstemp() returns an open OS-level file descriptor alongside the
    path. Callers in this module only need the path, so the descriptor is
    closed right away instead of being leaked.

    @param kwargs: Keyword arguments forwarded to tempfile.mkstemp().

    @return The path of the newly created file.
    """
    fd, path = tempfile.mkstemp(**kwargs)
    os.close(fd)
    return path


class Client(client.Client):
    """Audio closed-loop feedback implementation.

    This class (and the queries it instantiates) perform playback and recording
    of audio on the DUT itself, with the assumption that the audio in/out
    connections are cross-wired with a cable. It provides some shared logic
    that queries can use for handling the DUT as well as maintaining shared
    state between queries (such as an audible volume threshold).
    """

    def __init__(self):
        """Construct the client library."""
        super(Client, self).__init__()
        self.host = None
        self.dut_tmp_dir = None
        self.tmp_dir = None
        self.orig_policy = None
        # None until set_audible_threshold() is called; audible playback
        # queries refuse to validate without it.
        self.audible_threshold = None


    def set_audible_threshold(self, threshold):
        """Sets the audible volume threshold.

        @param threshold: New threshold value.
        """
        self.audible_threshold = threshold


    def _patch_audio_policy(self):
        """Updates the audio_policy.conf file to use the headphone jack.

        Currently, there's no way to update the audio routing if a headset is
        plugged in. This function manually changes the audio routing to play
        through the headset.
        TODO(ralphnathan): Remove this once b/25188354 is resolved.

        On return, self.orig_policy holds the local path of the DUT's original
        policy if the policy was changed (so that it can be restored later), or
        None if no change was needed.
        """
        # Fetch the DUT's original audio policy.
        self.orig_policy = _make_temp_file(dir=self.tmp_dir)
        self.host.get_file(_DUT_AUDIO_POLICY_PATH, self.orig_policy,
                           delete_dest=True)

        # Patch the policy to route audio to a headset.
        test_policy = _make_temp_file(dir=self.tmp_dir)
        try:
            policy_changed = False
            with open(self.orig_policy) as orig_file:
                with open(test_policy, 'w') as test_file:
                    for line in orig_file:
                        if _WIRED_HEADSET_OUT not in line:
                            if _AUDIO_POLICY_ATTACHED_OUTPUT_DEVICES in line:
                                # Append the headset to the device list.
                                line = '%s|%s\n' % (line.rstrip(),
                                                    _WIRED_HEADSET_OUT)
                                policy_changed = True
                            elif _AUDIO_POLICY_DEFAULT_OUTPUT_DEVICE in line:
                                # Replace the last space-separated token (the
                                # current default device) with the headset.
                                line = '%s %s\n' % (
                                        line.rstrip().rsplit(' ', 1)[0],
                                        _WIRED_HEADSET_OUT)
                                policy_changed = True
                        if _WIRED_HEADSET_IN not in line:
                            if _AUDIO_POLICY_ATTACHED_INPUT_DEVICES in line:
                                line = '%s|%s\n' % (line.rstrip(),
                                                    _WIRED_HEADSET_IN)
                                policy_changed = True

                        test_file.write(line)

            # Update the DUT's audio policy if changed.
            if policy_changed:
                logging.info('Updating audio policy to route audio to headset')
                self.host.remount()
                self.host.send_file(test_policy, _DUT_AUDIO_POLICY_PATH,
                                    delete_dest=True)
                self.host.reboot()
            else:
                # Nothing to restore later, so drop the local copy.
                os.remove(self.orig_policy)
                self.orig_policy = None
        finally:
            # The patched copy is only needed while sending it to the DUT.
            os.remove(test_policy)


    # Interface overrides.
    #
    def _initialize_impl(self, test, host):
        """Initializes the feedback object.

        @param test: An object representing the test case.
        @param host: An object representing the DUT.
        """
        self.host = host
        self.tmp_dir = test.tmpdir
        self.dut_tmp_dir = host.get_tmp_dir()
        self._patch_audio_policy()


    def _finalize_impl(self):
        """Finalizes the feedback object.

        Restores the DUT's original audio policy if it was patched during
        initialization.
        """
        if self.orig_policy:
            logging.info('Restoring DUT audio policy')
            self.host.remount()
            self.host.send_file(self.orig_policy, _DUT_AUDIO_POLICY_PATH,
                                delete_dest=True)
            os.remove(self.orig_policy)
            self.orig_policy = None


    def _new_query_impl(self, query_id):
        """Instantiates a new query.

        @param query_id: A query identifier.

        @return A query object.

        @raise error.TestError: Query is not supported.
        """
        if query_id == client.QUERY_AUDIO_PLAYBACK_SILENT:
            return SilentPlaybackAudioQuery(self)
        elif query_id == client.QUERY_AUDIO_PLAYBACK_AUDIBLE:
            return AudiblePlaybackAudioQuery(self)
        elif query_id == client.QUERY_AUDIO_RECORDING:
            return RecordingAudioQuery(self)
        else:
            raise error.TestError('Unsupported query (%s)' % query_id)


class _PlaybackAudioQuery(client.OutputQuery):
    """Playback query base class."""

    def __init__(self, client):
        """Constructor.

        @param client: The instantiating client object.
        """
        super(_PlaybackAudioQuery, self).__init__()
        self.client = client
        self.dut_rec_filename = None
        self.local_tmp_dir = None
        self.recording_pid = None


    def _get_local_rec_filename(self):
        """Waits for recording to finish and copies the file to the host.

        @return A string of the local filename containing the recorded audio.

        @raise error.TestError: Recording did not terminate in time.
        """
        # Wait for recording to finish.
        timeout = _REC_DURATION + 5
        if not host_utils.wait_for_process(self.client.host,
                                           self.recording_pid, timeout):
            raise error.TestError(
                    'Recording did not terminate within %d seconds' % timeout)

        local_rec_filename = _make_temp_file(
                prefix='recording-', suffix='.wav', dir=self.local_tmp_dir)
        self.client.host.get_file(self.dut_rec_filename,
                                  local_rec_filename, delete_dest=True)
        return local_rec_filename


    # Implementation overrides.
    #
    def _prepare_impl(self,
                      sample_width=_DEFAULT_SAMPLE_WIDTH,
                      sample_rate=_DEFAULT_SAMPLE_RATE,
                      num_channels=_DEFAULT_NUM_CHANNELS,
                      duration_secs=_REC_DURATION):
        """Implementation of query preparation logic.

        Starts a background recording on the DUT and remembers the recording
        parameters so that validation can check the recorded file against them.

        @param sample_width: Sample width to record at.
        @param sample_rate: Sample rate to record at.
        @param num_channels: Number of channels to record at.
        @param duration_secs: Duration (in seconds) to record for.
        """
        self.num_channels = num_channels
        self.sample_rate = sample_rate
        self.sample_width = sample_width
        self.dut_rec_filename = os.path.join(self.client.dut_tmp_dir,
                                             _REC_FILENAME)
        self.local_tmp_dir = tempfile.mkdtemp(dir=self.client.tmp_dir)

        # Trigger recording in the background.
        # TODO(garnold) Remove 'su root' once b/25663983 is resolved.
        cmd = ('su root slesTest_recBuffQueue -c%d -d%d -r%d -%d %s' %
               (num_channels, duration_secs, sample_rate, sample_width,
                self.dut_rec_filename))
        logging.info("Recording cmd: %s", cmd)
        self.recording_pid = host_utils.run_in_background(self.client.host,
                                                          cmd)


class SilentPlaybackAudioQuery(_PlaybackAudioQuery):
    """Implementation of a silent playback query."""

    def __init__(self, client):
        """Constructor.

        @param client: The instantiating client object.
        """
        super(SilentPlaybackAudioQuery, self).__init__(client)


    # Implementation overrides.
    #
    def _validate_impl(self):
        """Implementation of query validation logic.

        Checks that the recording made during silence is quiet enough and
        derives the client's audible threshold from the measured peak.

        @raise error.TestFail: The recorded file has unexpected attributes or
                the environment is too noisy.
        """
        local_rec_filename = self._get_local_rec_filename()
        try:
            silence_peaks = site_utils.check_wav_file(
                    local_rec_filename,
                    num_channels=self.num_channels,
                    sample_rate=self.sample_rate,
                    sample_width=self.sample_width)
        except ValueError as e:
            raise error.TestFail('Invalid file attributes: %s' % e)

        silence_peak = max(silence_peaks)
        # Fail if the silence peak volume exceeds the maximum allowed.
        max_vol = _max_volume(self.sample_width) * _SILENCE_THRESHOLD
        if silence_peak > max_vol:
            logging.error('Silence peak level (%d) exceeds the max allowed '
                          '(%d)', silence_peak, max_vol)
            raise error.TestFail('Environment is too noisy')

        # Update the client audible threshold: anything audible is expected to
        # peak at least 15 times higher than the measured silence.
        audible_threshold = silence_peak * 15
        logging.info('Silent peak level (%d) is below the max allowed (%d); '
                     'setting audible threshold to %d',
                     silence_peak, max_vol, audible_threshold)
        self.client.set_audible_threshold(audible_threshold)


class AudiblePlaybackAudioQuery(_PlaybackAudioQuery):
    """Implementation of an audible playback query."""

    def __init__(self, client):
        """Constructor.

        @param client: The instantiating client object.
        """
        super(AudiblePlaybackAudioQuery, self).__init__(client)


    def _check_peaks(self):
        """Ensure that peak recording volume exceeds the threshold.

        @raise error.TestError: No audible threshold was set on the client.
        @raise error.TestFail: The recorded file has unexpected attributes or
                the quietest channel peaks below the audible threshold.
        """
        local_rec_filename = self._get_local_rec_filename()
        try:
            audible_peaks = site_utils.check_wav_file(
                    local_rec_filename,
                    num_channels=self.num_channels,
                    sample_rate=self.sample_rate,
                    sample_width=self.sample_width)
        except ValueError as e:
            raise error.TestFail('Invalid file attributes: %s' % e)

        audible_threshold = self.client.audible_threshold
        if audible_threshold is None:
            # Comparing against an unset threshold would be meaningless, so
            # report the misuse explicitly.
            raise error.TestError(
                    'Audible threshold was not set; run a silent playback '
                    'query or call set_audible_threshold() first')

        # Every channel must be audible, so check the quietest one.
        min_channel, min_audible_peak = min(enumerate(audible_peaks),
                                            key=lambda p: p[1])
        if min_audible_peak < audible_threshold:
            logging.error(
                    'Audible peak level (%d) is less than expected (%d) for '
                    'channel %d', min_audible_peak, audible_threshold,
                    min_channel)
            raise error.TestFail(
                    'The played audio peak level is below the expected '
                    'threshold. Either playback did not work, or the volume '
                    'level is too low. Check the audio connections and '
                    'settings on the DUT.')

        logging.info('Audible peak level (%d) exceeds the threshold (%d)',
                     min_audible_peak, audible_threshold)


    def _is_outside_frequency_threshold(self, freq_golden, freq_rec):
        """Compares the frequency of the recorded audio with the golden audio.

        This function checks whether a frequency deviates from the reference
        frequency by more than the fraction given by _FREQUENCY_THRESHOLD.

        @param freq_golden: The dominant frequency in the reference audio file.
        @param freq_rec: The frequency to compare against freq_golden.

        @return True if freq_rec deviates from freq_golden by more than
                _FREQUENCY_THRESHOLD (as a fraction of freq_golden), False
                otherwise.
        """
        # Equivalent to checking freq_rec / freq_golden against
        # 1 +/- _FREQUENCY_THRESHOLD, but well-defined when freq_golden is 0.
        deviation = abs(float(freq_rec) - freq_golden)
        return bool(deviation > _FREQUENCY_THRESHOLD * abs(freq_golden))


    def _compare_file(self, audio_file):
        """Compares the recorded audio file to the golden audio file.

        This method checks for two things:
          1. That the main frequency is the same in both the files. This is done
             using the FFT and observing the frequency corresponding to the
             peak.
          2. That there is no other dominant frequency in the recorded file.
             This is done by sweeping the frequency domain and checking that
             the magnitude of every frequency away from the main one is less
             than _FFT_NOISE_THRESHOLD times the peak magnitude.

        The key assumption here is that the reference audio file contains only
        one frequency.

        @param audio_file: Reference audio file containing the golden signal.

        @raise error.TestFail: The frequency of the recorded signal doesn't
                               match that of the golden signal.
        @raise error.TestFail: There is too much noise in the recorded signal.
        """
        local_rec_filename = self._get_local_rec_filename()

        # Open both files and extract data. Everything needed is read while
        # the file is open, and the handle is always closed afterwards.
        golden_file = wave.open(audio_file, 'rb')
        try:
            golden_file_frames = site_utils.extract_wav_frames(golden_file)
            num_channels = golden_file.getnchannels()
            golden_frame_rate = golden_file.getframerate()
        finally:
            golden_file.close()

        rec_file = wave.open(local_rec_filename, 'rb')
        try:
            rec_file_frames = site_utils.extract_wav_frames(rec_file)
            rec_frame_rate = rec_file.getframerate()
        finally:
            rec_file.close()

        # NOTE(review): the recording is de-interleaved using the golden file's
        # channel count, i.e. both files are assumed to have the same number of
        # channels -- confirm against callers.
        for channel in range(num_channels):
            golden_data = golden_file_frames[channel::num_channels]
            rec_data = rec_file_frames[channel::num_channels]

            # Get fft and frequencies corresponding to the fft values.
            fft_golden = numpy.fft.rfft(golden_data)
            fft_rec = numpy.fft.rfft(rec_data)
            fft_freqs_golden = numpy.fft.rfftfreq(
                    len(golden_data), 1.0 / golden_frame_rate)
            fft_freqs_rec = numpy.fft.rfftfreq(len(rec_data),
                                               1.0 / rec_frame_rate)

            # Get frequency at highest peak.
            freq_golden = fft_freqs_golden[numpy.argmax(numpy.abs(fft_golden))]
            abs_fft_rec = numpy.abs(fft_rec)
            freq_rec = fft_freqs_rec[numpy.argmax(abs_fft_rec)]

            # Compare the two frequencies.
            logging.info('Golden frequency = %f', freq_golden)
            logging.info('Recorded frequency = %f', freq_rec)
            if self._is_outside_frequency_threshold(freq_golden, freq_rec):
                raise error.TestFail('The recorded audio frequency does not '
                                     'match that of the audio played.')

            # Check for noise in the frequency domain.
            fft_rec_peak_val = numpy.max(abs_fft_rec)
            noise_detected = False
            for fft_index, fft_val in enumerate(abs_fft_rec):
                # Only bins away from the golden frequency count as noise. The
                # frequency of the bin itself must be tested here; the
                # dominant recorded frequency was already validated above.
                bin_freq = fft_freqs_rec[fft_index]
                if self._is_outside_frequency_threshold(freq_golden, bin_freq):
                    # If the magnitude exceeds _FFT_NOISE_THRESHOLD of the
                    # peak, then fail the test.
                    if fft_val > _FFT_NOISE_THRESHOLD * fft_rec_peak_val:
                        logging.warning('Unexpected frequency peak detected at '
                                        '%f Hz.', bin_freq)
                        noise_detected = True

            if noise_detected:
                raise error.TestFail('Signal is noiser than expected.')


    # Implementation overrides.
    #
    def _validate_impl(self, audio_file=None):
        """Implementation of query validation logic.

        @param audio_file: File to compare recorded audio to. If not given,
                only the peak volume of the recording is checked.
        """
        self._check_peaks()
        # If the reference audio file is available, then perform an additional
        # check.
        if audio_file:
            self._compare_file(audio_file)


class RecordingAudioQuery(client.InputQuery):
    """Implementation of a recording query."""

    def __init__(self, client):
        """Constructor.

        @param client: The instantiating client object.
        """
        super(RecordingAudioQuery, self).__init__()
        self.client = client


    def _prepare_impl(self, **kwargs):
        """Implementation of query preparation logic (no-op)."""
        pass


    def _emit_impl(self):
        """Implementation of query emission logic."""
        self.client.host.run('slesTest_sawtoothBufferQueue')


    def _validate_impl(self, captured_audio_file, sample_width,
                       sample_rate=None, num_channels=None, peak_percent_min=1,
                       peak_percent_max=100):
        """Implementation of query validation logic.

        @param captured_audio_file: Path to the recorded WAV file.
        @param sample_width: Sample width (in bytes) the recording is expected
                to have; also determines the maximum volume.
        @param sample_rate: Expected sample rate, passed on to
                site_utils.check_wav_file().
        @param num_channels: Expected number of channels, passed on to
                site_utils.check_wav_file().
        @param peak_percent_min: Lower bound on peak recorded volume as
                percentage of max volume (0-100). Default is 1%.
        @param peak_percent_max: Upper bound on peak recorded volume as
                percentage of max volume (0-100). Default is 100% (no limit).

        @raise error.TestFail: The recorded file is invalid, or a channel's
                peak volume is outside the allowed range.
        """
        # TODO(garnold) Currently, we just test whether anything audible was
        # recorded. We should compare the captured audio to the one produced.
        try:
            recorded_peaks = site_utils.check_wav_file(
                    captured_audio_file, num_channels=num_channels,
                    sample_rate=sample_rate, sample_width=sample_width)
        except ValueError as e:
            raise error.TestFail('Recorded audio file is invalid: %s' % e)

        max_volume = _max_volume(sample_width)
        peak_min = max_volume * peak_percent_min / 100
        peak_max = max_volume * peak_percent_max / 100
        for channel, recorded_peak in enumerate(recorded_peaks):
            if recorded_peak < peak_min:
                logging.error(
                        'Recorded audio peak level (%d) is less than expected '
                        '(%d) for channel %d', recorded_peak, peak_min, channel)
                raise error.TestFail(
                        'The recorded audio peak level is below the expected '
                        'threshold. Either recording did not capture the '
                        'produced audio, or the recording level is too low. '
                        'Check the audio connections and settings on the DUT.')

            if recorded_peak > peak_max:
                logging.error(
                        'Recorded audio peak level (%d) is more than expected '
                        '(%d) for channel %d', recorded_peak, peak_max, channel)
                raise error.TestFail(
                        'The recorded audio peak level exceeds the expected '
                        'maximum. Either recording captured much background '
                        'noise, or the recording level is too high. Check the '
                        'audio connections and settings on the DUT.')