• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from acts.controllers.fuchsia_lib.base_lib import BaseLib
18
19
class FuchsiaGattcLib(BaseLib):
    """Wrapper that issues "gatt_client_facade" commands.

    Every public method builds the argument dictionary for a single facade
    command and sends it through _send_gattc_command, which allocates a new
    test id for each request.

    build_id and send_command are not defined in this class; they are
    presumably inherited from BaseLib.
    """

    def __init__(self, addr, tc, client_id):
        """Stores the values used when issuing commands.

        Args:
            addr: Stored as self.address.
            tc: Initial value of self.test_counter, the counter used to
                build a test id for each command.
            client_id: Stored as self.client_id.
        """
        self.address = addr
        self.test_counter = tc
        self.client_id = client_id

    def _send_gattc_command(self, test_cmd, test_args):
        """Sends one command with a fresh test id.

        Shared tail of every public method: the test id is built from the
        current counter value, the counter is then incremented, and the
        command is sent.

        Args:
            test_cmd: string, Fully qualified facade command name.
            test_args: dictionary, Arguments for the command.

        Returns:
            Whatever send_command returns for this request.
        """
        test_id = self.build_id(self.test_counter)
        # Incremented before sending, so the counter advances even when
        # send_command raises.
        self.test_counter += 1

        return self.send_command(test_id, test_cmd, test_args)

    def bleStartBleScan(self, scan_filter):
        """Starts a BLE scan

        Args:
            scan_filter: dictionary, Device filter for a scan.

        Returns:
            None if pass, err if fail.
        """
        test_cmd = "gatt_client_facade.BleStartScan"
        test_args = {
            "filter": scan_filter,
        }

        return self._send_gattc_command(test_cmd, test_args)

    def bleStopBleScan(self):
        """Stops a BLE scan

        Returns:
            Dictionary, List of devices discovered, error string if error.
        """
        test_cmd = "gatt_client_facade.BleStopScan"
        test_args = {}

        return self._send_gattc_command(test_cmd, test_args)

    def listServices(self, id):
        """Lists services of a peripheral specified by id.

        Args:
            id: string, Peripheral identifier to list services.

        Returns:
            Dictionary, List of Service Info if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcListServices"
        test_args = {"identifier": id}

        return self._send_gattc_command(test_cmd, test_args)

    def bleGetDiscoveredDevices(self):
        """Gets the devices discovered by a BLE scan.

        Returns:
            Dictionary, List of devices discovered, error string if error.
        """
        test_cmd = "gatt_client_facade.BleGetDiscoveredDevices"
        test_args = {}

        return self._send_gattc_command(test_cmd, test_args)

    def discoverCharacteristics(self):
        """Discover the characteristics of a connected service.

        Returns:
            Dictionary, List of Characteristics and Descriptors if success,
            error string if error.
        """
        test_cmd = "gatt_client_facade.GattcDiscoverCharacteristics"
        test_args = {}

        return self._send_gattc_command(test_cmd, test_args)

    def writeCharById(self, id, offset, write_value):
        """Write Characteristic by id.

        Args:
            id: string, Characteristic identifier.
            offset: int, The offset of bytes to write to.
            write_value: byte array, The bytes to write.

        Returns:
            None if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcWriteCharacteristicById"
        test_args = {
            "identifier": id,
            "offset": offset,
            "write_value": write_value,
        }

        return self._send_gattc_command(test_cmd, test_args)

    def writeCharByIdWithoutResponse(self, id, write_value):
        """Write Characteristic by id without response.

        Args:
            id: string, Characteristic identifier.
            write_value: byte array, The bytes to write.

        Returns:
            None if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcWriteCharacteristicByIdWithoutResponse"
        test_args = {
            "identifier": id,
            "write_value": write_value,
        }

        return self._send_gattc_command(test_cmd, test_args)

    def enableNotifyCharacteristic(self, id):
        """Enable notifications on a Characteristic.

        Args:
            id: string, Characteristic identifier.

        Returns:
            None if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcEnableNotifyCharacteristic"
        test_args = {
            "identifier": id,
        }

        return self._send_gattc_command(test_cmd, test_args)

    def disableNotifyCharacteristic(self, id):
        """Disable notifications on a Characteristic.

        Args:
            id: string, Characteristic identifier.

        Returns:
            None if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcDisableNotifyCharacteristic"
        # NOTE(review): unlike enableNotifyCharacteristic, this request also
        # carries a "value" key; kept as-is to preserve the payload sent.
        test_args = {
            "identifier": id,
            "value": False,
        }

        return self._send_gattc_command(test_cmd, test_args)

    def readCharacteristicById(self, id):
        """Read Characteristic value by id.

        Args:
            id: string, Characteristic identifier.

        Returns:
            Characteristic value if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcReadCharacteristicById"
        test_args = {
            "identifier": id,
        }

        return self._send_gattc_command(test_cmd, test_args)

    def readDescriptorById(self, id):
        """Read Descriptor value by id.

        Args:
            id: string, Descriptor identifier.

        Returns:
            Descriptor value if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcReadDescriptorById"
        test_args = {
            "identifier": id,
        }

        return self._send_gattc_command(test_cmd, test_args)

    def readLongDescriptorById(self, id, offset, max_bytes):
        """Reads Long Descriptor value by id.

        Args:
            id: string, Descriptor identifier.
            offset: int, The offset to start reading from.
            max_bytes: int, The max bytes to return.

        Returns:
            Descriptor value if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcReadLongDescriptorById"
        test_args = {
            "identifier": id,
            "offset": offset,
            "max_bytes": max_bytes
        }

        return self._send_gattc_command(test_cmd, test_args)

    def writeDescriptorById(self, id, offset, write_value):
        """Write Descriptor by id.

        Args:
            id: string, Descriptor identifier.
            offset: int, Accepted but not included in the arguments sent
                with the command.
                NOTE(review): confirm whether the facade expects an
                "offset" argument for this command before forwarding it.
            write_value: byte array, The bytes to write.

        Returns:
            None if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcWriteDescriptorById"
        test_args = {
            "identifier": id,
            "write_value": write_value,
        }

        return self._send_gattc_command(test_cmd, test_args)

    def readLongCharacteristicById(self, id, offset, max_bytes):
        """Reads Long Characteristic value by id.

        Args:
            id: string, Characteristic identifier.
            offset: int, The offset to start reading from.
            max_bytes: int, The max bytes to return.

        Returns:
            Characteristic value if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcReadLongCharacteristicById"
        test_args = {
            "identifier": id,
            "offset": offset,
            "max_bytes": max_bytes
        }

        return self._send_gattc_command(test_cmd, test_args)

    def connectToService(self, id, service_id):
        """Connect to a specific Service of a peripheral.

        Args:
            id: string, Sent as the "identifier" argument.
            service_id: string, Sent as the "service_identifier" argument.

        Returns:
            None if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcConnectToService"
        test_args = {"identifier": id, "service_identifier": service_id}

        return self._send_gattc_command(test_cmd, test_args)

    def bleConnectToPeripheral(self, id):
        """Connects to a peripheral specified by id.

        Args:
            id: string, Peripheral identifier to connect to.

        Returns:
            Dictionary, List of Service Info if success, error string if error.
        """
        test_cmd = "gatt_client_facade.BleConnectPeripheral"
        test_args = {"identifier": id}

        return self._send_gattc_command(test_cmd, test_args)

    def bleDisconnectPeripheral(self, id):
        """Disconnects from a peripheral specified by id.

        Args:
            id: string, Peripheral identifier to disconnect from.

        Returns:
            Dictionary, None if success, error string if error.
        """
        test_cmd = "gatt_client_facade.BleDisconnectPeripheral"
        test_args = {"identifier": id}

        return self._send_gattc_command(test_cmd, test_args)