• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import logging
18import os
19import wave
20
21from acts.controllers.utils_lib.ssh import connection
22from acts.controllers.utils_lib.ssh import settings
23from acts.test_utils.audio_analysis_lib import audio_analysis
24from acts.test_utils.audio_analysis_lib.check_quality import quality_analysis
25from acts.test_utils.coex.audio_capture import AudioCapture
26from acts.test_utils.coex.audio_capture import RECORD_FILE_TEMPLATE
27
28ANOMALY_DETECTION_BLOCK_SIZE = audio_analysis.ANOMALY_DETECTION_BLOCK_SIZE
29ANOMALY_GROUPING_TOLERANCE = audio_analysis.ANOMALY_GROUPING_TOLERANCE
30PATTERN_MATCHING_THRESHOLD = audio_analysis.PATTERN_MATCHING_THRESHOLD
31ANALYSIS_FILE_TEMPLATE = "audio_analysis_%s.txt"
32bits_per_sample = 32
33
34
35class FileNotFound(Exception):
36    """Raises Exception if file is not present"""
37
38
39class SshAudioCapture(AudioCapture):
40
41    def __init__(self, test_params, path):
42        super(SshAudioCapture, self).__init__(test_params, path)
43        self.remote_path = path
44        self.ssh_session = None
45
46    def capture_audio(self, trim_beginning=0, trim_end=0):
47        """Captures audio and store results.
48
49        Args:
50            trim_beginning: To trim audio at the start in seconds.
51            trim_end: To trim audio at the end in seconds.
52
53        Returns:
54            Returns exit status of ssh connection.
55        """
56        if not trim_beginning:
57            trim_beginning = self.audio_params.get('trim_beginning', 0)
58        if not trim_end:
59            trim_end = self.audio_params.get('trim_end', 0)
60        if self.audio_params["ssh_config"]:
61            ssh_settings = settings.from_config(
62                self.audio_params["ssh_config"])
63            self.ssh_session = connection.SshConnection(ssh_settings)
64            cur_path = os.path.dirname(os.path.realpath(__file__))
65            local_path = os.path.join(cur_path, "audio_capture.py")
66            self.ssh_session.send_file(local_path,
67                                       self.audio_params["dest_path"])
68            path = self.audio_params["dest_path"]
69            test_params = str(self.audio_params).replace("\'", "\"")
70            self.cmd = "python3 audio_capture.py -p '{}' -t '{}'".format(
71                path, test_params)
72            job_result = self.ssh_session.run(self.cmd)
73            logging.debug("Job Result {}".format(job_result.stdout))
74            self.ssh_session.pull_file(
75                self.remote_path, os.path.join(
76                    self.audio_params["dest_path"], "*.wav"),
77                    ignore_status=True)
78            return bool(not job_result.exit_status)
79        else:
80            return self.capture_and_store_audio(trim_beginning, trim_end)
81
82    def terminate_and_store_audio_results(self):
83        """Terminates audio and stores audio files."""
84        if self.audio_params["ssh_config"]:
85            self.ssh_session.run('rm *.wav', ignore_status=True)
86        else:
87            self.terminate_audio()
88
89    def THDN(self, win_size=None, step_size=None, q=1, freq=None):
90        """Calculate THD+N value for most recently recorded file.
91
92        Args:
93            win_size: analysis window size (must be less than length of
94                signal). Used with step size to analyze signal piece by
95                piece. If not specified, entire signal will be analyzed.
96            step_size: number of samples to move window per-analysis. If not
97                specified, entire signal will be analyzed.
98            q: quality factor for the notch filter used to remove fundamental
99                frequency from signal to isolate noise.
100            freq: the fundamental frequency to remove from the signal. If none,
101                the fundamental frequency will be determined using FFT.
102        Returns:
103            channel_results (list): THD+N value for each channel's signal.
104                List index corresponds to channel index.
105        """
106        latest_file_path = self.record_file_template % self.last_fileno()
107        if not (win_size and step_size):
108            return audio_analysis.get_file_THDN(filename=latest_file_path,
109                                                q=q,
110                                                freq=freq)
111        else:
112            return audio_analysis.get_file_max_THDN(filename=latest_file_path,
113                                                    step_size=step_size,
114                                                    window_size=win_size,
115                                                    q=q,
116                                                    freq=freq)
117
118    def detect_anomalies(self, freq=None,
119                         block_size=ANOMALY_DETECTION_BLOCK_SIZE,
120                         threshold=PATTERN_MATCHING_THRESHOLD,
121                         tolerance=ANOMALY_GROUPING_TOLERANCE):
122        """Detect anomalies in most recently recorded file.
123
124        An anomaly is defined as a sample in a recorded sine wave that differs
125        by at least the value defined by the threshold parameter from a golden
126        generated sine wave of the same amplitude, sample rate, and frequency.
127
128        Args:
129            freq (int|float): fundamental frequency of the signal. All other
130                frequencies are noise. If None, will be calculated with FFT.
131            block_size (int): the number of samples to analyze at a time in the
132                anomaly detection algorithm.
133            threshold (float): the threshold of the correlation index to
134                determine if two sample values match.
135            tolerance (float): the sample tolerance for anomaly time values
136                to be grouped as the same anomaly
137        Returns:
138            channel_results (list): anomaly durations for each channel's signal.
139                List index corresponds to channel index.
140        """
141        latest_file_path = self.record_file_template % self.last_fileno()
142        return audio_analysis.get_file_anomaly_durations(
143            filename=latest_file_path,
144            freq=freq,
145            block_size=block_size,
146            threshold=threshold,
147            tolerance=tolerance)
148
149    def get_last_record_duration_millis(self):
150        """Get duration of most recently recorded file.
151
152        Returns:
153            duration (float): duration of recorded file in milliseconds.
154        """
155        latest_file_path = self.record_file_template % self.last_fileno()
156        with wave.open(latest_file_path, 'r') as f:
157            frames = f.getnframes()
158            rate = f.getframerate()
159            duration = (frames / float(rate)) * 1000
160        return duration
161
162    def audio_quality_analysis(self, path):
163        """Measures audio quality based on the audio file given as input.
164
165        Args:
166            path: Log path
167
168        Returns:
169            analysis_path on success.
170        """
171        dest_file_path = os.path.join(path,
172                RECORD_FILE_TEMPLATE % self.last_fileno())
173        analysis_path = os.path.join(path,
174                ANALYSIS_FILE_TEMPLATE % self.last_fileno())
175        if not os.path.exists(dest_file_path):
176            raise FileNotFound("Recorded file not found")
177        try:
178            quality_analysis(
179                filename=dest_file_path,
180                output_file=analysis_path,
181                bit_width=bits_per_sample,
182                rate=self.audio_params["sample_rate"],
183                channel=self.audio_params["channel"],
184                spectral_only=False)
185        except Exception as err:
186            logging.exception("Failed to analyze raw audio: %s" % err)
187        return analysis_path
188
189