1# Copyright 2016 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging 6import os 7import random 8import re 9import subprocess 10import time 11 12from autotest_lib.client.bin import test 13from autotest_lib.client.common_lib import error 14from autotest_lib.client.cros import upstart 15from autotest_lib.client.cros.audio import audio_helper 16 17_STREAM_TYPE_INPUT = 0 18_STREAM_TYPE_OUTPUT = 1 19 20class audio_CrasStress(test.test): 21 """Checks if output buffer will drift to super high level.""" 22 version = 2 23 _MAX_STREAMS = 3 24 _LOOP_COUNT = 300 25 _INPUT_BUFFER_LEVEL = '.*?READ_AUDIO.*?hw_level.*?(\d+).*?' 26 _OUTPUT_BUFFER_LEVEL = '.*?FILL_AUDIO.*?hw_level.*?(\d+).*?' 27 _CHECK_PERIOD_TIME_SECS = 1 # Check buffer level every second. 28 29 """ 30 We only run 1024 and 512 block size streams in this test. So buffer level 31 of input device should stay between 0 and 1024. Buffer level of output 32 device should between 1024 to 2048. Sometimes it will be a little more. 33 Therefore, we set input buffer criteria to 2 * 1024 and output buffer 34 criteria to 3 * 1024. 35 """ 36 _RATES = ['48000', '44100'] 37 _BLOCK_SIZES = ['512', '1024'] 38 _INPUT_BUFFER_DRIFT_CRITERIA = 2 * 1024 39 _OUTPUT_BUFFER_DRIFT_CRITERIA = 3 * 1024 40 41 def initialize(self): 42 """Initialize the test""" 43 upstart.stop_job('ui') 44 45 def _new_stream(self, stream_type): 46 """Runs new stream by cras_test_client.""" 47 if stream_type == _STREAM_TYPE_INPUT: 48 cmd = ['cras_test_client', '--capture_file', '/dev/null'] 49 else: 50 cmd = ['cras_test_client', '--playback_file', '/dev/zero'] 51 52 cmd += ['--rate', self._RATES[random.randint(0, 1)], 53 '--block_size', self._BLOCK_SIZES[random.randint(0, 1)]] 54 55 return subprocess.Popen(cmd) 56 57 def _check_buffer_level(self, stream_type): 58 59 buffer_level = self._get_buffer_level(stream_type) 60 61 if stream_type == _STREAM_TYPE_INPUT: 62 logging.debug("Max input buffer level: %d", buffer_level) 63 if buffer_level > self._INPUT_BUFFER_DRIFT_CRITERIA: 64 audio_helper.dump_audio_diagnostics( 65 os.path.join(self.resultsdir, "audio_diagnostics.txt")) 66 raise error.TestFail('Input buffer level %d drift too high' % 67 buffer_level) 68 69 if stream_type == _STREAM_TYPE_OUTPUT: 70 logging.debug("Max output buffer level: %d", buffer_level) 71 if buffer_level > self._OUTPUT_BUFFER_DRIFT_CRITERIA: 72 audio_helper.dump_audio_diagnostics( 73 os.path.join(self.resultsdir, "audio_diagnostics.txt")) 74 raise error.TestFail('Output buffer level %d drift too high' % 75 buffer_level) 76 77 def cleanup(self): 78 """Clean up all streams.""" 79 while len(self._streams) > 0: 80 self._streams[0].kill() 81 self._streams.remove(self._streams[0]) 82 upstart.restart_job('ui') 83 84 def run_once(self, input_stream=True, output_stream=True): 85 """ 86 Repeatedly add output streams of random configurations and 87 remove them to verify if output buffer level would drift. 88 89 @params input_stream: If true, run input stream in the test. 90 @params output_stream: If true, run output stream in the test. 91 """ 92 93 if not input_stream and not output_stream: 94 raise error.TestError('Not supported mode.') 95 96 self._streams = [] 97 98 loop_count = 0 99 past_time = time.time() 100 while loop_count < self._LOOP_COUNT: 101 102 # 1 for adding stream, 0 for removing stream. 103 add = random.randint(0, 1) 104 if not self._streams: 105 add = 1 106 elif len(self._streams) == self._MAX_STREAMS: 107 add = 0 108 109 if add == 1: 110 # 0 for input stream, 1 for output stream. 111 stream_type = random.randint(0, 1) 112 if not input_stream: 113 stream_type = _STREAM_TYPE_OUTPUT 114 elif not output_stream: 115 stream_type = _STREAM_TYPE_INPUT 116 117 self._streams.append(self._new_stream(stream_type)) 118 else: 119 self._streams[0].kill() 120 self._streams.remove(self._streams[0]) 121 time.sleep(0.1) 122 123 now = time.time() 124 125 # Check buffer level. 126 if now - past_time > self._CHECK_PERIOD_TIME_SECS: 127 past_time = now 128 if input_stream: 129 self._check_buffer_level(_STREAM_TYPE_INPUT) 130 if output_stream: 131 self._check_buffer_level(_STREAM_TYPE_OUTPUT) 132 133 loop_count += 1 134 135 def _get_buffer_level(self, stream_type): 136 """Gets a rough number about current buffer level. 137 138 @returns: The current buffer level. 139 140 """ 141 if stream_type == _STREAM_TYPE_INPUT: 142 match_str = self._INPUT_BUFFER_LEVEL 143 else: 144 match_str = self._OUTPUT_BUFFER_LEVEL 145 146 proc = subprocess.Popen(['cras_test_client', '--dump_a'], 147 stdout=subprocess.PIPE) 148 output, err = proc.communicate() 149 buffer_level = 0 150 for line in output.split('\n'): 151 search = re.match(match_str, line) 152 if search: 153 tmp = int(search.group(1)) 154 if tmp > buffer_level: 155 buffer_level = tmp 156 return buffer_level 157