#!/usr/bin/python3

"""Unit tests for autotest_lib.client.bin.utils."""

__author__ = "kerl@google.com, gwendal@google.com (Gwendal Grignou)"

import io
import six
import unittest
from unittest import mock

from autotest_lib.client.bin import utils

# Canned iostat output handed to the mocked system_output() in
# test_get_storage_statistics.
_IOSTAT_OUTPUT = (
    'Linux 3.8.11 (localhost) 02/19/19 _x86_64_ (4 CPU)\n'
    '\n'
    'Device tps kB_read/s kB_wrtn/s kB_read kB_wrtn\n'
    'ALL 4.45 10.33 292.40 665582 188458\n'
    '\n')

# Fake meminfo contents shared by the memory related tests.
_MEMINFO_TEXT = ('MemTotal: 2048000 kB\n'
                 'MemFree: 307200 kB\n'
                 'Buffers: 102400 kB\n'
                 'Cached: 204800 kB\n')


def _base64_encoded_fixture():
    """Build the encoded counterpart of the base64 round-trip test object.

    A fresh dict is returned on every call so that tests cannot affect each
    other through shared mutable state.

    @return On Python 2, a dict whose str keys and str/bytearray values are
            base64 encoded; on Python 3, a dict where only the binary values
            are encoded (as bytes).
    """
    if utils.is_python2():
        return {
            'YQ==': 10,
            'Yg==': 'aGVsbG8=',
            'Yw==': [100, 200, '8PHy8/Q='],
            'ZA==': {
                784: 'QBQBUA==',
                78.0: 'EAULELIbAA=='
            }
        }
    return {
        'a': 10,
        'b': 'hello',
        'c': [100, 200, b'8PHy8/Q='],
        'd': {
            784: b'QBQBUA==',
            78.0: b'EAULELIbAA=='
        }
    }


class TestUtils(unittest.TestCase):
    """Test utils functions."""

    # Test methods, disable missing-docstring
    # pylint: disable=missing-docstring
    def setUp(self):
        # Files opened with utils._open_file will contain this string.
        self.fake_file_text = ''
        # Swap in the fake opener for the duration of one test only. The
        # patch is undone during cleanup so the real utils._open_file is
        # restored and cannot leak into other tests in the same process.
        patcher = mock.patch.object(utils, '_open_file', self.fake_open)
        patcher.start()
        self.addCleanup(patcher.stop)

    def fake_open(self, path):
        """Stand-in for utils._open_file that serves self.fake_file_text.

        @param path: Ignored; every path yields the same fake contents.

        @return An in-memory file object holding self.fake_file_text.
        """
        # Use BytesIO instead of StringIO to support with statements.
        if six.PY2:
            return io.BytesIO(bytes(self.fake_file_text))
        return io.StringIO(self.fake_file_text)

    def test_concat_partition(self):
        self.assertEqual("nvme0n1p3", utils.concat_partition("nvme0n1", 3))
        self.assertEqual("mmcblk1p3", utils.concat_partition("mmcblk1", 3))
        self.assertEqual("sda3", utils.concat_partition("sda", 3))

    # The columns in /proc/stat are:
    # user nice system idle iowait irq softirq steal guest guest_nice
    #
    # Although older kernel versions might not contain all of them.
    # Unit is 1/100ths of a second.
    def test_get_cpu_usage(self):
        self.fake_file_text = 'cpu 254544 9 254768 2859878 1 2 3 4 5 6\n'
        usage = utils.get_cpu_usage()
        self.assertEqual({
            'user': 254544,
            'nice': 9,
            'system': 254768,
            'idle': 2859878,
            'iowait': 1,
            'irq': 2,
            'softirq': 3,
            'steal': 4,
            'guest': 5,
            'guest_nice': 6
        }, usage)

    def test_get_cpu_missing_columns(self):
        # Only the first four columns are present; the rest must read as 0.
        self.fake_file_text = 'cpu 254544 9 254768 2859878\n'
        usage = utils.get_cpu_usage()
        self.assertEqual({
            'user': 254544,
            'nice': 9,
            'system': 254768,
            'idle': 2859878,
            'iowait': 0,
            'irq': 0,
            'softirq': 0,
            'steal': 0,
            'guest': 0,
            'guest_nice': 0
        }, usage)

    def test_compute_active_cpu_time(self):
        start_usage = {
            'user': 900,
            'nice': 10,
            'system': 90,
            'idle': 10000,
            'iowait': 500,
            'irq': 100,
            'softirq': 50,
            'steal': 150,
            'guest': 170,
            'guest_nice': 30
        }
        end_usage = {
            'user': 1800,
            'nice': 20,
            'system': 180,
            'idle': 13000,
            'iowait': 2000,
            'irq': 200,
            'softirq': 100,
            'steal': 300,
            'guest': 340,
            'guest_nice': 60
        }
        # The counters grow by 6000 ticks in total, 1500 of them outside of
        # idle and iowait, which is consistent with the expected 0.25.
        usage = utils.compute_active_cpu_time(start_usage, end_usage)
        self.assertAlmostEqual(usage, 0.25)

    def test_compute_active_cpu_time_idle(self):
        start_usage = {
            'user': 900,
            'nice': 10,
            'system': 90,
            'idle': 10000,
            'iowait': 500,
            'irq': 100,
            'softirq': 50,
            'steal': 150,
            'guest': 170,
            'guest_nice': 30
        }
        # Only idle and iowait advance between the two samples.
        end_usage = {
            'user': 900,
            'nice': 10,
            'system': 90,
            'idle': 11000,
            'iowait': 1000,
            'irq': 100,
            'softirq': 50,
            'steal': 150,
            'guest': 170,
            'guest_nice': 30
        }
        usage = utils.compute_active_cpu_time(start_usage, end_usage)
        self.assertAlmostEqual(usage, 0)

    def test_get_mem_total(self):
        self.fake_file_text = _MEMINFO_TEXT
        self.assertAlmostEqual(utils.get_mem_total(), 2000)

    def test_get_mem_free(self):
        self.fake_file_text = _MEMINFO_TEXT
        self.assertAlmostEqual(utils.get_mem_free(), 300)

    def test_get_mem_free_plus_buffers_and_cached(self):
        self.fake_file_text = _MEMINFO_TEXT
        self.assertAlmostEqual(utils.get_mem_free_plus_buffers_and_cached(),
                               600)

    def test_get_meminfo(self):
        # The extra last line has no trailing newline and a key containing
        # parentheses, which is exposed as the attribute Active_anon.
        self.fake_file_text = _MEMINFO_TEXT + 'Active(anon): 409600 kB'
        meminfo = utils.get_meminfo()
        self.assertEqual(meminfo.MemTotal, 2048000)
        self.assertEqual(meminfo.Active_anon, 409600)

    def test_get_num_allocated_file_handles(self):
        self.fake_file_text = '123 0 456\n'
        self.assertEqual(utils.get_num_allocated_file_handles(), 123)

    @mock.patch('autotest_lib.client.common_lib.utils.system_output')
    def test_get_storage_statistics(self, system_output_mock):
        system_output_mock.return_value = _IOSTAT_OUTPUT
        statistics = utils.get_storage_statistics()
        self.assertEqual({
            'read_kb': 665582.0,
            'written_kb_per_s': 292.4,
            'read_kb_per_s': 10.33,
            'transfers_per_s': 4.45,
            'written_kb': 188458.0,
        }, statistics)

    def test_base64_recursive_encode(self):
        obj = {
            'a': 10,
            'b': 'hello',
            'c': [100, 200, bytearray(b'\xf0\xf1\xf2\xf3\xf4')],
            'd': {
                784: bytearray(b'@\x14\x01P'),
                78.0: bytearray(b'\x10\x05\x0b\x10\xb2\x1b\x00')
            }
        }
        expected_encoded_obj = _base64_encoded_fixture()

        encoded_obj = utils.base64_recursive_encode(obj)
        self.assertEqual(expected_encoded_obj, encoded_obj)

    def test_base64_recursive_decode(self):
        encoded_obj = _base64_encoded_fixture()

        expected_decoded_obj = {
            'a': 10,
            'b': 'hello',
            'c': [100, 200, b'\xf0\xf1\xf2\xf3\xf4'],
            'd': {
                784: b'@\x14\x01P',
                78.0: b'\x10\x05\x0b\x10\xb2\x1b\x00'
            }
        }

        decoded_obj = utils.base64_recursive_decode(encoded_obj)
        self.assertEqual(expected_decoded_obj, decoded_obj)

    def test_bytes_to_str_recursive(self):
        obj = {
            'a': 10,
            'b': 'hello',
            'c': b'b_hello',
            'd': [100, 200, bytearray(b'\xf0\xf1\xf2\xf3\xf4')],
            'e': {
                784: bytearray(b'@\x14\x01P'),
                78.0: bytearray(b'\x10\x05\x0b\x10\xb2\x1b\x00')
            }
        }

        if utils.is_python2():
            # On Python 2 every input is expected to come back unchanged.
            self.assertEqual(b'foo', utils.bytes_to_str_recursive(b'foo'))
            self.assertEqual(b'\x80abc',
                             utils.bytes_to_str_recursive(b'\x80abc'))
            self.assertEqual('foo', utils.bytes_to_str_recursive('foo'))
            self.assertEqual('\x80abc',
                             utils.bytes_to_str_recursive('\x80abc'))
            self.assertEqual(obj, utils.bytes_to_str_recursive(obj))
        else:
            self.assertEqual('foo', utils.bytes_to_str_recursive(b'foo'))
            # NOTE(review): the assertion below was already disabled in this
            # file; confirm the expected handling of undecodable bytes before
            # re-enabling it.
            # self.assertEqual('\ufffdabc', utils.bytes_to_str_recursive(b'\x80abc'))
            self.assertEqual('foo', utils.bytes_to_str_recursive('foo'))
            self.assertEqual('\x80abc',
                             utils.bytes_to_str_recursive('\x80abc'))
            expected_obj = {
                'a': 10,
                'b': 'hello',
                'c': 'b_hello',
                # u prefix: Python 2 interpreter friendly.
                'd': [100, 200, u'\u0440\u0441\u0442\u0443\u0444'],
                'e': {
                    784: '@\x14\x01P',
                    78.0: u'\x10\x05\x0b\x10\u00b2\x1b\x00'
                }
            }
            self.assertEqual(expected_obj, utils.bytes_to_str_recursive(obj))