• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python
2# Copyright 2018 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import datetime
7import json
8import os
9import shutil
10import tempfile
11import unittest
12
13import dateutil.parser
14
15import common
16import tast
17
18from autotest_lib.client.common_lib import base_job
19from autotest_lib.client.common_lib import error
20from autotest_lib.client.common_lib import utils
21
22
# Arbitrary reference instant; test timestamps are expressed as offsets from it.
BASE_TIME = dateutil.parser.parse('2018-01-01T00:00:00Z')

# Arbitrary fixed instant (one minute after BASE_TIME) to use in place of
# time.time() when running tests.
NOW = BASE_TIME + datetime.timedelta(seconds=60)
28
29
30class TastTest(unittest.TestCase):
31    """Tests the tast.tast Autotest server test.
32
33    This unit test verifies interactions between the tast.py Autotest server
34    test and the 'tast' executable that's actually responsible for running
35    individual Tast tests and reporting their results. To do that, it sets up a
36    fake environment in which it can run the Autotest test, including a fake
37    implementation of the 'tast' executable provided by testdata/fake_tast.py.
38    """
39
40    # Arbitrary data to pass to the tast command.
41    HOST = 'dut.example.net'
42    PORT = 22
43    TEST_PATTERNS = ['(bvt)']
44    MAX_RUN_SEC = 300
45
46    def setUp(self):
47        self._temp_dir = tempfile.mkdtemp('.tast_unittest')
48
49        def make_subdir(subdir):
50            # pylint: disable=missing-docstring
51            path = os.path.join(self._temp_dir, subdir)
52            os.mkdir(path)
53            return path
54
55        self._job = FakeServerJob(make_subdir('job'), make_subdir('tmp'))
56        self._bin_dir = make_subdir('bin')
57        self._out_dir = make_subdir('out')
58        self._root_dir = make_subdir('root')
59        self._set_up_root()
60
61        self._test = tast.tast(self._job, self._bin_dir, self._out_dir)
62
63        self._test_patterns = []
64        self._tast_commands = {}
65
66    def tearDown(self):
67        shutil.rmtree(self._temp_dir)
68
69    def _get_path_in_root(self, orig_path):
70        """Appends a path to self._root_dir (which stores Tast-related files).
71
72        @param orig_path: Path to append, e.g. '/usr/bin/tast'.
73        @return: Path within the root dir, e.g. '/path/to/root/usr/bin/tast'.
74        """
75        return os.path.join(self._root_dir, os.path.relpath(orig_path, '/'))
76
77    def _set_up_root(self, ssp=False):
78        """Creates Tast-related files and dirs within self._root_dir.
79
80        @param ssp: If True, install files to locations used with Server-Side
81            Packaging. Otherwise, install to locations used by Portage packages.
82        """
83        def create_file(orig_dest, src=None):
84            """Creates a file under self._root_dir.
85
86            @param orig_dest: Original absolute path, e.g. "/usr/bin/tast".
87            @param src: Absolute path to file to copy, or none to create empty.
88            @return: Absolute path to created file.
89            """
90            dest = self._get_path_in_root(orig_dest)
91            if not os.path.exists(os.path.dirname(dest)):
92                os.makedirs(os.path.dirname(dest))
93            if src:
94                shutil.copyfile(src, dest)
95                shutil.copymode(src, dest)
96            else:
97                open(dest, 'a').close()
98            return dest
99
100        # Copy fake_tast.py to the usual location for the 'tast' executable.
101        # The remote bundle dir and remote_test_runner just need to exist so
102        # tast.py can find them; their contents don't matter since fake_tast.py
103        # won't actually use them.
104        self._tast_path = create_file(
105                tast.tast._SSP_TAST_PATH if ssp else tast.tast._TAST_PATH,
106                os.path.join(os.path.dirname(os.path.realpath(__file__)),
107                             'testdata', 'fake_tast.py'))
108        self._remote_bundle_dir = os.path.dirname(
109                create_file(os.path.join(tast.tast._SSP_REMOTE_BUNDLE_DIR if ssp
110                                         else tast.tast._REMOTE_BUNDLE_DIR,
111                                         'fake')))
112        self._remote_test_runner_path = create_file(
113                tast.tast._SSP_REMOTE_TEST_RUNNER_PATH if ssp
114                else tast.tast._REMOTE_TEST_RUNNER_PATH)
115
116    def _init_tast_commands(self, tests, run_private_tests=False):
117        """Sets fake_tast.py's behavior for 'list' and 'run' commands.
118
119        @param tests: List of TestInfo objects.
120        @param run_private_tests: Whether to run private tests.
121        """
122        list_args = [
123            'build=False',
124            'patterns=%s' % self.TEST_PATTERNS,
125            'remotebundledir=' + self._remote_bundle_dir,
126            'remoterunner=' + self._remote_test_runner_path,
127            'sshretries=%d' % tast.tast._SSH_CONNECT_RETRIES,
128            'target=%s:%d' % (self.HOST, self.PORT),
129            'downloadprivatebundles=%s' % run_private_tests,
130            'verbose=True',
131        ]
132        run_args = list_args + ['resultsdir=' + self._test.resultsdir]
133        test_list = json.dumps([t.test() for t in tests])
134        run_files = {
135            self._results_path(): ''.join(
136                    [json.dumps(t.test_result()) + '\n'
137                     for t in tests if t.start_time()]),
138        }
139        self._tast_commands = {
140            'list': TastCommand(list_args, stdout=test_list),
141            'run': TastCommand(run_args, files_to_write=run_files),
142        }
143
144    def _results_path(self):
145        """Returns the path where "tast run" writes streamed results.
146
147        @return Path to streamed results file.
148        """
149        return os.path.join(self._test.resultsdir,
150                            tast.tast._STREAMED_RESULTS_FILENAME)
151
152    def _run_test(self, ignore_test_failures=False, run_private_tests=False):
153        """Writes fake_tast.py's configuration and runs the test.
154
155        @param ignore_test_failures: Passed as the identically-named arg to
156            Tast.initialize().
157        @param run_private_tests: Passed as the identically-named arg to
158            Tast.initialize().
159        """
160        self._test.initialize(FakeHost(self.HOST, self.PORT),
161                              self.TEST_PATTERNS,
162                              ignore_test_failures=ignore_test_failures,
163                              max_run_sec=self.MAX_RUN_SEC,
164                              install_root=self._root_dir,
165                              run_private_tests=run_private_tests)
166        self._test.set_fake_now_for_testing(
167                (NOW - tast._UNIX_EPOCH).total_seconds())
168
169        cfg = {}
170        for name, cmd in self._tast_commands.iteritems():
171            cfg[name] = vars(cmd)
172        path = os.path.join(os.path.dirname(self._tast_path), 'config.json')
173        with open(path, 'a') as f:
174            json.dump(cfg, f)
175
176        try:
177            self._test.run_once()
178        finally:
179            if self._job.post_run_hook:
180                self._job.post_run_hook()
181
182    def _run_test_for_failure(self, failed, missing):
183        """Calls _run_test and checks the resulting failure message.
184
185        @param failed: List of TestInfo objects for expected-to-fail tests.
186        @param missing: List of TestInfo objects for expected-missing tests.
187        """
188        with self.assertRaises(error.TestFail) as cm:
189            self._run_test()
190
191        msg = self._test._get_failure_message([t.name() for t in failed],
192                                              [t.name() for t in missing])
193        self.assertEqual(msg, str(cm.exception))
194
195    def _load_job_keyvals(self):
196        """Loads job keyvals.
197
198        @return Keyvals as a str-to-str dict, or None if keyval file is missing.
199        """
200        if not os.path.exists(os.path.join(self._job.resultdir,
201                                           'keyval')):
202            return None
203        return utils.read_keyval(self._job.resultdir)
204
205    def testPassingTests(self):
206        """Tests that passing tests are reported correctly."""
207        tests = [TestInfo('pkg.Test1', 0, 2),
208                 TestInfo('pkg.Test2', 3, 5),
209                 TestInfo('pkg.Test3', 6, 8)]
210        self._init_tast_commands(tests)
211        self._run_test()
212        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
213                         status_string(self._job.status_entries))
214        self.assertIs(self._load_job_keyvals(), None)
215
216    def testFailingTests(self):
217        """Tests that failing tests are reported correctly."""
218        tests = [TestInfo('pkg.Test1', 0, 2, errors=[('failed', 1)]),
219                 TestInfo('pkg.Test2', 3, 6),
220                 TestInfo('pkg.Test2', 7, 8, errors=[('another', 7)])]
221        self._init_tast_commands(tests)
222        self._run_test_for_failure([tests[0], tests[2]], [])
223        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
224                         status_string(self._job.status_entries))
225        self.assertIs(self._load_job_keyvals(), None)
226
227    def testIgnoreTestFailures(self):
228        """Tests that tast.tast can still pass with Tast test failures."""
229        tests = [TestInfo('pkg.Test', 0, 2, errors=[('failed', 1)])]
230        self._init_tast_commands(tests)
231        self._run_test(ignore_test_failures=True)
232        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
233                         status_string(self._job.status_entries))
234
235    def testSkippedTest(self):
236        """Tests that skipped tests aren't reported."""
237        tests = [TestInfo('pkg.Normal', 0, 1),
238                 TestInfo('pkg.Skipped', 2, 2, skip_reason='missing deps')]
239        self._init_tast_commands(tests)
240        self._run_test()
241        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
242                         status_string(self._job.status_entries))
243        self.assertIs(self._load_job_keyvals(), None)
244
245    def testSkippedTestWithErrors(self):
246        """Tests that skipped tests are reported if they also report errors."""
247        tests = [TestInfo('pkg.Normal', 0, 1),
248                 TestInfo('pkg.SkippedWithErrors', 2, 2, skip_reason='bad deps',
249                          errors=[('bad deps', 2)])]
250        self._init_tast_commands(tests)
251        self._run_test_for_failure([tests[1]], [])
252        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
253                         status_string(self._job.status_entries))
254        self.assertIs(self._load_job_keyvals(), None)
255
256    def testMissingTests(self):
257        """Tests that missing tests are reported when there's another test."""
258        tests = [TestInfo('pkg.Test1', None, None),
259                 TestInfo('pkg.Test2', 0, 2),
260                 TestInfo('pkg.Test3', None, None)]
261        self._init_tast_commands(tests)
262        self._run_test_for_failure([], [tests[0], tests[2]])
263        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
264                         status_string(self._job.status_entries))
265        self.assertEqual(self._load_job_keyvals(),
266                         {'tast_missing_test.0': 'pkg.Test1',
267                          'tast_missing_test.1': 'pkg.Test3'})
268
269    def testNoTestsRun(self):
270        """Tests that a missing test is reported when it's the only test."""
271        tests = [TestInfo('pkg.Test', None, None)]
272        self._init_tast_commands(tests)
273        self._run_test_for_failure([], [tests[0]])
274        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
275                         status_string(self._job.status_entries))
276        self.assertEqual(self._load_job_keyvals(),
277                         {'tast_missing_test.0': 'pkg.Test'})
278
279    def testHangingTest(self):
280        """Tests that a not-finished test is reported."""
281        tests = [TestInfo('pkg.Test1', 0, 2),
282                 TestInfo('pkg.Test2', 3, None),
283                 TestInfo('pkg.Test3', None, None)]
284        self._init_tast_commands(tests)
285        self._run_test_for_failure([tests[1]], [tests[2]])
286        self.assertEqual(
287                status_string(get_status_entries_from_tests(tests[:2])),
288                status_string(self._job.status_entries))
289        self.assertEqual(self._load_job_keyvals(),
290                         {'tast_missing_test.0': 'pkg.Test3'})
291
292    def testRunError(self):
293        """Tests that a run error is reported for a non-finished test."""
294        tests = [TestInfo('pkg.Test1', 0, 2),
295                 TestInfo('pkg.Test2', 3, None),
296                 TestInfo('pkg.Test3', None, None)]
297        self._init_tast_commands(tests)
298
299        # Simulate the run being aborted due to a lost SSH connection.
300        path = os.path.join(self._test.resultsdir,
301                            tast.tast._RUN_ERROR_FILENAME)
302        msg = 'Lost SSH connection to DUT'
303        self._tast_commands['run'].files_to_write[path] = msg
304        self._tast_commands['run'].status = 1
305
306        self._run_test_for_failure([tests[1]], [tests[2]])
307        self.assertEqual(
308                status_string(get_status_entries_from_tests(tests[:2], msg)),
309                status_string(self._job.status_entries))
310        self.assertEqual(self._load_job_keyvals(),
311                         {'tast_missing_test.0': 'pkg.Test3'})
312
313    def testNoTestsMatched(self):
314        """Tests that an error is raised if no tests are matched."""
315        self._init_tast_commands([])
316        with self.assertRaises(error.TestFail) as _:
317            self._run_test()
318
319    def testListCommandFails(self):
320        """Tests that an error is raised if the list command fails."""
321        self._init_tast_commands([])
322
323        # The list subcommand writes log messages to stderr on failure.
324        FAILURE_MSG = "failed to connect"
325        self._tast_commands['list'].status = 1
326        self._tast_commands['list'].stdout = None
327        self._tast_commands['list'].stderr = 'blah blah\n%s\n' % FAILURE_MSG
328
329        # The first line of the exception should include the last line of output
330        # from tast.
331        with self.assertRaises(error.TestFail) as cm:
332            self._run_test()
333        first_line = str(cm.exception).split('\n')[0]
334        self.assertTrue(FAILURE_MSG in first_line,
335                        '"%s" not in "%s"' % (FAILURE_MSG, first_line))
336
337    def testListCommandPrintsGarbage(self):
338        """Tests that an error is raised if the list command prints bad data."""
339        self._init_tast_commands([])
340        self._tast_commands['list'].stdout = 'not valid JSON data'
341        with self.assertRaises(error.TestFail) as _:
342            self._run_test()
343
344    def testRunCommandFails(self):
345        """Tests that an error is raised if the run command fails."""
346        tests = [TestInfo('pkg.Test1', 0, 1), TestInfo('pkg.Test2', 2, 3)]
347        self._init_tast_commands(tests)
348        FAILURE_MSG = "this is the failure"
349        self._tast_commands['run'].status = 1
350        self._tast_commands['run'].stdout = 'blah blah\n%s\n' % FAILURE_MSG
351
352        with self.assertRaises(error.TestFail) as cm:
353            self._run_test()
354        self.assertTrue(FAILURE_MSG in str(cm.exception),
355                        '"%s" not in "%s"' % (FAILURE_MSG, str(cm.exception)))
356        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
357                         status_string(self._job.status_entries))
358
359    def testRunCommandWritesTrailingGarbage(self):
360        """Tests that an error is raised if the run command prints bad data."""
361        tests = [TestInfo('pkg.Test1', 0, 1), TestInfo('pkg.Test2', None, None)]
362        self._init_tast_commands(tests)
363        self._tast_commands['run'].files_to_write[self._results_path()] += \
364                'not valid JSON data'
365        with self.assertRaises(error.TestFail) as _:
366            self._run_test()
367        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
368                         status_string(self._job.status_entries))
369
370    def testNoResultsFile(self):
371        """Tests that an error is raised if no results file is written."""
372        tests = [TestInfo('pkg.Test1', None, None)]
373        self._init_tast_commands(tests)
374        self._tast_commands['run'].files_to_write = {}
375        with self.assertRaises(error.TestFail) as _:
376            self._run_test()
377        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
378                         status_string(self._job.status_entries))
379
380    def testNoResultsFileAfterRunCommandFails(self):
381        """Tests that stdout is included in error after missing results."""
382        tests = [TestInfo('pkg.Test1', None, None)]
383        self._init_tast_commands(tests)
384        FAILURE_MSG = "this is the failure"
385        self._tast_commands['run'].status = 1
386        self._tast_commands['run'].files_to_write = {}
387        self._tast_commands['run'].stdout = 'blah blah\n%s\n' % FAILURE_MSG
388
389        # The first line of the exception should include the last line of output
390        # from tast rather than a message about the missing results file.
391        with self.assertRaises(error.TestFail) as cm:
392            self._run_test()
393        first_line = str(cm.exception).split('\n')[0]
394        self.assertTrue(FAILURE_MSG in first_line,
395                        '"%s" not in "%s"' % (FAILURE_MSG, first_line))
396        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
397                         status_string(self._job.status_entries))
398
399    def testMissingTastExecutable(self):
400        """Tests that an error is raised if the tast command isn't found."""
401        os.remove(self._get_path_in_root(tast.tast._TAST_PATH))
402        with self.assertRaises(error.TestFail) as _:
403            self._run_test()
404
405    def testMissingRemoteTestRunner(self):
406        """Tests that an error is raised if remote_test_runner isn't found."""
407        os.remove(self._get_path_in_root(tast.tast._REMOTE_TEST_RUNNER_PATH))
408        with self.assertRaises(error.TestFail) as _:
409            self._run_test()
410
411    def testMissingRemoteBundleDir(self):
412        """Tests that an error is raised if remote bundles aren't found."""
413        shutil.rmtree(self._get_path_in_root(tast.tast._REMOTE_BUNDLE_DIR))
414        with self.assertRaises(error.TestFail) as _:
415            self._run_test()
416
417    def testSspPaths(self):
418        """Tests that files can be located at their alternate SSP locations."""
419        for p in os.listdir(self._root_dir):
420            shutil.rmtree(os.path.join(self._root_dir, p))
421        self._set_up_root(ssp=True)
422
423        tests = [TestInfo('pkg.Test', 0, 1)]
424        self._init_tast_commands(tests)
425        self._run_test()
426        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
427                         status_string(self._job.status_entries))
428
429    def testSumTestTimeouts(self):
430        """Tests that test timeouts are summed for the overall timeout."""
431        ns_in_sec = 1000000000
432        tests = [TestInfo('pkg.Test1', 0, 0, timeout_ns=(23 * ns_in_sec)),
433                 TestInfo('pkg.Test2', 0, 0, timeout_ns=(41 * ns_in_sec))]
434        self._init_tast_commands(tests)
435        self._run_test()
436        self.assertEqual(64 + tast.tast._RUN_OVERHEAD_SEC,
437                         self._test._get_run_tests_timeout_sec())
438
439    def testCapTestTimeout(self):
440        """Tests that excessive test timeouts are capped."""
441        timeout_ns = 2 * self.MAX_RUN_SEC * 1000000000
442        tests = [TestInfo('pkg.Test', 0, 0, timeout_ns=timeout_ns)]
443        self._init_tast_commands(tests)
444        self._run_test()
445        self.assertEqual(self.MAX_RUN_SEC,
446                         self._test._get_run_tests_timeout_sec())
447
448    def testFailureMessage(self):
449        """Tests that appropriate failure messages are generated."""
450        # Just do this to initialize the self._test.
451        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)])
452        self._run_test()
453
454        msg = lambda f, m: self._test._get_failure_message(f, m)
455        self.assertEqual('', msg([], []))
456        self.assertEqual('1 failed: t1', msg(['t1'], []))
457        self.assertEqual('2 failed: t1 t2', msg(['t1', 't2'], []))
458        self.assertEqual('1 missing: t1', msg([], ['t1']))
459        self.assertEqual('1 failed: t1; 1 missing: t2', msg(['t1'], ['t2']))
460
461    def testFailureMessageIgnoreTestFailures(self):
462        """Tests that test failures are ignored in messages when requested."""
463        # Just do this to initialize the self._test.
464        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)])
465        self._run_test(ignore_test_failures=True)
466
467        msg = lambda f, m: self._test._get_failure_message(f, m)
468        self.assertEqual('', msg([], []))
469        self.assertEqual('', msg(['t1'], []))
470        self.assertEqual('1 missing: t1', msg([], ['t1']))
471        self.assertEqual('1 missing: t2', msg(['t1'], ['t2']))
472
473    def testRunPrivateTests(self):
474        """Tests running private tests."""
475        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
476                                 run_private_tests=True)
477        self._run_test(ignore_test_failures=True, run_private_tests=True)
478
479
class TestInfo(object):
    """Wraps information about a Tast test.

    This struct is used to:
    - get test definitions printed by fake_tast.py's 'list' command
    - get test results written by fake_tast.py's 'run' command
    - get expected base_job.status_log_entry objects that unit tests compare
      against what tast.Tast actually recorded
    """
    def __init__(self, name, start_offset, end_offset, errors=None,
                 skip_reason=None, attr=None, timeout_ns=0):
        """
        @param name: Name of the test, e.g. 'ui.ChromeLogin'.
        @param start_offset: Start time as int seconds offset from BASE_TIME,
            or None to indicate that tast didn't report a result for this test.
        @param end_offset: End time as int seconds offset from BASE_TIME, or
            None to indicate that tast reported that this test started but not
            that it finished.
        @param errors: List of (string, int) tuples containing reasons and
            seconds offsets of errors encountered while running the test, or
            None if no errors were encountered.
        @param skip_reason: Human-readable reason that the test was skipped, or
            None to indicate that it wasn't skipped.
        @param attr: List of string test attributes assigned to the test, or
            None if no attributes are assigned.
        @param timeout_ns: Test timeout in nanoseconds.
        """
        def from_offset(offset):
            """Returns an offset from BASE_TIME.

            @param offset: Offset as integer seconds, or None.
            @return: datetime.datetime object, or None if offset is None.
            """
            if offset is None:
                return None
            return BASE_TIME + datetime.timedelta(seconds=offset)

        self._name = name
        self._start_time = from_offset(start_offset)
        self._end_time = from_offset(end_offset)
        # Stored as (reason, datetime) pairs.
        self._errors = (
                [(e[0], from_offset(e[1])) for e in errors] if errors else [])
        self._skip_reason = skip_reason
        # Copy so that later changes to the caller's list don't leak in.
        self._attr = list(attr) if attr else []
        self._timeout_ns = timeout_ns

    def name(self):
        # pylint: disable=missing-docstring
        return self._name

    def start_time(self):
        # pylint: disable=missing-docstring
        return self._start_time

    def test(self):
        """Returns a test dict printed by the 'list' command.

        @return: dict representing a Tast testing.Test struct.
        """
        return {
            'name': self._name,
            'attr': self._attr,
            'timeout': self._timeout_ns,
        }

    def test_result(self):
        """Returns a dict representing a result written by the 'run' command.

        @return: dict representing a Tast TestResult struct.
        """
        return {
            'name': self._name,
            'start': to_rfc3339(self._start_time),
            'end': to_rfc3339(self._end_time),
            'errors': [{'reason': reason, 'time': to_rfc3339(when)}
                       for reason, when in self._errors],
            'skipReason': self._skip_reason,
            'attr': self._attr,
            'timeout': self._timeout_ns,
        }

    def status_entries(self, run_error_msg=None):
        """Returns expected base_job.status_log_entry objects for this test.

        @param run_error_msg: String containing run error message, or None if no
            run error was encountered.
        @return: List of Autotest base_job.status_log_entry objects.
        """
        # Deliberately-skipped tests shouldn't have status entries unless errors
        # were also reported.
        if self._skip_reason and not self._errors:
            return []

        # Tests that weren't even started (e.g. because of an earlier issue)
        # shouldn't have status entries.
        if not self._start_time:
            return []

        def make(status_code, dt, msg=''):
            """Makes a base_job.status_log_entry.

            @param status_code: String status code.
            @param dt: datetime.datetime object containing entry time.
            @param msg: String message (typically only set for errors).
            @return: base_job.status_log_entry object.
            """
            timestamp = int((dt - tast._UNIX_EPOCH).total_seconds())
            return base_job.status_log_entry(
                    status_code, None,
                    tast.tast._TEST_NAME_PREFIX + self._name, msg, None,
                    timestamp=timestamp)

        entries = [make(tast.tast._JOB_STATUS_START, self._start_time)]

        if self._end_time and not self._errors:
            entries.append(make(tast.tast._JOB_STATUS_GOOD, self._end_time))
            entries.append(make(tast.tast._JOB_STATUS_END_GOOD, self._end_time))
        else:
            for reason, when in self._errors:
                entries.append(make(tast.tast._JOB_STATUS_FAIL, when, reason))
            if not self._end_time:
                # If the test didn't finish, the run error (if any) should be
                # included.
                if run_error_msg:
                    entries.append(make(tast.tast._JOB_STATUS_FAIL,
                                        self._start_time, run_error_msg))
                entries.append(make(tast.tast._JOB_STATUS_FAIL,
                                    self._start_time,
                                    tast.tast._TEST_DID_NOT_FINISH_MSG))
            # The start time is always set here (see the early return above),
            # so it is a sufficient fallback for an unfinished test.
            entries.append(make(tast.tast._JOB_STATUS_END_FAIL,
                                self._end_time or self._start_time))

        return entries
613
614
class FakeServerJob(object):
    """Fake implementation of server_job from server/server_job.py.

    Records the status entries and post-run hook that the test under test
    registers so that unit tests can inspect them afterwards.
    """
    def __init__(self, result_dir, tmp_dir):
        """
        @param result_dir: Path exposed as the job's resultdir attribute.
        @param tmp_dir: Path exposed as the job's tmpdir attribute.
        """
        self.pkgmgr = None
        self.autodir = None
        self.resultdir = result_dir
        self.tmpdir = tmp_dir
        # Most recently registered hook, or None if none was registered.
        self.post_run_hook = None
        # Entries passed to record_entry(), in call order.
        self.status_entries = []

    def add_post_run_hook(self, hook):
        """Stub implementation of server_job.add_post_run_hook.

        @param hook: Callable to store; replaces any previously-stored hook.
        """
        self.post_run_hook = hook

    def record_entry(self, entry, log_in_subdir=True):
        """Stub implementation of server_job.record_entry.

        @param entry: Status entry to append to self.status_entries.
        @param log_in_subdir: Must be False; any other value trips an
            assertion.
        """
        assert not log_in_subdir
        self.status_entries.append(entry)
633
634
class FakeHost(object):
    """Fake implementation of AbstractSSHHost from server/hosts/abstract_ssh.py.

    Only carries the hostname and port attributes.
    """
    def __init__(self, hostname, port):
        """
        @param hostname: Value stored as the hostname attribute.
        @param port: Value stored as the port attribute.
        """
        self.hostname = hostname
        self.port = port
641
642
class TastCommand(object):
    """Expected args and scripted behavior of fake_tast.py for one command.

    One instance describes a single command, e.g. "list" or "run".
    """

    def __init__(self, required_args, status=0, stdout=None, stderr=None,
                 files_to_write=None):
        """
        @param required_args: List of required args, each specified as
            'name=value'. Names correspond to argparse-provided names in
            fake_tast.py (typically just the flag name, e.g. 'build' or
            'resultsdir'). Values correspond to str() representations of the
            argparse-provided values.
        @param status: Status code for fake_tast.py to return.
        @param stdout: Data to write to stdout.
        @param stderr: Data to write to stderr.
        @param files_to_write: Dict mapping from paths of files to write to
            their contents, or None to not write any files.
        """
        # A missing (or empty) mapping is replaced by a fresh empty dict.
        self.files_to_write = files_to_write or {}
        self.stderr = stderr
        self.stdout = stdout
        self.status = status
        self.required_args = required_args
665
666
def to_rfc3339(t):
    """Formats a time as an RFC3339 timestamp.

    @param t: UTC datetime.datetime object or None for the zero time.
    @return: String RFC3339 time, e.g. '2018-01-02T02:34:28Z'.
    """
    if t is not None:
        # Only times without a (non-zero) UTC offset may carry the 'Z' suffix.
        assert(not t.utcoffset())
        return t.strftime('%Y-%m-%dT%H:%M:%SZ')
    return '0001-01-01T00:00:00Z'
677
678
def get_status_entries_from_tests(tests, run_error_msg=None):
    """Returns a flattened list of status entries from TestInfo objects.

    @param tests: List of TestInfo objects.
    @param run_error_msg: String containing run error message, or None if no
        run error was encountered.
    @return: Flattened list of base_job.status_log_entry objects produced by
        calling status_entries() on each TestInfo object, in test order.
    """
    # Flatten in one pass; sum(lists, []) would re-copy the accumulated list
    # for every test.
    return [entry
            for t in tests
            for entry in t.status_entries(run_error_msg)]
689
690
def status_string(entries):
    """Renders base_job.status_log_entry objects as one comparable string.

    @param entries: List of base_job.status_log_entry objects.
    @return: String containing space-separated representations of entries.
    """
    timestamp_field = base_job.status_log_entry.TIMESTAMP_FIELD

    def describe(entry):
        # pylint: disable=missing-docstring
        return '[%s %s %s %s]' % (entry.fields[timestamp_field],
                                  entry.operation,
                                  entry.status_code,
                                  repr(str(entry.message)))

    return ' '.join([describe(entry) for entry in entries])
705
706
# Run all of the test cases in this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()
709