1#!/usr/bin/env python3 2# 3# Copyright 2018 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from acts.controllers.fuchsia_lib.base_lib import BaseLib 18 19 20class FuchsiaGattcLib(BaseLib): 21 22 def __init__(self, addr: str) -> None: 23 super().__init__(addr, "gatt_client") 24 25 def bleStartBleScan(self, scan_filter): 26 """Starts a BLE scan 27 28 Args: 29 scan_time_ms: int, Amount of time to scan for. 30 scan_filter: dictionary, Device filter for a scan. 31 scan_count: int, Number of devices to scan for before termination. 32 33 Returns: 34 None if pass, err if fail. 35 """ 36 test_cmd = "gatt_client_facade.BleStartScan" 37 test_args = { 38 "filter": scan_filter, 39 } 40 41 return self.send_command(test_cmd, test_args) 42 43 def bleStopBleScan(self): 44 """Stops a BLE scan 45 46 Returns: 47 Dictionary, List of devices discovered, error string if error. 48 """ 49 test_cmd = "gatt_client_facade.BleStopScan" 50 test_args = {} 51 52 return self.send_command(test_cmd, test_args) 53 54 def listServices(self, id): 55 """Lists services of a peripheral specified by id. 56 57 Args: 58 id: string, Peripheral identifier to list services. 59 60 Returns: 61 Dictionary, List of Service Info if success, error string if error. 62 """ 63 test_cmd = "gatt_client_facade.GattcListServices" 64 test_args = {"identifier": id} 65 66 return self.send_command(test_cmd, test_args) 67 68 def bleGetDiscoveredDevices(self): 69 """Stops a BLE scan 70 71 Returns: 72 Dictionary, List of devices discovered, error string if error. 73 """ 74 test_cmd = "gatt_client_facade.BleGetDiscoveredDevices" 75 test_args = {} 76 77 return self.send_command(test_cmd, test_args) 78 79 def discoverCharacteristics(self): 80 """Discover the characteristics of a connected service. 81 82 Returns: 83 Dictionary, List of Characteristics and Descriptors if success, 84 error string if error. 85 """ 86 test_cmd = "gatt_client_facade.GattcDiscoverCharacteristics" 87 test_args = {} 88 89 return self.send_command(test_cmd, test_args) 90 91 def writeCharById(self, id, offset, write_value): 92 """Write Characteristic by id.. 93 94 Args: 95 id: string, Characteristic identifier. 96 offset: int, The offset of bytes to write to. 97 write_value: byte array, The bytes to write. 98 99 Returns: 100 None if success, error string if error. 101 """ 102 test_cmd = "gatt_client_facade.GattcWriteCharacteristicById" 103 test_args = { 104 "identifier": id, 105 "offset": offset, 106 "write_value": write_value, 107 } 108 109 return self.send_command(test_cmd, test_args) 110 111 def writeLongCharById(self, id, offset, write_value, reliable_mode=False): 112 """Write Characteristic by id. 113 114 Args: 115 id: string, Characteristic identifier. 116 offset: int, The offset of bytes to write to. 117 write_value: byte array, The bytes to write. 118 reliable_mode: bool value representing reliable writes. 119 120 Returns: 121 None if success, error string if error. 122 """ 123 test_cmd = "gatt_client_facade.GattcWriteLongCharacteristicById" 124 test_args = { 125 "identifier": id, 126 "offset": offset, 127 "write_value": write_value, 128 "reliable_mode": reliable_mode 129 } 130 131 return self.send_command(test_cmd, test_args) 132 133 def writeLongDescById(self, id, offset, write_value): 134 """Write Descriptor by id. 135 136 Args: 137 id: string, Characteristic identifier. 138 offset: int, The offset of bytes to write to. 139 write_value: byte array, The bytes to write. 140 141 Returns: 142 None if success, error string if error. 143 """ 144 test_cmd = "gatt_client_facade.GattcWriteLongDescriptorById" 145 test_args = { 146 "identifier": id, 147 "offset": offset, 148 "write_value": write_value, 149 } 150 151 return self.send_command(test_cmd, test_args) 152 153 def writeCharByIdWithoutResponse(self, id, write_value): 154 """Write Characteristic by id without response. 155 156 Args: 157 id: string, Characteristic identifier. 158 write_value: byte array, The bytes to write. 159 160 Returns: 161 None if success, error string if error. 162 """ 163 test_cmd = "gatt_client_facade.GattcWriteCharacteristicByIdWithoutResponse" 164 test_args = { 165 "identifier": id, 166 "write_value": write_value, 167 } 168 169 return self.send_command(test_cmd, test_args) 170 171 def enableNotifyCharacteristic(self, id): 172 """Enable notifications on a Characteristic. 173 174 Args: 175 id: string, Characteristic identifier. 176 177 Returns: 178 None if success, error string if error. 179 """ 180 test_cmd = "gatt_client_facade.GattcEnableNotifyCharacteristic" 181 test_args = { 182 "identifier": id, 183 } 184 185 return self.send_command(test_cmd, test_args) 186 187 def disableNotifyCharacteristic(self, id): 188 """Disable notifications on a Characteristic. 189 190 Args: 191 id: string, Characteristic identifier. 192 193 Returns: 194 None if success, error string if error. 195 """ 196 test_cmd = "gatt_client_facade.GattcDisableNotifyCharacteristic" 197 test_args = { 198 "identifier": id, 199 "value": False, 200 } 201 202 return self.send_command(test_cmd, test_args) 203 204 def readCharacteristicById(self, id): 205 """Read Characteristic value by id.. 206 207 Args: 208 id: string, Characteristic identifier. 209 210 Returns: 211 Characteristic value if success, error string if error. 212 """ 213 test_cmd = "gatt_client_facade.GattcReadCharacteristicById" 214 test_args = { 215 "identifier": id, 216 } 217 218 return self.send_command(test_cmd, test_args) 219 220 def readCharacteristicByType(self, uuid): 221 """Read Characteristic value by id.. 222 223 Args: 224 uuid: string, Characteristic identifier. 225 226 Returns: 227 Characteristic value if success, error string if error. 228 """ 229 test_cmd = "gatt_client_facade.GattcReadCharacteristicByType" 230 test_args = { 231 "uuid": uuid, 232 } 233 234 return self.send_command(test_cmd, test_args) 235 236 def readDescriptorById(self, id): 237 """Read Descriptor value by id.. 238 239 Args: 240 id: string, Descriptor identifier. 241 242 Returns: 243 Descriptor value if success, error string if error. 244 """ 245 test_cmd = "gatt_client_facade.GattcReadDescriptorById" 246 test_args = { 247 "identifier": id, 248 } 249 250 return self.send_command(test_cmd, test_args) 251 252 def readLongDescriptorById(self, id, offset, max_bytes): 253 """Reads Long Descriptor value by id. 254 255 Args: 256 id: string, Descriptor identifier. 257 offset: int, The offset to start reading from. 258 max_bytes: int, The max bytes to return. 259 260 Returns: 261 Descriptor value if success, error string if error. 262 """ 263 test_cmd = "gatt_client_facade.GattcReadLongDescriptorById" 264 test_args = { 265 "identifier": id, 266 "offset": offset, 267 "max_bytes": max_bytes 268 } 269 270 return self.send_command(test_cmd, test_args) 271 272 def writeDescriptorById(self, id, offset, write_value): 273 """Write Descriptor by id. 274 275 Args: 276 id: string, Descriptor identifier. 277 write_value: byte array, The bytes to write. 278 279 Returns: 280 None if success, error string if error. 281 """ 282 test_cmd = "gatt_client_facade.GattcWriteDescriptorById" 283 test_args = { 284 "identifier": id, 285 "write_value": write_value, 286 } 287 288 return self.send_command(test_cmd, test_args) 289 290 def readLongCharacteristicById(self, id, offset, max_bytes): 291 """Reads Long Characteristic value by id. 292 293 Args: 294 id: string, Characteristic identifier. 295 offset: int, The offset to start reading from. 296 max_bytes: int, The max bytes to return. 297 298 Returns: 299 Characteristic value if success, error string if error. 300 """ 301 test_cmd = "gatt_client_facade.GattcReadLongCharacteristicById" 302 test_args = { 303 "identifier": id, 304 "offset": offset, 305 "max_bytes": max_bytes 306 } 307 308 return self.send_command(test_cmd, test_args) 309 310 def connectToService(self, id, service_id): 311 """ Connect to a specific Service specified by id. 312 313 Args: 314 id: string, Service id. 315 316 Returns: 317 None if success, error string if error. 318 """ 319 test_cmd = "gatt_client_facade.GattcConnectToService" 320 test_args = {"identifier": id, "service_identifier": service_id} 321 322 return self.send_command(test_cmd, test_args) 323 324 def bleConnectToPeripheral(self, id): 325 """Connects to a peripheral specified by id. 326 327 Args: 328 id: string, Peripheral identifier to connect to. 329 330 Returns: 331 Dictionary, List of Service Info if success, error string if error. 332 """ 333 test_cmd = "gatt_client_facade.BleConnectPeripheral" 334 test_args = {"identifier": id} 335 336 return self.send_command(test_cmd, test_args) 337 338 def bleDisconnectPeripheral(self, id): 339 """Disconnects from a peripheral specified by id. 340 341 Args: 342 id: string, Peripheral identifier to disconnect from. 343 344 Returns: 345 Dictionary, None if success, error string if error. 346 """ 347 test_cmd = "gatt_client_facade.BleDisconnectPeripheral" 348 test_args = {"identifier": id} 349 350 return self.send_command(test_cmd, test_args) 351