#!/usr/bin/env python
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Tests for the cmd_helper module."""

import unittest
import subprocess
import time

from devil import devil_env
from devil.utils import cmd_helper

with devil_env.SysPath(devil_env.PYMOCK_PATH):
  import mock  # pylint: disable=import-error


class CmdHelperSingleQuoteTest(unittest.TestCase):
  """Checks the strings produced by cmd_helper.SingleQuote."""

  def testSingleQuote_basic(self):
    # A plain word is expected back without any quoting.
    self.assertEqual('hello',
                     cmd_helper.SingleQuote('hello'))

  def testSingleQuote_withSpaces(self):
    self.assertEqual("'hello world'",
                     cmd_helper.SingleQuote('hello world'))

  def testSingleQuote_withUnsafeChars(self):
    # An embedded single quote is expected to come back as '"'"'.
    self.assertEqual("""'hello'"'"'; rm -rf /'""",
                     cmd_helper.SingleQuote("hello'; rm -rf /"))

  def testSingleQuote_dontExpand(self):
    # Runs the quoted string through a real shell: $TEST_VAR must come back
    # literally rather than being expanded.
    test_string = 'hello $TEST_VAR'
    cmd = 'TEST_VAR=world; echo %s' % cmd_helper.SingleQuote(test_string)
    self.assertEqual(test_string,
                     cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())


class CmdHelperDoubleQuoteTest(unittest.TestCase):
  """Checks the strings produced by cmd_helper.DoubleQuote."""

  def testDoubleQuote_basic(self):
    # A plain word is expected back without any quoting.
    self.assertEqual('hello',
                     cmd_helper.DoubleQuote('hello'))

  def testDoubleQuote_withSpaces(self):
    self.assertEqual('"hello world"',
                     cmd_helper.DoubleQuote('hello world'))

  def testDoubleQuote_withUnsafeChars(self):
    # An embedded double quote is expected to come back backslash-escaped.
    self.assertEqual('''"hello\\"; rm -rf /"''',
                     cmd_helper.DoubleQuote('hello"; rm -rf /'))

  def testDoubleQuote_doExpand(self):
    # Runs the quoted string through a real shell: $TEST_VAR must be expanded.
    test_string = 'hello $TEST_VAR'
    cmd = 'TEST_VAR=world; echo %s' % cmd_helper.DoubleQuote(test_string)
    self.assertEqual('hello world',
                     cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())


class CmdHelperShinkToSnippetTest(unittest.TestCase):
  """Checks the snippets produced by cmd_helper.ShrinkToSnippet.

  Every call passes (cmd_parts, 'a', <value>); the expectations show
  occurrences of <value> in the parts being replaced with "$a".
  """

  def testShrinkToSnippet_noArgs(self):
    self.assertEqual('foo',
                     cmd_helper.ShrinkToSnippet(['foo'], 'a', 'bar'))
    self.assertEqual("'foo foo'",
                     cmd_helper.ShrinkToSnippet(['foo foo'], 'a', 'bar'))
    self.assertEqual('"$a"\' bar\'',
                     cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'foo'))
    self.assertEqual('\'foo \'"$a"',
                     cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'bar'))
    self.assertEqual('foo"$a"',
                     cmd_helper.ShrinkToSnippet(['foobar'], 'a', 'bar'))

  def testShrinkToSnippet_singleArg(self):
    self.assertEqual("foo ''",
                     cmd_helper.ShrinkToSnippet(['foo', ''], 'a', 'bar'))
    self.assertEqual("foo foo",
                     cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'bar'))
    self.assertEqual('"$a" "$a"',
                     cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'foo'))
    self.assertEqual('foo "$a""$a"',
                     cmd_helper.ShrinkToSnippet(['foo', 'barbar'], 'a', 'bar'))
    self.assertEqual('foo "$a"\' \'"$a"',
                     cmd_helper.ShrinkToSnippet(['foo', 'bar bar'], 'a', 'bar'))
    self.assertEqual('foo "$a""$a"\' \'',
                     cmd_helper.ShrinkToSnippet(['foo', 'barbar '], 'a', 'bar'))
    self.assertEqual('foo \' \'"$a""$a"\' \'',
                     cmd_helper.ShrinkToSnippet(['foo', ' barbar '], 'a', 'bar'))


# Placeholder meaning "let _MockProcess pick a value for this field".
_DEFAULT = 'DEFAULT'


class _ProcessOutputEvent(object):
  """One step of scripted process output consumed by _MockProcess.

  Attributes:
    select_fds: The readable-fd list the mocked select.select returns for
        this step. _DEFAULT lets _MockProcess derive it from read_contents.
    read_contents: The value the mocked os.read returns (once) for this step.
    ts: The value the mocked time.time returns during this step. _DEFAULT
        lets _MockProcess fill in the real time at construction.
  """

  def __init__(self, select_fds=_DEFAULT, read_contents=None, ts=_DEFAULT):
    self.select_fds = select_fds
    self.read_contents = read_contents
    self.ts = ts


class _MockProcess(object):
  """Context manager that yields a scripted, mocked subprocess.Popen.

  While the context is active, fcntl.fcntl, os.read, select.select and
  time.time are patched so that their results follow the supplied output
  sequence. Entering the context returns the mocked process object.
  """

  def __init__(self, output_sequence=None, return_value=0):
    """Builds the mocked process and its (not yet started) patches.

    Args:
      output_sequence: An iterable of _ProcessOutputEvent describing the
          process output, or None for no output. It is never modified; the
          mock works on private copies.
      return_value: The returncode the mocked process reports once the
          output sequence has been exhausted.
    """

    # Arbitrary.
    fake_stdout_fileno = 25

    self.mock_proc = mock.MagicMock(spec=subprocess.Popen)
    self.mock_proc.stdout = mock.MagicMock()
    self.mock_proc.stdout.fileno = mock.MagicMock(
        return_value=fake_stdout_fileno)
    self.mock_proc.returncode = None

    self._return_value = return_value

    # This links the behavior of os.read, select.select, time.time, and
    # <process>.poll. The output sequence can be thought of as a list of
    # return values for select.select with corresponding return values for
    # the other calls at any time between that select call and the following
    # one. We iterate through the sequence only on calls to select.select.
    #
    # os.read is a special case, though, where we only return a given chunk
    # of data *once* after a given call to select.

    # Copy both the list and its events. The leading element inserted below
    # and the default resolution would otherwise leak into the caller's
    # objects, which matters when several tests share one sequence.
    events = [
        _ProcessOutputEvent(e.select_fds, e.read_contents, e.ts)
        for e in (output_sequence or [])
    ]

    # Use a leading element to make the iteration logic work.
    initial_seq_element = _ProcessOutputEvent(
        _DEFAULT, '',
        events[0].ts if events else _DEFAULT)
    events.insert(0, initial_seq_element)

    for o in events:
      if o.select_fds == _DEFAULT:
        # An event with data to read reports stdout as readable.
        if o.read_contents is None:
          o.select_fds = []
        else:
          o.select_fds = [fake_stdout_fileno]
      if o.ts == _DEFAULT:
        o.ts = time.time()
    self._output_sequence = events

    self._output_seq_index = 0
    self._read_flags = [False] * len(events)

    def read_side_effect(*_args, **_kwargs):
      # Hand out the current event's data only on the first read.
      if self._read_flags[self._output_seq_index]:
        return None
      self._read_flags[self._output_seq_index] = True
      return self._output_sequence[self._output_seq_index].read_contents

    def select_side_effect(*_args, **_kwargs):
      # Each select call advances to the next event in the sequence.
      self._output_seq_index += 1
      return (self._output_sequence[self._output_seq_index].select_fds,
              None, None)

    def time_side_effect(*_args, **_kwargs):
      return self._output_sequence[self._output_seq_index].ts

    def poll_side_effect(*_args, **_kwargs):
      # The process only "exits" once the last event has been reached.
      if self._output_seq_index >= len(self._output_sequence) - 1:
        self.mock_proc.returncode = self._return_value
      return self.mock_proc.returncode

    mock_read = mock.MagicMock(side_effect=read_side_effect)
    mock_select = mock.MagicMock(side_effect=select_side_effect)
    mock_time = mock.MagicMock(side_effect=time_side_effect)
    self.mock_proc.poll = mock.MagicMock(side_effect=poll_side_effect)

    # Set up but *do not start* the mocks.
    self._mocks = [
        mock.patch('fcntl.fcntl'),
        mock.patch('os.read', new=mock_read),
        mock.patch('select.select', new=mock_select),
        mock.patch('time.time', new=mock_time),
    ]

  def __enter__(self):
    """Starts every patch and returns the mocked process."""
    for m in self._mocks:
      m.__enter__()
    return self.mock_proc

  def __exit__(self, exc_type, exc_val, exc_tb):
    """Stops the patches in the reverse of the order they were started."""
    for m in reversed(self._mocks):
      m.__exit__(exc_type, exc_val, exc_tb)


class CmdHelperIterCmdOutputLinesTest(unittest.TestCase):
  """Test _IterCmdOutputLines against a scripted mock process."""

  # This calls _IterCmdOutputLines rather than IterCmdOutputLines s.t. it
  # can mock the process.
  # pylint: disable=protected-access

  # Shared by several tests below.
  _SIMPLE_OUTPUT_SEQUENCE = [
      _ProcessOutputEvent(read_contents='1\n2\n'),
  ]

  def testIterCmdOutputLines_success(self):
    with _MockProcess(
        output_sequence=self._SIMPLE_OUTPUT_SEQUENCE) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
        self.assertEqual(num, int(line))

  def testIterCmdOutputLines_exitStatusFail(self):
    with self.assertRaises(subprocess.CalledProcessError):
      with _MockProcess(output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
                        return_value=1) as mock_proc:
        for num, line in enumerate(
            cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
          self.assertEqual(num, int(line))
        # after reading all the output we get an exit status of 1

  def testIterCmdOutputLines_exitStatusIgnored(self):
    with _MockProcess(output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
                      return_value=1) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(
              mock_proc, 'mock_proc', check_status=False),
          1):
        self.assertEqual(num, int(line))

  def testIterCmdOutputLines_exitStatusSkipped(self):
    with _MockProcess(output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
                      return_value=1) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
        self.assertEqual(num, int(line))
        # no exception will be raised because we don't attempt to read past
        # the end of the output and, thus, the status never gets checked
        if num == 2:
          break

  def testIterCmdOutputLines_delay(self):
    # The mocked clock jumps from 2 to 10, which exceeds iter_timeout=5; the
    # iterator is expected to yield None before the final 'Awake' chunk.
    output_sequence = [
        _ProcessOutputEvent(read_contents='1\n2\n', ts=1),
        _ProcessOutputEvent(read_contents=None, ts=2),
        _ProcessOutputEvent(read_contents='Awake', ts=10),
    ]
    with _MockProcess(output_sequence=output_sequence) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc',
                                         iter_timeout=5), 1):
        if num <= 2:
          self.assertEqual(num, int(line))
        elif num == 3:
          self.assertIsNone(line)
        elif num == 4:
          self.assertEqual('Awake', line)
        else:
          self.fail()


if __name__ == '__main__':
  unittest.main()