# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS.  All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
"""Test data generators producing signal pairs intended to be used to
test the APM module. Each pair consists of a noisy input and a reference signal.
The former is used as APM input and it is generated by adding noise to a
clean audio track. The reference is the expected APM output.

Throughout this file, the following naming convention is used:
  - input signal: the clean signal (e.g., speech),
  - noise signal: the noise to be summed up to the input signal (e.g., white
    noise, Gaussian noise),
  - noisy signal: input + noise.
The noise signal may or may not be a function of the clean signal. For
instance, white noise is independently generated, whereas reverberation is
obtained by convolving the input signal with an impulse response.
"""

import logging
import os
import shutil
import sys

try:
    import scipy.io
except ImportError:
    logging.critical('Cannot import the third-party Python package scipy')
    sys.exit(1)

from . import data_access
from . import exceptions
from . import signal_processing


class TestDataGenerator(object):
    """Abstract class responsible for the generation of noisy signals.

    Given a clean signal, it generates two streams named noisy signal and
    reference. The former is the clean signal deteriorated by the noise source,
    the latter goes through the same deterioration process, but more "gently".
    Noisy signal and reference are produced so that the reference is the signal
    expected at the output of the APM module when the latter is fed with the
    noisy signal.

    A test data generator generates one or more pairs.
    """

    # Name under which a concrete generator is registered; concrete classes
    # must override it.
    NAME = None
    # Maps NAME -> class for every class decorated with RegisterClass. The
    # dictionary is intentionally shared by all the subclasses.
    REGISTERED_CLASSES = {}

    def __init__(self, output_directory_prefix):
        """Initializes the generator with empty output dictionaries.

        Args:
          output_directory_prefix: string prepended to the configuration name
            when the APM output directory name is built (see _MakeDir).
        """
        self._output_directory_prefix = output_directory_prefix
        # Init dictionaries with one entry for each test data generator
        # configuration (e.g., different SNRs).
        # Noisy audio track files (stored separately in a cache folder).
        self._noisy_signal_filepaths = None
        # Path to be used for the APM simulation output files.
        self._apm_output_paths = None
        # Reference audio track files (stored separately in a cache folder).
        self._reference_signal_filepaths = None
        self.Clear()

    @classmethod
    def RegisterClass(cls, class_to_register):
        """Registers a TestDataGenerator implementation.

        Decorator to automatically register the classes that extend
        TestDataGenerator.
        Example usage:

        @TestDataGenerator.RegisterClass
        class IdentityGenerator(TestDataGenerator):
          pass

        Args:
          class_to_register: class to add to REGISTERED_CLASSES, keyed by its
            NAME attribute.

        Returns:
          The same class, unchanged, so that this can be used as a decorator.
        """
        cls.REGISTERED_CLASSES[class_to_register.NAME] = class_to_register
        return class_to_register

    @property
    def config_names(self):
        """Names of the configurations generated so far."""
        return self._noisy_signal_filepaths.keys()

    @property
    def noisy_signal_filepaths(self):
        """Dictionary mapping configuration name to noisy signal file path."""
        return self._noisy_signal_filepaths

    @property
    def apm_output_paths(self):
        """Dictionary mapping configuration name to APM output path."""
        return self._apm_output_paths

    @property
    def reference_signal_filepaths(self):
        """Dictionary mapping configuration name to reference file path."""
        return self._reference_signal_filepaths

    def Generate(self, input_signal_filepath, test_data_cache_path,
                 base_output_path):
        """Generates a set of noisy input and reference audiotrack file pairs.

        This method initializes an empty set of pairs and calls the _Generate()
        method implemented in a concrete class.

        Args:
          input_signal_filepath: path to the clean input audio track file.
          test_data_cache_path: path to the cache of the generated audio track
                                files.
          base_output_path: base path where output is written.
        """
        self.Clear()
        self._Generate(input_signal_filepath, test_data_cache_path,
                       base_output_path)

    def Clear(self):
        """Clears the generated output path dictionaries.
        """
        self._noisy_signal_filepaths = {}
        self._apm_output_paths = {}
        self._reference_signal_filepaths = {}

    def _Generate(self, input_signal_filepath, test_data_cache_path,
                  base_output_path):
        """Abstract method to be implemented in each concrete class.

        Args:
          input_signal_filepath: path to the clean input audio track file.
          test_data_cache_path: path to the cache of the generated audio track
                                files.
          base_output_path: base path where output is written.

        Raises:
          NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError()

    @staticmethod
    def _GetUniqueSnrValues(snr_value_pairs):
        """Returns the set of distinct SNR values found in a list of SNR pairs.

        Args:
          snr_value_pairs: iterable of (noisy SNR, reference SNR) pairs.

        Returns:
          A set with every SNR value that occurs in any of the pairs.
        """
        return {snr for pair in snr_value_pairs for snr in pair}

    def _AddNoiseSnrPairs(self, base_output_path, noisy_mix_filepaths,
                          snr_value_pairs):
        """Adds noisy-reference signal pairs.

        One configuration is added for each combination of noise track name
        and SNR pair; an output directory is created for each of them.

        Args:
          base_output_path: noisy tracks base output path.
          noisy_mix_filepaths: nested dictionary of noisy signal paths organized
                               by noisy track name and SNR level.
          snr_value_pairs: list of SNR pairs.
        """
        for noise_track_name in noisy_mix_filepaths:
            mixes_by_snr = noisy_mix_filepaths[noise_track_name]
            for snr_noisy, snr_reference in snr_value_pairs:
                config_name = '{0}_{1:d}_{2:d}_SNR'.format(
                    noise_track_name, snr_noisy, snr_reference)
                output_path = self._MakeDir(base_output_path, config_name)
                self._AddNoiseReferenceFilesPair(
                    config_name=config_name,
                    noisy_signal_filepath=mixes_by_snr[snr_noisy],
                    reference_signal_filepath=mixes_by_snr[snr_reference],
                    output_path=output_path)

    def _AddNoiseReferenceFilesPair(self, config_name, noisy_signal_filepath,
                                    reference_signal_filepath, output_path):
        """Adds one noisy-reference signal pair.

        All the paths are stored as absolute paths.

        Args:
          config_name: name of the APM configuration.
          noisy_signal_filepath: path to noisy audio track file.
          reference_signal_filepath: path to reference audio track file.
          output_path: APM output path.
        """
        # Internal invariant: each configuration is added at most once between
        # two calls to Clear().
        assert config_name not in self._noisy_signal_filepaths
        self._noisy_signal_filepaths[config_name] = os.path.abspath(
            noisy_signal_filepath)
        self._apm_output_paths[config_name] = os.path.abspath(output_path)
        self._reference_signal_filepaths[config_name] = os.path.abspath(
            reference_signal_filepath)

    def _MakeDir(self, base_output_path, test_data_generator_config_name):
        """Creates the output directory for a configuration.

        Args:
          base_output_path: directory in which the new directory is created.
          test_data_generator_config_name: configuration name; the directory
            name is the output directory prefix followed by this name.

        Returns:
          Path to the output directory.
        """
        output_path = os.path.join(
            base_output_path,
            self._output_directory_prefix + test_data_generator_config_name)
        data_access.MakeDirectory(output_path)
        return output_path


@TestDataGenerator.RegisterClass
class IdentityTestDataGenerator(TestDataGenerator):
    """Generator that adds no noise.

    Both the noisy and the reference signals are the input signal.
    """

    NAME = 'identity'

    def __init__(self, output_directory_prefix, copy_with_identity):
        """Initializes the identity generator.

        Args:
          output_directory_prefix: see TestDataGenerator.__init__.
          copy_with_identity: if true, the input signal file is copied into
            the test data cache and the copy is used as noisy and reference
            signal.
        """
        TestDataGenerator.__init__(self, output_directory_prefix)
        self._copy_with_identity = copy_with_identity

    @property
    def copy_with_identity(self):
        """Whether the input file is copied into the test data cache."""
        return self._copy_with_identity

    def _Generate(self, input_signal_filepath, test_data_cache_path,
                  base_output_path):
        """Adds a single 'default' pair made of the (possibly copied) input.

        Args:
          input_signal_filepath: path to the clean input audio track file.
          test_data_cache_path: path to the cache of the generated audio track
                                files.
          base_output_path: base path where output is written.
        """
        config_name = 'default'
        output_path = self._MakeDir(base_output_path, config_name)

        if self._copy_with_identity:
            input_signal_filepath_new = os.path.join(
                test_data_cache_path,
                os.path.split(input_signal_filepath)[1])
            logging.info('copying %s to %s', input_signal_filepath,
                         input_signal_filepath_new)
            shutil.copy(input_signal_filepath, input_signal_filepath_new)
            input_signal_filepath = input_signal_filepath_new

        self._AddNoiseReferenceFilesPair(
            config_name=config_name,
            noisy_signal_filepath=input_signal_filepath,
            reference_signal_filepath=input_signal_filepath,
            output_path=output_path)


@TestDataGenerator.RegisterClass
class WhiteNoiseTestDataGenerator(TestDataGenerator):
    """Generator that adds white noise.
    """

    NAME = 'white_noise'

    # Each pair is [SNR of the noisy signal, SNR of the reference signal].
    # The reference (second value of each pair) always has a lower amount of
    # noise - i.e., the SNR is 10 dB higher.
    _SNR_VALUE_PAIRS = [
        [20, 30],  # Smallest noise.
        [10, 20],
        [5, 15],
        [0, 10],  # Largest noise.
    ]

    _NOISY_SIGNAL_FILENAME_TEMPLATE = 'noise_{0:d}_SNR.wav'

    def __init__(self, output_directory_prefix):
        """Initializes the white noise generator.

        Args:
          output_directory_prefix: see TestDataGenerator.__init__.
        """
        TestDataGenerator.__init__(self, output_directory_prefix)

    def _Generate(self, input_signal_filepath, test_data_cache_path,
                  base_output_path):
        """Generates test data pairs using white noise.

        One noisy mix is created in the cache for each unique SNR value (mixes
        already found in the cache are reused); then one pair is added for
        each entry of _SNR_VALUE_PAIRS.

        Args:
          input_signal_filepath: path to the clean input audio track file.
          test_data_cache_path: path to the cache of the generated audio track
                                files.
          base_output_path: base path where output is written.
        """
        # Load the input signal.
        input_signal = signal_processing.SignalProcessingUtils.LoadWav(
            input_signal_filepath)

        # Create the noise track.
        noise_signal = (
            signal_processing.SignalProcessingUtils.GenerateWhiteNoise(
                input_signal))

        # Create the noisy mixes (once for each unique SNR value).
        noisy_mix_filepaths = {}
        for snr in self._GetUniqueSnrValues(self._SNR_VALUE_PAIRS):
            noisy_signal_filepath = os.path.join(
                test_data_cache_path,
                self._NOISY_SIGNAL_FILENAME_TEMPLATE.format(snr))

            # Create and save if not done.
            if not os.path.exists(noisy_signal_filepath):
                # Create noisy signal.
                noisy_signal = (
                    signal_processing.SignalProcessingUtils.MixSignals(
                        input_signal, noise_signal, snr))

                # Save.
                signal_processing.SignalProcessingUtils.SaveWav(
                    noisy_signal_filepath, noisy_signal)

            # Add file to the collection of mixes.
            noisy_mix_filepaths[snr] = noisy_signal_filepath

        # Add all the noisy-reference signal pairs.
        for snr_noisy, snr_reference in self._SNR_VALUE_PAIRS:
            config_name = '{0:d}_{1:d}_SNR'.format(snr_noisy, snr_reference)
            output_path = self._MakeDir(base_output_path, config_name)
            self._AddNoiseReferenceFilesPair(
                config_name=config_name,
                noisy_signal_filepath=noisy_mix_filepaths[snr_noisy],
                reference_signal_filepath=noisy_mix_filepaths[snr_reference],
                output_path=output_path)


# TODO(alessiob): remove comment when class implemented.
# @TestDataGenerator.RegisterClass
class NarrowBandNoiseTestDataGenerator(TestDataGenerator):
    """Generator that adds narrow-band noise.

    Not implemented yet: the class is not registered and _Generate() adds no
    pairs.
    """

    NAME = 'narrow_band_noise'

    def __init__(self, output_directory_prefix):
        """Initializes the narrow-band noise generator.

        Args:
          output_directory_prefix: see TestDataGenerator.__init__.
        """
        TestDataGenerator.__init__(self, output_directory_prefix)

    def _Generate(self, input_signal_filepath, test_data_cache_path,
                  base_output_path):
        """Placeholder; currently generates nothing."""
        # TODO(alessiob): implement.
        pass


@TestDataGenerator.RegisterClass
class AdditiveNoiseTestDataGenerator(TestDataGenerator):
    """Generator that adds noise loops.

    This generator uses all the wav files in a given path (default:
    noise_tracks/) and mixes them to the clean speech with different target
    SNRs (hard-coded).
    """

    NAME = 'additive_noise'
    _NOISY_SIGNAL_FILENAME_TEMPLATE = '{0}_{1:d}_SNR.wav'

    DEFAULT_NOISE_TRACKS_PATH = os.path.join(os.path.dirname(__file__),
                                             os.pardir, 'noise_tracks')

    # TODO(alessiob): Make the list of SNR pairs customizable.
    # Each pair is [SNR of the noisy signal, SNR of the reference signal].
    # The reference (second value of each pair) always has a lower amount of
    # noise - i.e., the SNR is 10 dB higher.
    _SNR_VALUE_PAIRS = [
        [20, 30],  # Smallest noise.
        [10, 20],
        [5, 15],
        [0, 10],  # Largest noise.
    ]

    def __init__(self, output_directory_prefix, noise_tracks_path):
        """Initializes the generator and lists the available noise tracks.

        Args:
          output_directory_prefix: see TestDataGenerator.__init__.
          noise_tracks_path: directory in which the noise track wav files are
            searched (non-recursively, case-insensitive '.wav' extension).

        Raises:
          InitializationException: if no wav file is found in
            noise_tracks_path.
        """
        TestDataGenerator.__init__(self, output_directory_prefix)
        self._noise_tracks_path = noise_tracks_path
        self._noise_tracks_file_names = [
            n for n in os.listdir(self._noise_tracks_path)
            if n.lower().endswith('.wav')
        ]
        if not self._noise_tracks_file_names:
            raise exceptions.InitializationException(
                'No wav files found in the noise tracks path %s' %
                (self._noise_tracks_path))

    def _Generate(self, input_signal_filepath, test_data_cache_path,
                  base_output_path):
        """Generates test data pairs using environmental noise.

        For each noise track and pair of SNR values, the following two audio
        tracks are created: the noisy signal and the reference signal. The
        former is obtained by mixing the (clean) input signal to the
        corresponding noise track enforcing the target SNR.

        Args:
          input_signal_filepath: path to the clean input audio track file.
          test_data_cache_path: path to the cache of the generated audio track
                                files.
          base_output_path: base path where output is written.

        Raises:
          FileNotFoundError (from the exceptions module): if a noise track
            listed at construction time is no longer on disk.
        """
        # Init.
        snr_values = self._GetUniqueSnrValues(self._SNR_VALUE_PAIRS)

        # Load the input signal.
        input_signal = signal_processing.SignalProcessingUtils.LoadWav(
            input_signal_filepath)

        noisy_mix_filepaths = {}
        for noise_track_filename in self._noise_tracks_file_names:
            # Load the noise track.
            noise_track_name, _ = os.path.splitext(noise_track_filename)
            noise_track_filepath = os.path.join(self._noise_tracks_path,
                                                noise_track_filename)
            if not os.path.exists(noise_track_filepath):
                logging.error('cannot find the <%s> noise track',
                              noise_track_filename)
                raise exceptions.FileNotFoundError()

            noise_signal = signal_processing.SignalProcessingUtils.LoadWav(
                noise_track_filepath)

            # Create the noisy mixes (once for each unique SNR value).
            noisy_mix_filepaths[noise_track_name] = {}
            for snr in snr_values:
                noisy_signal_filepath = os.path.join(
                    test_data_cache_path,
                    self._NOISY_SIGNAL_FILENAME_TEMPLATE.format(
                        noise_track_name, snr))

                # Create and save if not done.
                if not os.path.exists(noisy_signal_filepath):
                    # Create noisy signal; the LOOP padding mode is requested
                    # for the noise track.
                    noisy_signal = (
                        signal_processing.SignalProcessingUtils.MixSignals(
                            input_signal,
                            noise_signal,
                            snr,
                            pad_noise=signal_processing.SignalProcessingUtils.
                            MixPadding.LOOP))

                    # Save.
                    signal_processing.SignalProcessingUtils.SaveWav(
                        noisy_signal_filepath, noisy_signal)

                # Add file to the collection of mixes.
                noisy_mix_filepaths[noise_track_name][
                    snr] = noisy_signal_filepath

        # Add all the noise-SNR pairs.
        self._AddNoiseSnrPairs(base_output_path, noisy_mix_filepaths,
                               self._SNR_VALUE_PAIRS)


@TestDataGenerator.RegisterClass
class ReverberationTestDataGenerator(TestDataGenerator):
    """Generator that adds reverberation noise.

    TODO(alessiob): Make this class more generic since the impulse response can
    be anything (not just reverberation); call it e.g.,
    ConvolutionalNoiseTestDataGenerator.
    """

    NAME = 'reverberation'

    # Maps impulse response name to the file name (relative to the impulse
    # response database path) from which it is loaded.
    _IMPULSE_RESPONSES = {
        'lecture': 'air_binaural_lecture_0_0_1.mat',  # Long echo.
        'booth': 'air_binaural_booth_0_0_1.mat',  # Short echo.
    }
    # Maximum number of impulse response samples to use; None means that the
    # impulse response is never truncated.
    _MAX_IMPULSE_RESPONSE_LENGTH = None

    # Each pair is [SNR of the noisy signal, SNR of the reference signal].
    # The reference (second value of each pair) always has a lower amount of
    # noise - i.e., the SNR is 5 dB higher.
    _SNR_VALUE_PAIRS = [
        [3, 8],  # Smallest noise.
        [-3, 2],  # Largest noise.
    ]

    _NOISE_TRACK_FILENAME_TEMPLATE = '{0}.wav'
    _NOISY_SIGNAL_FILENAME_TEMPLATE = '{0}_{1:d}_SNR.wav'

    def __init__(self, output_directory_prefix, aechen_ir_database_path):
        """Initializes the reverberation generator.

        Args:
          output_directory_prefix: see TestDataGenerator.__init__.
          aechen_ir_database_path: directory containing the impulse response
            files listed in _IMPULSE_RESPONSES.
        """
        TestDataGenerator.__init__(self, output_directory_prefix)
        self._aechen_ir_database_path = aechen_ir_database_path

    def _Generate(self, input_signal_filepath, test_data_cache_path,
                  base_output_path):
        """Generates test data pairs using reverberation noise.

        For each impulse response, one noise track is created. For each impulse
        response and pair of SNR values, the following 2 audio tracks are
        created: the noisy signal and the reference signal. The former is
        obtained by mixing the (clean) input signal to the corresponding noise
        track enforcing the target SNR.

        Args:
          input_signal_filepath: path to the clean input audio track file.
          test_data_cache_path: path to the cache of the generated audio track
                                files.
          base_output_path: base path where output is written.
        """
        # Init.
        snr_values = self._GetUniqueSnrValues(self._SNR_VALUE_PAIRS)

        # Load the input signal.
        input_signal = signal_processing.SignalProcessingUtils.LoadWav(
            input_signal_filepath)

        noisy_mix_filepaths = {}
        for impulse_response_name in self._IMPULSE_RESPONSES:
            noise_track_filename = self._NOISE_TRACK_FILENAME_TEMPLATE.format(
                impulse_response_name)
            noise_track_filepath = os.path.join(test_data_cache_path,
                                                noise_track_filename)
            noise_signal = None
            try:
                # Load noise track (reused if already in the cache).
                noise_signal = signal_processing.SignalProcessingUtils.LoadWav(
                    noise_track_filepath)
            except exceptions.FileNotFoundError:
                # Generate noise track by applying the impulse response.
                impulse_response_filepath = os.path.join(
                    self._aechen_ir_database_path,
                    self._IMPULSE_RESPONSES[impulse_response_name])
                noise_signal = self._GenerateNoiseTrack(
                    noise_track_filepath, input_signal,
                    impulse_response_filepath)
            assert noise_signal is not None

            # Create the noisy mixes (once for each unique SNR value).
            noisy_mix_filepaths[impulse_response_name] = {}
            for snr in snr_values:
                noisy_signal_filepath = os.path.join(
                    test_data_cache_path,
                    self._NOISY_SIGNAL_FILENAME_TEMPLATE.format(
                        impulse_response_name, snr))

                # Create and save if not done.
                if not os.path.exists(noisy_signal_filepath):
                    # Create noisy signal.
                    noisy_signal = (
                        signal_processing.SignalProcessingUtils.MixSignals(
                            input_signal, noise_signal, snr))

                    # Save.
                    signal_processing.SignalProcessingUtils.SaveWav(
                        noisy_signal_filepath, noisy_signal)

                # Add file to the collection of mixes.
                noisy_mix_filepaths[impulse_response_name][
                    snr] = noisy_signal_filepath

        # Add all the noise-SNR pairs.
        self._AddNoiseSnrPairs(base_output_path, noisy_mix_filepaths,
                               self._SNR_VALUE_PAIRS)

    def _GenerateNoiseTrack(self, noise_track_filepath, input_signal,
                            impulse_response_filepath):
        """Generates noise track.

        Generate a signal by convolving input_signal with the impulse response
        in impulse_response_filepath; then save to noise_track_filepath.

        Args:
          noise_track_filepath: output file path for the noise track.
          input_signal: (clean) input signal samples.
          impulse_response_filepath: impulse response file path.

        Returns:
          AudioSegment instance.
        """
        # Load impulse response; the samples are read from the 'h_air'
        # variable of the .mat file and flattened to one dimension.
        data = scipy.io.loadmat(impulse_response_filepath)
        impulse_response = data['h_air'].flatten()

        # Truncate (and log) only if the impulse response is actually longer
        # than the configured maximum.
        max_length = self._MAX_IMPULSE_RESPONSE_LENGTH
        if max_length is not None and len(impulse_response) > max_length:
            logging.info('truncating impulse response from %d to %d samples',
                         len(impulse_response), max_length)
            impulse_response = impulse_response[:max_length]

        # Apply impulse response.
        processed_signal = (
            signal_processing.SignalProcessingUtils.ApplyImpulseResponse(
                input_signal, impulse_response))

        # Save.
        signal_processing.SignalProcessingUtils.SaveWav(
            noise_track_filepath, processed_signal)

        return processed_signal