• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#/usr/bin/env python3.4
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17Test script to execute Bluetooth basic functionality test cases.
18This test was designed to be run in a shield box.
19"""
20
21import threading
22import time
23from contextlib import suppress
24from random import randint
25
26from queue import Empty
27from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
28from acts.test_utils.bt.bt_test_utils import reset_bluetooth
29from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
30from acts.test_utils.bt.bt_test_utils import rfcomm_accept
31from acts.test_utils.bt.bt_test_utils import rfcomm_connect
32from acts.test_utils.bt.bt_test_utils import take_btsnoop_logs
33from acts.test_utils.bt.bt_test_utils import write_read_verify_data
34
class RfcommLongevityTest(BluetoothBaseTest):
    """Longevity tests for RFCOMM read/write between two Android devices.

    android_devices[0] acts as the RFCOMM client and android_devices[1] as
    the server. Every test opens a connection, pushes a number of messages
    through it, closes it, and repeats for ``longev_iterations`` cycles.
    """
    default_timeout = 10
    # Number of connect/transfer/disconnect cycles per test.
    longev_iterations = 200
    # Number of write/read/verify round trips per connection.
    write_iterations = 5000
    generic_message = (
        "Space: the final frontier. These are the voyages of "
        "the starship Enterprise. Its continuing mission: to explore "
        "strange new worlds, to seek out new life and new civilizations,"
        " to boldly go where no man has gone before.")

    def __init__(self, controllers):
        """Bind the client and server devices from the controller list."""
        BluetoothBaseTest.__init__(self, controllers)
        # Per-instance list so that thread bookkeeping is never shared
        # between test class instances.
        self.thread_list = []
        self.client_ad = self.android_devices[0]
        self.server_ad = self.android_devices[1]

    def _reset_bluetooth_with_retries(self, attempts=5):
        """Reset Bluetooth on all devices, retrying up to `attempts` times.

        Returns:
          True as soon as one reset succeeds, False if every attempt failed.
        """
        for _ in range(attempts):
            if reset_bluetooth(self.android_devices):
                return True
            self.log.info("Failed to reset bluetooth state, retrying...")
        return False

    def _join_threads(self):
        """Wait for all tracked accept/connect threads, then forget them."""
        for thread in self.thread_list:
            thread.join()
        self.thread_list.clear()

    def _stop_rfcomm(self):
        """Stop RFCOMM on the client first and then on the server."""
        self.client_ad.droid.bluetoothRfcommStop()
        self.server_ad.droid.bluetoothRfcommStop()

    def _verify_one_write(self, cycle, n, message, is_binary):
        """Log and run a single write/read/verify round trip.

        Args:
          cycle: Index of the current connection cycle.
          n: Index of the write within the current connection.
          message: Payload handed to write_read_verify_data.
          is_binary: Binary flag handed to write_read_verify_data.

        Returns:
          True if write_read_verify_data reported success, False otherwise.
        """
        self.log.info("iteration {} data".format(
            (n + 1) + (cycle * self.write_iterations)))
        if not write_read_verify_data(self.client_ad, self.server_ad,
                                      message, is_binary):
            return False
        self.log.info("Iteration {} completed".format(n))
        return True

    def _run_read_write_longevity(self, message, is_binary):
        """Shared driver for the fixed-payload longevity tests.

        For each of ``longev_iterations`` cycles: connect, send `message`
        ``write_iterations`` times verifying each round trip, then stop
        RFCOMM on both devices.

        Returns:
          True if every round trip verified, False on the first failure
          (connection cleanup is then left to teardown_test).
        """
        server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
        for cycle in range(self.longev_iterations):
            self.log.info("iteration {} connection".format(cycle))
            self.orchestrate_rfcomm_connect(server_mac)
            for n in range(self.write_iterations):
                if not self._verify_one_write(cycle, n, message, is_binary):
                    return False
            self._stop_rfcomm()
        self._join_threads()
        return True

    @staticmethod
    def _resize_message(message, growing, max_size=990):
        """Return the next elastic payload and the updated growth direction.

        The payload grows by one character per call until it reaches
        `max_size`, then shrinks by one per call until it is one character
        long, and so on.
        """
        size = len(message)
        if size >= max_size:
            growing = False
        elif size <= 1:
            growing = True
        next_size = size + 1 if growing else size - 1
        return "x" * next_size, growing

    def on_fail(self, test_name, begin_time):
        """Collect btsnoop logs for the failed test and reset Bluetooth."""
        take_btsnoop_logs(self.android_devices, self, test_name)
        self._reset_bluetooth_with_retries()

    def teardown_test(self):
        """Join leftover threads, reset Bluetooth and let the stack settle."""
        for thread in self.thread_list:
            # Best effort: a thread that cannot be joined must not prevent
            # the remaining threads from being joined.
            with suppress(Exception):
                thread.join()
        # Drop stale Thread objects so they do not leak into the next test.
        self.thread_list.clear()
        self._reset_bluetooth_with_retries()
        time.sleep(20)  # safeguard in case of connId errors

    def orchestrate_rfcomm_connect(self, server_mac):
        """Start the server accept and client connect in background threads.

        Both threads are appended to ``thread_list`` so they can be joined
        later; this method returns without waiting for them.

        Args:
          server_mac: Bluetooth address of the server device.
        """
        accept_thread = threading.Thread(target=rfcomm_accept,
                                         args=(self.server_ad, ))
        self.thread_list.append(accept_thread)
        accept_thread.start()
        connect_thread = threading.Thread(
            target=rfcomm_connect,
            args=(self.client_ad, server_mac))
        self.thread_list.append(connect_thread)
        connect_thread.start()

    def test_rfcomm_longev_read_write_message(self):
        """Longevity test an RFCOMM connection's I/O with a generic message

        Test the longevity of RFCOMM with a basic read/write
        connect/disconnect sequence.

        Steps:
        1. Establish an RFCOMM connection between two Android devices.
        2. Write data to RFCOMM from the client droid.
        3. Read data from RFCOMM from the server droid.
        4. Verify data written matches data read.
        5. Repeat steps 2-4 write_iterations (5000) times.
        6. Disconnect RFCOMM connection.
        7. Repeat steps 1-6 longev_iterations (200) times.

        Expected Result:
        Each iteration should read and write to the RFCOMM connection
        successfully. Each connect and disconnect should be successful.

        Returns:
          Pass if True
          Fail if False

        TAGS: Classic, Longevity, RFCOMM
        Priority: 2
        """
        return self._run_read_write_longevity(self.generic_message, False)

    def test_rfcomm_longev_read_write_small_message(self):
        """Longevity test an RFCOMM connection's I/O with a small message

        Test the longevity of RFCOMM with a basic read/write
        connect/disconnect sequence. The data being transferred is only
        one character in size.

        Steps:
        1. Establish an RFCOMM connection between two Android devices.
        2. Write data to RFCOMM from the client droid.
        3. Read data from RFCOMM from the server droid.
        4. Verify data written matches data read.
        5. Repeat steps 2-4 write_iterations (5000) times.
        6. Disconnect RFCOMM connection.
        7. Repeat steps 1-6 longev_iterations (200) times.

        Expected Result:
        Each iteration should read and write to the RFCOMM connection
        successfully. Each connect and disconnect should be successful.

        Returns:
          Pass if True
          Fail if False

        TAGS: Classic, Longevity, RFCOMM
        Priority: 2
        """
        return self._run_read_write_longevity("x", False)

    def test_rfcomm_longev_read_write_binary_message(self):
        """Longevity test an RFCOMM connection's I/O with a binary message

        Test the longevity of RFCOMM with a basic read/write
        connect/disconnect sequence. The data being transferred is in a
        binary format.

        Steps:
        1. Establish an RFCOMM connection between two Android devices.
        2. Write data to RFCOMM from the client droid.
        3. Read data from RFCOMM from the server droid.
        4. Verify data written matches data read.
        5. Repeat steps 2-4 write_iterations (5000) times.
        6. Disconnect RFCOMM connection.
        7. Repeat steps 1-6 longev_iterations (200) times.

        Expected Result:
        Each iteration should read and write to the RFCOMM connection
        successfully. Each connect and disconnect should be successful.

        Returns:
          Pass if True
          Fail if False

        TAGS: Classic, Longevity, RFCOMM
        Priority: 2
        """
        return self._run_read_write_longevity("11010101", True)

    def test_rfcomm_longev_read_write_large_message(self):
        """Longevity test an RFCOMM connection's I/O with a large message

        Test the longevity of RFCOMM with a basic read/write
        connect/disconnect sequence. The data being transferred is 990 chars
        in size.

        Steps:
        1. Establish an RFCOMM connection between two Android devices.
        2. Write data to RFCOMM from the client droid.
        3. Read data from RFCOMM from the server droid.
        4. Verify data written matches data read.
        5. Repeat steps 2-4 write_iterations (5000) times.
        6. Disconnect RFCOMM connection.
        7. Repeat steps 1-6 longev_iterations (200) times.

        Expected Result:
        Each iteration should read and write to the RFCOMM connection
        successfully. Each connect and disconnect should be successful.

        Returns:
          Pass if True
          Fail if False

        TAGS: Classic, Longevity, RFCOMM
        Priority: 2
        """
        # 990 chars is the largest message size until sl4a is fixed.
        return self._run_read_write_longevity("x" * 990, False)

    def test_rfcomm_longev_connection_interuption(self):
        """Longevity test an RFCOMM connection with socket interruptions

        Test the longevity of RFCOMM with a basic read/write
        connect/disconnect sequence. Randomly in the sequence of reads and
        writes the socket on the client side will close. There should be
        an exception thrown for writing the next set of data and the
        test should start up a new connection and continue.

        Steps:
        1. Establish an RFCOMM connection between two Android devices.
        2. Write data to RFCOMM from the client droid.
        3. Read data from RFCOMM from the server droid.
        4. Verify data written matches data read.
        5. Repeat steps 2-4 write_iterations (5000) times or until the
           random interrupt occurs.
        6. Re-establish an RFCOMM connection.
        7. Repeat steps 1-6 longev_iterations (200) times.

        Expected Result:
        Each iteration should read and write to the RFCOMM connection
        successfully. Each connect and disconnect should be successful.
        Devices should recover a new connection after each interruption.

        Returns:
          Pass if True
          Fail if False

        TAGS: Classic, Longevity, RFCOMM
        Priority: 2
        """
        server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
        # The socket is closed on the first write index strictly greater
        # than the drawn value, so the draw must stay at or below
        # write_iterations - 2 for an interruption to happen in every cycle.
        last_interrupt_draw = max(0, self.write_iterations - 2)
        for cycle in range(self.longev_iterations):
            try:
                self.log.info("iteration {} connection".format(cycle))
                self.orchestrate_rfcomm_connect(server_mac)
                interrupt_after = randint(0, last_interrupt_draw)
                for n in range(self.write_iterations):
                    if not self._verify_one_write(
                            cycle, n, self.generic_message, False):
                        return False
                    if n > interrupt_after:
                        self.client_ad.droid.bluetoothRfcommCloseSocket()
                self._stop_rfcomm()
            except Exception:
                # NOTE(review): any exception is treated as the expected
                # result of the closed socket; unrelated errors raised in
                # the try body are logged the same way.
                self.log.info("Exception found as expected. Continuing...")
                try:
                    self.client_ad.droid.bluetoothRfcommStop()
                except Exception as err:
                    self.log.error(
                        "Error closing client connection: {}".format(err))
                    return False
                try:
                    self.server_ad.droid.bluetoothRfcommStop()
                except Exception as err:
                    self.log.error(
                        "Error closing server connection: {}".format(err))
                    return False
                self._join_threads()
        self._join_threads()
        return True

    def test_rfcomm_longev_data_elasticity(self):
        """Longevity test an RFCOMM connection's I/O with changing data size

        Test the longevity of RFCOMM with a basic read/write
        connect/disconnect sequence. The data being transferred changes
        in size after each write/read sequence to increase up to 990
        chars in size and decrease down to 1 in size. This repeats through
        the entire test in order to exercise different size values being
        written.

        Steps:
        1. Establish an RFCOMM connection between two Android devices.
        2. Write data to RFCOMM from the client droid.
        3. Read data from RFCOMM from the server droid.
        4. Verify data written matches data read.
        5. Change data size according to above description.
        6. Repeat steps 2-5 write_iterations (5000) times.
        7. Disconnect RFCOMM connection.
        8. Repeat steps 1-7 longev_iterations (200) times.

        Expected Result:
        Each iteration should read and write to the RFCOMM connection
        successfully. Each connect and disconnect should be successful.

        Returns:
          Pass if True
          Fail if False

        TAGS: Classic, Longevity, RFCOMM
        Priority: 2
        """
        server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
        message = "x"
        # Payload size and direction carry over from one connection cycle
        # to the next.
        growing = True
        for cycle in range(self.longev_iterations):
            try:
                self.log.info("iteration {} connection".format(cycle))
                self.orchestrate_rfcomm_connect(server_mac)
                for n in range(self.write_iterations):
                    if not self._verify_one_write(cycle, n, message, False):
                        return False
                    # Max size is 990 due to a bug in sl4a.
                    message, growing = self._resize_message(message, growing)
                self._stop_rfcomm()
            except Exception as err:
                self.log.info("Error in longevity test: {}".format(err))
                self._join_threads()
                return False

        self._join_threads()
        return True
405