# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS.  All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.

"""Test data generators producing signal pairs intended to be used to
test the APM module. Each pair consists of a noisy input and a reference signal.
The former is used as APM input and it is generated by adding noise to a
clean audio track. The reference is the expected APM output.

Throughout this file, the following naming convention is used:
  - input signal: the clean signal (e.g., speech),
  - noise signal: the noise to be summed up to the input signal (e.g., white
    noise, Gaussian noise),
  - noisy signal: input + noise.
The noise signal may or may not be a function of the clean signal. For
instance, white noise is independently generated, whereas reverberation is
obtained by convolving the input signal with an impulse response.
"""

import logging
import os
import shutil
import sys

try:
  import scipy.io
except ImportError:
  logging.critical('Cannot import the third-party Python package scipy')
  sys.exit(1)

from . import data_access
from . import exceptions
from . import signal_processing


class TestDataGenerator(object):
  """Abstract class responsible for the generation of noisy signals.

  Given a clean signal, it generates two streams named noisy signal and
  reference. The former is the clean signal deteriorated by the noise source,
  the latter goes through the same deterioration process, but more "gently".
  Noisy signal and reference are produced so that the reference is the signal
  expected at the output of the APM module when the latter is fed with the noisy
  signal.

  A test data generator generates one or more pairs.
  """

  # Name under which a concrete generator is registered; concrete classes
  # override it.
  NAME = None
  # Maps NAME -> class for every class decorated with RegisterClass(). The
  # dictionary is intentionally shared by all the subclasses.
  REGISTERED_CLASSES = {}

  def __init__(self, output_directory_prefix):
    """Initializes the generator.

    Args:
      output_directory_prefix: string prepended to the configuration name when
                               the APM output directories are created.
    """
    self._output_directory_prefix = output_directory_prefix
    # Init dictionaries with one entry for each test data generator
    # configuration (e.g., different SNRs).
    # Noisy audio track files (stored separately in a cache folder).
    self._noisy_signal_filepaths = None
    # Path to be used for the APM simulation output files.
    self._apm_output_paths = None
    # Reference audio track files (stored separately in a cache folder).
    self._reference_signal_filepaths = None
    self.Clear()

  @classmethod
  def RegisterClass(cls, class_to_register):
    """Registers a TestDataGenerator implementation.

    Decorator to automatically register the classes that extend
    TestDataGenerator.
    Example usage:

    @TestDataGenerator.RegisterClass
    class IdentityGenerator(TestDataGenerator):
      pass

    Args:
      class_to_register: class to add to REGISTERED_CLASSES; its NAME attribute
                         is used as key.

    Returns:
      The class passed as argument (unchanged), so that this method can be used
      as a decorator.
    """
    cls.REGISTERED_CLASSES[class_to_register.NAME] = class_to_register
    return class_to_register

  @property
  def config_names(self):
    """Names of the configurations for which a pair has been added."""
    return self._noisy_signal_filepaths.keys()

  @property
  def noisy_signal_filepaths(self):
    """Dictionary mapping configuration names to noisy signal file paths."""
    return self._noisy_signal_filepaths

  @property
  def apm_output_paths(self):
    """Dictionary mapping configuration names to APM output paths."""
    return self._apm_output_paths

  @property
  def reference_signal_filepaths(self):
    """Dictionary mapping configuration names to reference file paths."""
    return self._reference_signal_filepaths

  def Generate(
      self, input_signal_filepath, test_data_cache_path, base_output_path):
    """Generates a set of noisy input and reference audiotrack file pairs.

    This method initializes an empty set of pairs and calls the _Generate()
    method implemented in a concrete class.

    Args:
      input_signal_filepath: path to the clean input audio track file.
      test_data_cache_path: path to the cache of the generated audio track
                            files.
      base_output_path: base path where output is written.
    """
    self.Clear()
    self._Generate(
        input_signal_filepath, test_data_cache_path, base_output_path)

  def Clear(self):
    """Clears the generated output path dictionaries.
    """
    self._noisy_signal_filepaths = {}
    self._apm_output_paths = {}
    self._reference_signal_filepaths = {}

  def _Generate(
      self, input_signal_filepath, test_data_cache_path, base_output_path):
    """Abstract method to be implemented in each concrete class.

    Args:
      input_signal_filepath: path to the clean input audio track file.
      test_data_cache_path: path to the cache of the generated audio track
                            files.
      base_output_path: base path where output is written.

    Raises:
      NotImplementedError: always, unless overridden.
    """
    raise NotImplementedError()

  def _AddNoiseSnrPairs(self, base_output_path, noisy_mix_filepaths,
                        snr_value_pairs):
    """Adds noisy-reference signal pairs.

    One pair is added for each combination of noise track name and SNR pair.
    The configuration name of each pair is
    "<noise track name>_<noisy SNR>_<reference SNR>_SNR".

    Args:
      base_output_path: noisy tracks base output path.
      noisy_mix_filepaths: nested dictionary of noisy signal paths organized
                           by noisy track name and SNR level.
      snr_value_pairs: list of SNR pairs.
    """
    for noise_track_name in noisy_mix_filepaths:
      for snr_noisy, snr_reference in snr_value_pairs:
        config_name = '{0}_{1:d}_{2:d}_SNR'.format(
            noise_track_name, snr_noisy, snr_reference)
        output_path = self._MakeDir(base_output_path, config_name)
        self._AddNoiseReferenceFilesPair(
            config_name=config_name,
            noisy_signal_filepath=noisy_mix_filepaths[
                noise_track_name][snr_noisy],
            reference_signal_filepath=noisy_mix_filepaths[
                noise_track_name][snr_reference],
            output_path=output_path)

  def _AddNoiseReferenceFilesPair(self, config_name, noisy_signal_filepath,
                                  reference_signal_filepath, output_path):
    """Adds one noisy-reference signal pair.

    All the paths are stored as absolute paths.

    Args:
      config_name: name of the configuration (used as dictionary key); it must
                   not have been added already.
      noisy_signal_filepath: path to noisy audio track file.
      reference_signal_filepath: path to reference audio track file.
      output_path: APM output path.
    """
    # Each configuration can only be added once between two Clear() calls.
    assert config_name not in self._noisy_signal_filepaths
    self._noisy_signal_filepaths[config_name] = os.path.abspath(
        noisy_signal_filepath)
    self._apm_output_paths[config_name] = os.path.abspath(output_path)
    self._reference_signal_filepaths[config_name] = os.path.abspath(
        reference_signal_filepath)

  def _MakeDir(self, base_output_path, test_data_generator_config_name):
    """Creates the output directory for a configuration.

    Args:
      base_output_path: base path where output is written.
      test_data_generator_config_name: configuration name; the directory name
                                       is the output directory prefix followed
                                       by this name.

    Returns:
      Path to the created directory.
    """
    output_path = os.path.join(
        base_output_path,
        self._output_directory_prefix + test_data_generator_config_name)
    data_access.MakeDirectory(output_path)
    return output_path


@TestDataGenerator.RegisterClass
class IdentityTestDataGenerator(TestDataGenerator):
  """Generator that adds no noise.

  Both the noisy and the reference signals are the input signal.
  """

  NAME = 'identity'

  def __init__(self, output_directory_prefix, copy_with_identity):
    """Initializes the generator.

    Args:
      output_directory_prefix: see TestDataGenerator.__init__().
      copy_with_identity: if true, the input signal is copied into the test
                          data cache and the copy is used for the pair.
    """
    TestDataGenerator.__init__(self, output_directory_prefix)
    self._copy_with_identity = copy_with_identity

  @property
  def copy_with_identity(self):
    """Whether the input signal is copied into the test data cache."""
    return self._copy_with_identity

  def _Generate(
      self, input_signal_filepath, test_data_cache_path, base_output_path):
    """Adds a single pair named 'default' made of the input signal itself."""
    config_name = 'default'
    output_path = self._MakeDir(base_output_path, config_name)

    if self._copy_with_identity:
      # Keep the original file name, but place the copy in the cache folder.
      input_signal_filepath_new = os.path.join(
          test_data_cache_path, os.path.split(input_signal_filepath)[1])
      logging.info('copying %s to %s',
                   input_signal_filepath, input_signal_filepath_new)
      shutil.copy(input_signal_filepath, input_signal_filepath_new)
      input_signal_filepath = input_signal_filepath_new

    self._AddNoiseReferenceFilesPair(
        config_name=config_name,
        noisy_signal_filepath=input_signal_filepath,
        reference_signal_filepath=input_signal_filepath,
        output_path=output_path)


@TestDataGenerator.RegisterClass
class WhiteNoiseTestDataGenerator(TestDataGenerator):
  """Generator that adds white noise.
  """

  NAME = 'white_noise'

  # Each pair indicates the clean vs. noisy and reference vs. noisy SNRs.
  # The reference (second value of each pair) always has a lower amount of noise
  # - i.e., the SNR is 10 dB higher.
  _SNR_VALUE_PAIRS = [
      [20, 30],  # Smallest noise.
      [10, 20],
      [5, 15],
      [0, 10],  # Largest noise.
  ]

  _NOISY_SIGNAL_FILENAME_TEMPLATE = 'noise_{0:d}_SNR.wav'

  def __init__(self, output_directory_prefix):
    """Initializes the generator; see TestDataGenerator.__init__()."""
    TestDataGenerator.__init__(self, output_directory_prefix)

  def _Generate(
      self, input_signal_filepath, test_data_cache_path, base_output_path):
    """Generates test data pairs using white noise.

    One noisy mix is created in the cache folder for each unique SNR value
    (unless the file already exists); then one pair is added for each SNR pair
    using the configuration name "<noisy SNR>_<reference SNR>_SNR".
    """
    # Load the input signal.
    input_signal = signal_processing.SignalProcessingUtils.LoadWav(
        input_signal_filepath)

    # Create the noise track.
    noise_signal = signal_processing.SignalProcessingUtils.GenerateWhiteNoise(
        input_signal)

    # Create the noisy mixes (once for each unique SNR value).
    noisy_mix_filepaths = {}
    snr_values = {snr for pair in self._SNR_VALUE_PAIRS for snr in pair}
    for snr in snr_values:
      noisy_signal_filepath = os.path.join(
          test_data_cache_path,
          self._NOISY_SIGNAL_FILENAME_TEMPLATE.format(snr))

      # Create and save if not done.
      if not os.path.exists(noisy_signal_filepath):
        # Create noisy signal.
        noisy_signal = signal_processing.SignalProcessingUtils.MixSignals(
            input_signal, noise_signal, snr)

        # Save.
        signal_processing.SignalProcessingUtils.SaveWav(
            noisy_signal_filepath, noisy_signal)

      # Add file to the collection of mixes.
      noisy_mix_filepaths[snr] = noisy_signal_filepath

    # Add all the noisy-reference signal pairs.
    for snr_noisy, snr_reference in self._SNR_VALUE_PAIRS:
      config_name = '{0:d}_{1:d}_SNR'.format(snr_noisy, snr_reference)
      output_path = self._MakeDir(base_output_path, config_name)
      self._AddNoiseReferenceFilesPair(
          config_name=config_name,
          noisy_signal_filepath=noisy_mix_filepaths[snr_noisy],
          reference_signal_filepath=noisy_mix_filepaths[snr_reference],
          output_path=output_path)


# TODO(alessiob): remove comment when class implemented.
# @TestDataGenerator.RegisterClass
class NarrowBandNoiseTestDataGenerator(TestDataGenerator):
  """Generator that adds narrow-band noise.

  Not implemented yet: the class is not registered and _Generate() adds no
  pairs.
  """

  NAME = 'narrow_band_noise'

  def __init__(self, output_directory_prefix):
    """Initializes the generator; see TestDataGenerator.__init__()."""
    TestDataGenerator.__init__(self, output_directory_prefix)

  def _Generate(
      self, input_signal_filepath, test_data_cache_path, base_output_path):
    """Placeholder that generates nothing."""
    # TODO(alessiob): implement.
    pass


@TestDataGenerator.RegisterClass
class AdditiveNoiseTestDataGenerator(TestDataGenerator):
  """Generator that adds noise loops.

  This generator uses all the wav files in a given path (default: noise_tracks/)
  and mixes them to the clean speech with different target SNRs (hard-coded).
  """

  NAME = 'additive_noise'
  _NOISY_SIGNAL_FILENAME_TEMPLATE = '{0}_{1:d}_SNR.wav'

  DEFAULT_NOISE_TRACKS_PATH = os.path.join(
      os.path.dirname(__file__), os.pardir, 'noise_tracks')

  # TODO(alessiob): Make the list of SNR pairs customizable.
  # Each pair indicates the clean vs. noisy and reference vs. noisy SNRs.
  # The reference (second value of each pair) always has a lower amount of noise
  # - i.e., the SNR is 10 dB higher.
  _SNR_VALUE_PAIRS = [
      [20, 30],  # Smallest noise.
      [10, 20],
      [5, 15],
      [0, 10],  # Largest noise.
  ]

  def __init__(self, output_directory_prefix, noise_tracks_path):
    """Initializes the generator and lists the available noise tracks.

    Args:
      output_directory_prefix: see TestDataGenerator.__init__().
      noise_tracks_path: path to the directory containing the noise tracks;
                         only the files with the .wav extension (case
                         insensitive) are used.

    Raises:
      exceptions.InitializationException: if no wav file is found in
                                          noise_tracks_path.
    """
    TestDataGenerator.__init__(self, output_directory_prefix)
    self._noise_tracks_path = noise_tracks_path
    self._noise_tracks_file_names = [n for n in os.listdir(
        self._noise_tracks_path) if n.lower().endswith('.wav')]
    if not self._noise_tracks_file_names:
      raise exceptions.InitializationException(
          'No wav files found in the noise tracks path %s' % (
              self._noise_tracks_path))

  def _Generate(
      self, input_signal_filepath, test_data_cache_path, base_output_path):
    """Generates test data pairs using environmental noise.

    For each noise track and pair of SNR values, the following two audio tracks
    are created: the noisy signal and the reference signal. The former is
    obtained by mixing the (clean) input signal to the corresponding noise
    track enforcing the target SNR.

    Raises:
      exceptions.FileNotFoundError: if a noise track listed at construction
                                    time does not exist anymore.
    """
    # Init.
    snr_values = {snr for pair in self._SNR_VALUE_PAIRS for snr in pair}

    # Load the input signal.
    input_signal = signal_processing.SignalProcessingUtils.LoadWav(
        input_signal_filepath)

    noisy_mix_filepaths = {}
    for noise_track_filename in self._noise_tracks_file_names:
      # Load the noise track.
      noise_track_name, _ = os.path.splitext(noise_track_filename)
      noise_track_filepath = os.path.join(
          self._noise_tracks_path, noise_track_filename)
      if not os.path.exists(noise_track_filepath):
        logging.error('cannot find the <%s> noise track', noise_track_filename)
        raise exceptions.FileNotFoundError()

      noise_signal = signal_processing.SignalProcessingUtils.LoadWav(
          noise_track_filepath)

      # Create the noisy mixes (once for each unique SNR value).
      noisy_mix_filepaths[noise_track_name] = {}
      for snr in snr_values:
        noisy_signal_filepath = os.path.join(
            test_data_cache_path,
            self._NOISY_SIGNAL_FILENAME_TEMPLATE.format(noise_track_name, snr))

        # Create and save if not done.
        if not os.path.exists(noisy_signal_filepath):
          # Create noisy signal.
          noisy_signal = signal_processing.SignalProcessingUtils.MixSignals(
              input_signal, noise_signal, snr,
              pad_noise=signal_processing.SignalProcessingUtils.MixPadding.LOOP)

          # Save.
          signal_processing.SignalProcessingUtils.SaveWav(
              noisy_signal_filepath, noisy_signal)

        # Add file to the collection of mixes.
        noisy_mix_filepaths[noise_track_name][snr] = noisy_signal_filepath

    # Add all the noise-SNR pairs.
    self._AddNoiseSnrPairs(
        base_output_path, noisy_mix_filepaths, self._SNR_VALUE_PAIRS)


@TestDataGenerator.RegisterClass
class ReverberationTestDataGenerator(TestDataGenerator):
  """Generator that adds reverberation noise.

  TODO(alessiob): Make this class more generic since the impulse response can be
  anything (not just reverberation); call it e.g.,
  ConvolutionalNoiseTestDataGenerator.
  """

  NAME = 'reverberation'

  # Maps impulse response names to file names in the impulse response database.
  _IMPULSE_RESPONSES = {
      'lecture': 'air_binaural_lecture_0_0_1.mat',  # Long echo.
      'booth': 'air_binaural_booth_0_0_1.mat',  # Short echo.
  }
  # Maximum number of impulse response samples to use (None: no truncation).
  _MAX_IMPULSE_RESPONSE_LENGTH = None

  # Each pair indicates the clean vs. noisy and reference vs. noisy SNRs.
  # The reference (second value of each pair) always has a lower amount of noise
  # - i.e., the SNR is 5 dB higher.
  _SNR_VALUE_PAIRS = [
      [3, 8],  # Smallest noise.
      [-3, 2],  # Largest noise.
  ]

  _NOISE_TRACK_FILENAME_TEMPLATE = '{0}.wav'
  _NOISY_SIGNAL_FILENAME_TEMPLATE = '{0}_{1:d}_SNR.wav'

  def __init__(self, output_directory_prefix, aechen_ir_database_path):
    """Initializes the generator.

    Args:
      output_directory_prefix: see TestDataGenerator.__init__().
      aechen_ir_database_path: path to the directory containing the impulse
                               response files listed in _IMPULSE_RESPONSES.
    """
    TestDataGenerator.__init__(self, output_directory_prefix)
    self._aechen_ir_database_path = aechen_ir_database_path

  def _Generate(
      self, input_signal_filepath, test_data_cache_path, base_output_path):
    """Generates test data pairs using reverberation noise.

    For each impulse response, one noise track is created. For each impulse
    response and pair of SNR values, the following 2 audio tracks are
    created: the noisy signal and the reference signal. The former is
    obtained by mixing the (clean) input signal to the corresponding noise
    track enforcing the target SNR.
    """
    # Init.
    snr_values = {snr for pair in self._SNR_VALUE_PAIRS for snr in pair}

    # Load the input signal.
    input_signal = signal_processing.SignalProcessingUtils.LoadWav(
        input_signal_filepath)

    noisy_mix_filepaths = {}
    for impulse_response_name in self._IMPULSE_RESPONSES:
      noise_track_filename = self._NOISE_TRACK_FILENAME_TEMPLATE.format(
          impulse_response_name)
      noise_track_filepath = os.path.join(
          test_data_cache_path, noise_track_filename)
      noise_signal = None
      try:
        # Load noise track (cached by a previous run).
        noise_signal = signal_processing.SignalProcessingUtils.LoadWav(
            noise_track_filepath)
      except exceptions.FileNotFoundError:
        # Generate noise track by applying the impulse response.
        impulse_response_filepath = os.path.join(
            self._aechen_ir_database_path,
            self._IMPULSE_RESPONSES[impulse_response_name])
        noise_signal = self._GenerateNoiseTrack(
            noise_track_filepath, input_signal, impulse_response_filepath)
      assert noise_signal is not None

      # Create the noisy mixes (once for each unique SNR value).
      noisy_mix_filepaths[impulse_response_name] = {}
      for snr in snr_values:
        noisy_signal_filepath = os.path.join(
            test_data_cache_path,
            self._NOISY_SIGNAL_FILENAME_TEMPLATE.format(
                impulse_response_name, snr))

        # Create and save if not done.
        if not os.path.exists(noisy_signal_filepath):
          # Create noisy signal.
          noisy_signal = signal_processing.SignalProcessingUtils.MixSignals(
              input_signal, noise_signal, snr)

          # Save.
          signal_processing.SignalProcessingUtils.SaveWav(
              noisy_signal_filepath, noisy_signal)

        # Add file to the collection of mixes.
        noisy_mix_filepaths[impulse_response_name][snr] = noisy_signal_filepath

    # Add all the noise-SNR pairs.
    self._AddNoiseSnrPairs(base_output_path, noisy_mix_filepaths,
                           self._SNR_VALUE_PAIRS)

  def _GenerateNoiseTrack(self, noise_track_filepath, input_signal,
                          impulse_response_filepath):
    """Generates noise track.

    Generate a signal by convolving input_signal with the impulse response in
    impulse_response_filepath; then save to noise_track_filepath.

    Args:
      noise_track_filepath: output file path for the noise track.
      input_signal: (clean) input signal samples.
      impulse_response_filepath: impulse response file path (.mat file holding
                                 the impulse response in the 'h_air' variable).

    Returns:
      The signal returned by SignalProcessingUtils.ApplyImpulseResponse()
      (presumably an AudioSegment instance).
    """
    # Load impulse response.
    data = scipy.io.loadmat(impulse_response_filepath)
    impulse_response = data['h_air'].flatten()
    if self._MAX_IMPULSE_RESPONSE_LENGTH is not None:
      logging.info('truncating impulse response from %d to %d samples',
                   len(impulse_response), self._MAX_IMPULSE_RESPONSE_LENGTH)
      impulse_response = impulse_response[:self._MAX_IMPULSE_RESPONSE_LENGTH]

    # Apply impulse response.
    processed_signal = (
        signal_processing.SignalProcessingUtils.ApplyImpulseResponse(
            input_signal, impulse_response))

    # Save.
    signal_processing.SignalProcessingUtils.SaveWav(
        noise_track_filepath, processed_signal)

    return processed_signal