# Copyright (C) 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Map Bluetooth PTS Man Machine Interface to Pandora gRPC calls."""

__version__ = "0.0.1"

import sys
import time
from threading import Thread
from typing import List

import grpc
from mmi2grpc._helpers import format_proxy
from mmi2grpc._modem import Modem
from mmi2grpc._rootcanal import RootCanal
from mmi2grpc.a2dp import A2DPProxy
from mmi2grpc.avrcp import AVRCPProxy
from mmi2grpc.gap import GAPProxy
from mmi2grpc.gatt import GATTProxy
from mmi2grpc.hap import HAPProxy
from mmi2grpc.hfp import HFPProxy
from mmi2grpc.hid import HIDProxy
from mmi2grpc.hogp import HOGPProxy
from mmi2grpc.l2cap import L2CAPProxy
from mmi2grpc.map import MAPProxy
from mmi2grpc.opp import OPPProxy
from mmi2grpc.pan import PANProxy
from mmi2grpc.pbap import PBAPProxy
from mmi2grpc.rfcomm import RFCOMMProxy
from mmi2grpc.sdp import SDPProxy
from mmi2grpc.sm import SMProxy
from mmi2grpc.vcp import VCPProxy
from pandora.host_grpc import Host

PANDORA_SERVER_PORT = 8999
ROOTCANAL_CONTROL_PORT = 6212
MODEM_SIMULATOR_PORT = 4242
MAX_RETRIES = 10
GRPC_SERVER_INIT_TIMEOUT = 10  # seconds


class IUT:
    """IUT class.

    Handles MMI calls from the PTS and routes them to corresponding profile
    proxy which translates MMI calls to gRPC calls to the IUT.
    """

    # Names of the attributes caching the lazily created profile proxies.
    # Kept in one place so __init__ and __exit__ always reset the same set.
    _PROXY_ATTRS = (
        "_a2dp",
        "_avrcp",
        "_bnep",
        "_gatt",
        "_hap",
        "_gap",
        "_hfp",
        "_hid",
        "_hogp",
        "_l2cap",
        "_map",
        "_opp",
        "_pan",
        "_pbap",
        "_rfcomm",
        "_sdp",
        "_sm",
        "_vcp",
    )

    def __init__(self, test: str, args: List[str], **kwargs):
        """Init IUT class for a given test.

        Args:
            test: PTS test id.
            args: test arguments. Optional positional overrides, in order:
                Pandora server port, RootCanal control port, modem simulator
                port. Missing entries fall back to the module defaults.
            **kwargs: accepted and ignored.
        """
        self.pandora_server_port = int(args[0]) if len(args) > 0 else PANDORA_SERVER_PORT
        self.rootcanal_control_port = int(args[1]) if len(args) > 1 else ROOTCANAL_CONTROL_PORT
        self.modem_simulator_port = int(args[2]) if len(args) > 2 else MODEM_SIMULATOR_PORT

        self.test = test
        self.rootcanal = None
        self.modem = None

        # Profile proxies.
        self._reset_proxies()

    def _reset_proxies(self):
        """Drops every cached profile proxy so the next MMI recreates it."""
        for attr in self._PROXY_ATTRS:
            setattr(self, attr, None)

    def __enter__(self):
        """Resets the IUT when starting a PTS test.

        Returns:
            This IUT instance, so that `with IUT(...) as iut:` binds it.
        """
        self.rootcanal = RootCanal(port=self.rootcanal_control_port)
        self.rootcanal.move_in_range()

        self.modem = Modem(port=self.modem_simulator_port)

        # Note: we don't keep a single gRPC channel instance in the IUT class
        # because reset is allowed to close the gRPC server.
        with grpc.insecure_channel(f'localhost:{self.pandora_server_port}') as channel:
            self._retry(Host(channel).FactoryReset)(wait_for_ready=True)

        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Closes RootCanal and the modem and forgets all profile proxies."""
        self.rootcanal.close()
        self.rootcanal = None

        self.modem.close()
        self.modem = None

        self._reset_proxies()

    def _retry(self, func):
        """Wraps `func` so that gRPC errors are retried.

        The wrapper calls `func` up to MAX_RETRIES times, sleeping one second
        between attempts, and re-raises the last `grpc.RpcError` once the
        attempts are exhausted.

        Args:
            func: callable to invoke; its `__name__` is used in retry logs.

        Returns:
            A callable with the same arguments and return value as `func`.
        """

        def wrapper(*args, **kwargs):
            tries = 0
            while True:
                try:
                    return func(*args, **kwargs)
                # grpc.RpcError is the base class of the errors raised by
                # gRPC calls, so catching it alone is sufficient.
                except grpc.RpcError:
                    tries += 1
                    if tries >= MAX_RETRIES:
                        raise
                    print(f"Retry {func.__name__}: {tries}/{MAX_RETRIES}")
                    time.sleep(1)

        return wrapper

    @property
    def address(self) -> bytes:
        """Bluetooth MAC address of the IUT.

        Raises a timeout exception after GRPC_SERVER_INIT_TIMEOUT seconds.
        """
        mut_address = None

        def read_local_address():
            with grpc.insecure_channel(f"localhost:{self.pandora_server_port}") as channel:
                nonlocal mut_address
                mut_address = self._retry(Host(channel).ReadLocalAddress)(wait_for_ready=True).address

        # The read runs in a helper thread so the wait can be bounded by
        # joining with a timeout.
        thread = Thread(target=read_local_address)
        thread.start()
        thread.join(timeout=GRPC_SERVER_INIT_TIMEOUT)

        if not mut_address:
            raise Exception("Pandora gRPC server timeout")
        return mut_address

    def _channel(self):
        """Opens a new insecure gRPC channel to the Pandora server."""
        return grpc.insecure_channel(f"localhost:{self.pandora_server_port}")

    def _proxy_table(self, test: str):
        """Describes how each supported profile maps to its proxy.

        Args:
            test: PTS test id, forwarded to proxies whose constructor needs it.

        Returns:
            A tuple of `(profiles, attribute, factory)` entries where
            `profiles` is a tuple of profile names, `attribute` is the name of
            the attribute caching the proxy and `factory` builds a new proxy.
        """
        return (
            # A2DP and AVDTP MMIs.
            (("A2DP", "AVDTP"), "_a2dp", lambda: A2DPProxy(self._channel(), self.rootcanal)),
            # AVRCP and AVCTP MMIs.
            (("AVRCP", "AVCTP"), "_avrcp", lambda: AVRCPProxy(self._channel())),
            (("GATT",), "_gatt", lambda: GATTProxy(self._channel(), self.rootcanal)),
            (("GAP",), "_gap", lambda: GAPProxy(self._channel(), self.rootcanal)),
            (("HAP",), "_hap", lambda: HAPProxy(self._channel(), self.rootcanal)),
            (("HFP",), "_hfp", lambda: HFPProxy(test, self._channel(), self.rootcanal, self.modem)),
            (("HID",), "_hid", lambda: HIDProxy(self._channel(), self.rootcanal)),
            (("HOGP",), "_hogp", lambda: HOGPProxy(self._channel(), self.rootcanal)),
            (("L2CAP",), "_l2cap", lambda: L2CAPProxy(self._channel(), self.rootcanal)),
            (("MAP",), "_map", lambda: MAPProxy(self._channel())),
            (("OPP",), "_opp", lambda: OPPProxy(self._channel())),
            # PAN and BNEP MMIs.
            (("PAN", "BNEP"), "_pan", lambda: PANProxy(self._channel())),
            (("PBAP",), "_pbap", lambda: PBAPProxy(self._channel())),
            (("RFCOMM",), "_rfcomm", lambda: RFCOMMProxy(self._channel())),
            (("SDP",), "_sdp", lambda: SDPProxy(self._channel())),
            (("SM",), "_sm", lambda: SMProxy(self._channel(), self.rootcanal)),
            (("VCP",), "_vcp", lambda: VCPProxy(self._channel(), self.rootcanal)),
        )

    def interact(
        self,
        pts_address: bytes,
        profile: str,
        test: str,
        interaction: str,
        description: str,
        style: str,
        **kwargs,
    ) -> str:
        """Routes MMI calls to corresponding profile proxy.

        The proxy of a profile is created on its first MMI and reused for the
        following ones until the IUT context exits.

        Args:
            pts_address: Bluetooth MAC address of the PTS in bytes.
            profile: Bluetooth profile.
            test: PTS test id.
            interaction: MMI name.
            description: MMI description.
            style: MMI popup style, unused for now.

        Returns:
            The value returned by the profile proxy's `interact` method.

        Raises:
            AssertionError: if no proxy handles `profile`; the message
                explains how to add one.
        """
        print(f"{profile} mmi: {interaction}", file=sys.stderr)

        for profiles, attr, factory in self._proxy_table(test):
            # Exact name match against a real tuple: a substring of a profile
            # name must not be routed to that profile's proxy.
            if profile not in profiles:
                continue
            proxy = getattr(self, attr)
            if not proxy:
                proxy = factory()
                setattr(self, attr, proxy)
            return proxy.interact(test, interaction, description, pts_address)

        # Handles unsupported profiles.
        code = format_proxy(profile, interaction, description)
        error_msg = (f"Missing {profile} proxy and mmi: {interaction}\n"
                     f"Create a {profile.lower()}.py in mmi2grpc/:\n\n{code}\n"
                     f"Then, instantiate the corresponding proxy in __init__.py\n"
                     f"Finally, create a {profile.lower()}.proto in proto/pandora/ "
                     f"and generate the corresponding interface.")

        # Raised explicitly rather than with `assert False` so the failure is
        # still reported when Python runs with assertions disabled (-O).
        raise AssertionError(error_msg)