• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python
2
3# Copyright 2016 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Command line tool to analyze wave file and detect artifacts."""
8
9import argparse
10import logging
11import math
12import numpy
13import pprint
14import wave
15
16import common
17from autotest_lib.client.cros.audio import audio_analysis
18from autotest_lib.client.cros.audio import audio_data
19from autotest_lib.client.cros.audio import audio_quality_measurement
20
21
22def add_args(parser):
23    """Adds command line arguments."""
24    parser.add_argument('filename', metavar='WAV_FILE', type=str,
25                        help='The wave file to check.')
26    parser.add_argument('--debug', action='store_true', default=False,
27                        help='Show debug message.')
28    parser.add_argument('--spectral', action='store_true', default=False,
29                        help='Spectral analysis on each channel.')
30    parser.add_argument('--quality', action='store_true', default=False,
31                        help='Quality analysis on each channel. Implies '
32                             '--spectral')
33
34
35def parse_args(parser):
36    """Parses args."""
37    args = parser.parse_args()
38    # Quality is checked within spectral analysis.
39    if args.quality:
40        args.spectral = True
41    return args
42
43
44class WaveFileException(Exception):
45    """Error in WaveFile."""
46    pass
47
48
49class WaveFile(object):
50    """Class which handles wave file reading.
51
52    Properties:
53        raw_data: audio_data.AudioRawData object for data in wave file.
54        rate: sampling rate.
55
56    """
57    def __init__(self, filename):
58        """Inits a wave file.
59
60        @param filename: file name of the wave file.
61
62        """
63        self.raw_data = None
64        self.rate = None
65
66        self._filename = filename
67        self._wave_reader = None
68        self._n_channels = None
69        self._sample_width_bits = None
70        self._n_frames = None
71        self._binary = None
72
73        self._read_wave_file()
74
75
76    def _read_wave_file(self):
77        """Reads wave file header and samples.
78
79        @raises:
80            WaveFileException: Wave format is not supported.
81
82        """
83        try:
84            self._wave_reader = wave.open(self._filename, 'r')
85            self._read_wave_header()
86            self._read_wave_binary()
87        except wave.Error as e:
88            if 'unknown format: 65534' in str(e):
89                raise WaveFileException(
90                        'WAVE_FORMAT_EXTENSIBLE is not supproted. '
91                        'Try command "sox in.wav -t wavpcm out.wav" to convert '
92                        'the file to WAVE_FORMAT_PCM format.')
93        finally:
94            if self._wave_reader:
95                self._wave_reader.close()
96
97
98    def _read_wave_header(self):
99        """Reads wave file header.
100
101        @raises WaveFileException: wave file is compressed.
102
103        """
104        # Header is a tuple of
105        # (nchannels, sampwidth, framerate, nframes, comptype, compname).
106        header = self._wave_reader.getparams()
107        logging.debug('Wave header: %s', header)
108
109        self._n_channels = header[0]
110        self._sample_width_bits = header[1] * 8
111        self.rate = header[2]
112        self._n_frames = header[3]
113        comptype = header[4]
114        compname = header[5]
115
116        if comptype != 'NONE' or compname != 'not compressed':
117            raise WaveFileException('Can not support compressed wav file.')
118
119
120    def _read_wave_binary(self):
121        """Reads in samples in wave file."""
122        self._binary = self._wave_reader.readframes(self._n_frames)
123        format_str = 'S%d_LE' % self._sample_width_bits
124        self.raw_data = audio_data.AudioRawData(
125                binary=self._binary,
126                channel=self._n_channels,
127                sample_format=format_str)
128
129
130class QualityCheckerError(Exception):
131    """Error in QualityChecker."""
132    pass
133
134
135class QualityChecker(object):
136    """Quality checker controls the flow of checking quality of raw data."""
137    def __init__(self, raw_data, rate):
138        """Inits a quality checker.
139
140        @param raw_data: An audio_data.AudioRawData object.
141        @param rate: Sampling rate.
142
143        """
144        self._raw_data = raw_data
145        self._rate = rate
146
147
148    def do_spectral_analysis(self, check_quality=False):
149        """Gets the spectral_analysis result.
150
151        @param check_quality: Check quality of each channel.
152
153        """
154        self.has_data()
155        for channel_idx in xrange(self._raw_data.channel):
156            signal = self._raw_data.channel_data[channel_idx]
157            max_abs = max(numpy.abs(signal))
158            logging.debug('Channel %d max abs signal: %f', channel_idx, max_abs)
159            if max_abs == 0:
160                logging.info('No data on channel %d, skip this channel',
161                              channel_idx)
162                continue
163
164            saturate_value = audio_data.get_maximum_value_from_sample_format(
165                    self._raw_data.sample_format)
166            normalized_signal = audio_analysis.normalize_signal(
167                    signal, saturate_value)
168            logging.debug('saturate_value: %f', saturate_value)
169            logging.debug('max signal after normalized: %f', max(normalized_signal))
170            spectral = audio_analysis.spectral_analysis(
171                    normalized_signal, self._rate)
172            logging.info('Channel %d spectral:\n%s', channel_idx,
173                         pprint.pformat(spectral))
174
175            if check_quality:
176                quality = audio_quality_measurement.quality_measurement(
177                        signal=normalized_signal,
178                        rate=self._rate,
179                        dominant_frequency=spectral[0][0])
180                logging.info('Channel %d quality:\n%s', channel_idx,
181                             pprint.pformat(quality))
182
183
184    def has_data(self):
185        """Checks if data has been set.
186
187        @raises QualityCheckerError: if data or rate is not set yet.
188
189        """
190        if not self._raw_data or not self._rate:
191            raise QualityCheckerError('Data and rate is not set yet')
192
193
194if __name__ == "__main__":
195    parser = argparse.ArgumentParser(
196        description='Check signal quality of a wave file. Each channel should'
197                    ' either be all zeros, or sine wave of a fixed frequency.')
198    add_args(parser)
199    args = parse_args(parser)
200
201    level = logging.DEBUG if args.debug else logging.INFO
202    format = '%(asctime)-15s:%(levelname)s:%(pathname)s:%(lineno)d: %(message)s'
203    logging.basicConfig(format=format, level=level)
204
205    wavefile = WaveFile(args.filename)
206
207    checker = QualityChecker(wavefile.raw_data, wavefile.rate)
208
209    if args.spectral:
210        checker.do_spectral_analysis(check_quality=args.quality)
211