• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
5import logging
6import os
7import random
8import re
9import subprocess
10import time
11
12from autotest_lib.client.bin import test
13from autotest_lib.client.common_lib import error
14from autotest_lib.client.cros import upstart
15from autotest_lib.client.cros.audio import audio_helper
16
17_STREAM_TYPE_INPUT = 0
18_STREAM_TYPE_OUTPUT = 1
19
class audio_CrasStress(test.test):
    """Checks if output buffer will drift to super high level.

    The test randomly starts and kills cras_test_client capture/playback
    streams and, at most once per _CHECK_PERIOD_TIME_SECS, fails if the
    largest hw_level found in the 'cras_test_client --dump_a' output exceeds
    the drift criteria defined below.
    """
    version = 2
    _MAX_STREAMS = 3
    _LOOP_COUNT = 300
    # Raw strings so that '\d' is handed to the regex engine verbatim instead
    # of being treated as a (invalid) string escape sequence.
    _INPUT_BUFFER_LEVEL = r'.*?READ_AUDIO.*?hw_level.*?(\d+).*?'
    _OUTPUT_BUFFER_LEVEL = r'.*?FILL_AUDIO.*?hw_level.*?(\d+).*?'
    _CHECK_PERIOD_TIME_SECS = 1  # Check buffer level every second.

    # We only run 1024 and 512 block size streams in this test. So the buffer
    # level of the input device should stay between 0 and 1024, and the buffer
    # level of the output device should stay between 1024 and 2048. Sometimes
    # it will be a little more. Therefore, we set the input buffer criteria to
    # 2 * 1024 and the output buffer criteria to 3 * 1024.
    _RATES = ['48000', '44100']
    _BLOCK_SIZES = ['512', '1024']
    _INPUT_BUFFER_DRIFT_CRITERIA = 2 * 1024
    _OUTPUT_BUFFER_DRIFT_CRITERIA = 3 * 1024

    def initialize(self):
        """Initialize the test"""
        # Create the stream list first so cleanup() can rely on it even when
        # run_once() never gets to run.
        self._streams = []
        upstart.stop_job('ui')

    def _new_stream(self, stream_type):
        """Runs new stream by cras_test_client.

        @param stream_type: _STREAM_TYPE_INPUT starts a capture stream writing
                to /dev/null; any other value starts a playback stream reading
                from /dev/zero.

        @returns: The subprocess.Popen object of the started client.

        """
        if stream_type == _STREAM_TYPE_INPUT:
            cmd = ['cras_test_client', '--capture_file', '/dev/null']
        else:
            cmd = ['cras_test_client', '--playback_file', '/dev/zero']

        # Pick a random rate / block size from the configured candidates.
        cmd += ['--rate', random.choice(self._RATES),
                '--block_size', random.choice(self._BLOCK_SIZES)]

        return subprocess.Popen(cmd)

    def _kill_oldest_stream(self):
        """Kills the oldest running stream and reaps its process."""
        stream = self._streams.pop(0)
        stream.kill()
        # Collect the exit status so the killed client does not linger as a
        # zombie for the remainder of the test.
        stream.wait()

    def _kill_all_streams(self):
        """Kills every stream that is still tracked in self._streams."""
        # getattr: tolerate being called before the list has been created.
        while getattr(self, '_streams', None):
            self._kill_oldest_stream()

    def _check_buffer_level(self, stream_type):
        """Fails the test if the buffer level of a direction is too high.

        @param stream_type: _STREAM_TYPE_INPUT or _STREAM_TYPE_OUTPUT. Any
                other value only queries the level and checks nothing.

        @raises error.TestFail: if the level exceeds the drift criteria of
                the given direction. Audio diagnostics are dumped to the
                results directory before raising.

        """
        buffer_level = self._get_buffer_level(stream_type)

        if stream_type == _STREAM_TYPE_INPUT:
            direction = 'input'
            criteria = self._INPUT_BUFFER_DRIFT_CRITERIA
        elif stream_type == _STREAM_TYPE_OUTPUT:
            direction = 'output'
            criteria = self._OUTPUT_BUFFER_DRIFT_CRITERIA
        else:
            return

        logging.debug("Max %s buffer level: %d", direction, buffer_level)
        if buffer_level > criteria:
            audio_helper.dump_audio_diagnostics(
                    os.path.join(self.resultsdir, "audio_diagnostics.txt"))
            raise error.TestFail('%s buffer level %d drift too high' %
                                 (direction.capitalize(), buffer_level))

    def cleanup(self):
        """Clean up all streams."""
        try:
            self._kill_all_streams()
        finally:
            # Bring the ui job back even if killing a stream failed.
            upstart.restart_job('ui')

    def run_once(self, input_stream=True, output_stream=True):
        """
        Repeatedly add streams of random configurations and remove them to
        verify if the buffer level would drift.

        @param input_stream: If true, run input stream in the test.
        @param output_stream: If true, run output stream in the test.

        @raises error.TestError: if both input and output are disabled.
        @raises error.TestFail: if a checked buffer level exceeds its drift
                criteria.
        """

        if not input_stream and not output_stream:
            raise error.TestError('Not supported mode.')

        # Kill streams left over from a previous call rather than dropping
        # the references to still-running processes.
        self._kill_all_streams()
        self._streams = []

        last_check_time = time.time()
        for _ in range(self._LOOP_COUNT):

            # Must add when nothing is running, must remove when full,
            # otherwise flip a coin.
            if not self._streams:
                add = True
            elif len(self._streams) >= self._MAX_STREAMS:
                add = False
            else:
                add = random.choice((True, False))

            if add:
                if not input_stream:
                    stream_type = _STREAM_TYPE_OUTPUT
                elif not output_stream:
                    stream_type = _STREAM_TYPE_INPUT
                else:
                    stream_type = random.choice((_STREAM_TYPE_INPUT,
                                                 _STREAM_TYPE_OUTPUT))

                self._streams.append(self._new_stream(stream_type))
            else:
                self._kill_oldest_stream()
                time.sleep(0.1)

            now = time.time()

            # Check buffer level.
            if now - last_check_time > self._CHECK_PERIOD_TIME_SECS:
                last_check_time = now
                if input_stream:
                    self._check_buffer_level(_STREAM_TYPE_INPUT)
                if output_stream:
                    self._check_buffer_level(_STREAM_TYPE_OUTPUT)

    def _get_buffer_level(self, stream_type):
        """Gets a rough number about current buffer level.

        Runs 'cras_test_client --dump_a' and scans every output line with the
        READ_AUDIO (input) or FILL_AUDIO (any other stream type) pattern.

        @param stream_type: _STREAM_TYPE_INPUT selects the input pattern; any
                other value selects the output pattern.

        @returns: The largest hw_level found, or 0 if no line matched.

        """
        if stream_type == _STREAM_TYPE_INPUT:
            pattern = re.compile(self._INPUT_BUFFER_LEVEL)
        else:
            pattern = re.compile(self._OUTPUT_BUFFER_LEVEL)

        # universal_newlines makes communicate() return text on both
        # Python 2 and Python 3, so the str pattern can be applied directly.
        proc = subprocess.Popen(['cras_test_client', '--dump_a'],
                                stdout=subprocess.PIPE,
                                universal_newlines=True)
        output, _ = proc.communicate()

        buffer_level = 0
        for line in (output or '').splitlines():
            found = pattern.match(line)
            if found:
                buffer_level = max(buffer_level, int(found.group(1)))
        return buffer_level
157