• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2021 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from acts.controllers.fuchsia_lib.base_lib import BaseLib
18
19
class FuchsiaRfcommLib(BaseLib):
    """Client-side wrapper around the ``rfcomm_facade`` commands.

    Every public method builds a request id from the current test counter,
    advances the counter by one, and forwards the command name plus its
    argument dictionary through ``send_command``.
    """

    def __init__(self, addr, tc, client_id):
        """Records the values used when issuing commands.

        Args:
            addr: Kept as ``self.address``.
            tc: Initial value of ``self.test_counter``.
            client_id: Kept as ``self.client_id``.
        """
        self.address = addr
        self.test_counter = tc
        self.client_id = client_id

    def _issue_rfcomm_command(self, method, params):
        """Sends one facade command and advances the test counter.

        Args:
            method: Full facade command string.
            params: Dictionary of arguments sent with the command.

        Returns:
            Whatever ``send_command`` returns for this request.
        """
        # The id is derived from the counter *before* it is incremented, and
        # the increment happens before the command is actually sent.
        request_id = self.build_id(self.test_counter)
        self.test_counter += 1
        return self.send_command(request_id, method, params)

    def init(self):
        """Initializes the RFCOMM service.

        Returns:
            Dictionary, None if success, error if error.
        """
        return self._issue_rfcomm_command("rfcomm_facade.RfcommInit", {})

    def removeService(self):
        """Removes the RFCOMM service from the Fuchsia device.

        Returns:
            Dictionary, None if success, error if error.
        """
        return self._issue_rfcomm_command("rfcomm_facade.RfcommRemoveService",
                                          {})

    def disconnectSession(self, peer_id):
        """Closes the RFCOMM Session with the remote peer.

        Args:
            peer_id: Sent to the facade under the "peer_id" key.

        Returns:
            Dictionary, None if success, error if error.
        """
        params = {"peer_id": peer_id}
        return self._issue_rfcomm_command("rfcomm_facade.DisconnectSession",
                                          params)

    def connectRfcommChannel(self, peer_id, server_channel_number):
        """Makes an outgoing RFCOMM connection to the remote peer.

        Args:
            peer_id: Sent to the facade under the "peer_id" key.
            server_channel_number: Sent under the "server_channel_number" key.

        Returns:
            Dictionary, None if success, error if error.
        """
        params = {
            "peer_id": peer_id,
            "server_channel_number": server_channel_number,
        }
        return self._issue_rfcomm_command("rfcomm_facade.ConnectRfcommChannel",
                                          params)

    def disconnectRfcommChannel(self, peer_id, server_channel_number):
        """Closes the RFCOMM channel with the remote peer.

        Args:
            peer_id: Sent to the facade under the "peer_id" key.
            server_channel_number: Sent under the "server_channel_number" key.

        Returns:
            Dictionary, None if success, error if error.
        """
        params = {
            "peer_id": peer_id,
            "server_channel_number": server_channel_number,
        }
        return self._issue_rfcomm_command(
            "rfcomm_facade.DisconnectRfcommChannel", params)

    def sendRemoteLineStatus(self, peer_id, server_channel_number):
        """Sends a Remote Line Status update to the remote peer for the
        provided channel number.

        Args:
            peer_id: Sent to the facade under the "peer_id" key.
            server_channel_number: Sent under the "server_channel_number" key.

        Returns:
            Dictionary, None if success, error if error.
        """
        params = {
            "peer_id": peer_id,
            "server_channel_number": server_channel_number,
        }
        return self._issue_rfcomm_command("rfcomm_facade.SendRemoteLineStatus",
                                          params)

    def writeRfcomm(self, peer_id, server_channel_number, data):
        """Sends data to the remote peer over the RFCOMM channel.

        Args:
            peer_id: Sent to the facade under the "peer_id" key.
            server_channel_number: Sent under the "server_channel_number" key.
            data: Payload sent under the "data" key.

        Returns:
            Dictionary, None if success, error if error.
        """
        params = {
            "peer_id": peer_id,
            "server_channel_number": server_channel_number,
            "data": data,
        }
        return self._issue_rfcomm_command("rfcomm_facade.RfcommWrite", params)
130