• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from acts.controllers.fuchsia_lib.base_lib import BaseLib
18
19
20class FuchsiaGattcLib(BaseLib):
21    def __init__(self, addr, tc, client_id):
22        self.address = addr
23        self.test_counter = tc
24        self.client_id = client_id
25
26    def bleStartBleScan(self, scan_filter):
27        """Starts a BLE scan
28
29        Args:
30            scan_time_ms: int, Amount of time to scan for.
31            scan_filter: dictionary, Device filter for a scan.
32            scan_count: int, Number of devices to scan for before termination.
33
34        Returns:
35            None if pass, err if fail.
36        """
37        test_cmd = "gatt_client_facade.BleStartScan"
38        test_args = {
39            "filter": scan_filter,
40        }
41        test_id = self.build_id(self.test_counter)
42        self.test_counter += 1
43
44        return self.send_command(test_id, test_cmd, test_args)
45
46    def bleStopBleScan(self):
47        """Stops a BLE scan
48
49        Returns:
50            Dictionary, List of devices discovered, error string if error.
51        """
52        test_cmd = "gatt_client_facade.BleStopScan"
53        test_args = {}
54        test_id = self.build_id(self.test_counter)
55        self.test_counter += 1
56
57        return self.send_command(test_id, test_cmd, test_args)
58
59    def listServices(self, id):
60        """Lists services of a peripheral specified by id.
61
62        Args:
63            id: string, Peripheral identifier to list services.
64
65        Returns:
66            Dictionary, List of Service Info if success, error string if error.
67        """
68        test_cmd = "gatt_client_facade.GattcListServices"
69        test_args = {"identifier": id}
70        test_id = self.build_id(self.test_counter)
71        self.test_counter += 1
72
73        return self.send_command(test_id, test_cmd, test_args)
74
75    def bleGetDiscoveredDevices(self):
76        """Stops a BLE scan
77
78        Returns:
79            Dictionary, List of devices discovered, error string if error.
80        """
81        test_cmd = "gatt_client_facade.BleGetDiscoveredDevices"
82        test_args = {}
83        test_id = self.build_id(self.test_counter)
84        self.test_counter += 1
85
86        return self.send_command(test_id, test_cmd, test_args)
87
88    def discoverCharacteristics(self):
89        """Discover the characteristics of a connected service.
90
91        Returns:
92            Dictionary, List of Characteristics and Descriptors if success,
93            error string if error.
94        """
95        test_cmd = "gatt_client_facade.GattcDiscoverCharacteristics"
96        test_args = {}
97        test_id = self.build_id(self.test_counter)
98        self.test_counter += 1
99
100        return self.send_command(test_id, test_cmd, test_args)
101
102    def writeCharById(self, id, offset, write_value):
103        """Write Characteristic by id..
104
105        Args:
106            id: string, Characteristic identifier.
107            offset: int, The offset of bytes to write to.
108            write_value: byte array, The bytes to write.
109
110        Returns:
111            None if success, error string if error.
112        """
113        test_cmd = "gatt_client_facade.GattcWriteCharacteristicById"
114        test_args = {
115            "identifier": id,
116            "offset": offset,
117            "write_value": write_value,
118        }
119        test_id = self.build_id(self.test_counter)
120        self.test_counter += 1
121
122        return self.send_command(test_id, test_cmd, test_args)
123
124    def writeLongCharById(self, id, offset, write_value, reliable_mode=False):
125        """Write Characteristic by id.
126
127        Args:
128            id: string, Characteristic identifier.
129            offset: int, The offset of bytes to write to.
130            write_value: byte array, The bytes to write.
131            reliable_mode: bool value representing reliable writes.
132
133        Returns:
134            None if success, error string if error.
135        """
136        test_cmd = "gatt_client_facade.GattcWriteLongCharacteristicById"
137        test_args = {
138            "identifier": id,
139            "offset": offset,
140            "write_value": write_value,
141            "reliable_mode": reliable_mode
142        }
143        test_id = self.build_id(self.test_counter)
144        self.test_counter += 1
145
146        return self.send_command(test_id, test_cmd, test_args)
147
148    def writeLongDescById(self, id, offset, write_value):
149        """Write Descriptor by id.
150
151        Args:
152            id: string, Characteristic identifier.
153            offset: int, The offset of bytes to write to.
154            write_value: byte array, The bytes to write.
155
156        Returns:
157            None if success, error string if error.
158        """
159        test_cmd = "gatt_client_facade.GattcWriteLongDescriptorById"
160        test_args = {
161            "identifier": id,
162            "offset": offset,
163            "write_value": write_value,
164        }
165        test_id = self.build_id(self.test_counter)
166        self.test_counter += 1
167
168        return self.send_command(test_id, test_cmd, test_args)
169
170    def writeCharByIdWithoutResponse(self, id, write_value):
171        """Write Characteristic by id without response.
172
173        Args:
174            id: string, Characteristic identifier.
175            write_value: byte array, The bytes to write.
176
177        Returns:
178            None if success, error string if error.
179        """
180        test_cmd = "gatt_client_facade.GattcWriteCharacteristicByIdWithoutResponse"
181        test_args = {
182            "identifier": id,
183            "write_value": write_value,
184        }
185        test_id = self.build_id(self.test_counter)
186        self.test_counter += 1
187
188        return self.send_command(test_id, test_cmd, test_args)
189
190    def enableNotifyCharacteristic(self, id):
191        """Enable notifications on a Characteristic.
192
193        Args:
194            id: string, Characteristic identifier.
195
196        Returns:
197            None if success, error string if error.
198        """
199        test_cmd = "gatt_client_facade.GattcEnableNotifyCharacteristic"
200        test_args = {
201            "identifier": id,
202        }
203        test_id = self.build_id(self.test_counter)
204        self.test_counter += 1
205
206        return self.send_command(test_id, test_cmd, test_args)
207
208    def disableNotifyCharacteristic(self, id):
209        """Disable notifications on a Characteristic.
210
211        Args:
212            id: string, Characteristic identifier.
213
214        Returns:
215            None if success, error string if error.
216        """
217        test_cmd = "gatt_client_facade.GattcDisableNotifyCharacteristic"
218        test_args = {
219            "identifier": id,
220            "value": False,
221        }
222        test_id = self.build_id(self.test_counter)
223        self.test_counter += 1
224
225        return self.send_command(test_id, test_cmd, test_args)
226
227    def readCharacteristicById(self, id):
228        """Read Characteristic value by id..
229
230        Args:
231            id: string, Characteristic identifier.
232
233        Returns:
234            Characteristic value if success, error string if error.
235        """
236        test_cmd = "gatt_client_facade.GattcReadCharacteristicById"
237        test_args = {
238            "identifier": id,
239        }
240        test_id = self.build_id(self.test_counter)
241        self.test_counter += 1
242
243        return self.send_command(test_id, test_cmd, test_args)
244
245    def readCharacteristicByType(self, uuid):
246        """Read Characteristic value by id..
247
248        Args:
249            uuid: string, Characteristic identifier.
250
251        Returns:
252            Characteristic value if success, error string if error.
253        """
254        test_cmd = "gatt_client_facade.GattcReadCharacteristicByType"
255        test_args = {
256            "uuid": uuid,
257        }
258        test_id = self.build_id(self.test_counter)
259        self.test_counter += 1
260
261        return self.send_command(test_id, test_cmd, test_args)
262
263    def readDescriptorById(self, id):
264        """Read Descriptor value by id..
265
266        Args:
267            id: string, Descriptor identifier.
268
269        Returns:
270            Descriptor value if success, error string if error.
271        """
272        test_cmd = "gatt_client_facade.GattcReadDescriptorById"
273        test_args = {
274            "identifier": id,
275        }
276        test_id = self.build_id(self.test_counter)
277        self.test_counter += 1
278
279        return self.send_command(test_id, test_cmd, test_args)
280
281    def readLongDescriptorById(self, id, offset, max_bytes):
282        """Reads Long Descriptor value by id.
283
284        Args:
285            id: string, Descriptor identifier.
286            offset: int, The offset to start reading from.
287            max_bytes: int, The max bytes to return.
288
289        Returns:
290            Descriptor value if success, error string if error.
291        """
292        test_cmd = "gatt_client_facade.GattcReadLongDescriptorById"
293        test_args = {
294            "identifier": id,
295            "offset": offset,
296            "max_bytes": max_bytes
297        }
298        test_id = self.build_id(self.test_counter)
299        self.test_counter += 1
300
301        return self.send_command(test_id, test_cmd, test_args)
302
303    def writeDescriptorById(self, id, offset, write_value):
304        """Write Descriptor by id.
305
306        Args:
307            id: string, Descriptor identifier.
308            write_value: byte array, The bytes to write.
309
310        Returns:
311            None if success, error string if error.
312        """
313        test_cmd = "gatt_client_facade.GattcWriteDescriptorById"
314        test_args = {
315            "identifier": id,
316            "write_value": write_value,
317        }
318        test_id = self.build_id(self.test_counter)
319        self.test_counter += 1
320
321        return self.send_command(test_id, test_cmd, test_args)
322
323    def readLongCharacteristicById(self, id, offset, max_bytes):
324        """Reads Long Characteristic value by id.
325
326        Args:
327            id: string, Characteristic identifier.
328            offset: int, The offset to start reading from.
329            max_bytes: int, The max bytes to return.
330
331        Returns:
332            Characteristic value if success, error string if error.
333        """
334        test_cmd = "gatt_client_facade.GattcReadLongCharacteristicById"
335        test_args = {
336            "identifier": id,
337            "offset": offset,
338            "max_bytes": max_bytes
339        }
340        test_id = self.build_id(self.test_counter)
341        self.test_counter += 1
342
343        return self.send_command(test_id, test_cmd, test_args)
344
345    def connectToService(self, id, service_id):
346        """ Connect to a specific Service specified by id.
347
348        Args:
349            id: string, Service id.
350
351        Returns:
352            None if success, error string if error.
353        """
354        test_cmd = "gatt_client_facade.GattcConnectToService"
355        test_args = {"identifier": id, "service_identifier": service_id}
356        test_id = self.build_id(self.test_counter)
357        self.test_counter += 1
358
359        return self.send_command(test_id, test_cmd, test_args)
360
361    def bleConnectToPeripheral(self, id):
362        """Connects to a peripheral specified by id.
363
364        Args:
365            id: string, Peripheral identifier to connect to.
366
367        Returns:
368            Dictionary, List of Service Info if success, error string if error.
369        """
370        test_cmd = "gatt_client_facade.BleConnectPeripheral"
371        test_args = {"identifier": id}
372        test_id = self.build_id(self.test_counter)
373        self.test_counter += 1
374
375        return self.send_command(test_id, test_cmd, test_args)
376
377    def bleDisconnectPeripheral(self, id):
378        """Disconnects from a peripheral specified by id.
379
380        Args:
381            id: string, Peripheral identifier to disconnect from.
382
383        Returns:
384            Dictionary, None if success, error string if error.
385        """
386        test_cmd = "gatt_client_facade.BleDisconnectPeripheral"
387        test_args = {"identifier": id}
388        test_id = self.build_id(self.test_counter)
389        self.test_counter += 1
390
391        return self.send_command(test_id, test_cmd, test_args)