• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""This module provides cras audio utilities."""
6
7import logging
8import re
9
10from autotest_lib.client.cros.audio import cmd_utils
11
12_CRAS_TEST_CLIENT = '/usr/bin/cras_test_client'
13
14
15class CrasUtilsError(Exception):
16    """Error in CrasUtils."""
17    pass
18
19
20def playback(blocking=True, stdin=None, *args, **kargs):
21    """A helper function to execute the playback_cmd.
22
23    @param blocking: Blocks this call until playback finishes.
24    @param stdin: the standard input of playback process
25    @param args: args passed to playback_cmd.
26    @param kargs: kargs passed to playback_cmd.
27
28    @returns: The process running the playback command. Note that if the
29              blocking parameter is true, this will return a finished process.
30    """
31    process = cmd_utils.popen(playback_cmd(*args, **kargs), stdin=stdin)
32    if blocking:
33        cmd_utils.wait_and_check_returncode(process)
34    return process
35
36
37def capture(*args, **kargs):
38    """A helper function to execute the capture_cmd.
39
40    @param args: args passed to capture_cmd.
41    @param kargs: kargs passed to capture_cmd.
42
43    """
44    cmd_utils.execute(capture_cmd(*args, **kargs))
45
46
47def playback_cmd(playback_file, block_size=None, duration=None,
48                 pin_device=None, channels=2, rate=48000):
49    """Gets a command to playback a file with given settings.
50
51    @param playback_file: the name of the file to play. '-' indicates to
52                          playback raw audio from the stdin.
53    @param pin_device: the device id to playback on.
54    @param block_size: the number of frames per callback(dictates latency).
55    @param duration: seconds to playback.
56    @param channels: number of channels.
57    @param rate: the sampling rate.
58
59    @returns: The command args put in a list of strings.
60
61    """
62    args = [_CRAS_TEST_CLIENT]
63    args += ['--playback_file', playback_file]
64    if pin_device is not None:
65        args += ['--pin_device', str(pin_device)]
66    if block_size is not None:
67        args += ['--block_size', str(block_size)]
68    if duration is not None:
69        args += ['--duration', str(duration)]
70    args += ['--num_channels', str(channels)]
71    args += ['--rate', str(rate)]
72    return args
73
74
75def capture_cmd(capture_file, block_size=None, duration=10,
76                pin_device=None, channels=1, rate=48000):
77    """Gets a command to capture the audio into the file with given settings.
78
79    @param capture_file: the name of file the audio to be stored in.
80    @param pin_device: the device id to record from.
81    @param block_size: the number of frames per callback(dictates latency).
82    @param duration: seconds to record. If it is None, duration is not set,
83                     and command will keep capturing audio until it is
84                     terminated.
85    @param channels: number of channels.
86    @param rate: the sampling rate.
87
88    @returns: The command args put in a list of strings.
89
90    """
91    args = [_CRAS_TEST_CLIENT]
92    args += ['--capture_file', capture_file]
93    if pin_device is not None:
94        args += ['--pin_device', str(pin_device)]
95    if block_size is not None:
96        args += ['--block_size', str(block_size)]
97    if duration is not None:
98        args += ['--duration', str(duration)]
99    args += ['--num_channels', str(channels)]
100    args += ['--rate', str(rate)]
101    return args
102
103
104def listen_cmd(
105        capture_file, block_size=None, duration=10, channels=1, rate=48000):
106    """Gets a command to listen on hotword and record audio into the file with
107       given settings.
108
109    @param capture_file: the name of file the audio to be stored in.
110    @param block_size: the number of frames per callback(dictates latency).
111    @param duration: seconds to record. If it is None, duration is not set,
112                     and command will keep capturing audio until it is
113                     terminated.
114    @param channels: number of channels.
115    @param rate: the sampling rate.
116
117    @returns: The command args put in a list of strings.
118
119    """
120    args = [_CRAS_TEST_CLIENT]
121    args += ['--listen_for_hotword', capture_file]
122    if block_size is not None:
123        args += ['--block_size', str(block_size)]
124    if duration is not None:
125        args += ['--duration', str(duration)]
126    args += ['--num_channels', str(channels)]
127    args += ['--rate', str(rate)]
128    return args
129
130
131def loopback(*args, **kargs):
132    """A helper function to execute loopback_cmd.
133
134    @param args: args passed to loopback_cmd.
135    @param kargs: kargs passed to loopback_cmd.
136
137    """
138
139    cmd_utils.execute(loopback_cmd(*args, **kargs))
140
141
142def loopback_cmd(output_file, duration=10, channels=2, rate=48000):
143    """Gets a command to record the loopback.
144
145    @param output_file: The name of the file the loopback to be stored in.
146    @param channels: The number of channels of the recorded audio.
147    @param duration: seconds to record.
148    @param rate: the sampling rate.
149
150    @returns: The command args put in a list of strings.
151
152    """
153    args = [_CRAS_TEST_CLIENT]
154    args += ['--loopback_file', output_file]
155    args += ['--duration_seconds', str(duration)]
156    args += ['--num_channels', str(channels)]
157    args += ['--rate', str(rate)]
158    return args
159
160
161def get_cras_nodes_cmd():
162    """Gets a command to query the nodes from Cras.
163
164    @returns: The command to query nodes information from Cras using dbus-send.
165
166    """
167    return ('dbus-send --system --type=method_call --print-reply '
168            '--dest=org.chromium.cras /org/chromium/cras '
169            'org.chromium.cras.Control.GetNodes')
170
171
172def set_system_volume(volume):
173    """Set the system volume.
174
175    @param volume: the system output vlume to be set(0 - 100).
176
177    """
178    get_cras_control_interface().SetOutputVolume(volume)
179
180
181def set_node_volume(node_id, volume):
182    """Set the volume of the given output node.
183
184    @param node_id: the id of the output node to be set the volume.
185    @param volume: the volume to be set(0-100).
186
187    """
188    get_cras_control_interface().SetOutputNodeVolume(node_id, volume)
189
190
191def get_cras_control_interface(private=False):
192    """Gets Cras DBus control interface.
193
194    @param private: Set to True to use a new instance for dbus.SystemBus
195                    instead of the shared instance.
196
197    @returns: A dBus.Interface object with Cras Control interface.
198
199    @raises: ImportError if this is not called on Cros device.
200
201    """
202    try:
203        import dbus
204    except ImportError, e:
205        logging.exception(
206                'Can not import dbus: %s. This method should only be '
207                'called on Cros device.', e)
208        raise
209    bus = dbus.SystemBus(private=private)
210    cras_object = bus.get_object('org.chromium.cras', '/org/chromium/cras')
211    return dbus.Interface(cras_object, 'org.chromium.cras.Control')
212
213
214def get_cras_nodes():
215    """Gets nodes information from Cras.
216
217    @returns: A dict containing information of each node.
218
219    """
220    return get_cras_control_interface().GetNodes()
221
222
223def get_selected_nodes():
224    """Gets selected output nodes and input nodes.
225
226    @returns: A tuple (output_nodes, input_nodes) where each
227              field is a list of selected node IDs returned from Cras DBus API.
228              Note that there may be multiple output/input nodes being selected
229              at the same time.
230
231    """
232    output_nodes = []
233    input_nodes = []
234    nodes = get_cras_nodes()
235    for node in nodes:
236        if node['Active']:
237            if node['IsInput']:
238                input_nodes.append(node['Id'])
239            else:
240                output_nodes.append(node['Id'])
241    return (output_nodes, input_nodes)
242
243
244def set_selected_output_node_volume(volume):
245    """Sets the selected output node volume.
246
247    @param volume: the volume to be set (0-100).
248
249    """
250    selected_output_node_ids, _ = get_selected_nodes()
251    for node_id in selected_output_node_ids:
252        set_node_volume(node_id, volume)
253
254
255def get_active_stream_count():
256    """Gets the number of active streams.
257
258    @returns: The number of active streams.
259
260    """
261    return int(get_cras_control_interface().GetNumberOfActiveStreams())
262
263
264def set_system_mute(is_mute):
265    """Sets the system mute switch.
266
267    @param is_mute: Set True to mute the system playback.
268
269    """
270    get_cras_control_interface().SetOutputMute(is_mute)
271
272
273def set_capture_mute(is_mute):
274    """Sets the capture mute switch.
275
276    @param is_mute: Set True to mute the capture.
277
278    """
279    get_cras_control_interface().SetInputMute(is_mute)
280
281
282def node_type_is_plugged(node_type, nodes_info):
283    """Determine if there is any node of node_type plugged.
284
285    This method is used in the AudioLoopbackDongleLabel class, where the
286    call is executed on autotest server. Use get_cras_nodes instead if
287    the call can be executed on Cros device.
288
289    Since Cras only reports the plugged node in GetNodes, we can
290    parse the return value to see if there is any node with the given type.
291    For example, if INTERNAL_MIC is of intereset, the pattern we are
292    looking for is:
293
294    dict entry(
295       string "Type"
296       variant             string "INTERNAL_MIC"
297    )
298
299    @param node_type: A str representing node type defined in CRAS_NODE_TYPES.
300    @param nodes_info: A str containing output of command get_nodes_cmd.
301
302    @returns: True if there is any node of node_type plugged. False otherwise.
303
304    """
305    match = re.search(r'string "Type"\s+variant\s+string "%s"' % node_type,
306                      nodes_info)
307    return True if match else False
308
309
310# Cras node types reported from Cras DBus control API.
311CRAS_OUTPUT_NODE_TYPES = ['HEADPHONE', 'INTERNAL_SPEAKER', 'HDMI', 'USB',
312                          'BLUETOOTH', 'LINEOUT', 'UNKNOWN']
313CRAS_INPUT_NODE_TYPES = ['MIC', 'INTERNAL_MIC', 'USB', 'BLUETOOTH',
314                         'POST_DSP_LOOPBACK', 'POST_MIX_LOOPBACK', 'UNKNOWN',
315                         'KEYBOARD_MIC', 'HOTWORD', 'FRONT_MIC', 'REAR_MIC',
316                         'ECHO_REFERENCE']
317CRAS_NODE_TYPES = CRAS_OUTPUT_NODE_TYPES + CRAS_INPUT_NODE_TYPES
318
319
320def get_filtered_node_types(callback):
321    """Returns the pair of filtered output node types and input node types.
322
323    @param callback: A callback function which takes a node as input parameter
324                     and filter the node based on its return value.
325
326    @returns: A tuple (output_node_types, input_node_types) where each
327              field is a list of node types defined in CRAS_NODE_TYPES,
328              and their 'attribute_name' is True.
329
330    """
331    output_node_types = []
332    input_node_types = []
333    nodes = get_cras_nodes()
334    for node in nodes:
335        if callback(node):
336            node_type = str(node['Type'])
337            if node_type not in CRAS_NODE_TYPES:
338                logging.warning('node type %s is not in known CRAS_NODE_TYPES',
339                                node_type)
340            if node['IsInput']:
341                input_node_types.append(node_type)
342            else:
343                output_node_types.append(node_type)
344    return (output_node_types, input_node_types)
345
346
347def get_selected_node_types():
348    """Returns the pair of active output node types and input node types.
349
350    @returns: A tuple (output_node_types, input_node_types) where each
351              field is a list of selected node types defined in CRAS_NODE_TYPES.
352
353    """
354    def is_selected(node):
355        """Checks if a node is selected.
356
357        A node is selected if its Active attribute is True.
358
359        @returns: True is a node is selected, False otherwise.
360
361        """
362        return node['Active']
363
364    return get_filtered_node_types(is_selected)
365
366
367def get_selected_input_device_name():
368    """Returns the device name of the active input node.
369
370    @returns: device name string. E.g. kbl_r5514_5663_max: :0,1
371    """
372    nodes = get_cras_nodes()
373    for node in nodes:
374        if node['Active'] and node['IsInput']:
375            return node['DeviceName']
376    return None
377
378
379def get_selected_input_device_type():
380    """Returns the device type of the active input node.
381
382    @returns: device type string. E.g. INTERNAL_MICROPHONE
383    """
384    nodes = get_cras_nodes()
385    for node in nodes:
386        if node['Active'] and node['IsInput']:
387            return node['Type']
388    return None
389
390
391def get_selected_output_device_name():
392    """Returns the device name of the active output node.
393
394    @returns: device name string. E.g. mtk-rt5650: :0,0
395    """
396    nodes = get_cras_nodes()
397    for node in nodes:
398        if node['Active'] and not node['IsInput']:
399            return node['DeviceName']
400    return None
401
402
403def get_selected_output_device_type():
404    """Returns the device type of the active output node.
405
406    @returns: device type string. E.g. INTERNAL_SPEAKER
407    """
408    nodes = get_cras_nodes()
409    for node in nodes:
410        if node['Active'] and not node['IsInput']:
411            return node['Type']
412    return None
413
414
415def get_plugged_node_types():
416    """Returns the pair of plugged output node types and input node types.
417
418    @returns: A tuple (output_node_types, input_node_types) where each
419              field is a list of plugged node types defined in CRAS_NODE_TYPES.
420
421    """
422    def is_plugged(node):
423        """Checks if a node is plugged and is not unknown node.
424
425        Cras DBus API only reports plugged node, so every node reported by Cras
426        DBus API is plugged. However, we filter out UNKNOWN node here because
427        the existence of unknown node depends on the number of redundant
428        playback/record audio device created on audio card. Also, the user of
429        Cras will ignore unknown nodes.
430
431        @returns: True if a node is plugged and is not an UNKNOWN node.
432
433        """
434        return node['Type'] != 'UNKNOWN'
435
436    return get_filtered_node_types(is_plugged)
437
438
439def set_selected_node_types(output_node_types, input_node_types):
440    """Sets selected node types.
441
442    @param output_node_types: A list of output node types. None to skip setting.
443    @param input_node_types: A list of input node types. None to skip setting.
444
445    """
446    if output_node_types is not None and len(output_node_types) == 1:
447        set_single_selected_output_node(output_node_types[0])
448    elif output_node_types:
449        set_selected_output_nodes(output_node_types)
450    if input_node_types is not None and len(input_node_types) == 1:
451        set_single_selected_input_node(input_node_types[0])
452    elif input_node_types:
453        set_selected_input_nodes(input_node_types)
454
455
456def set_single_selected_output_node(node_type):
457    """Sets one selected output node.
458
459    Note that Chrome UI uses SetActiveOutputNode of Cras DBus API
460    to select one output node.
461
462    @param node_type: A node type.
463
464    """
465    nodes = get_cras_nodes()
466    for node in nodes:
467        if node['IsInput']:
468            continue
469        if node['Type'] == node_type:
470            set_active_output_node(node['Id'])
471
472
473def set_single_selected_input_node(node_type):
474    """Sets one selected input node.
475
476    Note that Chrome UI uses SetActiveInputNode of Cras DBus API
477    to select one input node.
478
479    @param node_type: A node type.
480
481    """
482    nodes = get_cras_nodes()
483    for node in nodes:
484        if not node['IsInput']:
485            continue
486        if node['Type'] == node_type:
487            set_active_input_node(node['Id'])
488
489
490def set_selected_output_nodes(types):
491    """Sets selected output node types.
492
493    Note that Chrome UI uses SetActiveOutputNode of Cras DBus API
494    to select one output node. Here we use add/remove active output node
495    to support multiple nodes.
496
497    @param types: A list of output node types.
498
499    """
500    nodes = get_cras_nodes()
501    for node in nodes:
502        if node['IsInput']:
503            continue
504        if node['Type'] in types:
505            add_active_output_node(node['Id'])
506        elif node['Active']:
507            remove_active_output_node(node['Id'])
508
509
510def set_selected_input_nodes(types):
511    """Sets selected input node types.
512
513    Note that Chrome UI uses SetActiveInputNode of Cras DBus API
514    to select one input node. Here we use add/remove active input node
515    to support multiple nodes.
516
517    @param types: A list of input node types.
518
519    """
520    nodes = get_cras_nodes()
521    for node in nodes:
522        if not node['IsInput']:
523            continue
524        if node['Type'] in types:
525            add_active_input_node(node['Id'])
526        elif node['Active']:
527            remove_active_input_node(node['Id'])
528
529
530def set_active_input_node(node_id):
531    """Sets one active input node.
532
533    @param node_id: node id.
534
535    """
536    get_cras_control_interface().SetActiveInputNode(node_id)
537
538
539def set_active_output_node(node_id):
540    """Sets one active output node.
541
542    @param node_id: node id.
543
544    """
545    get_cras_control_interface().SetActiveOutputNode(node_id)
546
547
548def add_active_output_node(node_id):
549    """Adds an active output node.
550
551    @param node_id: node id.
552
553    """
554    get_cras_control_interface().AddActiveOutputNode(node_id)
555
556
557def add_active_input_node(node_id):
558    """Adds an active input node.
559
560    @param node_id: node id.
561
562    """
563    get_cras_control_interface().AddActiveInputNode(node_id)
564
565
566def remove_active_output_node(node_id):
567    """Removes an active output node.
568
569    @param node_id: node id.
570
571    """
572    get_cras_control_interface().RemoveActiveOutputNode(node_id)
573
574
575def remove_active_input_node(node_id):
576    """Removes an active input node.
577
578    @param node_id: node id.
579
580    """
581    get_cras_control_interface().RemoveActiveInputNode(node_id)
582
583
584def get_node_id_from_node_type(node_type, is_input):
585    """Gets node id from node type.
586
587    @param types: A node type defined in CRAS_NODE_TYPES.
588    @param is_input: True if the node is input. False otherwise.
589
590    @returns: A string for node id.
591
592    @raises: CrasUtilsError: if unique node id can not be found.
593
594    """
595    nodes = get_cras_nodes()
596    find_ids = []
597    for node in nodes:
598        if node['Type'] == node_type and node['IsInput'] == is_input:
599            find_ids.append(node['Id'])
600    if len(find_ids) != 1:
601        raise CrasUtilsError(
602                'Can not find unique node id from node type %s' % node_type)
603    return find_ids[0]
604
605
606def get_device_id_of(node_id):
607    """Gets the device id of the node id.
608
609    The conversion logic is replicated from the CRAS's type definition at
610    third_party/adhd/cras/src/common/cras_types.h.
611
612    @param node_id: A string for node id.
613
614    @returns: A string for device id.
615
616    @raise: CrasUtilsError: if device id is invalid.
617    """
618    device_id = str(long(node_id) >> 32)
619    if device_id == "0":
620        raise CrasUtilsError('Got invalid device_id: 0')
621    return device_id
622
623
624def get_device_id_from_node_type(node_type, is_input):
625    """Gets device id from node type.
626
627    @param types: A node type defined in CRAS_NODE_TYPES.
628    @param is_input: True if the node is input. False otherwise.
629
630    @returns: A string for device id.
631
632    """
633    node_id = get_node_id_from_node_type(node_type, is_input)
634    return get_device_id_of(node_id)
635
636
637def get_active_node_volume():
638    """Returns volume from active node.
639
640    @returns: int for volume
641
642    @raises: CrasUtilsError: if node volume cannot be found.
643    """
644    nodes = get_cras_nodes()
645    for node in nodes:
646        if node['Active'] == 1 and node['IsInput'] == 0:
647            return int(node['NodeVolume'])
648    raise CrasUtilsError('Cannot find active node volume from nodes.')
649