• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python2
2# -*- coding: utf-8 -*-
3# Copyright 2018 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7import datetime
8import json
9import os
10import shutil
11import tempfile
12import unittest
13import yaml
14
15import dateutil.parser
16
17import common
18import tast
19
20from autotest_lib.client.common_lib import base_job
21from autotest_lib.client.common_lib import error
22from autotest_lib.client.common_lib import utils
23from autotest_lib.server.cros.network import wifi_test_context_manager
24from autotest_lib.server.hosts import host_info
25from autotest_lib.server.hosts import servo_constants
26
27
28# Arbitrary base time to use in tests.
29BASE_TIME = dateutil.parser.parse('2018-01-01T00:00:00Z')
30
31# Arbitrary fixed time to use in place of time.time() when running tests.
32NOW = BASE_TIME + datetime.timedelta(0, 60)
33
34
35class TastTest(unittest.TestCase):
36    """Tests the tast.tast Autotest server test.
37
38    This unit test verifies interactions between the tast.py Autotest server
39    test and the 'tast' executable that's actually responsible for running
40    individual Tast tests and reporting their results. To do that, it sets up a
41    fake environment in which it can run the Autotest test, including a fake
42    implementation of the 'tast' executable provided by testdata/fake_tast.py.
43    """
44
45    # Arbitrary data to pass to the tast command.
46    HOST = 'dut.example.net'
47    PORT = 22
48    TEST_PATTERNS = ['(bvt)']
49    MAX_RUN_SEC = 300
50
51    # Default paths where Tast files are installed by Portage packages.
52    _PORTAGE_TAST_PATH = tast.tast._PORTAGE_TAST_PATH
53    _PORTAGE_REMOTE_BUNDLE_DIR = '/usr/libexec/tast/bundles/remote'
54    _PORTAGE_REMOTE_DATA_DIR = '/usr/share/tast/data'
55    _PORTAGE_REMOTE_TEST_RUNNER_PATH = '/usr/bin/remote_test_runner'
56
57    def setUp(self):
58        self._temp_dir = tempfile.mkdtemp('.tast_unittest')
59
60        def make_subdir(subdir):
61            # pylint: disable=missing-docstring
62            path = os.path.join(self._temp_dir, subdir)
63            os.mkdir(path)
64            return path
65
66        self._job = FakeServerJob(make_subdir('job'), make_subdir('tmp'))
67        self._bin_dir = make_subdir('bin')
68        self._out_dir = make_subdir('out')
69        self._root_dir = make_subdir('root')
70        self._set_up_root()
71
72        self._test = tast.tast(self._job, self._bin_dir, self._out_dir)
73        self._host = FakeHost(self.HOST, self.PORT)
74
75        self._test_patterns = []
76        self._tast_commands = {}
77
78    def tearDown(self):
79        shutil.rmtree(self._temp_dir)
80
81    def _get_path_in_root(self, orig_path):
82        """Appends a path to self._root_dir (which stores Tast-related files).
83
84        @param orig_path: Path to append, e.g. '/usr/bin/tast'.
85        @return: Path within the root dir, e.g. '/path/to/root/usr/bin/tast'.
86        """
87        return os.path.join(self._root_dir, os.path.relpath(orig_path, '/'))
88
89    def _set_up_root(self, ssp=False):
90        """Creates Tast-related files and dirs within self._root_dir.
91
92        @param ssp: If True, install files to locations used with Server-Side
93            Packaging. Otherwise, install to locations used by Portage packages.
94        """
95        def create_file(orig_dest, src=None):
96            """Creates a file under self._root_dir.
97
98            @param orig_dest: Original absolute path, e.g. "/usr/bin/tast".
99            @param src: Absolute path to file to copy, or none to create empty.
100            @return: Absolute path to created file.
101            """
102            dest = self._get_path_in_root(orig_dest)
103            if not os.path.exists(os.path.dirname(dest)):
104                os.makedirs(os.path.dirname(dest))
105            if src:
106                shutil.copyfile(src, dest)
107                shutil.copymode(src, dest)
108            else:
109                open(dest, 'a').close()
110            return dest
111
112        # Copy fake_tast.py to the usual location for the 'tast' executable.
113        # The remote bundle dir and remote_test_runner just need to exist so
114        # tast.py can find them; their contents don't matter since fake_tast.py
115        # won't actually use them.
116        self._tast_path = create_file(
117                tast.tast._SSP_TAST_PATH if ssp else self._PORTAGE_TAST_PATH,
118                os.path.join(os.path.dirname(os.path.realpath(__file__)),
119                             'testdata', 'fake_tast.py'))
120        self._remote_bundle_dir = os.path.dirname(
121                create_file(os.path.join(tast.tast._SSP_REMOTE_BUNDLE_DIR if ssp
122                                         else self._PORTAGE_REMOTE_BUNDLE_DIR,
123                                         'fake')))
124        self._remote_data_dir = os.path.dirname(
125                create_file(os.path.join(tast.tast._SSP_REMOTE_DATA_DIR if ssp
126                                         else self._PORTAGE_REMOTE_DATA_DIR,
127                                         'fake')))
128        self._remote_test_runner_path = create_file(
129                tast.tast._SSP_REMOTE_TEST_RUNNER_PATH if ssp
130                else self._PORTAGE_REMOTE_TEST_RUNNER_PATH)
131
132    def _init_tast_commands(self, tests, ssp=False, build=False,
133                            build_bundle='fakebundle', run_private_tests=False,
134                            run_vars=[], run_varsfiles=[],
135                            download_data_lazily=False):
136        """Sets fake_tast.py's behavior for 'list' and 'run' commands.
137
138        @param tests: List of TestInfo objects.
139        @param run_private_tests: Whether to run private tests.
140        @param run_vars: List of string values that should be passed to 'run'
141            via -var.
142        @param run_varsfiles: filenames should be passed to 'run' via -varsfile.
143        @param download_data_lazily: Whether to download external data files
144            lazily.
145        """
146        list_args = [
147            'build=%s' % build,
148            'patterns=%s' % self.TEST_PATTERNS,
149            'sshretries=%d' % tast.tast._SSH_CONNECT_RETRIES,
150            'downloaddata=%s' % ('lazy' if download_data_lazily else 'batch'),
151            'target=%s:%d' % (self.HOST, self.PORT),
152            'verbose=True',
153        ]
154        if build:
155            list_args.extend([
156                'buildbundle=%s' % build_bundle,
157                'checkbuilddeps=False',
158            ])
159        else:
160            if ssp:
161                list_args.extend([
162                    'remotebundledir=%s' % self._remote_bundle_dir,
163                    'remotedatadir=%s' % self._remote_data_dir,
164                    'remoterunner=%s' % self._remote_test_runner_path,
165                ])
166            else:
167                list_args.extend([
168                    'remotebundledir=None',
169                    'remotedatadir=None',
170                    'remoterunner=None',
171                ])
172            list_args.append('downloadprivatebundles=%s' % run_private_tests)
173        run_args = list_args + [
174            'resultsdir=%s' % self._test.resultsdir,
175            'continueafterfailure=True',
176            'var=%s' % run_vars,
177        ]
178        if run_varsfiles:
179            run_args.append('varsfile=%s' % run_varsfiles)
180
181        test_list = json.dumps([t.test() for t in tests])
182        run_files = {
183            self._results_path(): ''.join(
184                    [json.dumps(t.test_result()) + '\n'
185                     for t in tests if t.start_time()]),
186        }
187        self._tast_commands = {
188            'list': TastCommand(list_args, stdout=test_list),
189            'run': TastCommand(run_args, files_to_write=run_files),
190        }
191
192    def _results_path(self):
193        """Returns the path where "tast run" writes streamed results.
194
195        @return Path to streamed results file.
196        """
197        return os.path.join(self._test.resultsdir,
198                            tast.tast._STREAMED_RESULTS_FILENAME)
199
200    def _run_test(self,
201                  ignore_test_failures=False,
202                  command_args=[],
203                  ssp=False,
204                  build=False,
205                  build_bundle='fakebundle',
206                  run_private_tests=False,
207                  varsfiles=[],
208                  download_data_lazily=False,
209                  varslist=[]):
210        """Writes fake_tast.py's configuration and runs the test.
211
212        @param ignore_test_failures: Passed as the identically-named arg to
213            Tast.initialize().
214        @param command_args: Passed as the identically-named arg to
215            Tast.initialize().
216        @param ssp: Passed as the identically-named arg to Tast.initialize().
217        @param build: Passed as the identically-named arg to Tast.initialize().
218        @param build_bundle: Passed as the identically-named arg to
219            Tast.initialize().
220        @param run_private_tests: Passed as the identically-named arg to
221            Tast.initialize().
222        @param varsfiles: list of names of yaml files containing variables set
223             in |-varsfile| arguments.
224        @param download_data_lazily: Whether to download external data files
225            lazily.
226        @param varslist: list of strings to pass to tast run command as |-vars|
227            arguments. Each string should be formatted as "name=value".
228        """
229        self._test.initialize(self._host,
230                              self.TEST_PATTERNS,
231                              ignore_test_failures=ignore_test_failures,
232                              max_run_sec=self.MAX_RUN_SEC,
233                              command_args=command_args,
234                              install_root=self._root_dir,
235                              ssp=ssp,
236                              build=build,
237                              build_bundle=build_bundle,
238                              run_private_tests=run_private_tests,
239                              varsfiles=varsfiles,
240                              download_data_lazily=download_data_lazily,
241                              varslist=varslist)
242        self._test.set_fake_now_for_testing(
243                (NOW - tast._UNIX_EPOCH).total_seconds())
244
245        cfg = {}
246        for name, cmd in self._tast_commands.iteritems():
247            cfg[name] = vars(cmd)
248        path = os.path.join(os.path.dirname(self._tast_path), 'config.json')
249        with open(path, 'a') as f:
250            json.dump(cfg, f)
251
252        try:
253            self._test.run_once()
254        finally:
255            if self._job.post_run_hook:
256                self._job.post_run_hook()
257
258    def _run_test_for_failure(self, failed, missing):
259        """Calls _run_test and checks the resulting failure message.
260
261        @param failed: List of TestInfo objects for expected-to-fail tests.
262        @param missing: List of TestInfo objects for expected-missing tests.
263        """
264        with self.assertRaises(error.TestFail) as cm:
265            self._run_test()
266
267        msg = self._test._get_failure_message([t.name() for t in failed],
268                                              [t.name() for t in missing])
269        self.assertEqual(msg, str(cm.exception))
270
271    def _load_job_keyvals(self):
272        """Loads job keyvals.
273
274        @return Keyvals as a str-to-str dict, or None if keyval file is missing.
275        """
276        if not os.path.exists(os.path.join(self._job.resultdir,
277                                           'keyval')):
278            return None
279        return utils.read_keyval(self._job.resultdir)
280
281    def testPassingTests(self):
282        """Tests that passing tests are reported correctly."""
283        tests = [TestInfo('pkg.Test1', 0, 2),
284                 TestInfo('pkg.Test2', 3, 5),
285                 TestInfo('pkg.Test3', 6, 8)]
286        self._init_tast_commands(tests)
287        self._run_test()
288        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
289                         status_string(self._job.status_entries))
290        self.assertIs(self._load_job_keyvals(), None)
291
292    def testFailingTests(self):
293        """Tests that failing tests are reported correctly."""
294        tests = [TestInfo('pkg.Test1', 0, 2, errors=[('failed', 1)]),
295                 TestInfo('pkg.Test2', 3, 6),
296                 TestInfo('pkg.Test2', 7, 8, errors=[('another', 7)])]
297        self._init_tast_commands(tests)
298        self._run_test_for_failure([tests[0], tests[2]], [])
299        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
300                         status_string(self._job.status_entries))
301        self.assertIs(self._load_job_keyvals(), None)
302
303    def testIgnoreTestFailures(self):
304        """Tests that tast.tast can still pass with Tast test failures."""
305        tests = [TestInfo('pkg.Test', 0, 2, errors=[('failed', 1)])]
306        self._init_tast_commands(tests)
307        self._run_test(ignore_test_failures=True)
308        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
309                         status_string(self._job.status_entries))
310
311    def testSkippedTest(self):
312        """Tests that skipped tests aren't reported."""
313        tests = [TestInfo('pkg.Normal', 0, 1),
314                 TestInfo('pkg.Skipped', 2, 2, skip_reason='missing deps')]
315        self._init_tast_commands(tests)
316        self._run_test()
317        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
318                         status_string(self._job.status_entries))
319        self.assertIs(self._load_job_keyvals(), None)
320
321    def testSkippedTestWithErrors(self):
322        """Tests that skipped tests are reported if they also report errors."""
323        tests = [TestInfo('pkg.Normal', 0, 1),
324                 TestInfo('pkg.SkippedWithErrors', 2, 2, skip_reason='bad deps',
325                          errors=[('bad deps', 2)])]
326        self._init_tast_commands(tests)
327        self._run_test_for_failure([tests[1]], [])
328        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
329                         status_string(self._job.status_entries))
330        self.assertIs(self._load_job_keyvals(), None)
331
332    def testMissingTests(self):
333        """Tests that missing tests are reported when there's another test."""
334        tests = [TestInfo('pkg.Test1', None, None),
335                 TestInfo('pkg.Test2', 0, 2),
336                 TestInfo('pkg.Test3', None, None)]
337        self._init_tast_commands(tests)
338        self._run_test_for_failure([], [tests[0], tests[2]])
339        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
340                         status_string(self._job.status_entries))
341        self.assertEqual(self._load_job_keyvals(),
342                         {'tast_missing_test.0': 'pkg.Test1',
343                          'tast_missing_test.1': 'pkg.Test3'})
344
345    def testNoTestsRun(self):
346        """Tests that a missing test is reported when it's the only test."""
347        tests = [TestInfo('pkg.Test', None, None)]
348        self._init_tast_commands(tests)
349        self._run_test_for_failure([], [tests[0]])
350        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
351                         status_string(self._job.status_entries))
352        self.assertEqual(self._load_job_keyvals(),
353                         {'tast_missing_test.0': 'pkg.Test'})
354
355    def testHangingTest(self):
356        """Tests that a not-finished test is reported."""
357        tests = [TestInfo('pkg.Test1', 0, 2),
358                 TestInfo('pkg.Test2', 3, None),
359                 TestInfo('pkg.Test3', None, None)]
360        self._init_tast_commands(tests)
361        self._run_test_for_failure([tests[1]], [tests[2]])
362        self.assertEqual(
363                status_string(get_status_entries_from_tests(tests[:2])),
364                status_string(self._job.status_entries))
365        self.assertEqual(self._load_job_keyvals(),
366                         {'tast_missing_test.0': 'pkg.Test3'})
367
368    def testRunError(self):
369        """Tests that a run error is reported for a non-finished test."""
370        tests = [TestInfo('pkg.Test1', 0, 2),
371                 TestInfo('pkg.Test2', 3, None),
372                 TestInfo('pkg.Test3', None, None)]
373        self._init_tast_commands(tests)
374
375        # Simulate the run being aborted due to a lost SSH connection.
376        path = os.path.join(self._test.resultsdir,
377                            tast.tast._RUN_ERROR_FILENAME)
378        msg = 'Lost SSH connection to DUT'
379        self._tast_commands['run'].files_to_write[path] = msg
380        self._tast_commands['run'].status = 1
381
382        self._run_test_for_failure([tests[1]], [tests[2]])
383        self.assertEqual(
384                status_string(get_status_entries_from_tests(tests[:2], msg)),
385                status_string(self._job.status_entries))
386        self.assertEqual(self._load_job_keyvals(),
387                         {'tast_missing_test.0': 'pkg.Test3'})
388
389    def testNoTestsMatched(self):
390        """Tests that no error is raised if no tests are matched."""
391        self._init_tast_commands([])
392        self._run_test()
393
394    def testListCommandFails(self):
395        """Tests that an error is raised if the list command fails."""
396        self._init_tast_commands([])
397
398        # The list subcommand writes log messages to stderr on failure.
399        FAILURE_MSG = "failed to connect"
400        self._tast_commands['list'].status = 1
401        self._tast_commands['list'].stdout = None
402        self._tast_commands['list'].stderr = 'blah blah\n%s\n' % FAILURE_MSG
403
404        # The first line of the exception should include the last line of output
405        # from tast.
406        with self.assertRaises(error.TestFail) as cm:
407            self._run_test()
408        first_line = str(cm.exception).split('\n')[0]
409        self.assertTrue(FAILURE_MSG in first_line,
410                        '"%s" not in "%s"' % (FAILURE_MSG, first_line))
411
412    def testListCommandPrintsGarbage(self):
413        """Tests that an error is raised if the list command prints bad data."""
414        self._init_tast_commands([])
415        self._tast_commands['list'].stdout = 'not valid JSON data'
416        with self.assertRaises(error.TestFail) as _:
417            self._run_test()
418
419    def testRunCommandFails(self):
420        """Tests that an error is raised if the run command fails."""
421        tests = [TestInfo('pkg.Test1', 0, 1), TestInfo('pkg.Test2', 2, 3)]
422        self._init_tast_commands(tests)
423        FAILURE_MSG = "this is the failure"
424        self._tast_commands['run'].status = 1
425        self._tast_commands['run'].stdout = 'blah blah\n%s\n' % FAILURE_MSG
426
427        with self.assertRaises(error.TestFail) as cm:
428            self._run_test()
429        self.assertTrue(FAILURE_MSG in str(cm.exception),
430                        '"%s" not in "%s"' % (FAILURE_MSG, str(cm.exception)))
431        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
432                         status_string(self._job.status_entries))
433
434    def testRunCommandWritesTrailingGarbage(self):
435        """Tests that an error is raised if the run command prints bad data."""
436        tests = [TestInfo('pkg.Test1', 0, 1), TestInfo('pkg.Test2', None, None)]
437        self._init_tast_commands(tests)
438        self._tast_commands['run'].files_to_write[self._results_path()] += \
439                'not valid JSON data'
440        with self.assertRaises(error.TestFail) as _:
441            self._run_test()
442        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
443                         status_string(self._job.status_entries))
444
445    def testNoResultsFile(self):
446        """Tests that an error is raised if no results file is written."""
447        tests = [TestInfo('pkg.Test1', None, None)]
448        self._init_tast_commands(tests)
449        self._tast_commands['run'].files_to_write = {}
450        with self.assertRaises(error.TestFail) as _:
451            self._run_test()
452        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
453                         status_string(self._job.status_entries))
454
455    def testNoResultsFileAfterRunCommandFails(self):
456        """Tests that stdout is included in error after missing results."""
457        tests = [TestInfo('pkg.Test1', None, None)]
458        self._init_tast_commands(tests)
459        FAILURE_MSG = "this is the failure"
460        self._tast_commands['run'].status = 1
461        self._tast_commands['run'].files_to_write = {}
462        self._tast_commands['run'].stdout = 'blah blah\n%s\n' % FAILURE_MSG
463
464        # The first line of the exception should include the last line of output
465        # from tast rather than a message about the missing results file.
466        with self.assertRaises(error.TestFail) as cm:
467            self._run_test()
468        first_line = str(cm.exception).split('\n')[0]
469        self.assertTrue(FAILURE_MSG in first_line,
470                        '"%s" not in "%s"' % (FAILURE_MSG, first_line))
471        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
472                         status_string(self._job.status_entries))
473
474    def testMissingTastExecutable(self):
475        """Tests that an error is raised if the tast command isn't found."""
476        os.remove(self._get_path_in_root(self._PORTAGE_TAST_PATH))
477        with self.assertRaises(error.TestFail) as _:
478            self._run_test()
479
480    def testMissingRemoteTestRunner(self):
481        """Tests that an error is raised if remote_test_runner isn't found."""
482        os.remove(self._get_path_in_root(self._PORTAGE_REMOTE_TEST_RUNNER_PATH))
483        with self.assertRaises(error.TestFail) as _:
484            self._run_test()
485
486    def testMissingRemoteBundleDir(self):
487        """Tests that an error is raised if remote bundles aren't found."""
488        shutil.rmtree(self._get_path_in_root(self._PORTAGE_REMOTE_BUNDLE_DIR))
489        with self.assertRaises(error.TestFail) as _:
490            self._run_test()
491
492    def testSspPaths(self):
493        """Tests that files can be located at their alternate SSP locations."""
494        for p in os.listdir(self._root_dir):
495            shutil.rmtree(os.path.join(self._root_dir, p))
496        self._set_up_root(ssp=True)
497
498        tests = [TestInfo('pkg.Test', 0, 1)]
499        self._init_tast_commands(tests, ssp=True)
500        self._run_test(ssp=True)
501        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
502                         status_string(self._job.status_entries))
503
504    def testBuild(self):
505        """Tests that Tast tests can be built."""
506        tests = [TestInfo('pkg.Test', 0, 1)]
507        self._init_tast_commands(tests, build=True)
508        self._run_test(build=True)
509        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
510                         status_string(self._job.status_entries))
511
512    def testFailureMessage(self):
513        """Tests that appropriate failure messages are generated."""
514        # Just do this to initialize the self._test.
515        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)])
516        self._run_test()
517
518        msg = lambda f, m: self._test._get_failure_message(f, m)
519        self.assertEqual('', msg([], []))
520        self.assertEqual('1 failed: t1', msg(['t1'], []))
521        self.assertEqual('2 failed: t1 t2', msg(['t1', 't2'], []))
522        self.assertEqual('1 missing: t1', msg([], ['t1']))
523        self.assertEqual('1 failed: t1; 1 missing: t2', msg(['t1'], ['t2']))
524
525    def testFailureMessageIgnoreTestFailures(self):
526        """Tests that test failures are ignored in messages when requested."""
527        # Just do this to initialize the self._test.
528        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)])
529        self._run_test(ignore_test_failures=True)
530
531        msg = lambda f, m: self._test._get_failure_message(f, m)
532        self.assertEqual('', msg([], []))
533        self.assertEqual('', msg(['t1'], []))
534        self.assertEqual('1 missing: t1', msg([], ['t1']))
535        self.assertEqual('1 missing: t2', msg(['t1'], ['t2']))
536
537    def testNonAsciiFailureMessage(self):
538        """Tests that non-ascii failure message should be handled correctly"""
539        tests = [TestInfo('pkg.Test', 0, 2, errors=[('失敗', 1)])]
540        self._init_tast_commands(tests)
541        self._run_test(ignore_test_failures=True)
542        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
543                         status_string(self._job.status_entries))
544
545    def testRunPrivateTests(self):
546        """Tests running private tests."""
547        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
548                                 run_private_tests=True)
549        self._run_test(ignore_test_failures=True, run_private_tests=True)
550
551    def testDownloadDataLazily(self):
552        """Tests downloading external data files lazily."""
553        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
554                                 download_data_lazily=True)
555        self._run_test(ignore_test_failures=True, download_data_lazily=True)
556
557    def testServoFromCommandArgs(self):
558        """Tests passing servo info via command-line arg."""
559        SERVO_HOST = 'chromeos6-row2-rack21-labstation1.cros'
560        SERVO_PORT = '9995'
561
562        servo_var = 'servo=%s:%s' % (SERVO_HOST, SERVO_PORT)
563        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
564                                 run_vars=[servo_var])
565
566        # Simulate servo info being passed on the command line via --args.
567        args = [
568            '%s=%s' % (servo_constants.SERVO_HOST_ATTR, SERVO_HOST),
569            '%s=%s' % (servo_constants.SERVO_PORT_ATTR, SERVO_PORT),
570        ]
571        self._run_test(command_args=args)
572
573    def testServoFromHostInfoStore(self):
574        """Tests getting servo info from the host."""
575        SERVO_HOST = 'chromeos6-row2-rack21-labstation1.cros'
576        SERVO_PORT = '9995'
577
578        servo_var = 'servo=%s:%s' % (SERVO_HOST, SERVO_PORT)
579        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
580                                 run_vars=[servo_var])
581
582        # Simulate the host's servo info being stored in the Autotest DB.
583        attr = {
584            servo_constants.SERVO_HOST_ATTR: SERVO_HOST,
585            servo_constants.SERVO_PORT_ATTR: SERVO_PORT,
586        }
587        self._host.host_info_store.commit(host_info.HostInfo(attributes=attr))
588        self._run_test()
589
590    def testWificellArgs(self):
591        """Tests passing Wificell specific args into Tast runner."""
592        ROUTER_IP = '192.168.1.2:1234'
593        PCAP_IP = '192.168.1.3:2345'
594        wificell_var = [
595            'router=%s' % ROUTER_IP,
596            'pcap=%s' % PCAP_IP,
597        ]
598        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
599                                 run_vars=wificell_var)
600
601        WiFiManager = wifi_test_context_manager.WiFiTestContextManager
602        arg_list = [
603            (WiFiManager.CMDLINE_ROUTER_ADDR, ROUTER_IP),
604            (WiFiManager.CMDLINE_PCAP_ADDR, PCAP_IP),
605        ]
606        args = map(lambda x: ("%s=%s" % x), arg_list)
607        self._run_test(command_args=args)
608
609    def testVarsfileOption(self):
610        with tempfile.NamedTemporaryFile(
611                suffix='.yaml', dir=self._temp_dir) as temp_file:
612            yaml.dump({"var1": "val1", "var2": "val2"}, stream=temp_file)
613            varsfiles = [temp_file.name]
614            self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
615                                     run_varsfiles=varsfiles)
616            self._run_test(varsfiles=varsfiles)
617
618    def testVarslistOption(self):
619        varslist = ["var1=val1", "var2=val2"]
620        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
621                                 run_vars=varslist)
622        self._run_test(varslist=varslist)
623
624
625class TestInfo:
626    """Wraps information about a Tast test.
627
628    This struct is used to:
629    - get test definitions printed by fake_tast.py's 'list' command
630    - get test results written by fake_tast.py's 'run' command
631    - get expected base_job.status_log_entry objects that unit tests compare
632      against what tast.Tast actually recorded
633    """
634    def __init__(self, name, start_offset, end_offset, errors=None,
635                 skip_reason=None, attr=None, timeout_ns=0):
636        """
637        @param name: Name of the test, e.g. 'ui.ChromeLogin'.
638        @param start_offset: Start time as int seconds offset from BASE_TIME,
639            or None to indicate that tast didn't report a result for this test.
640        @param end_offset: End time as int seconds offset from BASE_TIME, or
641            None to indicate that tast reported that this test started but not
642            that it finished.
643        @param errors: List of (string, int) tuples containing reasons and
644            seconds offsets of errors encountered while running the test, or
645            None if no errors were encountered.
646        @param skip_reason: Human-readable reason that the test was skipped, or
647            None to indicate that it wasn't skipped.
648        @param attr: List of string test attributes assigned to the test, or
649            None if no attributes are assigned.
650        @param timeout_ns: Test timeout in nanoseconds.
651        """
652        def from_offset(offset):
653            """Returns an offset from BASE_TIME.
654
655            @param offset: Offset as integer seconds.
656            @return: datetime.datetime object.
657            """
658            if offset is None:
659                return None
660            return BASE_TIME + datetime.timedelta(seconds=offset)
661
662        self._name = name
663        self._start_time = from_offset(start_offset)
664        self._end_time = from_offset(end_offset)
665        self._errors = (
666                [(e[0], from_offset(e[1])) for e in errors] if errors else [])
667        self._skip_reason = skip_reason
668        self._attr = list(attr) if attr else []
669        self._timeout_ns = timeout_ns
670
671    def name(self):
672        # pylint: disable=missing-docstring
673        return self._name
674
675    def start_time(self):
676        # pylint: disable=missing-docstring
677        return self._start_time
678
679    def test(self):
680        """Returns a test dict printed by the 'list' command.
681
682        @return: dict representing a Tast testing.Test struct.
683        """
684        return {
685            'name': self._name,
686            'attr': self._attr,
687            'timeout': self._timeout_ns,
688        }
689
690    def test_result(self):
691        """Returns a dict representing a result written by the 'run' command.
692
693        @return: dict representing a Tast TestResult struct.
694        """
695        return {
696            'name': self._name,
697            'start': to_rfc3339(self._start_time),
698            'end': to_rfc3339(self._end_time),
699            'errors': [{'reason': e[0], 'time': to_rfc3339(e[1])}
700                       for e in self._errors],
701            'skipReason': self._skip_reason,
702            'attr': self._attr,
703            'timeout': self._timeout_ns,
704        }
705
706    def status_entries(self, run_error_msg=None):
707        """Returns expected base_job.status_log_entry objects for this test.
708
709        @param run_error_msg: String containing run error message, or None if no
710            run error was encountered.
711        @return: List of Autotest base_job.status_log_entry objects.
712        """
713        # Deliberately-skipped tests shouldn't have status entries unless errors
714        # were also reported.
715        if self._skip_reason and not self._errors:
716            return []
717
718        # Tests that weren't even started (e.g. because of an earlier issue)
719        # shouldn't have status entries.
720        if not self._start_time:
721            return []
722
723        def make(status_code, dt, msg=''):
724            """Makes a base_job.status_log_entry.
725
726            @param status_code: String status code.
727            @param dt: datetime.datetime object containing entry time.
728            @param msg: String message (typically only set for errors).
729            @return: base_job.status_log_entry object.
730            """
731            timestamp = int((dt - tast._UNIX_EPOCH).total_seconds())
732            return base_job.status_log_entry(
733                    status_code, None,
734                    tast.tast._TEST_NAME_PREFIX + self._name, msg, None,
735                    timestamp=timestamp)
736
737        entries = [make(tast.tast._JOB_STATUS_START, self._start_time)]
738
739        if self._end_time and not self._errors:
740            entries.append(make(tast.tast._JOB_STATUS_GOOD, self._end_time))
741            entries.append(make(tast.tast._JOB_STATUS_END_GOOD, self._end_time))
742        else:
743            for e in self._errors:
744                entries.append(make(tast.tast._JOB_STATUS_FAIL, e[1], e[0]))
745            if not self._end_time:
746                # If the test didn't finish, the run error (if any) should be
747                # included.
748                if run_error_msg:
749                    entries.append(make(tast.tast._JOB_STATUS_FAIL,
750                                        self._start_time, run_error_msg))
751                entries.append(make(tast.tast._JOB_STATUS_FAIL,
752                                    self._start_time,
753                                    tast.tast._TEST_DID_NOT_FINISH_MSG))
754            entries.append(make(tast.tast._JOB_STATUS_END_FAIL,
755                                self._end_time or self._start_time or NOW))
756
757        return entries
758
759
760class FakeServerJob:
761    """Fake implementation of server_job from server/server_job.py."""
762    def __init__(self, result_dir, tmp_dir):
763        self.pkgmgr = None
764        self.autodir = None
765        self.resultdir = result_dir
766        self.tmpdir = tmp_dir
767        self.post_run_hook = None
768        self.status_entries = []
769
770    def add_post_run_hook(self, hook):
771        """Stub implementation of server_job.add_post_run_hook."""
772        self.post_run_hook = hook
773
774    def record_entry(self, entry, log_in_subdir=True):
775        """Stub implementation of server_job.record_entry."""
776        assert(not log_in_subdir)
777        self.status_entries.append(entry)
778
779
780class FakeHost:
781    """Fake implementation of AbstractSSHHost from server/hosts/abstract_ssh.py.
782    """
783    def __init__(self, hostname, port):
784        self.hostname = hostname
785        self.port = port
786        self.host_info_store = host_info.InMemoryHostInfoStore(None)
787
788
789class TastCommand(object):
790    """Args and behavior for fake_tast.py for a given command, e.g. "list"."""
791
792    def __init__(self, required_args, status=0, stdout=None, stderr=None,
793                 files_to_write=None):
794        """
795        @param required_args: List of required args, each specified as
796            'name=value'. Names correspond to argparse-provided names in
797            fake_tast.py (typically just the flag name, e.g. 'build' or
798            'resultsdir'). Values correspond to str() representations of the
799            argparse-provided values.
800        @param status: Status code for fake_tast.py to return.
801        @param stdout: Data to write to stdout.
802        @param stderr: Data to write to stderr.
803        @param files_to_write: Dict mapping from paths of files to write to
804            their contents, or None to not write any files.
805        """
806        self.required_args = required_args
807        self.status = status
808        self.stdout = stdout
809        self.stderr = stderr
810        self.files_to_write = files_to_write if files_to_write else {}
811
812
813def to_rfc3339(t):
814    """Returns an RFC3339 timestamp.
815
816    @param t: UTC datetime.datetime object or None for the zero time.
817    @return: String RFC3339 time, e.g. '2018-01-02T02:34:28Z'.
818    """
819    if t is None:
820        return '0001-01-01T00:00:00Z'
821    assert(not t.utcoffset())
822    return t.strftime('%Y-%m-%dT%H:%M:%SZ')
823
824
825def get_status_entries_from_tests(tests, run_error_msg=None):
826    """Returns a flattened list of status entries from TestInfo objects.
827
828    @param tests: List of TestInfo objects.
829    @param run_error_msg: String containing run error message, or None if no
830        run error was encountered.
831    @return: Flattened list of base_job.status_log_entry objects produced by
832        calling status_entries() on each TestInfo object.
833    """
834    return sum([t.status_entries(run_error_msg) for t in tests], [])
835
836
837def status_string(entries):
838    """Returns a string describing a list of base_job.status_log_entry objects.
839
840    @param entries: List of base_job.status_log_entry objects.
841    @return: String containing space-separated representations of entries.
842    """
843    strings = []
844    for entry in entries:
845        timestamp = entry.fields[base_job.status_log_entry.TIMESTAMP_FIELD]
846        s = '[%s %s %s %s]' % (timestamp, entry.operation, entry.status_code,
847                               repr(str(entry.message)))
848        strings.append(s)
849
850    return ' '.join(strings)
851
852
853if __name__ == '__main__':
854    unittest.main()
855