• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#/usr/bin/env python3.4
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17Test script to execute Bluetooth basic functionality test cases.
18This test was designed to be run in a shield box.
19"""
20
21import threading
22import time
23from contextlib import suppress
24
25from queue import Empty
26from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
27from acts.test_utils.bt.bt_test_utils import reset_bluetooth
28from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
29from acts.test_utils.bt.bt_test_utils import rfcomm_accept
30from acts.test_utils.bt.bt_test_utils import rfcomm_connect
31from acts.test_utils.bt.bt_test_utils import take_btsnoop_logs
32
33
34class RfcommStressTest(BluetoothBaseTest):
35    default_timeout = 10
36    scan_discovery_time = 5
37    thread_list = []
38    message = (
39        "Space: the final frontier. These are the voyages of "
40        "the starship Enterprise. Its continuing mission: to explore "
41        "strange new worlds, to seek out new life and new civilizations,"
42        " to boldly go where no man has gone before.")
43
44    def __init__(self, controllers):
45        BluetoothBaseTest.__init__(self, controllers)
46        self.client_ad = self.android_devices[0]
47        self.server_ad = self.android_devices[1]
48        self.tests = (
49            "test_rfcomm_connection_stress",
50            "test_rfcomm_read_write_stress",
51        )
52
53    def on_fail(self, test_name, begin_time):
54        take_btsnoop_logs(self.android_devices, self, test_name)
55        reset_bluetooth(self.android_devices)
56
57    def teardown_test(self):
58        with suppress(Exception):
59            for thread in self.thread_list:
60                thread.join()
61
62    def orchestrate_rfcomm_connect(self, server_mac):
63        accept_thread = threading.Thread(target=rfcomm_accept,
64                                         args=(self.server_ad, ))
65        self.thread_list.append(accept_thread)
66        accept_thread.start()
67        connect_thread = threading.Thread(
68            target=rfcomm_connect,
69            args=(self.client_ad, server_mac))
70        self.thread_list.append(connect_thread)
71        connect_thread.start()
72
73    def test_rfcomm_connection_stress(self):
74        """Stress test an RFCOMM connection
75
76        Test the integrity of RFCOMM. Verify that file descriptors are cleared
77        out properly.
78
79        Steps:
80        1. Establish a bonding between two Android devices.
81        2. Write data to RFCOMM from the client droid.
82        3. Read data from RFCOMM from the server droid.
83        4. Stop the RFCOMM connection.
84        5. Repeat steps 2-4 1000 times.
85
86        Expected Result:
87        Each iteration should read and write to the RFCOMM connection
88        successfully.
89
90        Returns:
91          Pass if True
92          Fail if False
93
94        TAGS: Classic, Stress, RFCOMM
95        Priority: 1
96        """
97        server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
98        for n in range(1000):
99            self.orchestrate_rfcomm_connect(server_mac)
100            self.log.info("Write message.")
101            self.client_ad.droid.bluetoothRfcommWrite(self.message)
102            self.log.info("Read message.")
103            read_msg = self.server_ad.droid.bluetoothRfcommRead()
104            self.log.info("Verify message.")
105            assert self.message == read_msg, "Mismatch! Read {}".format(
106                read_msg)
107            self.client_ad.droid.bluetoothRfcommStop()
108            self.server_ad.droid.bluetoothRfcommStop()
109            for t in self.thread_list:
110                t.join()
111            self.thread_list.clear()
112            self.log.info("Iteration {} completed".format(n))
113        return True
114
115    def test_rfcomm_read_write_stress(self):
116        """Stress test an RFCOMM connection's read and write capabilities
117
118        Test the integrity of RFCOMM. Verify that file descriptors are cleared
119        out properly.
120
121        Steps:
122        1. Establish a bonding between two Android devices.
123        2. Write data to RFCOMM from the client droid.
124        3. Read data from RFCOMM from the server droid.
125        4. Repeat steps 2-3 10000 times.
126        5. Stop the RFCOMM connection.
127
128        Expected Result:
129        Each iteration should read and write to the RFCOMM connection
130        successfully.
131
132        Returns:
133          Pass if True
134          Fail if False
135
136        TAGS: Classic, Stress, RFCOMM
137        Priority: 1
138        """
139        server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
140        reset_bluetooth([self.server_ad])
141        self.orchestrate_rfcomm_connect(server_mac)
142        for n in range(10000):
143            self.log.info("Write message.")
144            self.client_ad.droid.bluetoothRfcommWrite(self.message)
145            self.log.info("Read message.")
146            read_msg = self.server_ad.droid.bluetoothRfcommRead()
147            self.log.info("Verify message.")
148            assert self.message == read_msg, "Mismatch! Read {}".format(
149                read_msg)
150            self.log.info("Iteration {} completed".format(n))
151        self.client_ad.droid.bluetoothRfcommStop()
152        self.server_ad.droid.bluetoothRfcommStop()
153        for t in self.thread_list:
154            t.join()
155        self.thread_list.clear()
156        return True
157