# Copyright (c) 2013 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """This module provides cras audio utilities.""" import logging import re from autotest_lib.client.cros.audio import cmd_utils _CRAS_TEST_CLIENT = '/usr/bin/cras_test_client' class CrasUtilsError(Exception): """Error in CrasUtils.""" pass def playback(blocking=True, stdin=None, *args, **kargs): """A helper function to execute the playback_cmd. @param blocking: Blocks this call until playback finishes. @param stdin: the standard input of playback process @param args: args passed to playback_cmd. @param kargs: kargs passed to playback_cmd. @returns: The process running the playback command. Note that if the blocking parameter is true, this will return a finished process. """ process = cmd_utils.popen(playback_cmd(*args, **kargs), stdin=stdin) if blocking: cmd_utils.wait_and_check_returncode(process) return process def capture(*args, **kargs): """A helper function to execute the capture_cmd. @param args: args passed to capture_cmd. @param kargs: kargs passed to capture_cmd. """ cmd_utils.execute(capture_cmd(*args, **kargs)) def playback_cmd(playback_file, block_size=None, duration=None, pin_device=None, channels=2, rate=48000): """Gets a command to playback a file with given settings. @param playback_file: the name of the file to play. '-' indicates to playback raw audio from the stdin. @param pin_device: the device id to playback on. @param block_size: the number of frames per callback(dictates latency). @param duration: seconds to playback. @param channels: number of channels. @param rate: the sampling rate. @returns: The command args put in a list of strings. """ args = [_CRAS_TEST_CLIENT] args += ['--playback_file', playback_file] if pin_device is not None: args += ['--pin_device', str(pin_device)] if block_size is not None: args += ['--block_size', str(block_size)] if duration is not None: args += ['--duration', str(duration)] args += ['--num_channels', str(channels)] args += ['--rate', str(rate)] return args def capture_cmd(capture_file, block_size=None, duration=10, pin_device=None, channels=1, rate=48000): """Gets a command to capture the audio into the file with given settings. @param capture_file: the name of file the audio to be stored in. @param pin_device: the device id to record from. @param block_size: the number of frames per callback(dictates latency). @param duration: seconds to record. If it is None, duration is not set, and command will keep capturing audio until it is terminated. @param channels: number of channels. @param rate: the sampling rate. @returns: The command args put in a list of strings. """ args = [_CRAS_TEST_CLIENT] args += ['--capture_file', capture_file] if pin_device is not None: args += ['--pin_device', str(pin_device)] if block_size is not None: args += ['--block_size', str(block_size)] if duration is not None: args += ['--duration', str(duration)] args += ['--num_channels', str(channels)] args += ['--rate', str(rate)] return args def listen_cmd( capture_file, block_size=None, duration=10, channels=1, rate=48000): """Gets a command to listen on hotword and record audio into the file with given settings. @param capture_file: the name of file the audio to be stored in. @param block_size: the number of frames per callback(dictates latency). @param duration: seconds to record. If it is None, duration is not set, and command will keep capturing audio until it is terminated. @param channels: number of channels. @param rate: the sampling rate. @returns: The command args put in a list of strings. """ args = [_CRAS_TEST_CLIENT] args += ['--listen_for_hotword', capture_file] if block_size is not None: args += ['--block_size', str(block_size)] if duration is not None: args += ['--duration', str(duration)] args += ['--num_channels', str(channels)] args += ['--rate', str(rate)] return args def loopback(*args, **kargs): """A helper function to execute loopback_cmd. @param args: args passed to loopback_cmd. @param kargs: kargs passed to loopback_cmd. """ cmd_utils.execute(loopback_cmd(*args, **kargs)) def loopback_cmd(output_file, duration=10, channels=2, rate=48000): """Gets a command to record the loopback. @param output_file: The name of the file the loopback to be stored in. @param channels: The number of channels of the recorded audio. @param duration: seconds to record. @param rate: the sampling rate. @returns: The command args put in a list of strings. """ args = [_CRAS_TEST_CLIENT] args += ['--loopback_file', output_file] args += ['--duration_seconds', str(duration)] args += ['--num_channels', str(channels)] args += ['--rate', str(rate)] return args def get_cras_nodes_cmd(): """Gets a command to query the nodes from Cras. @returns: The command to query nodes information from Cras using dbus-send. """ return ('dbus-send --system --type=method_call --print-reply ' '--dest=org.chromium.cras /org/chromium/cras ' 'org.chromium.cras.Control.GetNodes') def set_system_volume(volume): """Set the system volume. @param volume: the system output vlume to be set(0 - 100). """ get_cras_control_interface().SetOutputVolume(volume) def set_node_volume(node_id, volume): """Set the volume of the given output node. @param node_id: the id of the output node to be set the volume. @param volume: the volume to be set(0-100). """ get_cras_control_interface().SetOutputNodeVolume(node_id, volume) def get_cras_control_interface(private=False): """Gets Cras DBus control interface. @param private: Set to True to use a new instance for dbus.SystemBus instead of the shared instance. @returns: A dBus.Interface object with Cras Control interface. @raises: ImportError if this is not called on Cros device. """ try: import dbus except ImportError, e: logging.exception( 'Can not import dbus: %s. This method should only be ' 'called on Cros device.', e) raise bus = dbus.SystemBus(private=private) cras_object = bus.get_object('org.chromium.cras', '/org/chromium/cras') return dbus.Interface(cras_object, 'org.chromium.cras.Control') def get_cras_nodes(): """Gets nodes information from Cras. @returns: A dict containing information of each node. """ return get_cras_control_interface().GetNodes() def get_selected_nodes(): """Gets selected output nodes and input nodes. @returns: A tuple (output_nodes, input_nodes) where each field is a list of selected node IDs returned from Cras DBus API. Note that there may be multiple output/input nodes being selected at the same time. """ output_nodes = [] input_nodes = [] nodes = get_cras_nodes() for node in nodes: if node['Active']: if node['IsInput']: input_nodes.append(node['Id']) else: output_nodes.append(node['Id']) return (output_nodes, input_nodes) def set_selected_output_node_volume(volume): """Sets the selected output node volume. @param volume: the volume to be set (0-100). """ selected_output_node_ids, _ = get_selected_nodes() for node_id in selected_output_node_ids: set_node_volume(node_id, volume) def get_active_stream_count(): """Gets the number of active streams. @returns: The number of active streams. """ return int(get_cras_control_interface().GetNumberOfActiveStreams()) def set_system_mute(is_mute): """Sets the system mute switch. @param is_mute: Set True to mute the system playback. """ get_cras_control_interface().SetOutputMute(is_mute) def set_capture_mute(is_mute): """Sets the capture mute switch. @param is_mute: Set True to mute the capture. """ get_cras_control_interface().SetInputMute(is_mute) def node_type_is_plugged(node_type, nodes_info): """Determine if there is any node of node_type plugged. This method is used in the AudioLoopbackDongleLabel class, where the call is executed on autotest server. Use get_cras_nodes instead if the call can be executed on Cros device. Since Cras only reports the plugged node in GetNodes, we can parse the return value to see if there is any node with the given type. For example, if INTERNAL_MIC is of intereset, the pattern we are looking for is: dict entry( string "Type" variant string "INTERNAL_MIC" ) @param node_type: A str representing node type defined in CRAS_NODE_TYPES. @param nodes_info: A str containing output of command get_nodes_cmd. @returns: True if there is any node of node_type plugged. False otherwise. """ match = re.search(r'string "Type"\s+variant\s+string "%s"' % node_type, nodes_info) return True if match else False # Cras node types reported from Cras DBus control API. CRAS_OUTPUT_NODE_TYPES = ['HEADPHONE', 'INTERNAL_SPEAKER', 'HDMI', 'USB', 'BLUETOOTH', 'LINEOUT', 'UNKNOWN'] CRAS_INPUT_NODE_TYPES = ['MIC', 'INTERNAL_MIC', 'USB', 'BLUETOOTH', 'POST_DSP_LOOPBACK', 'POST_MIX_LOOPBACK', 'UNKNOWN', 'KEYBOARD_MIC', 'HOTWORD', 'FRONT_MIC', 'REAR_MIC', 'ECHO_REFERENCE'] CRAS_NODE_TYPES = CRAS_OUTPUT_NODE_TYPES + CRAS_INPUT_NODE_TYPES def get_filtered_node_types(callback): """Returns the pair of filtered output node types and input node types. @param callback: A callback function which takes a node as input parameter and filter the node based on its return value. @returns: A tuple (output_node_types, input_node_types) where each field is a list of node types defined in CRAS_NODE_TYPES, and their 'attribute_name' is True. """ output_node_types = [] input_node_types = [] nodes = get_cras_nodes() for node in nodes: if callback(node): node_type = str(node['Type']) if node_type not in CRAS_NODE_TYPES: logging.warning('node type %s is not in known CRAS_NODE_TYPES', node_type) if node['IsInput']: input_node_types.append(node_type) else: output_node_types.append(node_type) return (output_node_types, input_node_types) def get_selected_node_types(): """Returns the pair of active output node types and input node types. @returns: A tuple (output_node_types, input_node_types) where each field is a list of selected node types defined in CRAS_NODE_TYPES. """ def is_selected(node): """Checks if a node is selected. A node is selected if its Active attribute is True. @returns: True is a node is selected, False otherwise. """ return node['Active'] return get_filtered_node_types(is_selected) def get_selected_input_device_name(): """Returns the device name of the active input node. @returns: device name string. E.g. kbl_r5514_5663_max: :0,1 """ nodes = get_cras_nodes() for node in nodes: if node['Active'] and node['IsInput']: return node['DeviceName'] return None def get_selected_input_device_type(): """Returns the device type of the active input node. @returns: device type string. E.g. INTERNAL_MICROPHONE """ nodes = get_cras_nodes() for node in nodes: if node['Active'] and node['IsInput']: return node['Type'] return None def get_selected_output_device_name(): """Returns the device name of the active output node. @returns: device name string. E.g. mtk-rt5650: :0,0 """ nodes = get_cras_nodes() for node in nodes: if node['Active'] and not node['IsInput']: return node['DeviceName'] return None def get_selected_output_device_type(): """Returns the device type of the active output node. @returns: device type string. E.g. INTERNAL_SPEAKER """ nodes = get_cras_nodes() for node in nodes: if node['Active'] and not node['IsInput']: return node['Type'] return None def get_plugged_node_types(): """Returns the pair of plugged output node types and input node types. @returns: A tuple (output_node_types, input_node_types) where each field is a list of plugged node types defined in CRAS_NODE_TYPES. """ def is_plugged(node): """Checks if a node is plugged and is not unknown node. Cras DBus API only reports plugged node, so every node reported by Cras DBus API is plugged. However, we filter out UNKNOWN node here because the existence of unknown node depends on the number of redundant playback/record audio device created on audio card. Also, the user of Cras will ignore unknown nodes. @returns: True if a node is plugged and is not an UNKNOWN node. """ return node['Type'] != 'UNKNOWN' return get_filtered_node_types(is_plugged) def set_selected_node_types(output_node_types, input_node_types): """Sets selected node types. @param output_node_types: A list of output node types. None to skip setting. @param input_node_types: A list of input node types. None to skip setting. """ if output_node_types is not None and len(output_node_types) == 1: set_single_selected_output_node(output_node_types[0]) elif output_node_types: set_selected_output_nodes(output_node_types) if input_node_types is not None and len(input_node_types) == 1: set_single_selected_input_node(input_node_types[0]) elif input_node_types: set_selected_input_nodes(input_node_types) def set_single_selected_output_node(node_type): """Sets one selected output node. Note that Chrome UI uses SetActiveOutputNode of Cras DBus API to select one output node. @param node_type: A node type. """ nodes = get_cras_nodes() for node in nodes: if node['IsInput']: continue if node['Type'] == node_type: set_active_output_node(node['Id']) def set_single_selected_input_node(node_type): """Sets one selected input node. Note that Chrome UI uses SetActiveInputNode of Cras DBus API to select one input node. @param node_type: A node type. """ nodes = get_cras_nodes() for node in nodes: if not node['IsInput']: continue if node['Type'] == node_type: set_active_input_node(node['Id']) def set_selected_output_nodes(types): """Sets selected output node types. Note that Chrome UI uses SetActiveOutputNode of Cras DBus API to select one output node. Here we use add/remove active output node to support multiple nodes. @param types: A list of output node types. """ nodes = get_cras_nodes() for node in nodes: if node['IsInput']: continue if node['Type'] in types: add_active_output_node(node['Id']) elif node['Active']: remove_active_output_node(node['Id']) def set_selected_input_nodes(types): """Sets selected input node types. Note that Chrome UI uses SetActiveInputNode of Cras DBus API to select one input node. Here we use add/remove active input node to support multiple nodes. @param types: A list of input node types. """ nodes = get_cras_nodes() for node in nodes: if not node['IsInput']: continue if node['Type'] in types: add_active_input_node(node['Id']) elif node['Active']: remove_active_input_node(node['Id']) def set_active_input_node(node_id): """Sets one active input node. @param node_id: node id. """ get_cras_control_interface().SetActiveInputNode(node_id) def set_active_output_node(node_id): """Sets one active output node. @param node_id: node id. """ get_cras_control_interface().SetActiveOutputNode(node_id) def add_active_output_node(node_id): """Adds an active output node. @param node_id: node id. """ get_cras_control_interface().AddActiveOutputNode(node_id) def add_active_input_node(node_id): """Adds an active input node. @param node_id: node id. """ get_cras_control_interface().AddActiveInputNode(node_id) def remove_active_output_node(node_id): """Removes an active output node. @param node_id: node id. """ get_cras_control_interface().RemoveActiveOutputNode(node_id) def remove_active_input_node(node_id): """Removes an active input node. @param node_id: node id. """ get_cras_control_interface().RemoveActiveInputNode(node_id) def get_node_id_from_node_type(node_type, is_input): """Gets node id from node type. @param types: A node type defined in CRAS_NODE_TYPES. @param is_input: True if the node is input. False otherwise. @returns: A string for node id. @raises: CrasUtilsError: if unique node id can not be found. """ nodes = get_cras_nodes() find_ids = [] for node in nodes: if node['Type'] == node_type and node['IsInput'] == is_input: find_ids.append(node['Id']) if len(find_ids) != 1: raise CrasUtilsError( 'Can not find unique node id from node type %s' % node_type) return find_ids[0] def get_device_id_of(node_id): """Gets the device id of the node id. The conversion logic is replicated from the CRAS's type definition at third_party/adhd/cras/src/common/cras_types.h. @param node_id: A string for node id. @returns: A string for device id. @raise: CrasUtilsError: if device id is invalid. """ device_id = str(long(node_id) >> 32) if device_id == "0": raise CrasUtilsError('Got invalid device_id: 0') return device_id def get_device_id_from_node_type(node_type, is_input): """Gets device id from node type. @param types: A node type defined in CRAS_NODE_TYPES. @param is_input: True if the node is input. False otherwise. @returns: A string for device id. """ node_id = get_node_id_from_node_type(node_type, is_input) return get_device_id_of(node_id) def get_active_node_volume(): """Returns volume from active node. @returns: int for volume @raises: CrasUtilsError: if node volume cannot be found. """ nodes = get_cras_nodes() for node in nodes: if node['Active'] == 1 and node['IsInput'] == 0: return int(node['NodeVolume']) raise CrasUtilsError('Cannot find active node volume from nodes.')