• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (C) 2024 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""SMP proxy module."""
15import asyncio
16import sys
17from queue import Empty, Queue
18from threading import Thread
19
20from mmi2grpc._helpers import assert_description, match_description
21from mmi2grpc._proxy import ProfileProxy
22from mmi2grpc._rootcanal import Dongle
23from pandora.host_grpc import Host
24from pandora.host_pb2 import PUBLIC, RANDOM
25from pandora.security_grpc import Security
26from pandora.security_pb2 import LE_LEVEL3, PairingEventAnswer
27
28
29def debug(*args, **kwargs):
30    print(*args, file=sys.stderr, **kwargs)
31
32
33class SMProxy(ProfileProxy):
34
35    def __init__(self, channel, rootcanal):
36        super().__init__(channel)
37        self.security = Security(channel)
38        self.host = Host(channel)
39        self.rootcanal = rootcanal
40        self.connection = None
41        self.pairing_stream = None
42        self.passkey_queue = Queue()
43        self._handle_pairing_requests()
44
45    def test_started(self, test: str, **kwargs):
46        self.rootcanal.select_pts_dongle(Dongle.CSR_RCK_PTS_DONGLE)
47
48        return "OK"
49
50    @assert_description
51    def MMI_IUT_ENABLE_CONNECTION_SM(self, pts_addr: bytes, **kwargs):
52        """
53        Initiate an connection from the IUT to the PTS.
54        """
55        self.connection = self.host.ConnectLE(own_address_type=RANDOM, public=pts_addr).connection
56        return "OK"
57
58    @assert_description
59    def MMI_ASK_IUT_PERFORM_PAIRING_PROCESS(self, **kwargs):
60        """
61        Please start pairing process.
62        """
63
64        def secure():
65            if self.connection:
66                self.security.Secure(connection=self.connection, le=LE_LEVEL3)
67
68        Thread(target=secure).start()
69        return "OK"
70
71    @assert_description
72    def MMI_IUT_SEND_DISCONNECTION_REQUEST(self, **kwargs):
73        """
74        Please initiate a disconnection to the PTS.
75
76        Description: Verify that
77        the Implementation Under Test(IUT) can initiate a disconnect request to
78        PTS.
79        """
80        self.host.Disconnect(connection=self.connection)
81        self.connection = None
82        return "OK"
83
84    def MMI_LESC_NUMERIC_COMPARISON(self, **kwargs):
85        """
86        Please confirm the following number matches IUT: 385874.
87        """
88        return "OK"
89
90    @assert_description
91    def MMI_ASK_IUT_PERFORM_RESET(self, **kwargs):
92        """
93        Please reset your device.
94        """
95        self.host.Reset()
96        return "OK"
97
98    @assert_description
99    def MMI_TESTER_ENABLE_CONNECTION_SM(self, **kwargs):
100        """
101        Action: Place the IUT in connectable mode
102        """
103        self.advertise = self.host.Advertise(
104            legacy=True,
105            connectable=True,
106            own_address_type=PUBLIC,
107        )
108
109        return "OK"
110
111    @assert_description
112    def MMI_IUT_SMP_TIMEOUT_30_SECONDS(self, **kwargs):
113        """
114        Wait for the 30 seconds. Lower tester will not send corresponding or
115        next SMP message.
116        """
117        return "OK"
118
119    @assert_description
120    def MMI_IUT_SMP_TIMEOUT_ADDITIONAL_10_SECONDS(self, **kwargs):
121        """
122        Wait for an additional 10 seconds. Lower test will send corresponding or
123        next SMP message.
124        """
125        return "OK"
126
127    @match_description
128    def MMI_DISPLAY_PASSKEY_CODE(self, passkey: str, **kwargs):
129        """
130        Please enter (?P<passkey>[0-9]*) in the IUT.
131        """
132        self.passkey_queue.put(passkey)
133        return "OK"
134
135    @assert_description
136    def MMI_ENTER_PASSKEY_CODE(self, **kwargs):
137        """
138        Please enter 6 digit passkey code.
139        """
140
141        return "OK"
142
143    @assert_description
144    def MMI_ENTER_WRONG_DYNAMIC_PASSKEY_CODE(self, **kwargs):
145        """
146        Please enter invalid 6 digit pin code.
147        """
148
149        return "OK"
150
151    @match_description
152    def MMI_IUT_ABORT_PAIRING_PROCESS_DISCONNECT(self, **kwargs):
153        """
154        Lower tester expects IUT aborts pairing process(, and disconnect|. Click OK to confirm pairing is aborted).
155        """
156
157        return "OK"
158
159    @assert_description
160    def MMI_IUT_ACCEPT_CONNECTION_BR_EDR(self, **kwargs):
161        """
162        Please prepare IUT into a connectable mode in BR/EDR.
163
164        Description:
165        Verify that the Implementation Under Test (IUT) can accept a connect
166        request from PTS.
167        """
168
169        return "OK"
170
171    @assert_description
172    def _mmi_2001(self, **kwargs):
173        """
174        Please verify the passKey is correct: 000000
175        """
176        return "OK"
177
178    @assert_description
179    def MMI_IUT_INITIATE_CONNECTION_BR_EDR_PAIRING(self, test: str, pts_addr: bytes, **kwargs):
180        """
181        Please initiate a connection over BR/EDR to the PTS, and initiate
182        pairing process.
183
184        Description: Verify that the Implementation Under Test
185        (IUT) can initiate a connect request over BR/EDR to PTS, and initiate
186        pairing process.
187        """
188        self.connection = self.host.Connect(address=pts_addr).connection
189
190        return "OK"
191
192    @assert_description
193    def MMI_ASK_IUT_PERFORM_FEATURE_EXCHANGE_OVER_BR(self, **kwargs):
194        """
195        Please start pairing feature exchange over BR/EDR.
196        """
197
198        return "OK"
199
200    @assert_description
201    def MMI_IUT_INITIATES_ENCRYPTION(self, **kwargs):
202        """
203        Initiates encryption with the PTS.
204        """
205
206        return "OK"
207
208    @assert_description
209    def _mmi_20117(self, **kwargs):
210        """
211        Please start encryption using previously distributed key.
212
213        Description:
214        Verify that the Implementation Under Test (IUT) can successfully start
215        and complete encryption with previously distributed key.
216        """
217
218        return "OK"
219
220    def _handle_pairing_requests(self):
221
222        def task():
223            pairing_events = self.security.OnPairing()
224            for event in pairing_events:
225                if event.just_works or event.numeric_comparison:
226                    pairing_events.send(PairingEventAnswer(event=event, confirm=True))
227                if event.passkey_entry_request:
228                    try:
229                        passkey = self.passkey_queue.get(timeout=15)
230                        pairing_events.send(PairingEventAnswer(event=event, passkey=int(passkey)))
231                    except Empty:
232                        debug("No passkey provided within 15 seconds")
233
234        Thread(target=task).start()
235