#!/usr/bin/python2
# -*- coding: utf-8 -*-
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import datetime
import json
import os
import shutil
import tempfile
import unittest
import yaml

import dateutil.parser

import common
import tast

from autotest_lib.client.common_lib import base_job
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils
from autotest_lib.server.cros.network import wifi_test_context_manager
from autotest_lib.server.hosts import host_info
from autotest_lib.server.hosts import servo_constants


# Arbitrary base time to use in tests.
BASE_TIME = dateutil.parser.parse('2018-01-01T00:00:00Z')

# Arbitrary fixed time to use in place of time.time() when running tests.
NOW = BASE_TIME + datetime.timedelta(0, 60)


class TastTest(unittest.TestCase):
    """Tests the tast.tast Autotest server test.

    This unit test verifies interactions between the tast.py Autotest server
    test and the 'tast' executable that's actually responsible for running
    individual Tast tests and reporting their results. To do that, it sets up a
    fake environment in which it can run the Autotest test, including a fake
    implementation of the 'tast' executable provided by testdata/fake_tast.py.
    """

    # Arbitrary data to pass to the tast command.
    HOST = 'dut.example.net'
    PORT = 22
    TEST_PATTERNS = ['(bvt)']
    MAX_RUN_SEC = 300

    # Default paths where Tast files are installed by Portage packages.
    _PORTAGE_TAST_PATH = tast.tast._PORTAGE_TAST_PATH
    _PORTAGE_REMOTE_BUNDLE_DIR = '/usr/libexec/tast/bundles/remote'
    _PORTAGE_REMOTE_DATA_DIR = '/usr/share/tast/data'
    _PORTAGE_REMOTE_TEST_RUNNER_PATH = '/usr/bin/remote_test_runner'

    def setUp(self):
        """Creates a scratch directory tree and the test object under test."""
        self._temp_dir = tempfile.mkdtemp('.tast_unittest')

        def make_subdir(subdir):
            # pylint: disable=missing-docstring
            path = os.path.join(self._temp_dir, subdir)
            os.mkdir(path)
            return path

        self._job = FakeServerJob(make_subdir('job'), make_subdir('tmp'))
        self._bin_dir = make_subdir('bin')
        self._out_dir = make_subdir('out')
        self._root_dir = make_subdir('root')
        self._set_up_root()

        self._test = tast.tast(self._job, self._bin_dir, self._out_dir)
        self._host = FakeHost(self.HOST, self.PORT)

        self._test_patterns = []
        self._tast_commands = {}

    def tearDown(self):
        """Removes the scratch directory tree created by setUp()."""
        shutil.rmtree(self._temp_dir)

    def _get_path_in_root(self, orig_path):
        """Appends a path to self._root_dir (which stores Tast-related files).

        @param orig_path: Path to append, e.g. '/usr/bin/tast'.
        @return: Path within the root dir, e.g. '/path/to/root/usr/bin/tast'.
        """
        return os.path.join(self._root_dir, os.path.relpath(orig_path, '/'))

    def _set_up_root(self, ssp=False):
        """Creates Tast-related files and dirs within self._root_dir.

        @param ssp: If True, install files to locations used with Server-Side
            Packaging. Otherwise, install to locations used by Portage packages.
        """
        def create_file(orig_dest, src=None):
            """Creates a file under self._root_dir.

            @param orig_dest: Original absolute path, e.g. "/usr/bin/tast".
            @param src: Absolute path to file to copy, or none to create empty.
            @return: Absolute path to created file.
            """
            dest = self._get_path_in_root(orig_dest)
            dest_dir = os.path.dirname(dest)
            if not os.path.exists(dest_dir):
                os.makedirs(dest_dir)
            if src:
                # Copy the mode as well as the contents so that an executable
                # source file stays executable.
                shutil.copyfile(src, dest)
                shutil.copymode(src, dest)
            else:
                open(dest, 'a').close()
            return dest

        if ssp:
            tast_path = tast.tast._SSP_TAST_PATH
            bundle_dir = tast.tast._SSP_REMOTE_BUNDLE_DIR
            data_dir = tast.tast._SSP_REMOTE_DATA_DIR
            runner_path = tast.tast._SSP_REMOTE_TEST_RUNNER_PATH
        else:
            tast_path = self._PORTAGE_TAST_PATH
            bundle_dir = self._PORTAGE_REMOTE_BUNDLE_DIR
            data_dir = self._PORTAGE_REMOTE_DATA_DIR
            runner_path = self._PORTAGE_REMOTE_TEST_RUNNER_PATH

        # Copy fake_tast.py to the usual location for the 'tast' executable.
        # The remote bundle dir and remote_test_runner just need to exist so
        # tast.py can find them; their contents don't matter since fake_tast.py
        # won't actually use them.
        fake_tast_path = os.path.join(
                os.path.dirname(os.path.realpath(__file__)),
                'testdata', 'fake_tast.py')
        self._tast_path = create_file(tast_path, fake_tast_path)
        self._remote_bundle_dir = os.path.dirname(
                create_file(os.path.join(bundle_dir, 'fake')))
        self._remote_data_dir = os.path.dirname(
                create_file(os.path.join(data_dir, 'fake')))
        self._remote_test_runner_path = create_file(runner_path)

    def _init_tast_commands(self, tests, ssp=False, build=False,
                            build_bundle='fakebundle', run_private_tests=False,
                            run_vars=None, run_varsfiles=None,
                            download_data_lazily=False):
        """Sets fake_tast.py's behavior for 'list' and 'run' commands.

        @param tests: List of TestInfo objects.
        @param ssp: If True, expect the remote bundle dir, data dir and runner
            paths created by _set_up_root() to be passed; otherwise expect them
            to be None. Only consulted when |build| is False.
        @param build: Whether the commands are expected to build tests. If
            True, build-related args are expected instead of remote paths.
        @param build_bundle: Bundle name expected in the 'buildbundle' arg when
            |build| is True.
        @param run_private_tests: Whether to run private tests.
        @param run_vars: List of string values that should be passed to 'run'
            via -var, or None for an empty list.
        @param run_varsfiles: filenames should be passed to 'run' via -varsfile,
            or None for no files.
        @param download_data_lazily: Whether to download external data files
            lazily.
        """
        # Defaults are None rather than [] so that no list object is shared
        # between calls.
        if run_vars is None:
            run_vars = []

        list_args = [
                'build=%s' % build,
                'patterns=%s' % self.TEST_PATTERNS,
                'sshretries=%d' % tast.tast._SSH_CONNECT_RETRIES,
                'downloaddata=%s' %
                ('lazy' if download_data_lazily else 'batch'),
                'target=%s:%d' % (self.HOST, self.PORT),
                'verbose=True',
        ]
        if build:
            list_args.extend([
                    'buildbundle=%s' % build_bundle,
                    'checkbuilddeps=False',
            ])
        else:
            if ssp:
                list_args.extend([
                        'remotebundledir=%s' % self._remote_bundle_dir,
                        'remotedatadir=%s' % self._remote_data_dir,
                        'remoterunner=%s' % self._remote_test_runner_path,
                ])
            else:
                list_args.extend([
                        'remotebundledir=None',
                        'remotedatadir=None',
                        'remoterunner=None',
                ])
            # NOTE(review): this file's indentation was reconstructed; confirm
            # that this arg belongs to the non-build branch only.
            list_args.append('downloadprivatebundles=%s' % run_private_tests)

        # 'run' takes everything 'list' does plus run-specific args.
        run_args = list_args + [
                'resultsdir=%s' % self._test.resultsdir,
                'continueafterfailure=True',
                'var=%s' % (run_vars,),
        ]
        if run_varsfiles:
            run_args.append('varsfile=%s' % (run_varsfiles,))

        test_list = json.dumps([t.test() for t in tests])
        # Only tests with a start time produce a streamed result; the others
        # are treated as never having been reported by tast.
        run_files = {
                self._results_path(): ''.join(
                        json.dumps(t.test_result()) + '\n'
                        for t in tests if t.start_time()),
        }
        self._tast_commands = {
                'list': TastCommand(list_args, stdout=test_list),
                'run': TastCommand(run_args, files_to_write=run_files),
        }

    def _results_path(self):
        """Returns the path where "tast run" writes streamed results.

        @return Path to streamed results file.
        """
        return os.path.join(self._test.resultsdir,
                            tast.tast._STREAMED_RESULTS_FILENAME)

    def _run_test(self,
                  ignore_test_failures=False,
                  command_args=None,
                  ssp=False,
                  build=False,
                  build_bundle='fakebundle',
                  run_private_tests=False,
                  varsfiles=None,
                  download_data_lazily=False,
                  varslist=None):
        """Writes fake_tast.py's configuration and runs the test.

        @param ignore_test_failures: Passed as the identically-named arg to
            Tast.initialize().
        @param command_args: Passed as the identically-named arg to
            Tast.initialize(). None is treated as an empty list.
        @param ssp: Passed as the identically-named arg to Tast.initialize().
        @param build: Passed as the identically-named arg to Tast.initialize().
        @param build_bundle: Passed as the identically-named arg to
            Tast.initialize().
        @param run_private_tests: Passed as the identically-named arg to
            Tast.initialize().
        @param varsfiles: list of names of yaml files containing variables set
            in |-varsfile| arguments. None is treated as an empty list.
        @param download_data_lazily: Whether to download external data files
            lazily.
        @param varslist: list of strings to pass to tast run command as |-vars|
            arguments. Each string should be formatted as "name=value". None is
            treated as an empty list.
        """
        # Defaults are None rather than [] so that no list object is shared
        # between calls; initialize() still receives real lists.
        command_args = [] if command_args is None else command_args
        varsfiles = [] if varsfiles is None else varsfiles
        varslist = [] if varslist is None else varslist

        self._test.initialize(self._host,
                              self.TEST_PATTERNS,
                              ignore_test_failures=ignore_test_failures,
                              max_run_sec=self.MAX_RUN_SEC,
                              command_args=command_args,
                              install_root=self._root_dir,
                              ssp=ssp,
                              build=build,
                              build_bundle=build_bundle,
                              run_private_tests=run_private_tests,
                              varsfiles=varsfiles,
                              download_data_lazily=download_data_lazily,
                              varslist=varslist)
        self._test.set_fake_now_for_testing(
                (NOW - tast._UNIX_EPOCH).total_seconds())

        # Serialize each TastCommand's attributes next to the fake executable.
        cfg = {name: vars(cmd) for name, cmd in self._tast_commands.items()}
        path = os.path.join(os.path.dirname(self._tast_path), 'config.json')
        # Truncate rather than append so that calling _run_test() more than
        # once can never leave two concatenated JSON documents in the file.
        with open(path, 'w') as f:
            json.dump(cfg, f)

        try:
            self._test.run_once()
        finally:
            # Run the hook even if run_once() raised.
            if self._job.post_run_hook:
                self._job.post_run_hook()

    def _run_test_for_failure(self, failed, missing):
        """Calls _run_test and checks the resulting failure message.

        @param failed: List of TestInfo objects for expected-to-fail tests.
        @param missing: List of TestInfo objects for expected-missing tests.
        """
        with self.assertRaises(error.TestFail) as cm:
            self._run_test()

        msg = self._test._get_failure_message([t.name() for t in failed],
                                              [t.name() for t in missing])
        self.assertEqual(msg, str(cm.exception))

    def _load_job_keyvals(self):
        """Loads job keyvals.

        @return Keyvals as a str-to-str dict, or None if keyval file is missing.
        """
        if not os.path.exists(os.path.join(self._job.resultdir, 'keyval')):
            return None
        return utils.read_keyval(self._job.resultdir)

    def testPassingTests(self):
        """Tests that passing tests are reported correctly."""
        tests = [TestInfo('pkg.Test1', 0, 2),
                 TestInfo('pkg.Test2', 3, 5),
                 TestInfo('pkg.Test3', 6, 8)]
        self._init_tast_commands(tests)
        self._run_test()
        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
                         status_string(self._job.status_entries))
        self.assertIs(self._load_job_keyvals(), None)

    def testFailingTests(self):
        """Tests that failing tests are reported correctly."""
        tests = [TestInfo('pkg.Test1', 0, 2, errors=[('failed', 1)]),
                 TestInfo('pkg.Test2', 3, 6),
                 TestInfo('pkg.Test2', 7, 8, errors=[('another', 7)])]
        self._init_tast_commands(tests)
        self._run_test_for_failure([tests[0], tests[2]], [])
        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
                         status_string(self._job.status_entries))
        self.assertIs(self._load_job_keyvals(), None)

    def testIgnoreTestFailures(self):
        """Tests that tast.tast can still pass with Tast test failures."""
        tests = [TestInfo('pkg.Test', 0, 2, errors=[('failed', 1)])]
        self._init_tast_commands(tests)
        self._run_test(ignore_test_failures=True)
        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
                         status_string(self._job.status_entries))

    def testSkippedTest(self):
        """Tests that skipped tests aren't reported."""
        tests = [TestInfo('pkg.Normal', 0, 1),
                 TestInfo('pkg.Skipped', 2, 2, skip_reason='missing deps')]
        self._init_tast_commands(tests)
        self._run_test()
        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
                         status_string(self._job.status_entries))
        self.assertIs(self._load_job_keyvals(), None)

    def testSkippedTestWithErrors(self):
        """Tests that skipped tests are reported if they also report errors."""
        tests = [TestInfo('pkg.Normal', 0, 1),
                 TestInfo('pkg.SkippedWithErrors', 2, 2, skip_reason='bad deps',
                          errors=[('bad deps', 2)])]
        self._init_tast_commands(tests)
        self._run_test_for_failure([tests[1]], [])
        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
                         status_string(self._job.status_entries))
        self.assertIs(self._load_job_keyvals(), None)

    def testMissingTests(self):
        """Tests that missing tests are reported when there's another test."""
        tests = [TestInfo('pkg.Test1', None, None),
                 TestInfo('pkg.Test2', 0, 2),
                 TestInfo('pkg.Test3', None, None)]
        self._init_tast_commands(tests)
        self._run_test_for_failure([], [tests[0], tests[2]])
        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
                         status_string(self._job.status_entries))
        self.assertEqual(self._load_job_keyvals(),
                         {'tast_missing_test.0': 'pkg.Test1',
                          'tast_missing_test.1': 'pkg.Test3'})

    def testNoTestsRun(self):
        """Tests that a missing test is reported when it's the only test."""
        tests = [TestInfo('pkg.Test', None, None)]
        self._init_tast_commands(tests)
        self._run_test_for_failure([], [tests[0]])
        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
                         status_string(self._job.status_entries))
        self.assertEqual(self._load_job_keyvals(),
                         {'tast_missing_test.0': 'pkg.Test'})

    def testHangingTest(self):
        """Tests that a not-finished test is reported."""
        tests = [TestInfo('pkg.Test1', 0, 2),
                 TestInfo('pkg.Test2', 3, None),
                 TestInfo('pkg.Test3', None, None)]
        self._init_tast_commands(tests)
        self._run_test_for_failure([tests[1]], [tests[2]])
        self.assertEqual(
                status_string(get_status_entries_from_tests(tests[:2])),
                status_string(self._job.status_entries))
        self.assertEqual(self._load_job_keyvals(),
                         {'tast_missing_test.0': 'pkg.Test3'})

    def testRunError(self):
        """Tests that a run error is reported for a non-finished test."""
        tests = [TestInfo('pkg.Test1', 0, 2),
                 TestInfo('pkg.Test2', 3, None),
                 TestInfo('pkg.Test3', None, None)]
        self._init_tast_commands(tests)

        # Simulate the run being aborted due to a lost SSH connection.
        path = os.path.join(self._test.resultsdir,
                            tast.tast._RUN_ERROR_FILENAME)
        msg = 'Lost SSH connection to DUT'
        self._tast_commands['run'].files_to_write[path] = msg
        self._tast_commands['run'].status = 1

        self._run_test_for_failure([tests[1]], [tests[2]])
        self.assertEqual(
                status_string(get_status_entries_from_tests(tests[:2], msg)),
                status_string(self._job.status_entries))
        self.assertEqual(self._load_job_keyvals(),
                         {'tast_missing_test.0': 'pkg.Test3'})

    def testNoTestsMatched(self):
        """Tests that no error is raised if no tests are matched."""
        self._init_tast_commands([])
        self._run_test()

    def testListCommandFails(self):
        """Tests that an error is raised if the list command fails."""
        self._init_tast_commands([])

        # The list subcommand writes log messages to stderr on failure.
        FAILURE_MSG = "failed to connect"
        self._tast_commands['list'].status = 1
        self._tast_commands['list'].stdout = None
        self._tast_commands['list'].stderr = 'blah blah\n%s\n' % FAILURE_MSG

        # The first line of the exception should include the last line of output
        # from tast.
        with self.assertRaises(error.TestFail) as cm:
            self._run_test()
        first_line = str(cm.exception).split('\n')[0]
        self.assertTrue(FAILURE_MSG in first_line,
                        '"%s" not in "%s"' % (FAILURE_MSG, first_line))

    def testListCommandPrintsGarbage(self):
        """Tests that an error is raised if the list command prints bad data."""
        self._init_tast_commands([])
        self._tast_commands['list'].stdout = 'not valid JSON data'
        with self.assertRaises(error.TestFail):
            self._run_test()

    def testRunCommandFails(self):
        """Tests that an error is raised if the run command fails."""
        tests = [TestInfo('pkg.Test1', 0, 1), TestInfo('pkg.Test2', 2, 3)]
        self._init_tast_commands(tests)
        FAILURE_MSG = "this is the failure"
        self._tast_commands['run'].status = 1
        self._tast_commands['run'].stdout = 'blah blah\n%s\n' % FAILURE_MSG

        with self.assertRaises(error.TestFail) as cm:
            self._run_test()
        self.assertTrue(FAILURE_MSG in str(cm.exception),
                        '"%s" not in "%s"' % (FAILURE_MSG, str(cm.exception)))
        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
                         status_string(self._job.status_entries))

    def testRunCommandWritesTrailingGarbage(self):
        """Tests that an error is raised if the run command prints bad data."""
        tests = [TestInfo('pkg.Test1', 0, 1), TestInfo('pkg.Test2', None, None)]
        self._init_tast_commands(tests)
        self._tast_commands['run'].files_to_write[self._results_path()] += \
                'not valid JSON data'
        with self.assertRaises(error.TestFail):
            self._run_test()
        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
                         status_string(self._job.status_entries))

    def testNoResultsFile(self):
        """Tests that an error is raised if no results file is written."""
        tests = [TestInfo('pkg.Test1', None, None)]
        self._init_tast_commands(tests)
        self._tast_commands['run'].files_to_write = {}
        with self.assertRaises(error.TestFail):
            self._run_test()
        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
                         status_string(self._job.status_entries))

    def testNoResultsFileAfterRunCommandFails(self):
        """Tests that stdout is included in error after missing results."""
        tests = [TestInfo('pkg.Test1', None, None)]
        self._init_tast_commands(tests)
        FAILURE_MSG = "this is the failure"
        self._tast_commands['run'].status = 1
        self._tast_commands['run'].files_to_write = {}
        self._tast_commands['run'].stdout = 'blah blah\n%s\n' % FAILURE_MSG

        # The first line of the exception should include the last line of output
        # from tast rather than a message about the missing results file.
        with self.assertRaises(error.TestFail) as cm:
            self._run_test()
        first_line = str(cm.exception).split('\n')[0]
        self.assertTrue(FAILURE_MSG in first_line,
                        '"%s" not in "%s"' % (FAILURE_MSG, first_line))
        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
                         status_string(self._job.status_entries))

    def testMissingTastExecutable(self):
        """Tests that an error is raised if the tast command isn't found."""
        os.remove(self._get_path_in_root(self._PORTAGE_TAST_PATH))
        with self.assertRaises(error.TestFail):
            self._run_test()

    def testMissingRemoteTestRunner(self):
        """Tests that an error is raised if remote_test_runner isn't found."""
        os.remove(self._get_path_in_root(self._PORTAGE_REMOTE_TEST_RUNNER_PATH))
        with self.assertRaises(error.TestFail):
            self._run_test()

    def testMissingRemoteBundleDir(self):
        """Tests that an error is raised if remote bundles aren't found."""
        shutil.rmtree(self._get_path_in_root(self._PORTAGE_REMOTE_BUNDLE_DIR))
        with self.assertRaises(error.TestFail):
            self._run_test()

    def testSspPaths(self):
        """Tests that files can be located at their alternate SSP locations."""
        # Wipe the Portage-style layout created by setUp() before installing
        # the SSP-style one.
        for p in os.listdir(self._root_dir):
            shutil.rmtree(os.path.join(self._root_dir, p))
        self._set_up_root(ssp=True)

        tests = [TestInfo('pkg.Test', 0, 1)]
        self._init_tast_commands(tests, ssp=True)
        self._run_test(ssp=True)
        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
                         status_string(self._job.status_entries))

    def testBuild(self):
        """Tests that Tast tests can be built."""
        tests = [TestInfo('pkg.Test', 0, 1)]
        self._init_tast_commands(tests, build=True)
        self._run_test(build=True)
        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
                         status_string(self._job.status_entries))

    def testFailureMessage(self):
        """Tests that appropriate failure messages are generated."""
        # Just do this to initialize the self._test.
        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)])
        self._run_test()

        msg = self._test._get_failure_message
        self.assertEqual('', msg([], []))
        self.assertEqual('1 failed: t1', msg(['t1'], []))
        self.assertEqual('2 failed: t1 t2', msg(['t1', 't2'], []))
        self.assertEqual('1 missing: t1', msg([], ['t1']))
        self.assertEqual('1 failed: t1; 1 missing: t2', msg(['t1'], ['t2']))

    def testFailureMessageIgnoreTestFailures(self):
        """Tests that test failures are ignored in messages when requested."""
        # Just do this to initialize the self._test.
        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)])
        self._run_test(ignore_test_failures=True)

        msg = self._test._get_failure_message
        self.assertEqual('', msg([], []))
        self.assertEqual('', msg(['t1'], []))
        self.assertEqual('1 missing: t1', msg([], ['t1']))
        self.assertEqual('1 missing: t2', msg(['t1'], ['t2']))

    def testNonAsciiFailureMessage(self):
        """Tests that non-ascii failure message should be handled correctly"""
        tests = [TestInfo('pkg.Test', 0, 2, errors=[('失敗', 1)])]
        self._init_tast_commands(tests)
        self._run_test(ignore_test_failures=True)
        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
                         status_string(self._job.status_entries))

    def testRunPrivateTests(self):
        """Tests running private tests."""
        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
                                 run_private_tests=True)
        self._run_test(ignore_test_failures=True, run_private_tests=True)

    def testDownloadDataLazily(self):
        """Tests downloading external data files lazily."""
        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
                                 download_data_lazily=True)
        self._run_test(ignore_test_failures=True, download_data_lazily=True)

    def testServoFromCommandArgs(self):
        """Tests passing servo info via command-line arg."""
        SERVO_HOST = 'chromeos6-row2-rack21-labstation1.cros'
        SERVO_PORT = '9995'

        servo_var = 'servo=%s:%s' % (SERVO_HOST, SERVO_PORT)
        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
                                 run_vars=[servo_var])

        # Simulate servo info being passed on the command line via --args.
        args = [
                '%s=%s' % (servo_constants.SERVO_HOST_ATTR, SERVO_HOST),
                '%s=%s' % (servo_constants.SERVO_PORT_ATTR, SERVO_PORT),
        ]
        self._run_test(command_args=args)

    def testServoFromHostInfoStore(self):
        """Tests getting servo info from the host."""
        SERVO_HOST = 'chromeos6-row2-rack21-labstation1.cros'
        SERVO_PORT = '9995'

        servo_var = 'servo=%s:%s' % (SERVO_HOST, SERVO_PORT)
        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
                                 run_vars=[servo_var])

        # Simulate the host's servo info being stored in the Autotest DB.
        attr = {
                servo_constants.SERVO_HOST_ATTR: SERVO_HOST,
                servo_constants.SERVO_PORT_ATTR: SERVO_PORT,
        }
        self._host.host_info_store.commit(host_info.HostInfo(attributes=attr))
        self._run_test()

    def testWificellArgs(self):
        """Tests passing Wificell specific args into Tast runner."""
        ROUTER_IP = '192.168.1.2:1234'
        PCAP_IP = '192.168.1.3:2345'
        wificell_var = [
                'router=%s' % ROUTER_IP,
                'pcap=%s' % PCAP_IP,
        ]
        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
                                 run_vars=wificell_var)

        WiFiManager = wifi_test_context_manager.WiFiTestContextManager
        arg_list = [
                (WiFiManager.CMDLINE_ROUTER_ADDR, ROUTER_IP),
                (WiFiManager.CMDLINE_PCAP_ADDR, PCAP_IP),
        ]
        args = ['%s=%s' % pair for pair in arg_list]
        self._run_test(command_args=args)

    def testVarsfileOption(self):
        """Tests passing the name of a yaml vars file through to 'run'."""
        with tempfile.NamedTemporaryFile(
                suffix='.yaml', dir=self._temp_dir) as temp_file:
            yaml.dump({"var1": "val1", "var2": "val2"}, stream=temp_file)
            # Push the data to disk so that anything opening the file by name
            # sees the full contents.
            temp_file.flush()
            varsfiles = [temp_file.name]
            self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
                                     run_varsfiles=varsfiles)
            self._run_test(varsfiles=varsfiles)

    def testVarslistOption(self):
        """Tests passing a list of "name=value" vars through to 'run'."""
        varslist = ["var1=val1", "var2=val2"]
        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
                                 run_vars=varslist)
        self._run_test(varslist=varslist)


class TestInfo(object):
    """Wraps information about a Tast test.

    This struct is used to:
    - get test definitions printed by fake_tast.py's 'list' command
    - get test results written by fake_tast.py's 'run' command
    - get expected base_job.status_log_entry objects that unit tests compare
      against what tast.Tast actually recorded
    """
    def __init__(self, name, start_offset, end_offset, errors=None,
                 skip_reason=None, attr=None, timeout_ns=0):
        """
        @param name: Name of the test, e.g. 'ui.ChromeLogin'.
        @param start_offset: Start time as int seconds offset from BASE_TIME,
            or None to indicate that tast didn't report a result for this test.
        @param end_offset: End time as int seconds offset from BASE_TIME, or
            None to indicate that tast reported that this test started but not
            that it finished.
        @param errors: List of (string, int) tuples containing reasons and
            seconds offsets of errors encountered while running the test, or
            None if no errors were encountered.
        @param skip_reason: Human-readable reason that the test was skipped, or
            None to indicate that it wasn't skipped.
        @param attr: List of string test attributes assigned to the test, or
            None if no attributes are assigned.
        @param timeout_ns: Test timeout in nanoseconds.
        """
        def from_offset(offset):
            """Returns an offset from BASE_TIME.

            @param offset: Offset as integer seconds.
            @return: datetime.datetime object.
            """
            if offset is None:
                return None
            return BASE_TIME + datetime.timedelta(seconds=offset)

        self._name = name
        self._start_time = from_offset(start_offset)
        self._end_time = from_offset(end_offset)
        self._errors = (
                [(e[0], from_offset(e[1])) for e in errors] if errors else [])
        self._skip_reason = skip_reason
        # Copy so that later changes to the caller's list don't leak in.
        self._attr = list(attr) if attr else []
        self._timeout_ns = timeout_ns

    def name(self):
        # pylint: disable=missing-docstring
        return self._name

    def start_time(self):
        # pylint: disable=missing-docstring
        return self._start_time

    def test(self):
        """Returns a test dict printed by the 'list' command.

        @return: dict representing a Tast testing.Test struct.
        """
        return {
                'name': self._name,
                'attr': self._attr,
                'timeout': self._timeout_ns,
        }

    def test_result(self):
        """Returns a dict representing a result written by the 'run' command.

        @return: dict representing a Tast TestResult struct.
        """
        return {
                'name': self._name,
                'start': to_rfc3339(self._start_time),
                'end': to_rfc3339(self._end_time),
                'errors': [{'reason': e[0], 'time': to_rfc3339(e[1])}
                           for e in self._errors],
                'skipReason': self._skip_reason,
                'attr': self._attr,
                'timeout': self._timeout_ns,
        }

    def status_entries(self, run_error_msg=None):
        """Returns expected base_job.status_log_entry objects for this test.

        @param run_error_msg: String containing run error message, or None if no
            run error was encountered.
        @return: List of Autotest base_job.status_log_entry objects.
        """
        # Deliberately-skipped tests shouldn't have status entries unless errors
        # were also reported.
        if self._skip_reason and not self._errors:
            return []

        # Tests that weren't even started (e.g. because of an earlier issue)
        # shouldn't have status entries.
        if not self._start_time:
            return []

        def make(status_code, dt, msg=''):
            """Makes a base_job.status_log_entry.

            @param status_code: String status code.
            @param dt: datetime.datetime object containing entry time.
            @param msg: String message (typically only set for errors).
            @return: base_job.status_log_entry object.
            """
            timestamp = int((dt - tast._UNIX_EPOCH).total_seconds())
            return base_job.status_log_entry(
                    status_code, None,
                    tast.tast._TEST_NAME_PREFIX + self._name, msg, None,
                    timestamp=timestamp)

        entries = [make(tast.tast._JOB_STATUS_START, self._start_time)]

        if self._end_time and not self._errors:
            entries.append(make(tast.tast._JOB_STATUS_GOOD, self._end_time))
            entries.append(make(tast.tast._JOB_STATUS_END_GOOD, self._end_time))
        else:
            for e in self._errors:
                entries.append(make(tast.tast._JOB_STATUS_FAIL, e[1], e[0]))
            if not self._end_time:
                # If the test didn't finish, the run error (if any) should be
                # included.
                if run_error_msg:
                    entries.append(make(tast.tast._JOB_STATUS_FAIL,
                                        self._start_time, run_error_msg))
                entries.append(make(tast.tast._JOB_STATUS_FAIL,
                                    self._start_time,
                                    tast.tast._TEST_DID_NOT_FINISH_MSG))
            entries.append(make(tast.tast._JOB_STATUS_END_FAIL,
                                self._end_time or self._start_time or NOW))

        return entries


class FakeServerJob(object):
    """Fake implementation of server_job from server/server_job.py."""
    def __init__(self, result_dir, tmp_dir):
        """
        @param result_dir: Directory exposed as the job's resultdir.
        @param tmp_dir: Directory exposed as the job's tmpdir.
        """
        self.pkgmgr = None
        self.autodir = None
        self.resultdir = result_dir
        self.tmpdir = tmp_dir
        self.post_run_hook = None
        self.status_entries = []

    def add_post_run_hook(self, hook):
        """Stub implementation of server_job.add_post_run_hook."""
        self.post_run_hook = hook

    def record_entry(self, entry, log_in_subdir=True):
        """Stub implementation of server_job.record_entry."""
        # This fake only accepts entries recorded with log_in_subdir=False.
        assert not log_in_subdir
        self.status_entries.append(entry)


class FakeHost(object):
    """Fake implementation of AbstractSSHHost from server/hosts/abstract_ssh.py.
    """
    def __init__(self, hostname, port):
        """
        @param hostname: Hostname to expose as the host's hostname attribute.
        @param port: Port to expose as the host's port attribute.
        """
        self.hostname = hostname
        self.port = port
        self.host_info_store = host_info.InMemoryHostInfoStore(None)


class TastCommand(object):
    """Args and behavior for fake_tast.py for a given command, e.g. "list"."""

    def __init__(self, required_args, status=0, stdout=None, stderr=None,
                 files_to_write=None):
        """
        @param required_args: List of required args, each specified as
            'name=value'. Names correspond to argparse-provided names in
            fake_tast.py (typically just the flag name, e.g. 'build' or
            'resultsdir'). Values correspond to str() representations of the
            argparse-provided values.
        @param status: Status code for fake_tast.py to return.
        @param stdout: Data to write to stdout.
        @param stderr: Data to write to stderr.
        @param files_to_write: Dict mapping from paths of files to write to
            their contents, or None to not write any files.
        """
        self.required_args = required_args
        self.status = status
        self.stdout = stdout
        self.stderr = stderr
        self.files_to_write = files_to_write if files_to_write else {}


def to_rfc3339(t):
    """Returns an RFC3339 timestamp.

    @param t: UTC datetime.datetime object or None for the zero time.
    @return: String RFC3339 time, e.g. '2018-01-02T02:34:28Z'.
    """
    if t is None:
        return '0001-01-01T00:00:00Z'
    # A falsy utcoffset() means either a naive datetime or a zero offset.
    assert not t.utcoffset()
    return t.strftime('%Y-%m-%dT%H:%M:%SZ')


def get_status_entries_from_tests(tests, run_error_msg=None):
    """Returns a flattened list of status entries from TestInfo objects.

    @param tests: List of TestInfo objects.
    @param run_error_msg: String containing run error message, or None if no
        run error was encountered.
    @return: Flattened list of base_job.status_log_entry objects produced by
        calling status_entries() on each TestInfo object.
    """
    return [entry
            for t in tests
            for entry in t.status_entries(run_error_msg)]


def status_string(entries):
    """Returns a string describing a list of base_job.status_log_entry objects.

    @param entries: List of base_job.status_log_entry objects.
    @return: String containing space-separated representations of entries.
    """
    strings = []
    for entry in entries:
        timestamp = entry.fields[base_job.status_log_entry.TIMESTAMP_FIELD]
        s = '[%s %s %s %s]' % (timestamp, entry.operation, entry.status_code,
                               repr(str(entry.message)))
        strings.append(s)

    return ' '.join(strings)


if __name__ == '__main__':
    unittest.main()