• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from blueberry.tests.gd.cert import gd_base_test
18from blueberry.tests.gd.cert.py_hci import PyHci
19from blueberry.tests.gd.cert.py_acl_manager import PyAclManager
20from blueberry.tests.gd.cert.truth import assertThat
21from datetime import timedelta
22from blueberry.facade.neighbor import facade_pb2 as neighbor_facade
23from mobly import test_runner
24import hci_packets as hci
25
26
27class AclManagerTest(gd_base_test.GdBaseTestClass):
28
29    def setup_class(self):
30        gd_base_test.GdBaseTestClass.setup_class(self, dut_module='HCI_INTERFACES', cert_module='HCI')
31
32    # todo: move into GdBaseTestClass, based on modules inited
33    def setup_test(self):
34        gd_base_test.GdBaseTestClass.setup_test(self)
35        self.cert_hci = PyHci(self.cert, acl_streaming=True)
36        self.dut_acl_manager = PyAclManager(self.dut)
37
38    def teardown_test(self):
39        self.cert_hci.close()
40        gd_base_test.GdBaseTestClass.teardown_test(self)
41
42    def test_dut_connects(self):
43        self.cert_hci.enable_inquiry_and_page_scan()
44        cert_address = self.cert_hci.read_own_address()
45
46        self.dut_acl_manager.initiate_connection(repr(cert_address))
47        cert_acl = self.cert_hci.accept_connection()
48        with self.dut_acl_manager.complete_outgoing_connection() as dut_acl:
49            cert_acl.send_first(b'\x26\x00\x07\x00This is just SomeAclData from the Cert')
50            dut_acl.send(b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT')
51
52            assertThat(cert_acl).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
53            assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload)
54
55    def test_cert_connects(self):
56        dut_address = self.dut.hci_controller.GetMacAddressSimple()
57        self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True))
58
59        self.dut_acl_manager.listen_for_an_incoming_connection()
60        self.cert_hci.initiate_connection(dut_address)
61        with self.dut_acl_manager.complete_incoming_connection() as dut_acl:
62            cert_acl = self.cert_hci.complete_connection()
63
64            dut_acl.send(b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT')
65
66            cert_acl.send_first(b'\x26\x00\x07\x00This is just SomeAclData from the Cert')
67
68            assertThat(cert_acl).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
69            assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload)
70
71    def test_reject_broadcast(self):
72        dut_address = self.dut.hci_controller.GetMacAddressSimple()
73        self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True))
74
75        self.dut_acl_manager.listen_for_an_incoming_connection()
76        self.cert_hci.initiate_connection(dut_address)
77        with self.dut_acl_manager.complete_incoming_connection() as dut_acl:
78            cert_acl = self.cert_hci.complete_connection()
79
80            cert_acl.send(hci.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
81                          hci.BroadcastFlag.ACTIVE_PERIPHERAL_BROADCAST,
82                          b'\x26\x00\x07\x00This is a Broadcast from the Cert')
83            assertThat(dut_acl).emitsNone(timeout=timedelta(seconds=0.5))
84
85            cert_acl.send(hci.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, hci.BroadcastFlag.POINT_TO_POINT,
86                          b'\x26\x00\x07\x00This is just SomeAclData from the Cert')
87            assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload)
88
89    def test_cert_connects_disconnects(self):
90        dut_address = self.dut.hci_controller.GetMacAddressSimple()
91        self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True))
92
93        self.dut_acl_manager.listen_for_an_incoming_connection()
94        self.cert_hci.initiate_connection(dut_address)
95        with self.dut_acl_manager.complete_incoming_connection() as dut_acl:
96            cert_acl = self.cert_hci.complete_connection()
97
98            dut_acl.send(b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT')
99
100            cert_acl.send_first(b'\x26\x00\x07\x00This is just SomeAclData from the Cert')
101
102            assertThat(cert_acl).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
103            assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload)
104
105            dut_acl.disconnect(hci.DisconnectReason.REMOTE_USER_TERMINATED_CONNECTION)
106            dut_acl.wait_for_disconnection_complete()
107
108    def test_recombination_l2cap_packet(self):
109        self.cert_hci.enable_inquiry_and_page_scan()
110        cert_address = self.cert_hci.read_own_address()
111
112        self.dut_acl_manager.initiate_connection(repr(cert_address))
113        cert_acl = self.cert_hci.accept_connection()
114        with self.dut_acl_manager.complete_outgoing_connection() as dut_acl:
115            cert_acl.send_first(b'\x06\x00\x07\x00Hello')
116            cert_acl.send_continuing(b'!')
117            cert_acl.send_first(b'\xe8\x03\x07\x00' + b'Hello' * 200)
118
119            assertThat(dut_acl).emits(lambda packet: b'Hello!' in packet.payload,
120                                      lambda packet: b'Hello' * 200 in packet.payload).inOrder()
121
122
123if __name__ == '__main__':
124    test_runner.main()
125