# Copyright 2015 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """This module provides cras DBus audio utilities.""" import logging import multiprocessing import pprint from autotest_lib.client.cros.audio import cras_utils def _set_default_main_loop(): """Sets the gobject main loop to be the event loop for DBus. @raises: ImportError if dbus.mainloop.glib can not be imported. """ try: import dbus.mainloop.glib except ImportError, e: logging.exception( 'Can not import dbus.mainloop.glib: %s. ' 'This method should only be called on Cros device.', e) raise dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) def _get_gobject(): """Tries to import gobject. @returns: The imported gobject module. @raises: ImportError if gobject can not be imported. """ try: import gobject except ImportError, e: logging.exception( 'Can not import gobject: %s. This method should only be ' 'called on Cros device.', e) raise return gobject class CrasDBusMonitorError(Exception): """Error in CrasDBusMonitor.""" pass class CrasDBusMonitor(object): """Monitor for DBus signal from Cras.""" def __init__(self): _set_default_main_loop() # Acquires a new Cras interface through a new dbus.SystemBus instance # which has default main loop. self._iface = cras_utils.get_cras_control_interface(private=True) self._loop = _get_gobject().MainLoop() self._count = 0 class CrasDBusSignalListener(CrasDBusMonitor): """Listener for certain signal.""" def __init__(self): super(CrasDBusSignalListener, self).__init__() self._target_signal_count = 0 def wait_for_nodes_changed(self, target_signal_count, timeout_secs): """Waits for NodesChanged signal. @param target_signal_count: The expected number of signal. @param timeout_secs: The timeout in seconds. @raises: CrasDBusMonitorError if there is no enough signals before timeout. """ self._target_signal_count = target_signal_count signal_match = self._iface.connect_to_signal( 'NodesChanged', self._nodes_changed_handler) _get_gobject().timeout_add( timeout_secs * 1000, self._timeout_quit_main_loop) # Blocks here until _nodes_changed_handler or _timeout_quit_main_loop # quits the loop. self._loop.run() signal_match.remove() if self._count < self._target_signal_count: raise CrasDBusMonitorError('Timeout') def _nodes_changed_handler(self): """Handler for NodesChanged signal.""" if self._loop.is_running(): logging.debug('Got NodesChanged signal when loop is running.') self._count = self._count + 1 logging.debug('count = %d', self._count) if self._count >= self._target_signal_count: logging.debug('Quit main loop') self._loop.quit() else: logging.debug('Got NodesChanged signal when loop is not running.' ' Ignore it') def _timeout_quit_main_loop(self): """Handler for timeout in main loop. @returns: False so this callback will not be called again. """ if self._loop.is_running(): logging.error('Quit main loop because of timeout') self._loop.quit() else: logging.debug( 'Got _quit_main_loop after main loop quits. Ignore it') return False class CrasDBusBackgroundSignalCounter(object): """Controls signal counter which runs in background.""" def __init__(self): self._proc = None self._signal_name = None self._counter = None self._parent_conn = None self._child_conn = None def start(self, signal_name): """Starts the signal counter in a subprocess. @param signal_name: The name of the signal to count. """ self._signal_name = signal_name self._parent_conn, self._child_conn = multiprocessing.Pipe() self._proc = multiprocessing.Process( target=self._run, args=(self._child_conn,)) self._proc.daemon = True self._proc.start() def _run(self, child_conn): """Runs CrasDBusCounter. This should be called in a subprocess. This blocks until parent_conn send stop command to the pipe. """ self._counter = CrasDBusCounter(self._signal_name, child_conn) self._counter.run() def stop(self): """Stops the CrasDBusCounter by sending stop command to parent_conn. The result of CrasDBusCounter in its subproces can be obtained by reading from parent_conn. @returns: The count of the signal of interest. """ self._parent_conn.send(CrasDBusCounter.STOP_CMD) return self._parent_conn.recv() class CrasDBusCounter(CrasDBusMonitor): """Counter for DBus signal sent from Cras""" _CHECK_QUIT_PERIOD_SECS = 0.1 STOP_CMD = 'stop' def __init__(self, signal_name, child_conn, ignore_redundant=True): """Initializes a CrasDBusCounter. @param signal_name: The name of the signal of interest. @param child_conn: A multiprocessing.Pipe which is used to receive stop signal and to send the counting result. @param ignore_redundant: Ignores signal if GetNodes result stays the same. This happens when there is change in unplugged nodes, which does not affect Cras client. """ super(CrasDBusCounter, self).__init__() self._signal_name = signal_name self._count = None self._child_conn = child_conn self._ignore_redundant = ignore_redundant self._nodes = None def run(self): """Runs the gobject main loop and listens for the signal.""" self._count = 0 self._nodes = cras_utils.get_cras_nodes() logging.debug('Before starting the counter') logging.debug('nodes = %s', pprint.pformat(self._nodes)) signal_match = self._iface.connect_to_signal( self._signal_name, self._signal_handler) _get_gobject().timeout_add( int(self._CHECK_QUIT_PERIOD_SECS * 1000), self._check_quit_main_loop) logging.debug('Start counting for signal %s', self._signal_name) # Blocks here until _check_quit_main_loop quits the loop. self._loop.run() signal_match.remove() logging.debug('Count result: %s', self._count) self._child_conn.send(self._count) def _signal_handler(self): """Handler for signal.""" if self._loop.is_running(): logging.debug('Got %s signal when loop is running.', self._signal_name) logging.debug('Getting nodes.') nodes = cras_utils.get_cras_nodes() logging.debug('nodes = %s', pprint.pformat(nodes)) if self._ignore_redundant and self._nodes == nodes: logging.debug('Nodes did not change. Ignore redundant signal') return self._count = self._count + 1 logging.debug('count = %d', self._count) else: logging.debug('Got %s signal when loop is not running.' ' Ignore it', self._signal_name) def _should_stop(self): """Checks if user wants to stop main loop.""" if self._child_conn.poll(): if self._child_conn.recv() == self.STOP_CMD: logging.debug('Should stop') return True return False def _check_quit_main_loop(self): """Handler for timeout in main loop. @returns: True so this callback will not be called again. False if user quits main loop. """ if self._loop.is_running(): logging.debug('main loop is running in _check_quit_main_loop') if self._should_stop(): logging.debug('Quit main loop because of stop command') self._loop.quit() return False else: logging.debug('No stop command, keep running') return True else: logging.debug( 'Got _quit_main_loop after main loop quits. Ignore it') return False class CrasDBusMonitorUnexpectedNodesChanged(Exception): """Error for unexpected nodes changed.""" pass def wait_for_unexpected_nodes_changed(timeout_secs): """Waits for unexpected nodes changed signal in this blocking call. @param timeout_secs: Timeout in seconds for waiting. @raises CrasDBusMonitorUnexpectedNodesChanged if there is NodesChanged signal """ try: CrasDBusSignalListener().wait_for_nodes_changed(1, timeout_secs) except CrasDBusMonitorError: logging.debug('There is no NodesChanged signal, as expected') return raise CrasDBusMonitorUnexpectedNodesChanged()