#/usr/bin/env python3 # # Copyright (C) 2018 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. You may obtain a copy of # the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. import random import statistics import string import time from acts import asserts from acts.base_test import BaseTestClass from acts.signals import TestPass from acts.test_decorators import test_tracker_info from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest from acts_contrib.test_utils.bt.bt_test_utils import orchestrate_rfcomm_connection from acts_contrib.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test from acts_contrib.test_utils.bt.bt_test_utils import verify_server_and_client_connected from acts_contrib.test_utils.bt.bt_test_utils import write_read_verify_data from acts_contrib.test_utils.bt.loggers.bluetooth_metric_logger import BluetoothMetricLogger from acts_contrib.test_utils.bt.loggers.protos import bluetooth_metric_pb2 as proto_module from acts.utils import set_location_service class BluetoothLatencyTest(BaseTestClass): """Connects two Android phones and tests the RFCOMM latency. Attributes: client_device: An Android device object that will be sending data server_device: An Android device object that will be receiving data bt_logger: The proxy logger instance for each test case data_transfer_type: Data transfer protocol used for the test """ def setup_class(self): super().setup_class() # Sanity check of the devices under test # TODO(b/119051823): Investigate using a config validator to replace this. if len(self.android_devices) < 2: raise ValueError( 'Not enough android phones detected (need at least two)') # Data will be sent from the client_device to the server_device self.client_device = self.android_devices[0] self.server_device = self.android_devices[1] self.bt_logger = BluetoothMetricLogger.for_test_case() self.data_transfer_type = proto_module.BluetoothDataTestResult.RFCOMM self.log.info('Successfully found required devices.') def setup_test(self): setup_multiple_devices_for_bt_test(self.android_devices) self._connect_rfcomm() def teardown_test(self): if verify_server_and_client_connected( self.client_device, self.server_device, log=False): self.client_device.droid.bluetoothSocketConnStop() self.server_device.droid.bluetoothSocketConnStop() def _connect_rfcomm(self): """Establishes an RFCOMM connection between two phones. Connects the client device to the server device given the hardware address of the server device. """ set_location_service(self.client_device, True) set_location_service(self.server_device, True) server_address = self.server_device.droid.bluetoothGetLocalAddress() self.log.info('Pairing and connecting devices') asserts.assert_true(self.client_device.droid .bluetoothDiscoverAndBond(server_address), 'Failed to pair and connect devices') # Create RFCOMM connection asserts.assert_true(orchestrate_rfcomm_connection (self.client_device, self.server_device), 'Failed to establish RFCOMM connection') def _measure_latency(self): """Measures the latency of data transfer over RFCOMM. Sends data from the client device that is read by the server device. Calculates the latency of the transfer. Returns: The latency of the transfer milliseconds. """ # Generates a random message to transfer message = (''.join(random.choice(string.ascii_letters + string.digits) for _ in range(6))) start_time = time.perf_counter() write_read_successful = write_read_verify_data(self.client_device, self.server_device, message, False) end_time = time.perf_counter() asserts.assert_true(write_read_successful, 'Failed to send/receive message') return (end_time - start_time) * 1000 @BluetoothBaseTest.bt_test_wrap @test_tracker_info(uuid='7748295d-204e-4ad0-adf5-7591380b940a') def test_bluetooth_latency(self): """Tests the latency for a data transfer over RFCOMM""" metrics = {} latency_list = [] for _ in range(300): latency_list.append(self._measure_latency()) metrics['data_transfer_protocol'] = self.data_transfer_type metrics['data_latency_min_millis'] = int(min(latency_list)) metrics['data_latency_max_millis'] = int(max(latency_list)) metrics['data_latency_avg_millis'] = int(statistics.mean(latency_list)) self.log.info('Latency: {}'.format(metrics)) proto = self.bt_logger.get_results(metrics, self.__class__.__name__, self.server_device, self.client_device) asserts.assert_true(metrics['data_latency_min_millis'] > 0, 'Minimum latency must be greater than 0!', extras=proto) raise TestPass('Latency test completed successfully', extras=proto)