• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python2
2
3# pylint: disable=missing-docstring
4
5import StringIO
6import errno
7import itertools
8import logging
9import os
10import select
11import socket
12import subprocess
13import time
14import unittest
15import urllib2
16
17import common
18from autotest_lib.client.common_lib import autotemp
19from autotest_lib.client.common_lib import utils
20from autotest_lib.client.common_lib.test_utils import mock
21
22# mock 1.0.0 (in site-packages/chromite/third_party/mock.py)
23# which is an ancestor of Python's default library starting from Python 3.3.
24# See https://docs.python.org/3/library/unittest.mock.html
25import mock as pymock
26
27metrics = utils.metrics_mock
28
29
30class test_read_one_line(unittest.TestCase):
31    def setUp(self):
32        self.god = mock.mock_god(ut=self)
33        self.god.stub_function(utils, "open")
34
35
36    def tearDown(self):
37        self.god.unstub_all()
38
39
40    def test_ip_to_long(self):
41        self.assertEqual(utils.ip_to_long('0.0.0.0'), 0)
42        self.assertEqual(utils.ip_to_long('255.255.255.255'), 4294967295)
43        self.assertEqual(utils.ip_to_long('192.168.0.1'), 3232235521)
44        self.assertEqual(utils.ip_to_long('1.2.4.8'), 16909320)
45
46
47    def test_long_to_ip(self):
48        self.assertEqual(utils.long_to_ip(0), '0.0.0.0')
49        self.assertEqual(utils.long_to_ip(4294967295), '255.255.255.255')
50        self.assertEqual(utils.long_to_ip(3232235521), '192.168.0.1')
51        self.assertEqual(utils.long_to_ip(16909320), '1.2.4.8')
52
53
54    def test_create_subnet_mask(self):
55        self.assertEqual(utils.create_subnet_mask(0), 0)
56        self.assertEqual(utils.create_subnet_mask(32), 4294967295)
57        self.assertEqual(utils.create_subnet_mask(25), 4294967168)
58
59
60    def test_format_ip_with_mask(self):
61        self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 0),
62                         '0.0.0.0/0')
63        self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 32),
64                         '192.168.0.1/32')
65        self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 26),
66                         '192.168.0.0/26')
67        self.assertEqual(utils.format_ip_with_mask('192.168.0.255', 26),
68                         '192.168.0.192/26')
69
70
71    def create_test_file(self, contents):
72        test_file = StringIO.StringIO(contents)
73        utils.open.expect_call("filename", "r").and_return(test_file)
74
75
76    def test_reads_one_line_file(self):
77        self.create_test_file("abc\n")
78        self.assertEqual("abc", utils.read_one_line("filename"))
79        self.god.check_playback()
80
81
82    def test_strips_read_lines(self):
83        self.create_test_file("abc   \n")
84        self.assertEqual("abc   ", utils.read_one_line("filename"))
85        self.god.check_playback()
86
87
88    def test_drops_extra_lines(self):
89        self.create_test_file("line 1\nline 2\nline 3\n")
90        self.assertEqual("line 1", utils.read_one_line("filename"))
91        self.god.check_playback()
92
93
94    def test_works_on_empty_file(self):
95        self.create_test_file("")
96        self.assertEqual("", utils.read_one_line("filename"))
97        self.god.check_playback()
98
99
100    def test_works_on_file_with_no_newlines(self):
101        self.create_test_file("line but no newline")
102        self.assertEqual("line but no newline",
103                         utils.read_one_line("filename"))
104        self.god.check_playback()
105
106
107    def test_preserves_leading_whitespace(self):
108        self.create_test_file("   has leading whitespace")
109        self.assertEqual("   has leading whitespace",
110                         utils.read_one_line("filename"))
111
112
113class test_write_one_line(unittest.TestCase):
114    def setUp(self):
115        self.god = mock.mock_god(ut=self)
116        self.god.stub_function(utils, "open")
117
118
119    def tearDown(self):
120        self.god.unstub_all()
121
122
123    def get_write_one_line_output(self, content):
124        test_file = mock.SaveDataAfterCloseStringIO()
125        utils.open.expect_call("filename", "w").and_return(test_file)
126        utils.write_one_line("filename", content)
127        self.god.check_playback()
128        return test_file.final_data
129
130
131    def test_writes_one_line_file(self):
132        self.assertEqual("abc\n", self.get_write_one_line_output("abc"))
133
134
135    def test_preserves_existing_newline(self):
136        self.assertEqual("abc\n", self.get_write_one_line_output("abc\n"))
137
138
139    def test_preserves_leading_whitespace(self):
140        self.assertEqual("   abc\n", self.get_write_one_line_output("   abc"))
141
142
143    def test_preserves_trailing_whitespace(self):
144        self.assertEqual("abc   \n", self.get_write_one_line_output("abc   "))
145
146
147    def test_handles_empty_input(self):
148        self.assertEqual("\n", self.get_write_one_line_output(""))
149
150
151class test_open_write_close(unittest.TestCase):
152    def setUp(self):
153        self.god = mock.mock_god(ut=self)
154        self.god.stub_function(utils, "open")
155
156
157    def tearDown(self):
158        self.god.unstub_all()
159
160
161    def test_simple_functionality(self):
162        data = "\n\nwhee\n"
163        test_file = mock.SaveDataAfterCloseStringIO()
164        utils.open.expect_call("filename", "w").and_return(test_file)
165        utils.open_write_close("filename", data)
166        self.god.check_playback()
167        self.assertEqual(data, test_file.final_data)
168
169
170class test_read_keyval(unittest.TestCase):
171    def setUp(self):
172        self.god = mock.mock_god(ut=self)
173        self.god.stub_function(utils, "open")
174        self.god.stub_function(os.path, "isdir")
175        self.god.stub_function(os.path, "exists")
176
177
178    def tearDown(self):
179        self.god.unstub_all()
180
181
182    def create_test_file(self, filename, contents):
183        test_file = StringIO.StringIO(contents)
184        os.path.exists.expect_call(filename).and_return(True)
185        utils.open.expect_call(filename).and_return(test_file)
186
187
188    def read_keyval(self, contents):
189        os.path.isdir.expect_call("file").and_return(False)
190        self.create_test_file("file", contents)
191        keyval = utils.read_keyval("file")
192        self.god.check_playback()
193        return keyval
194
195
196    def test_returns_empty_when_file_doesnt_exist(self):
197        os.path.isdir.expect_call("file").and_return(False)
198        os.path.exists.expect_call("file").and_return(False)
199        self.assertEqual({}, utils.read_keyval("file"))
200        self.god.check_playback()
201
202
203    def test_accesses_files_directly(self):
204        os.path.isdir.expect_call("file").and_return(False)
205        self.create_test_file("file", "")
206        utils.read_keyval("file")
207        self.god.check_playback()
208
209
210    def test_accesses_directories_through_keyval_file(self):
211        os.path.isdir.expect_call("dir").and_return(True)
212        self.create_test_file("dir/keyval", "")
213        utils.read_keyval("dir")
214        self.god.check_playback()
215
216
217    def test_values_are_rstripped(self):
218        keyval = self.read_keyval("a=b   \n")
219        self.assertEquals(keyval, {"a": "b"})
220
221
222    def test_comments_are_ignored(self):
223        keyval = self.read_keyval("a=b # a comment\n")
224        self.assertEquals(keyval, {"a": "b"})
225
226
227    def test_integers_become_ints(self):
228        keyval = self.read_keyval("a=1\n")
229        self.assertEquals(keyval, {"a": 1})
230        self.assertEquals(int, type(keyval["a"]))
231
232
233    def test_float_values_become_floats(self):
234        keyval = self.read_keyval("a=1.5\n")
235        self.assertEquals(keyval, {"a": 1.5})
236        self.assertEquals(float, type(keyval["a"]))
237
238
239    def test_multiple_lines(self):
240        keyval = self.read_keyval("a=one\nb=two\n")
241        self.assertEquals(keyval, {"a": "one", "b": "two"})
242
243
244    def test_the_last_duplicate_line_is_used(self):
245        keyval = self.read_keyval("a=one\nb=two\na=three\n")
246        self.assertEquals(keyval, {"a": "three", "b": "two"})
247
248
249    def test_extra_equals_are_included_in_values(self):
250        keyval = self.read_keyval("a=b=c\n")
251        self.assertEquals(keyval, {"a": "b=c"})
252
253
254    def test_non_alphanumeric_keynames_are_rejected(self):
255        self.assertRaises(ValueError, self.read_keyval, "a$=one\n")
256
257
258    def test_underscores_are_allowed_in_key_names(self):
259        keyval = self.read_keyval("a_b=value\n")
260        self.assertEquals(keyval, {"a_b": "value"})
261
262
263    def test_dashes_are_allowed_in_key_names(self):
264        keyval = self.read_keyval("a-b=value\n")
265        self.assertEquals(keyval, {"a-b": "value"})
266
267    def test_empty_value_is_allowed(self):
268        keyval = self.read_keyval("a=\n")
269        self.assertEquals(keyval, {"a": ""})
270
271
272class test_write_keyval(unittest.TestCase):
273    def setUp(self):
274        self.god = mock.mock_god(ut=self)
275        self.god.stub_function(utils, "open")
276        self.god.stub_function(os.path, "isdir")
277
278
279    def tearDown(self):
280        self.god.unstub_all()
281
282
283    def assertHasLines(self, value, lines):
284        vlines = value.splitlines()
285        vlines.sort()
286        self.assertEquals(vlines, sorted(lines))
287
288
289    def write_keyval(self, filename, dictionary, expected_filename=None,
290                     type_tag=None):
291        if expected_filename is None:
292            expected_filename = filename
293        test_file = StringIO.StringIO()
294        self.god.stub_function(test_file, "close")
295        utils.open.expect_call(expected_filename, "a").and_return(test_file)
296        test_file.close.expect_call()
297        if type_tag is None:
298            utils.write_keyval(filename, dictionary)
299        else:
300            utils.write_keyval(filename, dictionary, type_tag)
301        return test_file.getvalue()
302
303
304    def write_keyval_file(self, dictionary, type_tag=None):
305        os.path.isdir.expect_call("file").and_return(False)
306        return self.write_keyval("file", dictionary, type_tag=type_tag)
307
308
309    def test_accesses_files_directly(self):
310        os.path.isdir.expect_call("file").and_return(False)
311        result = self.write_keyval("file", {"a": "1"})
312        self.assertEquals(result, "a=1\n")
313
314
315    def test_accesses_directories_through_keyval_file(self):
316        os.path.isdir.expect_call("dir").and_return(True)
317        result = self.write_keyval("dir", {"b": "2"}, "dir/keyval")
318        self.assertEquals(result, "b=2\n")
319
320
321    def test_numbers_are_stringified(self):
322        result = self.write_keyval_file({"c": 3})
323        self.assertEquals(result, "c=3\n")
324
325
326    def test_type_tags_are_excluded_by_default(self):
327        result = self.write_keyval_file({"d": "a string"})
328        self.assertEquals(result, "d=a string\n")
329        self.assertRaises(ValueError, self.write_keyval_file,
330                          {"d{perf}": "a string"})
331
332
333    def test_perf_tags_are_allowed(self):
334        result = self.write_keyval_file({"a{perf}": 1, "b{perf}": 2},
335                                        type_tag="perf")
336        self.assertHasLines(result, ["a{perf}=1", "b{perf}=2"])
337        self.assertRaises(ValueError, self.write_keyval_file,
338                          {"a": 1, "b": 2}, type_tag="perf")
339
340
341    def test_non_alphanumeric_keynames_are_rejected(self):
342        self.assertRaises(ValueError, self.write_keyval_file, {"x$": 0})
343
344
345    def test_underscores_are_allowed_in_key_names(self):
346        result = self.write_keyval_file({"a_b": "value"})
347        self.assertEquals(result, "a_b=value\n")
348
349
350    def test_dashes_are_allowed_in_key_names(self):
351        result = self.write_keyval_file({"a-b": "value"})
352        self.assertEquals(result, "a-b=value\n")
353
354
355class test_is_url(unittest.TestCase):
356    def test_accepts_http(self):
357        self.assertTrue(utils.is_url("http://example.com"))
358
359
360    def test_accepts_ftp(self):
361        self.assertTrue(utils.is_url("ftp://ftp.example.com"))
362
363
364    def test_rejects_local_path(self):
365        self.assertFalse(utils.is_url("/home/username/file"))
366
367
368    def test_rejects_local_filename(self):
369        self.assertFalse(utils.is_url("filename"))
370
371
372    def test_rejects_relative_local_path(self):
373        self.assertFalse(utils.is_url("somedir/somesubdir/file"))
374
375
376    def test_rejects_local_path_containing_url(self):
377        self.assertFalse(utils.is_url("somedir/http://path/file"))
378
379
380class test_urlopen(unittest.TestCase):
381    def setUp(self):
382        self.god = mock.mock_god(ut=self)
383
384
385    def tearDown(self):
386        self.god.unstub_all()
387
388
389    def stub_urlopen_with_timeout_comparison(self, test_func, expected_return,
390                                             *expected_args):
391        expected_args += (None,) * (2 - len(expected_args))
392        def urlopen(url, data=None):
393            self.assertEquals(expected_args, (url,data))
394            test_func(socket.getdefaulttimeout())
395            return expected_return
396        self.god.stub_with(urllib2, "urlopen", urlopen)
397
398
399    def stub_urlopen_with_timeout_check(self, expected_timeout,
400                                        expected_return, *expected_args):
401        def test_func(timeout):
402            self.assertEquals(timeout, expected_timeout)
403        self.stub_urlopen_with_timeout_comparison(test_func, expected_return,
404                                                  *expected_args)
405
406
407    def test_timeout_set_during_call(self):
408        self.stub_urlopen_with_timeout_check(30, "retval", "url")
409        retval = utils.urlopen("url", timeout=30)
410        self.assertEquals(retval, "retval")
411
412
413    def test_timeout_reset_after_call(self):
414        old_timeout = socket.getdefaulttimeout()
415        self.stub_urlopen_with_timeout_check(30, None, "url")
416        try:
417            socket.setdefaulttimeout(1234)
418            utils.urlopen("url", timeout=30)
419            self.assertEquals(1234, socket.getdefaulttimeout())
420        finally:
421            socket.setdefaulttimeout(old_timeout)
422
423
424    def test_timeout_set_by_default(self):
425        def test_func(timeout):
426            self.assertTrue(timeout is not None)
427        self.stub_urlopen_with_timeout_comparison(test_func, None, "url")
428        utils.urlopen("url")
429
430
431    def test_args_are_untouched(self):
432        self.stub_urlopen_with_timeout_check(30, None, "http://url",
433                                             "POST data")
434        utils.urlopen("http://url", timeout=30, data="POST data")
435
436
437class test_urlretrieve(unittest.TestCase):
438    def setUp(self):
439        self.god = mock.mock_god(ut=self)
440
441
442    def tearDown(self):
443        self.god.unstub_all()
444
445
446    def test_urlopen_passed_arguments(self):
447        self.god.stub_function(utils, "urlopen")
448        self.god.stub_function(utils.shutil, "copyfileobj")
449        self.god.stub_function(utils, "open")
450
451        url = "url"
452        dest = "somefile"
453        data = object()
454        timeout = 10
455
456        src_file = self.god.create_mock_class(file, "file")
457        dest_file = self.god.create_mock_class(file, "file")
458
459        (utils.urlopen.expect_call(url, data=data, timeout=timeout)
460                .and_return(src_file))
461        utils.open.expect_call(dest, "wb").and_return(dest_file)
462        utils.shutil.copyfileobj.expect_call(src_file, dest_file)
463        dest_file.close.expect_call()
464        src_file.close.expect_call()
465
466        utils.urlretrieve(url, dest, data=data, timeout=timeout)
467        self.god.check_playback()
468
469
470class test_merge_trees(unittest.TestCase):
471    # a some path-handling helper functions
472    def src(self, *path_segments):
473        return os.path.join(self.src_tree.name, *path_segments)
474
475
476    def dest(self, *path_segments):
477        return os.path.join(self.dest_tree.name, *path_segments)
478
479
480    def paths(self, *path_segments):
481        return self.src(*path_segments), self.dest(*path_segments)
482
483
484    def assertFileEqual(self, *path_segments):
485        src, dest = self.paths(*path_segments)
486        self.assertEqual(True, os.path.isfile(src))
487        self.assertEqual(True, os.path.isfile(dest))
488        self.assertEqual(os.path.getsize(src), os.path.getsize(dest))
489        self.assertEqual(open(src).read(), open(dest).read())
490
491
492    def assertFileContents(self, contents, *path_segments):
493        dest = self.dest(*path_segments)
494        self.assertEqual(True, os.path.isfile(dest))
495        self.assertEqual(os.path.getsize(dest), len(contents))
496        self.assertEqual(contents, open(dest).read())
497
498
499    def setUp(self):
500        self.src_tree = autotemp.tempdir(unique_id='utilsrc')
501        self.dest_tree = autotemp.tempdir(unique_id='utilsdest')
502
503        # empty subdirs
504        os.mkdir(self.src("empty"))
505        os.mkdir(self.dest("empty"))
506
507
508    def tearDown(self):
509        self.src_tree.clean()
510        self.dest_tree.clean()
511
512
513    def test_both_dont_exist(self):
514        utils.merge_trees(*self.paths("empty"))
515
516
517    def test_file_only_at_src(self):
518        print >> open(self.src("src_only"), "w"), "line 1"
519        utils.merge_trees(*self.paths("src_only"))
520        self.assertFileEqual("src_only")
521
522
523    def test_file_only_at_dest(self):
524        print >> open(self.dest("dest_only"), "w"), "line 1"
525        utils.merge_trees(*self.paths("dest_only"))
526        self.assertEqual(False, os.path.exists(self.src("dest_only")))
527        self.assertFileContents("line 1\n", "dest_only")
528
529
530    def test_file_at_both(self):
531        print >> open(self.dest("in_both"), "w"), "line 1"
532        print >> open(self.src("in_both"), "w"), "line 2"
533        utils.merge_trees(*self.paths("in_both"))
534        self.assertFileContents("line 1\nline 2\n", "in_both")
535
536
537    def test_directory_with_files_in_both(self):
538        print >> open(self.dest("in_both"), "w"), "line 1"
539        print >> open(self.src("in_both"), "w"), "line 3"
540        utils.merge_trees(*self.paths())
541        self.assertFileContents("line 1\nline 3\n", "in_both")
542
543
544    def test_directory_with_mix_of_files(self):
545        print >> open(self.dest("in_dest"), "w"), "dest line"
546        print >> open(self.src("in_src"), "w"), "src line"
547        utils.merge_trees(*self.paths())
548        self.assertFileContents("dest line\n", "in_dest")
549        self.assertFileContents("src line\n", "in_src")
550
551
552    def test_directory_with_subdirectories(self):
553        os.mkdir(self.src("src_subdir"))
554        print >> open(self.src("src_subdir", "subfile"), "w"), "subdir line"
555        os.mkdir(self.src("both_subdir"))
556        os.mkdir(self.dest("both_subdir"))
557        print >> open(self.src("both_subdir", "subfile"), "w"), "src line"
558        print >> open(self.dest("both_subdir", "subfile"), "w"), "dest line"
559        utils.merge_trees(*self.paths())
560        self.assertFileContents("subdir line\n", "src_subdir", "subfile")
561        self.assertFileContents("dest line\nsrc line\n", "both_subdir",
562                                "subfile")
563
564
565class test_get_relative_path(unittest.TestCase):
566    def test_not_absolute(self):
567        self.assertRaises(AssertionError, utils.get_relative_path, "a", "b")
568
569    def test_same_dir(self):
570        self.assertEqual(utils.get_relative_path("/a/b/c", "/a/b"), "c")
571
572    def test_forward_dir(self):
573        self.assertEqual(utils.get_relative_path("/a/b/c/d", "/a/b"), "c/d")
574
575    def test_previous_dir(self):
576        self.assertEqual(utils.get_relative_path("/a/b", "/a/b/c/d"), "../..")
577
578    def test_parallel_dir(self):
579        self.assertEqual(utils.get_relative_path("/a/c/d", "/a/b/c/d"),
580                         "../../../c/d")
581
582
583class test_sh_escape(unittest.TestCase):
584    def _test_in_shell(self, text):
585        escaped_text = utils.sh_escape(text)
586        proc = subprocess.Popen('echo "%s"' % escaped_text, shell=True,
587                                stdin=open(os.devnull, 'r'),
588                                stdout=subprocess.PIPE,
589                                stderr=open(os.devnull, 'w'))
590        stdout, _ = proc.communicate()
591        self.assertEqual(proc.returncode, 0)
592        self.assertEqual(stdout[:-1], text)
593
594
595    def test_normal_string(self):
596        self._test_in_shell('abcd')
597
598
599    def test_spaced_string(self):
600        self._test_in_shell('abcd efgh')
601
602
603    def test_dollar(self):
604        self._test_in_shell('$')
605
606
607    def test_single_quote(self):
608        self._test_in_shell('\'')
609
610
611    def test_single_quoted_string(self):
612        self._test_in_shell('\'efgh\'')
613
614
615    def test_string_with_single_quote(self):
616        self._test_in_shell("a'b")
617
618
619    def test_string_with_escaped_single_quote(self):
620        self._test_in_shell(r"a\'b")
621
622
623    def test_double_quote(self):
624        self._test_in_shell('"')
625
626
627    def test_double_quoted_string(self):
628        self._test_in_shell('"abcd"')
629
630
631    def test_backtick(self):
632        self._test_in_shell('`')
633
634
635    def test_backticked_string(self):
636        self._test_in_shell('`jklm`')
637
638
639    def test_backslash(self):
640        self._test_in_shell('\\')
641
642
643    def test_backslashed_special_characters(self):
644        self._test_in_shell('\\$')
645        self._test_in_shell('\\"')
646        self._test_in_shell('\\\'')
647        self._test_in_shell('\\`')
648
649
650    def test_backslash_codes(self):
651        self._test_in_shell('\\n')
652        self._test_in_shell('\\r')
653        self._test_in_shell('\\t')
654        self._test_in_shell('\\v')
655        self._test_in_shell('\\b')
656        self._test_in_shell('\\a')
657        self._test_in_shell('\\000')
658
659    def test_real_newline(self):
660        self._test_in_shell('\n')
661        self._test_in_shell('\\\n')
662
663
664class test_sh_quote_word(test_sh_escape):
665    """Run tests on sh_quote_word.
666
667    Inherit from test_sh_escape to get the same tests to run on both.
668    """
669
670    def _test_in_shell(self, text):
671        quoted_word = utils.sh_quote_word(text)
672        echoed_value = subprocess.check_output('echo %s' % quoted_word,
673                                               shell=True)
674        self.assertEqual(echoed_value, text + '\n')
675
676
677class test_nested_sh_quote_word(test_sh_quote_word):
678    """Run nested tests on sh_quote_word.
679
680    Inherit from test_sh_quote_word to get the same tests to run on both.
681    """
682
683    def _test_in_shell(self, text):
684        command = 'echo ' + utils.sh_quote_word(text)
685        nested_command = 'echo ' + utils.sh_quote_word(command)
686        produced_command = subprocess.check_output(nested_command, shell=True)
687        echoed_value = subprocess.check_output(produced_command, shell=True)
688        self.assertEqual(echoed_value, text + '\n')
689
690
691class test_run(unittest.TestCase):
692    """
693    Test the utils.run() function.
694
695    Note: This test runs simple external commands to test the utils.run()
696    API without assuming implementation details.
697    """
698
699    # Log levels in ascending severity.
700    LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
701                  logging.CRITICAL]
702
703
704    def setUp(self):
705        self.god = mock.mock_god(ut=self)
706        self.god.stub_function(utils.logging, 'warning')
707        self.god.stub_function(utils.logging, 'debug')
708
709        # Log level -> StringIO.StringIO.
710        self.logs = {}
711        for level in self.LOG_LEVELS:
712            self.logs[level] = StringIO.StringIO()
713
714        # Override logging_manager.LoggingFile to return buffers.
715        def logging_file(level=None, prefix=None):
716            return self.logs[level]
717        self.god.stub_with(utils.logging_manager, 'LoggingFile', logging_file)
718
719    def tearDown(self):
720        self.god.unstub_all()
721
722
723    def __check_result(self, result, command, exit_status=0, stdout='',
724                       stderr=''):
725        self.assertEquals(result.command, command)
726        self.assertEquals(result.exit_status, exit_status)
727        self.assertEquals(result.stdout, stdout)
728        self.assertEquals(result.stderr, stderr)
729
730
731    def __get_logs(self):
732        """Returns contents of log buffers at all levels.
733
734            @return: 5-element list of strings corresponding to logged messages
735                at the levels in self.LOG_LEVELS.
736        """
737        return [self.logs[v].getvalue() for v in self.LOG_LEVELS]
738
739
740    def test_default_simple(self):
741        cmd = 'echo "hello world"'
742        # expect some king of logging.debug() call but don't care about args
743        utils.logging.debug.expect_any_call()
744        self.__check_result(utils.run(cmd), cmd, stdout='hello world\n')
745
746
747    def test_default_failure(self):
748        cmd = 'exit 11'
749        try:
750            utils.run(cmd, verbose=False)
751        except utils.error.CmdError, err:
752            self.__check_result(err.result_obj, cmd, exit_status=11)
753
754
755    def test_ignore_status(self):
756        cmd = 'echo error >&2 && exit 11'
757        self.__check_result(utils.run(cmd, ignore_status=True, verbose=False),
758                            cmd, exit_status=11, stderr='error\n')
759
760
761    def test_timeout(self):
762        # we expect a logging.warning() message, don't care about the contents
763        utils.logging.warning.expect_any_call()
764        try:
765            utils.run('echo -n output && sleep 10', timeout=1, verbose=False)
766        except utils.error.CmdError, err:
767            self.assertEquals(err.result_obj.stdout, 'output')
768
769
770    def test_stdout_stderr_tee(self):
771        cmd = 'echo output && echo error >&2'
772        stdout_tee = StringIO.StringIO()
773        stderr_tee = StringIO.StringIO()
774
775        self.__check_result(utils.run(
776                cmd, stdout_tee=stdout_tee, stderr_tee=stderr_tee,
777                verbose=False), cmd, stdout='output\n', stderr='error\n')
778        self.assertEqual(stdout_tee.getvalue(), 'output\n')
779        self.assertEqual(stderr_tee.getvalue(), 'error\n')
780
781
782    def test_stdin_string(self):
783        cmd = 'cat'
784        self.__check_result(utils.run(cmd, verbose=False, stdin='hi!\n'),
785                            cmd, stdout='hi!\n')
786
787
788    def test_stdout_tee_to_logs_info(self):
789        """Test logging stdout at the info level."""
790        utils.run('echo output', stdout_tee=utils.TEE_TO_LOGS,
791                  stdout_level=logging.INFO, verbose=False)
792        self.assertEqual(self.__get_logs(), ['', 'output\n', '', '', ''])
793
794
795    def test_stdout_tee_to_logs_warning(self):
796        """Test logging stdout at the warning level."""
797        utils.run('echo output', stdout_tee=utils.TEE_TO_LOGS,
798                  stdout_level=logging.WARNING, verbose=False)
799        self.assertEqual(self.__get_logs(), ['', '', 'output\n', '', ''])
800
801
802    def test_stdout_and_stderr_tee_to_logs(self):
803        """Test simultaneous stdout and stderr log levels."""
804        utils.run('echo output && echo error >&2', stdout_tee=utils.TEE_TO_LOGS,
805                  stderr_tee=utils.TEE_TO_LOGS, stdout_level=logging.INFO,
806                  stderr_level=logging.ERROR, verbose=False)
807        self.assertEqual(self.__get_logs(), ['', 'output\n', '', 'error\n', ''])
808
809
810    def test_default_expected_stderr_log_level(self):
811        """Test default expected stderr log level.
812
813        stderr should be logged at the same level as stdout when
814        stderr_is_expected is true and stderr_level isn't passed.
815        """
816        utils.run('echo output && echo error >&2', stdout_tee=utils.TEE_TO_LOGS,
817                  stderr_tee=utils.TEE_TO_LOGS, stdout_level=logging.INFO,
818                  stderr_is_expected=True, verbose=False)
819        self.assertEqual(self.__get_logs(), ['', 'output\nerror\n', '', '', ''])
820
821
822    def test_safe_args(self):
823        # NOTE: The string in expected_quoted_cmd depends on the internal
824        # implementation of shell quoting which is used by utils.run(),
825        # in this case, sh_quote_word().
826        expected_quoted_cmd = "echo 'hello \"world' again"
827        self.__check_result(utils.run(
828                'echo', verbose=False, args=('hello "world', 'again')),
829                expected_quoted_cmd, stdout='hello "world again\n')
830
831
832    def test_safe_args_given_string(self):
833        self.assertRaises(TypeError, utils.run, 'echo', args='hello')
834
835
836    def test_wait_interrupt(self):
837        """Test that we actually select twice if the first one returns EINTR."""
838        utils.logging.debug.expect_any_call()
839
840        bg_job = utils.BgJob('echo "hello world"')
841        bg_job.result.exit_status = 0
842        self.god.stub_function(utils.select, 'select')
843
844        utils.select.select.expect_any_call().and_raises(
845                select.error(errno.EINTR, 'Select interrupted'))
846        utils.logging.warning.expect_any_call()
847
848        utils.select.select.expect_any_call().and_return(
849                ([bg_job.sp.stdout, bg_job.sp.stderr], [], None))
850        utils.logging.warning.expect_any_call()
851
852        self.assertFalse(
853                utils._wait_for_commands([bg_job], time.time(), None))
854
855
856class test_compare_versions(unittest.TestCase):
857    def test_zerofill(self):
858        self.assertEqual(utils.compare_versions('1.7', '1.10'), -1)
859        self.assertEqual(utils.compare_versions('1.222', '1.3'), 1)
860        self.assertEqual(utils.compare_versions('1.03', '1.3'), 0)
861
862
863    def test_unequal_len(self):
864        self.assertEqual(utils.compare_versions('1.3', '1.3.4'), -1)
865        self.assertEqual(utils.compare_versions('1.3.1', '1.3'), 1)
866
867
868    def test_dash_delimited(self):
869        self.assertEqual(utils.compare_versions('1-2-3', '1-5-1'), -1)
870        self.assertEqual(utils.compare_versions('1-2-1', '1-1-1'), 1)
871        self.assertEqual(utils.compare_versions('1-2-4', '1-2-4'), 0)
872
873
874    def test_alphabets(self):
875        self.assertEqual(utils.compare_versions('m.l.b', 'n.b.a'), -1)
876        self.assertEqual(utils.compare_versions('n.b.a', 'm.l.b'), 1)
877        self.assertEqual(utils.compare_versions('abc.e', 'abc.e'), 0)
878
879
880    def test_mix_symbols(self):
881        self.assertEqual(utils.compare_versions('k-320.1', 'k-320.3'), -1)
882        self.assertEqual(utils.compare_versions('k-231.5', 'k-231.1'), 1)
883        self.assertEqual(utils.compare_versions('k-231.1', 'k-231.1'), 0)
884
885        self.assertEqual(utils.compare_versions('k.320-1', 'k.320-3'), -1)
886        self.assertEqual(utils.compare_versions('k.231-5', 'k.231-1'), 1)
887        self.assertEqual(utils.compare_versions('k.231-1', 'k.231-1'), 0)
888
889
890class test_args_to_dict(unittest.TestCase):
891    def test_no_args(self):
892        result = utils.args_to_dict([])
893        self.assertEqual({}, result)
894
895
896    def test_matches(self):
897        result = utils.args_to_dict(['aBc:DeF', 'SyS=DEf', 'XY_Z:',
898                                     'F__o0O=', 'B8r:=:=', '_bAZ_=:=:'])
899        self.assertEqual(result, {'abc':'DeF', 'sys':'DEf', 'xy_z':'',
900                                  'f__o0o':'', 'b8r':'=:=', '_baz_':':=:'})
901
902
903    def test_unmatches(self):
904        # Temporarily shut warning messages from args_to_dict() when an argument
905        # doesn't match its pattern.
906        logger = logging.getLogger()
907        saved_level = logger.level
908        logger.setLevel(logging.ERROR)
909
910        try:
911            result = utils.args_to_dict(['ab-c:DeF', '--SyS=DEf', 'a*=b', 'a*b',
912                                         ':VAL', '=VVV', 'WORD'])
913            self.assertEqual({}, result)
914        finally:
915            # Restore level.
916            logger.setLevel(saved_level)
917
918
919class test_get_random_port(unittest.TestCase):
920    def do_bind(self, port, socket_type, socket_proto):
921        s = socket.socket(socket.AF_INET, socket_type, socket_proto)
922        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
923        s.bind(('', port))
924        return s
925
926
927    def test_get_port(self):
928        for _ in xrange(100):
929            p = utils.get_unused_port()
930            s = self.do_bind(p, socket.SOCK_STREAM, socket.IPPROTO_TCP)
931            self.assert_(s.getsockname())
932            s = self.do_bind(p, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
933            self.assert_(s.getsockname())
934
935
936def test_function(arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
937    """Test global function.
938    """
939
940
941class TestClass(object):
942    """Test class.
943    """
944
945    def test_instance_function(self, arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
946        """Test instance function.
947        """
948
949
950    @classmethod
951    def test_class_function(cls, arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
952        """Test class function.
953        """
954
955
956    @staticmethod
957    def test_static_function(arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
958        """Test static function.
959        """
960
961
962class GetFunctionArgUnittest(unittest.TestCase):
963    """Tests for method get_function_arg_value."""
964
965    def run_test(self, func, insert_arg):
966        """Run test.
967
968        @param func: Function being called with given arguments.
969        @param insert_arg: Set to True to insert an object in the argument list.
970                           This is to mock instance/class object.
971        """
972        if insert_arg:
973            args = (None, 1, 2, 3)
974        else:
975            args = (1, 2, 3)
976        for i in range(1, 7):
977            self.assertEquals(utils.get_function_arg_value(
978                    func, 'arg%d'%i, args, {}), i)
979
980        self.assertEquals(utils.get_function_arg_value(
981                func, 'arg7', args, {'arg7': 7}), 7)
982        self.assertRaises(
983                KeyError, utils.get_function_arg_value,
984                func, 'arg3', args[:-1], {})
985
986
987    def test_global_function(self):
988        """Test global function.
989        """
990        self.run_test(test_function, False)
991
992
993    def test_instance_function(self):
994        """Test instance function.
995        """
996        self.run_test(TestClass().test_instance_function, True)
997
998
999    def test_class_function(self):
1000        """Test class function.
1001        """
1002        self.run_test(TestClass.test_class_function, True)
1003
1004
1005    def test_static_function(self):
1006        """Test static function.
1007        """
1008        self.run_test(TestClass.test_static_function, False)
1009
1010
1011class IsInSameSubnetUnittest(unittest.TestCase):
1012    """Test is_in_same_subnet function."""
1013
1014    def test_is_in_same_subnet(self):
1015        """Test is_in_same_subnet function."""
1016        self.assertTrue(utils.is_in_same_subnet('192.168.0.0', '192.168.1.2',
1017                                                23))
1018        self.assertFalse(utils.is_in_same_subnet('192.168.0.0', '192.168.1.2',
1019                                                24))
1020        self.assertTrue(utils.is_in_same_subnet('192.168.0.0', '192.168.0.255',
1021                                                24))
1022        self.assertFalse(utils.is_in_same_subnet('191.168.0.0', '192.168.0.0',
1023                                                24))
1024
1025
1026class GetWirelessSsidUnittest(unittest.TestCase):
1027    """Test get_wireless_ssid function."""
1028
1029    DEFAULT_SSID = 'default'
1030    SSID_1 = 'ssid_1'
1031    SSID_2 = 'ssid_2'
1032    SSID_3 = 'ssid_3'
1033
1034    def test_get_wireless_ssid(self):
1035        """Test is_in_same_subnet function."""
1036        god = mock.mock_god()
1037        god.stub_function_to_return(utils.CONFIG, 'get_config_value',
1038                                    self.DEFAULT_SSID)
1039        god.stub_function_to_return(utils.CONFIG, 'get_config_value_regex',
1040                                    {'wireless_ssid_1.2.3.4/24': self.SSID_1,
1041                                     'wireless_ssid_4.3.2.1/16': self.SSID_2,
1042                                     'wireless_ssid_4.3.2.111/32': self.SSID_3})
1043        self.assertEqual(self.SSID_1, utils.get_wireless_ssid('1.2.3.100'))
1044        self.assertEqual(self.SSID_2, utils.get_wireless_ssid('4.3.2.100'))
1045        self.assertEqual(self.SSID_3, utils.get_wireless_ssid('4.3.2.111'))
1046        self.assertEqual(self.DEFAULT_SSID,
1047                         utils.get_wireless_ssid('100.0.0.100'))
1048
1049
1050class LaunchControlBuildParseUnittest(unittest.TestCase):
1051    """Test various parsing functions related to Launch Control builds and
1052    devices.
1053    """
1054
1055    def test_parse_launch_control_target(self):
1056        """Test parse_launch_control_target function."""
1057        target_tests = {
1058                ('shamu', 'userdebug'): 'shamu-userdebug',
1059                ('shamu', 'eng'): 'shamu-eng',
1060                ('shamu-board', 'eng'): 'shamu-board-eng',
1061                (None, None): 'bad_target',
1062                (None, None): 'target'}
1063        for result, target in target_tests.items():
1064            self.assertEqual(result, utils.parse_launch_control_target(target))
1065
1066
1067class GetOffloaderUriTest(unittest.TestCase):
1068    """Test get_offload_gsuri function."""
1069    _IMAGE_STORAGE_SERVER = 'gs://test_image_bucket'
1070
1071    def setUp(self):
1072        self.god = mock.mock_god()
1073
1074    def tearDown(self):
1075        self.god.unstub_all()
1076
1077    def test_get_default_lab_offload_gsuri(self):
1078        """Test default lab offload gsuri ."""
1079        self.god.mock_up(utils.CONFIG, 'CONFIG')
1080        self.god.stub_function_to_return(utils, 'is_moblab', False)
1081        self.assertEqual(utils.DEFAULT_OFFLOAD_GSURI,
1082                utils.get_offload_gsuri())
1083
1084        self.god.check_playback()
1085
1086    def test_get_default_moblab_offload_gsuri(self):
1087        self.god.mock_up(utils.CONFIG, 'CONFIG')
1088        self.god.stub_function_to_return(utils, 'is_moblab', True)
1089        utils.CONFIG.get_config_value.expect_call(
1090                'CROS', 'image_storage_server').and_return(
1091                        self._IMAGE_STORAGE_SERVER)
1092        self.god.stub_function_to_return(utils,
1093                'get_moblab_serial_number', 'test_serial_number')
1094        self.god.stub_function_to_return(utils, 'get_moblab_id', 'test_id')
1095        expected_gsuri = '%sresults/%s/%s/' % (
1096                self._IMAGE_STORAGE_SERVER, 'test_serial_number', 'test_id')
1097        cached_gsuri = utils.DEFAULT_OFFLOAD_GSURI
1098        utils.DEFAULT_OFFLOAD_GSURI = None
1099        gsuri = utils.get_offload_gsuri()
1100        utils.DEFAULT_OFFLOAD_GSURI = cached_gsuri
1101        self.assertEqual(expected_gsuri, gsuri)
1102
1103        self.god.check_playback()
1104
1105    def test_get_moblab_offload_gsuri(self):
1106        """Test default lab offload gsuri ."""
1107        self.god.mock_up(utils.CONFIG, 'CONFIG')
1108        self.god.stub_function_to_return(utils, 'is_moblab', True)
1109        self.god.stub_function_to_return(utils,
1110                'get_moblab_serial_number', 'test_serial_number')
1111        self.god.stub_function_to_return(utils, 'get_moblab_id', 'test_id')
1112        gsuri = '%s%s/%s/' % (
1113                utils.DEFAULT_OFFLOAD_GSURI, 'test_serial_number', 'test_id')
1114        self.assertEqual(gsuri, utils.get_offload_gsuri())
1115
1116        self.god.check_playback()
1117
1118
1119
1120class  MockMetricsTest(unittest.TestCase):
1121    """Test metrics mock class can handle various metrics calls."""
1122
1123    def test_Counter(self):
1124        """Test the mock class can create an instance and call any method.
1125        """
1126        c = metrics.Counter('counter')
1127        c.increment(fields={'key': 1})
1128
1129
1130    def test_Context(self):
1131        """Test the mock class can handle context class.
1132        """
1133        test_value = None
1134        with metrics.SecondsTimer('context') as t:
1135            test_value = 'called_in_context'
1136            t['random_key'] = 'pass'
1137        self.assertEqual('called_in_context', test_value)
1138
1139
1140    def test_decorator(self):
1141        """Test the mock class can handle decorator.
1142        """
1143        class TestClass(object):
1144
1145            def __init__(self):
1146                self.value = None
1147
1148        test_value = TestClass()
1149        test_value.value = None
1150        @metrics.SecondsTimerDecorator('decorator')
1151        def test(arg):
1152            arg.value = 'called_in_decorator'
1153
1154        test(test_value)
1155        self.assertEqual('called_in_decorator', test_value.value)
1156
1157
1158    def test_setitem(self):
1159        """Test the mock class can handle set item call.
1160        """
1161        timer = metrics.SecondsTimer('name')
1162        timer['random_key'] = 'pass'
1163
1164
1165class test_background_sample(unittest.TestCase):
1166    """Test that the background sample can sample as desired.
1167    """
1168
1169    def test_can_sample(self):
1170        """Test that a simple sample will work with no other complications.
1171        """
1172        should_be_sampled = 'name'
1173
1174        def sample_function():
1175            """Return value of variable stored in method."""
1176            return should_be_sampled
1177        still_sampling = True
1178
1179        t = utils.background_sample_until_condition(
1180                function=sample_function,
1181                condition=lambda: still_sampling,
1182                timeout=5,
1183                sleep_interval=0.1)
1184        result = t.finish()
1185        self.assertIn(should_be_sampled, result)
1186
1187
1188    def test_samples_multiple_values(self):
1189        """Test that a sample will work and actually samples at the necessary
1190        intervals, such that it will pick up changes.
1191        """
1192        should_be_sampled = 'name'
1193
1194        def sample_function():
1195            """Return value of variable stored in method."""
1196            return should_be_sampled
1197        still_sampling = True
1198
1199        t = utils.background_sample_until_condition(
1200                function=sample_function,
1201                condition=lambda: still_sampling,
1202                timeout=5,
1203                sleep_interval=0.1)
1204        # Let it sample values some with the initial value.
1205        time.sleep(2.5)
1206        # It should also sample some with the new value.
1207        should_be_sampled = 'noname'
1208        result = t.finish()
1209        self.assertIn('name', result)
1210        self.assertIn('noname', result)
1211
1212
1213class FakeTime(object):
1214    """Provides time() and sleep() for faking time module.
1215    """
1216
1217    def __init__(self, start_time):
1218        self._time = start_time
1219
1220
1221    def time(self):
1222        return self._time
1223
1224
1225    def sleep(self, interval):
1226        self._time += interval
1227
1228
1229class TimeModuleMockTestCase(unittest.TestCase):
1230    """Mocks up utils.time with a FakeTime.
1231
1232    It substitudes time.time() and time.sleep() with FakeTime.time()
1233    and FakeTime.sleep(), respectively.
1234    """
1235
1236    def setUp(self):
1237        self.fake_time_begin = 10
1238        self.fake_time = FakeTime(self.fake_time_begin)
1239        self.patcher = pymock.patch(
1240            'autotest_lib.client.common_lib.utils.time')
1241        self.time_mock = self.patcher.start()
1242        self.addCleanup(self.patcher.stop)
1243        self.time_mock.time.side_effect = self.fake_time.time
1244        self.time_mock.sleep.side_effect = self.fake_time.sleep
1245
1246
1247def always_raise():
1248    """A function that raises an exception."""
1249    raise Exception('always raise')
1250
1251
1252def fail_n_times(count):
1253    """Creates a function that returns False for the first count-th calls.
1254
1255    @return a function returns False for the first count-th calls and True
1256            afterwards.
1257    """
1258    counter = itertools.count(count, -1)
1259    return lambda: next(counter) <= 0
1260
1261
1262class test_poll_for_condition(TimeModuleMockTestCase):
1263    """Test poll_for_condition.
1264    """
1265
1266    def test_ok(self):
1267        """Test polling condition that returns True.
1268        """
1269        self.assertTrue(utils.poll_for_condition(lambda: True))
1270
1271
1272    def test_ok_evaluated_as_true(self):
1273        """Test polling condition which's return value is evaluated as True.
1274        """
1275        self.assertEqual(1, utils.poll_for_condition(lambda: 1))
1276
1277        self.assertEqual('something',
1278                         utils.poll_for_condition(lambda: 'something'))
1279
1280
1281    def test_fail(self):
1282        """Test polling condition that returns False.
1283
1284        Expect TimeoutError exception as neither customized exception nor
1285        exception raised from condition().
1286        """
1287        with self.assertRaises(utils.TimeoutError):
1288            utils.poll_for_condition(lambda: False, timeout=3, sleep_interval=1)
1289        self.assertEqual(3, self.time_mock.sleep.call_count)
1290
1291
1292    def test_fail_evaluated_as_false(self):
1293        """Test polling condition which's return value is evaluated as False.
1294
1295        Expect TimeoutError exception as neither customized exception nor
1296        exception raised from condition().
1297        """
1298        with self.assertRaises(utils.TimeoutError):
1299            utils.poll_for_condition(lambda: 0, timeout=3, sleep_interval=1)
1300        self.assertEqual(3, self.time_mock.sleep.call_count)
1301
1302        with self.assertRaises(utils.TimeoutError):
1303            utils.poll_for_condition(lambda: None, timeout=3, sleep_interval=1)
1304
1305
1306    def test_exception_arg(self):
1307        """Test polling condition always fails.
1308
1309        Expect exception raised by 'exception' args.
1310        """
1311        with self.assertRaisesRegexp(Exception, 'from args'):
1312            utils.poll_for_condition(lambda: False,
1313                                     exception=Exception('from args'),
1314                                     timeout=3, sleep_interval=1)
1315        self.assertEqual(3, self.time_mock.sleep.call_count)
1316
1317
1318    def test_exception_from_condition(self):
1319        """Test polling condition always fails.
1320
1321        Expect exception raised by condition().
1322        """
1323        with self.assertRaisesRegexp(Exception, 'always raise'):
1324            utils.poll_for_condition(always_raise,
1325                                     exception=Exception('from args'),
1326                                     timeout=3, sleep_interval=1)
1327        # For poll_for_condition, if condition() raises exception, it raises
1328        # immidiately without retry. So sleep() should not be called.
1329        self.time_mock.sleep.assert_not_called()
1330
1331
1332    def test_ok_after_retry(self):
1333        """Test polling a condition which is success after retry twice.
1334        """
1335        self.assertTrue(utils.poll_for_condition(fail_n_times(2), timeout=3,
1336                                                 sleep_interval=1))
1337
1338
1339    def test_cannot_wait(self):
1340        """Test polling a condition which fails till timeout.
1341        """
1342        with self.assertRaisesRegexp(
1343                utils.TimeoutError,
1344                'Timed out waiting for unnamed condition'):
1345            utils.poll_for_condition(fail_n_times(4), timeout=3,
1346                                     sleep_interval=1)
1347        self.assertEqual(3, self.time_mock.sleep.call_count)
1348
1349
1350class test_poll_for_condition_ex(TimeModuleMockTestCase):
1351    """Test poll_for_condition_ex.
1352    """
1353
1354    def test_ok(self):
1355        """Test polling condition that returns True.
1356        """
1357        self.assertTrue(utils.poll_for_condition_ex(lambda: True))
1358
1359
1360    def test_ok_evaluated_as_true(self):
1361        """Test polling condition which's return value is evaluated as True.
1362        """
1363        self.assertEqual(1, utils.poll_for_condition_ex(lambda: 1))
1364
1365        self.assertEqual('something',
1366                         utils.poll_for_condition_ex(lambda: 'something'))
1367
1368
1369    def test_fail(self):
1370        """Test polling condition that returns False.
1371
1372        Expect TimeoutError raised.
1373        """
1374        with self.assertRaisesRegexp(
1375                utils.TimeoutError,
1376                'Timed out waiting for unamed condition'):
1377            utils.poll_for_condition_ex(lambda: False, timeout=3,
1378                                        sleep_interval=1)
1379        self.assertEqual(2, self.time_mock.sleep.call_count)
1380
1381
1382    def test_fail_evaluated_as_false(self):
1383        """Test polling condition which's return value is evaluated as False.
1384
1385        Expect TimeoutError raised.
1386        """
1387        with self.assertRaisesRegexp(
1388                utils.TimeoutError,
1389                'Timed out waiting for unamed condition'):
1390            utils.poll_for_condition_ex(lambda: 0, timeout=3,
1391                                        sleep_interval=1)
1392        self.assertEqual(2, self.time_mock.sleep.call_count)
1393
1394        with self.assertRaisesRegexp(
1395                utils.TimeoutError,
1396                'Timed out waiting for unamed condition'):
1397            utils.poll_for_condition_ex(lambda: None, timeout=3,
1398                                        sleep_interval=1)
1399
1400
1401    def test_desc_arg(self):
1402        """Test polling condition always fails with desc.
1403
1404        Expect TimeoutError with condition description embedded.
1405        """
1406        with self.assertRaisesRegexp(
1407                utils.TimeoutError,
1408                'Timed out waiting for always false condition'):
1409            utils.poll_for_condition_ex(lambda: False,
1410                                        desc='always false condition',
1411                                        timeout=3, sleep_interval=1)
1412        self.assertEqual(2, self.time_mock.sleep.call_count)
1413
1414
1415    def test_exception(self):
1416        """Test polling condition that raises.
1417
1418        Expect TimeoutError with condition raised exception embedded.
1419        """
1420        with self.assertRaisesRegexp(
1421                utils.TimeoutError,
1422                "Reason: Exception\('always raise',\)"):
1423            utils.poll_for_condition_ex(always_raise, timeout=3,
1424                                        sleep_interval=1)
1425        self.assertEqual(2, self.time_mock.sleep.call_count)
1426
1427
1428    def test_ok_after_retry(self):
1429        """Test polling a condition which is success after retry twice.
1430        """
1431        self.assertTrue(utils.poll_for_condition_ex(fail_n_times(2), timeout=3,
1432                                                    sleep_interval=1))
1433
1434
1435    def test_cannot_wait(self):
1436        """Test polling a condition which fails till timeout.
1437        """
1438        with self.assertRaisesRegexp(
1439                utils.TimeoutError,
1440                'condition evaluted as false'):
1441            utils.poll_for_condition_ex(fail_n_times(3), timeout=3,
1442                                        sleep_interval=1)
1443        self.assertEqual(2, self.time_mock.sleep.call_count)
1444
1445
1446class test_timer(TimeModuleMockTestCase):
1447    """Test Timer.
1448    """
1449
1450    def test_zero_timeout(self):
1451        """Test Timer with zero timeout.
1452
1453        Only the first timer.sleep(0) is True.
1454        """
1455        timer = utils.Timer(0)
1456        self.assertTrue(timer.sleep(0))
1457        self.assertFalse(timer.sleep(0))
1458        self.time_mock.sleep.assert_not_called()
1459
1460
1461    def test_sleep(self):
1462        """Test Timer.sleep()
1463        """
1464        timeout = 3
1465        sleep_interval = 2
1466        timer = utils.Timer(timeout)
1467
1468        # Kicks off timer.
1469        self.assertTrue(timer.sleep(sleep_interval))
1470        self.assertEqual(self.fake_time_begin + timeout, timer.deadline)
1471        self.assertTrue(timer.sleep(sleep_interval))
1472        # now: 12. 12 + 2 > 13, unable to sleep
1473        self.assertFalse(timer.sleep(sleep_interval))
1474
1475        self.time_mock.sleep.assert_has_calls([pymock.call(sleep_interval)])
1476
1477
1478class test_timeout_error(unittest.TestCase):
1479    """Test TimeoutError.
1480
1481    Test TimeoutError with three invocations format.
1482    """
1483
1484    def test_no_args(self):
1485        """Create TimeoutError without arguments.
1486        """
1487        e = utils.TimeoutError()
1488        self.assertEqual('', str(e))
1489        self.assertEqual('TimeoutError()', repr(e))
1490
1491
1492    def test_with_message(self):
1493        """Create TimeoutError with text message.
1494        """
1495        e = utils.TimeoutError(message='Waiting for condition')
1496        self.assertEqual('Waiting for condition', str(e))
1497        self.assertEqual("TimeoutError('Waiting for condition',)", repr(e))
1498
1499        # Positional message argument for backward compatibility.
1500        e = utils.TimeoutError('Waiting for condition')
1501        self.assertEqual('Waiting for condition', str(e))
1502        self.assertEqual("TimeoutError('Waiting for condition',)", repr(e))
1503
1504
1505
1506    def test_with_reason(self):
1507        """Create TimeoutError with reason only.
1508        """
1509        e = utils.TimeoutError(reason='illegal input')
1510        self.assertEqual("Reason: 'illegal input'", str(e))
1511        self.assertEqual("TimeoutError(\"Reason: 'illegal input'\",)", repr(e))
1512        self.assertEqual('illegal input', e.reason)
1513
1514
1515    def test_with_message_reason(self):
1516        """Create TimeoutError with text message and reason.
1517        """
1518        e = utils.TimeoutError(message='Waiting for condition',
1519                               reason='illegal input')
1520        self.assertEqual("Waiting for condition. Reason: 'illegal input'",
1521                         str(e))
1522        self.assertEqual('illegal input', e.reason)
1523
1524        # Positional message argument for backward compatibility.
1525        e = utils.TimeoutError('Waiting for condition', reason='illegal input')
1526        self.assertEqual("Waiting for condition. Reason: 'illegal input'",
1527                         str(e))
1528        self.assertEqual('illegal input', e.reason)
1529
1530
1531    def test_with_message_reason_object(self):
1532        """Create TimeoutError with text message and reason as exception object.
1533        """
1534        e = utils.TimeoutError(message='Waiting for condition',
1535                               reason=Exception('illegal input'))
1536        self.assertEqual(
1537            "Waiting for condition. Reason: Exception('illegal input',)",
1538            str(e))
1539        self.assertIsInstance(e.reason, Exception)
1540        self.assertEqual('illegal input', e.reason.message)
1541
1542        # Positional message argument for backward compatibility.
1543        e = utils.TimeoutError('Waiting for condition',
1544                               reason=Exception('illegal input'))
1545        self.assertEqual(
1546            "Waiting for condition. Reason: Exception('illegal input',)",
1547            str(e))
1548        self.assertIsInstance(e.reason, Exception)
1549        self.assertEqual('illegal input', e.reason.message)
1550
1551
1552
1553if __name__ == "__main__":
1554    unittest.main()
1555