• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (C) 2024 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Rfcomm proxy module."""
15
16import os
17import socket
18import sys
19import threading
20
21from mmi2grpc._helpers import assert_description
22from mmi2grpc._proxy import ProfileProxy
23from pandora.host_grpc import Host
24from pandora_experimental.rfcomm_grpc import RFCOMM
25
26
27class RFCOMMProxy(ProfileProxy):
28
29    # The UUID for Serial-Port Profile
30    SPP_UUID = "00001101-0000-1000-8000-00805f9b34fb"
31    # TSPX_SERVICE_NAME_TESTER
32    SERVICE_NAME = "COM5"
33
34    def __init__(self, channel: str):
35        super().__init__(channel)
36        self.rfcomm = RFCOMM(channel)
37        self.host = Host(channel)
38        self.server = None
39        self.connection = None
40
41    @assert_description
42    def TSC_RFCOMM_mmi_iut_initiate_slc(self, pts_addr: bytes, test: str, **kwargs):
43        """
44        Take action to initiate an RFCOMM service level connection (l2cap).
45        """
46
47        try:
48            self.connection = self.rfcomm.ConnectToServer(address=pts_addr, uuid=self.SPP_UUID).connection
49        except Exception as e:
50            if test == "RFCOMM/DEVA/RFC/BV-01-C":
51                print(f'{test}: PTS disconnected as expected', file=sys.stderr)
52                return "OK"
53            else:
54                print(f'{test}: PTS disconnected unexpectedly', file=sys.stderr)
55                raise e
56        return "OK"
57
58    @assert_description
59    def TSC_RFCOMM_mmi_iut_accept_slc(self, pts_addr: bytes, **kwargs):
60        """
61        Take action to accept the RFCOMM service level connection from the
62        tester.
63        """
64
65        self.server = self.rfcomm.StartServer(uuid=self.SPP_UUID, name=self.SERVICE_NAME).server
66
67        _ = self.host.WaitConnection(address=pts_addr).connection
68
69        return "OK"
70
71    @assert_description
72    def TSC_RFCOMM_mmi_iut_accept_sabm(self, test: str, **kwargs):
73        """
74        Take action to accept the SABM operation initiated by the tester.
75
76        Note:
77        Make sure that the RFCOMM server channel is set correctly in
78        TSPX_server_channel_iut
79        """
80
81        def accept_connection():
82            self.connection = self.rfcomm.AcceptConnection(server=self.server).connection
83
84        if test in [
85            "RFCOMM/DEVA-DEVB/RFC/BV-03-C",
86            "RFCOMM/DEVA-DEVB/RFC/BV-11-C",
87            "RFCOMM/DEVA-DEVB/RFC/BV-15-C",
88            "RFCOMM/DEVA-DEVB/RFC/BV-17-C",
89            "RFCOMM/DEVA-DEVB/RFC/BV-19-C",
90            "RFCOMM/DEVB/RFC/BV-02-C",
91        ]:
92            # For the tests listed above, the PTS does not complete the service
93            # level connection but only executes part of the setup.
94            threading.Thread(target=accept_connection).start()
95        else:
96            accept_connection()
97
98        return "OK"
99
100    @assert_description
101    def TSC_RFCOMM_mmi_iut_respond_PN(self, **kwargs):
102        """
103        Take action to respond PN.
104        """
105
106        return "OK"
107
108    @assert_description
109    def TSC_RFCOMM_mmi_iut_initiate_sabm_control_channel(self, **kwargs):
110        """
111        Take action to initiate an SABM operation for the RFCOMM control
112        channel.
113        """
114
115        return "OK"
116
117    @assert_description
118    def TSC_RFCOMM_mmi_iut_initiate_PN(self, **kwargs):
119        """
120        Take action to initiate PN.
121        """
122
123        return "OK"
124
125    def TSC_RFCOMM_mmi_iut_initiate_sabm_data_channel(self, **kwargs):
126        """
127        Take action to initiate an SABM operation for an RFCOMM data channel.
128        Note: RFCOMM server channel can be found on PTS's SDP record
129        """
130
131        return "OK"
132
133    @assert_description
134    def TSC_RFCOMM_mmi_iut_accept_disc(self, **kwargs):
135        """
136        Take action to accept the DISC operation initiated by the tester.
137        """
138
139        return "OK"
140
141    @assert_description
142    def TSC_RFCOMM_mmi_iut_accept_data_link_connection(self, **kwargs):
143        """
144        Take action to accept a new DLC initiated by the tester.
145        """
146
147        return "OK"
148
149    @assert_description
150    def TSC_RFCOMM_mmi_iut_initiate_close_session(self, **kwargs):
151        """
152        Take action to close the RFCOMM session.
153        """
154
155        self.rfcomm.Disconnect(connection=self.connection)
156
157        return "OK"
158
159    @assert_description
160    def TSC_RFCOMM_mmi_iut_respond_RLS(self, **kwargs):
161        """
162        Take action to respond RLS command.
163        """
164
165        return "OK"
166
167    @assert_description
168    def TSC_RFCOMM_mmi_iut_initiate_MSC(self, **kwargs):
169        """
170        Take action to initiate MSC command.
171        """
172
173        return "OK"
174
175    @assert_description
176    def TSC_RFCOMM_mmi_iut_respond_RPN(self, **kwargs):
177        """
178        Take action to respond RPN.
179        """
180
181        return "OK"
182
183    @assert_description
184    def TSC_RFCOMM_mmi_iut_respond_NSC(self, **kwargs):
185        """
186        Take action to respond NSC.
187        """
188
189        return "OK"
190
191    @assert_description
192    def TSC_RFCOMM_mmi_iut_initiate_close_dlc(self, **kwargs):
193        """
194        Take action to close the DLC.
195        """
196
197        self.rfcomm.Disconnect(connection=self.connection)
198
199        return "OK"
200
201    @assert_description
202    def TSC_RFCOMM_mmi_iut_respond_Test(self, **kwargs):
203        """
204        Take action to respond Test.
205        """
206
207        return "OK"
208
209    @assert_description
210    def TSC_RFCOMM_mmi_iut_respond_MSC(self, **kwargs):
211        """
212        Take action to respond MSC.
213        """
214
215        return "OK"
216
217    @assert_description
218    def TSC_RFCOMM_mmi_iut_send_data(self, **kwargs):
219        """
220        Take action to send data on the open DLC on PTS with at least 2 frames.
221        """
222
223        self.rfcomm.Send(connection=self.connection, data=b'Some data to send')
224        self.rfcomm.Send(connection=self.connection, data=b'More data to send')
225        return "OK"
226
227    @assert_description
228    def TSC_RFCOMM_mmi_user_wait_no_uih_data(self, **kwargs):
229        """
230        Please wait while the tester confirms no data is sent ...
231        """
232
233        return "OK"
234
235    @assert_description
236    def TSC_RFCOMM_mmi_iut_initiate_RLS_framing_error(self, **kwargs):
237        """
238        Take action to initiate RLS command with Framing Error status.
239        """
240
241        return "OK"
242