#!/usr/bin/env python3
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

import random
import statistics
import string
import time

from acts import asserts
from acts.base_test import BaseTestClass
from acts.signals import TestPass
from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from acts.test_utils.bt.bt_test_utils import orchestrate_rfcomm_connection
from acts.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test
from acts.test_utils.bt.bt_test_utils import verify_server_and_client_connected
from acts.test_utils.bt.bt_test_utils import write_read_verify_data
from acts.test_utils.bt.loggers.bluetooth_metric_logger import BluetoothMetricLogger
from acts.test_utils.bt.loggers.protos import bluetooth_metric_pb2 as proto_module
from acts.utils import set_location_service


class BluetoothLatencyTest(BaseTestClass):
    """Connects two Android phones and tests the RFCOMM latency.

    Attributes:
        client_device: An Android device object that will be sending data
        server_device: An Android device object that will be receiving data
        bt_logger: The proxy logger instance for each test case
        data_transfer_type: Data transfer protocol used for the test
        LATENCY_SAMPLE_COUNT: Number of transfers timed per test run
        MESSAGE_LENGTH: Number of characters in each random test message
    """

    # Number of write/read transfers timed by test_bluetooth_latency.
    LATENCY_SAMPLE_COUNT = 300
    # Length, in characters, of the random payload sent per measurement.
    MESSAGE_LENGTH = 6

    def __init__(self, configs):
        """Selects the client and server devices and prepares the logger.

        Args:
            configs: Test configuration forwarded unchanged to BaseTestClass.

        Raises:
            ValueError: If fewer than two Android devices are available.
        """
        super().__init__(configs)

        # Sanity check of the devices under test
        # TODO(b/119051823): Investigate using a config validator to replace this.
        if len(self.android_devices) < 2:
            raise ValueError(
                'Not enough android phones detected (need at least two)')

        # Data will be sent from the client_device to the server_device
        self.client_device = self.android_devices[0]
        self.server_device = self.android_devices[1]
        self.bt_logger = BluetoothMetricLogger.for_test_case()
        self.data_transfer_type = proto_module.BluetoothDataTestResult.RFCOMM
        self.log.info('Successfully found required devices.')

    def setup_test(self):
        """Prepares all devices for Bluetooth testing and connects RFCOMM."""
        setup_multiple_devices_for_bt_test(self.android_devices)
        self._connect_rfcomm()

    def teardown_test(self):
        """Stops the socket connection on both devices if still connected."""
        if verify_server_and_client_connected(
                self.client_device, self.server_device, log=False):
            self.client_device.droid.bluetoothSocketConnStop()
            self.server_device.droid.bluetoothSocketConnStop()

    def _connect_rfcomm(self):
        """Establishes an RFCOMM connection between two phones.

        Connects the client device to the server device given the hardware
        address of the server device. Both the pairing step and the RFCOMM
        connection step are checked with asserts.assert_true.
        """

        # NOTE(review): presumably location must be on for discovery to
        # work -- confirm against set_location_service / platform docs.
        set_location_service(self.client_device, True)
        set_location_service(self.server_device, True)
        server_address = self.server_device.droid.bluetoothGetLocalAddress()
        self.log.info('Pairing and connecting devices')
        asserts.assert_true(
            self.client_device.droid.bluetoothDiscoverAndBond(server_address),
            'Failed to pair and connect devices')

        # Create RFCOMM connection
        asserts.assert_true(
            orchestrate_rfcomm_connection(self.client_device,
                                          self.server_device),
            'Failed to establish RFCOMM connection')

    def _measure_latency(self):
        """Measures the latency of data transfer over RFCOMM.

        Sends data from the client device that is read by the server device.
        Calculates the latency of the transfer.

        Returns:
            The latency of the transfer in milliseconds.
        """

        # Generates a random alphanumeric message to transfer
        alphabet = string.ascii_letters + string.digits
        message = ''.join(
            random.choice(alphabet) for _ in range(self.MESSAGE_LENGTH))

        # The timed window covers the whole write_read_verify_data call.
        # NOTE(review): meaning of the trailing False flag is defined by
        # write_read_verify_data -- confirm against bt_test_utils.
        start_time = time.perf_counter()
        write_read_successful = write_read_verify_data(self.client_device,
                                                       self.server_device,
                                                       message,
                                                       False)
        end_time = time.perf_counter()
        asserts.assert_true(write_read_successful,
                            'Failed to send/receive message')
        # perf_counter reports seconds; convert to milliseconds.
        return (end_time - start_time) * 1000

    @BluetoothBaseTest.bt_test_wrap
    def test_bluetooth_latency(self):
        """Tests the latency for a data transfer over RFCOMM.

        Times LATENCY_SAMPLE_COUNT transfers, records the minimum, maximum
        and mean latency (whole milliseconds) through the metric logger, and
        checks that the minimum is greater than zero.

        Raises:
            TestPass: When all transfers succeed and the check passes.
        """

        latency_list = [self._measure_latency()
                        for _ in range(self.LATENCY_SAMPLE_COUNT)]

        # int() truncates to whole milliseconds, so a sub-millisecond
        # minimum is recorded as 0 and fails the assertion below.
        metrics = {
            'data_transfer_protocol': self.data_transfer_type,
            'data_latency_min_millis': int(min(latency_list)),
            'data_latency_max_millis': int(max(latency_list)),
            'data_latency_avg_millis': int(statistics.mean(latency_list)),
        }
        self.log.info('Latency: {}'.format(metrics))

        proto = self.bt_logger.get_results(metrics,
                                           self.__class__.__name__,
                                           self.server_device,
                                           self.client_device)

        asserts.assert_true(metrics['data_latency_min_millis'] > 0,
                            'Minimum latency must be greater than 0!',
                            extras=proto)

        raise TestPass('Latency test completed successfully', extras=proto)