1#!/usr/bin/env python3 2# 3# Copyright (C) 2018 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16 17import logging 18import numpy 19import os 20import scipy.io.wavfile as sciwav 21 22from acts_contrib.test_utils.coex.audio_capture_device import AudioCaptureBase 23from acts_contrib.test_utils.coex.audio_capture_device import CaptureAudioOverAdb 24from acts_contrib.test_utils.coex.audio_capture_device import CaptureAudioOverLocal 25from acts_contrib.test_utils.audio_analysis_lib import audio_analysis 26from acts_contrib.test_utils.audio_analysis_lib.check_quality import quality_analysis 27 28ANOMALY_DETECTION_BLOCK_SIZE = audio_analysis.ANOMALY_DETECTION_BLOCK_SIZE 29ANOMALY_GROUPING_TOLERANCE = audio_analysis.ANOMALY_GROUPING_TOLERANCE 30PATTERN_MATCHING_THRESHOLD = audio_analysis.PATTERN_MATCHING_THRESHOLD 31ANALYSIS_FILE_TEMPLATE = "audio_analysis_%s.txt" 32bits_per_sample = 32 33 34 35def get_audio_capture_device(ad, audio_params): 36 """Gets the device object of the audio capture device connected to server. 37 38 The audio capture device returned is specified by the audio_params 39 within user_params. audio_params must specify a "type" field, that 40 is either "AndroidDevice" or "Local" 41 42 Args: 43 ad: Android Device object. 44 audio_params: object containing variables to record audio. 45 46 Returns: 47 Object of the audio capture device. 48 49 Raises: 50 ValueError if audio_params['type'] is not "AndroidDevice" or 51 "Local". 52 """ 53 54 if audio_params['type'] == 'AndroidDevice': 55 return CaptureAudioOverAdb(ad, audio_params) 56 57 elif audio_params['type'] == 'Local': 58 return CaptureAudioOverLocal(audio_params) 59 60 else: 61 raise ValueError('Unrecognized audio capture device ' 62 '%s' % audio_params['type']) 63 64 65class FileNotFound(Exception): 66 """Raises Exception if file is not present""" 67 68 69class AudioCaptureResult(AudioCaptureBase): 70 def __init__(self, path, audio_params=None): 71 """Initializes Audio Capture Result class. 72 73 Args: 74 path: Path of audio capture result. 75 """ 76 super().__init__() 77 self.path = path 78 self.audio_params = audio_params 79 self.analysis_path = os.path.join(self.log_path, 80 ANALYSIS_FILE_TEMPLATE) 81 if self.audio_params: 82 self._trim_wave_file() 83 84 def THDN(self, win_size=None, step_size=None, q=1, freq=None): 85 """Calculate THD+N value for most recently recorded file. 86 87 Args: 88 win_size: analysis window size (must be less than length of 89 signal). Used with step size to analyze signal piece by 90 piece. If not specified, entire signal will be analyzed. 91 step_size: number of samples to move window per-analysis. If not 92 specified, entire signal will be analyzed. 93 q: quality factor for the notch filter used to remove fundamental 94 frequency from signal to isolate noise. 95 freq: the fundamental frequency to remove from the signal. If none, 96 the fundamental frequency will be determined using FFT. 97 Returns: 98 channel_results (list): THD+N value for each channel's signal. 99 List index corresponds to channel index. 100 """ 101 if not (win_size and step_size): 102 return audio_analysis.get_file_THDN(filename=self.path, 103 q=q, 104 freq=freq) 105 else: 106 return audio_analysis.get_file_max_THDN(filename=self.path, 107 step_size=step_size, 108 window_size=win_size, 109 q=q, 110 freq=freq) 111 112 def detect_anomalies(self, 113 freq=None, 114 block_size=ANOMALY_DETECTION_BLOCK_SIZE, 115 threshold=PATTERN_MATCHING_THRESHOLD, 116 tolerance=ANOMALY_GROUPING_TOLERANCE): 117 """Detect anomalies in most recently recorded file. 118 119 An anomaly is defined as a sample in a recorded sine wave that differs 120 by at least the value defined by the threshold parameter from a golden 121 generated sine wave of the same amplitude, sample rate, and frequency. 122 123 Args: 124 freq (int|float): fundamental frequency of the signal. All other 125 frequencies are noise. If None, will be calculated with FFT. 126 block_size (int): the number of samples to analyze at a time in the 127 anomaly detection algorithm. 128 threshold (float): the threshold of the correlation index to 129 determine if two sample values match. 130 tolerance (float): the sample tolerance for anomaly time values 131 to be grouped as the same anomaly 132 Returns: 133 channel_results (list): anomaly durations for each channel's signal. 134 List index corresponds to channel index. 135 """ 136 return audio_analysis.get_file_anomaly_durations(filename=self.path, 137 freq=freq, 138 block_size=block_size, 139 threshold=threshold, 140 tolerance=tolerance) 141 142 @property 143 def analysis_fileno(self): 144 """Returns the file number to dump audio analysis results.""" 145 counter = 0 146 while os.path.exists(self.analysis_path % counter): 147 counter += 1 148 return counter 149 150 def audio_quality_analysis(self): 151 """Measures audio quality based on the audio file given as input. 152 153 Returns: 154 analysis_path on success. 155 """ 156 analysis_path = self.analysis_path % self.analysis_fileno 157 if not os.path.exists(self.path): 158 raise FileNotFound("Recorded file not found") 159 try: 160 quality_analysis(filename=self.path, 161 output_file=analysis_path, 162 bit_width=bits_per_sample, 163 rate=self.audio_params["sample_rate"], 164 channel=self.audio_params["channel"], 165 spectral_only=False) 166 except Exception as err: 167 logging.exception("Failed to analyze raw audio: %s" % err) 168 return analysis_path 169 170 def _trim_wave_file(self): 171 """Trim wave files. 172 173 """ 174 original_record_file_name = 'original_' + os.path.basename(self.path) 175 original_record_file_path = os.path.join(os.path.dirname(self.path), 176 original_record_file_name) 177 os.rename(self.path, original_record_file_path) 178 fs, data = sciwav.read(original_record_file_path) 179 trim_start = self.audio_params['trim_start'] 180 trim_end = self.audio_params['trim_end'] 181 trim = numpy.array([[trim_start, trim_end]]) 182 trim = trim * fs 183 new_wave_file_list = [] 184 for elem in trim: 185 # To check start and end doesn't exceed raw data dimension 186 start_read = min(elem[0], data.shape[0] - 1) 187 end_read = min(elem[1], data.shape[0] - 1) 188 new_wave_file_list.extend(data[start_read:end_read]) 189 new_wave_file = numpy.array(new_wave_file_list) 190 191 sciwav.write(self.path, fs, new_wave_file) 192