1#!/usr/bin/env python3 2# 3# Copyright 2018 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from acts.controllers.fuchsia_lib.base_lib import BaseLib 18 19 20class FuchsiaGattcLib(BaseLib): 21 def __init__(self, addr, tc, client_id): 22 self.address = addr 23 self.test_counter = tc 24 self.client_id = client_id 25 26 def bleStartBleScan(self, scan_filter): 27 """Starts a BLE scan 28 29 Args: 30 scan_time_ms: int, Amount of time to scan for. 31 scan_filter: dictionary, Device filter for a scan. 32 scan_count: int, Number of devices to scan for before termination. 33 34 Returns: 35 None if pass, err if fail. 36 """ 37 test_cmd = "gatt_client_facade.BleStartScan" 38 test_args = { 39 "filter": scan_filter, 40 } 41 test_id = self.build_id(self.test_counter) 42 self.test_counter += 1 43 44 return self.send_command(test_id, test_cmd, test_args) 45 46 def bleStopBleScan(self): 47 """Stops a BLE scan 48 49 Returns: 50 Dictionary, List of devices discovered, error string if error. 51 """ 52 test_cmd = "gatt_client_facade.BleStopScan" 53 test_args = {} 54 test_id = self.build_id(self.test_counter) 55 self.test_counter += 1 56 57 return self.send_command(test_id, test_cmd, test_args) 58 59 def listServices(self, id): 60 """Lists services of a peripheral specified by id. 61 62 Args: 63 id: string, Peripheral identifier to list services. 64 65 Returns: 66 Dictionary, List of Service Info if success, error string if error. 67 """ 68 test_cmd = "gatt_client_facade.GattcListServices" 69 test_args = {"identifier": id} 70 test_id = self.build_id(self.test_counter) 71 self.test_counter += 1 72 73 return self.send_command(test_id, test_cmd, test_args) 74 75 def bleGetDiscoveredDevices(self): 76 """Stops a BLE scan 77 78 Returns: 79 Dictionary, List of devices discovered, error string if error. 80 """ 81 test_cmd = "gatt_client_facade.BleGetDiscoveredDevices" 82 test_args = {} 83 test_id = self.build_id(self.test_counter) 84 self.test_counter += 1 85 86 return self.send_command(test_id, test_cmd, test_args) 87 88 def discoverCharacteristics(self): 89 """Discover the characteristics of a connected service. 90 91 Returns: 92 Dictionary, List of Characteristics and Descriptors if success, 93 error string if error. 94 """ 95 test_cmd = "gatt_client_facade.GattcDiscoverCharacteristics" 96 test_args = {} 97 test_id = self.build_id(self.test_counter) 98 self.test_counter += 1 99 100 return self.send_command(test_id, test_cmd, test_args) 101 102 def writeCharById(self, id, offset, write_value): 103 """Write Characteristic by id.. 104 105 Args: 106 id: string, Characteristic identifier. 107 offset: int, The offset of bytes to write to. 108 write_value: byte array, The bytes to write. 109 110 Returns: 111 None if success, error string if error. 112 """ 113 test_cmd = "gatt_client_facade.GattcWriteCharacteristicById" 114 test_args = { 115 "identifier": id, 116 "offset": offset, 117 "write_value": write_value, 118 } 119 test_id = self.build_id(self.test_counter) 120 self.test_counter += 1 121 122 return self.send_command(test_id, test_cmd, test_args) 123 124 def writeLongCharById(self, id, offset, write_value, reliable_mode=False): 125 """Write Characteristic by id. 126 127 Args: 128 id: string, Characteristic identifier. 129 offset: int, The offset of bytes to write to. 130 write_value: byte array, The bytes to write. 131 reliable_mode: bool value representing reliable writes. 132 133 Returns: 134 None if success, error string if error. 135 """ 136 test_cmd = "gatt_client_facade.GattcWriteLongCharacteristicById" 137 test_args = { 138 "identifier": id, 139 "offset": offset, 140 "write_value": write_value, 141 "reliable_mode": reliable_mode 142 } 143 test_id = self.build_id(self.test_counter) 144 self.test_counter += 1 145 146 return self.send_command(test_id, test_cmd, test_args) 147 148 def writeLongDescById(self, id, offset, write_value): 149 """Write Descriptor by id. 150 151 Args: 152 id: string, Characteristic identifier. 153 offset: int, The offset of bytes to write to. 154 write_value: byte array, The bytes to write. 155 156 Returns: 157 None if success, error string if error. 158 """ 159 test_cmd = "gatt_client_facade.GattcWriteLongDescriptorById" 160 test_args = { 161 "identifier": id, 162 "offset": offset, 163 "write_value": write_value, 164 } 165 test_id = self.build_id(self.test_counter) 166 self.test_counter += 1 167 168 return self.send_command(test_id, test_cmd, test_args) 169 170 def writeCharByIdWithoutResponse(self, id, write_value): 171 """Write Characteristic by id without response. 172 173 Args: 174 id: string, Characteristic identifier. 175 write_value: byte array, The bytes to write. 176 177 Returns: 178 None if success, error string if error. 179 """ 180 test_cmd = "gatt_client_facade.GattcWriteCharacteristicByIdWithoutResponse" 181 test_args = { 182 "identifier": id, 183 "write_value": write_value, 184 } 185 test_id = self.build_id(self.test_counter) 186 self.test_counter += 1 187 188 return self.send_command(test_id, test_cmd, test_args) 189 190 def enableNotifyCharacteristic(self, id): 191 """Enable notifications on a Characteristic. 192 193 Args: 194 id: string, Characteristic identifier. 195 196 Returns: 197 None if success, error string if error. 198 """ 199 test_cmd = "gatt_client_facade.GattcEnableNotifyCharacteristic" 200 test_args = { 201 "identifier": id, 202 } 203 test_id = self.build_id(self.test_counter) 204 self.test_counter += 1 205 206 return self.send_command(test_id, test_cmd, test_args) 207 208 def disableNotifyCharacteristic(self, id): 209 """Disable notifications on a Characteristic. 210 211 Args: 212 id: string, Characteristic identifier. 213 214 Returns: 215 None if success, error string if error. 216 """ 217 test_cmd = "gatt_client_facade.GattcDisableNotifyCharacteristic" 218 test_args = { 219 "identifier": id, 220 "value": False, 221 } 222 test_id = self.build_id(self.test_counter) 223 self.test_counter += 1 224 225 return self.send_command(test_id, test_cmd, test_args) 226 227 def readCharacteristicById(self, id): 228 """Read Characteristic value by id.. 229 230 Args: 231 id: string, Characteristic identifier. 232 233 Returns: 234 Characteristic value if success, error string if error. 235 """ 236 test_cmd = "gatt_client_facade.GattcReadCharacteristicById" 237 test_args = { 238 "identifier": id, 239 } 240 test_id = self.build_id(self.test_counter) 241 self.test_counter += 1 242 243 return self.send_command(test_id, test_cmd, test_args) 244 245 def readCharacteristicByType(self, uuid): 246 """Read Characteristic value by id.. 247 248 Args: 249 uuid: string, Characteristic identifier. 250 251 Returns: 252 Characteristic value if success, error string if error. 253 """ 254 test_cmd = "gatt_client_facade.GattcReadCharacteristicByType" 255 test_args = { 256 "uuid": uuid, 257 } 258 test_id = self.build_id(self.test_counter) 259 self.test_counter += 1 260 261 return self.send_command(test_id, test_cmd, test_args) 262 263 def readDescriptorById(self, id): 264 """Read Descriptor value by id.. 265 266 Args: 267 id: string, Descriptor identifier. 268 269 Returns: 270 Descriptor value if success, error string if error. 271 """ 272 test_cmd = "gatt_client_facade.GattcReadDescriptorById" 273 test_args = { 274 "identifier": id, 275 } 276 test_id = self.build_id(self.test_counter) 277 self.test_counter += 1 278 279 return self.send_command(test_id, test_cmd, test_args) 280 281 def readLongDescriptorById(self, id, offset, max_bytes): 282 """Reads Long Descriptor value by id. 283 284 Args: 285 id: string, Descriptor identifier. 286 offset: int, The offset to start reading from. 287 max_bytes: int, The max bytes to return. 288 289 Returns: 290 Descriptor value if success, error string if error. 291 """ 292 test_cmd = "gatt_client_facade.GattcReadLongDescriptorById" 293 test_args = { 294 "identifier": id, 295 "offset": offset, 296 "max_bytes": max_bytes 297 } 298 test_id = self.build_id(self.test_counter) 299 self.test_counter += 1 300 301 return self.send_command(test_id, test_cmd, test_args) 302 303 def writeDescriptorById(self, id, offset, write_value): 304 """Write Descriptor by id. 305 306 Args: 307 id: string, Descriptor identifier. 308 write_value: byte array, The bytes to write. 309 310 Returns: 311 None if success, error string if error. 312 """ 313 test_cmd = "gatt_client_facade.GattcWriteDescriptorById" 314 test_args = { 315 "identifier": id, 316 "write_value": write_value, 317 } 318 test_id = self.build_id(self.test_counter) 319 self.test_counter += 1 320 321 return self.send_command(test_id, test_cmd, test_args) 322 323 def readLongCharacteristicById(self, id, offset, max_bytes): 324 """Reads Long Characteristic value by id. 325 326 Args: 327 id: string, Characteristic identifier. 328 offset: int, The offset to start reading from. 329 max_bytes: int, The max bytes to return. 330 331 Returns: 332 Characteristic value if success, error string if error. 333 """ 334 test_cmd = "gatt_client_facade.GattcReadLongCharacteristicById" 335 test_args = { 336 "identifier": id, 337 "offset": offset, 338 "max_bytes": max_bytes 339 } 340 test_id = self.build_id(self.test_counter) 341 self.test_counter += 1 342 343 return self.send_command(test_id, test_cmd, test_args) 344 345 def connectToService(self, id, service_id): 346 """ Connect to a specific Service specified by id. 347 348 Args: 349 id: string, Service id. 350 351 Returns: 352 None if success, error string if error. 353 """ 354 test_cmd = "gatt_client_facade.GattcConnectToService" 355 test_args = {"identifier": id, "service_identifier": service_id} 356 test_id = self.build_id(self.test_counter) 357 self.test_counter += 1 358 359 return self.send_command(test_id, test_cmd, test_args) 360 361 def bleConnectToPeripheral(self, id): 362 """Connects to a peripheral specified by id. 363 364 Args: 365 id: string, Peripheral identifier to connect to. 366 367 Returns: 368 Dictionary, List of Service Info if success, error string if error. 369 """ 370 test_cmd = "gatt_client_facade.BleConnectPeripheral" 371 test_args = {"identifier": id} 372 test_id = self.build_id(self.test_counter) 373 self.test_counter += 1 374 375 return self.send_command(test_id, test_cmd, test_args) 376 377 def bleDisconnectPeripheral(self, id): 378 """Disconnects from a peripheral specified by id. 379 380 Args: 381 id: string, Peripheral identifier to disconnect from. 382 383 Returns: 384 Dictionary, None if success, error string if error. 385 """ 386 test_cmd = "gatt_client_facade.BleDisconnectPeripheral" 387 test_args = {"identifier": id} 388 test_id = self.build_id(self.test_counter) 389 self.test_counter += 1 390 391 return self.send_command(test_id, test_cmd, test_args)