# Copyright (C) 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Rfcomm proxy module."""

import os
import socket
import sys
import threading

from mmi2grpc._helpers import assert_description
from mmi2grpc._proxy import ProfileProxy
from pandora.host_grpc import Host
from pandora_experimental.rfcomm_grpc import RFCOMM


class RFCOMMProxy(ProfileProxy):
    """Profile proxy answering the PTS RFCOMM MMI prompts.

    Each ``TSC_RFCOMM_mmi_*`` method handles one prompt and returns "OK".
    Most handlers take no action; the others drive the ``RFCOMM`` and
    ``Host`` service stubs created in ``__init__``.

    NOTE(review): the docstrings of the handlers decorated with
    ``@assert_description`` are presumably compared against the prompt text
    sent by PTS, so they must be kept verbatim; explanatory notes on those
    handlers are therefore written as ``#`` comments, not docstring text.
    """

    # The UUID for Serial-Port Profile
    SPP_UUID = "00001101-0000-1000-8000-00805f9b34fb"
    # TSPX_SERVICE_NAME_TESTER
    SERVICE_NAME = "COM5"

    # Tests for which the PTS does not complete the service level connection
    # but only executes part of the setup; accepting the connection must not
    # block the MMI answer for these.
    _PARTIAL_SETUP_TESTS = frozenset({
        "RFCOMM/DEVA-DEVB/RFC/BV-03-C",
        "RFCOMM/DEVA-DEVB/RFC/BV-11-C",
        "RFCOMM/DEVA-DEVB/RFC/BV-15-C",
        "RFCOMM/DEVA-DEVB/RFC/BV-17-C",
        "RFCOMM/DEVA-DEVB/RFC/BV-19-C",
        "RFCOMM/DEVB/RFC/BV-02-C",
    })

    def __init__(self, channel: str):
        """Create the service stubs on `channel` and reset the session state."""
        super().__init__(channel)
        self.rfcomm = RFCOMM(channel)
        self.host = Host(channel)
        # Set by TSC_RFCOMM_mmi_iut_accept_slc.
        self.server = None
        # Set by TSC_RFCOMM_mmi_iut_initiate_slc / TSC_RFCOMM_mmi_iut_accept_sabm.
        self.connection = None

    @assert_description
    def TSC_RFCOMM_mmi_iut_initiate_slc(self, pts_addr: bytes, test: str, **kwargs):
        """
        Take action to initiate an RFCOMM service level connection (l2cap).
        """

        # Connect to the SPP server on the PTS and remember the connection.
        # A failure is tolerated only for RFCOMM/DEVA/RFC/BV-01-C, where the
        # PTS is expected to disconnect; any other test re-raises the error.
        try:
            self.connection = self.rfcomm.ConnectToServer(address=pts_addr, uuid=self.SPP_UUID).connection
        except Exception:
            if test == "RFCOMM/DEVA/RFC/BV-01-C":
                print(f'{test}: PTS disconnected as expected', file=sys.stderr)
                return "OK"
            print(f'{test}: PTS disconnected unexpectedly', file=sys.stderr)
            # Bare raise keeps the original exception and traceback intact.
            raise
        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_accept_slc(self, pts_addr: bytes, **kwargs):
        """
        Take action to accept the RFCOMM service level connection from the
        tester.
        """

        # Start the SPP server, then wait for the PTS to connect to the host.
        self.server = self.rfcomm.StartServer(uuid=self.SPP_UUID, name=self.SERVICE_NAME).server

        # Only the wait matters here; the returned connection is not used.
        self.host.WaitConnection(address=pts_addr)

        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_accept_sabm(self, test: str, **kwargs):
        """
        Take action to accept the SABM operation initiated by the tester.

        Note:
        Make sure that the RFCOMM server channel is set correctly in
        TSPX_server_channel_iut
        """

        def accept_connection():
            # Accept on the server started by TSC_RFCOMM_mmi_iut_accept_slc
            # and remember the resulting connection.
            self.connection = self.rfcomm.AcceptConnection(server=self.server).connection

        if test in self._PARTIAL_SETUP_TESTS:
            # For these tests, the PTS does not complete the service level
            # connection but only executes part of the setup, so accept in a
            # background thread and answer the MMI right away.
            threading.Thread(target=accept_connection).start()
        else:
            accept_connection()

        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_respond_PN(self, **kwargs):
        """
        Take action to respond PN.
        """

        # No action taken by the proxy.
        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_initiate_sabm_control_channel(self, **kwargs):
        """
        Take action to initiate an SABM operation for the RFCOMM control
        channel.
        """

        # No action taken by the proxy.
        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_initiate_PN(self, **kwargs):
        """
        Take action to initiate PN.
        """

        # No action taken by the proxy.
        return "OK"

    # NOTE(review): unlike its siblings, this handler is not wrapped in
    # @assert_description -- confirm whether that is intentional before
    # adding the decorator.
    def TSC_RFCOMM_mmi_iut_initiate_sabm_data_channel(self, **kwargs):
        """
        Take action to initiate an SABM operation for an RFCOMM data channel.
        Note: RFCOMM server channel can be found on PTS's SDP record
        """

        # No action taken by the proxy.
        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_accept_disc(self, **kwargs):
        """
        Take action to accept the DISC operation initiated by the tester.
        """

        # No action taken by the proxy.
        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_accept_data_link_connection(self, **kwargs):
        """
        Take action to accept a new DLC initiated by the tester.
        """

        # No action taken by the proxy.
        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_initiate_close_session(self, **kwargs):
        """
        Take action to close the RFCOMM session.
        """

        # Disconnect the stored connection.
        self.rfcomm.Disconnect(connection=self.connection)

        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_respond_RLS(self, **kwargs):
        """
        Take action to respond RLS command.
        """

        # No action taken by the proxy.
        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_initiate_MSC(self, **kwargs):
        """
        Take action to initiate MSC command.
        """

        # No action taken by the proxy.
        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_respond_RPN(self, **kwargs):
        """
        Take action to respond RPN.
        """

        # No action taken by the proxy.
        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_respond_NSC(self, **kwargs):
        """
        Take action to respond NSC.
        """

        # No action taken by the proxy.
        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_initiate_close_dlc(self, **kwargs):
        """
        Take action to close the DLC.
        """

        # Disconnect the stored connection.
        self.rfcomm.Disconnect(connection=self.connection)

        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_respond_Test(self, **kwargs):
        """
        Take action to respond Test.
        """

        # No action taken by the proxy.
        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_respond_MSC(self, **kwargs):
        """
        Take action to respond MSC.
        """

        # No action taken by the proxy.
        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_send_data(self, **kwargs):
        """
        Take action to send data on the open DLC on PTS with at least 2 frames.
        """

        # Two separate Send calls on the stored connection.
        self.rfcomm.Send(connection=self.connection, data=b'Some data to send')
        self.rfcomm.Send(connection=self.connection, data=b'More data to send')
        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_user_wait_no_uih_data(self, **kwargs):
        """
        Please wait while the tester confirms no data is sent ...
        """

        # No action taken by the proxy.
        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_initiate_RLS_framing_error(self, **kwargs):
        """
        Take action to initiate RLS command with Framing Error status.
        """

        # No action taken by the proxy.
        return "OK"