1#/usr/bin/env python3.4 2# 3# Copyright (C) 2016 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16""" 17Test script to execute Bluetooth basic functionality test cases. 18This test was designed to be run in a shield box. 19""" 20 21import threading 22import time 23from contextlib import suppress 24 25from queue import Empty 26from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest 27from acts.test_utils.bt.bt_test_utils import reset_bluetooth 28from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest 29from acts.test_utils.bt.bt_test_utils import get_bt_mac_address 30from acts.test_utils.bt.bt_test_utils import rfcomm_accept 31from acts.test_utils.bt.bt_test_utils import rfcomm_connect 32from acts.test_utils.bt.bt_test_utils import take_btsnoop_logs 33 34 35class RfcommStressTest(BluetoothBaseTest): 36 default_timeout = 10 37 scan_discovery_time = 5 38 thread_list = [] 39 message = ( 40 "Space: the final frontier. These are the voyages of " 41 "the starship Enterprise. Its continuing mission: to explore " 42 "strange new worlds, to seek out new life and new civilizations," 43 " to boldly go where no man has gone before.") 44 45 def __init__(self, controllers): 46 BluetoothBaseTest.__init__(self, controllers) 47 self.client_ad = self.android_devices[0] 48 self.server_ad = self.android_devices[1] 49 self.tests = ( 50 "test_rfcomm_connection_stress", 51 "test_rfcomm_read_write_stress", 52 ) 53 54 def on_fail(self, test_name, begin_time): 55 take_btsnoop_logs(self.android_devices, self, test_name) 56 reset_bluetooth(self.android_devices) 57 58 def teardown_test(self): 59 with suppress(Exception): 60 for thread in self.thread_list: 61 thread.join() 62 63 def orchestrate_rfcomm_connect(self, server_mac): 64 accept_thread = threading.Thread(target=rfcomm_accept, 65 args=(self.server_ad.droid, )) 66 self.thread_list.append(accept_thread) 67 accept_thread.start() 68 connect_thread = threading.Thread( 69 target=rfcomm_connect, 70 args=(self.client_ad, server_mac)) 71 self.thread_list.append(connect_thread) 72 connect_thread.start() 73 74 def test_rfcomm_connection_stress(self): 75 """Stress test an RFCOMM connection 76 77 Test the integrity of RFCOMM. Verify that file descriptors are cleared 78 out properly. 79 80 Steps: 81 1. Establish a bonding between two Android devices. 82 2. Write data to RFCOMM from the client droid. 83 3. Read data from RFCOMM from the server droid. 84 4. Stop the RFCOMM connection. 85 5. Repeat steps 2-4 1000 times. 86 87 Expected Result: 88 Each iteration should read and write to the RFCOMM connection 89 successfully. 90 91 Returns: 92 Pass if True 93 Fail if False 94 95 TAGS: Classic, Stress, RFCOMM 96 Priority: 1 97 """ 98 server_mac = self.server_ad.droid.bluetoothGetLocalAddress() 99 for n in range(1000): 100 self.orchestrate_rfcomm_connect(server_mac) 101 self.log.info("Write message.") 102 self.client_ad.droid.bluetoothRfcommWrite(self.message) 103 self.log.info("Read message.") 104 read_msg = self.server_ad.droid.bluetoothRfcommRead() 105 self.log.info("Verify message.") 106 assert self.message == read_msg, "Mismatch! Read {}".format( 107 read_msg) 108 self.client_ad.droid.bluetoothRfcommStop() 109 self.server_ad.droid.bluetoothRfcommStop() 110 for t in self.thread_list: 111 t.join() 112 self.thread_list.clear() 113 self.log.info("Iteration {} completed".format(n)) 114 return True 115 116 def test_rfcomm_read_write_stress(self): 117 """Stress test an RFCOMM connection's read and write capabilities 118 119 Test the integrity of RFCOMM. Verify that file descriptors are cleared 120 out properly. 121 122 Steps: 123 1. Establish a bonding between two Android devices. 124 2. Write data to RFCOMM from the client droid. 125 3. Read data from RFCOMM from the server droid. 126 4. Repeat steps 2-3 10000 times. 127 5. Stop the RFCOMM connection. 128 129 Expected Result: 130 Each iteration should read and write to the RFCOMM connection 131 successfully. 132 133 Returns: 134 Pass if True 135 Fail if False 136 137 TAGS: Classic, Stress, RFCOMM 138 Priority: 1 139 """ 140 server_mac = self.server_ad.droid.bluetoothGetLocalAddress() 141 reset_bluetooth([self.server_ad]) 142 self.orchestrate_rfcomm_connect(server_mac) 143 for n in range(10000): 144 self.log.info("Write message.") 145 self.client_ad.droid.bluetoothRfcommWrite(self.message) 146 self.log.info("Read message.") 147 read_msg = self.server_ad.droid.bluetoothRfcommRead() 148 self.log.info("Verify message.") 149 assert self.message == read_msg, "Mismatch! Read {}".format( 150 read_msg) 151 self.log.info("Iteration {} completed".format(n)) 152 self.client_ad.droid.bluetoothRfcommStop() 153 self.server_ad.droid.bluetoothRfcommStop() 154 for t in self.thread_list: 155 t.join() 156 self.thread_list.clear() 157 return True 158