#!/usr/bin/env python3
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""D-Bus helpers for driving BlueZ, oFono and PulseAudio in coex tests."""

import logging
import os
import shlex
import subprocess
import time

import dbus
import dbus.mainloop.glib
import dbus.service
from gi.repository import GObject

from acts.test_utils.coex.coex_constants import ADAPTER_INTERFACE
from acts.test_utils.coex.coex_constants import CALL_MANAGER
from acts.test_utils.coex.coex_constants import CMD_FIND
from acts.test_utils.coex.coex_constants import CMD_HCI
from acts.test_utils.coex.coex_constants import CMD_PATH
from acts.test_utils.coex.coex_constants import commands
from acts.test_utils.coex.coex_constants import DBUS_INTERFACE
from acts.test_utils.coex.coex_constants import DEVICE_INTERFACE
from acts.test_utils.coex.coex_constants import DISCOVERY_TIME
from acts.test_utils.coex.coex_constants import KILL_CMD
from acts.test_utils.coex.coex_constants import MEDIA_CONTROL_INTERFACE
from acts.test_utils.coex.coex_constants import MEDIA_PLAY_INTERFACE
from acts.test_utils.coex.coex_constants import OBJECT_MANGER
from acts.test_utils.coex.coex_constants import OFONO_MANAGER
from acts.test_utils.coex.coex_constants import PROPERTIES
from acts.test_utils.coex.coex_constants import PROPERTIES_CHANGED
from acts.test_utils.coex.coex_constants import SERVICE_NAME
from acts.test_utils.coex.coex_constants import VOICE_CALL
from acts.test_utils.coex.coex_constants import WAIT_TIME
from acts.utils import create_dir

# Make the GLib main loop the default for D-Bus connections created below.
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

# Index of the line of the CMD_HCI output that hci_config() inspects.
_HCI_STATUS_LINE = 2
# Upper bound used by call_volume(); oFono documents CallVolume levels as a
# 0-100 percentage.
_MAX_CALL_VOLUME = 100
# Step by which call_volume() raises the level on every iteration.
_CALL_VOLUME_STEP = 5
# Seconds to wait after launching each daemon in run_daemons().
_DAEMON_START_DELAY = 5


class BluezUtils:
    """Controls a BlueZ host (adapter, devices, media and calls) over D-Bus.

    Attributes:
        device_interface: D-Bus device interface set by find_device(), or
            False while no device has been found.
        mainloop: GObject main loop created by register_signal(), or None.
        bd_address: Address of the device passed to find_device(), or None.
        list_daemon: Names of the daemons managed by run_daemons().
        log_path: Directory that receives the daemons' stderr logs.
        sudo_command: Shell prefix that pipes the password into `sudo -S`.
        bus: Connection to the D-Bus system bus.
        om: Object-manager interface of the BlueZ service root.
        adapter: Adapter interface returned by find_adapter().
    """

    def __init__(self, profile, password, log_path):
        """Brings the interface up, starts the daemons and connects to D-Bus.

        Args:
            profile: Profile name; "hfp", "a2dp" or "multiprofile"
                (case-insensitive) select the extra daemons to run.
            password: Password piped to `sudo -S` for privileged commands.
            log_path: Directory under which a "bluez" log directory is
                created.
        """
        self.device_interface = False
        self.mainloop = None
        self.property_changed = False
        self.bd_address = None
        self.list_daemon = ["dbus", "bluez"]
        self.log_path = os.path.join(log_path, "bluez")
        create_dir(self.log_path)
        # Quote the password so that shell metacharacters in it are passed
        # to sudo literally instead of being interpreted by the shell.
        self.sudo_command = "echo " + shlex.quote(password) + " | sudo -S "

        profile_name = profile.lower()
        if profile_name == "hfp":
            self.list_daemon.append("ofonod")
        elif profile_name == "a2dp":
            self.list_daemon.append("pulseaudio")
        elif profile_name == "multiprofile":
            self.list_daemon.extend(["pulseaudio", "ofonod"])

        if not self.hci_config("up"):
            logging.error("Can't get device info: No such device")
        self.run_daemons()

        self.bus = dbus.SystemBus()
        self.bus.add_signal_receiver(
            self.properties_changed,
            dbus_interface=DBUS_INTERFACE,
            signal_name=PROPERTIES_CHANGED,
            arg0=DEVICE_INTERFACE,
            path_keyword="path")
        self.om = dbus.Interface(
            self.bus.get_object(SERVICE_NAME, "/"), OBJECT_MANGER)
        # A falsy pattern makes find_adapter() return the first adapter.
        self.adapter = self.find_adapter(0)

    @staticmethod
    def _device_path(address):
        """Returns the hci0 D-Bus object path for a device address."""
        return "/org/bluez/hci0/dev_" + address.replace(":", "_")

    def _properties_iface(self, path, service=SERVICE_NAME):
        """Returns the PROPERTIES interface of the object at path."""
        return dbus.Interface(self.bus.get_object(service, path), PROPERTIES)

    def _player_status(self, player):
        """Reads the "Status" property of a media player interface."""
        props = self._properties_iface(player.object_path)
        return self.get_properties(props, MEDIA_PLAY_INTERFACE, "Status")

    def _track_title(self, props):
        """Reads the title of the track reported by a player's properties."""
        track = self.get_properties(props, MEDIA_PLAY_INTERFACE, "Track")
        return track['Title']

    def hci_config(self, hci_params):
        """Sets the interface up or down based on hci_params.

        Args:
            hci_params: String denoting hciconfig parameters.

        Returns:
            True if success, False otherwise.
        """
        output, err = self.run_subprocess(CMD_HCI)
        lines = output.decode().split("\n")
        if len(lines) <= _HCI_STATUS_LINE:
            # No status line to inspect (e.g. the command printed nothing);
            # report failure instead of raising IndexError.
            logging.debug("unexpected output from %s: %r (error = %r)",
                          CMD_HCI, output, err)
            return False
        # NOTE(review): the third line is presumably the adapter flags line
        # (e.g. "UP RUNNING") - confirm against the CMD_HCI definition.
        if hci_params not in lines[_HCI_STATUS_LINE].lower():
            cmd = self.sudo_command + CMD_HCI + hci_params
            _, err = self.run_subprocess(cmd)
            if err:
                logging.debug("command HCI not executed error = %s", err)
                return False
        return True

    def run_daemons(self):
        """Runs all the bluez related daemons which are in the list."""
        self.kill_all_daemon()
        for daemon in self.list_daemon:
            cmd = CMD_PATH + commands[daemon]
            # Every daemon except pulseaudio is launched through sudo.
            if daemon != "pulseaudio":
                cmd = self.sudo_command + cmd

            # is_async=True runs the process in the background.
            self.run_subprocess(cmd, is_async=True)
            time.sleep(_DAEMON_START_DELAY)

    def kill_all_daemon(self):
        """Kills all the bluez related daemons running."""
        kill_prefix = self.sudo_command + KILL_CMD
        for daemon in self.list_daemon:
            find_cmd = (CMD_FIND + CMD_PATH + daemon +
                        "|grep -v grep|grep -v sudo")
            result, err = self.run_subprocess(find_cmd)
            if err:
                logging.info("cmd %s not executed", find_cmd)
            if not result:
                continue
            # The text after the final newline is not a process entry.
            for line in result.decode().split("\n")[:-1]:
                fields = line.split()
                if len(fields) < 4:
                    continue
                # NOTE(review): the fourth column is taken as the PID -
                # confirm against the CMD_FIND output format.
                pid = fields[3]
                kill_cmd = kill_prefix + pid
                _, err = self.run_subprocess(kill_cmd)
                if err:
                    logging.error("command %s with error %s", kill_cmd, err)
                    logging.error("process with pid %s not terminated", pid)

    def run_subprocess(self, cmd, is_async=None, error_fd=subprocess.PIPE):
        """Runs a shell command, optionally in the background.

        A background command that does not contain "dbus" has its stderr
        redirected to "<name>_daemon_error.txt" inside self.log_path.

        Args:
            cmd: command to be executed.
            is_async: Boolean value to set background process.
            error_fd: handler for error file.

        Returns:
            Output of proc.communicate(), if is_async is not set; None for
            a background process.
        """
        preexec = None
        log_file = None
        if is_async:
            # Put the background process in its own process group.
            preexec = os.setpgrp
            if "dbus" not in cmd:
                try:
                    daemon_name = cmd.split("/")[4].split("-")[0]
                except IndexError:
                    # cmd has fewer path components than expected.
                    daemon_name = "daemon"
                file_name = os.path.join(
                    self.log_path, daemon_name) + "_daemon_error.txt"
                log_file = open(file_name, "w+")
                error_fd = log_file

        try:
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=error_fd,
                preexec_fn=preexec,
                shell=True)
        finally:
            # The child holds its own copy of the descriptor, so the
            # parent's handle can be released right away.
            if log_file is not None:
                log_file.close()
        logging.debug("Start standing subprocess with cmd: %s", cmd)
        if is_async:
            return None
        return proc.communicate()

    def register_signal(self):
        """Starts the signal dispatcher; blocks until the loop is quit."""
        self.mainloop = GObject.MainLoop()
        self.mainloop.run()

    def unregister_signal(self):
        """Stops the signal dispatcher if one has been created."""
        if self.mainloop is not None:
            self.mainloop.quit()

    def get_properties(self, props, path, check):
        """Returns the status for parameter check.

        Args:
            props: dbus properties interface.
            path: interface name passed to Get as its first argument.
            check: String for which status needs to be checked.

        Returns:
            Value returned by props.Get(path, check).
        """
        return props.Get(path, check)

    def properties_changed(self, interface, changed, invalidated, path):
        """Stops the dispatcher when the tracked device changes a property.

        Args:
            interface: Interface argument of the signal (unused).
            changed: Changed properties of the signal (unused).
            invalidated: Invalidated properties of the signal (unused).
            path: Object path of the signal sender.
        """
        if self.bd_address is None:
            # No device is being tracked yet, so there is nothing to match.
            return
        if path == self._device_path(self.bd_address):
            self.unregister_signal()

    def get_managed_objects(self):
        """Gets the instance of all the objects in dbus.

        Returns:
            Dictionary containing path and interface of
            all the instance in dbus.
        """
        manager = dbus.Interface(
            self.bus.get_object(SERVICE_NAME, "/"), OBJECT_MANGER)
        return manager.GetManagedObjects()

    def find_adapter(self, pattern=None):
        """Gets the adapter interface with specified pattern in dbus.

        Args:
            pattern: Adapter name pattern to be found out.

        Returns:
            Adapter interface with specified pattern.
        """
        return self.find_adapter_in_objects(self.get_managed_objects(),
                                            pattern)

    def find_adapter_in_objects(self, objects, pattern=None):
        """Gets the adapter interface with specified pattern in dbus.

        Args:
            objects: Dictionary containing path and interface of
                     all the instance in dbus.
            pattern: Adapter address or object-path suffix to match; a
                     falsy value matches the first adapter.

        Returns:
            Adapter interface if successful else raises an exception.

        Raises:
            Exception: If no adapter matches.
        """
        for path, ifaces in objects.items():
            adapter = ifaces.get(ADAPTER_INTERFACE)
            if adapter is None:
                continue
            if (not pattern or pattern == adapter["Address"] or
                    path.endswith(pattern)):
                adapter_obj = self.bus.get_object(SERVICE_NAME, path)
                return dbus.Interface(adapter_obj, ADAPTER_INTERFACE)
        raise Exception("Bluetooth adapter not found")

    def find_device_in_objects(self,
                               objects,
                               device_address,
                               adapter_pattern=None):
        """Gets the device interface in objects with specified device
        address and pattern.

        Args:
            objects: Dictionary containing path and interface of
                     all the instance in dbus.
            device_address: Bluetooth interface MAC address of the device
                     which is to be found out.
            adapter_pattern: Adapter name pattern to be found out.

        Returns:
            Device interface if successful else raises an exception.

        Raises:
            Exception: If no device (or no matching adapter) is found.
        """
        path_prefix = ""
        if adapter_pattern:
            adapter = self.find_adapter_in_objects(objects, adapter_pattern)
            path_prefix = adapter.object_path
        for path, ifaces in objects.items():
            device = ifaces.get(DEVICE_INTERFACE)
            if device is None:
                continue
            if (device["Address"] == device_address and
                    path.startswith(path_prefix)):
                device_obj = self.bus.get_object(SERVICE_NAME, path)
                return dbus.Interface(device_obj, DEVICE_INTERFACE)
        raise Exception("Bluetooth device not found")

    def get_bluetooth_adapter_address(self):
        """Gets the bluetooth adapter address.

        Returns:
            Address of bluetooth adapter.
        """
        props = self._properties_iface(self.adapter.object_path)
        return props.Get(ADAPTER_INTERFACE, "Address")

    def find_device(self, device_address):
        """Discovers the DUT and stores its dbus interface.

        On success the interface is stored in self.device_interface.

        Args:
            device_address: Bluetooth interface MAC address of the device.

        Returns:
            True if the device was found during this discovery else False.
        """
        self.bd_address = device_address
        # Forget any interface from an earlier call so that the result
        # reflects this discovery only.
        self.device_interface = False
        addr = "dev_" + str(device_address).replace(":", "_")
        device_path = "org/bluez/hci0/" + addr
        self.adapter.StartDiscovery()
        try:
            time.sleep(DISCOVERY_TIME)
            objects = self.om.GetManagedObjects()
            for path, interfaces in objects.items():
                # Objects below the device path that do not expose the
                # device interface are skipped.
                if device_path in path and DEVICE_INTERFACE in interfaces:
                    obj = self.bus.get_object(SERVICE_NAME, path)
                    self.device_interface = dbus.Interface(
                        obj, DEVICE_INTERFACE)
                    break
        finally:
            # Stop discovery exactly once, whether or not the device was
            # found.
            self.adapter.StopDiscovery()
        return bool(self.device_interface)

    def media_control_iface(self, device_address):
        """Gets the dbus media control interface for the device
        and returns it.

        Args:
            device_address: Bluetooth interface MAC address of the device.

        Returns:
            Dbus media control interface of the device.
        """
        return dbus.Interface(
            self.bus.get_object(SERVICE_NAME,
                                self._device_path(device_address)),
            MEDIA_CONTROL_INTERFACE)

    def get_a2dp_interface(self, device_address):
        """Gets the dbus media interface for the device.

        Args:
            device_address: Bluetooth interface MAC address of the device.

        Returns:
            Dbus media interface of the device's "player0" object.
        """
        return dbus.Interface(
            self.bus.get_object(
                SERVICE_NAME,
                self._device_path(device_address) + '/player0'),
            MEDIA_PLAY_INTERFACE)

    def ofo_iface(self):
        """Gets the modems known to oFono.

        Returns:
            Result of the oFono manager's GetModems() call; the callers in
            this class treat it as a sequence of (path, properties) pairs.
        """
        manager = dbus.Interface(
            self.bus.get_object('org.ofono', '/'), OFONO_MANAGER)
        return manager.GetModems()

    def call_manager(self, path):
        """Gets Ofono(HFP) interface for the device.

        Args:
            path: Ofono interface path of the device.

        Returns:
            Ofono interface for the device.
        """
        return dbus.Interface(
            self.bus.get_object('org.ofono', path), CALL_MANAGER)

    def answer_call_interface(self, path):
        """Gets the voice call interface for the device.

        Args:
            path: Voice call path of the device.

        Returns:
            Interface for the voice call.
        """
        return dbus.Interface(
            self.bus.get_object('org.ofono', path), VOICE_CALL)

    def pair_bluetooth_device(self):
        """Pairs the bluez machine with DUT.

        Returns:
            True if pairing is successful else False.
        """
        self.device_interface.Pair()
        props = self._properties_iface(self.device_interface.object_path)
        return self.get_properties(props, DEVICE_INTERFACE, "Paired")

    def connect_bluetooth_device(self, *args):
        """Connects the bluez machine to DUT with the specified
        profile.

        Args:
            *args: Profile UUIDs which are to be connected.

        Returns:
            True if connection is successful else False.
        """
        self.register_signal()
        for uuid in args:
            self.device_interface.ConnectProfile(uuid)
        props = self._properties_iface(self.device_interface.object_path)
        return self.get_properties(props, DEVICE_INTERFACE, "Connected")

    def disconnect_bluetooth_profile(self, uuid, pri_ad):
        """Disconnects the DUT for the specified profile.

        Args:
            uuid: Profile UUID which is to be disconnected.
            pri_ad: An android device object.

        Returns:
            True if disconnection of profile is successful else False.
        """
        self.register_signal()
        self.device_interface.DisconnectProfile(uuid)
        time.sleep(10)  # Time taken to check disconnection.
        connected_devices = pri_ad.droid.bluetoothGetConnectedDevices()
        return len(connected_devices) == 0

    def play_media(self, address):
        """Initiate media play for the specified device.

        Args:
            address: Bluetooth interface MAC address of the device.

        Returns:
            "playing" if successful else "stopped" or "paused".
        """
        self.register_signal()
        control = self.media_control_iface(address)
        time.sleep(WAIT_TIME)
        control.Play()
        player = self.get_a2dp_interface(address)
        time.sleep(WAIT_TIME)
        return self._player_status(player)

    def pause_media(self, address):
        """Pauses the media player for the specified device.

        Args:
            address: Bluetooth interface MAC address of the device.

        Returns:
            "paused" or "stopped" if successful else "playing".
        """
        self.register_signal()
        player = self.get_a2dp_interface(address)
        time.sleep(WAIT_TIME)
        player.Pause()
        return self._player_status(player)

    def remove_bluetooth_device(self, address):
        """Removes the device from the paired list.

        Args:
            address: Bluetooth interface MAC address of the device.

        Returns:
            True if removing of device is successful else False.
        """
        managed_objects = self.get_managed_objects()
        adapter = self.find_adapter_in_objects(managed_objects)
        try:
            dev = self.find_device_in_objects(managed_objects, address)
            path = dev.object_path
        except Exception as err:
            # find_device_in_objects raises a plain Exception when the
            # device is unknown; report that as a failed removal.
            logging.info("device %s not removed: %s", address, err)
            return False

        adapter.RemoveDevice(path)
        return True

    def stop_media(self, address):
        """Stops the media player for the specified device.

        Args:
            address: Bluetooth interface MAC address of the device.

        Returns:
            "paused" or "stopped" if successful else "playing".
        """
        self.register_signal()
        player = self.get_a2dp_interface(address)
        time.sleep(WAIT_TIME)
        player.Stop()
        return self._player_status(player)

    def skip_next(self, address):
        """Skips to Next track in media player.

        Args:
            address: Bluetooth interface MAC address of the device.

        Returns:
            True if the media track change is successful else False.
        """
        self.register_signal()
        player = self.get_a2dp_interface(address)
        time.sleep(WAIT_TIME)
        props = self._properties_iface(player.object_path)
        title_before = self._track_title(props)
        player.Next()
        time.sleep(WAIT_TIME)
        return title_before != self._track_title(props)

    def skip_previous(self, address):
        """Skips to previous track in media player.

        Args:
            address: Bluetooth interface MAC address of the device.

        Returns:
            True if media track change is successful else False.
        """
        player = self.get_a2dp_interface(address)
        time.sleep(WAIT_TIME)
        props = self._properties_iface(player.object_path)
        title_before = self._track_title(props)
        # NOTE(review): Previous is sent twice, presumably because the
        # first press only restarts the current track - confirm.
        player.Previous()
        player.Previous()
        time.sleep(WAIT_TIME)
        return title_before != self._track_title(props)

    def volumeup(self, address):
        """Increases volume of the media playing.

        Args:
            address: Bluetooth interface MAC address of the device.

        Returns:
            True if successful.
        """
        self.media_control_iface(address).VolumeUp()
        return True

    def volumedown(self, address):
        """Decreases volume of the media playing.

        Args:
            address: Bluetooth interface MAC address of the device.

        Returns:
            True if successful.
        """
        self.media_control_iface(address).VolumeDown()
        return True

    def call_volume(self, duration):
        """Performs Speaker gain and microphone gain when call is active.

        The level starts at 0 and is raised in steps until it reaches
        the maximum, where it stays for the rest of the duration.

        Args:
            duration: time in seconds to increase volume continuously.

        Returns:
            True once the duration has elapsed, False if oFono reports no
            modem.
        """
        modems = self.ofo_iface()
        if not modems:
            logging.error("no ofono modem found")
            return False
        path = modems[0][0]
        props = dbus.Interface(
            self.bus.get_object('org.ofono', path), 'org.ofono.CallVolume')
        vol_level = 0
        deadline = time.monotonic() + duration
        while time.monotonic() < deadline:
            props.SetProperty("SpeakerVolume", dbus.Byte(vol_level))
            time.sleep(WAIT_TIME)
            props.SetProperty("MicrophoneVolume", dbus.Byte(vol_level))
            # Clamp so that long durations never request a level outside
            # the valid range.
            vol_level = min(vol_level + _CALL_VOLUME_STEP, _MAX_CALL_VOLUME)
        return True

    def avrcp_actions(self, address):
        """Performs AVRCP actions for the device.

        Args:
            address: Bluetooth interface MAC address of the device.

        Returns:
            True if avrcp actions are performed else False.
        """
        if not self.skip_next(address):
            logging.info("skip Next failed")
            return False
        time.sleep(WAIT_TIME)

        if not self.skip_previous(address):
            logging.info("skip previous failed")
            return False
        time.sleep(WAIT_TIME)

        if not self.volumeup(address):
            logging.info("Volume up failed")
            return False
        time.sleep(WAIT_TIME)

        if not self.volumedown(address):
            logging.info("Volume down failed")
            return False
        return True

    def initiate_and_disconnect_call_from_hf(self, phone_no, duration):
        """Initiates the call from bluez for the specified phone number.

        Args:
            phone_no: Phone number to which the call should be made.
            duration: Time till which the call should be active.

        Returns:
            True if the call is initiated and disconnected else False.
        """
        modems = self.ofo_iface()
        if not modems:
            logging.error("no ofono modem found")
            return False
        modem = modems[0][0]
        hide_callerid = "default"
        vcm = self.call_manager(modem)
        time.sleep(WAIT_TIME)
        path = vcm.Dial(phone_no, hide_callerid)
        if 'voicecall' not in path:
            return False
        time.sleep(duration)
        vcm.HangupAll()
        return True

    def answer_call(self, duration):
        """Answers the incoming call from bluez.

        Every call in the "incoming" state is answered, kept active for
        duration seconds and then hung up.

        Args:
            duration: Time till which the call should be active.

        Returns:
            True if call is answered else False.
        """
        answered = False
        for modem_path, modem_props in self.ofo_iface():
            if CALL_MANAGER not in modem_props["Interfaces"]:
                continue
            calls = self.call_manager(modem_path).GetCalls()
            for call_path, call_props in calls:
                if call_props["State"] != "incoming":
                    continue
                call = self.answer_call_interface(call_path)
                call.Answer()
                time.sleep(duration)
                call.Hangup()
                answered = True
        return answered