1#!/usr/bin/env python3 2# 3# Copyright 2020 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from acts.controllers.fuchsia_lib.base_lib import BaseLib 18 19 20class FuchsiaAvdtpLib(BaseLib): 21 def __init__(self, addr, tc, client_id): 22 self.address = addr 23 self.test_counter = tc 24 self.client_id = client_id 25 26 def init(self, initiator_delay=None): 27 """Initializes the AVDTP service with optional initiator_delay. 28 29 Args: 30 initiator_delay: Optional. The delay in milliseconds to start a 31 stream. 32 33 Returns: 34 Dictionary, None if success, error if error. 35 """ 36 test_cmd = "avdtp_facade.AvdtpInit" 37 38 test_args = {"initiator_delay": initiator_delay} 39 test_id = self.build_id(self.test_counter) 40 self.test_counter += 1 41 42 return self.send_command(test_id, test_cmd, test_args) 43 44 def getConnectedPeers(self): 45 """Gets the AVDTP connected peers. 46 47 Returns: 48 Dictionary, None if success, error if error. 49 """ 50 test_cmd = "avdtp_facade.AvdtpGetConnectedPeers" 51 test_args = {} 52 test_id = self.build_id(self.test_counter) 53 self.test_counter += 1 54 55 return self.send_command(test_id, test_cmd, test_args) 56 57 def setConfiguration(self, peer_id): 58 """Sends the AVDTP command to input peer_id: set configuration 59 60 Args: 61 peer_id: The peer id to send the AVDTP command to. 62 63 Returns: 64 Dictionary, None if success, error if error. 65 """ 66 test_cmd = "avdtp_facade.AvdtpSetConfiguration" 67 test_args = {"identifier": peer_id} 68 test_id = self.build_id(self.test_counter) 69 self.test_counter += 1 70 71 return self.send_command(test_id, test_cmd, test_args) 72 73 def getConfiguration(self, peer_id): 74 """Sends the AVDTP command to input peer_id: get configuration 75 76 Args: 77 peer_id: The peer id to send the AVDTP command to. 78 79 Returns: 80 Dictionary, None if success, error if error. 81 """ 82 test_cmd = "avdtp_facade.AvdtpGetConfiguration" 83 test_args = {"identifier": peer_id} 84 test_id = self.build_id(self.test_counter) 85 self.test_counter += 1 86 87 return self.send_command(test_id, test_cmd, test_args) 88 89 def getCapabilities(self, peer_id): 90 """Sends the AVDTP command to input peer_id: get capabilities 91 92 Args: 93 peer_id: The peer id to send the AVDTP command to. 94 95 Returns: 96 Dictionary, None if success, error if error. 97 """ 98 test_cmd = "avdtp_facade.AvdtpGetCapabilities" 99 test_args = {"identifier": peer_id} 100 test_id = self.build_id(self.test_counter) 101 self.test_counter += 1 102 103 return self.send_command(test_id, test_cmd, test_args) 104 105 def getAllCapabilities(self, peer_id): 106 """Sends the AVDTP command to input peer_id: get all capabilities 107 108 Args: 109 peer_id: The peer id to send the AVDTP command to. 110 111 Returns: 112 Dictionary, None if success, error if error. 113 """ 114 test_cmd = "avdtp_facade.AvdtpGetAllCapabilities" 115 test_args = {"identifier": peer_id} 116 test_id = self.build_id(self.test_counter) 117 self.test_counter += 1 118 119 return self.send_command(test_id, test_cmd, test_args) 120 121 def reconfigureStream(self, peer_id): 122 """Sends the AVDTP command to input peer_id: reconfigure stream 123 124 Args: 125 peer_id: The peer id to send the AVDTP command to. 126 127 Returns: 128 Dictionary, None if success, error if error. 129 """ 130 test_cmd = "avdtp_facade.AvdtpReconfigureStream" 131 test_args = {"identifier": peer_id} 132 test_id = self.build_id(self.test_counter) 133 self.test_counter += 1 134 135 return self.send_command(test_id, test_cmd, test_args) 136 137 def suspendStream(self, peer_id): 138 """Sends the AVDTP command to input peer_id: suspend stream 139 Args: 140 peer_id: The peer id to send the AVDTP command to. 141 142 Returns: 143 Dictionary, None if success, error if error. 144 """ 145 test_cmd = "avdtp_facade.AvdtpSuspendStream" 146 test_args = {"identifier": peer_id} 147 test_id = self.build_id(self.test_counter) 148 self.test_counter += 1 149 150 return self.send_command(test_id, test_cmd, test_args) 151 152 def suspendAndReconfigure(self, peer_id): 153 """Sends the AVDTP command to input peer_id: suspend and reconfigure 154 155 Args: 156 peer_id: The peer id to send the AVDTP command to. 157 158 Returns: 159 Dictionary, None if success, error if error. 160 """ 161 test_cmd = "avdtp_facade.AvdtpSuspendAndReconfigure" 162 test_args = {"identifier": peer_id} 163 test_id = self.build_id(self.test_counter) 164 self.test_counter += 1 165 166 return self.send_command(test_id, test_cmd, test_args) 167 168 def releaseStream(self, peer_id): 169 """Sends the AVDTP command to input peer_id: release stream 170 171 Args: 172 peer_id: The peer id to send the AVDTP command to. 173 174 Returns: 175 Dictionary, None if success, error if error. 176 """ 177 test_cmd = "avdtp_facade.AvdtpReleaseStream" 178 test_args = {"identifier": peer_id} 179 test_id = self.build_id(self.test_counter) 180 self.test_counter += 1 181 182 return self.send_command(test_id, test_cmd, test_args) 183 184 def establishStream(self, peer_id): 185 """Sends the AVDTP command to input peer_id: establish stream 186 187 Args: 188 peer_id: The peer id to send the AVDTP command to. 189 190 Returns: 191 Dictionary, None if success, error if error. 192 """ 193 test_cmd = "avdtp_facade.AvdtpEstablishStream" 194 test_args = {"identifier": peer_id} 195 test_id = self.build_id(self.test_counter) 196 self.test_counter += 1 197 198 return self.send_command(test_id, test_cmd, test_args) 199 200 def startStream(self, peer_id): 201 """Sends the AVDTP command to input peer_id: start stream 202 203 Args: 204 peer_id: The peer id to send the AVDTP command to. 205 206 Returns: 207 Dictionary, None if success, error if error. 208 """ 209 test_cmd = "avdtp_facade.AvdtpStartStream" 210 test_args = {"identifier": peer_id} 211 test_id = self.build_id(self.test_counter) 212 self.test_counter += 1 213 214 return self.send_command(test_id, test_cmd, test_args) 215 216 def abortStream(self, peer_id): 217 """Sends the AVDTP command to input peer_id: abort stream 218 219 Args: 220 peer_id: The peer id to send the AVDTP command to. 221 222 Returns: 223 Dictionary, None if success, error if error. 224 """ 225 test_cmd = "avdtp_facade.AvdtpAbortStream" 226 test_args = {"identifier": peer_id} 227 test_id = self.build_id(self.test_counter) 228 self.test_counter += 1 229 230 return self.send_command(test_id, test_cmd, test_args) 231 232 def establishStream(self, peer_id): 233 """Sends the AVDTP command to input peer_id: establish stream 234 235 Args: 236 peer_id: The peer id to send the AVDTP command to. 237 238 Returns: 239 Dictionary, None if success, error if error. 240 """ 241 test_cmd = "avdtp_facade.AvdtpEstablishStream" 242 test_args = {"identifier": peer_id} 243 test_id = self.build_id(self.test_counter) 244 self.test_counter += 1 245 246 return self.send_command(test_id, test_cmd, test_args) 247 248 def removeService(self): 249 """Removes the AVDTP service from the Fuchsia device 250 251 Returns: 252 Dictionary, None if success, error if error. 253 """ 254 test_cmd = "avdtp_facade.AvdtpRemoveService" 255 test_args = {} 256 test_id = self.build_id(self.test_counter) 257 self.test_counter += 1 258 259 return self.send_command(test_id, test_cmd, test_args) 260