• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python3
2
3__author__ = "kerl@google.com, gwendal@google.com (Gwendal Grignou)"
4
5import io
6import six
7import unittest
8from unittest import mock
9
10from autotest_lib.client.bin import utils
11
12_IOSTAT_OUTPUT = (
13    'Linux 3.8.11 (localhost)   02/19/19        _x86_64_        (4 CPU)\n'
14    '\n'
15    'Device            tps    kB_read/s    kB_wrtn/s    kB_read    kB_wrtn\n'
16    'ALL              4.45        10.33       292.40     665582     188458\n'
17    '\n')
18
class TestUtils(unittest.TestCase):
    """Test utils functions.

    setUp swaps utils._open_file for fake_open, so any utils function that
    reads a file through that hook sees self.fake_file_text instead of a
    real file.
    """

    # Test methods, disable missing-docstring
    # pylint: disable=missing-docstring

    # Sample memory-info text shared by the get_mem_* tests below.
    _MEMINFO_TEXT = ('MemTotal:  2048000 kB\n'
                     'MemFree:  307200 kB\n'
                     'Buffers:  102400 kB\n'
                     'Cached:   204800 kB\n')

    def setUp(self):
        # Restore the real opener once the test finishes so the fake does not
        # leak into other test cases sharing the same utils module.
        self.addCleanup(setattr, utils, '_open_file', utils._open_file)
        utils._open_file = self.fake_open
        # Files opened with utils._open_file will contain this string.
        self.fake_file_text = ''

    def fake_open(self, path):
        """Return an in-memory file holding self.fake_file_text.

        The path argument is accepted for signature compatibility and
        ignored.
        """
        # Use BytesIO instead of StringIO to support with statements.
        if six.PY2:
            return io.BytesIO(bytes(self.fake_file_text))
        return io.StringIO(self.fake_file_text)

    def test_concat_partition(self):
        # assertEqual rather than assertEquals: the latter is a deprecated
        # alias that no longer exists on Python 3.12+.
        self.assertEqual("nvme0n1p3", utils.concat_partition("nvme0n1", 3))
        self.assertEqual("mmcblk1p3", utils.concat_partition("mmcblk1", 3))
        self.assertEqual("sda3", utils.concat_partition("sda", 3))

    # The columns in /proc/stat are:
    # user nice system idle iowait irq softirq steal guest guest_nice
    #
    # Although older kernel versions might not contain all of them.
    # Unit is 1/100ths of a second.
    def test_get_cpu_usage(self):
        # All ten columns present: each one maps to its named key.
        self.fake_file_text = 'cpu 254544 9 254768 2859878 1 2 3 4 5 6\n'
        usage = utils.get_cpu_usage()
        self.assertEqual({
            'user': 254544,
            'nice': 9,
            'system': 254768,
            'idle': 2859878,
            'iowait': 1,
            'irq': 2,
            'softirq': 3,
            'steal': 4,
            'guest': 5,
            'guest_nice': 6
        }, usage)

    def test_get_cpu_missing_columns(self):
        # Only the first four columns present: the rest are expected as 0.
        self.fake_file_text = 'cpu 254544 9 254768 2859878\n'
        usage = utils.get_cpu_usage()
        self.assertEqual({
            'user': 254544,
            'nice': 9,
            'system': 254768,
            'idle': 2859878,
            'iowait': 0,
            'irq': 0,
            'softirq': 0,
            'steal': 0,
            'guest': 0,
            'guest_nice': 0
        }, usage)

    def test_compute_active_cpu_time(self):
        # Every counter advances between the two samples; the expected
        # active fraction for this pair is 0.25.
        start_usage = {
            'user': 900,
            'nice': 10,
            'system': 90,
            'idle': 10000,
            'iowait': 500,
            'irq': 100,
            'softirq': 50,
            'steal': 150,
            'guest': 170,
            'guest_nice': 30
        }
        end_usage = {
            'user': 1800,
            'nice': 20,
            'system': 180,
            'idle': 13000,
            'iowait': 2000,
            'irq': 200,
            'softirq': 100,
            'steal': 300,
            'guest': 340,
            'guest_nice': 60
        }
        usage = utils.compute_active_cpu_time(start_usage, end_usage)
        self.assertAlmostEqual(usage, 0.25)

    def test_compute_active_cpu_time_idle(self):
        # Only 'idle' and 'iowait' advance, so the active fraction is 0.
        start_usage = {
            'user': 900,
            'nice': 10,
            'system': 90,
            'idle': 10000,
            'iowait': 500,
            'irq': 100,
            'softirq': 50,
            'steal': 150,
            'guest': 170,
            'guest_nice': 30
        }
        end_usage = {
            'user': 900,
            'nice': 10,
            'system': 90,
            'idle': 11000,
            'iowait': 1000,
            'irq': 100,
            'softirq': 50,
            'steal': 150,
            'guest': 170,
            'guest_nice': 30
        }
        usage = utils.compute_active_cpu_time(start_usage, end_usage)
        self.assertAlmostEqual(usage, 0)

    def test_get_mem_total(self):
        # MemTotal of 2048000 kB is expected to be reported as 2000.
        self.fake_file_text = self._MEMINFO_TEXT
        self.assertAlmostEqual(utils.get_mem_total(), 2000)

    def test_get_mem_free(self):
        # MemFree of 307200 kB is expected to be reported as 300.
        self.fake_file_text = self._MEMINFO_TEXT
        self.assertAlmostEqual(utils.get_mem_free(), 300)

    def test_get_mem_free_plus_buffers_and_cached(self):
        # 307200 + 102400 + 204800 kB is expected to be reported as 600.
        self.fake_file_text = self._MEMINFO_TEXT
        self.assertAlmostEqual(utils.get_mem_free_plus_buffers_and_cached(),
                               600)

    def test_get_meminfo(self):
        # The last line has no trailing newline, and the 'Active(anon)' key
        # is expected to surface as the attribute 'Active_anon'.
        self.fake_file_text = ('MemTotal:      2048000 kB\n'
                               'MemFree:        307200 kB\n'
                               'Buffers:        102400 kB\n'
                               'Cached:         204800 kB\n'
                               'Active(anon):   409600 kB')
        meminfo = utils.get_meminfo()
        self.assertEqual(meminfo.MemTotal, 2048000)
        self.assertEqual(meminfo.Active_anon, 409600)

    def test_get_num_allocated_file_handles(self):
        # The first field of the fake file is the expected result.
        self.fake_file_text = '123 0 456\n'
        self.assertEqual(utils.get_num_allocated_file_handles(), 123)

    @mock.patch('autotest_lib.client.common_lib.utils.system_output')
    def test_get_storage_statistics(self, system_output_mock):
        # The mocked command output is the canned _IOSTAT_OUTPUT text; its
        # 'ALL' row supplies the five expected values.
        system_output_mock.return_value = _IOSTAT_OUTPUT
        statistics = utils.get_storage_statistics()
        self.assertEqual({
            'read_kb': 665582.0,
            'written_kb_per_s': 292.4,
            'read_kb_per_s': 10.33,
            'transfers_per_s': 4.45,
            'written_kb': 188458.0,
        }, statistics)

    def test_base64_recursive_encode(self):
        obj = {
                'a': 10,
                'b': 'hello',
                'c': [100, 200, bytearray(b'\xf0\xf1\xf2\xf3\xf4')],
                'd': {
                        784: bytearray(b'@\x14\x01P'),
                        78.0: bytearray(b'\x10\x05\x0b\x10\xb2\x1b\x00')
                }
        }
        # The expectation differs per interpreter: under Python 2 the str
        # keys and values are encoded too, under Python 3 only the
        # bytearray values are.
        if utils.is_python2():
            expected_encoded_obj = {
                    'YQ==': 10,
                    'Yg==': 'aGVsbG8=',
                    'Yw==': [100, 200, '8PHy8/Q='],
                    'ZA==': {
                            784: 'QBQBUA==',
                            78.0: 'EAULELIbAA=='
                    }
            }
        else:
            expected_encoded_obj = {
                    'a': 10,
                    'b': 'hello',
                    'c': [100, 200, b'8PHy8/Q='],
                    'd': {
                            784: b'QBQBUA==',
                            78.0: b'EAULELIbAA=='
                    }
            }

        encoded_obj = utils.base64_recursive_encode(obj)
        self.assertEqual(expected_encoded_obj, encoded_obj)

    def test_base64_recursive_decode(self):
        # Inputs mirror the per-interpreter expectations of the encode test;
        # both must decode to the same plain object.
        if utils.is_python2():
            encoded_obj = {
                    'YQ==': 10,
                    'Yg==': 'aGVsbG8=',
                    'Yw==': [100, 200, '8PHy8/Q='],
                    'ZA==': {
                            784: 'QBQBUA==',
                            78.0: 'EAULELIbAA=='
                    }
            }
        else:
            encoded_obj = {
                    'a': 10,
                    'b': 'hello',
                    'c': [100, 200, b'8PHy8/Q='],
                    'd': {
                            784: b'QBQBUA==',
                            78.0: b'EAULELIbAA=='
                    }
            }

        expected_decoded_obj = {
                'a': 10,
                'b': 'hello',
                'c': [100, 200, b'\xf0\xf1\xf2\xf3\xf4'],
                'd': {
                        784: b'@\x14\x01P',
                        78.0: b'\x10\x05\x0b\x10\xb2\x1b\x00'
                }
        }

        decoded_obj = utils.base64_recursive_decode(encoded_obj)
        self.assertEqual(expected_decoded_obj, decoded_obj)

    def test_bytes_to_str_recursive(self):
        obj = {
                'a': 10,
                'b': 'hello',
                'c': b'b_hello',
                'd': [100, 200, bytearray(b'\xf0\xf1\xf2\xf3\xf4')],
                'e': {
                        784: bytearray(b'@\x14\x01P'),
                        78.0: bytearray(b'\x10\x05\x0b\x10\xb2\x1b\x00')
                }
        }

        if utils.is_python2():
            # Under Python 2 every input is expected back unchanged.
            self.assertEqual(b'foo', utils.bytes_to_str_recursive(b'foo'))
            self.assertEqual(b'\x80abc',
                             utils.bytes_to_str_recursive(b'\x80abc'))
            self.assertEqual('foo', utils.bytes_to_str_recursive('foo'))
            self.assertEqual('\x80abc',
                             utils.bytes_to_str_recursive('\x80abc'))
            self.assertEqual(obj, utils.bytes_to_str_recursive(obj))
        else:
            self.assertEqual('foo', utils.bytes_to_str_recursive(b'foo'))
            # NOTE(review): the assertion below was already disabled in the
            # original; confirm the intended result for undecodable bytes
            # before re-enabling it.
            # self.assertEqual('\ufffdabc', utils.bytes_to_str_recursive(b'\x80abc'))
            self.assertEqual('foo', utils.bytes_to_str_recursive('foo'))
            self.assertEqual('\x80abc',
                             utils.bytes_to_str_recursive('\x80abc'))
            expected_obj = {
                    'a': 10,
                    'b': 'hello',
                    'c': 'b_hello',
                    # u prefix: Python 2 interpreter friendly.
                    'd': [100, 200, u'\u0440\u0441\u0442\u0443\u0444'],
                    'e': {
                            784: '@\x14\x01P',
                            78.0: u'\x10\x05\x0b\x10\u00b2\x1b\x00'
                    }
            }
            self.assertEqual(expected_obj, utils.bytes_to_str_recursive(obj))
289