1#/usr/bin/env python3.4 2# 3# Copyright (C) 2016 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16""" 17Test script to execute Bluetooth basic functionality test cases. 18This test was designed to be run in a shield box. 19""" 20 21import threading 22import time 23from contextlib import suppress 24 25from queue import Empty 26from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest 27from acts.test_utils.bt.bt_test_utils import reset_bluetooth 28from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest 29from acts.test_utils.bt.bt_test_utils import rfcomm_accept 30from acts.test_utils.bt.bt_test_utils import rfcomm_connect 31from acts.test_utils.bt.bt_test_utils import take_btsnoop_logs 32 33 34class RfcommStressTest(BluetoothBaseTest): 35 default_timeout = 10 36 scan_discovery_time = 5 37 thread_list = [] 38 message = ( 39 "Space: the final frontier. These are the voyages of " 40 "the starship Enterprise. Its continuing mission: to explore " 41 "strange new worlds, to seek out new life and new civilizations," 42 " to boldly go where no man has gone before.") 43 44 def __init__(self, controllers): 45 BluetoothBaseTest.__init__(self, controllers) 46 self.client_ad = self.android_devices[0] 47 self.server_ad = self.android_devices[1] 48 self.tests = ( 49 "test_rfcomm_connection_stress", 50 "test_rfcomm_read_write_stress", 51 ) 52 53 def on_fail(self, test_name, begin_time): 54 take_btsnoop_logs(self.android_devices, self, test_name) 55 reset_bluetooth(self.android_devices) 56 57 def teardown_test(self): 58 with suppress(Exception): 59 for thread in self.thread_list: 60 thread.join() 61 62 def orchestrate_rfcomm_connect(self, server_mac): 63 accept_thread = threading.Thread(target=rfcomm_accept, 64 args=(self.server_ad, )) 65 self.thread_list.append(accept_thread) 66 accept_thread.start() 67 connect_thread = threading.Thread( 68 target=rfcomm_connect, 69 args=(self.client_ad, server_mac)) 70 self.thread_list.append(connect_thread) 71 connect_thread.start() 72 73 def test_rfcomm_connection_stress(self): 74 """Stress test an RFCOMM connection 75 76 Test the integrity of RFCOMM. Verify that file descriptors are cleared 77 out properly. 78 79 Steps: 80 1. Establish a bonding between two Android devices. 81 2. Write data to RFCOMM from the client droid. 82 3. Read data from RFCOMM from the server droid. 83 4. Stop the RFCOMM connection. 84 5. Repeat steps 2-4 1000 times. 85 86 Expected Result: 87 Each iteration should read and write to the RFCOMM connection 88 successfully. 89 90 Returns: 91 Pass if True 92 Fail if False 93 94 TAGS: Classic, Stress, RFCOMM 95 Priority: 1 96 """ 97 server_mac = self.server_ad.droid.bluetoothGetLocalAddress() 98 for n in range(1000): 99 self.orchestrate_rfcomm_connect(server_mac) 100 self.log.info("Write message.") 101 self.client_ad.droid.bluetoothRfcommWrite(self.message) 102 self.log.info("Read message.") 103 read_msg = self.server_ad.droid.bluetoothRfcommRead() 104 self.log.info("Verify message.") 105 assert self.message == read_msg, "Mismatch! Read {}".format( 106 read_msg) 107 self.client_ad.droid.bluetoothRfcommStop() 108 self.server_ad.droid.bluetoothRfcommStop() 109 for t in self.thread_list: 110 t.join() 111 self.thread_list.clear() 112 self.log.info("Iteration {} completed".format(n)) 113 return True 114 115 def test_rfcomm_read_write_stress(self): 116 """Stress test an RFCOMM connection's read and write capabilities 117 118 Test the integrity of RFCOMM. Verify that file descriptors are cleared 119 out properly. 120 121 Steps: 122 1. Establish a bonding between two Android devices. 123 2. Write data to RFCOMM from the client droid. 124 3. Read data from RFCOMM from the server droid. 125 4. Repeat steps 2-3 10000 times. 126 5. Stop the RFCOMM connection. 127 128 Expected Result: 129 Each iteration should read and write to the RFCOMM connection 130 successfully. 131 132 Returns: 133 Pass if True 134 Fail if False 135 136 TAGS: Classic, Stress, RFCOMM 137 Priority: 1 138 """ 139 server_mac = self.server_ad.droid.bluetoothGetLocalAddress() 140 reset_bluetooth([self.server_ad]) 141 self.orchestrate_rfcomm_connect(server_mac) 142 for n in range(10000): 143 self.log.info("Write message.") 144 self.client_ad.droid.bluetoothRfcommWrite(self.message) 145 self.log.info("Read message.") 146 read_msg = self.server_ad.droid.bluetoothRfcommRead() 147 self.log.info("Verify message.") 148 assert self.message == read_msg, "Mismatch! Read {}".format( 149 read_msg) 150 self.log.info("Iteration {} completed".format(n)) 151 self.client_ad.droid.bluetoothRfcommStop() 152 self.server_ad.droid.bluetoothRfcommStop() 153 for t in self.thread_list: 154 t.join() 155 self.thread_list.clear() 156 return True 157