# Copyright (C) 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""SMP proxy module."""
import asyncio
import sys
from queue import Empty, Queue
from threading import Thread

from mmi2grpc._helpers import assert_description, match_description
from mmi2grpc._proxy import ProfileProxy
from mmi2grpc._rootcanal import Dongle
from pandora.host_grpc import Host
from pandora.host_pb2 import PUBLIC, RANDOM
from pandora.security_grpc import Security
from pandora.security_pb2 import LE_LEVEL3, PairingEventAnswer


def debug(*args, **kwargs):
    """Print to stderr; takes the same arguments as ``print`` except ``file``."""
    print(*args, file=sys.stderr, **kwargs)


# NOTE(review): the docstrings of the decorated MMI_* / _mmi_* methods below
# appear to be consumed at runtime by @assert_description / @match_description
# (presumably compared against the description text sent by PTS -- confirm in
# mmi2grpc._helpers). They are therefore kept verbatim, typos included, and
# explanatory notes are written as `#` comments instead.
class SMProxy(ProfileProxy):
    """Profile proxy answering the SM (Security Manager) MMI prompts.

    Each MMI handler returns the string "OK". Pairing events coming from the
    IUT are answered by a background thread started from ``__init__``.
    """

    def __init__(self, channel, rootcanal):
        """Create the service stubs and start the pairing-event handler.

        Args:
            channel: passed to ``ProfileProxy``, ``Security`` and ``Host``.
            rootcanal: object used by ``test_started`` to select the PTS dongle.
        """
        super().__init__(channel)
        self.security = Security(channel)
        self.host = Host(channel)
        self.rootcanal = rootcanal
        self.connection = None
        # Declared up front so the attribute exists even before
        # MMI_TESTER_ENABLE_CONNECTION_SM has been called.
        self.advertise = None
        self.pairing_stream = None
        # Passkeys announced by PTS (MMI_DISPLAY_PASSKEY_CODE) are handed to
        # the pairing thread through this queue.
        self.passkey_queue = Queue()
        self._handle_pairing_requests()

    def test_started(self, test: str, **kwargs):
        """Select the CSR RCK PTS dongle on rootcanal and return "OK"."""
        self.rootcanal.select_pts_dongle(Dongle.CSR_RCK_PTS_DONGLE)

        return "OK"

    # Opens an LE connection to the PTS address using a random own address and
    # remembers it in self.connection.
    @assert_description
    def MMI_IUT_ENABLE_CONNECTION_SM(self, pts_addr: bytes, **kwargs):
        """
        Initiate an connection from the IUT to the PTS.
        """
        self.connection = self.host.ConnectLE(own_address_type=RANDOM, public=pts_addr).connection
        return "OK"

    # Starts Secure(LE_LEVEL3) on the current connection from a separate
    # thread, so "OK" is returned without waiting for the call to complete.
    @assert_description
    def MMI_ASK_IUT_PERFORM_PAIRING_PROCESS(self, **kwargs):
        """
        Please start pairing process.
        """

        def secure():
            # Nothing to secure when no connection has been recorded.
            if self.connection:
                self.security.Secure(connection=self.connection, le=LE_LEVEL3)

        Thread(target=secure).start()
        return "OK"

    # Disconnects the recorded connection and forgets it.
    @assert_description
    def MMI_IUT_SEND_DISCONNECTION_REQUEST(self, **kwargs):
        """
        Please initiate a disconnection to the PTS.

        Description: Verify that
        the Implementation Under Test(IUT) can initiate a disconnect request to
        PTS.
        """
        self.host.Disconnect(connection=self.connection)
        self.connection = None
        return "OK"

    # Not wrapped by a description decorator, unlike its siblings.
    # NOTE(review): presumably because the number in the prompt differs per
    # run -- confirm.
    def MMI_LESC_NUMERIC_COMPARISON(self, **kwargs):
        """
        Please confirm the following number matches IUT: 385874.
        """
        return "OK"

    # Calls Host.Reset().
    @assert_description
    def MMI_ASK_IUT_PERFORM_RESET(self, **kwargs):
        """
        Please reset your device.
        """
        self.host.Reset()
        return "OK"

    # Starts legacy connectable advertising with a public own address and
    # keeps the returned object in self.advertise.
    @assert_description
    def MMI_TESTER_ENABLE_CONNECTION_SM(self, **kwargs):
        """
        Action: Place the IUT in connectable mode
        """
        self.advertise = self.host.Advertise(
            legacy=True,
            connectable=True,
            own_address_type=PUBLIC,
        )

        return "OK"

    # Acknowledges the prompt; no action is taken here.
    @assert_description
    def MMI_IUT_SMP_TIMEOUT_30_SECONDS(self, **kwargs):
        """
        Wait for the 30 seconds. Lower tester will not send corresponding or
        next SMP message.
        """
        return "OK"

    # Acknowledges the prompt; no action is taken here.
    @assert_description
    def MMI_IUT_SMP_TIMEOUT_ADDITIONAL_10_SECONDS(self, **kwargs):
        """
        Wait for an additional 10 seconds. Lower test will send corresponding or
        next SMP message.
        """
        return "OK"

    # Queues the passkey for the pairing thread (see _handle_pairing_requests).
    # The `passkey` argument is presumably the named group captured from the
    # pattern in the docstring -- confirm in mmi2grpc._helpers.
    @match_description
    def MMI_DISPLAY_PASSKEY_CODE(self, passkey: str, **kwargs):
        """
        Please enter (?P<passkey>[0-9]*) in the IUT.
        """
        self.passkey_queue.put(passkey)
        return "OK"

    # Acknowledges the prompt; no action is taken here.
    @assert_description
    def MMI_ENTER_PASSKEY_CODE(self, **kwargs):
        """
        Please enter 6 digit passkey code.
        """

        return "OK"

    # Acknowledges the prompt; no action is taken here.
    @assert_description
    def MMI_ENTER_WRONG_DYNAMIC_PASSKEY_CODE(self, **kwargs):
        """
        Please enter invalid 6 digit pin code.
        """

        return "OK"

    # Acknowledges the prompt; no action is taken here.
    @match_description
    def MMI_IUT_ABORT_PAIRING_PROCESS_DISCONNECT(self, **kwargs):
        """
        Lower tester expects IUT aborts pairing process(, and disconnect|. Click OK to confirm pairing is aborted).
        """

        return "OK"

    # Acknowledges the prompt; no action is taken here.
    @assert_description
    def MMI_IUT_ACCEPT_CONNECTION_BR_EDR(self, **kwargs):
        """
        Please prepare IUT into a connectable mode in BR/EDR.

        Description:
        Verify that the Implementation Under Test (IUT) can accept a connect
        request from PTS.
        """

        return "OK"

    # Acknowledges the prompt; no action is taken here.
    @assert_description
    def _mmi_2001(self, **kwargs):
        """
        Please verify the passKey is correct: 000000
        """
        return "OK"

    # Connects to the PTS address with Host.Connect and remembers the
    # connection. No explicit pairing call is made in this handler.
    @assert_description
    def MMI_IUT_INITIATE_CONNECTION_BR_EDR_PAIRING(self, test: str, pts_addr: bytes, **kwargs):
        """
        Please initiate a connection over BR/EDR to the PTS, and initiate
        pairing process.

        Description: Verify that the Implementation Under Test
        (IUT) can initiate a connect request over BR/EDR to PTS, and initiate
        pairing process.
        """
        self.connection = self.host.Connect(address=pts_addr).connection

        return "OK"

    # Acknowledges the prompt; no action is taken here.
    @assert_description
    def MMI_ASK_IUT_PERFORM_FEATURE_EXCHANGE_OVER_BR(self, **kwargs):
        """
        Please start pairing feature exchange over BR/EDR.
        """

        return "OK"

    # Acknowledges the prompt; no action is taken here.
    @assert_description
    def MMI_IUT_INITIATES_ENCRYPTION(self, **kwargs):
        """
        Initiates encryption with the PTS.
        """

        return "OK"

    # Acknowledges the prompt; no action is taken here.
    @assert_description
    def _mmi_20117(self, **kwargs):
        """
        Please start encryption using previously distributed key.

        Description:
        Verify that the Implementation Under Test (IUT) can successfully start
        and complete encryption with previously distributed key.
        """

        return "OK"

    def _handle_pairing_requests(self):
        """Start a thread that answers pairing events from ``Security.OnPairing``.

        Just-works and numeric-comparison events are confirmed. Passkey-entry
        requests are answered with the next passkey taken from
        ``self.passkey_queue``, waiting up to 15 seconds for one. A missing or
        unparsable passkey is reported on stderr and the event is left
        unanswered, so the thread keeps serving subsequent events.
        """

        def task():
            pairing_events = self.security.OnPairing()
            for event in pairing_events:
                if event.just_works or event.numeric_comparison:
                    pairing_events.send(PairingEventAnswer(event=event, confirm=True))
                if event.passkey_entry_request:
                    try:
                        passkey = self.passkey_queue.get(timeout=15)
                    except Empty:
                        debug("No passkey provided within 15 seconds")
                        continue
                    try:
                        passkey_value = int(passkey)
                    except ValueError:
                        # The MMI pattern `[0-9]*` also matches an empty
                        # string; an uncaught error here would end this
                        # thread and leave later pairing events unanswered.
                        debug("Ignoring invalid passkey:", repr(passkey))
                        continue
                    pairing_events.send(PairingEventAnswer(event=event, passkey=passkey_value))

        Thread(target=task).start()