• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3.4
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17Test script to execute Bluetooth RFCOMM stress test cases.
18This test was designed to be run in a shield box.
19"""
20
21import threading
22import time
23from contextlib import suppress
24
25from queue import Empty
26from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
27from acts.test_utils.bt.bt_test_utils import reset_bluetooth
28from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
29from acts.test_utils.bt.bt_test_utils import get_bt_mac_address
30from acts.test_utils.bt.bt_test_utils import rfcomm_accept
31from acts.test_utils.bt.bt_test_utils import rfcomm_connect
32from acts.test_utils.bt.bt_test_utils import take_btsnoop_logs
33
34
class RfcommStressTest(BluetoothBaseTest):
    """Stress tests for RFCOMM traffic between two Android devices.

    The first Android device in the controller list is used as the RFCOMM
    client and the second one as the RFCOMM server.
    """
    default_timeout = 10
    scan_discovery_time = 5
    # Class-level default kept for backward compatibility. __init__ gives
    # every instance its own list so that worker threads are never shared
    # between instances through this mutable class attribute.
    thread_list = []
    # Number of connect/transfer/disconnect cycles run by
    # test_rfcomm_connection_stress.
    connection_stress_iterations = 1000
    # Number of write/read cycles run over a single connection by
    # test_rfcomm_read_write_stress.
    read_write_stress_iterations = 10000
    message = (
        "Space: the final frontier. These are the voyages of "
        "the starship Enterprise. Its continuing mission: to explore "
        "strange new worlds, to seek out new life and new civilizations,"
        " to boldly go where no man has gone before.")

    def __init__(self, controllers):
        """Pick the client/server devices and register the test cases.

        Args:
            controllers: Controller objects forwarded unchanged to
                BluetoothBaseTest.__init__.
        """
        BluetoothBaseTest.__init__(self, controllers)
        self.client_ad = self.android_devices[0]
        self.server_ad = self.android_devices[1]
        # Per-instance list of the accept/connect worker threads.
        self.thread_list = []
        self.tests = (
            "test_rfcomm_connection_stress",
            "test_rfcomm_read_write_stress",
        )

    def on_fail(self, test_name, begin_time):
        """Collect btsnoop logs and reset Bluetooth after a failed test.

        Args:
            test_name: Name of the failed test, passed to take_btsnoop_logs.
            begin_time: Unused here.
        """
        take_btsnoop_logs(self.android_devices, self, test_name)
        reset_bluetooth(self.android_devices)

    def teardown_test(self):
        """Best-effort join of all outstanding worker threads.

        Each thread is handled on its own so that one failing join does not
        prevent the remaining threads from being joined. The list is always
        emptied afterwards so finished threads are not carried into the next
        test.
        """
        for thread in self.thread_list:
            try:
                thread.join()
            except Exception as err:
                # Teardown is deliberately best effort: report and continue.
                self.log.info("Failed to join thread {}: {}".format(
                    thread.name, err))
        self.thread_list.clear()

    def _join_threads(self):
        """Wait for every tracked worker thread, then forget them all."""
        try:
            for thread in self.thread_list:
                thread.join()
        finally:
            self.thread_list.clear()

    def _stop_rfcomm(self):
        """Stop RFCOMM on the client first and then on the server."""
        self.client_ad.droid.bluetoothRfcommStop()
        self.server_ad.droid.bluetoothRfcommStop()

    def _write_read_verify(self):
        """Send the message from the client and check what the server reads.

        Raises:
            AssertionError: If the data read on the server differs from the
                message that was written.
        """
        self.log.info("Write message.")
        self.client_ad.droid.bluetoothRfcommWrite(self.message)
        self.log.info("Read message.")
        read_msg = self.server_ad.droid.bluetoothRfcommRead()
        self.log.info("Verify message.")
        # Raised explicitly rather than with an assert statement so that the
        # check is still performed when Python runs with -O.
        if self.message != read_msg:
            raise AssertionError("Mismatch! Read {}".format(read_msg))

    def orchestrate_rfcomm_connect(self, server_mac):
        """Start the RFCOMM accept and connect calls in background threads.

        Both threads are appended to self.thread_list and started; this
        method does not join them.

        Args:
            server_mac: Address of the server device, passed to
                rfcomm_connect.
        """
        # NOTE(review): rfcomm_accept receives the droid object while
        # rfcomm_connect receives the device object - confirm that this
        # matches the helpers' signatures in bt_test_utils.
        accept_thread = threading.Thread(target=rfcomm_accept,
                                         args=(self.server_ad.droid, ))
        self.thread_list.append(accept_thread)
        accept_thread.start()
        connect_thread = threading.Thread(
            target=rfcomm_connect,
            args=(self.client_ad, server_mac))
        self.thread_list.append(connect_thread)
        connect_thread.start()
        # NOTE(review): callers continue without waiting for these threads,
        # so the first write may be issued before the connection is up -
        # confirm that the write/read calls tolerate this.

    def test_rfcomm_connection_stress(self):
        """Stress test an RFCOMM connection

        Test the integrity of RFCOMM. Verify that file descriptors are cleared
        out properly.

        Steps:
        1. Start an RFCOMM connection between two Android devices.
        2. Write data to RFCOMM from the client droid.
        3. Read data from RFCOMM from the server droid.
        4. Stop the RFCOMM connection.
        5. Repeat steps 1-4 connection_stress_iterations (1000) times.

        Expected Result:
        Each iteration should read and write to the RFCOMM connection
        successfully.

        Returns:
          Pass if True
          Fail if False

        TAGS: Classic, Stress, RFCOMM
        Priority: 1
        """
        server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
        for n in range(self.connection_stress_iterations):
            self.orchestrate_rfcomm_connect(server_mac)
            self._write_read_verify()
            self._stop_rfcomm()
            self._join_threads()
            self.log.info("Iteration {} completed".format(n))
        return True

    def test_rfcomm_read_write_stress(self):
        """Stress test an RFCOMM connection's read and write capabilities

        Test the integrity of RFCOMM. Verify that file descriptors are cleared
        out properly.

        Steps:
        1. Reset Bluetooth on the server device.
        2. Start an RFCOMM connection between two Android devices.
        3. Write data to RFCOMM from the client droid.
        4. Read data from RFCOMM from the server droid.
        5. Repeat steps 3-4 read_write_stress_iterations (10000) times.
        6. Stop the RFCOMM connection.

        Expected Result:
        Each iteration should read and write to the RFCOMM connection
        successfully.

        Returns:
          Pass if True
          Fail if False

        TAGS: Classic, Stress, RFCOMM
        Priority: 1
        """
        # The address is read before the server's Bluetooth is reset.
        server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
        reset_bluetooth([self.server_ad])
        self.orchestrate_rfcomm_connect(server_mac)
        for n in range(self.read_write_stress_iterations):
            self._write_read_verify()
            self.log.info("Iteration {} completed".format(n))
        self._stop_rfcomm()
        self._join_threads()
        return True
158