#!/usr/bin/env python3
#
#   Copyright 2018 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from acts.controllers.fuchsia_lib.base_lib import BaseLib


class FuchsiaGattcLib(BaseLib):
    """Wrappers around the "gatt_client_facade" commands of a Fuchsia device.

    Every public method builds the argument dictionary for one facade command
    and forwards it through send_command() with a fresh test id derived from
    a per-instance counter.
    """

    def __init__(self, addr, tc, client_id):
        """Stores the connection settings used by every command.

        Args:
            addr: Stored as self.address (presumably the address of the
                device's command server - confirm against BaseLib).
            tc: int, Initial value of the test counter used to build test ids.
            client_id: Stored as self.client_id (presumably used by build_id()
                - confirm against BaseLib).
        """
        # NOTE(review): BaseLib.__init__ is intentionally not called here, as
        # in the original code; its signature is not visible from this file.
        self.address = addr
        self.test_counter = tc
        self.client_id = client_id

    def _send_gattc_command(self, test_cmd, test_args):
        """Sends one facade command using the next sequential test id.

        Args:
            test_cmd: string, Fully qualified facade command name.
            test_args: dictionary, Arguments of the command.

        Returns:
            Whatever send_command() returns for this request.
        """
        test_id = self.build_id(self.test_counter)
        # The counter is advanced before sending so that it moves forward
        # even when send_command() raises.
        self.test_counter += 1

        return self.send_command(test_id, test_cmd, test_args)

    def bleStartBleScan(self, scan_filter):
        """Starts a BLE scan.

        Args:
            scan_filter: dictionary, Device filter for a scan.

        Returns:
            None if pass, err if fail.
        """
        test_cmd = "gatt_client_facade.BleStartScan"
        test_args = {
            "filter": scan_filter,
        }

        return self._send_gattc_command(test_cmd, test_args)

    def bleStopBleScan(self):
        """Stops a BLE scan.

        Returns:
            Dictionary, List of devices discovered, error string if error.
        """
        test_cmd = "gatt_client_facade.BleStopScan"
        test_args = {}

        return self._send_gattc_command(test_cmd, test_args)

    def listServices(self, id):
        """Lists services of a peripheral specified by id.

        Args:
            id: string, Peripheral identifier to list services.

        Returns:
            Dictionary, List of Service Info if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcListServices"
        test_args = {"identifier": id}

        return self._send_gattc_command(test_cmd, test_args)

    def bleGetDiscoveredDevices(self):
        """Gets the devices discovered by a BLE scan.

        Returns:
            Dictionary, List of devices discovered, error string if error.
        """
        test_cmd = "gatt_client_facade.BleGetDiscoveredDevices"
        test_args = {}

        return self._send_gattc_command(test_cmd, test_args)

    def discoverCharacteristics(self):
        """Discover the characteristics of a connected service.

        Returns:
            Dictionary, List of Characteristics and Descriptors if success,
            error string if error.
        """
        test_cmd = "gatt_client_facade.GattcDiscoverCharacteristics"
        test_args = {}

        return self._send_gattc_command(test_cmd, test_args)

    def writeCharById(self, id, offset, write_value):
        """Write Characteristic by id.

        Args:
            id: string, Characteristic identifier.
            offset: int, The offset of bytes to write to.
            write_value: byte array, The bytes to write.

        Returns:
            None if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcWriteCharacteristicById"
        test_args = {
            "identifier": id,
            "offset": offset,
            "write_value": write_value,
        }

        return self._send_gattc_command(test_cmd, test_args)

    def writeCharByIdWithoutResponse(self, id, write_value):
        """Write Characteristic by id without response.

        Args:
            id: string, Characteristic identifier.
            write_value: byte array, The bytes to write.

        Returns:
            None if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcWriteCharacteristicByIdWithoutResponse"
        test_args = {
            "identifier": id,
            "write_value": write_value,
        }

        return self._send_gattc_command(test_cmd, test_args)

    def enableNotifyCharacteristic(self, id):
        """Enable notifications on a Characteristic.

        Args:
            id: string, Characteristic identifier.

        Returns:
            None if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcEnableNotifyCharacteristic"
        test_args = {
            "identifier": id,
        }

        return self._send_gattc_command(test_cmd, test_args)

    def disableNotifyCharacteristic(self, id):
        """Disable notifications on a Characteristic.

        Args:
            id: string, Characteristic identifier.

        Returns:
            None if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcDisableNotifyCharacteristic"
        # Unlike the enable command, this request also carries an explicit
        # "value" flag set to False.
        test_args = {
            "identifier": id,
            "value": False,
        }

        return self._send_gattc_command(test_cmd, test_args)

    def readCharacteristicById(self, id):
        """Read Characteristic value by id.

        Args:
            id: string, Characteristic identifier.

        Returns:
            Characteristic value if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcReadCharacteristicById"
        test_args = {
            "identifier": id,
        }

        return self._send_gattc_command(test_cmd, test_args)

    def readDescriptorById(self, id):
        """Read Descriptor value by id.

        Args:
            id: string, Descriptor identifier.

        Returns:
            Descriptor value if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcReadDescriptorById"
        test_args = {
            "identifier": id,
        }

        return self._send_gattc_command(test_cmd, test_args)

    def readLongDescriptorById(self, id, offset, max_bytes):
        """Reads Long Descriptor value by id.

        Args:
            id: string, Descriptor identifier.
            offset: int, The offset to start reading from.
            max_bytes: int, The max bytes to return.

        Returns:
            Descriptor value if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcReadLongDescriptorById"
        test_args = {
            "identifier": id,
            "offset": offset,
            "max_bytes": max_bytes
        }

        return self._send_gattc_command(test_cmd, test_args)

    def writeDescriptorById(self, id, offset, write_value):
        """Write Descriptor by id.

        Args:
            id: string, Descriptor identifier.
            offset: int, Accepted for interface compatibility but not sent
                with the request.
            write_value: byte array, The bytes to write.

        Returns:
            None if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcWriteDescriptorById"
        # NOTE(review): `offset` is deliberately left out of the request, as
        # in the original code - confirm whether the facade expects it.
        test_args = {
            "identifier": id,
            "write_value": write_value,
        }

        return self._send_gattc_command(test_cmd, test_args)

    def readLongCharacteristicById(self, id, offset, max_bytes):
        """Reads Long Characteristic value by id.

        Args:
            id: string, Characteristic identifier.
            offset: int, The offset to start reading from.
            max_bytes: int, The max bytes to return.

        Returns:
            Characteristic value if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcReadLongCharacteristicById"
        test_args = {
            "identifier": id,
            "offset": offset,
            "max_bytes": max_bytes
        }

        return self._send_gattc_command(test_cmd, test_args)

    def connectToService(self, id, service_id):
        """Connect to a specific Service specified by id.

        Args:
            id: string, Sent as "identifier" (presumably the identifier of
                the peripheral hosting the service - confirm).
            service_id: Sent as "service_identifier", the id of the service
                to connect to.

        Returns:
            None if success, error string if error.
        """
        test_cmd = "gatt_client_facade.GattcConnectToService"
        test_args = {"identifier": id, "service_identifier": service_id}

        return self._send_gattc_command(test_cmd, test_args)

    def bleConnectToPeripheral(self, id):
        """Connects to a peripheral specified by id.

        Args:
            id: string, Peripheral identifier to connect to.

        Returns:
            Dictionary, List of Service Info if success, error string if error.
        """
        test_cmd = "gatt_client_facade.BleConnectPeripheral"
        test_args = {"identifier": id}

        return self._send_gattc_command(test_cmd, test_args)

    def bleDisconnectPeripheral(self, id):
        """Disconnects from a peripheral specified by id.

        Args:
            id: string, Peripheral identifier to disconnect from.

        Returns:
            Dictionary, None if success, error string if error.
        """
        test_cmd = "gatt_client_facade.BleDisconnectPeripheral"
        test_args = {"identifier": id}

        return self._send_gattc_command(test_cmd, test_args)