#!/usr/bin/env python3
# Copyright 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unittests for the utils module."""

import datetime
import os
from pathlib import Path
import sys
import unittest

# Put the directory one level above this file's directory at the front of
# sys.path so the `rh` package imported below can be found.
_path = os.path.realpath(__file__ + '/../..')
if sys.path[0] != _path:
    sys.path.insert(0, _path)
del _path

# We have to import our local modules after the sys.path tweak.  We can't use
# relative imports because this is an executable program, not a module.
# pylint: disable=wrong-import-position
import rh
import rh.utils


class TimeDeltaStrTests(unittest.TestCase):
    """Verify behavior of the timedelta_str helper."""

    def test_same(self):
        """Check timedelta of 0 seconds."""
        delta = datetime.timedelta(0)
        self.assertEqual('0.000s', rh.utils.timedelta_str(delta))

    def test_millisecondss(self):
        """Check timedelta of milliseconds."""
        delta = datetime.timedelta(seconds=0.123456)
        self.assertEqual('0.123s', rh.utils.timedelta_str(delta))

    def test_seconds(self):
        """Check timedelta of seconds."""
        delta = datetime.timedelta(seconds=12.3)
        self.assertEqual('12.300s', rh.utils.timedelta_str(delta))

    def test_minutes(self):
        """Check timedelta of minutes."""
        delta = datetime.timedelta(seconds=72.3)
        self.assertEqual('1m12.300s', rh.utils.timedelta_str(delta))

    def test_hours(self):
        """Check timedelta of hours."""
        delta = datetime.timedelta(seconds=4000.3)
        self.assertEqual('1h6m40.300s', rh.utils.timedelta_str(delta))


class CompletedProcessTests(unittest.TestCase):
    """Verify behavior of CompletedProcess object."""

    def test_empty_cmdstr(self):
        """Check cmdstr with an empty command."""
        result = rh.utils.CompletedProcess(args=[])
        self.assertEqual('', result.cmdstr)

    def test_basic_cmdstr(self):
        """Check cmdstr with a basic command."""
        result = rh.utils.CompletedProcess(args=['ls', 'a b'])
        self.assertEqual("ls 'a b'", result.cmdstr)

    def test_str(self):
        """Check str() handling."""
        # We don't enforce much, just that it doesn't crash.
        result = rh.utils.CompletedProcess()
        self.assertNotEqual('', str(result))
        result = rh.utils.CompletedProcess(args=[])
        self.assertNotEqual('', str(result))

    def test_repr(self):
        """Check repr() handling."""
        # We don't enforce much, just that it doesn't crash.
        result = rh.utils.CompletedProcess()
        self.assertNotEqual('', repr(result))
        result = rh.utils.CompletedProcess(args=[])
        self.assertNotEqual('', repr(result))


class CalledProcessErrorTests(unittest.TestCase):
    """Verify behavior of CalledProcessError object."""

    def test_basic(self):
        """Basic test we can create a normal instance."""
        rh.utils.CalledProcessError(0, ['mycmd'])

    def test_stringify(self):
        """Check stringify() handling."""
        # We don't assert much so we leave flexibility in changing format.
        err = rh.utils.CalledProcessError(0, ['mycmd'])
        self.assertIn('mycmd', err.stringify())

    def test_str(self):
        """Check str() handling."""
        # We don't assert much so we leave flexibility in changing format.
        err = rh.utils.CalledProcessError(0, ['mycmd'])
        self.assertIn('mycmd', str(err))

    def test_repr(self):
        """Check repr() handling."""
        # We don't assert much so we leave flexibility in changing format.
        err = rh.utils.CalledProcessError(0, ['mycmd'])
        self.assertNotEqual('', repr(err))

    def test_output(self):
        """Make sure .output is removed and .stdout works."""
        e = rh.utils.CalledProcessError(
            0, ['true'], stdout='STDOUT', stderr='STDERR')
        # Use unittest assertions instead of bare `assert` statements: those
        # are stripped under `python -O`, which would drop the value checks
        # and leave the assertRaises bodies empty (making them fail).
        with self.assertRaises(AttributeError):
            _ = e.output
        self.assertEqual('STDOUT', e.stdout)
        self.assertEqual('STDERR', e.stderr)

        # The attributes must stay writable and .output must stay missing.
        e.stdout = 'STDout'
        e.stderr = 'STDerr'
        with self.assertRaises(AttributeError):
            _ = e.output
        self.assertEqual('STDout', e.stdout)
        self.assertEqual('STDerr', e.stderr)


class RunCommandTests(unittest.TestCase):
    """Verify behavior of run helper."""

    def test_basic(self):
        """Simple basic test."""
        ret = rh.utils.run(['true'])
        self.assertEqual('true', ret.cmdstr)
        self.assertIsNone(ret.stdout)
        self.assertIsNone(ret.stderr)

    def test_stdout_capture(self):
        """Verify output capturing works."""
        ret = rh.utils.run(['echo', 'hi'], redirect_stdout=True)
        self.assertEqual('hi\n', ret.stdout)
        self.assertIsNone(ret.stderr)

    def test_stderr_capture(self):
        """Verify stderr capturing works."""
        ret = rh.utils.run(['sh', '-c', 'echo hi >&2'], redirect_stderr=True)
        self.assertIsNone(ret.stdout)
        self.assertEqual('hi\n', ret.stderr)

    def test_stdout_utf8(self):
        """Verify reading UTF-8 data works."""
        # The raw string hands the escapes to printf itself, which emits the
        # two bytes 0xc3 0x9f (the UTF-8 encoding of 'ß').
        ret = rh.utils.run(['printf', r'\xc3\x9f'], redirect_stdout=True)
        self.assertEqual('ß', ret.stdout)
        self.assertIsNone(ret.stderr)

    def test_stdin_utf8(self):
        """Verify writing UTF-8 data works."""
        ret = rh.utils.run(['cat'], redirect_stdout=True, input='ß')
        self.assertEqual('ß', ret.stdout)
        self.assertIsNone(ret.stderr)

    def test_check_false(self):
        """Verify handling of check=False."""
        ret = rh.utils.run(['false'], check=False)
        self.assertNotEqual(0, ret.returncode)
        self.assertIn('false', str(ret))

        ret = rh.utils.run(['true'], check=False)
        self.assertEqual(0, ret.returncode)
        self.assertIn('true', str(ret))

    def test_check_true(self):
        """Verify handling of check=True."""
        with self.assertRaises(rh.utils.CalledProcessError) as e:
            rh.utils.run(['false'], check=True)
        err = e.exception
        self.assertNotEqual(0, err.returncode)
        self.assertIn('false', str(err))

        ret = rh.utils.run(['true'], check=True)
        self.assertEqual(0, ret.returncode)
        self.assertIn('true', str(ret))

    def test_check_false_output(self):
        """Verify handling of output capturing w/check=True on failure.

        NB: Despite the historical method name, this runs with check=True and
        expects the failing command to raise CalledProcessError.
        """
        with self.assertRaises(rh.utils.CalledProcessError) as e:
            rh.utils.run(['sh', '-c', 'echo out; echo err >&2; false'],
                         check=True, capture_output=True)
        err = e.exception
        self.assertNotEqual(0, err.returncode)
        self.assertIn('false', str(err))

    def test_check_true_missing_prog_output(self):
        """Verify handling of output capturing w/missing progs."""
        with self.assertRaises(rh.utils.CalledProcessError) as e:
            rh.utils.run(['./!~a/b/c/d/'], check=True, capture_output=True)
        err = e.exception
        self.assertNotEqual(0, err.returncode)
        self.assertIn('a/b/c/d', str(err))

    def test_check_false_missing_prog_output(self):
        """Verify handling of output capturing w/missing progs."""
        ret = rh.utils.run(['./!~a/b/c/d/'], check=False, capture_output=True)
        self.assertNotEqual(0, ret.returncode)
        self.assertIn('a/b/c/d', str(ret))

    def test_check_false_missing_prog_combined_output(self):
        """Verify handling of combined output capturing w/missing progs."""
        with self.assertRaises(rh.utils.CalledProcessError) as e:
            rh.utils.run(['./!~a/b/c/d/'], check=True,
                         combine_stdout_stderr=True)
        err = e.exception
        self.assertNotEqual(0, err.returncode)
        self.assertIn('a/b/c/d', str(err))

    def test_pathlib(self):
        """Verify pathlib arguments work."""
        result = rh.utils.run(['true', Path('/')])
        # Verify stringify behavior.
        str(result)
        self.assertEqual(result.cmdstr, 'true /')


if __name__ == '__main__':
    unittest.main()