• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import dbus
18import dbus.mainloop.glib
19import dbus.service
20import logging
21import time
22import os
23import subprocess
24
25from acts.test_utils.coex.coex_constants import ADAPTER_INTERFACE
26from acts.test_utils.coex.coex_constants import CALL_MANAGER
27from acts.test_utils.coex.coex_constants import CMD_FIND
28from acts.test_utils.coex.coex_constants import CMD_HCI
29from acts.test_utils.coex.coex_constants import CMD_PATH
30from acts.test_utils.coex.coex_constants import commands
31from acts.test_utils.coex.coex_constants import DBUS_INTERFACE
32from acts.test_utils.coex.coex_constants import DEVICE_INTERFACE
33from acts.test_utils.coex.coex_constants import DISCOVERY_TIME
34from acts.test_utils.coex.coex_constants import KILL_CMD
35from acts.test_utils.coex.coex_constants import MEDIA_CONTROL_INTERFACE
36from acts.test_utils.coex.coex_constants import MEDIA_PLAY_INTERFACE
37from acts.test_utils.coex.coex_constants import OBJECT_MANGER
38from acts.test_utils.coex.coex_constants import OFONO_MANAGER
39from acts.test_utils.coex.coex_constants import PROPERTIES
40from acts.test_utils.coex.coex_constants import PROPERTIES_CHANGED
41from acts.test_utils.coex.coex_constants import SERVICE_NAME
42from acts.test_utils.coex.coex_constants import VOICE_CALL
43from acts.test_utils.coex.coex_constants import WAIT_TIME
44from acts.utils import create_dir
45
46from gi.repository import GObject
47
48dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
49
50
51class BluezUtils():
52
53    def __init__(self, profile, password, log_path):
54        devices = {}
55        self.device_interface = False
56        self.mainloop = 0
57        self.property_changed = False
58        self.bd_address = None
59        self.list_daemon = ["dbus", "bluez"]
60        self.log_path = os.path.join(log_path, "bluez")
61        create_dir(self.log_path)
62        self.sudo_command = "echo " + password + " | sudo -S "
63        if profile.lower() == "hfp":
64            self.list_daemon.append("ofonod")
65        elif profile.lower() == "a2dp":
66            self.list_daemon.append("pulseaudio")
67        elif profile.lower() == "multiprofile":
68            self.list_daemon.extend(["pulseaudio", "ofonod"])
69        if not self.hci_config("up"):
70            logging.error("Can't get device info: No such device")
71        self.run_daemons()
72        self.bus = dbus.SystemBus()
73        self.bus.add_signal_receiver(
74            self.properties_changed,
75            dbus_interface=DBUS_INTERFACE,
76            signal_name=PROPERTIES_CHANGED,
77            arg0=DEVICE_INTERFACE,
78            path_keyword="path")
79        self.om = dbus.Interface(
80            self.bus.get_object(SERVICE_NAME, "/"), OBJECT_MANGER)
81        objects = self.om.GetManagedObjects()
82        for path, interfaces in objects.items():
83            if ADAPTER_INTERFACE in interfaces:
84                devices[path] = interfaces[ADAPTER_INTERFACE]
85                self.adapter = self.find_adapter(0)
86
87    def hci_config(self, hci_params):
88        """ Sets the interface up or down based on hci_params
89
90        Args:
91            hci_params : String denoting hciconfig paramters.
92
93        Returns:
94            True if success, False otherwise.
95        """
96        output, err = self.run_subprocess(CMD_HCI)
97        if hci_params not in output.decode().split("\n")[2].lower():
98            cmd = self.sudo_command + CMD_HCI + hci_params
99            out, err = self.run_subprocess(cmd)
100            if err:
101                logging.debug("command HCI not executed error = {}".format(err))
102                return False
103        return True
104
105    def run_daemons(self):
106        """Runs all the bluez related daemons which are in the list."""
107        self.kill_all_daemon()
108        for daemon in self.list_daemon:
109            cmd = CMD_PATH + commands[daemon]
110            if daemon != "pulseaudio":
111                cmd = self.sudo_command + cmd
112
113            # is_async = True if process runs on background.
114            self.run_subprocess(cmd, is_async=True)
115            time.sleep(5)
116
117    def kill_all_daemon(self):
118        """Kills all the bluez related daemons running."""
119        for daemon in self.list_daemon:
120            cmd = CMD_FIND + CMD_PATH + daemon \
121                  + "|grep -v grep|grep -v sudo"
122            result, err = self.run_subprocess(cmd)
123            if err:
124                logging.info("cmd {} not executed".format(cmd))
125            if result:
126                data = result.decode().split("\n")
127                command = self.sudo_command + KILL_CMD
128                for i in range(len(data) - 1):
129                    pid = data[i].split()[3]
130                    cmd = command + pid
131                    out, err = self.run_subprocess(cmd)
132                    if err:
133                        logging.error("command {} with error {}".format(
134                            err, cmd))
135                        logging.error("process with pid {}"
136                                      " not terminated".format(pid))
137
138    def run_subprocess(self, cmd, is_async=None, error_fd=subprocess.PIPE):
139        """Runs subprocess in the background and moves error data to a file.
140
141        Args:
142            cmd: command to be executed.
143            is_async: Boolean value to set background process.
144            error_fd: handler for error file.
145
146        Returns:
147            Output of proc.communicate(), if is_async is not set.
148        """
149        if is_async:
150            is_async = os.setpgrp
151            if "dbus" not in cmd:
152                file_name = os.path.join(self.log_path,
153                    (cmd.split("/")[4].split("-")[0])) + "_daemon_error.txt"
154                error_fd = open(file_name, "w+")
155
156        proc = subprocess.Popen(
157            cmd,
158            stdout=subprocess.PIPE,
159            stderr=error_fd,
160            preexec_fn=is_async,
161            shell=True)
162        logging.debug("Start standing subprocess with cmd: %s", cmd)
163        if is_async:
164            return
165        return proc.communicate()
166
167    def register_signal(self):
168        """Start signal_dispatcher"""
169        self.mainloop = GObject.MainLoop()
170        self.mainloop.run()
171
172    def unregister_signal(self):
173        """Stops signal_dispatcher"""
174        self.mainloop.quit()
175
176    def get_properties(self, props, path, check):
177        """Return's status for parameter check .
178
179        Args:
180            props:dbus interface
181            path:path for getting status
182            check:String for which status need to be checked
183        """
184        return props.Get(path, check)
185
186    def properties_changed(self, interface, changed, invalidated, path):
187        """Function to be executed when specified signal is caught"""
188        if path == "/org/bluez/hci0/dev_" + (self.bd_address).replace(":", "_"):
189            self.unregister_signal()
190            return
191
192    def get_managed_objects(self):
193        """Gets the instance of all the objects in dbus.
194
195        Returns:
196            Dictionary containing path and interface of
197            all the instance in dbus.
198        """
199        manager = dbus.Interface(
200            self.bus.get_object(SERVICE_NAME, "/"), OBJECT_MANGER)
201        return manager.GetManagedObjects()
202
203    def find_adapter(self, pattern=None):
204        """Gets the adapter interface with specified pattern in dbus.
205
206        Args:
207            pattern: Adapter name pattern to be found out.
208
209        Returns:
210             Adapter interface with specified pattern.
211        """
212        return self.find_adapter_in_objects(self.get_managed_objects(), pattern)
213
214    def find_adapter_in_objects(self, objects, pattern=None):
215        """Gets the adapter interface with specified pattern in dbus.
216
217        Args:
218            objects: Dictionary containing path and interface of
219            all the instance in dbus.
220            pattern: Adapter name pattern to be found out.
221
222        Returns:
223             Adapter interface if successful else raises an exception.
224        """
225        for path, ifaces in objects.items():
226            adapter = ifaces.get(ADAPTER_INTERFACE)
227            if adapter is None:
228                continue
229            if not pattern or pattern == adapter["Address"] or \
230                    path.endswith(pattern):
231                adapter_obj = self.bus.get_object(SERVICE_NAME, path)
232                return dbus.Interface(adapter_obj, ADAPTER_INTERFACE)
233        raise Exception("Bluetooth adapter not found")
234
235    def find_device_in_objects(self,
236                               objects,
237                               device_address,
238                               adapter_pattern=None):
239        """Gets the device interface in objects with specified device
240        address and pattern.
241
242        Args:
243            objects: Dictionary containing path and interface of
244            all the instance in dbus.
245            device_address: Bluetooth interface MAC address of the device
246            which is to be found out.
247            adapter_pattern: Adapter name pattern to be found out.
248
249        Returns:
250             Device interface if successful else raises an exception.
251        """
252        path_prefix = ""
253        if adapter_pattern:
254            adapter = self.find_adapter_in_objects(objects, adapter_pattern)
255            path_prefix = adapter.object_path
256        for path, ifaces in objects.items():
257            device = ifaces.get(DEVICE_INTERFACE)
258            if device is None:
259                continue
260            if (device["Address"] == device_address and
261                    path.startswith(path_prefix)):
262                device_obj = self.bus.get_object(SERVICE_NAME, path)
263                return dbus.Interface(device_obj, DEVICE_INTERFACE)
264        raise Exception("Bluetooth device not found")
265
266    def get_bluetooth_adapter_address(self):
267        """Gets the bluetooth adapter address.
268
269        Returns:
270            Address of bluetooth adapter.
271        """
272        path = self.adapter.object_path
273        props = dbus.Interface(
274            self.bus.get_object(SERVICE_NAME, path), PROPERTIES)
275        address = props.Get(ADAPTER_INTERFACE, "Address")
276        return address
277
278    def find_device(self, device_address):
279        """Discovers the DUT and returns its dbus interface.
280
281        Args:
282            Device_address: Bluetooth interface MAC address of the device.
283
284        Returns:
285            Dbus interface of the device.
286        """
287        self.bd_address = device_address
288        addr = "dev_" + str(device_address).replace(":", "_")
289        device_path = "org/bluez/hci0/" + addr
290        self.adapter.StartDiscovery()
291        time.sleep(DISCOVERY_TIME)
292        objects = self.om.GetManagedObjects()
293        for path, interfaces in objects.items():
294            if device_path in path:
295                obj = self.bus.get_object(SERVICE_NAME, path)
296                self.device_interface = dbus.Interface(obj, DEVICE_INTERFACE)
297                self.adapter.StopDiscovery()
298        if not self.device_interface:
299            self.adapter.StopDiscovery()
300            return False
301        return True
302
303    def media_control_iface(self, device_address):
304        """Gets the dbus media control interface for the device
305        and returns it.
306
307        Args:
308            device_address: Bluetooth interface MAC address of the device.
309
310        Returns:
311            Dbus media control interface of the device.
312        """
313        control_iface = dbus.Interface(
314            self.bus.get_object(
315                SERVICE_NAME,
316                '/org/bluez/hci0/dev_' + device_address.replace(":", "_")),
317            MEDIA_CONTROL_INTERFACE)
318        return control_iface
319
320    def get_a2dp_interface(self, device_address):
321        """Gets the dbus media interface for the device.
322
323        Args:
324            device_address: Bluetooth interface MAC address of the device.
325
326        Returns:
327            Dbus media interface of the device.
328        """
329        a2dp_interface = dbus.Interface(
330            self.bus.get_object(SERVICE_NAME, '/org/bluez/hci0/dev_' +
331                                device_address.replace(":", "_") + '/player0'),
332            MEDIA_PLAY_INTERFACE)
333        return a2dp_interface
334
335    def ofo_iface(self):
336        """Gets dbus hfp interface for the device.
337
338        Returns:
339            Dbus hfp interface of the device.
340        """
341        manager = dbus.Interface(
342            self.bus.get_object('org.ofono', '/'), OFONO_MANAGER)
343        modems = manager.GetModems()
344        return modems
345
346    def call_manager(self, path):
347        """Gets Ofono(HFP) interface for the device.
348
349        Args:
350            path: Ofono interface path of the device.
351
352        Returns:
353            Ofono interface for the device.
354        """
355        vcm = dbus.Interface(
356            self.bus.get_object('org.ofono', path), CALL_MANAGER)
357        return vcm
358
359    def answer_call_interface(self, path):
360        """Gets the voice call interface for the device.
361
362        Args
363            path: Voice call path of the device.
364
365        Returns:
366             Interface for the voice call.
367        """
368        call = dbus.Interface(
369            self.bus.get_object('org.ofono', path), VOICE_CALL)
370        return call
371
372    def pair_bluetooth_device(self):
373        """Pairs the bluez machine with DUT.
374
375        Returns:
376            True if pairing is successful else False.
377        """
378        self.device_interface.Pair()
379        path = self.device_interface.object_path
380        props = dbus.Interface(
381            self.bus.get_object(SERVICE_NAME, path), PROPERTIES)
382        paired = self.get_properties(props, DEVICE_INTERFACE, "Paired")
383        return paired
384
385    def connect_bluetooth_device(self, *args):
386        """Connects the bluez machine to DUT with the specified
387        profile.
388
389        Args:
390            uuid: Profile UUID which is to be connected.
391
392        Returns:
393            True if connection is successful else False.
394        """
395
396        self.register_signal()
397        for uuid in args:
398            self.device_interface.ConnectProfile(uuid)
399        path = self.device_interface.object_path
400        props = dbus.Interface(
401            self.bus.get_object(SERVICE_NAME, path), PROPERTIES)
402        connect = self.get_properties(props, DEVICE_INTERFACE, "Connected")
403        return connect
404
405    def disconnect_bluetooth_profile(self, uuid, pri_ad):
406        """Disconnects the DUT for the specified profile.
407
408        Args:
409            uuid: Profile UUID which is to be disconnected.
410            pri_ad: An android device object.
411
412        Returns:
413            True if disconnection of profile is successful else False.
414        """
415
416        self.register_signal()
417        self.device_interface.DisconnectProfile(uuid)
418        time.sleep(10)  #Time taken to check disconnection.
419        connected_devices = pri_ad.droid.bluetoothGetConnectedDevices()
420        if len(connected_devices) > 0:
421            return False
422        return True
423
424    def play_media(self, address):
425        """Initiate media play for the specified device.
426
427        Args:
428            address: Bluetooth interface MAC address of the device.
429
430        Returns:
431            "playing" if successful else "stopped" or "paused".
432        """
433        self.register_signal()
434        a2dp = self.media_control_iface(address)
435        time.sleep(WAIT_TIME)
436        a2dp.Play()
437        play_pause = self.get_a2dp_interface(address)
438        path = play_pause.object_path
439        time.sleep(WAIT_TIME)
440        props = dbus.Interface(
441            self.bus.get_object(SERVICE_NAME, path), PROPERTIES)
442        status = self.get_properties(props, MEDIA_PLAY_INTERFACE, "Status")
443        return status
444
445    def pause_media(self, address):
446        """Pauses the media palyer for the specified device.
447
448        Args:
449            address: Bluetooth interface MAC address of the device.
450
451        Return:
452            "paused" or "stopped" if successful else "playing".
453        """
454        self.register_signal()
455        a2dp = self.get_a2dp_interface(address)
456        time.sleep(WAIT_TIME)
457        a2dp.Pause()
458        path = a2dp.object_path
459        props = dbus.Interface(
460            self.bus.get_object(SERVICE_NAME, path), PROPERTIES)
461        status = self.get_properties(props, MEDIA_PLAY_INTERFACE, "Status")
462        return status
463
464    def remove_bluetooth_device(self, address):
465        """Removes the device from the paired list.
466
467        Args:
468            address: Bluetooth interface MAC address of the device.
469
470        Returns:
471            True if removing of device is successful else False.
472        """
473        managed_objects = self.get_managed_objects()
474        adapter = self.find_adapter_in_objects(managed_objects)
475        try:
476            dev = self.find_device_in_objects(managed_objects, address)
477            path = dev.object_path
478        except:
479            return False
480
481        adapter.RemoveDevice(path)
482        return True
483
484    def stop_media(self, address):
485        """Stops the media player for the specified device.
486
487        Args:
488            address: Bluetooth interface MAC address of the device.
489
490        Returns:
491            "paused" or "stopped" if successful else "playing".
492        """
493        self.register_signal()
494        a2dp = self.get_a2dp_interface(address)
495        time.sleep(WAIT_TIME)
496        a2dp.Stop()
497        path = a2dp.object_path
498        props = dbus.Interface(
499            self.bus.get_object(SERVICE_NAME, path), PROPERTIES)
500        status = self.get_properties(props, MEDIA_PLAY_INTERFACE, "Status")
501        return status
502
503    def skip_next(self, address):
504        """Skips to Next track in media player.
505
506        Args:
507            address: Bluetooth interface MAC address of the device.
508
509        Returns:
510            True if the media track change is successful else False.
511        """
512        self.register_signal()
513        a2dp = self.get_a2dp_interface(address)
514        time.sleep(WAIT_TIME)
515        path = a2dp.object_path
516        props = dbus.Interface(
517            self.bus.get_object(SERVICE_NAME, path), PROPERTIES)
518        track = self.get_properties(props, MEDIA_PLAY_INTERFACE, "Track")
519        Title = track['Title']
520        a2dp.Next()
521        time.sleep(WAIT_TIME)
522        track = self.get_properties(props, MEDIA_PLAY_INTERFACE, "Track")
523        if Title == track['Title']:
524            return False
525        return True
526
527    def skip_previous(self, address):
528        """Skips to previous track in media player.
529
530        Args:
531            address: Buetooth interface MAC address of the device.
532
533        Returns:
534            True if media track change is successful else False.
535        """
536        a2dp = self.get_a2dp_interface(address)
537        time.sleep(WAIT_TIME)
538        path = a2dp.object_path
539        props = dbus.Interface(
540            self.bus.get_object(SERVICE_NAME, path), PROPERTIES)
541        track = self.get_properties(props, MEDIA_PLAY_INTERFACE, "Track")
542        Title = track['Title']
543        a2dp.Previous()
544        a2dp.Previous()
545        time.sleep(WAIT_TIME)
546        track = self.get_properties(props, MEDIA_PLAY_INTERFACE, "Track")
547        if Title == track['Title']:
548            return False
549        return True
550
551    def volumeup(self, address):
552        """Increases volume of the media playing.
553
554        Args:
555            address: Buetooth interface MAC address of the device.
556
557        Returns:
558            True if successful.
559        """
560        a2dp = self.media_control_iface(address)
561        a2dp.VolumeUp()
562        return True
563
564    def volumedown(self, address):
565        """Decreases volume of the media playing.
566
567        Args:
568            address: Buetooth interface MAC address of the device.
569
570        Returns:
571            True if successful.
572        """
573        a2dp = self.media_control_iface(address)
574        a2dp.VolumeDown()
575        return True
576
577    def call_volume(self, duration):
578        """Performs Speaker gain and microphone gain when call is active.
579
580        Args:
581            duration:time in seconds to increase volume continuously.
582
583        Returns:
584            True if Property has been changed, otherwise False.
585        """
586        vol_level = 0
587        modems = self.ofo_iface()
588        path = modems[0][0]
589        props = dbus.Interface(
590            self.bus.get_object('org.ofono', path), 'org.ofono.CallVolume')
591        start_time = time.time()
592        while (time.time()) < (start_time + duration):
593            props.SetProperty("SpeakerVolume", dbus.Byte(int(vol_level)))
594            time.sleep(WAIT_TIME)
595            props.SetProperty("MicrophoneVolume", dbus.Byte(int(vol_level)))
596            vol_level += 5
597        return True
598
599    def avrcp_actions(self, address):
600        """Performs AVRCP actions for the device
601
602        Args:
603            address: Bluetooth interface MAC address of the device.
604
605        Returns:
606            True if avrcp actions are performed else False.
607        """
608        if not self.skip_next(address):
609            logging.info("skip Next failed")
610            return False
611        time.sleep(WAIT_TIME)
612
613        if not self.skip_previous(address):
614            logging.info("skip previous failed")
615            return False
616        time.sleep(WAIT_TIME)
617
618        if not self.volumeup(address):
619            logging.info("Volume up failed")
620            return False
621        time.sleep(WAIT_TIME)
622
623        if not self.volumedown(address):
624            logging.info("Volume down failed")
625            return False
626        return True
627
628    def initiate_and_disconnect_call_from_hf(self, phone_no, duration):
629        """Initiates the call from bluez for the specified phone number.
630
631        Args:
632            phone_no: Phone number to which the call should be made.
633            duration: Time till which the call should be active.
634
635        Returns:
636             True if the call is initiated and disconnected else False.
637        """
638        modems = self.ofo_iface()
639        modem = modems[0][0]
640        hide_callerid = "default"
641        vcm = self.call_manager(modem)
642        time.sleep(WAIT_TIME)
643        path = vcm.Dial(phone_no, hide_callerid)
644        if 'voicecall' not in path:
645            return False
646        time.sleep(duration)
647        vcm.HangupAll()
648        return True
649
650    def answer_call(self, duration):
651        """Answers the incoming call from bluez.
652
653        Args:
654            duration: Time till which the call should be active.
655
656        Returns:
657             True if call is answered else False.
658        """
659        modems = self.ofo_iface()
660        for path, properties in modems:
661            if CALL_MANAGER not in properties["Interfaces"]:
662                continue
663            mgr = self.call_manager(path)
664            calls = mgr.GetCalls()
665            for path, properties in calls:
666                state = properties["State"]
667                if state != "incoming":
668                    continue
669                call = self.answer_call_interface(path)
670                call.Answer()
671                time.sleep(duration)
672                call.Hangup()
673        return True
674