• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# Copyright 2013 The Chromium Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Tests for the cmd_helper module."""
7
8import unittest
9import subprocess
10import time
11
12from devil import devil_env
13from devil.utils import cmd_helper
14
15with devil_env.SysPath(devil_env.PYMOCK_PATH):
16  import mock  # pylint: disable=import-error
17
18
class CmdHelperSingleQuoteTest(unittest.TestCase):
  """Tests for cmd_helper.SingleQuote."""

  def testSingleQuote_basic(self):
    """A plain word comes back without any quoting."""
    self.assertEqual('hello',
                     cmd_helper.SingleQuote('hello'))

  def testSingleQuote_withSpaces(self):
    """A string containing a space is wrapped in single quotes."""
    self.assertEqual("'hello world'",
                     cmd_helper.SingleQuote('hello world'))

  def testSingleQuote_withUnsafeChars(self):
    """An embedded single quote is emitted as the '"'"' sequence."""
    self.assertEqual("""'hello'"'"'; rm -rf /'""",
                     cmd_helper.SingleQuote("hello'; rm -rf /"))

  def testSingleQuote_dontExpand(self):
    """The shell echoes a single-quoted $TEST_VAR literally."""
    test_string = 'hello $TEST_VAR'
    cmd = 'TEST_VAR=world; echo %s' % cmd_helper.SingleQuote(test_string)
    self.assertEqual(test_string,
                     cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())
38
39
class CmdHelperDoubleQuoteTest(unittest.TestCase):
  """Tests for cmd_helper.DoubleQuote."""

  def testDoubleQuote_basic(self):
    """A plain word comes back without any quoting."""
    self.assertEqual('hello',
                     cmd_helper.DoubleQuote('hello'))

  def testDoubleQuote_withSpaces(self):
    """A string containing a space is wrapped in double quotes."""
    self.assertEqual('"hello world"',
                     cmd_helper.DoubleQuote('hello world'))

  def testDoubleQuote_withUnsafeChars(self):
    """An embedded double quote is backslash-escaped."""
    self.assertEqual('''"hello\\"; rm -rf /"''',
                     cmd_helper.DoubleQuote('hello"; rm -rf /'))

  # Previously named testSingleQuote_doExpand, although it exercises
  # DoubleQuote.
  def testDoubleQuote_doExpand(self):
    """The shell expands $TEST_VAR inside a double-quoted string."""
    test_string = 'hello $TEST_VAR'
    cmd = 'TEST_VAR=world; echo %s' % cmd_helper.DoubleQuote(test_string)
    self.assertEqual('hello world',
                     cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())
59
60
class CmdHelperShinkToSnippetTest(unittest.TestCase):
  """Tests for cmd_helper.ShrinkToSnippet.

  Every case passes a command part list, the variable name 'a' and a value,
  and checks the returned snippet string.
  """

  def testShrinkToSnippet_noArgs(self):
    """Single-element command lists."""
    self.assertEqual(
        'foo',
        cmd_helper.ShrinkToSnippet(['foo'], 'a', 'bar'))
    self.assertEqual(
        "'foo foo'",
        cmd_helper.ShrinkToSnippet(['foo foo'], 'a', 'bar'))
    self.assertEqual(
        '"$a"\' bar\'',
        cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'foo'))
    self.assertEqual(
        '\'foo \'"$a"',
        cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'bar'))
    self.assertEqual(
        'foo"$a"',
        cmd_helper.ShrinkToSnippet(['foobar'], 'a', 'bar'))

  def testShrinkToSnippet_singleArg(self):
    """Two-element command lists (a command plus one argument)."""
    self.assertEqual(
        "foo ''",
        cmd_helper.ShrinkToSnippet(['foo', ''], 'a', 'bar'))
    self.assertEqual(
        "foo foo",
        cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'bar'))
    self.assertEqual(
        '"$a" "$a"',
        cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'foo'))
    self.assertEqual(
        'foo "$a""$a"',
        cmd_helper.ShrinkToSnippet(['foo', 'barbar'], 'a', 'bar'))
    self.assertEqual(
        'foo "$a"\' \'"$a"',
        cmd_helper.ShrinkToSnippet(['foo', 'bar bar'], 'a', 'bar'))
    self.assertEqual(
        'foo "$a""$a"\' \'',
        cmd_helper.ShrinkToSnippet(['foo', 'barbar '], 'a', 'bar'))
    self.assertEqual(
        'foo \' \'"$a""$a"\' \'',
        cmd_helper.ShrinkToSnippet(['foo', ' barbar '], 'a', 'bar'))
90
91
# Placeholder meaning "not specified" for _ProcessOutputEvent fields;
# _MockProcess replaces it with a concrete value when it is constructed.
_DEFAULT = 'DEFAULT'
93
94
class _ProcessOutputEvent(object):
  """One scripted step of fake process output, consumed by _MockProcess.

  The constructor stores its arguments unchanged; _DEFAULT placeholders are
  resolved later by _MockProcess.
  """

  def __init__(self, select_fds=_DEFAULT, read_contents=None, ts=_DEFAULT):
    # ts: value the mocked time.time reports during this step.
    self.ts = ts
    # read_contents: data the mocked os.read hands out for this step.
    self.read_contents = read_contents
    # select_fds: first element of the tuple the mocked select.select returns.
    self.select_fds = select_fds
101
102
class _MockProcess(object):
  """Context manager that yields a mock subprocess.Popen with scripted output.

  While the context is active, fcntl.fcntl, os.read, select.select and
  time.time are patched so that they, together with the mock process's
  poll(), replay a sequence of _ProcessOutputEvent objects.
  """

  def __init__(self, output_sequence=None, return_value=0):
    """Builds the mock process and prepares (but does not start) the patches.

    Args:
      output_sequence: Optional list of _ProcessOutputEvent objects. Neither
        the list nor the events in it are modified; private copies are used.
      return_value: The returncode that poll() reports once the last event in
        the sequence has been reached.
    """

    # Arbitrary.
    fake_stdout_fileno = 25

    self.mock_proc = mock.MagicMock(spec=subprocess.Popen)
    self.mock_proc.stdout = mock.MagicMock()
    self.mock_proc.stdout.fileno = mock.MagicMock(
        return_value=fake_stdout_fileno)
    self.mock_proc.returncode = None

    self._return_value = return_value

    # This links the behavior of os.read, select.select, time.time, and
    # <process>.poll. The output sequence can be thought of as a list of
    # return values for select.select with corresponding return values for
    # the other calls at any time between that select call and the following
    # one. We iterate through the sequence only on calls to select.select.
    #
    # os.read is a special case, though, where we only return a given chunk
    # of data *once* after a given call to select.

    # Work on private copies. Callers may share one sequence between several
    # _MockProcess instances (e.g. a class-level list); inserting into or
    # rewriting the caller's objects would make every later instance see a
    # different, longer sequence.
    events = [
        _ProcessOutputEvent(e.select_fds, e.read_contents, e.ts)
        for e in (output_sequence or [])]

    # Use a leading element to make the iteration logic work.
    initial_seq_element = _ProcessOutputEvent(
        _DEFAULT, '',
        events[0].ts if events else _DEFAULT)
    events.insert(0, initial_seq_element)

    # Resolve the _DEFAULT placeholders to concrete values.
    for o in events:
      if o.select_fds == _DEFAULT:
        if o.read_contents is None:
          o.select_fds = []
        else:
          o.select_fds = [fake_stdout_fileno]
      if o.ts == _DEFAULT:
        o.ts = time.time()
    self._output_sequence = events

    self._output_seq_index = 0
    # One flag per event: True once its read_contents has been handed out.
    self._read_flags = [False] * len(events)

    def read_side_effect(*_args, **_kwargs):
      # Return the current event's data on the first read only.
      if self._read_flags[self._output_seq_index]:
        return None
      self._read_flags[self._output_seq_index] = True
      return self._output_sequence[self._output_seq_index].read_contents

    def select_side_effect(*_args, **_kwargs):
      # Each select call advances to the next event in the sequence.
      self._output_seq_index += 1
      return (self._output_sequence[self._output_seq_index].select_fds,
              None, None)

    def time_side_effect(*_args, **_kwargs):
      return self._output_sequence[self._output_seq_index].ts

    def poll_side_effect(*_args, **_kwargs):
      # The process counts as exited once the last event has been reached.
      if self._output_seq_index >= len(self._output_sequence) - 1:
        self.mock_proc.returncode = self._return_value
      return self.mock_proc.returncode

    mock_read = mock.MagicMock(side_effect=read_side_effect)
    mock_select = mock.MagicMock(side_effect=select_side_effect)
    mock_time = mock.MagicMock(side_effect=time_side_effect)
    self.mock_proc.poll = mock.MagicMock(side_effect=poll_side_effect)

    # Set up but *do not start* the mocks.
    self._mocks = [
      mock.patch('fcntl.fcntl'),
      mock.patch('os.read', new=mock_read),
      mock.patch('select.select', new=mock_select),
      mock.patch('time.time', new=mock_time),
    ]

  def __enter__(self):
    """Starts all patches and returns the mock process."""
    for m in self._mocks:
      m.__enter__()
    return self.mock_proc

  def __exit__(self, exc_type, exc_val, exc_tb):
    """Stops the patches in the reverse of the order they were started."""
    for m in reversed(self._mocks):
      m.__exit__(exc_type, exc_val, exc_tb)
192
193
class CmdHelperIterCmdOutputLinesTest(unittest.TestCase):
  """Test _IterCmdOutputLines against a mock process with scripted output."""

  # This calls _IterCmdOutputLines rather than IterCmdOutputLines s.t. it
  # can mock the process.
  # pylint: disable=protected-access

  _SIMPLE_OUTPUT_SEQUENCE = [
    _ProcessOutputEvent(read_contents='1\n2\n'),
  ]

  def _SimpleOutputSequence(self):
    """Returns a fresh list holding the simple output events.

    Each test hands its own copy to _MockProcess so the shared class-level
    list can never be altered by one test and leak into the next.
    """
    return list(self._SIMPLE_OUTPUT_SEQUENCE)

  def testIterCmdOutputLines_success(self):
    """Lines '1' and '2' are yielded in order for a zero exit status."""
    with _MockProcess(
        output_sequence=self._SimpleOutputSequence()) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
        self.assertEqual(num, int(line))

  def testIterCmdOutputLines_exitStatusFail(self):
    """A return value of 1 raises CalledProcessError."""
    with self.assertRaises(subprocess.CalledProcessError):
      with _MockProcess(output_sequence=self._SimpleOutputSequence(),
                        return_value=1) as mock_proc:
        for num, line in enumerate(
            cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
          self.assertEqual(num, int(line))
        # after reading all the output we get an exit status of 1

  def testIterCmdOutputLines_exitStatusIgnored(self):
    """With check_status=False a return value of 1 does not raise."""
    with _MockProcess(output_sequence=self._SimpleOutputSequence(),
                      return_value=1) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(
              mock_proc, 'mock_proc', check_status=False),
          1):
        self.assertEqual(num, int(line))

  def testIterCmdOutputLines_exitStatusSkipped(self):
    """Stopping iteration after the last line never checks the status."""
    with _MockProcess(output_sequence=self._SimpleOutputSequence(),
                      return_value=1) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
        self.assertEqual(num, int(line))
        # no exception will be raised because we don't attempt to read past
        # the end of the output and, thus, the status never gets checked
        if num == 2:
          break

  def testIterCmdOutputLines_delay(self):
    """With iter_timeout=5, a None is yielded between '2' and 'Awake'."""
    output_sequence = [
      _ProcessOutputEvent(read_contents='1\n2\n', ts=1),
      _ProcessOutputEvent(read_contents=None, ts=2),
      _ProcessOutputEvent(read_contents='Awake', ts=10),
    ]
    with _MockProcess(output_sequence=output_sequence) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc',
                                         iter_timeout=5), 1):
        if num <= 2:
          self.assertEqual(num, int(line))
        elif num == 3:
          self.assertIsNone(line)
        elif num == 4:
          self.assertEqual('Awake', line)
        else:
          self.fail()
259
260
# Run every test in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()
263