• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (C) 2024 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Map Bluetooth PTS Man Machine Interface to Pandora gRPC calls."""
15
16__version__ = "0.0.1"
17
18import sys
19import time
20from threading import Thread
21from typing import List
22
23import grpc
24from mmi2grpc._helpers import format_proxy
25from mmi2grpc._modem import Modem
26from mmi2grpc._rootcanal import RootCanal
27from mmi2grpc.a2dp import A2DPProxy
28from mmi2grpc.avrcp import AVRCPProxy
29from mmi2grpc.gap import GAPProxy
30from mmi2grpc.gatt import GATTProxy
31from mmi2grpc.hap import HAPProxy
32from mmi2grpc.hfp import HFPProxy
33from mmi2grpc.hid import HIDProxy
34from mmi2grpc.hogp import HOGPProxy
35from mmi2grpc.l2cap import L2CAPProxy
36from mmi2grpc.map import MAPProxy
37from mmi2grpc.opp import OPPProxy
38from mmi2grpc.pan import PANProxy
39from mmi2grpc.pbap import PBAPProxy
40from mmi2grpc.rfcomm import RFCOMMProxy
41from mmi2grpc.sdp import SDPProxy
42from mmi2grpc.sm import SMProxy
43from mmi2grpc.vcp import VCPProxy
44from pandora.host_grpc import Host
45
46PANDORA_SERVER_PORT = 8999
47ROOTCANAL_CONTROL_PORT = 6212
48MODEM_SIMULATOR_PORT = 4242
49MAX_RETRIES = 10
50GRPC_SERVER_INIT_TIMEOUT = 10  # seconds
51
52
53class IUT:
54    """IUT class.
55
56    Handles MMI calls from the PTS and routes them to corresponding profile
57    proxy which translates MMI calls to gRPC calls to the IUT.
58    """
59
60    def __init__(self, test: str, args: List[str], **kwargs):
61        """Init IUT class for a given test.
62
63        Args:
64            test: PTS test id.
65            args: test arguments.
66        """
67        self.pandora_server_port = int(args[0]) if len(args) > 0 else PANDORA_SERVER_PORT
68        self.rootcanal_control_port = int(args[1]) if len(args) > 1 else ROOTCANAL_CONTROL_PORT
69        self.modem_simulator_port = int(args[2]) if len(args) > 2 else MODEM_SIMULATOR_PORT
70
71        self.test = test
72        self.rootcanal = None
73        self.modem = None
74
75        # Profile proxies.
76        self._a2dp = None
77        self._avrcp = None
78        self._bnep = None
79        self._gatt = None
80        self._hap = None
81        self._gap = None
82        self._hfp = None
83        self._hid = None
84        self._hogp = None
85        self._l2cap = None
86        self._map = None
87        self._opp = None
88        self._pan = None
89        self._pbap = None
90        self._rfcomm = None
91        self._sdp = None
92        self._sm = None
93        self._vcp = None
94
95    def __enter__(self):
96        """Resets the IUT when starting a PTS test."""
97        self.rootcanal = RootCanal(port=self.rootcanal_control_port)
98        self.rootcanal.move_in_range()
99
100        self.modem = Modem(port=self.modem_simulator_port)
101
102        # Note: we don't keep a single gRPC channel instance in the IUT class
103        # because reset is allowed to close the gRPC server.
104        with grpc.insecure_channel(f'localhost:{self.pandora_server_port}') as channel:
105            self._retry(Host(channel).FactoryReset)(wait_for_ready=True)
106
107    def __exit__(self, exc_type, exc_value, exc_traceback):
108        self.rootcanal.close()
109        self.rootcanal = None
110
111        self.modem.close()
112        self.modem = None
113
114        self._a2dp = None
115        self._avrcp = None
116        self._bnep = None
117        self._gatt = None
118        self._gap = None
119        self._hfp = None
120        self._l2cap = None
121        self._hid = None
122        self._hogp = None
123        self._map = None
124        self._opp = None
125        self._pan = None
126        self._pbap = None
127        self._rfcomm = None
128        self._sdp = None
129        self._sm = None
130        self._vcp = None
131
132    def _retry(self, func):
133
134        def wrapper(*args, **kwargs):
135            tries = 0
136            while True:
137                try:
138                    return func(*args, **kwargs)
139                except grpc.RpcError or grpc._channel._InactiveRpcError:
140                    tries += 1
141                    if tries >= MAX_RETRIES:
142                        raise
143                    else:
144                        print(f"Retry {func.__name__}: {tries}/{MAX_RETRIES}")
145                        time.sleep(1)
146
147        return wrapper
148
149    @property
150    def address(self) -> bytes:
151        """Bluetooth MAC address of the IUT.
152
153        Raises a timeout exception after GRPC_SERVER_INIT_TIMEOUT seconds.
154        """
155        mut_address = None
156
157        def read_local_address():
158            with grpc.insecure_channel(f"localhost:{self.pandora_server_port}") as channel:
159                nonlocal mut_address
160                mut_address = self._retry(Host(channel).ReadLocalAddress)(wait_for_ready=True).address
161
162        thread = Thread(target=read_local_address)
163        thread.start()
164        thread.join(timeout=GRPC_SERVER_INIT_TIMEOUT)
165
166        if not mut_address:
167            raise Exception("Pandora gRPC server timeout")
168        else:
169            return mut_address
170
171    def interact(
172        self,
173        pts_address: bytes,
174        profile: str,
175        test: str,
176        interaction: str,
177        description: str,
178        style: str,
179        **kwargs,
180    ) -> str:
181        """Routes MMI calls to corresponding profile proxy.
182
183        Args:
184            pts_address: Bluetooth MAC address of the PTS in bytes.
185            profile: Bluetooth profile.
186            test: PTS test id.
187            interaction: MMI name.
188            description: MMI description.
189            style: MMI popup style, unused for now.
190        """
191        print(f"{profile} mmi: {interaction}", file=sys.stderr)
192
193        # Handles A2DP and AVDTP MMIs.
194        if profile in ("A2DP", "AVDTP"):
195            if not self._a2dp:
196                self._a2dp = A2DPProxy(
197                    grpc.insecure_channel(f"localhost:{self.pandora_server_port}"),
198                    self.rootcanal,
199                )
200            return self._a2dp.interact(test, interaction, description, pts_address)
201        # Handles AVRCP and AVCTP MMIs.
202        if profile in ("AVRCP", "AVCTP"):
203            if not self._avrcp:
204                self._avrcp = AVRCPProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"))
205            return self._avrcp.interact(test, interaction, description, pts_address)
206        # Handles GATT MMIs.
207        if profile in ("GATT"):
208            if not self._gatt:
209                self._gatt = GATTProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"), self.rootcanal)
210            return self._gatt.interact(test, interaction, description, pts_address)
211        # Handles GAP MMIs.
212        if profile in ("GAP"):
213            if not self._gap:
214                self._gap = GAPProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"), self.rootcanal)
215            return self._gap.interact(test, interaction, description, pts_address)
216        # Handles GAP MMIs.
217        if profile in ("HAP"):
218            if not self._hap:
219                self._hap = HAPProxy(grpc.insecure_channel(f'localhost:{self.pandora_server_port}'), self.rootcanal)
220            return self._hap.interact(test, interaction, description, pts_address)
221        # Handles HFP MMIs.
222        if profile in ("HFP"):
223            if not self._hfp:
224                self._hfp = HFPProxy(
225                    test,
226                    grpc.insecure_channel(f"localhost:{self.pandora_server_port}"),
227                    self.rootcanal,
228                    self.modem,
229                )
230            return self._hfp.interact(test, interaction, description, pts_address)
231        # Handles HID MMIs.
232        if profile in ("HID"):
233            if not self._hid:
234                self._hid = HIDProxy(
235                    grpc.insecure_channel(f"localhost:{self.pandora_server_port}"),
236                    self.rootcanal,
237                )
238            return self._hid.interact(test, interaction, description, pts_address)
239        # Handles HOGP MMIs.
240        if profile in ("HOGP"):
241            if not self._hogp:
242                self._hogp = HOGPProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"), self.rootcanal)
243            return self._hogp.interact(test, interaction, description, pts_address)
244        # Instantiates L2CAP proxy and reroutes corresponding MMIs to it.
245        if profile in ("L2CAP"):
246            if not self._l2cap:
247                self._l2cap = L2CAPProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"), self.rootcanal)
248            return self._l2cap.interact(test, interaction, description, pts_address)
249        # Handles MAP MMIs.
250        if profile in ("MAP"):
251            if not self._map:
252                self._map = MAPProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"))
253            return self._map.interact(test, interaction, description, pts_address)
254        # Handles OPP MMIs.
255        if profile in ("OPP"):
256            if not self._opp:
257                self._opp = OPPProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"))
258            return self._opp.interact(test, interaction, description, pts_address)
259        # Instantiates PAN proxy and reroutes corresponding MMIs to it.
260        if profile in ("PAN", "BNEP"):
261            if not self._pan:
262                self._pan = PANProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"))
263            return self._pan.interact(test, interaction, description, pts_address)
264        # Instantiates PBAP proxy and reroutes corresponding MMIs to it.
265        if profile in ("PBAP"):
266            if not self._pbap:
267                self._pbap = PBAPProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"))
268            return self._pbap.interact(test, interaction, description, pts_address)
269        # Handles RFCOMM MMIs.
270        if profile in ("RFCOMM"):
271            if not self._rfcomm:
272                self._rfcomm = RFCOMMProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"))
273            return self._rfcomm.interact(test, interaction, description, pts_address)
274        # Handles SDP MMIs.
275        if profile in ("SDP"):
276            if not self._sdp:
277                self._sdp = SDPProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"))
278            return self._sdp.interact(test, interaction, description, pts_address)
279        # Handles SM MMIs.
280        if profile in ("SM"):
281            if not self._sm:
282                self._sm = SMProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"), self.rootcanal)
283            return self._sm.interact(test, interaction, description, pts_address)
284        # HandlesVCP MMIs.
285        if profile in ("VCP"):
286            if not self._vcp:
287                self._vcp = VCPProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"), self.rootcanal)
288            return self._vcp.interact(test, interaction, description, pts_address)
289
290        # Handles unsupported profiles.
291        code = format_proxy(profile, interaction, description)
292        error_msg = (f"Missing {profile} proxy and mmi: {interaction}\n"
293                     f"Create a {profile.lower()}.py in mmi2grpc/:\n\n{code}\n"
294                     f"Then, instantiate the corresponding proxy in __init__.py\n"
295                     f"Finally, create a {profile.lower()}.proto in proto/pandora/"
296                     f"and generate the corresponding interface.")
297
298        assert False, error_msg
299