#!/usr/bin/env python3
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

import logging
import os
import wave

from acts.controllers.utils_lib.ssh import connection
from acts.controllers.utils_lib.ssh import settings
from acts.test_utils.audio_analysis_lib import audio_analysis
from acts.test_utils.audio_analysis_lib.check_quality import quality_analysis
from acts.test_utils.coex.audio_capture import AudioCapture
from acts.test_utils.coex.audio_capture import RECORD_FILE_TEMPLATE

# Defaults for detect_anomalies(), re-exported from audio_analysis.
ANOMALY_DETECTION_BLOCK_SIZE = audio_analysis.ANOMALY_DETECTION_BLOCK_SIZE
ANOMALY_GROUPING_TOLERANCE = audio_analysis.ANOMALY_GROUPING_TOLERANCE
PATTERN_MATCHING_THRESHOLD = audio_analysis.PATTERN_MATCHING_THRESHOLD
# File name pattern for the quality-analysis report; filled with the file
# number of the most recent recording.
ANALYSIS_FILE_TEMPLATE = "audio_analysis_%s.txt"
# Passed as ``bit_width`` to quality_analysis(). The lowercase name is kept
# for backward compatibility with existing importers.
bits_per_sample = 32


class FileNotFound(Exception):
    """Raised when the expected recorded audio file is not present."""


class SshAudioCapture(AudioCapture):
    """Audio capture that records either locally or through an SSH session.

    When ``audio_params["ssh_config"]`` is truthy the capture script is sent
    to and run through an SSH connection; otherwise the capture methods
    inherited from ``AudioCapture`` are used directly.
    """

    def __init__(self, test_params, path):
        """Initializes the capture object.

        Args:
            test_params: Forwarded unchanged to ``AudioCapture.__init__``.
            path: Forwarded to ``AudioCapture.__init__`` and also stored as
                ``remote_path``, which is handed to ``pull_file`` when
                recordings are fetched over SSH.
        """
        super().__init__(test_params, path)
        self.remote_path = path
        # Created lazily by capture_audio() when an ssh_config is present.
        self.ssh_session = None

    def capture_audio(self, trim_beginning=0, trim_end=0):
        """Captures audio and stores the results.

        Args:
            trim_beginning: To trim audio at the start in seconds. A falsy
                value falls back to ``audio_params['trim_beginning']``
                (default 0).
            trim_end: To trim audio at the end in seconds. A falsy value
                falls back to ``audio_params['trim_end']`` (default 0).

        Returns:
            Over SSH: True if the remote capture command finished with a
            zero exit status, False otherwise. Without SSH: whatever
            ``capture_and_store_audio`` returns.
        """
        if not trim_beginning:
            trim_beginning = self.audio_params.get('trim_beginning', 0)
        if not trim_end:
            trim_end = self.audio_params.get('trim_end', 0)

        ssh_config = self.audio_params["ssh_config"]
        if not ssh_config:
            # The explicit trim arguments are only forwarded on this path.
            return self.capture_and_store_audio(trim_beginning, trim_end)

        dest_path = self.audio_params["dest_path"]
        self.ssh_session = connection.SshConnection(
            settings.from_config(ssh_config))

        # Send the capture script that lives next to this module.
        cur_path = os.path.dirname(os.path.realpath(__file__))
        local_path = os.path.join(cur_path, "audio_capture.py")
        self.ssh_session.send_file(local_path, dest_path)

        # The parameters are passed on the command line as the dict's repr
        # with single quotes swapped for double quotes.
        test_params = str(self.audio_params).replace("\'", "\"")
        self.cmd = "python3 audio_capture.py -p '{}' -t '{}'".format(
            dest_path, test_params)
        job_result = self.ssh_session.run(self.cmd)
        logging.debug("Job Result %s", job_result.stdout)

        # Fetch the recordings; ignore_status=True is passed so a failed
        # pull presumably does not raise here -- confirm against pull_file.
        self.ssh_session.pull_file(
            self.remote_path,
            os.path.join(dest_path, "*.wav"),
            ignore_status=True)
        return not job_result.exit_status

    def terminate_and_store_audio_results(self):
        """Terminates audio and stores audio files.

        With an ``ssh_config`` this runs ``rm *.wav`` through the SSH
        session; otherwise it calls ``terminate_audio``. If no SSH session
        has been created yet (``capture_audio`` was never run), the cleanup
        is skipped instead of failing on a ``None`` session.
        """
        if not self.audio_params["ssh_config"]:
            self.terminate_audio()
            return
        if self.ssh_session is None:
            logging.debug("No SSH session available; skipping cleanup.")
            return
        self.ssh_session.run('rm *.wav', ignore_status=True)

    def THDN(self, win_size=None, step_size=None, q=1, freq=None):
        """Calculate THD+N value for most recently recorded file.

        Args:
            win_size: analysis window size (must be less than length of
                signal). Used with step size to analyze signal piece by
                piece. If not specified, entire signal will be analyzed.
            step_size: number of samples to move window per-analysis. If not
                specified, entire signal will be analyzed.
            q: quality factor for the notch filter used to remove fundamental
                frequency from signal to isolate noise.
            freq: the fundamental frequency to remove from the signal. If none,
                the fundamental frequency will be determined using FFT.

        Returns:
            channel_results (list): THD+N value for each channel's signal.
                List index corresponds to channel index.
        """
        latest_file_path = self.record_file_template % self.last_fileno()
        # Windowed analysis needs both values; otherwise analyze the whole
        # file in one pass.
        if win_size and step_size:
            return audio_analysis.get_file_max_THDN(
                filename=latest_file_path,
                step_size=step_size,
                window_size=win_size,
                q=q,
                freq=freq)
        return audio_analysis.get_file_THDN(filename=latest_file_path,
                                            q=q,
                                            freq=freq)

    def detect_anomalies(self, freq=None,
                         block_size=ANOMALY_DETECTION_BLOCK_SIZE,
                         threshold=PATTERN_MATCHING_THRESHOLD,
                         tolerance=ANOMALY_GROUPING_TOLERANCE):
        """Detect anomalies in most recently recorded file.

        An anomaly is defined as a sample in a recorded sine wave that differs
        by at least the value defined by the threshold parameter from a golden
        generated sine wave of the same amplitude, sample rate, and frequency.

        Args:
            freq (int|float): fundamental frequency of the signal. All other
                frequencies are noise. If None, will be calculated with FFT.
            block_size (int): the number of samples to analyze at a time in the
                anomaly detection algorithm.
            threshold (float): the threshold of the correlation index to
                determine if two sample values match.
            tolerance (float): the sample tolerance for anomaly time values
                to be grouped as the same anomaly.

        Returns:
            channel_results (list): anomaly durations for each channel's signal.
                List index corresponds to channel index.
        """
        latest_file_path = self.record_file_template % self.last_fileno()
        return audio_analysis.get_file_anomaly_durations(
            filename=latest_file_path,
            freq=freq,
            block_size=block_size,
            threshold=threshold,
            tolerance=tolerance)

    def get_last_record_duration_millis(self):
        """Get duration of most recently recorded file.

        Returns:
            duration (float): duration of recorded file in milliseconds.
        """
        latest_file_path = self.record_file_template % self.last_fileno()
        with wave.open(latest_file_path, 'r') as f:
            frames = f.getnframes()
            rate = f.getframerate()
        # frames / rate is the duration in seconds; scale to milliseconds.
        return (frames / float(rate)) * 1000

    def audio_quality_analysis(self, path):
        """Measures audio quality based on the audio file given as input.

        Args:
            path: Log path; directory holding the recorded file and
                receiving the analysis report.

        Returns:
            Path of the analysis report. The path is returned even when
            the analysis itself fails: the failure is logged and not
            re-raised, so the report may not exist in that case.

        Raises:
            FileNotFound: if the most recent recording is not under path.
        """
        file_number = self.last_fileno()
        dest_file_path = os.path.join(path, RECORD_FILE_TEMPLATE % file_number)
        analysis_path = os.path.join(path,
                                     ANALYSIS_FILE_TEMPLATE % file_number)
        if not os.path.exists(dest_file_path):
            raise FileNotFound("Recorded file not found")
        try:
            quality_analysis(
                filename=dest_file_path,
                output_file=analysis_path,
                bit_width=bits_per_sample,
                rate=self.audio_params["sample_rate"],
                channel=self.audio_params["channel"],
                spectral_only=False)
        except Exception as err:
            # Deliberate best-effort: a failed analysis must not abort the
            # caller, so log it with the traceback and continue.
            logging.exception("Failed to analyze raw audio: %s", err)
        return analysis_path