# Copyright (c) 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""This module provides cras audio utilities."""

import logging
import re

from autotest_lib.client.cros.audio import cmd_utils

_CRAS_TEST_CLIENT = '/usr/bin/cras_test_client'


class CrasUtilsError(Exception):
    """Error raised by the helpers in this module."""
    pass


def playback(blocking=True, *args, **kargs):
    """A helper function to execute the playback_cmd.

    Note that blocking is the first positional parameter, so the first
    positional argument of a call is always bound to blocking and only the
    remaining ones are forwarded to playback_cmd.

    @param blocking: Blocks this call until playback finishes.
    @param args: args passed to playback_cmd.
    @param kargs: kargs passed to playback_cmd.

    @returns: The process running the playback command. Note that if the
              blocking parameter is true, this will return a finished process.

    """
    process = cmd_utils.popen(playback_cmd(*args, **kargs))
    if blocking:
        cmd_utils.wait_and_check_returncode(process)
    return process


def capture(*args, **kargs):
    """A helper function to execute the capture_cmd.

    @param args: args passed to capture_cmd.
    @param kargs: kargs passed to capture_cmd.

    """
    cmd_utils.execute(capture_cmd(*args, **kargs))


def _stream_cmd(file_option, file_name, block_size, duration, channels, rate):
    """Builds a cras_test_client command shared by playback/capture/listen.

    @param file_option: the command line option which takes the file name,
                        e.g. '--playback_file'.
    @param file_name: the value passed to file_option.
    @param block_size: the number of frames per callback. The option is
                       omitted if it is None.
    @param duration: seconds to run. The option is omitted if it is None.
    @param channels: number of channels.
    @param rate: the sampling rate.

    @returns: The command args put in a list of strings.

    """
    args = [_CRAS_TEST_CLIENT, file_option, file_name]
    if block_size is not None:
        args += ['--block_size', str(block_size)]
    if duration is not None:
        args += ['--duration', str(duration)]
    args += ['--num_channels', str(channels)]
    args += ['--rate', str(rate)]
    return args


def playback_cmd(playback_file, block_size=None, duration=None,
                 channels=2, rate=48000):
    """Gets a command to playback a file with given settings.

    @param playback_file: the name of the file to play. '-' indicates to
                          playback raw audio from the stdin.
    @param block_size: the number of frames per callback(dictates latency).
    @param duration: seconds to playback. If it is None, duration is not set.
    @param channels: number of channels.
    @param rate: the sampling rate.

    @returns: The command args put in a list of strings.

    """
    return _stream_cmd('--playback_file', playback_file, block_size,
                       duration, channels, rate)


def capture_cmd(
        capture_file, block_size=None, duration=10, channels=1, rate=48000):
    """Gets a command to capture the audio into the file with given settings.

    @param capture_file: the name of file the audio to be stored in.
    @param block_size: the number of frames per callback(dictates latency).
    @param duration: seconds to record. If it is None, duration is not set,
                     and command will keep capturing audio until it is
                     terminated.
    @param channels: number of channels.
    @param rate: the sampling rate.

    @returns: The command args put in a list of strings.

    """
    return _stream_cmd('--capture_file', capture_file, block_size,
                       duration, channels, rate)


def listen_cmd(
        capture_file, block_size=None, duration=10, channels=1, rate=48000):
    """Gets a command to listen on hotword and record audio into the file with
       given settings.

    @param capture_file: the name of file the audio to be stored in.
    @param block_size: the number of frames per callback(dictates latency).
    @param duration: seconds to record. If it is None, duration is not set,
                     and command will keep capturing audio until it is
                     terminated.
    @param channels: number of channels.
    @param rate: the sampling rate.

    @returns: The command args put in a list of strings.

    """
    return _stream_cmd('--listen_for_hotword', capture_file, block_size,
                       duration, channels, rate)


def loopback(*args, **kargs):
    """A helper function to execute loopback_cmd.

    @param args: args passed to loopback_cmd.
    @param kargs: kargs passed to loopback_cmd.

    """
    cmd_utils.execute(loopback_cmd(*args, **kargs))


def loopback_cmd(output_file, duration=10, channels=2, rate=48000):
    """Gets a command to record the loopback.

    @param output_file: The name of the file the loopback to be stored in.
    @param duration: seconds to record.
    @param channels: The number of channels of the recorded audio.
    @param rate: the sampling rate.

    @returns: The command args put in a list of strings.

    """
    # Unlike the playback/capture commands, the duration is always passed
    # and is spelled '--duration_seconds' here.
    args = [_CRAS_TEST_CLIENT]
    args += ['--loopback_file', output_file]
    args += ['--duration_seconds', str(duration)]
    args += ['--num_channels', str(channels)]
    args += ['--rate', str(rate)]
    return args


def get_cras_nodes_cmd():
    """Gets a command to query the nodes from Cras.

    @returns: The command to query nodes information from Cras using dbus-send.

    """
    return ('dbus-send --system --type=method_call --print-reply '
            '--dest=org.chromium.cras /org/chromium/cras '
            'org.chromium.cras.Control.GetNodes')


def set_system_volume(volume):
    """Set the system volume.

    @param volume: the system output volume to be set(0 - 100).

    """
    get_cras_control_interface().SetOutputVolume(volume)


def set_node_volume(node_id, volume):
    """Set the volume of the given output node.

    @param node_id: the id of the output node to be set the volume.
    @param volume: the volume to be set(0-100).

    """
    get_cras_control_interface().SetOutputNodeVolume(node_id, volume)


def set_capture_gain(gain):
    """Set the system capture gain.

    @param gain: the capture gain in db*100 (100 = 1dB).

    """
    get_cras_control_interface().SetInputGain(gain)


def get_cras_control_interface(private=False):
    """Gets Cras DBus control interface.

    @param private: Set to True to use a new instance for dbus.SystemBus
                    instead of the shared instance.

    @returns: A dBus.Interface object with Cras Control interface.

    @raises: ImportError if this is not called on Cros device.

    """
    # dbus is imported lazily so that this module can still be imported on
    # hosts where the dbus package is not available.
    try:
        import dbus
    except ImportError as e:
        logging.exception(
                'Can not import dbus: %s. This method should only be '
                'called on Cros device.', e)
        raise
    bus = dbus.SystemBus(private=private)
    cras_object = bus.get_object('org.chromium.cras', '/org/chromium/cras')
    return dbus.Interface(cras_object, 'org.chromium.cras.Control')


def get_cras_nodes():
    """Gets nodes information from Cras.

    @returns: The result of the GetNodes call on the Cras control interface.
              Callers in this module iterate over it and read each node
              like a dict (e.g. node['Id'], node['Type']).

    """
    return get_cras_control_interface().GetNodes()


def get_selected_nodes():
    """Gets selected output nodes and input nodes.

    @returns: A tuple (output_nodes, input_nodes) where each
              field is a list of selected node IDs returned from Cras DBus API.
              Note that there may be multiple output/input nodes being selected
              at the same time.

    """
    output_nodes = []
    input_nodes = []
    for node in get_cras_nodes():
        if not node['Active']:
            continue
        if node['IsInput']:
            input_nodes.append(node['Id'])
        else:
            output_nodes.append(node['Id'])
    return (output_nodes, input_nodes)


def set_selected_output_node_volume(volume):
    """Sets the selected output node volume.

    @param volume: the volume to be set (0-100).

    """
    selected_output_node_ids, _ = get_selected_nodes()
    for node_id in selected_output_node_ids:
        set_node_volume(node_id, volume)


def get_active_stream_count():
    """Gets the number of active streams.

    @returns: The number of active streams.

    """
    return int(get_cras_control_interface().GetNumberOfActiveStreams())


def set_system_mute(is_mute):
    """Sets the system mute switch.

    @param is_mute: Set True to mute the system playback.

    """
    get_cras_control_interface().SetOutputMute(is_mute)


def set_capture_mute(is_mute):
    """Sets the capture mute switch.

    @param is_mute: Set True to mute the capture.

    """
    get_cras_control_interface().SetInputMute(is_mute)


def node_type_is_plugged(node_type, nodes_info):
    """Determine if there is any node of node_type plugged.

    This method is used in the AudioLoopbackDongleLabel class, where the
    call is executed on autotest server. Use get_cras_nodes instead if
    the call can be executed on Cros device.

    Since Cras only reports the plugged node in GetNodes, we can
    parse the return value to see if there is any node with the given type.
    For example, if INTERNAL_MIC is of interest, the pattern we are
    looking for is:

    dict entry(
       string "Type"
       variant             string "INTERNAL_MIC"
    )

    @param node_type: A str representing node type defined in CRAS_NODE_TYPES.
                      It is inserted into the regular expression as is.
    @param nodes_info: A str containing output of the command returned by
                       get_cras_nodes_cmd.

    @returns: True if there is any node of node_type plugged. False otherwise.

    """
    match = re.search(r'string "Type"\s+variant\s+string "%s"' % node_type,
                      nodes_info)
    return bool(match)


# Cras node types reported from Cras DBus control API.
CRAS_OUTPUT_NODE_TYPES = ['HEADPHONE', 'INTERNAL_SPEAKER', 'HDMI', 'USB',
                          'BLUETOOTH', 'LINEOUT', 'UNKNOWN']
CRAS_INPUT_NODE_TYPES = ['MIC', 'INTERNAL_MIC', 'USB', 'BLUETOOTH',
                         'POST_DSP_LOOPBACK', 'POST_MIX_LOOPBACK', 'UNKNOWN',
                         'KEYBOARD_MIC', 'HOTWORD', 'FRONT_MIC', 'REAR_MIC']
CRAS_NODE_TYPES = CRAS_OUTPUT_NODE_TYPES + CRAS_INPUT_NODE_TYPES


def get_filtered_node_types(callback):
    """Returns the pair of filtered output node types and input node types.

    @param callback: A callback function which takes a node as input parameter
                     and filter the node based on its return value.

    @returns: A tuple (output_node_types, input_node_types) where each
              field is a list of node types defined in CRAS_NODE_TYPES,
              for the nodes on which callback returned a true value.

    @raises: RuntimeError if an accepted node has a type which is not in
             CRAS_NODE_TYPES.

    """
    output_node_types = []
    input_node_types = []
    for node in get_cras_nodes():
        if not callback(node):
            continue
        node_type = str(node['Type'])
        if node_type not in CRAS_NODE_TYPES:
            raise RuntimeError(
                    'node type %s is not valid' % node_type)
        if node['IsInput']:
            input_node_types.append(node_type)
        else:
            output_node_types.append(node_type)
    return (output_node_types, input_node_types)


def get_selected_node_types():
    """Returns the pair of active output node types and input node types.

    @returns: A tuple (output_node_types, input_node_types) where each
              field is a list of selected node types defined in CRAS_NODE_TYPES.

    """
    def is_selected(node):
        """Checks if a node is selected.

        A node is selected if its Active attribute is True.

        @returns: True if a node is selected, False otherwise.

        """
        return node['Active']

    return get_filtered_node_types(is_selected)


def _get_selected_node_attribute(is_input, attribute):
    """Gets an attribute of the first active node of the given direction.

    @param is_input: True to look at input nodes, False for output nodes.
    @param attribute: The key to read from the node, e.g. 'DeviceName'.

    @returns: The attribute value of the first matching node reported by
              get_cras_nodes, or None if there is no such node.

    """
    for node in get_cras_nodes():
        if node['Active'] and bool(node['IsInput']) == is_input:
            return node[attribute]
    return None


def get_selected_input_device_name():
    """Returns the device name of the active input node.

    @returns: device name string. E.g. kbl_r5514_5663_max: :0,1
              None if there is no active input node.

    """
    return _get_selected_node_attribute(True, 'DeviceName')


def get_selected_output_device_name():
    """Returns the device name of the active output node.

    @returns: device name string. E.g. mtk-rt5650: :0,0
              None if there is no active output node.

    """
    return _get_selected_node_attribute(False, 'DeviceName')


def get_selected_output_device_type():
    """Returns the device type of the active output node.

    @returns: device type string. E.g. INTERNAL_SPEAKER
              None if there is no active output node.

    """
    return _get_selected_node_attribute(False, 'Type')


def get_plugged_node_types():
    """Returns the pair of plugged output node types and input node types.

    @returns: A tuple (output_node_types, input_node_types) where each
              field is a list of plugged node types defined in CRAS_NODE_TYPES.

    """
    def is_plugged(node):
        """Checks if a node is plugged and is not unknown node.

        Cras DBus API only reports plugged node, so every node reported by Cras
        DBus API is plugged. However, we filter out UNKNOWN node here because
        the existence of unknown node depends on the number of redundant
        playback/record audio device created on audio card. Also, the user of
        Cras will ignore unknown nodes.

        @returns: True if a node is plugged and is not an UNKNOWN node.

        """
        return node['Type'] != 'UNKNOWN'

    return get_filtered_node_types(is_plugged)


def set_selected_node_types(output_node_types, input_node_types):
    """Sets selected node types.

    A list with exactly one type is applied with the "set single selected
    node" helpers; a longer list is applied with the add/remove based
    helpers. None or an empty list leaves that direction untouched.

    @param output_node_types: A list of output node types. None to skip setting.
    @param input_node_types: A list of input node types. None to skip setting.

    """
    # Check truthiness before len() so that None (documented as "skip")
    # does not raise TypeError.
    if output_node_types:
        if len(output_node_types) == 1:
            set_single_selected_output_node(output_node_types[0])
        else:
            set_selected_output_nodes(output_node_types)
    if input_node_types:
        if len(input_node_types) == 1:
            set_single_selected_input_node(input_node_types[0])
        else:
            set_selected_input_nodes(input_node_types)


def set_single_selected_output_node(node_type):
    """Sets one selected output node.

    Note that Chrome UI uses SetActiveOutputNode of Cras DBus API
    to select one output node.

    @param node_type: A node type.

    """
    # Every output node of the given type is passed to
    # set_active_output_node, in the order reported by Cras.
    for node in get_cras_nodes():
        if not node['IsInput'] and node['Type'] == node_type:
            set_active_output_node(node['Id'])


def set_single_selected_input_node(node_type):
    """Sets one selected input node.

    Note that Chrome UI uses SetActiveInputNode of Cras DBus API
    to select one input node.

    @param node_type: A node type.

    """
    # Every input node of the given type is passed to
    # set_active_input_node, in the order reported by Cras.
    for node in get_cras_nodes():
        if node['IsInput'] and node['Type'] == node_type:
            set_active_input_node(node['Id'])


def set_selected_output_nodes(types):
    """Sets selected output node types.

    Note that Chrome UI uses SetActiveOutputNode of Cras DBus API
    to select one output node. Here we use add/remove active output node
    to support multiple nodes.

    @param types: A list of output node types.

    """
    for node in get_cras_nodes():
        if node['IsInput']:
            continue
        if node['Type'] in types:
            add_active_output_node(node['Id'])
        elif node['Active']:
            # Active output node whose type was not requested.
            remove_active_output_node(node['Id'])


def set_selected_input_nodes(types):
    """Sets selected input node types.

    Note that Chrome UI uses SetActiveInputNode of Cras DBus API
    to select one input node. Here we use add/remove active input node
    to support multiple nodes.

    @param types: A list of input node types.

    """
    for node in get_cras_nodes():
        if not node['IsInput']:
            continue
        if node['Type'] in types:
            add_active_input_node(node['Id'])
        elif node['Active']:
            # Active input node whose type was not requested.
            remove_active_input_node(node['Id'])


def set_active_input_node(node_id):
    """Sets one active input node.

    @param node_id: node id.

    """
    get_cras_control_interface().SetActiveInputNode(node_id)


def set_active_output_node(node_id):
    """Sets one active output node.

    @param node_id: node id.

    """
    get_cras_control_interface().SetActiveOutputNode(node_id)


def add_active_output_node(node_id):
    """Adds an active output node.

    @param node_id: node id.

    """
    get_cras_control_interface().AddActiveOutputNode(node_id)


def add_active_input_node(node_id):
    """Adds an active input node.

    @param node_id: node id.

    """
    get_cras_control_interface().AddActiveInputNode(node_id)


def remove_active_output_node(node_id):
    """Removes an active output node.

    @param node_id: node id.

    """
    get_cras_control_interface().RemoveActiveOutputNode(node_id)


def remove_active_input_node(node_id):
    """Removes an active input node.

    @param node_id: node id.

    """
    get_cras_control_interface().RemoveActiveInputNode(node_id)


def get_node_id_from_node_type(node_type, is_input):
    """Gets node id from node type.

    @param node_type: A node type defined in CRAS_NODE_TYPES.
    @param is_input: True if the node is input. False otherwise.

    @returns: The id of the only node matching node_type and is_input.

    @raises: CrasUtilsError: if unique node id can not be found, i.e. there
             is no matching node or more than one.

    """
    find_ids = [node['Id'] for node in get_cras_nodes()
                if node['Type'] == node_type and node['IsInput'] == is_input]
    if len(find_ids) != 1:
        raise CrasUtilsError(
                'Can not find unique node id from node type %s' % node_type)
    return find_ids[0]


def get_active_node_volume():
    """Returns volume from active node.

    The first active output node reported by Cras is used.

    @returns: int for volume

    @raises: CrasUtilsError: if node volume cannot be found.

    """
    for node in get_cras_nodes():
        if node['Active'] == 1 and node['IsInput'] == 0:
            return int(node['NodeVolume'])
    raise CrasUtilsError('Cannot find active node volume from nodes.')