• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright 2020 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Server side Bluetooth audio tests."""
7
8from __future__ import absolute_import
9from __future__ import division
10from __future__ import print_function
11
12import logging
13import os
14import re
15import subprocess
16import time
17
18import common
19from autotest_lib.client.bin import utils
20from autotest_lib.client.common_lib import error
21from autotest_lib.client.cros.bluetooth.bluetooth_audio_test_data import (
22        A2DP, HFP_NBS, HFP_WBS, AUDIO_DATA_TARBALL_PATH, VISQOL_BUFFER_LENGTH,
23        DATA_DIR, VISQOL_PATH, VISQOL_SIMILARITY_MODEL, VISQOL_TEST_DIR,
24        AUDIO_RECORD_DIR, audio_test_data, get_audio_test_data,
25        get_visqol_binary)
26from autotest_lib.server.cros.bluetooth.bluetooth_adapter_tests import (
27    BluetoothAdapterTests, test_retry_and_log)
28from six.moves import range
29
30
31class BluetoothAdapterAudioTests(BluetoothAdapterTests):
32    """Server side Bluetooth adapter audio test class."""
33
34    DEVICE_TYPE = 'BLUETOOTH_AUDIO'
35    FREQUENCY_TOLERANCE_RATIO = 0.01
36    WAIT_DAEMONS_READY_SECS = 1
37    DEFAULT_CHUNK_IN_SECS = 1
38    IGNORE_LAST_FEW_CHUNKS = 2
39
40    # Useful constant for upsampling NBS files for compatibility with ViSQOL
41    MIN_VISQOL_SAMPLE_RATE = 16000
42
43    # The node types of the bluetooth output nodes in cras are the same for both
44    # A2DP and HFP.
45    CRAS_BLUETOOTH_OUTPUT_NODE_TYPE = 'BLUETOOTH'
46    # The node types of the bluetooth input nodes in cras are different for WBS
47    # and NBS.
48    CRAS_HFP_BLUETOOTH_INPUT_NODE_TYPE = {HFP_WBS: 'BLUETOOTH',
49                                          HFP_NBS: 'BLUETOOTH_NB_MIC'}
50
51    def _get_pulseaudio_bluez_source(self, get_source_method, device,
52                                     test_profile):
53        """Get the specified bluez device number in the pulseaudio source list.
54
55        @param get_source_method: the method to get distinct bluez source
56        @param device: the bluetooth peer device
57        @param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS
58
59        @returns: True if the specified bluez source is derived
60        """
61        sources = device.ListSources(test_profile)
62        logging.debug('ListSources()\n%s', sources)
63        self.bluez_source = get_source_method(test_profile)
64        result = bool(self.bluez_source)
65        if result:
66            logging.debug('bluez_source device number: %s', self.bluez_source)
67        else:
68            logging.debug('waiting for bluez_source ready in pulseaudio...')
69        return result
70
71
72    def _get_pulseaudio_bluez_sink(self, get_sink_method, device, test_profile):
73        """Get the specified bluez device number in the pulseaudio sink list.
74
75        @param get_sink_method: the method to get distinct bluez sink
76        @param device: the bluetooth peer device
77        @param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS
78
79        @returns: True if the specified bluez sink is derived
80        """
81        sinks = device.ListSinks(test_profile)
82        logging.debug('ListSinks()\n%s', sinks)
83        self.bluez_sink = get_sink_method(test_profile)
84        result = bool(self.bluez_sink)
85        if result:
86            logging.debug('bluez_sink device number: %s', self.bluez_sink)
87        else:
88            logging.debug('waiting for bluez_sink ready in pulseaudio...')
89        return result
90
91
92    def _get_pulseaudio_bluez_source_a2dp(self, device, test_profile):
93        """Get the a2dp bluez source device number.
94
95        @param device: the bluetooth peer device
96        @param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS
97
98        @returns: True if the specified a2dp bluez source is derived
99        """
100        return self._get_pulseaudio_bluez_source(
101                device.GetBluezSourceA2DPDevice, device, test_profile)
102
103
104    def _get_pulseaudio_bluez_source_hfp(self, device, test_profile):
105        """Get the hfp bluez source device number.
106
107        @param device: the bluetooth peer device
108        @param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS
109
110        @returns: True if the specified hfp bluez source is derived
111        """
112        return self._get_pulseaudio_bluez_source(
113                device.GetBluezSourceHFPDevice, device, test_profile)
114
115
116    def _get_pulseaudio_bluez_sink_hfp(self, device, test_profile):
117        """Get the hfp bluez sink device number.
118
119        @param device: the bluetooth peer device
120        @param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS
121
122        @returns: True if the specified hfp bluez sink is derived
123        """
124        return self._get_pulseaudio_bluez_sink(
125                device.GetBluezSinkHFPDevice, device, test_profile)
126
127
128    def _check_audio_frames_legitimacy(self, audio_test_data, recording_device,
129                                       recorded_file=None):
130        """Check if audio frames in the recorded file are legitimate.
131
132        For a wav file, a simple check is to make sure the recorded audio file
133        is not empty.
134
135        For a raw file, a simple check is to make sure the recorded audio file
136        are not all zeros.
137
138        @param audio_test_data: a dictionary about the audio test data
139                defined in client/cros/bluetooth/bluetooth_audio_test_data.py
140        @param recording_device: which device recorded the audio,
141                possible values are 'recorded_by_dut' or 'recorded_by_peer'
142        @param recorded_file: the recorded file name
143
144        @returns: True if audio frames are legitimate.
145        """
146        result = self.bluetooth_facade.check_audio_frames_legitimacy(
147                audio_test_data, recording_device, recorded_file)
148        if not result:
149            self.results = {'audio_frames_legitimacy': 'empty or all zeros'}
150            logging.error('The recorded audio file is empty or all zeros.')
151        return result
152
153
154    def _check_frequency(self, test_profile, recorded_freq, expected_freq):
155        """Check if the recorded frequency is within tolerance.
156
157        @param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS
158        @param recorded_freq: the frequency of recorded audio
159        @param expected_freq: the expected frequency
160
161        @returns: True if the recoreded frequency falls within the tolerance of
162                  the expected frequency
163        """
164        tolerance = expected_freq * self.FREQUENCY_TOLERANCE_RATIO
165        return abs(expected_freq - recorded_freq) <= tolerance
166
167
168    def _check_primary_frequencies(self, test_profile, audio_test_data,
169                                   recording_device, recorded_file=None):
170        """Check if the recorded frequencies meet expectation.
171
172        @param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS
173        @param audio_test_data: a dictionary about the audio test data
174                defined in client/cros/bluetooth/bluetooth_audio_test_data.py
175        @param recording_device: which device recorded the audio,
176                possible values are 'recorded_by_dut' or 'recorded_by_peer'
177        @param recorded_file: the recorded file name
178
179        @returns: True if the recorded frequencies of all channels fall within
180                the tolerance of expected frequencies
181        """
182        recorded_frequencies = self.bluetooth_facade.get_primary_frequencies(
183                audio_test_data, recording_device, recorded_file)
184        expected_frequencies = audio_test_data['frequencies']
185        final_result = True
186        self.results = dict()
187
188        if len(recorded_frequencies) < len(expected_frequencies):
189            logging.error('recorded_frequencies: %s, expected_frequencies: %s',
190                          str(recorded_frequencies), str(expected_frequencies))
191            final_result = False
192        else:
193            for channel, expected_freq in enumerate(expected_frequencies):
194                recorded_freq = recorded_frequencies[channel]
195                ret_val = self._check_frequency(
196                        test_profile, recorded_freq, expected_freq)
197                pass_fail_str = 'pass' if ret_val else 'fail'
198                result = ('primary frequency %d (expected %d): %s' %
199                          (recorded_freq, expected_freq, pass_fail_str))
200                self.results['Channel %d' % channel] = result
201                logging.info('Channel %d: %s', channel, result)
202
203                if not ret_val:
204                    final_result = False
205
206        logging.debug(str(self.results))
207        if not final_result:
208            logging.error('Failure at checking primary frequencies')
209        return final_result
210
211
212    def _poll_for_condition(self, condition, timeout=20, sleep_interval=1,
213                            desc='waiting for condition'):
214        try:
215            utils.poll_for_condition(condition=condition,
216                                     timeout=timeout,
217                                     sleep_interval=sleep_interval,
218                                     desc=desc)
219        except Exception as e:
220            raise error.TestError('Exception occurred when %s (%s)' % (desc, e))
221
222
223    def initialize_bluetooth_audio(self, device, test_profile):
224        """Initialize the Bluetooth audio task.
225
226        Note: pulseaudio is not stable. Need to restart it in the beginning.
227
228        @param device: the bluetooth peer device
229        @param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS
230
231        """
232        if not self.bluetooth_facade.create_audio_record_directory(
233                AUDIO_RECORD_DIR):
234            raise error.TestError('Failed to create %s on the DUT' %
235                                  AUDIO_RECORD_DIR)
236
237        if not device.StartPulseaudio(test_profile):
238            raise error.TestError('Failed to start pulseaudio.')
239        logging.debug('pulseaudio is started.')
240
241        if test_profile in (HFP_WBS, HFP_NBS):
242            if device.StartOfono():
243                logging.debug('ofono is started.')
244            else:
245                raise error.TestError('Failed to start ofono.')
246        elif device.StopOfono():
247            logging.debug('ofono is stopped.')
248        else:
249            logging.warn('Failed to stop ofono. Ignored.')
250
251        # Need time to complete starting services.
252        time.sleep(self.WAIT_DAEMONS_READY_SECS)
253
254
255    def cleanup_bluetooth_audio(self, device, test_profile):
256        """Cleanup for Bluetooth audio.
257
258        @param device: the bluetooth peer device
259        @param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS
260
261        """
262        if device.StopPulseaudio():
263            logging.debug('pulseaudio is stopped.')
264        else:
265            logging.warn('Failed to stop pulseaudio. Ignored.')
266
267        if device.StopOfono():
268            logging.debug('ofono is stopped.')
269        else:
270            logging.warn('Failed to stop ofono. Ignored.')
271
272
273    def initialize_bluetooth_player(self, device):
274        """Initialize the Bluetooth media player.
275
276        @param device: the Bluetooth peer device.
277
278        """
279        if not device.ExportMediaPlayer():
280            raise error.TestError('Failed to export media player.')
281        logging.debug('mpris-proxy is started.')
282
283        # Wait for player to show up and observed by playerctl.
284        desc='waiting for media player'
285        self._poll_for_condition(
286                lambda: bool(device.GetExportedMediaPlayer()), desc=desc)
287
288
289    def cleanup_bluetooth_player(self, device):
290        """Cleanup for Bluetooth media player.
291
292        @param device: the bluetooth peer device.
293
294        """
295        device.UnexportMediaPlayer()
296
297
298    def select_audio_output_node(self):
299        """Select the audio output node through cras.
300
301        @raises: error.TestError if failed.
302        """
303        def bluetooth_type_selected(node_type):
304            """Check if the bluetooth node type is selected."""
305            selected = self.bluetooth_facade.get_selected_output_device_type()
306            logging.debug('active output node type: %s, expected %s',
307                          selected, node_type)
308            return selected == node_type
309
310        node_type = self.CRAS_BLUETOOTH_OUTPUT_NODE_TYPE
311        if not self.bluetooth_facade.select_output_node(node_type):
312            raise error.TestError('select_output_node failed')
313
314        desc='waiting for %s as active cras audio output node type' % node_type
315        logging.debug(desc)
316        self._poll_for_condition(lambda: bluetooth_type_selected(node_type),
317                                 desc=desc)
318
319
320    def initialize_hfp(self, device, test_profile, test_data,
321                       recording_device, bluez_function):
322        """Initial set up for hfp tests.
323
324        Setup that is required for all hfp tests where
325        dut is either source or sink. Selects input device, starts recording,
326        and lastly it waits for pulseaudio bluez source/sink.
327
328        @param device: the bluetooth peer device
329        @param test_profile: the test profile used, HFP_WBS or HFP_NBS
330        @param test_data: a dictionary about the audio test data defined in
331                client/cros/bluetooth/bluetooth_audio_test_data.py
332        @param recording_device: which device recorded the audio, possible
333                values are 'recorded_by_dut' or 'recorded_by_peer'
334        @param bluez_function: the appropriate bluez hfp function either
335                _get_pulseaudio_bluez_source_hfp or
336                _get_pulseaudio_bluez_sink_hfp depending on the role of the dut
337        """
338        device_type = 'DUT' if recording_device == 'recorded_by_dut' else 'Peer'
339        dut_role = 'sink' if recording_device == 'recorded_by_dut' else 'source'
340
341        # Select audio input device.
342        desc = 'waiting for cras to select audio input device'
343        logging.debug(desc)
344        self._poll_for_condition(
345                lambda: self.bluetooth_facade.select_input_device(device.name),
346                desc=desc)
347
348        # Select audio output node so that we do not rely on chrome to do it.
349        self.select_audio_output_node()
350
351        # Enable HFP profile.
352        logging.debug('Start recording audio on {}'.format(device_type))
353        if not self.bluetooth_facade.start_capturing_audio_subprocess(
354                test_data, recording_device):
355            desc = '{} failed to start capturing audio.'.format(device_type)
356            raise error.TestError(desc)
357
358        # Wait for pulseaudio bluez hfp source/sink
359        desc = 'waiting for pulseaudio bluez hfp {}'.format(dut_role)
360        logging.debug(desc)
361        self._poll_for_condition(lambda: bluez_function(device, test_profile),
362                                 desc=desc)
363
364
365    def hfp_record_on_dut(self, device, test_profile, test_data):
366        """Play audio from test_data dictionary from peer device to dut.
367
368        Play file described in test_data dictionary from peer device to dut
369        using test_profile, either HFP_WBS or HFP_NBS and record on dut.
370
371        @param device: the bluetooth peer device
372        @param test_profile: the test profile used, HFP_WBS or HFP_NBS
373        @param test_data: a dictionary about the audio test data defined in
374                client/cros/bluetooth/bluetooth_audio_test_data.py
375
376        @returns: True if the recorded audio frames are legitimate, False
377                if they are not, ie. it did not record.
378        """
379        # Select audio input device.
380        logging.debug('Select input device')
381        if not self.bluetooth_facade.select_input_device(device.name):
382            raise error.TestError('DUT failed to select audio input device.')
383
384        # Start playing audio on chameleon.
385        logging.debug('Start playing audio on Pi')
386        if not device.StartPlayingAudioSubprocess(test_profile, test_data):
387            err = 'Failed to start playing audio file on the peer device'
388            raise error.TestError(err)
389
390        time.sleep(test_data['duration'])
391
392        # Stop playing audio on chameleon.
393        logging.debug('Stop playing audio on Pi')
394        if not device.StopPlayingAudioSubprocess():
395            err = 'Failed to stop playing audio on the peer device'
396            raise error.TestError(err)
397
398        # Disable HFP profile.
399        logging.debug('Stop recording audio on DUT')
400        if not self.bluetooth_facade.stop_capturing_audio_subprocess():
401            raise error.TestError('DUT failed to stop capturing audio.')
402
403        # Check if the audio frames in the recorded file are legitimate.
404        return self._check_audio_frames_legitimacy(test_data, 'recorded_by_dut')
405
406
407    def hfp_record_on_peer(self, device, test_profile, test_data):
408        """Play audio from test_data dictionary from dut to peer device.
409
410        Play file described in test_data dictionary from dut to peer device
411        using test_profile, either HFP_WBS or HFP_NBS and record on peer.
412
413        @param device: The bluetooth peer device.
414        @param test_profile: The test profile used, HFP_WBS or HFP_NBS.
415        @param test_data: A dictionary about the audio test data defined in
416                client/cros/bluetooth/bluetooth_audio_test_data.py.
417
418        @returns: True if the recorded audio frames are legitimate, False
419                if they are not, ie. it did not record.
420        """
421        logging.debug('Start recording audio on Pi')
422        # Start recording audio on the peer Bluetooth audio device.
423        if not device.StartRecordingAudioSubprocess(test_profile, test_data):
424            raise error.TestError(
425                    'Failed to record on the peer Bluetooth audio device.')
426
427        # Play audio on the DUT in a non-blocked way.
428        # If there are issues, cras_test_client playing back might be blocked
429        # forever. We would like to avoid the testing procedure from that.
430        logging.debug('Start playing audio')
431        if not self.bluetooth_facade.start_playing_audio_subprocess(test_data):
432            raise error.TestError('DUT failed to play audio.')
433
434        time.sleep(test_data['duration'])
435
436        logging.debug('Stop recording audio on Pi')
437        # Stop recording audio on the peer Bluetooth audio device.
438        if not device.StopRecordingingAudioSubprocess():
439            msg = 'Failed to stop recording on the peer Bluetooth audio device'
440            logging.error(msg)
441
442        # Disable HFP profile.
443        logging.debug('Stop recording audio on DUT')
444        if not self.bluetooth_facade.stop_capturing_audio_subprocess():
445            raise error.TestError('DUT failed to stop capturing audio.')
446
447        # Stop playing audio on DUT.
448        logging.debug('Stop playing audio on DUT')
449        if not self.bluetooth_facade.stop_playing_audio_subprocess():
450            raise error.TestError('DUT failed to stop playing audio.')
451
452        # Copy the recorded audio file to the DUT for spectrum analysis.
453        logging.debug('Scp to DUT')
454        recorded_file = test_data['recorded_by_peer']
455        device.ScpToDut(recorded_file, recorded_file, self.host.ip)
456
457        # Check if the audio frames in the recorded file are legitimate.
458        return self._check_audio_frames_legitimacy(test_data,
459                                                   'recorded_by_peer')
460
461
462    def parse_visqol_output(self, stdout, stderr):
463        """
464        Parse stdout and stderr string from VISQOL output and parse into
465        a float score.
466
467        On error, stderr will contain the error message, otherwise will be None.
468        On success, stdout will be a string, first line will be
469        VISQOL version, followed by indication of speech mode. Followed by
470        paths to reference and degraded file, and a float MOS-LQO score, which
471        is what we're interested in. Followed by more detailed charts about
472        specific scoring by segments of the files. Stdout is None on error.
473
474        @param stdout: The stdout bytes from commandline output of VISQOL.
475        @param stderr: The stderr bytes from commandline output of VISQOL.
476
477        @returns: A tuple of a float score and string representation of the
478                srderr or None if there was no error.
479        """
480        string_out = stdout or ''
481
482        # Log verbose VISQOL output:
483        log_file = os.path.join(VISQOL_TEST_DIR, 'VISQOL_LOG.txt')
484        with open(log_file, 'w+') as f:
485            f.write('String Error:\n{}\n'.format(stderr))
486            f.write('String Out:\n{}\n'.format(stdout))
487
488        # pattern matches first float or int after 'MOS-LQO:' in stdout,
489        # e.g. it would match the line 'MOS-LQO       2.3' in the stdout
490        score_pattern = re.compile(r'.*MOS-LQO:\s*(\d+.?\d*)')
491        score_search = re.search(score_pattern, string_out)
492
493        # re.search returns None if no pattern match found, otherwise the score
494        # would be in the match object's group 1 matches just the float score
495        score = float(score_search.group(1)) if score_search else -1.0
496        return stderr, score
497
498
499    def get_visqol_score(self, ref_file, deg_file, speech_mode=True,
500                         verbose=True):
501        """
502        Runs VISQOL using the subprocess library on the provided reference file
503        and degraded file and returns the VISQOL score.
504
505        @param ref_file: File path to the reference wav file.
506        @param deg_file: File path to the degraded wav file.
507        @param speech_mode: [Optional] Defaults to True, accepts 16k sample
508                rate files and ignores frequencies > 8kHz for scoring.
509        @param verbose: [Optional] Defaults to True, outputs more details.
510
511        @returns: A float score for the tested file.
512        """
513        visqol_cmd = [VISQOL_PATH]
514        visqol_cmd += ['--reference_file', ref_file]
515        visqol_cmd += ['--degraded_file', deg_file]
516        visqol_cmd += ['--similarity_to_quality_model', VISQOL_SIMILARITY_MODEL]
517
518        if speech_mode:
519            visqol_cmd.append('--use_speech_mode')
520        if verbose:
521            visqol_cmd.append('--verbose')
522
523        visqol_process = subprocess.Popen(visqol_cmd, stdout=subprocess.PIPE,
524                                          stderr=subprocess.PIPE)
525        stdout, stderr = visqol_process.communicate()
526
527        err, score = self.parse_visqol_output(stdout, stderr)
528
529        if err:
530            raise error.TestError(err)
531        elif score < 0.0:
532            raise error.TestError('Failed to parse score, got {}'.format(score))
533
534        return score
535
536
537    def get_ref_and_deg_files(self, trimmed_file, test_profile, test_data):
538        """Return path for reference and degraded files to run visqol on.
539
540        @param trimmed_file: Path to the trimmed audio file on DUT.
541        @param test_profile: The test profile used HFP_WBS or HFP_NBS.
542        @param test_data: A dictionary about the audio test data defined in
543                client/cros/bluetooth/bluetooth_audio_test_data.py.
544
545        @returns: A tuple of path to the reference file and degraded file if
546                they exist, otherwise False for the files that aren't available.
547        """
548        # Path in autotest server in ViSQOL folder to store degraded file from
549        # retrieved from the DUT
550        deg_file = os.path.join(VISQOL_TEST_DIR, os.path.split(trimmed_file)[1])
551        played_file = test_data['file']
552        # If profile is WBS, no resampling required
553        if test_profile == HFP_WBS:
554            self.host.get_file(trimmed_file, deg_file)
555            return played_file, deg_file
556
557        # On NBS, degraded and reference files need to be resampled to 16 kHz
558        # Build path for the upsampled (us) reference (ref) file on DUT
559        ref_file = '{}_us_ref{}'.format(*os.path.splitext(played_file))
560        # If resampled ref file already exists, don't need to do it again
561        if not os.path.isfile(ref_file):
562            if not self.bluetooth_facade.convert_audio_sample_rate(
563                    played_file, ref_file, test_data,
564                    self.MIN_VISQOL_SAMPLE_RATE):
565                return False, False
566            # Move upsampled reference file to autotest server
567            self.host.get_file(ref_file, ref_file)
568
569        # Build path for resampled degraded file on DUT
570        deg_on_dut = '{}_us{}'.format(*os.path.splitext(trimmed_file))
571        # Resample degraded file to 16 kHz and move to autotest server
572        if not self.bluetooth_facade.convert_audio_sample_rate(
573                trimmed_file, deg_on_dut, test_data,
574                self.MIN_VISQOL_SAMPLE_RATE):
575            return ref_file, False
576
577        self.host.get_file(deg_on_dut, deg_file)
578
579        return ref_file, deg_file
580
581
582    def format_recorded_file(self, test_data, test_profile, recording_device):
583        """Format recorded files to be compatible with ViSQOL.
584
585        Convert raw files to wav if recorded file is a raw file, trim file to
586        duration, if required, resample the file, then lastly return the paths
587        for the reference file and degraded file on the autotest server.
588
589        @param test_data: A dictionary about the audio test data defined in
590                client/cros/bluetooth/bluetooth_audio_test_data.py.
591        @param test_profile: The test profile used, HFP_WBS or HFP_NBS.
592        @param recording_device: Which device recorded the audio, either
593                'recorded_by_dut' or 'recorded_by_peer'.
594
595        @returns: A tuple of path to the reference file and degraded file if
596                they exist, otherwise False for the files that aren't available.
597        """
598        # Path to recorded file either on DUT or BT peer
599        recorded_file = test_data[recording_device]
600        untrimmed_file = recorded_file
601        if recorded_file.endswith('.raw'):
602            # build path for file converted from raw to wav, i.e. change the ext
603            untrimmed_file = os.path.splitext(recorded_file)[0] + '.wav'
604            if not self.bluetooth_facade.convert_raw_to_wav(
605                    recorded_file, untrimmed_file, test_data):
606                raise error.TestError('Could not convert raw file to wav')
607
608        # Compute the duration of played file without added buffer
609        new_duration = test_data['duration'] - VISQOL_BUFFER_LENGTH
610        # build path for file resulting from trimming to desired duration
611        trimmed_file = '{}_t{}'.format(*os.path.splitext(untrimmed_file))
612        if not self.bluetooth_facade.trim_wav_file(
613                untrimmed_file, trimmed_file, new_duration, test_data):
614            raise error.TestError('Failed to trim recorded file')
615
616        return self.get_ref_and_deg_files(trimmed_file, test_profile, test_data)
617
618
619    def handle_chunks(self, device, test_profile, test_data, duration):
620        """Handle chunks of recorded streams and verify the primary frequencies.
621
622        @param device: the bluetooth peer device
623        @param test_profile: the a2dp test profile;
624                             choices are A2DP and A2DP_LONG
625        @param test_data: the test data of the test profile
626        @param duration: the duration of the audio file to test
627
628        @returns: True if all chunks pass the frequencies check.
629        """
630        chunk_in_secs = test_data['chunk_in_secs']
631        if not bool(chunk_in_secs):
632            chunk_in_secs = self.DEFAULT_CHUNK_IN_SECS
633        nchunks = duration // chunk_in_secs
634        logging.info('Number of chunks: %d', nchunks)
635
636        all_chunks_test_result = True
637        for i in range(nchunks):
638            logging.info('Handle chunk %d', i)
639            recorded_file = device.HandleOneChunk(chunk_in_secs, i,
640                                                  test_profile, self.host.ip)
641            if recorded_file is None:
642                raise error.TestError('Failed to handle chunk %d' % i)
643
644            # Check if the audio frames in the recorded file are legitimate.
645            if not self._check_audio_frames_legitimacy(
646                    test_data, 'recorded_by_peer', recorded_file=recorded_file):
647                if (i > self.IGNORE_LAST_FEW_CHUNKS and
648                        i >= nchunks - self.IGNORE_LAST_FEW_CHUNKS):
649                    logging.info('empty chunk %d ignored for last %d chunks',
650                                 i, self.IGNORE_LAST_FEW_CHUNKS)
651                else:
652                    all_chunks_test_result = False
653                break
654
655            # Check if the primary frequencies of the recorded file
656            # meet expectation.
657            if not self._check_primary_frequencies(A2DP, test_data,
658                                                   'recorded_by_peer',
659                                                   recorded_file=recorded_file):
660                if (i > self.IGNORE_LAST_FEW_CHUNKS and
661                        i >= nchunks - self.IGNORE_LAST_FEW_CHUNKS):
662                    msg = 'partially filled chunk %d ignored for last %d chunks'
663                    logging.info(msg, i, self.IGNORE_LAST_FEW_CHUNKS)
664                else:
665                    all_chunks_test_result = False
666                break
667
668        return all_chunks_test_result
669
670
671    # ---------------------------------------------------------------
672    # Definitions of all bluetooth audio test cases
673    # ---------------------------------------------------------------
674
675
676    @test_retry_and_log(False)
677    def test_hfp_dut_as_source_visqol_score(self, device, test_profile):
678        """Test Case: hfp test files streaming from peer device to dut
679
680        @param device: the bluetooth peer device
681        @param test_profile: which test profile is used, HFP_WBS or HFP_NBS
682
683        @returns: True if the all the test files score at or above their
684                  source_passing_score value as defined in
685                  bluetooth_audio_test_data.py
686        """
687        # list of test wav files
688        hfp_test_data = audio_test_data[test_profile]
689        test_files = hfp_test_data['visqol_test_files']
690
691        get_visqol_binary()
692        get_audio_test_data()
693
694        # Download test data to DUT
695        self.host.send_file(AUDIO_DATA_TARBALL_PATH, AUDIO_DATA_TARBALL_PATH)
696        if not self.bluetooth_facade.unzip_audio_test_data(
697                AUDIO_DATA_TARBALL_PATH, DATA_DIR):
698            logging.error('Audio data directory not found in DUT')
699            raise error.TestError('Failed to unzip audio test data to DUT')
700
701        # Result of visqol test on all files
702        visqol_results = dict()
703
704        for test_file in test_files:
705            filename = os.path.split(test_file['file'])[1]
706            logging.debug('Testing file: {}'.format(filename))
707
708            # Set up hfp test to record on peer
709            self.initialize_hfp(device, test_profile, test_file,
710                                'recorded_by_peer',
711                                self._get_pulseaudio_bluez_source_hfp)
712            logging.debug('Initialized HFP')
713
714            if not self.hfp_record_on_peer(device, test_profile, test_file):
715                return False
716            logging.debug('Recorded {} successfully'.format(filename))
717
718            ref_file, deg_file = self.format_recorded_file(test_file,
719                                                           test_profile,
720                                                           'recorded_by_peer')
721            if not ref_file or not deg_file:
722                desc = 'Failed to get ref and deg file: ref {}, deg {}'.format(
723                        ref_file, deg_file)
724                raise error.TestError(desc)
725
726            score = self.get_visqol_score(ref_file, deg_file,
727                                          speech_mode=test_file['speech_mode'])
728
729            logging.info('{} scored {}, min passing score: {}'.format(
730                    filename, score, test_file['source_passing_score']))
731            passed = score >= test_file['source_passing_score']
732            visqol_results[filename] = passed
733
734            if not passed:
735                logging.warning('Failed: {}'.format(filename))
736
737        return all(visqol_results.values())
738
739
740    @test_retry_and_log(False)
741    def test_hfp_dut_as_sink_visqol_score(self, device, test_profile):
742        """Test Case: hfp test files streaming from peer device to dut
743
744        @param device: the bluetooth peer device
745        @param test_profile: which test profile is used, HFP_WBS or HFP_NBS
746
747        @returns: True if the all the test files score at or above their
748                  sink_passing_score value as defined in
749                  bluetooth_audio_test_data.py
750        """
751        # list of test wav files
752        hfp_test_data = audio_test_data[test_profile]
753        test_files = hfp_test_data['visqol_test_files']
754
755        get_visqol_binary()
756        get_audio_test_data()
757        self.host.send_file(AUDIO_DATA_TARBALL_PATH, AUDIO_DATA_TARBALL_PATH)
758        if not self.bluetooth_facade.unzip_audio_test_data(
759                AUDIO_DATA_TARBALL_PATH, DATA_DIR):
760            logging.error('Audio data directory not found in DUT')
761            raise error.TestError('Failed to unzip audio test data to DUT')
762
763        # Result of visqol test on all files
764        visqol_results = dict()
765
766        for test_file in test_files:
767            filename = os.path.split(test_file['file'])[1]
768            logging.debug('Testing file: {}'.format(filename))
769
770            # Set up hfp test to record on dut
771            self.initialize_hfp(device, test_profile, test_file,
772                                'recorded_by_dut',
773                                self._get_pulseaudio_bluez_sink_hfp)
774            logging.debug('Initialized HFP')
775            # Record audio on dut played from pi, returns true if anything
776            # was successfully recorded, false otherwise
777            if not self.hfp_record_on_dut(device, test_profile, test_file):
778                return False
779            logging.debug('Recorded {} successfully'.format(filename))
780
781            ref_file, deg_file = self.format_recorded_file(test_file,
782                                                           test_profile,
783                                                           'recorded_by_dut')
784            if not ref_file or not deg_file:
785                desc = 'Failed to get ref and deg file: ref {}, deg {}'.format(
786                        ref_file, deg_file)
787                raise error.TestError(desc)
788
789            score = self.get_visqol_score(ref_file, deg_file,
790                                          speech_mode=test_file['speech_mode'])
791
792            logging.info('{} scored {}, min passing score: {}'.format(
793                    filename, score, test_file['sink_passing_score']))
794            passed = score >= test_file['sink_passing_score']
795            visqol_results[filename] = passed
796
797            if not passed:
798                logging.warning('Failed: {}'.format(filename))
799
800        return all(visqol_results.values())
801
802    @test_retry_and_log(False)
803    def test_device_a2dp_connected(self, device, timeout=15):
804        """ Tests a2dp profile is connected on device. """
805        self.results = {}
806        check_connection = lambda: self._get_pulseaudio_bluez_source_a2dp(
807                device, A2DP)
808        is_connected = self._wait_for_condition(check_connection,
809                                                'test_device_a2dp_connected',
810                                                timeout=timeout)
811        self.results['peer a2dp connected'] = is_connected
812
813        return all(self.results.values())
814
815    @test_retry_and_log(False)
816    def test_a2dp_sinewaves(self, device, test_profile, duration):
817        """Test Case: a2dp sinewaves
818
819        @param device: the bluetooth peer device
820        @param test_profile: the a2dp test profile;
821                             choices are A2DP and A2DP_LONG
822        @param duration: the duration of the audio file to test
823                         0 means to use the default value in the test profile
824
825        @returns: True if the recorded primary frequency is within the
826                  tolerance of the playback sine wave frequency.
827
828        """
829        # Make a copy since the test_data may be formatted with distinct
830        # arguments in the follow-up tests.
831        test_data = audio_test_data[test_profile].copy()
832        if bool(duration):
833            test_data['duration'] = duration
834        else:
835            duration = test_data['duration']
836
837        test_data['file'] %= duration
838        logging.info('%s test for %d seconds.', test_profile, duration)
839
840        # Wait for pulseaudio a2dp bluez source
841        desc = 'waiting for pulseaudio a2dp bluez source'
842        logging.debug(desc)
843        self._poll_for_condition(
844                lambda: self._get_pulseaudio_bluez_source_a2dp(device,
845                                                               test_profile),
846                desc=desc)
847
848        # Select audio output node so that we do not rely on chrome to do it.
849        self.select_audio_output_node()
850
851        # Start recording audio on the peer Bluetooth audio device.
852        logging.debug('Start recording a2dp')
853        if not device.StartRecordingAudioSubprocess(test_profile, test_data):
854            raise error.TestError(
855                    'Failed to record on the peer Bluetooth audio device.')
856
857        # Play audio on the DUT in a non-blocked way and check the recorded
858        # audio stream in a real-time manner.
859        logging.debug('Start playing audio')
860        if not self.bluetooth_facade.start_playing_audio_subprocess(test_data):
861            raise error.TestError('DUT failed to play audio.')
862
863        # Handle chunks of recorded streams and verify the primary frequencies.
864        # This is a blocking call until all chunks are completed.
865        all_chunks_test_result = self.handle_chunks(device, test_profile,
866                                                    test_data, duration)
867
868        # Stop recording audio on the peer Bluetooth audio device.
869        logging.debug('Stop recording a2dp')
870        if not device.StopRecordingingAudioSubprocess():
871            msg = 'Failed to stop recording on the peer Bluetooth audio device'
872            logging.error(msg)
873
874        # Stop playing audio on DUT.
875        logging.debug('Stop playing audio on DUT')
876        if not self.bluetooth_facade.stop_playing_audio_subprocess():
877            raise error.TestError('DUT failed to stop playing audio.')
878
879        return all_chunks_test_result
880
881    @test_retry_and_log(False)
882    def test_hfp_dut_as_source(self, device, test_profile):
883        """Test Case: hfp sinewave streaming from dut to peer device
884
885        @param device: the bluetooth peer device
886        @param test_profile: which test profile is used, HFP_WBS or HFP_NBS
887
888        @returns: True if the recorded primary frequency is within the
889                  tolerance of the playback sine wave frequency.
890        """
891        hfp_test_data = audio_test_data[test_profile]
892
893        self.initialize_hfp(device, test_profile, hfp_test_data,
894                            'recorded_by_peer',
895                            self._get_pulseaudio_bluez_source_hfp)
896
897        if not self.hfp_record_on_peer(device, test_profile, hfp_test_data):
898            return False
899
900        # Check if the primary frequencies of recorded file meet expectation.
901        check_freq_result = self._check_primary_frequencies(
902                test_profile, hfp_test_data, 'recorded_by_peer')
903        return check_freq_result
904
905
906    @test_retry_and_log(False)
907    def test_hfp_dut_as_sink(self, device, test_profile):
908        """Test Case: hfp sinewave streaming from peer device to dut
909
910        @param device: the bluetooth peer device
911        @param test_profile: which test profile is used, HFP_WBS or HFP_NBS
912
913        @returns: True if the recorded primary frequency is within the
914                  tolerance of the playback sine wave frequency.
915
916        """
917        hfp_test_data = audio_test_data[test_profile]
918
919        # Set up hfp test to record on dut
920        self.initialize_hfp(device, test_profile, hfp_test_data,
921                            'recorded_by_dut',
922                            self._get_pulseaudio_bluez_sink_hfp)
923
924        # Record audio on dut play from pi, returns true if anything recorded
925        if not self.hfp_record_on_dut(device, test_profile, hfp_test_data):
926            return False
927
928        # Check if the primary frequencies of recorded file meet expectation.
929        check_freq_result = self._check_primary_frequencies(
930                test_profile, hfp_test_data, 'recorded_by_dut')
931        return check_freq_result
932
933
934    @test_retry_and_log(False)
935    def test_avrcp_commands(self, device):
936        """Test Case: Test AVRCP commands issued by peer can be received at DUT
937
938        The very first AVRCP command (Linux evdev event) the DUT receives
939        contains extra information than just the AVRCP event, e.g. EV_REP
940        report used to specify delay settings. Send the first command before
941        the actual test starts to avoid dealing with them during test.
942
943        The peer device name is required to monitor the event reception on the
944        DUT. However, as the peer device itself already registered with the
945        kernel as an udev input device. The AVRCP profile will register as an
946        separate input device with the name pattern: name + (AVRCP), e.g.
947        RASPI_AUDIO (AVRCP). Using 'AVRCP' as device name to help search for
948        the device.
949
950        @param device: the Bluetooth peer device
951
952        @returns: True if the all AVRCP commands received by DUT, false
953                  otherwise
954
955        """
956        device.SendMediaPlayerCommand('play')
957
958        name = device.name
959        device.name = 'AVRCP'
960
961        result_pause = self.test_avrcp_event(device,
962            device.SendMediaPlayerCommand, 'pause')
963        result_play = self.test_avrcp_event(device,
964            device.SendMediaPlayerCommand, 'play')
965        result_stop = self.test_avrcp_event(device,
966            device.SendMediaPlayerCommand, 'stop')
967        result_next = self.test_avrcp_event(device,
968            device.SendMediaPlayerCommand, 'next')
969        result_previous = self.test_avrcp_event(device,
970            device.SendMediaPlayerCommand, 'previous')
971
972        device.name = name
973        self.results = {'pause': result_pause, 'play': result_play,
974                        'stop': result_stop, 'next': result_next,
975                        'previous': result_previous}
976        return all(self.results.values())
977
978
979    @test_retry_and_log(False)
980    def test_avrcp_media_info(self, device):
981        """Test Case: Test AVRCP media info sent by DUT can be received by peer
982
983        The test update all media information twice to prevent previous
984        leftover data affect the current iteration of test. Then compare the
985        expected results against the information received on the peer device.
986
987        This test verifies media information including: playback status,
988        length, title, artist, and album. Position of the media is not
989        currently support as playerctl on the peer side cannot correctly
990        retrieve such information.
991
992        Length and position information are transmitted in the unit of
993        microsecond. However, BlueZ process those time data in the resolution
994        of millisecond. Discard microsecond detail when comparing those media
995        information.
996
997        @param device: the Bluetooth peer device
998
999        @returns: True if the all AVRCP media info received by DUT, false
1000                  otherwise
1001
1002        """
1003        # First round of updating media information to overwrite all leftovers.
1004        init_status = 'stopped'
1005        init_length = 20200414
1006        init_position = 8686868
1007        init_metadata = {'album': 'metadata_album_init',
1008                         'artist': 'metadata_artist_init',
1009                         'title': 'metadata_title_init'}
1010        self.bluetooth_facade.set_player_playback_status(init_status)
1011        self.bluetooth_facade.set_player_length(init_length)
1012        self.bluetooth_facade.set_player_position(init_position)
1013        self.bluetooth_facade.set_player_metadata(init_metadata)
1014
1015        # Second round of updating for actual testing.
1016        expected_status = 'playing'
1017        expected_length = 68686868
1018        expected_position = 20200414
1019        expected_metadata = {'album': 'metadata_album_expected',
1020                             'artist': 'metadata_artist_expected',
1021                             'title': 'metadata_title_expected'}
1022        self.bluetooth_facade.set_player_playback_status(expected_status)
1023        self.bluetooth_facade.set_player_length(expected_length)
1024        self.bluetooth_facade.set_player_position(expected_position)
1025        self.bluetooth_facade.set_player_metadata(expected_metadata)
1026
1027        received_media_info = device.GetMediaPlayerMediaInfo()
1028        logging.debug(received_media_info)
1029
1030        try:
1031            actual_length = int(received_media_info.get('length'))
1032        except:
1033            actual_length = 0
1034
1035        result_status = bool(expected_status ==
1036            received_media_info.get('status').lower())
1037        result_album = bool(expected_metadata['album'] ==
1038            received_media_info.get('album'))
1039        result_artist = bool(expected_metadata['artist'] ==
1040            received_media_info.get('artist'))
1041        result_title = bool(expected_metadata['title'] ==
1042            received_media_info.get('title'))
1043        # The AVRCP time information is in the unit of microseconds but with
1044        # milliseconds resolution. Convert both send and received length into
1045        # milliseconds for comparison.
1046        result_length = bool(expected_length // 1000 == actual_length // 1000)
1047
1048        self.results = {'status': result_status, 'album': result_album,
1049                        'artist': result_artist, 'title': result_title,
1050                        'length': result_length}
1051        return all(self.results.values())
1052