# pylint: disable-msg=C0111

# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
The main job wrapper for the server side.

This is the core infrastructure. Derived from the client side job.py

Copyright Martin J. Bligh, Andy Whitcroft 2007
"""

import errno
import fcntl
import getpass
import itertools
import logging
import os
import pickle
import platform
import re
import select
import shutil
import sys
import tempfile
import time
import traceback
import uuid
import warnings

from autotest_lib.client.bin import sysinfo
from autotest_lib.client.common_lib import base_job
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import logging_manager
from autotest_lib.client.common_lib import packages
from autotest_lib.client.common_lib import utils
from autotest_lib.server import profilers
from autotest_lib.server import site_gtest_runner
from autotest_lib.server import subcommand
from autotest_lib.server import test
from autotest_lib.server import utils as server_utils
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server.hosts import abstract_ssh
from autotest_lib.server.hosts import afe_store
from autotest_lib.server.hosts import file_store
from autotest_lib.server.hosts import shadowing_store
from autotest_lib.server.hosts import factory as host_factory
from autotest_lib.server.hosts import host_info
from autotest_lib.server.hosts import ssh_multiplex
from autotest_lib.tko import models as tko_models
from autotest_lib.tko import parser_lib

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


def _control_segment_path(name):
    """Get the pathname of the named control segment file."""
    server_dir = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(server_dir, "control_segments", name)


CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'

CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CLIENT_TRAMPOLINE_CONTROL_FILE = _control_segment_path('client_trampoline')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')
VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')
PROVISION_CONTROL_FILE = _control_segment_path('provision')
VERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url')
RESET_CONTROL_FILE = _control_segment_path('reset')
GET_NETWORK_STATS_CONTROL_FILE = _control_segment_path('get_network_stats')


def get_machine_dicts(machine_names, store_dir, in_lab, use_shadow_store,
                      host_attributes=None):
    """Converts a list of machine names to a list of dicts.

    TODO(crbug.com/678430): This function temporarily has a side effect of
    creating files under workdir for backing a FileStore. This side-effect
    will go away once callers of autoserv start passing in the FileStore.

    @param machine_names: A list of machine names.
    @param store_dir: A directory to contain store backing files.
    @param in_lab: A boolean indicating whether we're running in the lab.
    @param use_shadow_store: If True, we should create a ShadowingStore where
            the actual store is backed by the AFE but we create a backing file
            to shadow the store. If False, the backing file should already
            exist at: ${store_dir}/${hostname}.store
    @param host_attributes: Optional dict of host attributes to add for each
            host.
    @returns: A list of dicts. Each dict has the following keys:
            'hostname': Name of the machine originally in machine_names (str).
            'afe_host': A frontend.Host object for the machine, or a stub if
                    in_lab is false.
            'host_info_store': A host_info.CachingHostInfoStore object to
                    obtain host information. A stub if in_lab is False.
            'connection_pool': ssh_multiplex.ConnectionPool instance to share
                    master ssh connection across control scripts. This is set
                    to None, and should be overridden for connection sharing.
    """
    # See autoserv_parser.parse_args. Only one of in_lab or host_attributes
    # can be provided.
    if in_lab and host_attributes:
        raise error.AutoservError(
                'in_lab and host_attributes are mutually exclusive.')

    machine_dict_list = []
    for machine in machine_names:
        if not in_lab:
            afe_host = server_utils.EmptyAFEHost()
            host_info_store = host_info.InMemoryHostInfoStore()
            if host_attributes is not None:
                afe_host.attributes.update(host_attributes)
                info = host_info.HostInfo(attributes=host_attributes)
                host_info_store.commit(info)
        elif use_shadow_store:
            afe_host = _create_afe_host(machine)
            host_info_store = _create_afe_backed_host_info_store(store_dir,
                                                                 machine)
        else:
            afe_host = server_utils.EmptyAFEHost()
            host_info_store = _create_file_backed_host_info_store(store_dir,
                                                                  machine)

        machine_dict_list.append({
                'hostname' : machine,
                'afe_host' : afe_host,
                'host_info_store': host_info_store,
                'connection_pool': None,
        })

    return machine_dict_list
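
# A sketch of the result shape (illustrative only; the hostname and the
# attribute values below are hypothetical):
#
#   dicts = get_machine_dicts(['host1'], store_dir='/tmp/host_stores',
#                             in_lab=False, use_shadow_store=False,
#                             host_attributes={'os_type': 'cros'})
#   # dicts[0]['hostname']        == 'host1'
#   # dicts[0]['afe_host']        is a server_utils.EmptyAFEHost stub
#   # dicts[0]['host_info_store'] is a host_info.InMemoryHostInfoStore
#   # dicts[0]['connection_pool'] is None until server_job fills it in
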

class status_indenter(base_job.status_indenter):
    """Provide a simple integer-backed status indenter."""
    def __init__(self):
        self._indent = 0


    @property
    def indent(self):
        return self._indent


    def increment(self):
        self._indent += 1


    def decrement(self):
        self._indent -= 1


    def get_context(self):
        """Returns a context object for use by job.get_record_context."""
        class context(object):
            def __init__(self, indenter, indent):
                self._indenter = indenter
                self._indent = indent
            def restore(self):
                self._indenter._indent = self._indent
        return context(self, self._indent)


class server_job_record_hook(object):
    """The job.record hook for server job. Used to inject WARN messages from
    the console or vlm whenever new logs are written, and to echo any logs
    to INFO level logging. Implemented as a class so that it can use state to
    block recursive calls, allowing the hook to call job.record itself to
    log WARN messages.

    Depends on job._read_warnings and job._logger.
    """
    def __init__(self, job):
        self._job = job
        self._being_called = False


    def __call__(self, entry):
        """A wrapper around the 'real' record hook, the _hook method, which
        prevents recursion. This isn't making any effort to be threadsafe;
        the intent is to outright block infinite recursion via a
        job.record->_hook->job.record->_hook->job.record... chain."""
        if self._being_called:
            return
        self._being_called = True
        try:
            self._hook(self._job, entry)
        finally:
            self._being_called = False


    @staticmethod
    def _hook(job, entry):
        """The core hook, which can safely call job.record."""
        entries = []
        # poll all our warning loggers for new warnings
        for timestamp, msg in job._read_warnings():
            warning_entry = base_job.status_log_entry(
                'WARN', None, None, msg, {}, timestamp=timestamp)
            entries.append(warning_entry)
            job.record_entry(warning_entry)
        # echo rendered versions of all the status logs to info
        entries.append(entry)
        for entry in entries:
            rendered_entry = job._logger.render_entry(entry)
            logging.info(rendered_entry)


class server_job(base_job.base_job):
    """The server-side concrete implementation of base_job.

    Optional properties provided by this implementation:
        serverdir

        warning_manager
        warning_loggers
    """

    _STATUS_VERSION = 1

    # TODO crbug.com/285395 eliminate ssh_verbosity_flag
    def __init__(self, control, args, resultdir, label, user, machines,
                 machine_dict_list,
                 client=False,
                 ssh_user=host_factory.DEFAULT_SSH_USER,
                 ssh_port=host_factory.DEFAULT_SSH_PORT,
                 ssh_pass=host_factory.DEFAULT_SSH_PASS,
                 ssh_verbosity_flag=host_factory.DEFAULT_SSH_VERBOSITY,
                 ssh_options=host_factory.DEFAULT_SSH_OPTIONS,
                 group_name='',
                 tag='', disable_sysinfo=False,
                 control_filename=SERVER_CONTROL_FILENAME,
                 parent_job_id=None, in_lab=False,
                 use_client_trampoline=False):
        """
        Create a server side job object.

        @param control: The pathname of the control file.
        @param args: Passed to the control file.
        @param resultdir: Where to throw the results.
        @param label: Description of the job.
        @param user: Username for the job (email address).
        @param machines: A list of hostnames of the machines to use for the
                job.
        @param machine_dict_list: A list of dicts for each of the machines
                above as returned by get_machine_dicts.
        @param client: True if this is a client-side control file.
        @param ssh_user: The SSH username. [root]
        @param ssh_port: The SSH port number. [22]
        @param ssh_pass: The SSH passphrase, if needed.
        @param ssh_verbosity_flag: The SSH verbosity flag, '-v', '-vv',
                '-vvv', or an empty string if not needed.
        @param ssh_options: A string giving additional options that will be
                included in ssh commands.
        @param group_name: If supplied, this will be written out as
                host_group_name in the keyvals file for the parser.
        @param tag: The job execution tag from the scheduler. [optional]
        @param disable_sysinfo: Whether we should disable the sysinfo step of
                tests for a modest shortening of test time. [optional]
        @param control_filename: The filename where the server control file
                should be written in the results directory.
        @param parent_job_id: Job ID of the parent job. Default to None if the
                job does not have a parent job.
        @param in_lab: Boolean that indicates if this is running in the lab
                environment.
        @param use_client_trampoline: Boolean that indicates whether to
                use the client trampoline flow. If this is True, control
                is interpreted as the name of the client test to run.
                The client control file will be client_trampoline. The
                test name will be passed to client_trampoline, which will
                install the test package and re-exec the actual test
                control file.
        """
        super(server_job, self).__init__(resultdir=resultdir)
        self.control = control
        self._uncollected_log_file = os.path.join(self.resultdir,
                                                  'uncollected_logs')
        debugdir = os.path.join(self.resultdir, 'debug')
        if not os.path.exists(debugdir):
            os.mkdir(debugdir)

        if user:
            self.user = user
        else:
            self.user = getpass.getuser()

        self.args = args
        self.label = label
        self.machines = machines
        self._client = client
        self.warning_loggers = set()
        self.warning_manager = warning_manager()
        self._ssh_user = ssh_user
        self._ssh_port = ssh_port
        self._ssh_pass = ssh_pass
        self._ssh_verbosity_flag = ssh_verbosity_flag
        self._ssh_options = ssh_options
        self.tag = tag
        self.hosts = set()
        self.drop_caches = False
        self.drop_caches_between_iterations = False
        self._control_filename = control_filename
        self._disable_sysinfo = disable_sysinfo
        self._use_client_trampoline = use_client_trampoline

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        subcommand.logging_manager_object = self.logging

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self.profilers = profilers.profilers(self)

        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'drone' : platform.node(),
                    'status_version' : str(self._STATUS_VERSION),
                    'job_started' : str(int(time.time()))}
        # Save parent job id to keyvals, so parser can retrieve the info and
        # write to tko_jobs record.
        if parent_job_id:
            job_data['parent_job_id'] = parent_job_id
        if group_name:
            job_data['host_group_name'] = group_name

        # only write these keyvals out on the first job in a resultdir
        if 'job_started' not in utils.read_keyval(self.resultdir):
            job_data.update(self._get_job_data())
            utils.write_keyval(self.resultdir, job_data)

        self.pkgmgr = packages.PackageManager(
                self.autodir, run_function_dargs={'timeout':600})

        self._register_subcommand_hooks()

        # We no longer parse results as part of the server_job. These
        # arguments can't be dropped yet because clients haven't all been
        # cleaned up yet.
        self.num_tests_run = -1
        self.num_tests_failed = -1

        # set up the status logger
        self._indenter = status_indenter()
        self._logger = base_job.status_logger(
                self, self._indenter, 'status.log', 'status.log',
                record_hook=server_job_record_hook(self))

        # Initialize a flag to indicate DUT failure during the test, e.g.,
        # unexpected reboot.
        self.failed_with_device_error = False

        self._connection_pool = ssh_multiplex.ConnectionPool()

        # List of functions to run after the main job function.
        self._post_run_hooks = []

        self.parent_job_id = parent_job_id
        self.in_lab = in_lab
        self.machine_dict_list = machine_dict_list
        for machine_dict in self.machine_dict_list:
            machine_dict['connection_pool'] = self._connection_pool

        # TODO(jrbarnette) The harness attribute is only relevant to
        # client jobs, but it's required to be present, or we will fail
        # server job unit tests. Yes, really.
        #
        # TODO(jrbarnette) The utility of the 'harness' attribute even
        # to client jobs is suspect. Probably, we should remove it.
        self.harness = None

        # TODO(ayatane): fast and max_result_size_KB are not set for
        # client_trampoline jobs.
        if control and not use_client_trampoline:
            parsed_control = control_data.parse_control(
                    control, raise_warnings=False)
            self.fast = parsed_control.fast
            self.max_result_size_KB = parsed_control.max_result_size_KB
        else:
            self.fast = False
            # Set the maximum result size to be the default specified in
            # global config, if the job has no control file associated.
            self.max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB

    @classmethod
    def _find_base_directories(cls):
        """
        Determine locations of autodir, clientdir and serverdir. Assumes
        that this file is located within serverdir and uses __file__ along
        with relative paths to resolve the location.
        """
        serverdir = os.path.abspath(os.path.dirname(__file__))
        autodir = os.path.normpath(os.path.join(serverdir, '..'))
        clientdir = os.path.join(autodir, 'client')
        return autodir, clientdir, serverdir


    def _find_resultdir(self, resultdir, *args, **dargs):
        """
        Determine the location of resultdir. For server jobs we expect one to
        always be explicitly passed in to __init__, so just return that.
        """
        if resultdir:
            return os.path.normpath(resultdir)
        else:
            return None


    def _get_status_logger(self):
        """Return a reference to the status logger."""
        return self._logger


    @staticmethod
    def _load_control_file(path):
        f = open(path)
        try:
            control_file = f.read()
        finally:
            f.close()
        return re.sub('\r', '', control_file)


    def _register_subcommand_hooks(self):
        """
        Register some hooks into the subcommand modules that allow us
        to properly clean up self.hosts created in forked subprocesses.
        """
        def on_fork(cmd):
            self._existing_hosts_on_fork = set(self.hosts)
        def on_join(cmd):
            new_hosts = self.hosts - self._existing_hosts_on_fork
            for host in new_hosts:
                host.close()
        subcommand.subcommand.register_fork_hook(on_fork)
        subcommand.subcommand.register_join_hook(on_join)


    # TODO crbug.com/285395 add a kwargs parameter.
    def _make_namespace(self):
        """Create a namespace dictionary to be passed along to control file.

        Creates a namespace argument populated with standard values:
        machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag,
        and ssh_options.
        """
        namespace = {'machines' : self.machine_dict_list,
                     'job' : self,
                     'ssh_user' : self._ssh_user,
                     'ssh_port' : self._ssh_port,
                     'ssh_pass' : self._ssh_pass,
                     'ssh_verbosity_flag' : self._ssh_verbosity_flag,
                     'ssh_options' : self._ssh_options}
        return namespace


    def cleanup(self, labels):
        """Cleanup machines.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to cleanup')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False)


    def verify(self, labels):
        """Verify machines are all ssh-able.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)


    def reset(self, labels):
        """Reset machines by first cleanup then verify each machine.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to reset.')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(RESET_CONTROL_FILE, namespace, protect=False)


    def repair(self, labels):
        """Repair machines.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)


    def provision(self, labels):
        """
        Provision all hosts to match |labels|.

        @param labels: A comma separated string of labels to provision the
                       host to.

        """
        control = self._load_control_file(PROVISION_CONTROL_FILE)
        self.run(control=control, job_labels=labels)


    def precheck(self):
        """
        Perform any additional checks in derived classes.
        """
        pass


    def enable_external_logging(self):
        """
        Start or restart external logging mechanism.
        """
        pass


    def disable_external_logging(self):
        """
        Pause or stop external logging mechanism.
        """
        pass


    def use_external_logging(self):
        """
        Return True if external logging should be used.
        """
        return False


    def _make_parallel_wrapper(self, function, machines, log):
        """Wrap function as appropriate for calling by parallel_simple."""
        # machines could be a list of dictionaries, e.g.,
        #    [{'host_attributes': {}, 'hostname': '100.96.51.226'}]
        # The dictionary is generated in server_job.__init__, refer to
        # variable machine_dict_list, then passed in with namespace, see
        # method server_job._make_namespace.
        # To compare the machines to self.machines, which is a list of
        # machine hostnames, we need to convert machines back to a list of
        # hostnames.
        if (machines and isinstance(machines, list)
            and isinstance(machines[0], dict)):
            machines = [m['hostname'] for m in machines]
        if len(machines) > 1 and log:
            def wrapper(machine):
                hostname = server_utils.get_hostname_from_machine(machine)
                self.push_execution_context(hostname)
                os.chdir(self.resultdir)
                machine_data = {'hostname' : hostname,
                                'status_version' : str(self._STATUS_VERSION)}
                utils.write_keyval(self.resultdir, machine_data)
                result = function(machine)
                return result
        else:
            wrapper = function
        return wrapper


    def parallel_simple(self, function, machines, log=True, timeout=None,
                        return_results=False):
        """
        Run 'function' using parallel_simple, with an extra wrapper to handle
        the necessary setup for continuous parsing, if possible. If continuous
        parsing is already properly initialized then this should just work.

        @param function: A callable to run in parallel given each machine.
        @param machines: A list of machine names to be passed one per
                subcommand invocation of function.
        @param log: If True, output will be written to a subdirectory named
                after each machine.
        @param timeout: Seconds after which the function call should timeout.
        @param return_results: If True, instead of an AutoServError being
                raised on any error, a list of the results/exceptions from
                the function called on each arg is returned.
                [default: False]

        @raises error.AutotestError: If any of the functions failed.
        """
        wrapper = self._make_parallel_wrapper(function, machines, log)
        return subcommand.parallel_simple(
                wrapper, machines,
                subdir_name_constructor=server_utils.get_hostname_from_machine,
                log=log, timeout=timeout, return_results=return_results)


    def parallel_on_machines(self, function, machines, timeout=None):
        """
        @param function: Called in parallel with one machine as its argument.
        @param machines: A list of machines to call function(machine) on.
        @param timeout: Seconds after which the function call should timeout.

        @returns A list of machines on which function(machine) returned
                without raising an exception.
        """
        results = self.parallel_simple(function, machines, timeout=timeout,
                                       return_results=True)
        success_machines = []
        for result, machine in itertools.izip(results, machines):
            if not isinstance(result, Exception):
                success_machines.append(machine)
        return success_machines

    def record_skipped_test(self, skipped_test, message=None):
        """Insert a failure record into status.log for this test."""
        msg = message
        if msg is None:
            msg = 'No valid machines found for test %s.' % skipped_test
        logging.info(msg)
        self.record('START', None, skipped_test.test_name)
        self.record('INFO', None, skipped_test.test_name, msg)
        self.record('END TEST_NA', None, skipped_test.test_name, msg)


    def _has_failed_tests(self):
        """Parse status log for failed tests.

        This checks the current working directory and is intended only for
        use by the run() method.

        @return boolean
        """
        path = os.getcwd()

        # TODO(ayatane): Copied from tko/parse.py. Needs extensive refactor
        # to make code reuse plausible.
        job_keyval = tko_models.job.read_keyval(path)
        status_version = job_keyval.get("status_version", 0)

        # parse out the job
        parser = parser_lib.parser(status_version)
        job = parser.make_job(path)
        status_log = os.path.join(path, "status.log")
        if not os.path.exists(status_log):
            status_log = os.path.join(path, "status")
        if not os.path.exists(status_log):
            logging.warning("! Unable to parse job, no status file")
            return True

        # parse the status logs
        status_lines = open(status_log).readlines()
        parser.start(job)
        tests = parser.end(status_lines)

        # parser.end can return the same object multiple times, so filter out
        # dups
        job.tests = []
        already_added = set()
        for test in tests:
            if test not in already_added:
                already_added.add(test)
                job.tests.append(test)

        failed = False
        for test in job.tests:
            # The current job is still running and shouldn't count as failed.
            # The parser will fail to parse the exit status of the job since
            # it hasn't exited yet (this running right now is the job).
            failed = failed or (test.status != 'GOOD'
                                and not _is_current_server_job(test))
        return failed


    def _collect_crashes(self, namespace, collect_crashinfo):
        """Collect crashes.

        @param namespace: namespace dict.
        @param collect_crashinfo: whether to collect crashinfo in addition to
                dumps
        """
        if collect_crashinfo:
            # includes crashdumps
            crash_control_file = CRASHINFO_CONTROL_FILE
        else:
            crash_control_file = CRASHDUMPS_CONTROL_FILE
        self._execute_code(crash_control_file, namespace)


    _USE_TEMP_DIR = object()
    def run(self, collect_crashdumps=True, namespace={}, control=None,
            control_file_dir=None, verify_job_repo_url=False,
            only_collect_crashinfo=False, skip_crash_collection=False,
            job_labels='', use_packaging=True):
        # for a normal job, make sure the uncollected logs file exists
        # for a crashinfo-only run it should already exist, bail out otherwise
        created_uncollected_logs = False
        logging.info("I am PID %s", os.getpid())
        if self.resultdir and not os.path.exists(self._uncollected_log_file):
            if only_collect_crashinfo:
                # if this is a crashinfo-only run, and there were no existing
                # uncollected logs, just bail out early
                logging.info("No existing uncollected logs, "
                             "skipping crashinfo collection")
                return
            else:
                log_file = open(self._uncollected_log_file, "w")
                pickle.dump([], log_file)
                log_file.close()
                created_uncollected_logs = True

        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines
        if control is None:
            if self.control is None:
                control = ''
            elif self._use_client_trampoline:
                control = self._load_control_file(
                        CLIENT_TRAMPOLINE_CONTROL_FILE)
                # repr of a string is safe for eval.
                control = (('trampoline_testname = %r\n' % str(self.control))
                           + control)
            else:
                control = self._load_control_file(self.control)
        if control_file_dir is None:
            control_file_dir = self.resultdir

        self.aborted = False
        namespace.update(self._make_namespace())
        namespace.update({
                'args': self.args,
                'job_labels': job_labels,
                'gtest_runner': site_gtest_runner.gtest_runner(),
        })
        test_start_time = int(time.time())

        if self.resultdir:
            os.chdir(self.resultdir)
            # touch status.log so that the parser knows a job is running here
            open(self.get_status_log_path(), 'a').close()
            self.enable_external_logging()

        collect_crashinfo = True
        temp_control_file_dir = None
        try:
            try:
                if not self.fast:
                    with metrics.SecondsTimer(
                            'chromeos/autotest/job/get_network_stats',
                            fields={'stage': 'start'}):
                        namespace['network_stats_label'] = 'at-start'
                        self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
                                           namespace)

                if only_collect_crashinfo:
                    return

                # If the verify_job_repo_url option is set but we're unable
                # to actually verify that the job_repo_url contains the
                # autotest package, this job will fail.
                if verify_job_repo_url:
                    self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE,
                                       namespace)
                else:
                    logging.warning('Not checking if job_repo_url contains '
                                    'autotest packages on %s', machines)

                # determine the dir to write the control files to
                cfd_specified = (control_file_dir
                                 and control_file_dir is not self._USE_TEMP_DIR)
                if cfd_specified:
                    temp_control_file_dir = None
                else:
                    temp_control_file_dir = tempfile.mkdtemp(
                            suffix='temp_control_file_dir')
                    control_file_dir = temp_control_file_dir
                server_control_file = os.path.join(control_file_dir,
                                                   self._control_filename)
                client_control_file = os.path.join(control_file_dir,
                                                   CLIENT_CONTROL_FILENAME)
                if self._client:
                    namespace['control'] = control
                    utils.open_write_close(client_control_file, control)
                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
                                    server_control_file)
                else:
                    utils.open_write_close(server_control_file, control)

                logging.info("Processing control file")
                namespace['use_packaging'] = use_packaging
                self._execute_code(server_control_file, namespace)
                logging.info("Finished processing control file")

                # If no device error occurred, no need to collect crashinfo.
                collect_crashinfo = self.failed_with_device_error
            except Exception as e:
                try:
                    logging.exception(
                            'Exception escaped control file, job aborting:')
                    reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX,
                                    ' ', str(e))
                    self.record('INFO', None, None, str(e),
                                {'job_abort_reason': reason})
                except:
                    pass  # don't let logging exceptions here interfere
                raise
        finally:
            if temp_control_file_dir:
                # Clean up temp directory used for copies of the control files
                try:
                    shutil.rmtree(temp_control_file_dir)
                except Exception as e:
                    logging.warning('Could not remove temp directory %s: %s',
                                    temp_control_file_dir, e)

            if machines and (collect_crashdumps or collect_crashinfo):
                if skip_crash_collection or self.fast:
                    logging.info('Skipping crash dump/info collection '
                                 'as requested.')
                else:
                    with metrics.SecondsTimer(
                            'chromeos/autotest/job/collect_crashinfo'):
                        namespace['test_start_time'] = test_start_time
                        # Remove crash files for passing tests.
                        # TODO(ayatane): Tests that create crash files should
                        # be reported.
                        namespace['has_failed_tests'] = self._has_failed_tests()
                        self._collect_crashes(namespace, collect_crashinfo)
            self.disable_external_logging()
            if self._uncollected_log_file and created_uncollected_logs:
                os.remove(self._uncollected_log_file)

            if not self.fast:
                with metrics.SecondsTimer(
                        'chromeos/autotest/job/get_network_stats',
                        fields={'stage': 'end'}):
                    namespace['network_stats_label'] = 'at-end'
                    self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
                                       namespace)
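
    # A usage sketch for run_test below (illustrative only; the test name and
    # tag are hypothetical). Inside a control file processed by run(), an
    # individual test is invoked through the job object:
    #
    #   job.run_test('platform_ExampleTest', tag='run1')
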

    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        tag
                tag to add to testname
        url
                url of the test to run
        """
        if self._disable_sysinfo:
            dargs['disable_sysinfo'] = True

        group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        outputdir = self._make_test_outputdir(subdir)

        def group_func():
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException as e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception as e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        try:
            result = self._run_group(testname, subdir, group_func)
        except error.TestBaseException as e:
            return False
        else:
            return True


    def _run_group(self, name, subdir, function, *args, **dargs):
        """Underlying method for running something inside of a group."""
        result, exc_info = None, None
        try:
            self.record('START', subdir, name)
            result = function(*args, **dargs)
        except error.TestBaseException as e:
            self.record("END %s" % e.exit_status, subdir, name)
            raise
        except Exception as e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)
        finally:
            for hook in self._post_run_hooks:
                hook()

        return result


    def run_group(self, function, *args, **dargs):
        """\
        @param function: subroutine to run
        @returns: (result, exc_info). When the call succeeds, result contains
                the return value of |function| and exc_info is None. If
                |function| raises an exception, exc_info contains the tuple
                returned by sys.exc_info(), and result is None.
        """

        name = function.__name__
        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        try:
            result = self._run_group(name, None, function, *args, **dargs)[0]
        except error.TestBaseException:
            return None, sys.exc_info()
        return result, None


    def run_op(self, op, op_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling a
        management operation. Includes support for capturing the kernel
        version after the operation.

        Args:
            op: name of the operation.
            op_func: a function that carries out the operation (reboot,
                    suspend)
            get_kernel_func: a function that returns a string
                    representing the kernel version.
        """
        try:
            self.record('START', None, op)
            op_func()
        except Exception as e:
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, op, err_msg)
            raise
        else:
            kernel = get_kernel_func()
            self.record('END GOOD', None, op,
                        optional_fields={"kernel": kernel})


    def run_control(self, path):
        """Execute a control file found at path (relative to the autotest
        path). Intended for executing a control file within a control file,
        not for running the top-level job control file."""
        path = os.path.join(self.autodir, path)
        control_file = self._load_control_file(path)
        self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)


    def _read_warnings(self):
        """Poll all the warning loggers and extract any new warnings that have
        been logged. If the warnings belong to a category that is currently
        disabled, this method will discard them and they will no longer be
        retrievable.

        Returns a list of (timestamp, message) tuples, where timestamp is an
        integer epoch timestamp."""
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # parse out the warning
                timestamp, msgtype, msg = line.split('\t', 2)
                timestamp = int(timestamp)
                # if the warning is valid, add it to the results
                if self.warning_manager.is_valid(timestamp, msgtype):
                    warnings.append((timestamp, msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings


    def _unique_subdirectory(self, base_subdirectory_name):
        """Compute a unique results subdirectory based on the given name.

        Appends base_subdirectory_name with a number as necessary to find a
        directory name that doesn't already exist.
        """
        subdirectory = base_subdirectory_name
        counter = 1
        while os.path.exists(os.path.join(self.resultdir, subdirectory)):
            subdirectory = base_subdirectory_name + '.' + str(counter)
            counter += 1
        return subdirectory


    def get_record_context(self):
        """Returns an object representing the current job.record context.

        The object returned is an opaque object with a 0-arg restore method
        which can be called to restore the job.record context (i.e.
        indentation) to the current level. The intention is that it should be
        used when something external which generates job.record calls (e.g.
        an autotest client) can fail catastrophically and the server job
        record state needs to be reset to its original "known good" state.

        @return: A context object with a 0-arg restore() method."""
        return self._indenter.get_context()
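
    # A usage sketch for get_record_context (illustrative only; the helper
    # called in the try block is hypothetical): save the record context
    # before handing control to something that may fail partway through its
    # own job.record calls, then restore the indentation afterwards.
    #
    #   context = job.get_record_context()
    #   try:
    #       run_client_side_portion()
    #   finally:
    #       context.restore()
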

    def record_summary(self, status_code, test_name, reason='',
                       attributes=None, distinguishing_attributes=(),
                       child_test_ids=None):
        """Record a summary test result.

        @param status_code: status code string, see
                common_lib.log.is_valid_status()
        @param test_name: name of the test
        @param reason: (optional) string providing detailed reason for test
                outcome
        @param attributes: (optional) dict of string keyvals to associate
                with this result
        @param distinguishing_attributes: (optional) list of attribute names
                that should be used to distinguish identically-named test
                results. These attributes should be present in the attributes
                parameter. This is used to generate user-friendly
                subdirectory names.
        @param child_test_ids: (optional) list of test indices for test
                results used in generating this result.
        """
        subdirectory_name_parts = [test_name]
        for attribute in distinguishing_attributes:
            assert attributes
            assert attribute in attributes, '%s not in %s' % (attribute,
                                                              attributes)
            subdirectory_name_parts.append(attributes[attribute])
        base_subdirectory_name = '.'.join(subdirectory_name_parts)

        subdirectory = self._unique_subdirectory(base_subdirectory_name)
        subdirectory_path = os.path.join(self.resultdir, subdirectory)
        os.mkdir(subdirectory_path)

        self.record(status_code, subdirectory, test_name,
                    status=reason, optional_fields={'is_summary': True})

        if attributes:
            utils.write_keyval(subdirectory_path, attributes)

        if child_test_ids:
            ids_string = ','.join(str(test_id) for test_id in child_test_ids)
            summary_data = {'child_test_ids': ids_string}
            utils.write_keyval(os.path.join(subdirectory_path,
                                            'summary_data'),
                               summary_data)


    def add_post_run_hook(self, hook):
        """
        Registers a hook to run after the main job function.

        This provides a mechanism by which tests that perform multiple tests
        of their own can write additional top-level results to the TKO
        status.log file.

        @param hook: Function to invoke (without any args) after the main job
                function completes and the job status is logged.
        """
        self._post_run_hooks.append(hook)


    def disable_warnings(self, warning_type):
        self.warning_manager.disable_warnings(warning_type)
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})


    def enable_warnings(self, warning_type):
        self.warning_manager.enable_warnings(warning_type)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})


    def get_status_log_path(self, subdir=None):
        """Return the path to the job status log.

        @param subdir - Optional parameter indicating that you want the path
                to a subdirectory status log.

        @returns The path where the status log should be.
        """
        if self.resultdir:
            if subdir:
                return os.path.join(self.resultdir, subdir, "status.log")
            else:
                return os.path.join(self.resultdir, "status.log")
        else:
            return None


    def _update_uncollected_logs_list(self, update_func):
        """Updates the uncollected logs list in a multi-process safe manner.

        @param update_func - a function that updates the list of uncollected
                logs. Should take one parameter, the list to be updated.
        """
        # Skip log collection if file _uncollected_log_file does not exist.
        if not (self._uncollected_log_file and
                os.path.exists(self._uncollected_log_file)):
            return
        if self._uncollected_log_file:
            log_file = open(self._uncollected_log_file, "r+")
            fcntl.flock(log_file, fcntl.LOCK_EX)
            try:
                uncollected_logs = pickle.load(log_file)
                update_func(uncollected_logs)
                log_file.seek(0)
                log_file.truncate()
                pickle.dump(uncollected_logs, log_file)
                log_file.flush()
            finally:
                fcntl.flock(log_file, fcntl.LOCK_UN)
                log_file.close()


    def add_client_log(self, hostname, remote_path, local_path):
        """Adds a new set of client logs to the list of uncollected logs,
        to allow for future log recovery.

        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.append((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)


    def remove_client_log(self, hostname, remote_path, local_path):
        """Removes a set of client logs from the list of uncollected logs,
        to allow for future log recovery.

        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.remove((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)


    def get_client_logs(self):
        """Retrieves the list of uncollected logs, if it exists.

        @returns A list of (host, remote_path, local_path) tuples. Returns
                an empty list if no uncollected logs file exists.
        """
        log_exists = (self._uncollected_log_file and
                      os.path.exists(self._uncollected_log_file))
        if log_exists:
            return pickle.load(open(self._uncollected_log_file))
        else:
            return []


    def _fill_server_control_namespace(self, namespace, protect=True):
        """
        Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making
        them available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
            namespace: The namespace dictionary to fill in.
            protect: Boolean. If True (the default) any operation that would
                    clobber an existing entry in namespace will cause an
                    error.
        Raises:
            error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """
            Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.
                        If empty (the default), all names are imported from
                        the module similar to a "from foo.bar import *"
                        statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                        a name already in namespace.
            """
            module = __import__(module_name, {}, {}, names)

            # No names supplied? Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with fewer surprises on '*'
                        # imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrence of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements
        # to the front of the control script.
        namespace.update(os=os, sys=sys, logging=logging)
        _import_names('autotest_lib.server',
                      ('hosts', 'autotest', 'standalone_profiler'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing to be part of a public API.
        # -gps)
        #
        # XXX Autotest does not appear to use .job. Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self
        namespace['hosts'].factory.ssh_user = self._ssh_user
        namespace['hosts'].factory.ssh_port = self._ssh_port
        namespace['hosts'].factory.ssh_pass = self._ssh_pass
        namespace['hosts'].factory.ssh_verbosity_flag = (
                self._ssh_verbosity_flag)
        namespace['hosts'].factory.ssh_options = self._ssh_options
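
    # Once _fill_server_control_namespace has populated the namespace, a
    # server control file executed by _execute_code below can refer to the
    # injected names directly. A sketch of what that can look like
    # (illustrative only; the control file body is hypothetical):
    #
    #   # inside a control.srv run through _execute_code
    #   for machine in machines:
    #       host = hosts.create_host(machine)
    #       logging.info('working on %s', host.hostname)
    #       run('true')  # server_utils.run, injected above
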

    def _execute_code(self, code_file, namespace, protect=True):
        """
        Execute code using a copy of namespace as a server control script.

        Unless protect_namespace is explicitly set to False, the dict will
        not be modified.

        Args:
            code_file: The filename of the control file to execute.
            namespace: A dict containing names to make available during
                    execution.
            protect: Boolean. If True (the default) a copy of the namespace
                    dict is used during execution to prevent the code from
                    modifying its contents outside of this function. If
                    False the raw dict is passed in and modifications will
                    be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines) > 1:
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        execfile(code_file, namespace, namespace)


    def preprocess_client_state(self):
        """
        Produce a state file for initializing the state of a client job.

        Creates a new client state file with all the current server state, as
        well as some pre-set client state.

        @returns The path of the file the state was written into.
        """
        # initialize the sysinfo state
        self._state.set('client', 'sysinfo', self.sysinfo.serialize())

        # dump the state out to a tempfile
        fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
        os.close(fd)

        # write_to_file doesn't need locking, we exclusively own file_path
        self._state.write_to_file(file_path)
        return file_path


    def postprocess_client_state(self, state_path):
        """
        Update the state of this job with the state from a client job.

        Updates the state of the server side of a job with the final state
        of a client job that was run. Updates the non-client-specific state,
        pulls in some specific bits from the client-specific state, and then
        discards the rest. Removes the state file afterwards.

        @param state_path: A path to the state file from the client.
        """
        # update the on-disk state
        try:
            self._state.read_from_file(state_path)
            os.remove(state_path)
        except OSError as e:
            # ignore file-not-found errors
            if e.errno != errno.ENOENT:
                raise
            else:
                logging.debug('Client state file %s not found', state_path)

        # update the sysinfo state
        if self._state.has('client', 'sysinfo'):
            self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))

        # drop all the client-specific state
        self._state.discard_namespace('client')


    def clear_all_known_hosts(self):
        """Clears known hosts files for all AbstractSSHHosts."""
        for host in self.hosts:
            if isinstance(host, abstract_ssh.AbstractSSHHost):
                host.clear_known_hosts()


    def close(self):
        """Closes this job's operation."""

        # Use shallow copy, because host.close() internally discards itself.
        for host in list(self.hosts):
            host.close()
        assert not self.hosts
        self._connection_pool.shutdown()


    def _get_job_data(self):
        """Add custom data to the job keyval info.

        When multiple machines are used in a job, change the hostname to
        the platform of the first machine instead of machine1,machine2,...
        This makes the job reports easier to read and keeps the tko_machines
        table from growing too large.

        Returns:
            keyval dictionary with new hostname value, or empty dictionary.
        """
        job_data = {}
        # Only modify hostname on multimachine jobs. Assume all hosts have
        # the same platform.
        if len(self.machines) > 1:
            # Search through machines for first machine with a platform.
            for host in self.machines:
                keyval_path = os.path.join(self.resultdir, 'host_keyvals',
                                           host)
                keyvals = utils.read_keyval(keyval_path)
                host_plat = keyvals.get('platform', None)
                if not host_plat:
                    continue
                job_data['hostname'] = host_plat
                break
        return job_data


class warning_manager(object):
    """Class for controlling warning logs. Manages the enabling and disabling
    of warnings."""
    def __init__(self):
        # a map of warning types to a list of disabled time intervals
        self.disabled_warnings = {}


    def is_valid(self, timestamp, warning_type):
        """Indicates if a warning (based on the time it occurred and its
        type) is a valid warning. A warning is considered "invalid" if this
        type of warning was marked as "disabled" at the time the warning
        occurred."""
        disabled_intervals = self.disabled_warnings.get(warning_type, [])
        for start, end in disabled_intervals:
            if timestamp >= start and (end is None or timestamp < end):
                return False
        return True


    def disable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, disables all further warnings of this type."""
        intervals = self.disabled_warnings.setdefault(warning_type, [])
        if not intervals or intervals[-1][1] is not None:
            intervals.append((int(current_time_func()), None))


    def enable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, enables all further warnings of this type."""
        intervals = self.disabled_warnings.get(warning_type, [])
        if intervals and intervals[-1][1] is None:
            intervals[-1] = (intervals[-1][0], int(current_time_func()))


def _is_current_server_job(test):
    """Return True if parsed test is the currently running job.

    @param test: test instance from tko parser.
    """
    return test.testname == 'SERVER_JOB'


def _create_afe_host(hostname):
    """Create an afe_host object backed by the AFE.

    @param hostname: Name of the host for which we want the Host object.
    @returns: An object of type frontend.Host
    """
    afe = frontend_wrappers.RetryingAFE(timeout_min=5, delay_sec=10)
    hosts = afe.get_hosts(hostname=hostname)
    if not hosts:
        raise error.AutoservError('No hosts named %s found' % hostname)

    return hosts[0]


def _create_file_backed_host_info_store(store_dir, hostname):
    """Create a CachingHostInfoStore backed by an existing file.

    @param store_dir: A directory to contain store backing files.
    @param hostname: Name of the host for which we want the store.

    @returns: An object of type CachingHostInfoStore.
    """
    backing_file_path = os.path.join(store_dir, '%s.store' % hostname)
    if not os.path.isfile(backing_file_path):
        raise error.AutoservError(
                'Requested FileStore but no backing file at %s'
                % backing_file_path
        )
    return file_store.FileStore(backing_file_path)


def _create_afe_backed_host_info_store(store_dir, hostname):
    """Create a CachingHostInfoStore backed by the AFE.

    @param store_dir: A directory to contain store backing files.
    @param hostname: Name of the host for which we want the store.

    @returns: An object of type CachingHostInfoStore.
    """
    primary_store = afe_store.AfeStore(hostname)
    try:
        primary_store.get(force_refresh=True)
    except host_info.StoreError:
        raise error.AutoservError(
                'Could not obtain HostInfo for hostname %s' % hostname)
    # Since the store wasn't initialized external to autoserv, we must
    # ensure that the store we create is unique within store_dir.
    backing_file_path = os.path.join(
            _make_unique_subdir(store_dir),
            '%s.store' % hostname,
    )
    logging.info('Shadowing AFE store with a FileStore at %s',
                 backing_file_path)
    shadow_store = file_store.FileStore(backing_file_path)
    return shadowing_store.ShadowingStore(primary_store, shadow_store)


def _make_unique_subdir(workdir):
    """Creates a new subdir within workdir and returns the path to it."""
    store_dir = os.path.join(workdir, 'dir_%s' % uuid.uuid4())
    _make_dirs_if_needed(store_dir)
    return store_dir


def _make_dirs_if_needed(path):
    """os.makedirs, but ignores failure because the leaf directory exists."""
    try:
        os.makedirs(path)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise