#!/usr/bin/env python3
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

import argparse
import json
import logging
import os
import wave

import pyaudio

# Filename pattern for recordings; %s is filled with a running file number.
RECORD_FILE_TEMPLATE = 'recorded_audio_%s.wav'


class DeviceNotFound(Exception):
    """Raised when the configured audio capture device is not found."""


class AudioCapture:
    """Records audio from a PyAudio input device into numbered WAV files."""

    def __init__(self, test_params, path):
        """Creates the PyAudio object and defines audio parameters.

        Args:
            test_params: Audio parameters fetched from config. The keys read
                here are "channel", "chunk", "sample_rate" and "ssh_config";
                other methods also read "input_device" and
                "record_duration".
            path: Result path; recordings are written into this directory.

        Raises:
            DeviceNotFound: If "ssh_config" is falsy and no device whose name
                contains test_params["input_device"] exists.
        """
        self.audio = pyaudio.PyAudio()
        self.audio_format = pyaudio.paInt16
        self.audio_params = test_params
        self.channels = self.audio_params["channel"]
        self.chunk = self.audio_params["chunk"]
        self.sample_rate = self.audio_params["sample_rate"]
        self.file_counter = 0
        self.__input_device = None
        self.record_file_template = os.path.join(path, RECORD_FILE_TEMPLATE)
        # Without an ssh_config the device is resolved eagerly; with one, the
        # lookup is deferred until capture_and_store_audio() is called.
        if not self.audio_params["ssh_config"]:
            self.__input_device = self.__get_input_device()

    @property
    def name(self):
        """Returns the ssh host if configured, else the input device name."""
        try:
            return self.audio_params["ssh_config"]["host"]
        except (KeyError, TypeError):
            # KeyError: no "host" (or no "ssh_config") entry.
            # TypeError: ssh_config is a non-subscriptable falsy value such
            # as None.
            return self.__input_device["name"]

    def __get_input_device(self):
        """Checks for the audio capture device.

        Returns:
            The PyAudio device-info dict of the first device whose name
            contains audio_params['input_device']. The result is cached.

        Raises:
            DeviceNotFound: If no device name matches.
        """
        if self.__input_device is None:
            for i in range(self.audio.get_device_count()):
                device_info = self.audio.get_device_info_by_index(i)
                if self.audio_params['input_device'] in device_info['name']:
                    self.__input_device = device_info
                    break
            else:
                # Loop finished without a break: nothing matched.
                logging.error("Audio Capture device %s not found.",
                              self.audio_params["input_device"])
                raise DeviceNotFound("Audio Capture Input device not found")
        return self.__input_device

    def capture_and_store_audio(self, trim_beginning=0, trim_end=0):
        """Records audio from the input device and stores it in a WAV file.

        Args:
            trim_beginning: how many seconds to trim from the beginning
            trim_end: how many seconds to trim from the end

        Returns:
            True if the recording was written, False if reading from the
            stream failed with an IOError.

        Raises:
            DeviceNotFound: If the input device has to be looked up here and
                cannot be found.
        """
        if self.audio_params['ssh_config']:
            self.__input_device = self.__get_input_device()
        stream = self.audio.open(
            format=self.audio_format,
            channels=self.channels,
            rate=self.sample_rate,
            input=True,
            frames_per_buffer=self.chunk,
            input_device_index=self.__input_device['index'])

        chunks_per_second = self.sample_rate // self.chunk
        b_chunks = trim_beginning * chunks_per_second
        e_chunks = trim_end * chunks_per_second
        # int() keeps range() usable when record_duration is fractional.
        total_chunks = int(
            chunks_per_second * self.audio_params['record_duration'])

        frames = []
        try:
            for i in range(total_chunks):
                # Every chunk is read so the timing stays correct; only the
                # chunks inside the untrimmed window are kept.
                data = stream.read(self.chunk)
                if b_chunks <= i < total_chunks - e_chunks:
                    frames.append(data)
        except IOError as ex:
            logging.error("Cannot record audio :%s", ex)
            return False
        else:
            stream.stop_stream()
        finally:
            # Always release the device, including on the error paths where
            # the stream previously stayed open.
            stream.close()

        return self.write_record_file(frames)

    def last_fileno(self):
        """Returns the number of the most recent recording (-1 if none)."""
        return self.next_fileno() - 1

    def next_fileno(self):
        """Returns the lowest file number with no existing recording file."""
        counter = 0
        while os.path.exists(self.record_file_template % counter):
            counter += 1
        return counter

    def write_record_file(self, frames):
        """Writes the recorded audio into the file.

        Args:
            frames: Recorded audio frames (an iterable of bytes objects).

        Returns:
            True once the file has been written.
        """
        file_name = self.record_file_template % self.next_fileno()
        logging.info('writing to %s', file_name)
        # The context manager closes the file even if a write fails.
        with wave.open(file_name, 'wb') as wf:
            wf.setnchannels(self.channels)
            wf.setsampwidth(self.audio.get_sample_size(self.audio_format))
            wf.setframerate(self.sample_rate)
            wf.writeframes(b''.join(frames))
        return True

    def terminate_audio(self):
        """Terminates the PyAudio instance."""
        self.audio.terminate()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-p',
        '--path',
        type=str,
        required=True,
        help="Contains path where the recorded files to be stored")
    parser.add_argument(
        '-t',
        '--test_params',
        type=json.loads,
        required=True,
        help="Contains sample rate, channels,"
        " chunk and device index for recording.")
    args = parser.parse_args()
    audio = AudioCapture(args.test_params, args.path)
    try:
        # Trim values are optional in the params; default to no trimming.
        audio.capture_and_store_audio(
            args.test_params.get('trim_beginning', 0),
            args.test_params.get('trim_end', 0))
    finally:
        # Release PyAudio even if the capture raised.
        audio.terminate_audio()