• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18
19from blueberry.tests.gd.cert.captures import HciCaptures
20from blueberry.tests.gd.cert.closable import safeClose
21from blueberry.tests.gd.cert.event_stream import EventStream
22from blueberry.tests.gd.cert.matchers import HciMatchers
23from blueberry.tests.gd.cert.py_hci import PyHci
24from blueberry.tests.gd.cert.py_security import PySecurity
25from blueberry.tests.gd.cert.truth import assertThat
26from google.protobuf import empty_pb2 as empty_proto
27from blueberry.facade.l2cap.classic import facade_pb2 as l2cap_facade
28from blueberry.facade.security.facade_pb2 import IoCapabilities
29from blueberry.facade.security.facade_pb2 import AuthenticationRequirements
30from blueberry.facade.security.facade_pb2 import OobDataPresent
31from blueberry.utils import bluetooth
32import hci_packets as hci
33
34
class CertSecurity(PySecurity):
    """
        Contain all of the certification stack logic for sending and receiving
        HCI commands following the Classic Pairing flows.
    """
    # Maps security facade (protobuf) IoCapabilities values to the matching
    # hci_packets IoCapability values; read by set_io_capabilities().
    _io_cap_lookup = {
        IoCapabilities.DISPLAY_ONLY: hci.IoCapability.DISPLAY_ONLY,
        IoCapabilities.DISPLAY_YES_NO_IO_CAP: hci.IoCapability.DISPLAY_YES_NO,
        IoCapabilities.KEYBOARD_ONLY: hci.IoCapability.KEYBOARD_ONLY,
        IoCapabilities.NO_INPUT_NO_OUTPUT: hci.IoCapability.NO_INPUT_NO_OUTPUT,
    }

    # Maps security facade AuthenticationRequirements values to the matching
    # hci_packets AuthenticationRequirements values; read by
    # set_authentication_requirements().
    _auth_req_lookup = {
        AuthenticationRequirements.NO_BONDING:
            hci.AuthenticationRequirements.NO_BONDING,
        AuthenticationRequirements.NO_BONDING_MITM_PROTECTION:
            hci.AuthenticationRequirements.NO_BONDING_MITM_PROTECTION,
        AuthenticationRequirements.DEDICATED_BONDING:
            hci.AuthenticationRequirements.DEDICATED_BONDING,
        AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION:
            hci.AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION,
        AuthenticationRequirements.GENERAL_BONDING:
            hci.AuthenticationRequirements.GENERAL_BONDING,
        AuthenticationRequirements.GENERAL_BONDING_MITM_PROTECTION:
            hci.AuthenticationRequirements.GENERAL_BONDING_MITM_PROTECTION,
    }

    # Maps security facade OobDataPresent values to the matching hci_packets
    # OobDataPresent values; read by get_oob_data_from_controller().
    _oob_present_lookup = {
        OobDataPresent.NOT_PRESENT: hci.OobDataPresent.NOT_PRESENT,
        OobDataPresent.P192_PRESENT: hci.OobDataPresent.P_192_PRESENT,
        OobDataPresent.P256_PRESENT: hci.OobDataPresent.P_256_PRESENT,
        OobDataPresent.P192_AND_256_PRESENT: hci.OobDataPresent.P_192_AND_256_PRESENT,
    }

    # Event stream obtained from the PyHci wrapper; assigned in __init__.
    _hci_event_stream = None
    # IO capability sent by send_io_caps() unless set_io_capabilities() replaces it.
    _io_caps = hci.IoCapability.DISPLAY_ONLY
    # Authentication requirements sent by send_io_caps() unless
    # set_authentication_requirements() replaces them.
    _auth_reqs = hci.AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION
    # Selects the P192 read command in get_oob_data_from_controller(). Nothing in
    # this class sets it to True (the assignment in enable_secure_connections()
    # is commented out).
    _secure_connections_enabled = False

    # PyHci wrapper used to send commands; assigned in __init__.
    _hci = None

    # Inclusive bounds on the PIN length accepted by input_pin().
    MAX_PIN_LENGTH = 16
    MIN_PIN_LENGTH = 1
78
79    def _enqueue_hci_command(self, command, expect_complete):
80        if (expect_complete):
81            self._hci.send_command(command)
82        else:
83            self._hci.send_command(command)
84
85    def __init__(self, device):
86        """
87            Don't call super b/c the gRPC stream setup will crash test
88        """
89        logging.info("Cert: Init")
90        self._device = device
91        self._device.wait_channel_ready()
92        self._hci = PyHci(device)
93        self._hci.register_for_events(
94            hci.EventCode.ENCRYPTION_CHANGE, hci.EventCode.CHANGE_CONNECTION_LINK_KEY_COMPLETE,
95            hci.EventCode.CENTRAL_LINK_KEY_COMPLETE, hci.EventCode.RETURN_LINK_KEYS, hci.EventCode.PIN_CODE_REQUEST,
96            hci.EventCode.LINK_KEY_REQUEST, hci.EventCode.LINK_KEY_NOTIFICATION,
97            hci.EventCode.ENCRYPTION_KEY_REFRESH_COMPLETE, hci.EventCode.IO_CAPABILITY_REQUEST,
98            hci.EventCode.IO_CAPABILITY_RESPONSE, hci.EventCode.REMOTE_OOB_DATA_REQUEST,
99            hci.EventCode.SIMPLE_PAIRING_COMPLETE, hci.EventCode.USER_PASSKEY_NOTIFICATION,
100            hci.EventCode.KEYPRESS_NOTIFICATION, hci.EventCode.USER_CONFIRMATION_REQUEST,
101            hci.EventCode.USER_PASSKEY_REQUEST, hci.EventCode.REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION)
102        self._hci_event_stream = self._hci.get_event_stream()
103
104    def create_bond(self, address, type):
105        """
106            Creates a bond from the cert perspective
107        """
108        logging.info("Cert: Creating bond to '%s' from '%s'" % (str(address), str(self._device.address)))
109        # TODO(optedoblivion): Trigger connection to Send AuthenticationRequested
110
111    def remove_bond(self, address, type):
112        """
113            We store the link key locally in the test and pretend
114            So to remove_bond we need to Remove the "stored" data
115        """
116        pass
117
118    def set_io_capabilities(self, io_capabilities):
119        """
120            Set the IO Capabilities used for the cert
121        """
122        logging.info("Cert: setting IO Capabilities data to '%s'" %
123                     self._io_capabilities_name_lookup.get(io_capabilities, "ERROR"))
124        self._io_caps = self._io_cap_lookup.get(io_capabilities, hci.IoCapability.DISPLAY_ONLY)
125
126    def set_authentication_requirements(self, auth_reqs):
127        """
128            Establish authentication requirements for the stack
129        """
130        logging.info("Cert: setting Authentication Requirements data to '%s'" %
131                     self._auth_reqs_name_lookup.get(auth_reqs, "ERROR"))
132        self._auth_reqs = self._auth_req_lookup.get(auth_reqs, hci.AuthenticationRequirements.GENERAL_BONDING)
133
134    def get_oob_data_from_controller(self, pb_oob_data_type):
135        """
136            Get the Out-of-band data for SSP pairing
137
138            :param pb_oob_data_type: Type of data needed
139            :return: a tuple of bytes (192c,192r,256c,256r) with increasing security; bytes may be all 0s depending on pb_oob_data_type value
140
141        """
142        oob_data_type = self._oob_present_lookup[pb_oob_data_type]
143
144        if (oob_data_type == hci.OobDataPresent.NOT_PRESENT):
145            logging.warn("No data present, no need to call get_oob_data")
146            return ([0 for i in range(0, 16)], [0 for i in range(0, 16)], [0 for i in range(0, 16)],
147                    [0 for i in range(0, 16)])
148
149        logging.info("Cert: Requesting OOB data")
150        if oob_data_type == hci.OobDataPresent.P_192_PRESENT:
151            # If host and controller supports secure connections we always used ReadLocalOobExtendedDataRequest
152            if self._secure_connections_enabled:
153                logging.info("Cert: Requesting P192 Data; secure connections")
154                complete_capture = HciCaptures.ReadLocalOobExtendedDataCompleteCapture()
155                self._enqueue_hci_command(hci.ReadLocalOobExtendedData(), True)
156                logging.info("Cert: Waiting for OOB response from controller")
157                assertThat(self._hci_event_stream).emits(complete_capture)
158                complete = complete_capture.get()
159                return (list(complete.c192), list(complete.r192), [0 for i in range(0, 16)], [0 for i in range(0, 16)])
160            # else we use ReadLocalDataRequest
161            else:
162                logging.info("Cert: Requesting P192 Data; no secure connections")
163                complete_capture = HciCaptures.ReadLocalOobDataCompleteCapture()
164                self._enqueue_hci_command(hci.ReadLocalOobData(), True)
165                logging.info("Cert: Waiting for OOB response from controller")
166                assertThat(self._hci_event_stream).emits(complete_capture)
167                complete = complete_capture.get()
168                return (list(complete.c), list(complete.r), [0 for i in range(0, 16)], [0 for i in range(0, 16)])
169
170        # Must be secure connection compatible to use these
171        elif oob_data_type == hci.OobDataPresent.P_256_PRESENT:
172            logging.info("Cert: Requesting P256 Extended Data; secure connections")
173            complete_capture = HciCaptures.ReadLocalOobExtendedDataCompleteCapture()
174            self._enqueue_hci_command(hci.ReadLocalOobExtendedData(), True)
175            logging.info("Cert: Waiting for OOB response from controller")
176            assertThat(self._hci_event_stream).emits(complete_capture)
177            complete = complete_capture.get()
178            return ([0 for i in range(0, 16)], [0 for i in range(0, 16)], list(complete.c256), list(complete.r256))
179
180        else:  # Both
181            logging.info("Cert: Requesting P192 AND P256 Extended Data; secure connections")
182            complete_capture = HciCaptures.ReadLocalOobExtendedDataCompleteCapture()
183            self._enqueue_hci_command(hci.ReadLocalOobExtendedData(), True)
184            logging.info("Cert: Waiting for OOB response from controller")
185            assertThat(self._hci_event_stream).emits(complete_capture)
186            complete = complete_capture.get()
187            return (list(complete.c192), list(complete.r192), list(complete.c256), list(complete.r256))
188
189    def input_passkey(self, address, passkey):
190        """
191            Pretend to answer the pairing dialog as a user
192        """
193        logging.info("Cert: Waiting for PASSKEY request")
194        assertThat(self._hci_event_stream).emits(HciMatchers.EventWithCode(hci.EventCode.USER_PASSKEY_REQUEST))
195        logging.info("Cert: Send user input passkey %d for %s" % (passkey, address))
196        peer = bluetooth.Address(address)
197        self._enqueue_hci_command(
198            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.ENTRY_STARTED),
199            True)
200        self._enqueue_hci_command(
201            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED),
202            True)
203        self._enqueue_hci_command(
204            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED),
205            True)
206        self._enqueue_hci_command(
207            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.CLEARED), True)
208        self._enqueue_hci_command(
209            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED),
210            True)
211        self._enqueue_hci_command(
212            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED),
213            True)
214        self._enqueue_hci_command(
215            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ERASED),
216            True)
217        self._enqueue_hci_command(
218            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED),
219            True)
220        self._enqueue_hci_command(
221            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED),
222            True)
223        self._enqueue_hci_command(
224            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED),
225            True)
226        self._enqueue_hci_command(
227            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED),
228            True)
229        self._enqueue_hci_command(
230            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED),
231            True)
232        self._enqueue_hci_command(
233            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.ENTRY_COMPLETED),
234            True)
235        self._enqueue_hci_command(hci.UserPasskeyRequestReply(bd_addr=peer, numerical_value=passkey), True)
236
237    def input_pin(self, address, pin):
238        """
239            Pretend to answer the pairing dialog as a user
240        """
241
242        if len(pin) > self.MAX_PIN_LENGTH or len(pin) < self.MIN_PIN_LENGTH:
243            raise Exception("Pin code must be within range")
244
245        logging.info("Cert: Waiting for PIN request")
246        assertThat(self._hci_event_stream).emits(HciMatchers.PinCodeRequest())
247        logging.info("Cert: Send user input PIN %s for %s" % (pin.decode(), address))
248        peer = address.decode('utf-8')
249        pin_list = list(pin)
250        # Pad
251        for i in range(self.MAX_PIN_LENGTH - len(pin_list)):
252            pin_list.append(0)
253        self._enqueue_hci_command(
254            hci.PinCodeRequestReply(bd_addr=bluetooth.Address(peer), pin_code_length=len(pin), pin_code=pin_list), True)
255
256    def __send_ui_callback(self, address, callback_type, b, uid, pin):
257        """
258            Pretend to answer the pairing dailog as a user
259        """
260        logging.info("Cert: Send user input callback uid:%d; response: %s" % (uid, b))
261        # TODO(optedoblivion): Make callback and set it to the module
262
263    def enable_secure_simple_pairing(self):
264        """
265            This is called when you want to enable SSP for testing
266        """
267        logging.info("Cert: Sending WRITE_SIMPLE_PAIRING_MODE [True]")
268        self._enqueue_hci_command(hci.WriteSimplePairingMode(simple_pairing_mode=hci.Enable.ENABLED), True)
269        logging.info("Cert: Waiting for controller response")
270        assertThat(self._hci_event_stream).emits(HciMatchers.CommandComplete(hci.OpCode.WRITE_SIMPLE_PAIRING_MODE))
271
272    def enable_secure_connections(self):
273        """
274            This is called when you want to enable secure connections support
275        """
276        logging.info("Cert: Sending WRITE_SECURE_CONNECTIONS_HOST_SUPPORT [True]")
277        self._enqueue_hci_command(
278            hci.WriteSecureConnectionsHostSupport(secure_connections_host_support=hci.Enable.ENABLED), True)
279        logging.info("Cert: Waiting for controller response")
280        assertThat(self._hci_event_stream).emits(
281            HciMatchers.CommandComplete(hci.OpCode.WRITE_SECURE_CONNECTIONS_HOST_SUPPORT))
282        # TODO(optedoblivion): Figure this out and remove (see classic_pairing_handler.cc)
283        #self._secure_connections_enabled = True
284
285    def send_io_caps(self, address):
286        logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST")
287        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest())
288        logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY")
289        oob_data_present = hci.OobDataPresent.NOT_PRESENT
290        self._enqueue_hci_command(
291            hci.IoCapabilityRequestReply(bd_addr=bluetooth.Address(address),
292                                         io_capability=self._io_caps,
293                                         oob_present=oob_data_present,
294                                         authentication_requirements=self._auth_reqs), True)
295
296    def accept_pairing(self, dut_address, reply_boolean, expect_to_fail, on_responder_reply):
297        """
298            Here we handle the pairing events at the HCI level
299        """
300        logging.info("Cert: Waiting for IO_CAPABILITY_RESPONSE")
301        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityResponse())
302        self.send_io_caps(dut_address)
303        logging.info("Cert: Waiting for USER_CONFIRMATION_REQUEST")
304        assertThat(self._hci_event_stream).emits(HciMatchers.UserConfirmationRequest())
305        logging.info("Cert: Sending Simulated User Response '%s'" % reply_boolean)
306        if reply_boolean:
307            logging.info("Cert: Sending USER_CONFIRMATION_REQUEST_REPLY")
308            self._enqueue_hci_command(hci.UserConfirmationRequestReply(bd_addr=bluetooth.Address(dut_address)), True)
309            on_responder_reply()
310            logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE")
311            assertThat(self._hci_event_stream).emits(HciMatchers.SimplePairingComplete())
312            if not expect_to_fail:
313                logging.info("Cert: Waiting for LINK_KEY_NOTIFICATION")
314                assertThat(self._hci_event_stream).emits(HciMatchers.LinkKeyNotification())
315        else:
316            logging.info("Cert: Sending USER_CONFIRMATION_REQUEST_NEGATIVE_REPLY")
317            self._enqueue_hci_command(hci.UserConfirmationRequestNegativeReply(bd_addr=bluetooth.Address(dut_address)),
318                                      True)
319            on_responder_reply()
320            logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE")
321            assertThat(self._hci_event_stream).emits(HciMatchers.SimplePairingComplete())
322
323    def accept_oob_pairing(self, dut_address):
324        logging.info("Cert: Waiting for IO_CAPABILITY_RESPONSE")
325        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityResponse())
326        self.send_io_caps(dut_address)
327        logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE")
328        ssp_complete_capture = HciCaptures.SimplePairingCompleteCapture()
329        assertThat(self._hci_event_stream).emits(ssp_complete_capture)
330        ssp_complete = ssp_complete_capture.get()
331        logging.info(ssp_complete.status)
332        assertThat(ssp_complete.status).isEqualTo(hci.ErrorCode.SUCCESS)
333
334    def on_user_input(self, dut_address, reply_boolean, expected_ui_event):
335        """
336            Cert doesn't need the test to respond to the ui event
337            Cert responds in accept pairing
338        """
339        pass
340
341    def wait_for_bond_event(self, expected_bond_event):
342        """
343            A bond event will be triggered once the bond process
344            is complete.  For the DUT we need to wait for it,
345            for Cert it isn't needed.
346        """
347        pass
348
349    def enforce_security_policy(self, address, type, policy):
350        """
351            Pass for now
352        """
353        pass
354
355    def wait_for_enforce_security_event(self, expected_enforce_security_event):
356        """
357            Cert side needs to pass
358        """
359        pass
360
361    def wait_for_disconnect_event(self):
362        """
363            Cert side needs to pass
364        """
365        pass
366        # FIXME: Gabeldorsche facade don't allow us to register for an DISCONNECT_COMPLETE event
367        # logging.info("Cert: Waiting for DISCONNECT_COMPLETE")
368        # assertThat(self._hci_event_stream).emits(HciMatchers.DisconnectionComplete())
369
370    def close(self):
371        safeClose(self._hci)
372