• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from datetime import timedelta
18import os
19import sys
20import logging
21
22from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass
23from cert.event_callback_stream import EventCallbackStream
24from cert.event_asserts import EventAsserts
25from google.protobuf import empty_pb2 as empty_proto
26from facade import common_pb2 as common
27from facade import rootservice_pb2 as facade_rootservice_pb2
28from hci.facade import facade_pb2 as hci_facade
29from hci.facade import acl_manager_facade_pb2 as acl_manager_facade
30from hci.facade import controller_facade_pb2 as controller_facade
31from l2cap.classic import facade_pb2 as l2cap_facade
32from neighbor.facade import facade_pb2 as neighbor_facade
33from security import facade_pb2 as security_facade
34from bluetooth_packets_python3 import hci_packets
35import bluetooth_packets_python3 as bt_packets
36
37
38class SimpleSecurityTest(GdFacadeOnlyBaseTestClass):
39
40    def setup_test(self):
41        self.device_under_test.rootservice.StartStack(
42            facade_rootservice_pb2.StartStackRequest(
43                module_under_test=facade_rootservice_pb2.BluetoothModule.Value(
44                    'SECURITY'),))
45        self.cert_device.rootservice.StartStack(
46            facade_rootservice_pb2.StartStackRequest(
47                module_under_test=facade_rootservice_pb2.BluetoothModule.Value(
48                    'L2CAP'),))
49
50        self.device_under_test.address = self.device_under_test.controller_read_only_property.ReadLocalAddress(
51            empty_proto.Empty()).address
52        self.cert_device.address = self.cert_device.controller_read_only_property.ReadLocalAddress(
53            empty_proto.Empty()).address
54
55        self.device_under_test.neighbor.EnablePageScan(
56            neighbor_facade.EnableMsg(enabled=True))
57        self.cert_device.neighbor.EnablePageScan(
58            neighbor_facade.EnableMsg(enabled=True))
59
60        self.dut_address = common.BluetoothAddress(
61            address=self.device_under_test.address)
62        self.cert_address = common.BluetoothAddress(
63            address=self.cert_device.address)
64
65        self.dut_address_with_type = common.BluetoothAddressWithType()
66        self.dut_address_with_type.address.CopyFrom(self.dut_address)
67        self.dut_address_with_type.type = common.BluetoothPeerAddressTypeEnum.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS
68
69        self.cert_address_with_type = common.BluetoothAddressWithType()
70        self.cert_address_with_type.address.CopyFrom(self.cert_address)
71        self.cert_address_with_type.type = common.BluetoothPeerAddressTypeEnum.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS
72
73        self.device_under_test.wait_channel_ready()
74        self.cert_device.wait_channel_ready()
75
76        self.cert_name = b'ImTheCert'
77        self.cert_device.hci_controller.WriteLocalName(
78            controller_facade.NameMsg(name=self.cert_name))
79        self.dut_name = b'ImTheDUT'
80        self.device_under_test.hci_controller.WriteLocalName(
81            controller_facade.NameMsg(name=self.dut_name))
82
83    def teardown_test(self):
84        self.device_under_test.rootservice.StopStack(
85            facade_rootservice_pb2.StopStackRequest())
86        self.cert_device.rootservice.StopStack(
87            facade_rootservice_pb2.StopStackRequest())
88
89    def tmp_register_for_event(self, event_code):
90        msg = hci_facade.EventCodeMsg(code=int(event_code))
91        self.device_under_test.hci.RegisterEventHandler(msg)
92
93    def tmp_enqueue_hci_command(self, command, expect_complete):
94        cmd_bytes = bytes(command.Serialize())
95        cmd = hci_facade.CommandMsg(command=cmd_bytes)
96        if (expect_complete):
97            self.device_under_test.hci.EnqueueCommandWithComplete(cmd)
98        else:
99            self.device_under_test.hci.EnqueueCommandWithStatus(cmd)
100
101    def register_for_event(self, event_code):
102        msg = hci_facade.EventCodeMsg(code=int(event_code))
103        self.cert_device.hci.RegisterEventHandler(msg)
104
105    def enqueue_hci_command(self, command, expect_complete):
106        cmd_bytes = bytes(command.Serialize())
107        cmd = hci_facade.CommandMsg(command=cmd_bytes)
108        if (expect_complete):
109            self.cert_device.hci.EnqueueCommandWithComplete(cmd)
110        else:
111            self.cert_device.hci.EnqueueCommandWithStatus(cmd)
112
113    def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
114        acl_msg = hci_facade.AclMsg(
115            handle=int(handle),
116            packet_boundary_flag=int(pb_flag),
117            broadcast_flag=int(b_flag),
118            data=acl)
119        self.cert_device.hci.SendAclData(acl_msg)
120
121    def pair_justworks(self, cert_iocap_reply, expected_ui_event):
122        # Cert event registration
123        self.register_for_event(hci_packets.EventCode.LINK_KEY_REQUEST)
124        self.register_for_event(hci_packets.EventCode.IO_CAPABILITY_REQUEST)
125        self.register_for_event(hci_packets.EventCode.IO_CAPABILITY_RESPONSE)
126        self.register_for_event(hci_packets.EventCode.USER_PASSKEY_NOTIFICATION)
127        self.register_for_event(hci_packets.EventCode.USER_CONFIRMATION_REQUEST)
128        self.register_for_event(
129            hci_packets.EventCode.REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION)
130        self.register_for_event(hci_packets.EventCode.LINK_KEY_NOTIFICATION)
131        self.register_for_event(hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE)
132        with EventCallbackStream(self.device_under_test.security.FetchUiEvents(empty_proto.Empty())) as dut_ui_stream, \
133            EventCallbackStream(self.device_under_test.security.FetchBondEvents(empty_proto.Empty())) as dut_bond_stream, \
134            EventCallbackStream(self.device_under_test.neighbor.GetRemoteNameEvents(empty_proto.Empty())) as name_event_stream, \
135            EventCallbackStream(self.cert_device.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream:
136
137            cert_hci_event_asserts = EventAsserts(cert_hci_event_stream)
138            dut_ui_event_asserts = EventAsserts(dut_ui_stream)
139            dut_bond_asserts = EventAsserts(dut_bond_stream)
140            dut_name_asserts = EventAsserts(name_event_stream)
141
142            dut_address = self.device_under_test.hci_controller.GetMacAddress(
143                empty_proto.Empty()).address
144            cert_address = self.cert_device.hci_controller.GetMacAddress(
145                empty_proto.Empty()).address
146
147            # Enable Simple Secure Pairing
148            self.enqueue_hci_command(
149                hci_packets.WriteSimplePairingModeBuilder(
150                    hci_packets.Enable.ENABLED), True)
151
152            cert_hci_event_asserts.assert_event_occurs(
153                lambda msg: b'\x0e\x04\x01\x56\x0c' in msg.event)
154
155            # Get the name
156            self.device_under_test.neighbor.ReadRemoteName(
157                neighbor_facade.RemoteNameRequestMsg(
158                    address=cert_address,
159                    page_scan_repetition_mode=1,
160                    clock_offset=0x6855))
161
162            dut_name_asserts.assert_event_occurs(
163                lambda msg: self.cert_name in msg.name)
164
165            self.device_under_test.security.CreateBond(
166                common.BluetoothAddressWithType(
167                    address=common.BluetoothAddress(address=cert_address),
168                    type=common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS))
169
170            cert_hci_event_asserts.assert_event_occurs(
171                lambda event: logging.debug(event.event) or hci_packets.EventCode.IO_CAPABILITY_REQUEST in event.event
172            )
173
174            self.enqueue_hci_command(cert_iocap_reply, True)
175
176            cert_hci_event_asserts.assert_event_occurs(
177                lambda event: logging.debug(event.event) or hci_packets.EventCode.USER_CONFIRMATION_REQUEST in event.event
178            )
179            self.enqueue_hci_command(
180                hci_packets.UserConfirmationRequestReplyBuilder(
181                    dut_address.decode('utf8')), True)
182
183            logging.info("Waiting for UI event")
184            ui_id = -1
185
186            def get_unique_id(event):
187                if (event.message_type == expected_ui_event):
188                    nonlocal ui_id
189                    ui_id = event.unique_id
190                    return True
191                return False
192
193            dut_ui_event_asserts.assert_event_occurs(get_unique_id)
194
195            logging.info("Sending UI response")
196            self.device_under_test.security.SendUiCallback(
197                security_facade.UiCallbackMsg(
198                    message_type=security_facade.UiCallbackType.YES_NO,
199                    boolean=True,
200                    unique_id=ui_id))
201
202            dut_bond_asserts.assert_event_occurs(
203                lambda bond_event: bond_event.message_type == security_facade.BondMsgType.DEVICE_BONDED
204            )
205
206    def test_display_only(self):
207        dut_address = self.device_under_test.hci_controller.GetMacAddress(
208            empty_proto.Empty()).address
209        self.pair_justworks(
210            hci_packets.IoCapabilityRequestReplyBuilder(
211                dut_address.decode('utf8'),
212                hci_packets.IoCapability.DISPLAY_ONLY,
213                hci_packets.OobDataPresent.NOT_PRESENT, hci_packets.
214                AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION),
215            security_facade.UiMsgType.DISPLAY_YES_NO_WITH_VALUE)
216
217    def test_no_input_no_output(self):
218        dut_address = self.device_under_test.hci_controller.GetMacAddress(
219            empty_proto.Empty()).address
220        self.pair_justworks(
221            hci_packets.IoCapabilityRequestReplyBuilder(
222                dut_address.decode('utf8'),
223                hci_packets.IoCapability.NO_INPUT_NO_OUTPUT,
224                hci_packets.OobDataPresent.NOT_PRESENT, hci_packets.
225                AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION),
226            security_facade.UiMsgType.DISPLAY_YES_NO)
227
228    def test_display_yes_no(self):
229        dut_address = self.device_under_test.hci_controller.GetMacAddress(
230            empty_proto.Empty()).address
231        self.pair_justworks(
232            hci_packets.IoCapabilityRequestReplyBuilder(
233                dut_address.decode('utf8'),
234                hci_packets.IoCapability.DISPLAY_YES_NO,
235                hci_packets.OobDataPresent.NOT_PRESENT, hci_packets.
236                AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION),
237            security_facade.UiMsgType.DISPLAY_YES_NO_WITH_VALUE)
238