# Lint as: python2, python3
"""The main job wrapper

This is the core infrastructure.

Copyright Andy Whitcroft, Martin J. Bligh 2006
"""

# pylint: disable=missing-docstring

import copy
from datetime import datetime
import getpass
import glob
import logging
import os
import re
import shutil
import sys
import time
import traceback
import types
import weakref

import six

import common
from autotest_lib.client.bin import client_logging_config
from autotest_lib.client.bin import harness
from autotest_lib.client.bin import local_host
from autotest_lib.client.bin import parallel
from autotest_lib.client.bin import partition as partition_lib
from autotest_lib.client.bin import profilers
from autotest_lib.client.bin import sysinfo
from autotest_lib.client.bin import test
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import barrier
from autotest_lib.client.common_lib import base_job
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import logging_manager
from autotest_lib.client.common_lib import packages
from autotest_lib.client.cros import cros_logging
from autotest_lib.client.tools import html_report

GLOBAL_CONFIG = global_config.global_config

# Unique sentinel object; compared by identity, never by value.
LAST_BOOT_TAG = object()

# Code prepended to every control file before it is executed (see
# step_engine), so control files can use the error classes and utility
# functions without importing them explicitly.
JOB_PREAMBLE = """
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.bin.utils import *
"""


class StepError(error.AutotestError):
    """Raised when a step passed to the step engine is malformed."""
    pass

class NotAvailableError(error.AutotestError):
    """Raised when something the job requires (e.g. gcc) is not available."""
    pass



def _run_test_complete_on_exit(f):
    """Decorator for job methods that automatically calls
    self.harness.run_test_complete when the method exits, if appropriate."""
    def wrapped(self, *args, **dargs):
        try:
            return f(self, *args, **dargs)
        finally:
            # Only notify the harness (and drop caches) when logging to the
            # main 'status' file, i.e. not inside a parallel() subtask whose
            # global_filename has a '.N' suffix.
            if self._logger.global_filename == 'status':
                self.harness.run_test_complete()
                if self.drop_caches:
                    utils.drop_caches()
    wrapped.__name__ = f.__name__
    wrapped.__doc__ = f.__doc__
    wrapped.__dict__.update(f.__dict__)
    return wrapped


class status_indenter(base_job.status_indenter):
    """Provide a status indenter that is backed by job._record_prefix."""
    def __init__(self, job_):
        self._job = weakref.proxy(job_)  # avoid a circular reference


    @property
    def indent(self):
        # Backed by the persistent _record_indent property on the job, so
        # indentation survives reboots/continuations.
        return self._job._record_indent


    def increment(self):
        """Increase the status-log indentation by one level."""
        self._job._record_indent += 1


    def decrement(self):
        """Decrease the status-log indentation by one level."""
        self._job._record_indent -= 1


class base_client_job(base_job.base_job):
    """The client-side concrete implementation of base_job.

    Optional properties provided by this implementation:
        control
        harness
    """

    # Seconds slept when disabling/enabling warnings, so the transition
    # cannot race with warnings emitted near the boundary.
    _WARNING_DISABLE_DELAY = 5

    # _record_indent is a persistent property, but only on the client
    _job_state = base_job.base_job._job_state
    _record_indent = _job_state.property_factory(
        '_state', '_record_indent', 0, namespace='client')
    _max_disk_usage_rate = _job_state.property_factory(
        '_state', '_max_disk_usage_rate', 0.0, namespace='client')


    def __init__(self, control, options, drop_caches=True):
        """
        Prepare a client side job object.

        @param control: The control file (pathname of).
        @param options: an object which includes:
                jobtag: The job tag string (eg "default").
                cont: If this is the continuation of this job.
                harness_type: An alternative server harness.  [None]
                use_external_logging: If true, the enable_external_logging
                    method will be called during construction.  [False]
        @param drop_caches: If true, utils.drop_caches() is called before and
                between all tests.  [True]
        """
        super(base_client_job, self).__init__(options=options)
        self._pre_record_init(control, options)
        try:
            self._post_record_init(control, options, drop_caches)
        except Exception as err:
            # _pre_record_init has set up self.record, so the failure can be
            # logged before being re-raised to the caller.
            self.record(
                    'ABORT', None, None, 'client.bin.job.__init__ failed: %s' %
                    str(err))
            raise


    @classmethod
    def _get_environ_autodir(cls):
        """Return the autotest directory from the AUTODIR env variable."""
        return os.environ['AUTODIR']


    @classmethod
    def _find_base_directories(cls):
        """
        Determine locations of autodir and clientdir (which are the same)
        using os.environ. Serverdir does not exist in this context.
        """
        autodir = clientdir = cls._get_environ_autodir()
        return autodir, clientdir, None


    @classmethod
    def _parse_args(cls, args):
        """Split an argument string into tokens, keeping quoted runs intact."""
        # NOTE(review): non-raw string with \s escapes — works today but
        # should be a raw string (r"...") to silence DeprecationWarning.
        return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args)


    def _find_resultdir(self, options):
        """
        Determine the directory for storing results. On a client this is
        always <autodir>/results/<tag>, where tag is passed in on the command
        line as an option.
        """
        output_dir_config = GLOBAL_CONFIG.get_config_value('CLIENT',
                                                           'output_dir',
                                                            default="")
        if options.output_dir:
            basedir = options.output_dir
        elif output_dir_config:
            basedir = output_dir_config
        else:
            basedir = self.autodir

        return os.path.join(basedir, 'results', options.tag)


    def _get_status_logger(self):
        """Return a reference to the status logger."""
        return self._logger


    def _pre_record_init(self, control, options):
        """
        Initialization function that should peform ONLY the required
        setup so that the self.record() method works.

        As of now self.record() needs self.resultdir, self._group_level,
        self.harness and of course self._logger.
        """
        if not options.cont:
            self._cleanup_debugdir_files()
            self._cleanup_results_dir()

        logging_manager.configure_logging(
            client_logging_config.ClientLoggingConfig(),
            results_dir=self.resultdir,
            verbose=options.verbose)
        logging.info('Writing results to %s', self.resultdir)

        # init_group_level needs the state
        self.control = os.path.realpath(control)
        self._is_continuation = options.cont
        self._current_step_ancestry = []
        self._next_step_index = 0
        self._load_state()

        _harness = self.handle_persistent_option(options, 'harness')
        _harness_args = self.handle_persistent_option(options, 'harness_args')

        self.harness = harness.select(_harness, self, _harness_args)

        if self.control:
            parsed_control = control_data.parse_control(
                    self.control, raise_warnings=False)
            # NOTE(review): self.fast is only assigned when self.control is
            # truthy; callers reading job.fast otherwise would hit an
            # AttributeError — confirm control is always set on this path.
            self.fast = parsed_control.fast

        # set up the status logger
        def client_job_record_hook(entry):
            msg_tag = ''
            # A '.N' suffix on the log filename (added by parallel()) becomes
            # the tag forwarded to the harness.
            if '.' in self._logger.global_filename:
                msg_tag = self._logger.global_filename.split('.', 1)[1]
            # send the entry to the job harness
            message = '\n'.join([entry.message] + entry.extra_message_lines)
            rendered_entry = self._logger.render_entry(entry)
            self.harness.test_status_detail(entry.status_code, entry.subdir,
                                            entry.operation, message, msg_tag,
                                            entry.fields)
            self.harness.test_status(rendered_entry, msg_tag)
            # send the entry to stdout, if it's enabled
            logging.info(rendered_entry)
        self._logger = base_job.status_logger(
            self, status_indenter(self), record_hook=client_job_record_hook)


    def _post_record_init(self, control, options, drop_caches):
        """
        Perform job initialization not required by self.record().
        """
        self._init_drop_caches(drop_caches)

        self._init_packages()

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self._load_sysinfo_state()

        if not options.cont:
            download = os.path.join(self.testdir, 'download')
            if not os.path.exists(download):
                os.mkdir(download)

            # Keep a copy of the control file with the results for posterity.
            shutil.copyfile(self.control,
                            os.path.join(self.resultdir, 'control'))

        self.control = control

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        self.logging.start_logging()

        self.profilers = profilers.profilers(self)

        self.machines = [options.hostname]
        self.machine_dict_list = [{'hostname' : options.hostname}]
        # Client side tests should always run the same whether or not they are
        # running in the lab.
        self.in_lab = False
        self.hosts = set([local_host.LocalHost(hostname=options.hostname)])

        self.args = []
        if options.args:
            self.args = self._parse_args(options.args)

        if options.user:
            self.user = options.user
        else:
            self.user = getpass.getuser()

        self.sysinfo.log_per_reboot_data()

        if not options.cont:
            self.record('START', None, None)

        self.harness.run_start()

        if options.log:
            self.enable_external_logging()

        self.num_tests_run = None
        self.num_tests_failed = None

        self.warning_loggers = None
        self.warning_manager = None


    def _init_drop_caches(self, drop_caches):
        """
        Perform the drop caches initialization.
        """
        self.drop_caches_between_iterations = (
                       GLOBAL_CONFIG.get_config_value('CLIENT',
                                            'drop_caches_between_iterations',
                                            type=bool, default=True))
        self.drop_caches = drop_caches
        if self.drop_caches:
            utils.drop_caches()


    def _init_packages(self):
        """
        Perform the packages support initialization.
        """
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':3600})


    def _cleanup_results_dir(self):
        """Delete everything in resultsdir"""
        assert os.path.exists(self.resultdir)
        list_files = glob.glob('%s/*' % self.resultdir)
        for f in list_files:
            if os.path.isdir(f):
                shutil.rmtree(f)
            elif os.path.isfile(f):
                os.remove(f)


    def _cleanup_debugdir_files(self):
        """
        Delete any leftover debugdir files
        """
        list_files = glob.glob("/tmp/autotest_results_dir.*")
        for f in list_files:
            os.remove(f)


    def disable_warnings(self, warning_type):
        """Record that warnings of warning_type are disabled, then pause."""
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})
        time.sleep(self._WARNING_DISABLE_DELAY)


    def enable_warnings(self, warning_type):
        """Pause, then record that warnings of warning_type are re-enabled."""
        time.sleep(self._WARNING_DISABLE_DELAY)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})


    def monitor_disk_usage(self, max_rate):
        """\
        Signal that the job should monitor disk space usage on /
        and generate a warning if a test uses up disk space at a
        rate exceeding 'max_rate'.

        Parameters:
             max_rate - the maximium allowed rate of disk consumption
                        during a test, in MB/hour, or 0 to indicate
                        no limit.
        """
        self._max_disk_usage_rate = max_rate


    def control_get(self):
        """Return the current control file path."""
        return self.control


    def control_set(self, control):
        """Set the control file path (normalized to an absolute path)."""
        self.control = os.path.abspath(control)


    def harness_select(self, which, harness_args):
        """Replace the job harness with the named implementation."""
        self.harness = harness.select(which, self, harness_args)


    def setup_dirs(self, results_dir, tmp_dir):
        """Create (if needed) and return the results and tmp directories.

        Either argument may be falsy, in which case a default under the
        job's own directories is chosen.
        """
        if not tmp_dir:
            tmp_dir = os.path.join(self.tmpdir, 'build')
        if not os.path.exists(tmp_dir):
            os.mkdir(tmp_dir)
        if not os.path.isdir(tmp_dir):
            # NOTE(review): message interpolates self.tmpdir, not the
            # offending tmp_dir argument — possibly misleading in the error.
            e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir
            raise ValueError(e_msg)

        # We label the first build "build" and then subsequent ones
        # as "build.2", "build.3", etc. Whilst this is a little bit
        # inconsistent, 99.9% of jobs will only have one build
        # (that's not done as kernbench, sparse, or buildtest),
        # so it works out much cleaner. One of life's compromises.
        if not results_dir:
            results_dir = os.path.join(self.resultdir, 'build')
            i = 2
            while os.path.exists(results_dir):
                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
                i += 1
        if not os.path.exists(results_dir):
            os.mkdir(results_dir)

        return (results_dir, tmp_dir)


    def barrier(self, *args, **kwds):
        """Create a barrier object"""
        return barrier.barrier(*args, **kwds)


    def install_pkg(self, name, pkg_type, install_dir):
        '''
        This method is a simple wrapper around the actual package
        installation method in the Packager class. This is used
        internally by the profilers, deps and tests code.
        name : name of the package (ex: sleeptest, dbench etc.)
        pkg_type : Type of the package (ex: test, dep etc.)
        install_dir : The directory in which the source is actually
                      untarred into. (ex: client/profilers/<name> for profilers)
        '''
        # No-op when no repositories are configured; callers then rely on a
        # locally present copy (see setup_dep).
        if self.pkgmgr.repositories:
            self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)


    def add_repository(self, repo_urls):
        '''
        Adds the repository locations to the job so that packages
        can be fetched from them when needed. The repository list
        needs to be a string list
        Ex: job.add_repository(['http://blah1','http://blah2'])
        '''
        for repo_url in repo_urls:
            self.pkgmgr.add_repository(repo_url)

        # Fetch the packages' checksum file that contains the checksums
        # of all the packages if it is not already fetched. The checksum
        # is always fetched whenever a job is first started. This
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE,
                                  checksum_file_path, use_checksum=False)
        except error.PackageFetchError:
            # packaging system might not be working in this case
            # Silently fall back to the normal case
            pass


    def require_gcc(self):
        """
        Test whether gcc is installed on the machine.
        """
        # check if gcc is installed on the system.
        try:
            utils.system('which gcc')
        except error.CmdError:
            raise NotAvailableError('gcc is required by this job and is '
                                    'not available on the system')


    def setup_dep(self, deps):
        """Set up the dependencies for this test.
        deps is a list of libraries required for this test.
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except error.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            os.chdir(dep_dir)
            # Run the dependency, as it could create more files needed for the
            # tests.
            # In future this might want to be changed, as this always returns
            # None, unless the dep.py errors. In which case, it'll error rather
            # than returning.
            # NOTE: the dep's <dep>.py is executed with a trusted-code
            # assumption; eval/compile here runs arbitrary repo code.
            if eval(compile(open('%s.py' % dep, "rb").read(),
                            '%s.py' % dep, 'exec'), {}) is None:
                logging.info('Dependency %s successfuly built', dep)

    def _runtest(self, url, tag, timeout, args, dargs):
        """Fork a child process to run a single test and wait for it."""
        try:
            l = lambda : test.runtest(self, url, tag, args, dargs)
            pid = parallel.fork_start(self.resultdir, l)

            self._forkwait(pid, timeout)

        except error.TestBaseException:
            # These are already classified with an error type (exit_status)
            raise
        except error.JobError:
            raise  # Caught further up and turned into an ABORT.
        except Exception as e:
            # Converts all other exceptions thrown by the test regardless
            # of phase into a TestError(TestBaseException) subclass that
            # reports them with their full stack trace.
            raise error.UnhandledTestError(e)

    def _forkwait(self, pid, timeout=None):
        """Wait for the given pid to complete

        @param pid (int) process id to wait for
        @param timeout (int) seconds to wait before timing out the process"""
        if timeout:
            logging.debug('Waiting for pid %d for %d seconds', pid, timeout)
            parallel.fork_waitfor_timed(self.resultdir, pid, timeout)
        else:
            logging.debug('Waiting for pid %d', pid)
            parallel.fork_waitfor(self.resultdir, pid)
        logging.info('pid %d completed', pid)


    def _run_test_base(self, url, *args, **dargs):
        """
        Prepares arguments and run functions to run_test and run_test_detail.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns:
                subdir: Test subdirectory
                testname: Test name
                group_func: Actual test run function
                timeout: Test timeout
        """
        _group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        self._make_test_outputdir(subdir)

        timeout = dargs.pop('timeout', None)
        if timeout:
            logging.debug('Test has timeout: %d sec.', timeout)

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)
        # disk_usage_monitor is presumably defined elsewhere in this module
        # (not visible in this chunk) — verify before refactoring.
        @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, timeout, args, dargs)
            except error.TestBaseException as detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        return (subdir, testname, group_func, timeout)


    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns True if the test passes, False otherwise.
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return True
        except error.TestBaseException:
            return False
        # Any other exception here will be given to the caller
        #
        # NOTE: The only exception possible from the control file here
        # is error.JobError as _runtest() turns all others into an
        # UnhandledTestError that is caught above.


    def stage_control_file(self, url):
        """
        Install the test package and return the control file path.

        @param url The name of the test, e.g. login_LoginSuccess. This is the
            string passed to run_test in the client test control file:
            job.run_test('login_LoginSuccess')
            This name can also be something like 'camera_HAL3.jea',
            which corresponds to a test package containing multiple
            control files, each with calls to:
            job.run_test('camera_HAL3', **opts)

        @returns Absolute path to the control file for the test.
        """
        testname, _, _tag = url.partition('.')
        bindir = os.path.join(self.testdir, testname)
        self.install_pkg(testname, 'test', bindir)
        # _locate_test_control_file is presumably defined elsewhere in this
        # module (not visible in this chunk).
        return _locate_test_control_file(bindir, url)


    @_run_test_complete_on_exit
    def run_test_detail(self, url, *args, **dargs):
        """
        Summon a test object and run it, returning test status.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns Test status
        @see: client/common_lib/error.py, exit_status
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return 'GOOD'
        except error.TestBaseException as detail:
            return detail.exit_status


    def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):
        """\
        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        *args:
                arguments for the function

        Returns the result of the passed in function
        """

        try:
            optional_fields = None
            if timeout:
                optional_fields = {}
                optional_fields['timeout'] = timeout
            self.record('START', subdir, testname,
                        optional_fields=optional_fields)

            # Marker consumed after an unexpected reboot; discarded in the
            # finally clause below on a normal (non-reboot) exit.
            self._state.set('client', 'unexpected_reboot', (subdir, testname))
            try:
                result = function(*args, **dargs)
                self.record('END GOOD', subdir, testname)
                return result
            except error.TestBaseException as e:
                self.record('END %s' % e.exit_status, subdir, testname)
                raise
            except error.JobError as e:
                self.record('END ABORT', subdir, testname)
                raise
            except Exception as e:
                # This should only ever happen due to a bug in the given
                # function's code. The common case of being called by
                # run_test() will never reach this. If a control file called
                # run_group() itself, bugs in its function will be caught
                # here.
                err_msg = str(e) + '\n' + traceback.format_exc()
                self.record('END ERROR', subdir, testname, err_msg)
                raise
        finally:
            self._state.discard('client', 'unexpected_reboot')


    def run_group(self, function, tag=None, **dargs):
        """
        Run a function nested within a group level.

        function:
                Callable to run.
        tag:
                An optional tag name for the group. If None (default)
                function.__name__ will be used.
        **dargs:
                Named arguments for the function.
        """
        if tag:
            name = tag
        else:
            name = function.__name__

        try:
            return self._rungroup(subdir=None, testname=name,
                                  function=function, timeout=None, **dargs)
        except (SystemExit, error.TestBaseException):
            raise
        # If there was a different exception, turn it into a TestError.
        # It will be caught by step_engine or _run_step_fn.
        except Exception as e:
            raise error.UnhandledTestError(e)


    def cpu_count(self):
        """Return the total number of CPUs on the system."""
        return utils.count_cpus()  # use total system count


    def start_reboot(self):
        """Record the start of a reboot in the status log."""
        self.record('START', None, 'reboot')
        self.record('GOOD', None, 'reboot.start')


    def _record_reboot_failure(self, subdir, operation, status,
                               running_id=None):
        """Record an aborted reboot, tagging the END entry with the kernel."""
        self.record("ABORT", subdir, operation, status)
        if not running_id:
            running_id = utils.running_os_ident()
        kernel = {"kernel": running_id.split("::")[0]}
        self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)


    def _check_post_reboot(self, subdir, running_id=None):
        """
        Function to perform post boot checks such as if the system configuration
        has changed across reboots (specifically, CPUs and partitions).

        @param subdir: The subdir to use in the job.record call.
        @param running_id: An optional running_id to include in the reboot
            failure log message

        @raise JobError: Raised if the current configuration does not match the
            pre-reboot configuration.
        """
        # check to see if any partitions have changed
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = partition_lib.get_mount_info(partition_list)
        old_mount_info = self._state.get('client', 'mount_info')
        if mount_info != old_mount_info:
            new_entries = mount_info - old_mount_info
            old_entries = old_mount_info - mount_info
            description = ("mounted partitions are different after reboot "
                           "(old entries: %s, new entries: %s)" %
                           (old_entries, new_entries))
            self._record_reboot_failure(subdir, "reboot.verify_config",
                                        description, running_id=running_id)
            raise error.JobError("Reboot failed: %s" % description)

        # check to see if any CPUs have changed
        cpu_count = utils.count_cpus()
        old_count = self._state.get('client', 'cpu_count')
        if cpu_count != old_count:
            description = ('Number of CPUs changed after reboot '
                           '(old count: %d, new count: %d)' %
                           (old_count, cpu_count))
            self._record_reboot_failure(subdir, 'reboot.verify_config',
                                        description, running_id=running_id)
            raise error.JobError('Reboot failed: %s' % description)


    def partition(self, device, loop_size=0, mountpoint=None):
        """
        Work with a machine partition

            @param device: e.g. /dev/sda2, /dev/sdb1 etc...
            @param mountpoint: Specify a directory to mount to. If not specified
                autotest tmp directory will be used.
            @param loop_size: Size of loopback device (in MB). Defaults to 0.

            @return: A L{client.bin.partition.partition} object
        """

        if not mountpoint:
            mountpoint = self.tmpdir
        return partition_lib.partition(self, device, loop_size, mountpoint)

    @utils.deprecated
    def filesystem(self, device, mountpoint=None, loop_size=0):
        """ Same as partition

        @deprecated: Use partition method instead
        """
        return self.partition(device, loop_size, mountpoint)


    def enable_external_logging(self):
        """Hook for subclasses; the client implementation is a no-op."""
        pass


    def disable_external_logging(self):
        """Hook for subclasses; the client implementation is a no-op."""
        pass


    def reboot_setup(self):
        # save the partition list and mount points, as well as the cpu count
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = partition_lib.get_mount_info(partition_list)
        self._state.set('client', 'mount_info', mount_info)
        self._state.set('client', 'cpu_count', utils.count_cpus())


    def reboot(self):
        """Snapshot config, notify the harness, and reboot the machine."""
        self.reboot_setup()
        self.harness.run_reboot()

        # HACK: using this as a module sometimes hangs shutdown, so if it's
        # installed unload it first
        utils.system("modprobe -r netconsole", ignore_status=True)

        # sync first, so that a sync during shutdown doesn't time out
        utils.system("sync; sync", ignore_status=True)

        utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
        self.quit()


    def noop(self, text):
        """Log a no-op step; useful as a placeholder in control files."""
        logging.info("job: noop: " + text)


    @_run_test_complete_on_exit
    def parallel(self, *tasklist, **kwargs):
        """Run tasks in parallel"""

        pids = []
        old_log_filename = self._logger.global_filename
        for i, task in enumerate(tasklist):
            assert isinstance(task, (tuple, list))
            # Each subtask logs to its own '<name>.<i>' file; merged back
            # into the main log after all tasks complete.
            self._logger.global_filename = old_log_filename + (".%d" % i)
            def task_func():
                # stub out _record_indent with a process-local one
                base_record_indent = self._record_indent
                proc_local = self._job_state.property_factory(
                    '_state', '_record_indent.%d' % os.getpid(),
                    base_record_indent, namespace='client')
                self.__class__._record_indent = proc_local
                task[0](*task[1:])
            forked_pid = parallel.fork_start(self.resultdir, task_func)
            logging.info('Just forked pid %d', forked_pid)
            pids.append(forked_pid)

        old_log_path = os.path.join(self.resultdir, old_log_filename)
        old_log = open(old_log_path, "a")
        exceptions = []
        for i, pid in enumerate(pids):
            # wait for the task to finish
            try:
                self._forkwait(pid, kwargs.get('timeout'))
            except Exception as e:
                logging.info('pid %d completed with error', pid)
                exceptions.append(e)
            # copy the logs from the subtask into the main log
            new_log_path = old_log_path + (".%d" % i)
            if os.path.exists(new_log_path):
                new_log = open(new_log_path)
                old_log.write(new_log.read())
                new_log.close()
                old_log.flush()
                os.remove(new_log_path)
        old_log.close()

        self._logger.global_filename = old_log_filename

        # handle any exceptions raised by the parallel tasks
        if exceptions:
            msg = "%d task(s) failed in job.parallel" % len(exceptions)
            raise error.JobError(msg)


    def quit(self):
        # XXX: should have a better name.
        self.harness.run_pause()
        raise error.JobContinue("more to come")


    def complete(self, status):
        """Write pending reports, clean up, and exit"""
        # We are about to exit 'complete' so clean up the control file.
        dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
        shutil.move(self._state_file, dest)

        self.harness.run_complete()
        self.disable_external_logging()
        sys.exit(status)


    def _load_state(self):
        # grab any initial state and set up $CONTROL.state as the backing file
        init_state_file = self.control + '.init.state'
        self._state_file = self.control + '.state'
        if os.path.exists(init_state_file):
            shutil.move(init_state_file, self._state_file)
        self._state.set_backing_file(self._state_file)

        # initialize the state engine, if necessary
        has_steps = self._state.has('client', 'steps')
        if not self._is_continuation and has_steps:
            raise RuntimeError('Loaded state can only contain client.steps if '
                               'this is a continuation')

        if not has_steps:
            logging.debug('Initializing the state engine')
            self._state.set('client', 'steps', [])


    def handle_persistent_option(self, options, option_name):
        """
        Select option from command line or persistent state.
        Store selected option to allow standalone client to continue
        after reboot with previously selected options.
        Priority:
        1. explicitly specified via command line
        2. stored in state file (if continuing job '-c')
        3. default == None
        """
        option = None
        cmd_line_option = getattr(options, option_name)
        if cmd_line_option:
            option = cmd_line_option
            self._state.set('client', option_name, option)
        else:
            stored_option = self._state.get('client', option_name, None)
            if stored_option:
                option = stored_option
        logging.debug('Persistent option %s now set to %s', option_name, option)
        return option


    def __create_step_tuple(self, fn, args, dargs):
        # Legacy code passes in an array where the first arg is
        # the function or its name.
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name. Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, six.string_types):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self._current_step_ancestry)
        return (ancestry, fn, args, dargs)


    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self._state.get('client', 'steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self._state.set('client', 'steps', steps)


    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self._state.get('client', 'steps')
        steps.insert(self._next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)


    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self._state.get('client', 'steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)



    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""

        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        try:
            # fn is a function *name* (see __create_step_tuple); it is
            # resolved and called inside the step's own namespace.
            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
            return local_vars['__ret']
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except error.TestNAError as detail:
            self.record(detail.exit_status, None, fn, str(detail))
        except Exception as detail:
            raise error.UnhandledJobError(detail)


    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at end end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""

        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Walk up the stack frames until we find the place fn_name was defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)


    def _add_step_init(self, local_vars, current_function):
        """If the function returned a dictionary that includes a
        function named 'step_init', prepend it to our list of steps.
        This will only get run the first time a function with a nested
        use of the step engine is run."""

        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self._current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')


    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Does the next step.
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self,
                               'args': self.args}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            exec(compile(open(self.control, "rb").read(), self.control, 'exec'),
                 global_control_vars, global_control_vars)
        except error.TestNAError as detail:
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception as detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
1080 if not self._is_continuation: 1081 if 'step_init' in global_control_vars: 1082 self.next_step(global_control_vars['step_init']) 1083 else: 1084 # if last job failed due to unexpected reboot, record it as fail 1085 # so harness gets called 1086 last_job = self._state.get('client', 'unexpected_reboot', None) 1087 if last_job: 1088 subdir, testname = last_job 1089 self.record('FAIL', subdir, testname, 'unexpected reboot') 1090 self.record('END FAIL', subdir, testname) 1091 1092 # Iterate through the steps. If we reboot, we'll simply 1093 # continue iterating on the next step. 1094 while len(self._state.get('client', 'steps')) > 0: 1095 steps = self._state.get('client', 'steps') 1096 (ancestry, fn_name, args, dargs) = steps.pop(0) 1097 self._state.set('client', 'steps', steps) 1098 1099 self._next_step_index = 0 1100 ret = self._create_frame(global_control_vars, ancestry, fn_name) 1101 local_vars, self._current_step_ancestry = ret 1102 local_vars = self._run_step_fn(local_vars, fn_name, args, dargs) 1103 self._add_step_init(local_vars, fn_name) 1104 1105 1106 def add_sysinfo_command(self, command, logfile=None, on_every_test=False): 1107 self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile), 1108 on_every_test) 1109 1110 1111 def add_sysinfo_logfile(self, file, on_every_test=False): 1112 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test) 1113 1114 1115 def _add_sysinfo_loggable(self, loggable, on_every_test): 1116 if on_every_test: 1117 self.sysinfo.test_loggables.add(loggable) 1118 else: 1119 self.sysinfo.boot_loggables.add(loggable) 1120 self._save_sysinfo_state() 1121 1122 1123 def _load_sysinfo_state(self): 1124 state = self._state.get('client', 'sysinfo', None) 1125 if state: 1126 self.sysinfo.deserialize(state) 1127 1128 1129 def _save_sysinfo_state(self): 1130 state = self.sysinfo.serialize() 1131 self._state.set('client', 'sysinfo', state) 1132 1133 1134class disk_usage_monitor: 1135 def __init__(self, logging_func, device, 
max_mb_per_hour): 1136 self.func = logging_func 1137 self.device = device 1138 self.max_mb_per_hour = max_mb_per_hour 1139 1140 1141 def start(self): 1142 self.initial_space = utils.freespace(self.device) 1143 self.start_time = time.time() 1144 1145 1146 def stop(self): 1147 # if no maximum usage rate was set, we don't need to 1148 # generate any warnings 1149 if not self.max_mb_per_hour: 1150 return 1151 1152 final_space = utils.freespace(self.device) 1153 used_space = self.initial_space - final_space 1154 stop_time = time.time() 1155 total_time = stop_time - self.start_time 1156 # round up the time to one minute, to keep extremely short 1157 # tests from generating false positives due to short, badly 1158 # timed bursts of activity 1159 total_time = max(total_time, 60.0) 1160 1161 # determine the usage rate 1162 bytes_per_sec = used_space / total_time 1163 mb_per_sec = bytes_per_sec / 1024**2 1164 mb_per_hour = mb_per_sec * 60 * 60 1165 1166 if mb_per_hour > self.max_mb_per_hour: 1167 msg = ("disk space on %s was consumed at a rate of %.2f MB/hour") 1168 msg %= (self.device, mb_per_hour) 1169 self.func(msg) 1170 1171 1172 @classmethod 1173 def watch(cls, *monitor_args, **monitor_dargs): 1174 """ Generic decorator to wrap a function call with the 1175 standard create-monitor -> start -> call -> stop idiom.""" 1176 def decorator(func): 1177 def watched_func(*args, **dargs): 1178 monitor = cls(*monitor_args, **monitor_dargs) 1179 monitor.start() 1180 try: 1181 func(*args, **dargs) 1182 finally: 1183 monitor.stop() 1184 return watched_func 1185 return decorator 1186 1187 1188def runjob(control, drop_caches, options): 1189 """ 1190 Run a job using the given control file. 1191 1192 This is the main interface to this module. 1193 1194 @see base_job.__init__ for parameter info. 
1195 """ 1196 control = os.path.abspath(control) 1197 state = control + '.state' 1198 # Ensure state file is cleaned up before the job starts to run if autotest 1199 # is not running with the --continue flag 1200 if not options.cont and os.path.isfile(state): 1201 logging.debug('Cleaning up previously found state file') 1202 os.remove(state) 1203 1204 # instantiate the job object ready for the control file. 1205 myjob = None 1206 try: 1207 # Check that the control file is valid 1208 if not os.path.exists(control): 1209 raise error.JobError(control + ": control file not found") 1210 1211 # When continuing, the job is complete when there is no 1212 # state file, ensure we don't try and continue. 1213 if options.cont and not os.path.exists(state): 1214 raise error.JobComplete("all done") 1215 1216 myjob = job(control=control, drop_caches=drop_caches, options=options) 1217 1218 # Load in the users control file, may do any one of: 1219 # 1) execute in toto 1220 # 2) define steps, and select the first via next_step() 1221 myjob.step_engine() 1222 1223 except error.JobContinue: 1224 sys.exit(5) 1225 1226 except error.JobComplete: 1227 sys.exit(1) 1228 1229 except error.JobError as instance: 1230 logging.error("JOB ERROR: " + str(instance)) 1231 if myjob: 1232 command = None 1233 if len(instance.args) > 1: 1234 command = instance.args[1] 1235 myjob.record('ABORT', None, command, str(instance)) 1236 myjob.record('END ABORT', None, None, str(instance)) 1237 assert myjob._record_indent == 0 1238 myjob.complete(1) 1239 else: 1240 sys.exit(1) 1241 1242 except Exception as e: 1243 # NOTE: job._run_step_fn and job.step_engine will turn things into 1244 # a JobError for us. If we get here, its likely an autotest bug. 
1245 msg = str(e) + '\n' + traceback.format_exc() 1246 logging.critical("JOB ERROR (autotest bug?): " + msg) 1247 if myjob: 1248 myjob.record('END ABORT', None, None, msg) 1249 assert myjob._record_indent == 0 1250 myjob.complete(1) 1251 else: 1252 sys.exit(1) 1253 1254 # If we get here, then we assume the job is complete and good. 1255 myjob.record('END GOOD', None, None) 1256 assert myjob._record_indent == 0 1257 1258 myjob.complete(0) 1259 1260 1261class job(base_client_job): 1262 1263 def __init__(self, *args, **kwargs): 1264 base_client_job.__init__(self, *args, **kwargs) 1265 1266 1267 def run_test(self, url, *args, **dargs): 1268 log_pauser = cros_logging.LogRotationPauser() 1269 passed = False 1270 try: 1271 log_pauser.begin() 1272 passed = base_client_job.run_test(self, url, *args, **dargs) 1273 if not passed: 1274 # Save the VM state immediately after the test failure. 1275 # This is a NOOP if the the test isn't running in a VM or 1276 # if the VM is not properly configured to save state. 1277 _group, testname = self.pkgmgr.get_package_name(url, 'test') 1278 now = datetime.now().strftime('%I:%M:%S.%f') 1279 checkpoint_name = '%s-%s' % (testname, now) 1280 utils.save_vm_state(checkpoint_name) 1281 finally: 1282 log_pauser.end() 1283 return passed 1284 1285 1286 def reboot(self): 1287 self.reboot_setup() 1288 self.harness.run_reboot() 1289 1290 # sync first, so that a sync during shutdown doesn't time out 1291 utils.system('sync; sync', ignore_status=True) 1292 1293 utils.system('reboot </dev/null >/dev/null 2>&1 &') 1294 self.quit() 1295 1296 1297 def require_gcc(self): 1298 return False 1299 1300 1301# TODO(ayatane): This logic should be deduplicated with 1302# server/cros/dynamic_suite/control_file_getter.py, but the server 1303# libraries are not available on clients. 1304def _locate_test_control_file(dirpath, testname): 1305 """ 1306 Locate the control file for the given test. 1307 1308 @param dirpath Root directory to search. 
1309 @param testname Name of test. 1310 1311 @returns Absolute path to the control file. 1312 @raise JobError: Raised if control file not found. 1313 """ 1314 for dirpath, _dirnames, filenames in os.walk(dirpath): 1315 for filename in filenames: 1316 if 'control' not in filename: 1317 continue 1318 path = os.path.join(dirpath, filename) 1319 if _is_control_file_for_test(path, testname): 1320 return os.path.abspath(path) 1321 raise error.JobError( 1322 'could not find client test control file', 1323 dirpath, testname) 1324 1325 1326_NAME_PATTERN = "NAME *= *['\"]([^'\"]+)['\"]" 1327 1328 1329def _is_control_file_for_test(path, testname): 1330 with open(path) as f: 1331 for line in f: 1332 match = re.match(_NAME_PATTERN, line) 1333 if match is not None: 1334 return match.group(1) == testname 1335