# Lint as: python2, python3
# pylint: disable-msg=C0111

# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
The main job wrapper for the server side.

This is the core infrastructure. Derived from the client side job.py

Copyright Martin J. Bligh, Andy Whitcroft 2007
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import errno
import fcntl
import getpass
import itertools
import logging
import os
import pickle
import platform
import re
import select
import shutil
import sys
import tempfile
import time
import traceback
import uuid
import warnings

from datetime import datetime

from autotest_lib.client.bin import sysinfo
from autotest_lib.client.common_lib import base_job
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import logging_manager
from autotest_lib.client.common_lib import packages
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib import seven
from autotest_lib.server import profilers
from autotest_lib.server import site_gtest_runner
from autotest_lib.server import subcommand
from autotest_lib.server import test
from autotest_lib.server.autotest import OFFLOAD_ENVVAR
from autotest_lib.server import utils as server_utils
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server import hosts
from autotest_lib.server.hosts import abstract_ssh
from autotest_lib.server.hosts import afe_store
from autotest_lib.server.hosts import file_store
from autotest_lib.server.hosts import shadowing_store
from autotest_lib.server.hosts import factory as host_factory
from autotest_lib.server.hosts import host_info
from autotest_lib.server.hosts import ssh_multiplex
from autotest_lib.tko import models as tko_models
from autotest_lib.tko import parser_lib
from six.moves import zip

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


def _control_segment_path(name):
    """Get the pathname of the named control segment file."""
    server_dir = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(server_dir, "control_segments", name)


CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'
DUT_STATEFUL_PATH = "/usr/local"

CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CLIENT_TRAMPOLINE_CONTROL_FILE = _control_segment_path('client_trampoline')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')
VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')
PROVISION_CONTROL_FILE = _control_segment_path('provision')
VERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url')
RESET_CONTROL_FILE = _control_segment_path('reset')
GET_NETWORK_STATS_CONTROL_FILE = _control_segment_path('get_network_stats')


def get_machine_dicts(machine_names, store_dir, in_lab, use_shadow_store,
                      host_attributes=None):
    """Converts a list of machine names to list of dicts.

    TODO(crbug.com/678430): This function temporarily has a side effect of
    creating files under workdir for backing a FileStore. This side-effect will
    go away once callers of autoserv start passing in the FileStore.

    @param machine_names: A list of machine names.
    @param store_dir: A directory to contain store backing files.
    @param use_shadow_store: If True, we should create a ShadowingStore where
            actual store is backed by the AFE but we create a backing file to
            shadow the store. If False, backing file should already exist at:
            ${store_dir}/${hostname}.store
    @param in_lab: A boolean indicating whether we're running in lab.
    @param host_attributes: Optional list of host attributes to add for each
            host.
    @returns: A list of dicts. Each dict has the following keys:
            'hostname': Name of the machine originally in machine_names (str).
            'afe_host': A frontend.Host object for the machine, or a stub if
                    in_lab is false.
            'host_info_store': A host_info.CachingHostInfoStore object to obtain
                    host information. A stub if in_lab is False.
            'connection_pool': ssh_multiplex.ConnectionPool instance to share
                    ssh connection across control scripts. This is set to
                    None, and should be overridden for connection sharing.
    """
    # See autoserv_parser.parse_args. Only one of in_lab or host_attributes can
    # be provided.
    if in_lab and host_attributes:
        raise error.AutoservError(
                'in_lab and host_attribute are mutually exclusive.')

    machine_dict_list = []
    for machine in machine_names:
        if not in_lab:
            afe_host = server_utils.EmptyAFEHost()
            host_info_store = host_info.InMemoryHostInfoStore()
            if host_attributes is not None:
                afe_host.attributes.update(host_attributes)
                info = host_info.HostInfo(attributes=host_attributes)
                host_info_store.commit(info)
        elif use_shadow_store:
            afe_host = _create_afe_host(machine)
            host_info_store = _create_afe_backed_host_info_store(store_dir,
                                                                 machine)
        else:
            afe_host = server_utils.EmptyAFEHost()
            host_info_store = _create_file_backed_host_info_store(store_dir,
                                                                  machine)

        machine_dict_list.append({
                'hostname' : machine,
                'afe_host' : afe_host,
                'host_info_store': host_info_store,
                'connection_pool': None,
        })

    return machine_dict_list


class status_indenter(base_job.status_indenter):
    """Provide a simple integer-backed status indenter."""
    def __init__(self):
        self._indent = 0


    @property
    def indent(self):
        return self._indent


    def increment(self):
        self._indent += 1


    def decrement(self):
        self._indent -= 1


    def get_context(self):
        """Returns a context object for use by job.get_record_context."""
        class context(object):
            def __init__(self, indenter, indent):
                self._indenter = indenter
                self._indent = indent
            def restore(self):
                self._indenter._indent = self._indent
        return context(self, self._indent)


class server_job_record_hook(object):
    """The job.record hook for server job. Used to inject WARN messages from
    the console or vlm whenever new logs are written, and to echo any logs
    to INFO level logging. Implemented as a class so that it can use state to
    block recursive calls, so that the hook can call job.record itself to
    log WARN messages.

    Depends on job._read_warnings and job._logger.
    """
    def __init__(self, job):
        self._job = job
        self._being_called = False


    def __call__(self, entry):
        """A wrapper around the 'real' record hook, the _hook method, which
        prevents recursion. This isn't making any effort to be threadsafe,
        the intent is to outright block infinite recursion via a
        job.record->_hook->job.record->_hook->job.record... chain."""
        if self._being_called:
            return
        self._being_called = True
        try:
            self._hook(self._job, entry)
        finally:
            self._being_called = False


    @staticmethod
    def _hook(job, entry):
        """The core hook, which can safely call job.record."""
        entries = []
        # poll all our warning loggers for new warnings
        for timestamp, msg in job._read_warnings():
            warning_entry = base_job.status_log_entry(
                'WARN', None, None, msg, {}, timestamp=timestamp)
            entries.append(warning_entry)
            job.record_entry(warning_entry)
        # echo rendered versions of all the status logs to info
        entries.append(entry)
        for entry in entries:
            rendered_entry = job._logger.render_entry(entry)
            logging.info(rendered_entry)


class server_job(base_job.base_job):
    """The server-side concrete implementation of base_job.

    Optional properties provided by this implementation:
        serverdir

        warning_manager
        warning_loggers
    """

    _STATUS_VERSION = 1

    # TODO crbug.com/285395 eliminate ssh_verbosity_flag
    def __init__(self, control, args, resultdir, label, user, machines,
                 machine_dict_list,
                 client=False,
                 ssh_user=host_factory.DEFAULT_SSH_USER,
                 ssh_port=host_factory.DEFAULT_SSH_PORT,
                 ssh_pass=host_factory.DEFAULT_SSH_PASS,
                 ssh_verbosity_flag=host_factory.DEFAULT_SSH_VERBOSITY,
                 ssh_options=host_factory.DEFAULT_SSH_OPTIONS,
                 group_name='',
                 tag='', disable_sysinfo=False,
                 control_filename=SERVER_CONTROL_FILENAME,
                 parent_job_id=None, in_lab=False,
                 use_client_trampoline=False,
                 sync_offload_dir=''):
        """
        Create a server side job object.

        @param control: The pathname of the control file.
        @param args: Passed to the control file.
        @param resultdir: Where to throw the results.
        @param label: Description of the job.
        @param user: Username for the job (email address).
        @param machines: A list of hostnames of the machines to use for the job.
        @param machine_dict_list: A list of dicts for each of the machines above
                as returned by get_machine_dicts.
        @param client: True if this is a client-side control file.
        @param ssh_user: The SSH username. [root]
        @param ssh_port: The SSH port number. [22]
        @param ssh_pass: The SSH passphrase, if needed.
        @param ssh_verbosity_flag: The SSH verbosity flag, '-v', '-vv',
                '-vvv', or an empty string if not needed.
        @param ssh_options: A string giving additional options that will be
                included in ssh commands.
        @param group_name: If supplied, this will be written out as
                host_group_name in the keyvals file for the parser.
        @param tag: The job execution tag from the scheduler. [optional]
        @param disable_sysinfo: Whether we should disable the sysinfo step of
                tests for a modest shortening of test time. [optional]
        @param control_filename: The filename where the server control file
                should be written in the results directory.
        @param parent_job_id: Job ID of the parent job. Default to None if the
                job does not have a parent job.
        @param in_lab: Boolean that indicates if this is running in the lab
                environment.
        @param use_client_trampoline: Boolean that indicates whether to
                use the client trampoline flow. If this is True, control
                is interpreted as the name of the client test to run.
                The client control file will be client_trampoline. The
                test name will be passed to client_trampoline, which will
                install the test package and re-exec the actual test
                control file.
        @param sync_offload_dir: String; relative path to synchronous offload
                dir, relative to the results directory. Ignored if empty.
        """
        super(server_job, self).__init__(resultdir=resultdir)
        self.control = control
        self._uncollected_log_file = os.path.join(self.resultdir,
                                                  'uncollected_logs')
        debugdir = os.path.join(self.resultdir, 'debug')
        if not os.path.exists(debugdir):
            os.mkdir(debugdir)

        if user:
            self.user = user
        else:
            self.user = getpass.getuser()

        self.args = args
        self.label = label
        self.machines = machines
        self._client = client
        self.warning_loggers = set()
        self.warning_manager = warning_manager()
        self._ssh_user = ssh_user
        self._ssh_port = ssh_port
        self._ssh_pass = ssh_pass
        self._ssh_verbosity_flag = ssh_verbosity_flag
        self._ssh_options = ssh_options
        self.tag = tag
        self.hosts = set()
        self.drop_caches = False
        self.drop_caches_between_iterations = False
        self._control_filename = control_filename
        self._disable_sysinfo = disable_sysinfo
        self._use_client_trampoline = use_client_trampoline

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        subcommand.logging_manager_object = self.logging

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self.profilers = profilers.profilers(self)
        self._sync_offload_dir = sync_offload_dir

        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'drone' : platform.node(),
                    'status_version' : str(self._STATUS_VERSION),
                    'job_started' : str(int(time.time()))}
        # Save parent job id to keyvals, so parser can retrieve the info and
        # write to tko_jobs record.
        if parent_job_id:
            job_data['parent_job_id'] = parent_job_id
        if group_name:
            job_data['host_group_name'] = group_name

        # only write these keyvals out on the first job in a resultdir
        if 'job_started' not in utils.read_keyval(self.resultdir):
            job_data.update(self._get_job_data())
            utils.write_keyval(self.resultdir, job_data)

        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})

        self._register_subcommand_hooks()

        # We no longer parse results as part of the server_job. These arguments
        # can't be dropped yet because clients haven't all been cleaned up yet.
        self.num_tests_run = -1
        self.num_tests_failed = -1

        # set up the status logger
        self._indenter = status_indenter()
        self._logger = base_job.status_logger(
            self, self._indenter, 'status.log', 'status.log',
            record_hook=server_job_record_hook(self))

        # Initialize a flag to indicate DUT failure during the test, e.g.,
        # unexpected reboot.
        self.failed_with_device_error = False

        self._connection_pool = ssh_multiplex.ConnectionPool()

        # List of functions to run after the main job function.
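        # Hooks are registered via add_post_run_hook() below and are invoked
        # in the finally clause of _run_group(), after the wrapped function
        # has returned and its status has been recorded.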
        self._post_run_hooks = []

        self.parent_job_id = parent_job_id
        self.in_lab = in_lab
        self.machine_dict_list = machine_dict_list
        for machine_dict in self.machine_dict_list:
            machine_dict['connection_pool'] = self._connection_pool

        # TODO(jrbarnette) The harness attribute is only relevant to
        # client jobs, but it's required to be present, or we will fail
        # server job unit tests. Yes, really.
        #
        # TODO(jrbarnette) The utility of the 'harness' attribute even
        # to client jobs is suspect. Probably, we should remove it.
        self.harness = None

        # TODO(ayatane): fast and max_result_size_KB are not set for
        # client_trampoline jobs.
        if control and not use_client_trampoline:
            parsed_control = control_data.parse_control(
                    control, raise_warnings=False)
            self.fast = parsed_control.fast
            self.max_result_size_KB = parsed_control.max_result_size_KB
        else:
            self.fast = False
            # Set the maximum result size to be the default specified in
            # global config, if the job has no control file associated.
            self.max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB


    @classmethod
    def _find_base_directories(cls):
        """
        Determine locations of autodir, clientdir and serverdir. Assumes
        that this file is located within serverdir and uses __file__ along
        with relative paths to resolve the location.
        """
        serverdir = os.path.abspath(os.path.dirname(__file__))
        autodir = os.path.normpath(os.path.join(serverdir, '..'))
        clientdir = os.path.join(autodir, 'client')
        return autodir, clientdir, serverdir


    def _find_resultdir(self, resultdir, *args, **dargs):
        """
        Determine the location of resultdir. For server jobs we expect one to
        always be explicitly passed in to __init__, so just return that.
        """
        if resultdir:
            return os.path.normpath(resultdir)
        else:
            return None


    def _get_status_logger(self):
        """Return a reference to the status logger."""
        return self._logger


    @staticmethod
    def _load_control_file(path):
        f = open(path)
        try:
            control_file = f.read()
        finally:
            f.close()
        return re.sub('\r', '', control_file)


    def _register_subcommand_hooks(self):
        """
        Register some hooks into the subcommand modules that allow us
        to properly clean up self.hosts created in forked subprocesses.
        """
        def on_fork(cmd):
            self._existing_hosts_on_fork = set(self.hosts)
        def on_join(cmd):
            new_hosts = self.hosts - self._existing_hosts_on_fork
            for host in new_hosts:
                host.close()
        subcommand.subcommand.register_fork_hook(on_fork)
        subcommand.subcommand.register_join_hook(on_join)


    # TODO crbug.com/285395 add a kwargs parameter.
    def _make_namespace(self):
        """Create a namespace dictionary to be passed along to control file.

        Creates a namespace argument populated with standard values:
        machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag,
        and ssh_options.
        """
        namespace = {'machines' : self.machine_dict_list,
                     'job' : self,
                     'ssh_user' : self._ssh_user,
                     'ssh_port' : self._ssh_port,
                     'ssh_pass' : self._ssh_pass,
                     'ssh_verbosity_flag' : self._ssh_verbosity_flag,
                     'ssh_options' : self._ssh_options}
        return namespace


    def cleanup(self, labels):
        """Cleanup machines.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to cleanup')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False)


    def verify(self, labels):
        """Verify machines are all ssh-able.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)


    def reset(self, labels):
        """Reset machines by running cleanup followed by verify on each machine.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to reset.')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(RESET_CONTROL_FILE, namespace, protect=False)


    def repair(self, labels):
        """Repair machines.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)


    def provision(self, labels):
        """
        Provision all hosts to match |labels|.

        @param labels: A comma separated string of labels to provision the
                       host to.

        """
        control = self._load_control_file(PROVISION_CONTROL_FILE)
        self.run(control=control, job_labels=labels)


    def precheck(self):
        """
        Perform any additional checks in derived classes.
        """
        pass


    def enable_external_logging(self):
        """
        Start or restart external logging mechanism.
        """
        pass


    def disable_external_logging(self):
        """
        Pause or stop external logging mechanism.
        """
        pass


    def use_external_logging(self):
        """
        Return True if external logging should be used.
        """
        return False


    def _make_parallel_wrapper(self, function, machines, log):
        """Wrap function as appropriate for calling by parallel_simple."""
        # machines could be a list of dictionaries, e.g.,
        #     [{'host_attributes': {}, 'hostname': '100.96.51.226'}]
        # The dictionary is generated in server_job.__init__, refer to
        # variable machine_dict_list, then passed in with namespace, see method
        # server_job._make_namespace.
        # To compare the machines to self.machines, which is a list of machine
        # hostnames, we need to convert machines back to a list of hostnames.
        if (machines and isinstance(machines, list)
                and isinstance(machines[0], dict)):
            machines = [m['hostname'] for m in machines]
        if len(machines) > 1 and log:
            def wrapper(machine):
                hostname = server_utils.get_hostname_from_machine(machine)
                self.push_execution_context(hostname)
                os.chdir(self.resultdir)
                machine_data = {'hostname' : hostname,
                                'status_version' : str(self._STATUS_VERSION)}
                utils.write_keyval(self.resultdir, machine_data)
                result = function(machine)
                return result
        else:
            wrapper = function
        return wrapper


    def parallel_simple(self, function, machines, log=True, timeout=None,
                        return_results=False):
        """
        Run 'function' using parallel_simple, with an extra wrapper to handle
        the necessary setup for continuous parsing, if possible. If continuous
        parsing is already properly initialized then this should just work.

        @param function: A callable to run in parallel given each machine.
        @param machines: A list of machine names to be passed one per subcommand
                invocation of function.
        @param log: If True, output will be written to a subdirectory named
                after each machine.
        @param timeout: Seconds after which the function call should timeout.
        @param return_results: If True instead of an AutoServError being raised
                on any error a list of the results|exceptions from the function
                called on each arg is returned. [default: False]

        @raises error.AutotestError: If any of the functions failed.
        """
        wrapper = self._make_parallel_wrapper(function, machines, log)
        return subcommand.parallel_simple(
                wrapper, machines,
                subdir_name_constructor=server_utils.get_hostname_from_machine,
                log=log, timeout=timeout, return_results=return_results)


    def parallel_on_machines(self, function, machines, timeout=None):
        """
        @param function: Called in parallel with one machine as its argument.
        @param machines: A list of machines to call function(machine) on.
        @param timeout: Seconds after which the function call should timeout.

        @returns A list of machines on which function(machine) returned
                 without raising an exception.
        """
        results = self.parallel_simple(function, machines, timeout=timeout,
                                       return_results=True)
        success_machines = []
        for result, machine in zip(results, machines):
            if not isinstance(result, Exception):
                success_machines.append(machine)
        return success_machines


    def record_skipped_test(self, skipped_test, message=None):
        """Insert a failure record into status.log for this test."""
        msg = message
        if msg is None:
            msg = 'No valid machines found for test %s.' % skipped_test
        logging.info(msg)
        self.record('START', None, skipped_test.test_name)
        self.record('INFO', None, skipped_test.test_name, msg)
        self.record('END TEST_NA', None, skipped_test.test_name, msg)


    def _has_failed_tests(self):
        """Parse status log for failed tests.

        This checks the current working directory and is intended only for use
        by the run() method.

        @return boolean
        """
        path = os.getcwd()

        # TODO(ayatane): Copied from tko/parse.py. Needs extensive refactor to
        # make code reuse plausible.
        job_keyval = tko_models.job.read_keyval(path)
        status_version = job_keyval.get("status_version", 0)

        # parse out the job
        parser = parser_lib.parser(status_version)
        job = parser.make_job(path)
        status_log = os.path.join(path, "status.log")
        if not os.path.exists(status_log):
            status_log = os.path.join(path, "status")
        if not os.path.exists(status_log):
            logging.warning("! Unable to parse job, no status file")
            return True

        # parse the status logs
        status_lines = open(status_log).readlines()
        parser.start(job)
        tests = parser.end(status_lines)

        # parser.end can return the same object multiple times, so filter out
        # dups
        job.tests = []
        already_added = set()
        for test in tests:
            if test not in already_added:
                already_added.add(test)
                job.tests.append(test)

        failed = False
        for test in job.tests:
            # The current job is still running and shouldn't count as failed.
            # The parser will fail to parse the exit status of the job since it
            # hasn't exited yet (this running right now is the job).
            failed = failed or (test.status != 'GOOD'
                                and not _is_current_server_job(test))
        return failed


    def _collect_crashes(self, namespace, collect_crashinfo):
        """Collect crashes.

        @param namespace: namespace dict.
        @param collect_crashinfo: whether to collect crashinfo in addition to
                dumps
        """
        if collect_crashinfo:
            # includes crashdumps
            crash_control_file = CRASHINFO_CONTROL_FILE
        else:
            crash_control_file = CRASHDUMPS_CONTROL_FILE
        self._execute_code(crash_control_file, namespace)


    _USE_TEMP_DIR = object()
    def run(self, collect_crashdumps=True, namespace={}, control=None,
            control_file_dir=None, verify_job_repo_url=False,
            only_collect_crashinfo=False, skip_crash_collection=False,
            job_labels='', use_packaging=True):
        # for a normal job, make sure the uncollected logs file exists
        # for a crashinfo-only run it should already exist, bail out otherwise
        created_uncollected_logs = False
        logging.info("I am PID %s", os.getpid())
        if self.resultdir and not os.path.exists(self._uncollected_log_file):
            if only_collect_crashinfo:
                # if this is a crashinfo-only run, and there were no existing
                # uncollected logs, just bail out early
                logging.info("No existing uncollected logs, "
                             "skipping crashinfo collection")
                return
            else:
                log_file = open(self._uncollected_log_file, "wb")
                pickle.dump([], log_file)
                log_file.close()
                created_uncollected_logs = True

        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines
        if control is None:
            if self.control is None:
                control = ''
            elif self._use_client_trampoline:
                # Some tests must be loaded and staged before they can be run,
                # see crbug.com/883403#c42 and #c46 for details.
                control = self._load_control_file(
                        CLIENT_TRAMPOLINE_CONTROL_FILE)
                # repr of a string is safe for eval.
                control = (('trampoline_testname = %r\n' % str(self.control))
                           + control)
            else:
                control = self._load_control_file(self.control)
        if control_file_dir is None:
            control_file_dir = self.resultdir

        self.aborted = False
        namespace.update(self._make_namespace())
        namespace.update({
                'args': self.args,
                'job_labels': job_labels,
                'gtest_runner': site_gtest_runner.gtest_runner(),
        })
        test_start_time = int(time.time())

        if self.resultdir:
            os.chdir(self.resultdir)
            # touch status.log so that the parser knows a job is running here
            open(self.get_status_log_path(), 'a').close()
            self.enable_external_logging()

        collect_crashinfo = True
        temp_control_file_dir = None
        try:
            try:
                if not self.fast:
                    with metrics.SecondsTimer(
                            'chromeos/autotest/job/get_network_stats',
                            fields = {'stage': 'start'}):
                        namespace['network_stats_label'] = 'at-start'
                        self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
                                           namespace)

                if only_collect_crashinfo:
                    return

                # If the verify_job_repo_url option is set but we're unable
                # to actually verify that the job_repo_url contains the autotest
                # package, this job will fail.
                if verify_job_repo_url:
                    self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE,
                                       namespace)
                else:
                    logging.warning('Not checking if job_repo_url contains '
                                    'autotest packages on %s', machines)

                # determine the dir to write the control files to
                cfd_specified = (control_file_dir
                                 and control_file_dir is not self._USE_TEMP_DIR)
                if cfd_specified:
                    temp_control_file_dir = None
                else:
                    temp_control_file_dir = tempfile.mkdtemp(
                            suffix='temp_control_file_dir')
                    control_file_dir = temp_control_file_dir
                server_control_file = os.path.join(control_file_dir,
                                                   self._control_filename)
                client_control_file = os.path.join(control_file_dir,
                                                   CLIENT_CONTROL_FILENAME)
                if self._client:
                    namespace['control'] = control
                    utils.open_write_close(client_control_file, control)
                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
                                    server_control_file)
                else:
                    utils.open_write_close(server_control_file, control)

                sync_dir = self._offload_dir_target_path()
                if self._sync_offload_dir:
                    logging.info("Preparing synchronous offload dir")
                    self.create_marker_file()
                    logging.info("Offload dir and marker file ready")
                    logging.debug("Results dir is %s", self.resultdir)
                    logging.debug("Synchronous offload dir is %s", sync_dir)
                logging.info("Processing control file")
                namespace['use_packaging'] = use_packaging
                namespace['synchronous_offload_dir'] = sync_dir
                os.environ[OFFLOAD_ENVVAR] = sync_dir
                self._execute_code(server_control_file, namespace)
                logging.info("Finished processing control file")
                self._maybe_retrieve_client_offload_dirs()
                # If no device error occurred, no need to collect crashinfo.
                collect_crashinfo = self.failed_with_device_error
            except Exception as e:
                try:
                    logging.exception(
                            'Exception escaped control file, job aborting:')
                    reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX,
                                    ' ', str(e))
                    self.record('INFO', None, None, str(e),
                                {'job_abort_reason': reason})
                except:
                    pass # don't let logging exceptions here interfere
                raise
        finally:
            if temp_control_file_dir:
                # Clean up temp directory used for copies of the control files
                try:
                    shutil.rmtree(temp_control_file_dir)
                except Exception as e:
                    logging.warning('Could not remove temp directory %s: %s',
                                    temp_control_file_dir, e)

            if machines and (collect_crashdumps or collect_crashinfo):
                if skip_crash_collection or self.fast:
                    logging.info('Skipping crash dump/info collection '
                                 'as requested.')
                else:
                    with metrics.SecondsTimer(
                            'chromeos/autotest/job/collect_crashinfo'):
                        namespace['test_start_time'] = test_start_time
                        # Remove crash files for passing tests.
                        # TODO(ayatane): Tests that create crash files should be
                        # reported.
                        namespace['has_failed_tests'] = self._has_failed_tests()
                        self._collect_crashes(namespace, collect_crashinfo)
            self.disable_external_logging()
            if self._uncollected_log_file and created_uncollected_logs:
                os.remove(self._uncollected_log_file)

            if not self.fast:
                with metrics.SecondsTimer(
                        'chromeos/autotest/job/get_network_stats',
                        fields = {'stage': 'end'}):
                    namespace['network_stats_label'] = 'at-end'
                    self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
                                       namespace)

    def _server_offload_dir_path(self):
        return os.path.join(self.resultdir, self._sync_offload_dir)

    def _offload_dir_target_path(self):
        if not self._sync_offload_dir:
            return ''
        if self._client:
            return os.path.join(DUT_STATEFUL_PATH, self._sync_offload_dir)
        return os.path.join(self.resultdir, self._sync_offload_dir)

    def _maybe_retrieve_client_offload_dirs(self):
        if not(self._sync_offload_dir and self._client):
            logging.info("No client dir to retrieve.")
            return ''
        logging.info("Retrieving synchronous offload dir from client")
        server_path = self._server_offload_dir_path()
        client_path = self._offload_dir_target_path()
        def serial(machine):
            host = hosts.create_host(machine)
            server_subpath = os.path.join(server_path, host.hostname)
            # Empty final piece ensures that get_file gets a trailing slash,
            # which makes it copy dir contents rather than the dir itself.
            client_subpath = os.path.join(client_path, '')
            logging.debug("Client dir to retrieve is %s", client_subpath)
            os.makedirs(server_subpath)
            host.get_file(client_subpath, server_subpath)
        self.parallel_simple(serial, self.machines)
        logging.debug("Synchronous offload dir retrieved to %s", server_path)
        return server_path

    def _create_client_offload_dirs(self):
        marker_string = "client %s%s" % (
                "in SSP " if utils.is_in_container() else "",
                str(datetime.utcnow())
        )
        offload_path = self._offload_dir_target_path()
        _, file_path = tempfile.mkstemp()
        def serial(machine):
            host = hosts.create_host(machine)
            marker_path = os.path.join(offload_path, "sync_offloads_marker")
            host.run(("mkdir -p %s" % offload_path), ignore_status=False)
            host.send_file(file_path, marker_path)

        try:
            utils.open_write_close(file_path, marker_string)
            self.parallel_simple(serial, self.machines)
        finally:
            os.remove(file_path)


    def create_marker_file(self):
        """Create a marker file in the leaf task's synchronous offload dir.

        This ensures that we will get some results offloaded if the test fails
        to create output properly, distinguishing offload errs from test errs.
        @obj_param _client: Boolean, whether the control file is client-side.
        @obj_param _sync_offload_dir: rel. path from results dir to offload dir.

        @returns: path to offload dir on the machine of the leaf task
        """
        # Make the server-side directory regardless
        try:
            # 2.7 makedirs doesn't have an option for pre-existing directories
            os.makedirs(self._server_offload_dir_path())
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise
        if not self._client:
            offload_path = self._offload_dir_target_path()
            marker_string = "server %s%s" % (
                    "in SSP " if utils.is_in_container() else "",
                    str(datetime.utcnow())
            )
            utils.open_write_close(
                os.path.join(offload_path, "sync_offloads_marker"),
                marker_string
            )
            return offload_path
        return self._create_client_offload_dirs()

    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        tag
                tag to add to testname
        url
                url of the test to run
        """
        if self._disable_sysinfo:
            dargs['disable_sysinfo'] = True

        group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        outputdir = self._make_test_outputdir(subdir)

        def group_func():
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException as e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception as e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        try:
            result = self._run_group(testname, subdir, group_func)
        except error.TestBaseException as e:
            return False
        else:
            return True


    def _run_group(self, name, subdir, function, *args, **dargs):
        """Underlying method for running something inside of a group."""
        result, exc_info = None, None
        try:
            self.record('START', subdir, name)
            result = function(*args, **dargs)
        except error.TestBaseException as e:
            self.record("END %s" % e.exit_status, subdir, name)
            raise
        except Exception as e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)
        finally:
            for hook in self._post_run_hooks:
                hook()

        return result


    def run_group(self, function, *args, **dargs):
        """\
        @param function: subroutine to run
        @returns: (result, exc_info). When the call succeeds, result contains
                the return value of |function| and exc_info is None. If
                |function| raises an exception, exc_info contains the tuple
                returned by sys.exc_info(), and result is None.
        """

        name = function.__name__
        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        try:
            result = self._run_group(name, None, function, *args, **dargs)[0]
        except error.TestBaseException:
            return None, sys.exc_info()
        return result, None


    def run_op(self, op, op_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        management operations. Includes support for capturing the kernel
        version after the operation.

        Args:
            op: name of the operation.
            op_func: a function that carries out the operation (reboot, suspend)
            get_kernel_func: a function that returns a string
                             representing the kernel version.
        """
        try:
            self.record('START', None, op)
            op_func()
        except Exception as e:
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, op, err_msg)
            raise
        else:
            kernel = get_kernel_func()
            self.record('END GOOD', None, op,
                        optional_fields={"kernel": kernel})


    def run_control(self, path):
        """Execute a control file found at path (relative to the autotest
        path). Intended for executing a control file within a control file,
        not for running the top-level job control file."""
        path = os.path.join(self.autodir, path)
        control_file = self._load_control_file(path)
        self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)


    def _read_warnings(self):
        """Poll all the warning loggers and extract any new warnings that have
        been logged. If the warnings belong to a category that is currently
        disabled, this method will discard them and they will no longer be
        retrievable.

        Returns a list of (timestamp, message) tuples, where timestamp is an
        integer epoch timestamp."""
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # parse out the warning
                timestamp, msgtype, msg = line.split('\t', 2)
                timestamp = int(timestamp)
                # if the warning is valid, add it to the results
                if self.warning_manager.is_valid(timestamp, msgtype):
                    warnings.append((timestamp, msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings


    def _unique_subdirectory(self, base_subdirectory_name):
        """Compute a unique results subdirectory based on the given name.

        Appends base_subdirectory_name with a number as necessary to find a
        directory name that doesn't already exist.
        """
        subdirectory = base_subdirectory_name
        counter = 1
        while os.path.exists(os.path.join(self.resultdir, subdirectory)):
            subdirectory = base_subdirectory_name + '.' + str(counter)
            counter += 1
        return subdirectory


    def get_record_context(self):
        """Returns an object representing the current job.record context.

        The object returned is an opaque object with a 0-arg restore method
        which can be called to restore the job.record context (i.e. indentation)
        to the current level. The intention is that it should be used when
        something external which generates job.record calls (e.g. an autotest
        client) can fail catastrophically and the server job record state
        needs to be reset to its original "known good" state.

        @return: A context object with a 0-arg restore() method."""
        return self._indenter.get_context()


    def record_summary(self, status_code, test_name, reason='', attributes=None,
                       distinguishing_attributes=(), child_test_ids=None):
        """Record a summary test result.

        @param status_code: status code string, see
                            common_lib.log.is_valid_status()
        @param test_name: name of the test
        @param reason: (optional) string providing detailed reason for test
                       outcome
        @param attributes: (optional) dict of string keyvals to associate with
                           this result
        @param distinguishing_attributes: (optional) list of attribute names
                that should be used to distinguish identically-named test
                results. These attributes should be present in the attributes
                parameter. This is used to generate user-friendly subdirectory
                names.
        @param child_test_ids: (optional) list of test indices for test results
                used in generating this result.
        """
        subdirectory_name_parts = [test_name]
        for attribute in distinguishing_attributes:
            assert attributes
            assert attribute in attributes, '%s not in %s' % (attribute,
                                                              attributes)
            subdirectory_name_parts.append(attributes[attribute])
        base_subdirectory_name = '.'.join(subdirectory_name_parts)

        subdirectory = self._unique_subdirectory(base_subdirectory_name)
        subdirectory_path = os.path.join(self.resultdir, subdirectory)
        os.mkdir(subdirectory_path)

        self.record(status_code, subdirectory, test_name,
                    status=reason, optional_fields={'is_summary': True})

        if attributes:
            utils.write_keyval(subdirectory_path, attributes)

        if child_test_ids:
            ids_string = ','.join(str(test_id) for test_id in child_test_ids)
            summary_data = {'child_test_ids': ids_string}
            utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),
                               summary_data)


    def add_post_run_hook(self, hook):
        """
        Registers a hook to run after the main job function.

        This provides a mechanism by which tests that perform multiple tests of
        their own can write additional top-level results to the TKO status.log
        file.

        @param hook: Function to invoke (without any args) after the main job
                function completes and the job status is logged.
        """
        self._post_run_hooks.append(hook)


    def disable_warnings(self, warning_type):
        self.warning_manager.disable_warnings(warning_type)
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})


    def enable_warnings(self, warning_type):
        self.warning_manager.enable_warnings(warning_type)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})


    def get_status_log_path(self, subdir=None):
        """Return the path to the job status log.

        @param subdir - Optional parameter indicating that you want the path
                        to a subdirectory status log.

        @returns The path where the status log should be.
        """
        if self.resultdir:
            if subdir:
                return os.path.join(self.resultdir, subdir, "status.log")
            else:
                return os.path.join(self.resultdir, "status.log")
        else:
            return None


    def _update_uncollected_logs_list(self, update_func):
        """Updates the uncollected logs list in a multi-process safe manner.

        @param update_func - a function that updates the list of uncollected
                             logs. Should take one parameter, the list to be
                             updated.
        """
        # Skip log collection if file _uncollected_log_file does not exist.
        if not (self._uncollected_log_file and
                os.path.exists(self._uncollected_log_file)):
            return
        if self._uncollected_log_file:
            log_file = open(self._uncollected_log_file, "rb+")
            fcntl.flock(log_file, fcntl.LOCK_EX)
            try:
                uncollected_logs = pickle.load(log_file)
                update_func(uncollected_logs)
                log_file.seek(0)
                log_file.truncate()
                pickle.dump(uncollected_logs, log_file)
                log_file.flush()
            finally:
                fcntl.flock(log_file, fcntl.LOCK_UN)
                log_file.close()


    def add_client_log(self, hostname, remote_path, local_path):
        """Adds a new set of client logs to the list of uncollected logs,
        to allow for future log recovery.

        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.append((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)


    def remove_client_log(self, hostname, remote_path, local_path):
        """Removes a set of client logs from the list of uncollected logs,
        to allow for future log recovery.

        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.remove((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)


    def get_client_logs(self):
        """Retrieves the list of uncollected logs, if it exists.

        @returns A list of (host, remote_path, local_path) tuples.  Returns
                 an empty list if no uncollected logs file exists.
        """
        log_exists = (self._uncollected_log_file and
                      os.path.exists(self._uncollected_log_file))
        if log_exists:
            return pickle.load(open(self._uncollected_log_file))
        else:
            return []


    def _fill_server_control_namespace(self, namespace, protect=True):
        """
        Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
            namespace: The namespace dictionary to fill in.
            protect: Boolean.  If True (the default) any operation that would
                clobber an existing entry in namespace will cause an error.
        Raises:
            error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """
            Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.  If
                    empty (the default), all names are imported from the module
                    similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            module = __import__(module_name, {}, {}, names)

            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with fewer surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrence of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys, logging=logging)
        _import_names('autotest_lib.server',
                      ('hosts', 'autotest', 'standalone_profiler'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing to be part of a public API. -gps)
        #
        # XXX Autotest does not appear to use .job.  Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self
        namespace['hosts'].factory.ssh_user = self._ssh_user
        namespace['hosts'].factory.ssh_port = self._ssh_port
        namespace['hosts'].factory.ssh_pass = self._ssh_pass
        namespace['hosts'].factory.ssh_verbosity_flag = (
                self._ssh_verbosity_flag)
        namespace['hosts'].factory.ssh_options = self._ssh_options


    def _execute_code(self, code_file, namespace, protect=True):
        """
        Execute code using a copy of namespace as a server control script.

        Unless protect_namespace is explicitly set to False, the dict will not
        be modified.

        Args:
            code_file: The filename of the control file to execute.
            namespace: A dict containing names to make available during
                execution.
            protect: Boolean.  If True (the default) a copy of the namespace
                dict is used during execution to prevent the code from
                modifying its contents outside of this function.  If False
                the raw dict is passed in and modifications will be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines) > 1:
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        seven.exec_file(code_file, locals_=namespace, globals_=namespace)


    def preprocess_client_state(self):
        """
        Produce a state file for initializing the state of a client job.

        Creates a new client state file with all the current server state, as
        well as some pre-set client state.

        @returns The path of the file the state was written into.
        """
        # initialize the sysinfo state
        self._state.set('client', 'sysinfo', self.sysinfo.serialize())

        # dump the state out to a tempfile
        fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
        os.close(fd)

        # write_to_file doesn't need locking, we exclusively own file_path
        self._state.write_to_file(file_path)
        return file_path


    def postprocess_client_state(self, state_path):
        """
        Update the state of this job with the state from a client job.

        Updates the state of the server side of a job with the final state
        of a client job that was run.  Updates the non-client-specific state,
        pulls in some specific bits from the client-specific state, and then
        discards the rest.  Removes the state file afterwards.

        @param state_path A path to the state file from the client.
        """
        # update the on-disk state
        try:
            self._state.read_from_file(state_path)
            os.remove(state_path)
        except OSError as e:
            # ignore file-not-found errors
            if e.errno != errno.ENOENT:
                raise
            else:
                logging.debug('Client state file %s not found', state_path)

        # update the sysinfo state
        if self._state.has('client', 'sysinfo'):
            self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))

        # drop all the client-specific state
        self._state.discard_namespace('client')


    def clear_all_known_hosts(self):
        """Clears known hosts files for all AbstractSSHHosts."""
        for host in self.hosts:
            if isinstance(host, abstract_ssh.AbstractSSHHost):
                host.clear_known_hosts()


    def close(self):
        """Closes this job's operation."""

        # Use shallow copy, because host.close() internally discards itself.
        for host in list(self.hosts):
            host.close()
        assert not self.hosts
        self._connection_pool.shutdown()


    def _get_job_data(self):
        """Add custom data to the job keyval info.

        When multiple machines are used in a job, change the hostname to
        the platform of the first machine instead of machine1,machine2,...  This
        makes the job reports easier to read and keeps the tko_machines table from
        growing too large.

        Returns:
            keyval dictionary with new hostname value, or empty dictionary.
        """
        job_data = {}
        # Only modify hostname on multimachine jobs.  Assume all hosts have the
        # same platform.
        if len(self.machines) > 1:
            # Search through machines for first machine with a platform.
            for host in self.machines:
                keyval_path = os.path.join(self.resultdir, 'host_keyvals', host)
                keyvals = utils.read_keyval(keyval_path)
                host_plat = keyvals.get('platform', None)
                if not host_plat:
                    continue
                job_data['hostname'] = host_plat
                break
        return job_data


class warning_manager(object):
    """Class for controlling warning logs. Manages the enabling and disabling
    of warnings."""
    def __init__(self):
        # a map of warning types to a list of disabled time intervals
        self.disabled_warnings = {}


    def is_valid(self, timestamp, warning_type):
        """Indicates if a warning (based on the time it occurred and its type)
        is a valid warning. A warning is considered "invalid" if this type of
        warning was marked as "disabled" at the time the warning occurred."""
        disabled_intervals = self.disabled_warnings.get(warning_type, [])
        for start, end in disabled_intervals:
            if timestamp >= start and (end is None or timestamp < end):
                return False
        return True


    def disable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, disables all further warnings of this type."""
        intervals = self.disabled_warnings.setdefault(warning_type, [])
        if not intervals or intervals[-1][1] is not None:
            intervals.append((int(current_time_func()), None))


    def enable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, enables all further warnings of this type."""
        intervals = self.disabled_warnings.get(warning_type, [])
        if intervals and intervals[-1][1] is None:
            intervals[-1] = (intervals[-1][0], int(current_time_func()))


def _is_current_server_job(test):
    """Return True if parsed test is the currently running job.

    @param test: test instance from tko parser.
    """
    return test.testname == 'SERVER_JOB'


def _create_afe_host(hostname):
    """Create an afe_host object backed by the AFE.

    @param hostname: Name of the host for which we want the Host object.
    @returns: A frontend.Host object for the named host.
    """
    afe = frontend_wrappers.RetryingAFE(timeout_min=5, delay_sec=10)
    hosts = afe.get_hosts(hostname=hostname)
    if not hosts:
        raise error.AutoservError('No hosts named %s found' % hostname)

    return hosts[0]


def _create_file_backed_host_info_store(store_dir, hostname):
    """Create a CachingHostInfoStore backed by an existing file.

    @param store_dir: A directory to contain store backing files.
    @param hostname: Name of the host for which we want the store.

    @returns: An object of type CachingHostInfoStore.
    """
    backing_file_path = os.path.join(store_dir, '%s.store' % hostname)
    if not os.path.isfile(backing_file_path):
        raise error.AutoservError(
                'Requested FileStore but no backing file at %s'
                % backing_file_path
        )
    return file_store.FileStore(backing_file_path)


def _create_afe_backed_host_info_store(store_dir, hostname):
    """Create a CachingHostInfoStore backed by the AFE.

    @param store_dir: A directory to contain store backing files.
    @param hostname: Name of the host for which we want the store.

    @returns: An object of type CachingHostInfoStore.
    """
    primary_store = afe_store.AfeStore(hostname)
    try:
        primary_store.get(force_refresh=True)
    except host_info.StoreError:
        raise error.AutoservError(
                'Could not obtain HostInfo for hostname %s' % hostname)
    # Since the store wasn't initialized external to autoserv, we must
    # ensure that the store we create is unique within store_dir.
    backing_file_path = os.path.join(
            _make_unique_subdir(store_dir),
            '%s.store' % hostname,
    )
    logging.info('Shadowing AFE store with a FileStore at %s',
                 backing_file_path)
    shadow_store = file_store.FileStore(backing_file_path)
    return shadowing_store.ShadowingStore(primary_store, shadow_store)


def _make_unique_subdir(workdir):
    """Creates a new subdir within workdir and returns the path to it."""
    store_dir = os.path.join(workdir, 'dir_%s' % uuid.uuid4())
    _make_dirs_if_needed(store_dir)
    return store_dir


def _make_dirs_if_needed(path):
    """os.makedirs, but ignores failure because the leaf directory exists"""
    try:
        os.makedirs(path)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
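
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, never executed): this mirrors how autoserv
# is expected to wire get_machine_dicts() into a server_job and run it. The
# hostname, directories and control file below are hypothetical placeholders,
# not values used anywhere in this module.
#
#   machine_dicts = get_machine_dicts(
#           machine_names=['host1.example.com'],
#           store_dir='/tmp/host_info_store',        # hypothetical path
#           in_lab=False,
#           use_shadow_store=False,
#           host_attributes={'os_type': 'cros'})     # hypothetical attribute
#   job = server_job(
#           control='/tmp/control.srv',              # hypothetical control file
#           args='', resultdir='/tmp/results', label='example', user=None,
#           machines=['host1.example.com'],
#           machine_dict_list=machine_dicts)
#   try:
#       job.run()
#   finally:
#       job.close()
# ---------------------------------------------------------------------------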