• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright 2015 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""This module provides utilities to do audio data analysis."""
7
8from __future__ import absolute_import
9from __future__ import division
10from __future__ import print_function
11import logging
12import numpy
13import operator
14from six.moves import range
15
16# Only peaks with coefficient greater than 0.01 of the first peak should be
17# considered. Note that this correspond to -40dB in the spectrum.
18DEFAULT_MIN_PEAK_RATIO = 0.01
19
20PEAK_WINDOW_SIZE_HZ = 20 # Window size for peak detection.
21
22# The minimum RMS value of meaningful audio data.
23MEANINGFUL_RMS_THRESHOLD = 0.001
24
25class RMSTooSmallError(Exception):
26    """Error when signal RMS is too small."""
27    pass
28
29
30class EmptyDataError(Exception):
31    """Error when signal is empty."""
32
33
34def normalize_signal(signal, saturate_value):
35    """Normalizes the signal with respect to the saturate value.
36
37    @param signal: A list for one-channel PCM data.
38    @param saturate_value: The maximum value that the PCM data might be.
39
40    @returns: A numpy array containing normalized signal. The normalized signal
41              has value -1 and 1 when it saturates.
42
43    """
44    signal = numpy.array(signal)
45    return signal / float(saturate_value)
46
47
48def spectral_analysis(signal, rate, min_peak_ratio=DEFAULT_MIN_PEAK_RATIO,
49                      peak_window_size_hz=PEAK_WINDOW_SIZE_HZ):
50    """Gets the dominant frequencies by spectral analysis.
51
52    @param signal: A list of numbers for one-channel PCM data. This should be
53                   normalized to [-1, 1] so the function can check if signal RMS
54                   is too small to be meaningful.
55    @param rate: Sampling rate.
56    @param min_peak_ratio: The minimum peak_0/peak_i ratio such that the
57                           peaks other than the greatest one should be
58                           considered.
59                           This is to ignore peaks that are too small compared
60                           to the first peak peak_0.
61    @param peak_window_size_hz: The window size in Hz to find the peaks.
62                                The minimum differences between found peaks will
63                                be half of this value.
64
65    @returns: A list of tuples:
66              [(peak_frequency_0, peak_coefficient_0),
67               (peak_frequency_1, peak_coefficient_1),
68               (peak_frequency_2, peak_coefficient_2), ...]
69              where the tuples are sorted by coefficients.
70              The last peak_coefficient will be no less than
71              peak_coefficient * min_peak_ratio.
72              If RMS is less than MEANINGFUL_RMS_THRESHOLD, returns [(0, 0)].
73
74    """
75    # Checks the signal is meaningful.
76    if len(signal) == 0:
77        raise EmptyDataError('Signal data is empty')
78
79    signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal))
80    logging.debug('signal RMS = %s', signal_rms)
81
82    # If RMS is too small, set dominant frequency and coefficient to 0.
83    if signal_rms < MEANINGFUL_RMS_THRESHOLD:
84        logging.warning(
85                'RMS %s is too small to be meaningful. Set frequency to 0.',
86                signal_rms)
87        return [(0, 0)]
88
89    logging.debug('Doing spectral analysis ...')
90
91    # First, pass signal through a window function to mitigate spectral leakage.
92    y_conv_w = signal * numpy.hanning(len(signal))
93
94    length = len(y_conv_w)
95
96    # x_f is the frequency in Hz, y_f is the transformed coefficient.
97    x_f = _rfft_freq(length, rate)
98    y_f = 2.0 / length * numpy.fft.rfft(y_conv_w)
99
100    # y_f is complex so consider its absolute value for magnitude.
101    abs_y_f = numpy.abs(y_f)
102    threshold = max(abs_y_f) * min_peak_ratio
103
104    # Suppresses all coefficients that are below threshold.
105    for i in range(len(abs_y_f)):
106        if abs_y_f[i] < threshold:
107            abs_y_f[i] = 0
108
109    # Gets the peak detection window size in indice.
110    # x_f[1] is the frequency difference per index.
111    peak_window_size = int(peak_window_size_hz / x_f[1])
112
113    # Detects peaks.
114    peaks = peak_detection(abs_y_f, peak_window_size)
115
116    # Transform back the peak location from index to frequency.
117    results = []
118    for index, value in peaks:
119        results.append((x_f[index], value))
120    return results
121
122
123def _rfft_freq(length, rate):
124    """Gets the frequency at each index of real FFT.
125
126    @param length: The window length of FFT.
127    @param rate: Sampling rate.
128
129    @returns: A numpy array containing frequency corresponding to
130              numpy.fft.rfft result at each index.
131
132    """
133    # The difference in Hz between each index.
134    val = rate / float(length)
135    # Only care half of frequencies for FFT on real signal.
136    result_length = length // 2 + 1
137    return numpy.linspace(0, (result_length - 1) * val, result_length)
138
139
140def peak_detection(array, window_size):
141    """Detects peaks in an array.
142
143    A point (i, array[i]) is a peak if array[i] is the maximum among
144    array[i - half_window_size] to array[i + half_window_size].
145    If array[i - half_window_size] to array[i + half_window_size] are all equal,
146    then there is no peak in this window.
147    Note that we only consider peak with value greater than 0.
148
149    @param window_size: The window to detect peaks.
150
151    @returns: A list of tuples:
152              [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...]
153              where the tuples are sorted by peak values.
154
155    """
156    half_window_size = window_size // 2
157    length = len(array)
158
159    def mid_is_peak(array, mid, left, right):
160        """Checks if value at mid is the largest among left to right in array.
161
162        @param array: A list of numbers.
163        @param mid: The mid index.
164        @param left: The left index.
165        @param rigth: The right index.
166
167        @returns: A tuple (is_peak, next_candidate)
168                  is_peak is True if array[index] is the maximum among numbers
169                  in array between index [left, right] inclusively.
170                  next_candidate is the index of next candidate for peak if
171                  is_peak is False. It is the index of maximum value in
172                  [mid + 1, right]. If is_peak is True, next_candidate is
173                  right + 1.
174
175        """
176        value_mid = array[mid]
177        is_peak = True
178        next_peak_candidate_index = None
179
180        # Check the left half window.
181        for index in range(left, mid):
182            if array[index] >= value_mid:
183                is_peak = False
184                break
185
186        # Mid is at the end of array.
187        if mid == right:
188            return is_peak, right + 1
189
190        # Check the right half window and also record next candidate.
191        # Favor the larger index for next_peak_candidate_index.
192        for index in range(right, mid, -1):
193            if (next_peak_candidate_index is None or
194                array[index] > array[next_peak_candidate_index]):
195                next_peak_candidate_index = index
196
197        if array[next_peak_candidate_index] >= value_mid:
198            is_peak = False
199
200        if is_peak:
201            next_peak_candidate_index = right + 1
202
203        return is_peak, next_peak_candidate_index
204
205    results = []
206    mid = 0
207    next_candidate_idx = None
208    while mid < length:
209        left = max(0, mid - half_window_size)
210        right = min(length - 1, mid + half_window_size)
211
212        # Only consider value greater than 0.
213        if array[mid] == 0:
214            mid = mid + 1
215            continue;
216
217        is_peak, next_candidate_idx = mid_is_peak(array, mid, left, right)
218
219        if is_peak:
220            results.append((mid, array[mid]))
221
222        # Use the next candidate found in [mid + 1, right], or right + 1.
223        mid = next_candidate_idx
224
225    # Sort the peaks by values.
226    return sorted(results, key=lambda x: x[1], reverse=True)
227
228
229# The default pattern mathing threshold. By experiment, this threshold
230# can tolerate normal noise of 0.3 amplitude when sine wave signal
231# amplitude is 1.
232PATTERN_MATCHING_THRESHOLD = 0.85
233
234# The default block size of pattern matching.
235ANOMALY_DETECTION_BLOCK_SIZE = 120
236
237def anomaly_detection(signal, rate, freq,
238                      block_size=ANOMALY_DETECTION_BLOCK_SIZE,
239                      threshold=PATTERN_MATCHING_THRESHOLD):
240    """Detects anomaly in a sine wave signal.
241
242    This method detects anomaly in a sine wave signal by matching
243    patterns of each block.
244    For each moving window of block in the test signal, checks if there
245    is any block in golden signal that is similar to this block of test signal.
246    If there is such a block in golden signal, then this block of test
247    signal is matched and there is no anomaly in this block of test signal.
248    If there is any block in test signal that is not matched, then this block
249    covers an anomaly.
250    The block of test signal starts from index 0, and proceeds in steps of
251    half block size. The overlapping of test signal blocks makes sure there must
252    be at least one block covering the transition from sine wave to anomaly.
253
254    @param signal: A 1-D array-like object for 1-channel PCM data.
255    @param rate: The sampling rate.
256    @param freq: The expected frequency of signal.
257    @param block_size: The block size in samples to detect anomaly.
258    @param threshold: The threshold of correlation index to be judge as matched.
259
260    @returns: A list containing detected anomaly time in seconds.
261
262    """
263    if len(signal) == 0:
264        raise EmptyDataError('Signal data is empty')
265
266    golden_y = _generate_golden_pattern(rate, freq, block_size)
267
268    results = []
269
270    for start in range(0, len(signal), block_size // 2):
271        end = start + block_size
272        test_signal = signal[start:end]
273        matched = _moving_pattern_matching(golden_y, test_signal, threshold)
274        if not matched:
275            results.append(start)
276
277    results = [float(x) / rate for x in results]
278
279    return results
280
281
282def _generate_golden_pattern(rate, freq, block_size):
283    """Generates a golden pattern of certain frequency.
284
285    The golden pattern must cover all the possibilities of waveforms in a
286    block. So, we need a golden pattern covering 1 period + 1 block size,
287    such that the test block can start anywhere in a period, and extends
288    a block size.
289
290    |period |1 bk|
291    |       |    |
292     . .     . .
293    .   .   .   .
294         . .     .
295
296    @param rate: The sampling rate.
297    @param freq: The frequency of golden pattern.
298    @param block_size: The block size in samples to detect anomaly.
299
300    @returns: A 1-D array for golden pattern.
301
302    """
303    samples_in_a_period = int(rate / freq) + 1
304    samples_in_golden_pattern = samples_in_a_period + block_size
305    golden_x = numpy.linspace(
306            0.0, (samples_in_golden_pattern - 1) * 1.0 / rate,
307            samples_in_golden_pattern)
308    golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x)
309    return golden_y
310
311
312def _moving_pattern_matching(golden_signal, test_signal, threshold):
313    """Checks if test_signal is similar to any block of golden_signal.
314
315    Compares test signal with each block of golden signal by correlation
316    index. If there is any block of golden signal that is similar to
317    test signal, then it is matched.
318
319    @param golden_signal: A 1-D array for golden signal.
320    @param test_signal: A 1-D array for test signal.
321    @param threshold: The threshold of correlation index to be judge as matched.
322
323    @returns: True if there is a match. False otherwise.
324
325    @raises: ValueError: if test signal is longer than golden signal.
326
327    """
328    if len(golden_signal) < len(test_signal):
329        raise ValueError('Test signal is longer than golden signal')
330
331    block_length = len(test_signal)
332    number_of_movings = len(golden_signal) - block_length + 1
333    correlation_indices = []
334    for moving_index in range(number_of_movings):
335        # Cuts one block of golden signal from start index.
336        # The block length is the same as test signal.
337        start = moving_index
338        end = start + block_length
339        golden_signal_block = golden_signal[start:end]
340        try:
341            correlation_index = _get_correlation_index(
342                    golden_signal_block, test_signal)
343        except TestSignalNormTooSmallError:
344            logging.info('Caught one block of test signal that has no meaningful norm')
345            return False
346        correlation_indices.append(correlation_index)
347
348    # Checks if the maximum correlation index is high enough.
349    max_corr = max(correlation_indices)
350    if max_corr < threshold:
351        logging.debug('Got one unmatched block with max_corr: %s', max_corr)
352        return False
353    return True
354
355
356class GoldenSignalNormTooSmallError(Exception):
357    """Exception when golden signal norm is too small."""
358    pass
359
360
361class TestSignalNormTooSmallError(Exception):
362    """Exception when test signal norm is too small."""
363    pass
364
365
366_MINIMUM_SIGNAL_NORM = 0.001
367
368def _get_correlation_index(golden_signal, test_signal):
369    """Computes correlation index of two signal of same length.
370
371    @param golden_signal: An 1-D array-like object.
372    @param test_signal: An 1-D array-like object.
373
374    @raises: ValueError: if two signal have different lengths.
375    @raises: GoldenSignalNormTooSmallError: if golden signal norm is too small
376    @raises: TestSignalNormTooSmallError: if test signal norm is too small.
377
378    @returns: The correlation index.
379    """
380    if len(golden_signal) != len(test_signal):
381        raise ValueError(
382                'Only accepts signal of same length: %s, %s' % (
383                        len(golden_signal), len(test_signal)))
384
385    norm_golden = numpy.linalg.norm(golden_signal)
386    norm_test = numpy.linalg.norm(test_signal)
387    if norm_golden <= _MINIMUM_SIGNAL_NORM:
388        raise GoldenSignalNormTooSmallError(
389                'No meaningful data as norm is too small.')
390    if norm_test <= _MINIMUM_SIGNAL_NORM:
391        raise TestSignalNormTooSmallError(
392                'No meaningful data as norm is too small.')
393
394    # The 'valid' cross correlation result of two signals of same length will
395    # contain only one number.
396    correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0]
397    return correlation / (norm_golden * norm_test)
398