• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python3
2"""Tests for blueberry.tests.bluetooth.bluetooth_latency."""
3
4from __future__ import absolute_import
5from __future__ import division
6from __future__ import print_function
7
8import logging
9import math
10import random
11import string
12import time
13
14from mobly import asserts
15from mobly import test_runner
16from mobly.signals import TestAbortClass
17# Internal import
18from blueberry.utils import blueberry_base_test
19from blueberry.utils import bt_test_utils
20from blueberry.utils import metrics_utils
21# Internal import
22
23
24class BluetoothLatencyTest(blueberry_base_test.BlueberryBaseTest):
25
26  @retry.logged_retry_on_exception(
27      retry_intervals=retry.FuzzedExponentialIntervals(
28          initial_delay_sec=2, factor=5, num_retries=5, max_delay_sec=300))
29  def _measure_latency(self):
30    """Measures the latency of data transfer over RFCOMM.
31
32    Sends data from the client device that is read by the server device.
33    Calculates the latency of the transfer.
34
35    Returns:
36        The latency of the transfer milliseconds.
37    """
38
39    # Generates a random message to transfer
40    message = (''.join(
41        random.choice(string.ascii_letters + string.digits) for _ in range(6)))
42    start_time = time.time()
43    write_read_successful = bt_test_utils.write_read_verify_data_sl4a(
44        self.phone, self.derived_bt_device, message, False)
45    end_time = time.time()
46    asserts.assert_true(write_read_successful, 'Failed to send/receive message')
47    return (end_time - start_time) * 1000
48
49  def setup_class(self):
50    """Standard Mobly setup class."""
51    super(BluetoothLatencyTest, self).setup_class()
52    if len(self.android_devices) < 2:
53      raise TestAbortClass(
54          'Not enough android phones detected (need at least two)')
55    self.phone = self.android_devices[0]
56    self.phone.init_setup()
57    self.phone.sl4a_setup()
58
59    # We treat the secondary phone as a derived_bt_device in order for the
60    # generic script to work with this android phone properly. Data will be sent
61    # from first phone to the second phone.
62    self.derived_bt_device = self.android_devices[1]
63    self.derived_bt_device.init_setup()
64    self.derived_bt_device.sl4a_setup()
65    self.set_btsnooplogmode_full(self.phone)
66    self.set_btsnooplogmode_full(self.derived_bt_device)
67
68    self.metrics = (
69        metrics_utils.BluetoothMetricLogger(
70            metrics_pb2.BluetoothDataTestResult()))
71    self.metrics.add_primary_device_metrics(self.phone)
72    self.metrics.add_connected_device_metrics(self.derived_bt_device)
73
74    self.data_transfer_type = metrics_pb2.BluetoothDataTestResult.RFCOMM
75    self.iterations = int(self.user_params.get('iterations', 300))
76    logging.info('Running Bluetooth latency test %s times.', self.iterations)
77    logging.info('Successfully found required devices.')
78
79  def setup_test(self):
80    """Setup for bluetooth latency test."""
81    logging.info('Setup Test for test_bluetooth_latency')
82    super(BluetoothLatencyTest, self).setup_test()
83    asserts.assert_true(self.phone.connect_with_rfcomm(self.derived_bt_device),
84                        'Failed to establish RFCOMM connection')
85
86  def test_bluetooth_latency(self):
87    """Tests the latency for a data transfer over RFCOMM."""
88
89    metrics = {}
90    latency_list = []
91
92    for _ in range(self.iterations):
93      latency_list.append(self._measure_latency())
94
95    metrics['data_transfer_protocol'] = self.data_transfer_type
96    metrics['data_latency_min_millis'] = int(min(latency_list))
97    metrics['data_latency_max_millis'] = int(max(latency_list))
98    metrics['data_latency_avg_millis'] = int(
99        math.fsum(latency_list) / float(len(latency_list)))
100    logging.info('Latency: %s', metrics)
101
102    asserts.assert_true(metrics['data_latency_min_millis'] > 0,
103                        'Minimum latency must be greater than 0!')
104    self.metrics.add_test_metrics(metrics)
105    for metric in metrics:
106      self.record_data({
107          'Test Name': 'test_bluetooth_latency',
108          'sponge_properties': {
109              metric: metrics[metric],
110          }
111      })
112
113  def teardown_class(self):
114    logging.info('Factory resetting Bluetooth on devices.')
115    self.phone.sl4a.bluetoothSocketConnStop()
116    self.derived_bt_device.sl4a.bluetoothSocketConnStop()
117    self.phone.factory_reset_bluetooth()
118    self.derived_bt_device.factory_reset_bluetooth()
119    super(BluetoothLatencyTest, self).teardown_class()
120    self.record_data({
121        'Test Name': 'test_bluetooth_latency',
122        'sponge_properties': {
123            'proto_ascii':
124                self.metrics.proto_message_to_ascii(),
125            'primary_device_build':
126                self.phone.get_device_info()['android_release_id']
127        }
128    })
129
130
131if __name__ == '__main__':
132  test_runner.main()
133