# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS.  All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.

"""Signal processing utility module.
"""

import array
import logging
import os
import sys
import enum

try:
  import numpy as np
except ImportError:
  logging.critical('Cannot import the third-party Python package numpy')
  sys.exit(1)

try:
  import pydub
  import pydub.generators
except ImportError:
  logging.critical('Cannot import the third-party Python package pydub')
  sys.exit(1)

try:
  import scipy.signal
  import scipy.fftpack
except ImportError:
  logging.critical('Cannot import the third-party Python package scipy')
  sys.exit(1)

from . import exceptions


class SignalProcessingUtils(object):
  """Collection of signal processing utilities.
  """

  @enum.unique
  class MixPadding(enum.Enum):
    # Padding policies applied to |noise| by MixSignals() when |signal| is
    # longer than |noise|.
    NO_PADDING = 0
    ZERO_PADDING = 1
    LOOP = 2

  def __init__(self):
    pass

  @classmethod
  def LoadWav(cls, filepath, channels=1):
    """Loads wav file.

    Args:
      filepath: path to the wav audio track file to load.
      channels: number of channels (downmixing to mono by default).

    Returns:
      AudioSegment instance.

    Raises:
      exceptions.FileNotFoundError: if |filepath| does not exist.
    """
    if not os.path.exists(filepath):
      logging.error('cannot find the <%s> audio track file', filepath)
      raise exceptions.FileNotFoundError()
    return pydub.AudioSegment.from_file(
        filepath, format='wav', channels=channels)

  @classmethod
  def SaveWav(cls, output_filepath, signal):
    """Saves wav file.

    Args:
      output_filepath: path to the wav audio track file to save.
      signal: AudioSegment instance.
    """
    return signal.export(output_filepath, format='wav')

  @classmethod
  def CountSamples(cls, signal):
    """Number of samples per channel.

    Args:
      signal: AudioSegment instance.

    Returns:
      An integer.
    """
    number_of_samples = len(signal.get_array_of_samples())
    assert signal.channels > 0
    assert number_of_samples % signal.channels == 0
    # Use true integer division: the assert above guarantees exactness, and
    # on Python 3 `/` would return a float, breaking the documented contract.
    return number_of_samples // signal.channels

  @classmethod
  def GenerateSilence(cls, duration=1000, sample_rate=48000):
    """Generates silence.

    This method can also be used to create a template AudioSegment instance.
    A template can then be used with other Generate*() methods accepting an
    AudioSegment instance as argument.

    Args:
      duration: duration in ms.
      sample_rate: sample rate.

    Returns:
      AudioSegment instance.
    """
    return pydub.AudioSegment.silent(duration, sample_rate)

  @classmethod
  def GeneratePureTone(cls, template, frequency=440.0):
    """Generates a pure tone.

    The pure tone is generated with the same duration and in the same format of
    the given template signal.

    Args:
      template: AudioSegment instance.
      frequency: Frequency of the pure tone in Hz.

    Return:
      AudioSegment instance.

    Raises:
      exceptions.SignalProcessingException: if |frequency| is above the
        Nyquist frequency of |template|.
    """
    # Reject frequencies that cannot be represented at the template's sample
    # rate (above Nyquist, i.e., half the frame rate).
    if frequency > template.frame_rate >> 1:
      raise exceptions.SignalProcessingException('Invalid frequency')

    generator = pydub.generators.Sine(
        sample_rate=template.frame_rate,
        bit_depth=template.sample_width * 8,
        freq=frequency)

    # In pydub, |volume| is a gain in dBFS; 0.0 means full scale.
    return generator.to_audio_segment(
        duration=len(template),
        volume=0.0)

  @classmethod
  def GenerateWhiteNoise(cls, template):
    """Generates white noise.

    The white noise is generated with the same duration and in the same format
    of the given template signal.

    Args:
      template: AudioSegment instance.

    Return:
      AudioSegment instance.
    """
    generator = pydub.generators.WhiteNoise(
        sample_rate=template.frame_rate,
        bit_depth=template.sample_width * 8)
    # |volume| is a gain in dBFS; 0.0 means full scale.
    return generator.to_audio_segment(
        duration=len(template),
        volume=0.0)

  @classmethod
  def AudioSegmentToRawData(cls, signal):
    """Extracts the raw samples of an AudioSegment as a numpy int16 array.

    Args:
      signal: AudioSegment instance (16 bit samples only).

    Returns:
      numpy array of np.int16 values.

    Raises:
      exceptions.SignalProcessingException: if the samples are not 16 bit
        signed integers (array typecode 'h').
    """
    samples = signal.get_array_of_samples()
    if samples.typecode != 'h':
      raise exceptions.SignalProcessingException('Unsupported samples type')
    return np.array(signal.get_array_of_samples(), np.int16)

  @classmethod
  def Fft(cls, signal, normalize=True):
    """Computes the DFT of a mono signal, returning the positive half.

    Args:
      signal: AudioSegment instance (mono only).
      normalize: when True, samples are scaled into [-1.0, 1.0] before the
        transform.

    Returns:
      numpy array of complex values (first half of the FFT output).

    Raises:
      NotImplementedError: if |signal| has more than one channel.
    """
    if signal.channels != 1:
      raise NotImplementedError('multiple-channel FFT not implemented')
    x = cls.AudioSegmentToRawData(signal).astype(np.float32)
    if normalize:
      # The max(..., 1.0) guard avoids division by zero on an all-zero signal.
      x /= max(abs(np.max(x)), 1.0)
    y = scipy.fftpack.fft(x)
    # Integer division is required here: a float slice index raises TypeError
    # on Python 3.
    return y[:len(y) // 2]

  @classmethod
  def DetectHardClipping(cls, signal, threshold=2):
    """Detects hard clipping.

    Hard clipping is simply detected by counting samples that touch either the
    lower or upper bound too many times in a row (according to |threshold|).
    The presence of a single sequence of samples meeting such property is enough
    to label the signal as hard clipped.

    Args:
      signal: AudioSegment instance.
      threshold: minimum number of samples at full-scale in a row.

    Returns:
      True if hard clipping is detected, False otherwise.

    Raises:
      NotImplementedError: if |signal| has more than one channel.
      exceptions.SignalProcessingException: if the sample width is not 16 bit.
    """
    if signal.channels != 1:
      raise NotImplementedError('multiple-channel clipping not implemented')
    if signal.sample_width != 2:  # Note that signal.sample_width is in bytes.
      raise exceptions.SignalProcessingException(
          'hard-clipping detection only supported for 16 bit samples')
    samples = cls.AudioSegmentToRawData(signal)

    # Detect adjacent clipped samples.
    samples_type_info = np.iinfo(samples.dtype)
    mask_min = samples == samples_type_info.min
    mask_max = samples == samples_type_info.max

    def HasLongSequence(vector, min_length=threshold):
      """Returns True if there are one or more long sequences of True flags."""
      seq_length = 0
      for b in vector:
        # Extend the current run on True, reset it on False.
        seq_length = seq_length + 1 if b else 0
        if seq_length >= min_length:
          return True
      return False

    return HasLongSequence(mask_min) or HasLongSequence(mask_max)

  @classmethod
  def ApplyImpulseResponse(cls, signal, impulse_response):
    """Applies an impulse response to a signal.

    Args:
      signal: AudioSegment instance (mono only).
      impulse_response: list or numpy vector of float values.

    Returns:
      AudioSegment instance.
    """
    # Get samples.
    assert signal.channels == 1, (
        'multiple-channel recordings not supported')
    samples = signal.get_array_of_samples()

    # Convolve.
    logging.info('applying %d order impulse response to a signal lasting %d ms',
                 len(impulse_response), len(signal))
    convolved_samples = scipy.signal.fftconvolve(
        in1=samples,
        in2=impulse_response,
        mode='full').astype(np.int16)
    logging.info('convolution computed')

    # Cast.
    convolved_samples = array.array(signal.array_type, convolved_samples)

    # Verify: a full convolution is always longer than its input.
    logging.debug('signal length: %d samples', len(samples))
    logging.debug('convolved signal length: %d samples', len(convolved_samples))
    assert len(convolved_samples) > len(samples)

    # Generate convolved signal AudioSegment instance.
    convolved_signal = pydub.AudioSegment(
        data=convolved_samples,
        metadata={
            'sample_width': signal.sample_width,
            'frame_rate': signal.frame_rate,
            'frame_width': signal.frame_width,
            'channels': signal.channels,
        })
    assert len(convolved_signal) > len(signal)

    return convolved_signal

  @classmethod
  def Normalize(cls, signal):
    """Normalizes a signal.

    Args:
      signal: AudioSegment instance.

    Returns:
      An AudioSegment instance.
    """
    # Shifting the gain by -max_dBFS brings the signal peak to full scale.
    return signal.apply_gain(-signal.max_dBFS)

  @classmethod
  def Copy(cls, signal):
    """Makes a copy of a signal.

    Args:
      signal: AudioSegment instance.

    Returns:
      An AudioSegment instance.
    """
    return pydub.AudioSegment(
        data=signal.get_array_of_samples(),
        metadata={
            'sample_width': signal.sample_width,
            'frame_rate': signal.frame_rate,
            'frame_width': signal.frame_width,
            'channels': signal.channels,
        })

  @classmethod
  def MixSignals(cls, signal, noise, target_snr=0.0,
                 pad_noise=MixPadding.NO_PADDING):
    """Mixes |signal| and |noise| with a target SNR.

    Mix |signal| and |noise| with a desired SNR by scaling |noise|.
    If the target SNR is +/- infinite, a copy of signal/noise is returned.
    If |signal| is shorter than |noise|, the length of the mix equals that of
    |signal|. Otherwise, the mix length depends on whether padding is applied.
    When padding is not applied, that is |pad_noise| is set to NO_PADDING
    (default), the mix length equals that of |noise| - i.e., |signal| is
    truncated. Otherwise, |noise| is extended and the resulting mix has the same
    length of |signal|.

    Args:
      signal: AudioSegment instance (signal).
      noise: AudioSegment instance (noise).
      target_snr: float, numpy.inf or -numpy.inf (dB).
      pad_noise: SignalProcessingUtils.MixPadding, default: NO_PADDING.

    Returns:
      An AudioSegment instance.

    Raises:
      exceptions.SignalProcessingException: if either input has -Inf power
        (i.e., is pure silence) or |pad_noise| is not a valid padding type.
    """
    # Handle infinite target SNR.
    if target_snr == -np.inf:
      # Return a copy of noise.
      logging.warning('SNR = -Inf, returning noise')
      return cls.Copy(noise)
    elif target_snr == np.inf:
      # Return a copy of signal.
      logging.warning('SNR = +Inf, returning signal')
      return cls.Copy(signal)

    # Check signal and noise power: silence cannot be scaled to a target SNR.
    signal_power = float(signal.dBFS)
    noise_power = float(noise.dBFS)
    if signal_power == -np.inf:
      logging.error('signal has -Inf power, cannot mix')
      raise exceptions.SignalProcessingException(
          'cannot mix a signal with -Inf power')
    if noise_power == -np.inf:
      logging.error('noise has -Inf power, cannot mix')
      raise exceptions.SignalProcessingException(
          'cannot mix a noise with -Inf power')

    # Mix. |gain_db| scales |noise| so that the power ratio matches
    # |target_snr|.
    gain_db = signal_power - noise_power - target_snr
    signal_duration = len(signal)
    noise_duration = len(noise)
    if signal_duration <= noise_duration:
      # Ignore |pad_noise|, |noise| is truncated if longer that |signal|, the
      # mix will have the same length of |signal|.
      return signal.overlay(noise.apply_gain(gain_db))
    elif pad_noise == cls.MixPadding.NO_PADDING:
      # |signal| is longer than |noise|, but no padding is applied to |noise|.
      # Truncate |signal|.
      return noise.overlay(signal, gain_during_overlay=gain_db)
    elif pad_noise == cls.MixPadding.ZERO_PADDING:
      # The overlay result keeps |signal|'s length, so the tail beyond |noise|
      # is effectively zero-padded noise.
      # TODO(alessiob): Check that this works as expected.
      return signal.overlay(noise.apply_gain(gain_db))
    elif pad_noise == cls.MixPadding.LOOP:
      # |signal| is longer than |noise|, extend |noise| by looping.
      return signal.overlay(noise.apply_gain(gain_db), loop=True)
    else:
      raise exceptions.SignalProcessingException('invalid padding type')