#!/usr/bin/env python3
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from acts.controllers.fuchsia_lib.base_lib import BaseLib


class FuchsiaRfcommLib(BaseLib):
    """Thin wrappers around the ``rfcomm_facade.*`` commands.

    Each public method builds the argument dictionary for one facade command
    and forwards it through ``send_command``, consuming one value of
    ``test_counter`` per request.
    """

    def __init__(self, addr, tc, client_id):
        """Stores the connection parameters used by every request.

        Args:
            addr: Stored as ``self.address``.
            tc: Initial value of ``self.test_counter``; incremented once per
                command sent through this object.
            client_id: Stored as ``self.client_id``.
        """
        self.address = addr
        self.test_counter = tc
        self.client_id = client_id

    def _issue_rfcomm_command(self, command, arguments):
        """Sends one facade command and advances the request counter.

        Args:
            command: Fully qualified facade method name.
            arguments: Dictionary of arguments sent along with the command.

        Returns:
            Whatever ``send_command`` returns for this request.
        """
        # The id is derived from the current counter value *before* the
        # counter is bumped, and the bump happens before the request is sent.
        request_id = self.build_id(self.test_counter)
        self.test_counter += 1
        return self.send_command(request_id, command, arguments)

    def init(self):
        """Initializes the RFCOMM service.

        Returns:
            Dictionary, None if success, error if error.
        """
        return self._issue_rfcomm_command("rfcomm_facade.RfcommInit", {})

    def removeService(self):
        """Removes the RFCOMM service from the Fuchsia device.

        Returns:
            Dictionary, None if success, error if error.
        """
        return self._issue_rfcomm_command(
            "rfcomm_facade.RfcommRemoveService", {})

    def disconnectSession(self, peer_id):
        """Closes the RFCOMM Session with the remote peer.

        Args:
            peer_id: Identifier of the remote peer, sent as ``peer_id``.

        Returns:
            Dictionary, None if success, error if error.
        """
        return self._issue_rfcomm_command(
            "rfcomm_facade.DisconnectSession", {"peer_id": peer_id})

    def connectRfcommChannel(self, peer_id, server_channel_number):
        """Makes an outgoing RFCOMM connection to the remote peer.

        Args:
            peer_id: Identifier of the remote peer, sent as ``peer_id``.
            server_channel_number: Channel to connect to, sent as
                ``server_channel_number``.

        Returns:
            Dictionary, None if success, error if error.
        """
        channel_args = {
            "peer_id": peer_id,
            "server_channel_number": server_channel_number,
        }
        return self._issue_rfcomm_command(
            "rfcomm_facade.ConnectRfcommChannel", channel_args)

    def disconnectRfcommChannel(self, peer_id, server_channel_number):
        """Closes the RFCOMM channel with the remote peer.

        Args:
            peer_id: Identifier of the remote peer, sent as ``peer_id``.
            server_channel_number: Channel to close, sent as
                ``server_channel_number``.

        Returns:
            Dictionary, None if success, error if error.
        """
        channel_args = {
            "peer_id": peer_id,
            "server_channel_number": server_channel_number,
        }
        return self._issue_rfcomm_command(
            "rfcomm_facade.DisconnectRfcommChannel", channel_args)

    def sendRemoteLineStatus(self, peer_id, server_channel_number):
        """Sends a Remote Line Status update to the remote peer for the
        provided channel number.

        Args:
            peer_id: Identifier of the remote peer, sent as ``peer_id``.
            server_channel_number: Channel the update refers to, sent as
                ``server_channel_number``.

        Returns:
            Dictionary, None if success, error if error.
        """
        channel_args = {
            "peer_id": peer_id,
            "server_channel_number": server_channel_number,
        }
        return self._issue_rfcomm_command(
            "rfcomm_facade.SendRemoteLineStatus", channel_args)

    def writeRfcomm(self, peer_id, server_channel_number, data):
        """Sends data to the remote peer over the RFCOMM channel.

        Args:
            peer_id: Identifier of the remote peer, sent as ``peer_id``.
            server_channel_number: Channel to write to, sent as
                ``server_channel_number``.
            data: Payload to send, forwarded unchanged as ``data``.

        Returns:
            Dictionary, None if success, error if error.
        """
        write_args = {
            "peer_id": peer_id,
            "server_channel_number": server_channel_number,
            "data": data,
        }
        return self._issue_rfcomm_command(
            "rfcomm_facade.RfcommWrite", write_args)