# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS. All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
"""Signal processing utility module.
"""

import array
import enum
import logging
import os
import sys

try:
    import numpy as np
except ImportError:
    logging.critical('Cannot import the third-party Python package numpy')
    sys.exit(1)

try:
    import pydub
    import pydub.generators
except ImportError:
    logging.critical('Cannot import the third-party Python package pydub')
    sys.exit(1)

try:
    import scipy.signal
    import scipy.fftpack
except ImportError:
    logging.critical('Cannot import the third-party Python package scipy')
    sys.exit(1)

from . import exceptions


class SignalProcessingUtils(object):
    """Collection of signal processing utilities.
    """

    @enum.unique
    class MixPadding(enum.Enum):
        """Padding strategies for MixSignals when `noise` is shorter than
        `signal`."""
        NO_PADDING = 0
        ZERO_PADDING = 1
        LOOP = 2

    def __init__(self):
        pass

    @classmethod
    def LoadWav(cls, filepath, channels=1):
        """Loads wav file.

        Args:
          filepath: path to the wav audio track file to load.
          channels: number of channels (downmixing to mono by default).

        Returns:
          AudioSegment instance.

        Raises:
          exceptions.FileNotFoundError: if `filepath` does not exist.
        """
        if not os.path.exists(filepath):
            logging.error('cannot find the <%s> audio track file', filepath)
            raise exceptions.FileNotFoundError()
        return pydub.AudioSegment.from_file(filepath,
                                            format='wav',
                                            channels=channels)

    @classmethod
    def SaveWav(cls, output_filepath, signal):
        """Saves wav file.

        Args:
          output_filepath: path to the wav audio track file to save.
          signal: AudioSegment instance.
        """
        return signal.export(output_filepath, format='wav')

    @classmethod
    def CountSamples(cls, signal):
        """Number of samples per channel.

        Args:
          signal: AudioSegment instance.

        Returns:
          An integer.
        """
        number_of_samples = len(signal.get_array_of_samples())
        assert signal.channels > 0
        assert number_of_samples % signal.channels == 0
        # Use floor division so an int is returned on Python 3 (true division
        # would yield a float even though the remainder is asserted to be 0).
        return number_of_samples // signal.channels

    @classmethod
    def GenerateSilence(cls, duration=1000, sample_rate=48000):
        """Generates silence.

        This method can also be used to create a template AudioSegment instance.
        A template can then be used with other Generate*() methods accepting an
        AudioSegment instance as argument.

        Args:
          duration: duration in ms.
          sample_rate: sample rate.

        Returns:
          AudioSegment instance.
        """
        return pydub.AudioSegment.silent(duration, sample_rate)

    @classmethod
    def GeneratePureTone(cls, template, frequency=440.0):
        """Generates a pure tone.

        The pure tone is generated with the same duration and in the same format
        of the given template signal.

        Args:
          template: AudioSegment instance.
          frequency: Frequency of the pure tone in Hz.

        Return:
          AudioSegment instance.

        Raises:
          exceptions.SignalProcessingException: if `frequency` is above the
            Nyquist frequency of the template signal.
        """
        if frequency > template.frame_rate >> 1:
            raise exceptions.SignalProcessingException('Invalid frequency')

        generator = pydub.generators.Sine(sample_rate=template.frame_rate,
                                          bit_depth=template.sample_width * 8,
                                          freq=frequency)

        return generator.to_audio_segment(duration=len(template), volume=0.0)

    @classmethod
    def GenerateWhiteNoise(cls, template):
        """Generates white noise.

        The white noise is generated with the same duration and in the same
        format of the given template signal.

        Args:
          template: AudioSegment instance.

        Return:
          AudioSegment instance.
        """
        generator = pydub.generators.WhiteNoise(
            sample_rate=template.frame_rate,
            bit_depth=template.sample_width * 8)
        return generator.to_audio_segment(duration=len(template), volume=0.0)

    @classmethod
    def AudioSegmentToRawData(cls, signal):
        """Extracts the raw samples of an AudioSegment as an int16 numpy array.

        Args:
          signal: AudioSegment instance (16 bit samples required).

        Returns:
          numpy array of np.int16 samples.

        Raises:
          exceptions.SignalProcessingException: if the sample type is not
            16 bit signed ('h').
        """
        samples = signal.get_array_of_samples()
        if samples.typecode != 'h':
            raise exceptions.SignalProcessingException(
                'Unsupported samples type')
        # Reuse the already-extracted array instead of calling
        # get_array_of_samples() a second time.
        return np.array(samples, np.int16)

    @classmethod
    def Fft(cls, signal, normalize=True):
        """Computes the DFT of a mono signal.

        Args:
          signal: single-channel AudioSegment instance.
          normalize: when True, scales the samples into [-1.0, 1.0] before
            transforming.

        Returns:
          numpy array with the first half of the (symmetric) FFT spectrum.

        Raises:
          NotImplementedError: if `signal` has more than one channel.
        """
        if signal.channels != 1:
            raise NotImplementedError('multiple-channel FFT not implemented')
        x = cls.AudioSegmentToRawData(signal).astype(np.float32)
        if normalize:
            # Normalize by the peak magnitude; np.max(np.abs(x)) also handles
            # signals whose largest-magnitude sample is negative. The floor of
            # 1.0 avoids division by zero on all-zero input.
            x /= max(np.max(np.abs(x)), 1.0)
        y = scipy.fftpack.fft(x)
        # Floor division: a float slice index raises TypeError on Python 3.
        return y[:len(y) // 2]

    @classmethod
    def DetectHardClipping(cls, signal, threshold=2):
        """Detects hard clipping.

        Hard clipping is simply detected by counting samples that touch either
        the lower or upper bound too many times in a row (according to
        `threshold`). The presence of a single sequence of samples meeting such
        property is enough to label the signal as hard clipped.

        Args:
          signal: AudioSegment instance.
          threshold: minimum number of samples at full-scale in a row.

        Returns:
          True if hard clipping is detected, False otherwise.

        Raises:
          NotImplementedError: if `signal` has more than one channel.
          exceptions.SignalProcessingException: if the sample width is not
            16 bit.
        """
        if signal.channels != 1:
            raise NotImplementedError(
                'multiple-channel clipping not implemented')
        if signal.sample_width != 2:  # Note that signal.sample_width is in bytes.
            raise exceptions.SignalProcessingException(
                'hard-clipping detection only supported for 16 bit samples')
        samples = cls.AudioSegmentToRawData(signal)

        # Detect adjacent clipped samples.
        samples_type_info = np.iinfo(samples.dtype)
        mask_min = samples == samples_type_info.min
        mask_max = samples == samples_type_info.max

        def HasLongSequence(vector, min_length=threshold):
            """Returns True if there are one or more long sequences of True flags."""
            seq_length = 0
            for b in vector:
                seq_length = seq_length + 1 if b else 0
                if seq_length >= min_length:
                    return True
            return False

        return HasLongSequence(mask_min) or HasLongSequence(mask_max)

    @classmethod
    def ApplyImpulseResponse(cls, signal, impulse_response):
        """Applies an impulse response to a signal.

        Args:
          signal: AudioSegment instance.
          impulse_response: list or numpy vector of float values.

        Returns:
          AudioSegment instance.
        """
        # Get samples.
        assert signal.channels == 1, (
            'multiple-channel recordings not supported')
        samples = signal.get_array_of_samples()

        # Convolve.
        logging.info(
            'applying %d order impulse response to a signal lasting %d ms',
            len(impulse_response), len(signal))
        convolved_samples = scipy.signal.fftconvolve(in1=samples,
                                                     in2=impulse_response,
                                                     mode='full').astype(
                                                         np.int16)
        logging.info('convolution computed')

        # Cast.
        convolved_samples = array.array(signal.array_type, convolved_samples)

        # Verify.
        logging.debug('signal length: %d samples', len(samples))
        logging.debug('convolved signal length: %d samples',
                      len(convolved_samples))
        assert len(convolved_samples) > len(samples)

        # Generate convolved signal AudioSegment instance.
        convolved_signal = pydub.AudioSegment(data=convolved_samples,
                                              metadata={
                                                  'sample_width':
                                                  signal.sample_width,
                                                  'frame_rate':
                                                  signal.frame_rate,
                                                  'frame_width':
                                                  signal.frame_width,
                                                  'channels': signal.channels,
                                              })
        assert len(convolved_signal) > len(signal)

        return convolved_signal

    @classmethod
    def Normalize(cls, signal):
        """Normalizes a signal.

        Args:
          signal: AudioSegment instance.

        Returns:
          An AudioSegment instance.
        """
        return signal.apply_gain(-signal.max_dBFS)

    @classmethod
    def Copy(cls, signal):
        """Makes a copy of a signal.

        Args:
          signal: AudioSegment instance.

        Returns:
          An AudioSegment instance.
        """
        return pydub.AudioSegment(data=signal.get_array_of_samples(),
                                  metadata={
                                      'sample_width': signal.sample_width,
                                      'frame_rate': signal.frame_rate,
                                      'frame_width': signal.frame_width,
                                      'channels': signal.channels,
                                  })

    @classmethod
    def MixSignals(cls,
                   signal,
                   noise,
                   target_snr=0.0,
                   pad_noise=MixPadding.NO_PADDING):
        """Mixes `signal` and `noise` with a target SNR.

        Mix `signal` and `noise` with a desired SNR by scaling `noise`.
        If the target SNR is +/- infinite, a copy of signal/noise is returned.
        If `signal` is shorter than `noise`, the length of the mix equals that
        of `signal`. Otherwise, the mix length depends on whether padding is
        applied. When padding is not applied, that is `pad_noise` is set to
        NO_PADDING (default), the mix length equals that of `noise` - i.e.,
        `signal` is truncated. Otherwise, `noise` is extended and the resulting
        mix has the same length of `signal`.

        Args:
          signal: AudioSegment instance (signal).
          noise: AudioSegment instance (noise).
          target_snr: float, numpy.inf or -numpy.inf (dB).
          pad_noise: SignalProcessingUtils.MixPadding, default: NO_PADDING.

        Returns:
          An AudioSegment instance.

        Raises:
          exceptions.SignalProcessingException: if either input has -Inf power
            (for a finite target SNR) or `pad_noise` is not a valid
            MixPadding value.
        """
        # Handle infinite target SNR.
        # Note: np.inf (lowercase) is the canonical spelling; np.Inf was
        # removed in NumPy 2.0.
        if target_snr == -np.inf:
            # Return a copy of noise.
            logging.warning('SNR = -Inf, returning noise')
            return cls.Copy(noise)
        elif target_snr == np.inf:
            # Return a copy of signal.
            logging.warning('SNR = +Inf, returning signal')
            return cls.Copy(signal)

        # Check signal and noise power.
        signal_power = float(signal.dBFS)
        noise_power = float(noise.dBFS)
        if signal_power == -np.inf:
            logging.error('signal has -Inf power, cannot mix')
            raise exceptions.SignalProcessingException(
                'cannot mix a signal with -Inf power')
        if noise_power == -np.inf:
            logging.error('noise has -Inf power, cannot mix')
            raise exceptions.SignalProcessingException(
                'cannot mix a noise with -Inf power')

        # Mix.
        gain_db = signal_power - noise_power - target_snr
        signal_duration = len(signal)
        noise_duration = len(noise)
        if signal_duration <= noise_duration:
            # Ignore `pad_noise`, `noise` is truncated if longer than `signal`,
            # the mix will have the same length of `signal`.
            return signal.overlay(noise.apply_gain(gain_db))
        elif pad_noise == cls.MixPadding.NO_PADDING:
            # `signal` is longer than `noise`, but no padding is applied to
            # `noise`. Truncate `signal`.
            return noise.overlay(signal, gain_during_overlay=gain_db)
        elif pad_noise == cls.MixPadding.ZERO_PADDING:
            # TODO(alessiob): Check that this works as expected.
            return signal.overlay(noise.apply_gain(gain_db))
        elif pad_noise == cls.MixPadding.LOOP:
            # `signal` is longer than `noise`, extend `noise` by looping.
            return signal.overlay(noise.apply_gain(gain_db), loop=True)
        else:
            raise exceptions.SignalProcessingException('invalid padding type')