1# pylint: disable-msg=C0111 2 3# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6""" 7The main job wrapper for the server side. 8 9This is the core infrastructure. Derived from the client side job.py 10 11Copyright Martin J. Bligh, Andy Whitcroft 2007 12""" 13 14import errno 15import fcntl 16import getpass 17import itertools 18import logging 19import os 20import pickle 21import platform 22import re 23import select 24import shutil 25import sys 26import tempfile 27import time 28import traceback 29import uuid 30import warnings 31 32from autotest_lib.client.bin import sysinfo 33from autotest_lib.client.common_lib import base_job 34from autotest_lib.client.common_lib import control_data 35from autotest_lib.client.common_lib import error 36from autotest_lib.client.common_lib import global_config 37from autotest_lib.client.common_lib import logging_manager 38from autotest_lib.client.common_lib import packages 39from autotest_lib.client.common_lib import utils 40from autotest_lib.server import profilers 41from autotest_lib.server import site_gtest_runner 42from autotest_lib.server import subcommand 43from autotest_lib.server import test 44from autotest_lib.server import utils as server_utils 45from autotest_lib.server.cros.dynamic_suite import frontend_wrappers 46from autotest_lib.server.hosts import abstract_ssh 47from autotest_lib.server.hosts import afe_store 48from autotest_lib.server.hosts import file_store 49from autotest_lib.server.hosts import shadowing_store 50from autotest_lib.server.hosts import factory as host_factory 51from autotest_lib.server.hosts import host_info 52from autotest_lib.server.hosts import ssh_multiplex 53from autotest_lib.tko import db as tko_db 54from autotest_lib.tko import models as tko_models 55from autotest_lib.tko import status_lib 56from autotest_lib.tko import parser_lib 57from autotest_lib.tko import utils as tko_utils 58 59try: 60 from chromite.lib import metrics 61except ImportError: 62 metrics = utils.metrics_mock 63 64 65INCREMENTAL_TKO_PARSING = global_config.global_config.get_config_value( 66 'autoserv', 'incremental_tko_parsing', type=bool, default=False) 67 68def _control_segment_path(name): 69 """Get the pathname of the named control segment file.""" 70 server_dir = os.path.dirname(os.path.abspath(__file__)) 71 return os.path.join(server_dir, "control_segments", name) 72 73 74CLIENT_CONTROL_FILENAME = 'control' 75SERVER_CONTROL_FILENAME = 'control.srv' 76MACHINES_FILENAME = '.machines' 77 78CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper') 79CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps') 80CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo') 81INSTALL_CONTROL_FILE = _control_segment_path('install') 82CLEANUP_CONTROL_FILE = _control_segment_path('cleanup') 83VERIFY_CONTROL_FILE = _control_segment_path('verify') 84REPAIR_CONTROL_FILE = _control_segment_path('repair') 85PROVISION_CONTROL_FILE = _control_segment_path('provision') 86VERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url') 87RESET_CONTROL_FILE = _control_segment_path('reset') 88GET_NETWORK_STATS_CONTROL_FILE = _control_segment_path('get_network_stats') 89 90 91def get_machine_dicts(machine_names, workdir, in_lab, host_attributes=None, 92 connection_pool=None): 93 """Converts a list of machine names to list of dicts. 94 95 TODO(crbug.com/678430): This function temporarily has a side effect of 96 creating files under workdir for backing a FileStore. This side-effect will 97 go away once callers of autoserv start passing in the FileStore. 98 99 @param machine_names: A list of machine names. 100 @param workdir: A directory where any on-disk files related to the machines 101 may be created. 102 @param in_lab: A boolean indicating whether we're running in lab. 103 @param host_attributes: Optional list of host attributes to add for each 104 host. 105 @param connection_pool: ssh_multiplex.ConnectionPool instance to share 106 master connections across control scripts. 107 @returns: A list of dicts. Each dict has the following keys: 108 'hostname': Name of the machine originally in machine_names (str). 109 'afe_host': A frontend.Host object for the machine, or a stub if 110 in_lab is false. 111 'host_info_store': A host_info.CachingHostInfoStore object to obtain 112 host information. A stub if in_lab is False. 113 'connection_pool': ssh_multiplex.ConnectionPool instance to share 114 master ssh connection across control scripts. 115 """ 116 machine_dict_list = [] 117 for machine in machine_names: 118 # See autoserv_parser.parse_args. Only one of in_lab or host_attributes 119 # can be provided. 120 if not in_lab: 121 afe_host = server_utils.EmptyAFEHost() 122 host_info_store = host_info.InMemoryHostInfoStore() 123 if host_attributes is not None: 124 afe_host.attributes.update(host_attributes) 125 info = host_info.HostInfo(attributes=host_attributes) 126 host_info_store.commit(info) 127 elif host_attributes: 128 raise error.AutoservError( 129 'in_lab and host_attribute are mutually exclusive. ' 130 'Obtained in_lab:%s, host_attributes:%s' 131 % (in_lab, host_attributes)) 132 else: 133 afe_host = _create_afe_host(machine) 134 host_info_store = _create_host_info_store(machine, workdir) 135 136 machine_dict_list.append({ 137 'hostname' : machine, 138 'afe_host' : afe_host, 139 'host_info_store': host_info_store, 140 'connection_pool': connection_pool, 141 }) 142 143 return machine_dict_list 144 145 146class status_indenter(base_job.status_indenter): 147 """Provide a simple integer-backed status indenter.""" 148 def __init__(self): 149 self._indent = 0 150 151 152 @property 153 def indent(self): 154 return self._indent 155 156 157 def increment(self): 158 self._indent += 1 159 160 161 def decrement(self): 162 self._indent -= 1 163 164 165 def get_context(self): 166 """Returns a context object for use by job.get_record_context.""" 167 class context(object): 168 def __init__(self, indenter, indent): 169 self._indenter = indenter 170 self._indent = indent 171 def restore(self): 172 self._indenter._indent = self._indent 173 return context(self, self._indent) 174 175 176class server_job_record_hook(object): 177 """The job.record hook for server job. Used to inject WARN messages from 178 the console or vlm whenever new logs are written, and to echo any logs 179 to INFO level logging. Implemented as a class so that it can use state to 180 block recursive calls, so that the hook can call job.record itself to 181 log WARN messages. 182 183 Depends on job._read_warnings and job._logger. 184 """ 185 def __init__(self, job): 186 self._job = job 187 self._being_called = False 188 189 190 def __call__(self, entry): 191 """A wrapper around the 'real' record hook, the _hook method, which 192 prevents recursion. This isn't making any effort to be threadsafe, 193 the intent is to outright block infinite recursion via a 194 job.record->_hook->job.record->_hook->job.record... chain.""" 195 if self._being_called: 196 return 197 self._being_called = True 198 try: 199 self._hook(self._job, entry) 200 finally: 201 self._being_called = False 202 203 204 @staticmethod 205 def _hook(job, entry): 206 """The core hook, which can safely call job.record.""" 207 entries = [] 208 # poll all our warning loggers for new warnings 209 for timestamp, msg in job._read_warnings(): 210 warning_entry = base_job.status_log_entry( 211 'WARN', None, None, msg, {}, timestamp=timestamp) 212 entries.append(warning_entry) 213 job.record_entry(warning_entry) 214 # echo rendered versions of all the status logs to info 215 entries.append(entry) 216 for entry in entries: 217 rendered_entry = job._logger.render_entry(entry) 218 logging.info(rendered_entry) 219 job._parse_status(rendered_entry) 220 221 222class server_job(base_job.base_job): 223 """The server-side concrete implementation of base_job. 224 225 Optional properties provided by this implementation: 226 serverdir 227 228 num_tests_run 229 num_tests_failed 230 231 warning_manager 232 warning_loggers 233 """ 234 235 _STATUS_VERSION = 1 236 237 # TODO crbug.com/285395 eliminate ssh_verbosity_flag 238 def __init__(self, control, args, resultdir, label, user, machines, 239 client=False, parse_job='', 240 ssh_user=host_factory.DEFAULT_SSH_USER, 241 ssh_port=host_factory.DEFAULT_SSH_PORT, 242 ssh_pass=host_factory.DEFAULT_SSH_PASS, 243 ssh_verbosity_flag=host_factory.DEFAULT_SSH_VERBOSITY, 244 ssh_options=host_factory.DEFAULT_SSH_OPTIONS, 245 test_retry=0, group_name='', 246 tag='', disable_sysinfo=False, 247 control_filename=SERVER_CONTROL_FILENAME, 248 parent_job_id=None, host_attributes=None, in_lab=False): 249 """ 250 Create a server side job object. 251 252 @param control: The pathname of the control file. 253 @param args: Passed to the control file. 254 @param resultdir: Where to throw the results. 255 @param label: Description of the job. 256 @param user: Username for the job (email address). 257 @param client: True if this is a client-side control file. 258 @param parse_job: string, if supplied it is the job execution tag that 259 the results will be passed through to the TKO parser with. 260 @param ssh_user: The SSH username. [root] 261 @param ssh_port: The SSH port number. [22] 262 @param ssh_pass: The SSH passphrase, if needed. 263 @param ssh_verbosity_flag: The SSH verbosity flag, '-v', '-vv', 264 '-vvv', or an empty string if not needed. 265 @param ssh_options: A string giving additional options that will be 266 included in ssh commands. 267 @param test_retry: The number of times to retry a test if the test did 268 not complete successfully. 269 @param group_name: If supplied, this will be written out as 270 host_group_name in the keyvals file for the parser. 271 @param tag: The job execution tag from the scheduler. [optional] 272 @param disable_sysinfo: Whether we should disable the sysinfo step of 273 tests for a modest shortening of test time. [optional] 274 @param control_filename: The filename where the server control file 275 should be written in the results directory. 276 @param parent_job_id: Job ID of the parent job. Default to None if the 277 job does not have a parent job. 278 @param host_attributes: Dict of host attributes passed into autoserv 279 via the command line. If specified here, these 280 attributes will apply to all machines. 281 @param in_lab: Boolean that indicates if this is running in the lab 282 environment. 283 """ 284 super(server_job, self).__init__(resultdir=resultdir, 285 test_retry=test_retry) 286 self.test_retry = test_retry 287 self.control = control 288 self._uncollected_log_file = os.path.join(self.resultdir, 289 'uncollected_logs') 290 debugdir = os.path.join(self.resultdir, 'debug') 291 if not os.path.exists(debugdir): 292 os.mkdir(debugdir) 293 294 if user: 295 self.user = user 296 else: 297 self.user = getpass.getuser() 298 299 self.args = args 300 self.label = label 301 self.machines = machines 302 self._client = client 303 self.warning_loggers = set() 304 self.warning_manager = warning_manager() 305 self._ssh_user = ssh_user 306 self._ssh_port = ssh_port 307 self._ssh_pass = ssh_pass 308 self._ssh_verbosity_flag = ssh_verbosity_flag 309 self._ssh_options = ssh_options 310 self.tag = tag 311 self.hosts = set() 312 self.drop_caches = False 313 self.drop_caches_between_iterations = False 314 self._control_filename = control_filename 315 self._disable_sysinfo = disable_sysinfo 316 317 self.logging = logging_manager.get_logging_manager( 318 manage_stdout_and_stderr=True, redirect_fds=True) 319 subcommand.logging_manager_object = self.logging 320 321 self.sysinfo = sysinfo.sysinfo(self.resultdir) 322 self.profilers = profilers.profilers(self) 323 324 job_data = {'label' : label, 'user' : user, 325 'hostname' : ','.join(machines), 326 'drone' : platform.node(), 327 'status_version' : str(self._STATUS_VERSION), 328 'job_started' : str(int(time.time()))} 329 # Save parent job id to keyvals, so parser can retrieve the info and 330 # write to tko_jobs record. 331 if parent_job_id: 332 job_data['parent_job_id'] = parent_job_id 333 if group_name: 334 job_data['host_group_name'] = group_name 335 336 # only write these keyvals out on the first job in a resultdir 337 if 'job_started' not in utils.read_keyval(self.resultdir): 338 job_data.update(self._get_job_data()) 339 utils.write_keyval(self.resultdir, job_data) 340 341 self._parse_job = parse_job 342 self._using_parser = (INCREMENTAL_TKO_PARSING and self._parse_job 343 and len(machines) <= 1) 344 self.pkgmgr = packages.PackageManager( 345 self.autodir, run_function_dargs={'timeout':600}) 346 self.num_tests_run = 0 347 self.num_tests_failed = 0 348 349 self._register_subcommand_hooks() 350 351 # set up the status logger 352 self._indenter = status_indenter() 353 self._logger = base_job.status_logger( 354 self, self._indenter, 'status.log', 'status.log', 355 record_hook=server_job_record_hook(self)) 356 357 # Initialize a flag to indicate DUT failure during the test, e.g., 358 # unexpected reboot. 359 self.failed_with_device_error = False 360 361 self._connection_pool = ssh_multiplex.ConnectionPool() 362 363 self.parent_job_id = parent_job_id 364 self.in_lab = in_lab 365 self.machine_dict_list = get_machine_dicts( 366 self.machines, self.resultdir, self.in_lab, host_attributes, 367 self._connection_pool) 368 369 # TODO(jrbarnette) The harness attribute is only relevant to 370 # client jobs, but it's required to be present, or we will fail 371 # server job unit tests. Yes, really. 372 # 373 # TODO(jrbarnette) The utility of the 'harness' attribute even 374 # to client jobs is suspect. Probably, we should remove it. 375 self.harness = None 376 377 if control: 378 parsed_control = control_data.parse_control( 379 control, raise_warnings=False) 380 self.fast = parsed_control.fast 381 self.max_result_size_KB = parsed_control.max_result_size_KB 382 else: 383 self.fast = False 384 # Set the maximum result size to be the default specified in 385 # global config, if the job has no control file associated. 386 self.max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB 387 388 389 @classmethod 390 def _find_base_directories(cls): 391 """ 392 Determine locations of autodir, clientdir and serverdir. Assumes 393 that this file is located within serverdir and uses __file__ along 394 with relative paths to resolve the location. 395 """ 396 serverdir = os.path.abspath(os.path.dirname(__file__)) 397 autodir = os.path.normpath(os.path.join(serverdir, '..')) 398 clientdir = os.path.join(autodir, 'client') 399 return autodir, clientdir, serverdir 400 401 402 def _find_resultdir(self, resultdir, *args, **dargs): 403 """ 404 Determine the location of resultdir. For server jobs we expect one to 405 always be explicitly passed in to __init__, so just return that. 406 """ 407 if resultdir: 408 return os.path.normpath(resultdir) 409 else: 410 return None 411 412 413 def _get_status_logger(self): 414 """Return a reference to the status logger.""" 415 return self._logger 416 417 418 @staticmethod 419 def _load_control_file(path): 420 f = open(path) 421 try: 422 control_file = f.read() 423 finally: 424 f.close() 425 return re.sub('\r', '', control_file) 426 427 428 def _register_subcommand_hooks(self): 429 """ 430 Register some hooks into the subcommand modules that allow us 431 to properly clean up self.hosts created in forked subprocesses. 432 """ 433 def on_fork(cmd): 434 self._existing_hosts_on_fork = set(self.hosts) 435 def on_join(cmd): 436 new_hosts = self.hosts - self._existing_hosts_on_fork 437 for host in new_hosts: 438 host.close() 439 subcommand.subcommand.register_fork_hook(on_fork) 440 subcommand.subcommand.register_join_hook(on_join) 441 442 443 def init_parser(self): 444 """ 445 Start the continuous parsing of self.resultdir. This sets up 446 the database connection and inserts the basic job object into 447 the database if necessary. 448 """ 449 if self.fast and not self._using_parser: 450 self.parser = parser_lib.parser(self._STATUS_VERSION) 451 self.job_model = self.parser.make_job(self.resultdir) 452 self.parser.start(self.job_model) 453 return 454 455 if not self._using_parser: 456 return 457 458 # redirect parser debugging to .parse.log 459 parse_log = os.path.join(self.resultdir, '.parse.log') 460 parse_log = open(parse_log, 'w', 0) 461 tko_utils.redirect_parser_debugging(parse_log) 462 # create a job model object and set up the db 463 self.results_db = tko_db.db(autocommit=True) 464 self.parser = parser_lib.parser(self._STATUS_VERSION) 465 self.job_model = self.parser.make_job(self.resultdir) 466 self.parser.start(self.job_model) 467 # check if a job already exists in the db and insert it if 468 # it does not 469 job_idx = self.results_db.find_job(self._parse_job) 470 if job_idx is None: 471 self.results_db.insert_job(self._parse_job, self.job_model, 472 self.parent_job_id) 473 else: 474 machine_idx = self.results_db.lookup_machine(self.job_model.machine) 475 self.job_model.index = job_idx 476 self.job_model.machine_idx = machine_idx 477 478 479 def cleanup_parser(self): 480 """ 481 This should be called after the server job is finished 482 to carry out any remaining cleanup (e.g. flushing any 483 remaining test results to the results db) 484 """ 485 if self.fast and not self._using_parser: 486 final_tests = self.parser.end() 487 for test in final_tests: 488 if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'): 489 self.num_tests_failed += 1 490 return 491 492 if not self._using_parser: 493 return 494 495 final_tests = self.parser.end() 496 for test in final_tests: 497 self.__insert_test(test) 498 self._using_parser = False 499 500 # TODO crbug.com/285395 add a kwargs parameter. 501 def _make_namespace(self): 502 """Create a namespace dictionary to be passed along to control file. 503 504 Creates a namespace argument populated with standard values: 505 machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag, 506 and ssh_options. 507 """ 508 namespace = {'machines' : self.machine_dict_list, 509 'job' : self, 510 'ssh_user' : self._ssh_user, 511 'ssh_port' : self._ssh_port, 512 'ssh_pass' : self._ssh_pass, 513 'ssh_verbosity_flag' : self._ssh_verbosity_flag, 514 'ssh_options' : self._ssh_options} 515 return namespace 516 517 518 def cleanup(self, labels): 519 """Cleanup machines. 520 521 @param labels: Comma separated job labels, will be used to 522 determine special task actions. 523 """ 524 if not self.machines: 525 raise error.AutoservError('No machines specified to cleanup') 526 if self.resultdir: 527 os.chdir(self.resultdir) 528 529 namespace = self._make_namespace() 530 namespace.update({'job_labels': labels, 'args': ''}) 531 self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False) 532 533 534 def verify(self, labels): 535 """Verify machines are all ssh-able. 536 537 @param labels: Comma separated job labels, will be used to 538 determine special task actions. 539 """ 540 if not self.machines: 541 raise error.AutoservError('No machines specified to verify') 542 if self.resultdir: 543 os.chdir(self.resultdir) 544 545 namespace = self._make_namespace() 546 namespace.update({'job_labels': labels, 'args': ''}) 547 self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False) 548 549 550 def reset(self, labels): 551 """Reset machines by first cleanup then verify each machine. 552 553 @param labels: Comma separated job labels, will be used to 554 determine special task actions. 555 """ 556 if not self.machines: 557 raise error.AutoservError('No machines specified to reset.') 558 if self.resultdir: 559 os.chdir(self.resultdir) 560 561 namespace = self._make_namespace() 562 namespace.update({'job_labels': labels, 'args': ''}) 563 self._execute_code(RESET_CONTROL_FILE, namespace, protect=False) 564 565 566 def repair(self, labels): 567 """Repair machines. 568 569 @param labels: Comma separated job labels, will be used to 570 determine special task actions. 571 """ 572 if not self.machines: 573 raise error.AutoservError('No machines specified to repair') 574 if self.resultdir: 575 os.chdir(self.resultdir) 576 577 namespace = self._make_namespace() 578 namespace.update({'job_labels': labels, 'args': ''}) 579 self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False) 580 581 582 def provision(self, labels): 583 """ 584 Provision all hosts to match |labels|. 585 586 @param labels: A comma seperated string of labels to provision the 587 host to. 588 589 """ 590 control = self._load_control_file(PROVISION_CONTROL_FILE) 591 self.run(control=control, job_labels=labels) 592 593 594 def precheck(self): 595 """ 596 perform any additional checks in derived classes. 597 """ 598 pass 599 600 601 def enable_external_logging(self): 602 """ 603 Start or restart external logging mechanism. 604 """ 605 pass 606 607 608 def disable_external_logging(self): 609 """ 610 Pause or stop external logging mechanism. 611 """ 612 pass 613 614 615 def use_external_logging(self): 616 """ 617 Return True if external logging should be used. 618 """ 619 return False 620 621 622 def _make_parallel_wrapper(self, function, machines, log): 623 """Wrap function as appropriate for calling by parallel_simple.""" 624 # machines could be a list of dictionaries, e.g., 625 # [{'host_attributes': {}, 'hostname': '100.96.51.226'}] 626 # The dictionary is generated in server_job.__init__, refer to 627 # variable machine_dict_list, then passed in with namespace, see method 628 # server_job._make_namespace. 629 # To compare the machinese to self.machines, which is a list of machine 630 # hostname, we need to convert machines back to a list of hostnames. 631 # Note that the order of hostnames doesn't matter, as is_forking will be 632 # True if there are more than one machine. 633 if (machines and isinstance(machines, list) 634 and isinstance(machines[0], dict)): 635 machines = [m['hostname'] for m in machines] 636 is_forking = not (len(machines) == 1 and self.machines == machines) 637 if self._parse_job and is_forking and log: 638 def wrapper(machine): 639 hostname = server_utils.get_hostname_from_machine(machine) 640 self._parse_job += "/" + hostname 641 self._using_parser = INCREMENTAL_TKO_PARSING 642 self.machines = [machine] 643 self.push_execution_context(hostname) 644 os.chdir(self.resultdir) 645 utils.write_keyval(self.resultdir, {"hostname": hostname}) 646 self.init_parser() 647 result = function(machine) 648 self.cleanup_parser() 649 return result 650 elif len(machines) > 1 and log: 651 def wrapper(machine): 652 hostname = server_utils.get_hostname_from_machine(machine) 653 self.push_execution_context(hostname) 654 os.chdir(self.resultdir) 655 machine_data = {'hostname' : hostname, 656 'status_version' : str(self._STATUS_VERSION)} 657 utils.write_keyval(self.resultdir, machine_data) 658 result = function(machine) 659 return result 660 else: 661 wrapper = function 662 return wrapper 663 664 665 def parallel_simple(self, function, machines, log=True, timeout=None, 666 return_results=False): 667 """ 668 Run 'function' using parallel_simple, with an extra wrapper to handle 669 the necessary setup for continuous parsing, if possible. If continuous 670 parsing is already properly initialized then this should just work. 671 672 @param function: A callable to run in parallel given each machine. 673 @param machines: A list of machine names to be passed one per subcommand 674 invocation of function. 675 @param log: If True, output will be written to output in a subdirectory 676 named after each machine. 677 @param timeout: Seconds after which the function call should timeout. 678 @param return_results: If True instead of an AutoServError being raised 679 on any error a list of the results|exceptions from the function 680 called on each arg is returned. [default: False] 681 682 @raises error.AutotestError: If any of the functions failed. 683 """ 684 wrapper = self._make_parallel_wrapper(function, machines, log) 685 return subcommand.parallel_simple( 686 wrapper, machines, 687 subdir_name_constructor=server_utils.get_hostname_from_machine, 688 log=log, timeout=timeout, return_results=return_results) 689 690 691 def parallel_on_machines(self, function, machines, timeout=None): 692 """ 693 @param function: Called in parallel with one machine as its argument. 694 @param machines: A list of machines to call function(machine) on. 695 @param timeout: Seconds after which the function call should timeout. 696 697 @returns A list of machines on which function(machine) returned 698 without raising an exception. 699 """ 700 results = self.parallel_simple(function, machines, timeout=timeout, 701 return_results=True) 702 success_machines = [] 703 for result, machine in itertools.izip(results, machines): 704 if not isinstance(result, Exception): 705 success_machines.append(machine) 706 return success_machines 707 708 709 def record_skipped_test(self, skipped_test, message=None): 710 """Insert a failure record into status.log for this test.""" 711 msg = message 712 if msg is None: 713 msg = 'No valid machines found for test %s.' % skipped_test 714 logging.info(msg) 715 self.record('START', None, skipped_test.test_name) 716 self.record('INFO', None, skipped_test.test_name, msg) 717 self.record('END TEST_NA', None, skipped_test.test_name, msg) 718 719 720 def _has_failed_tests(self): 721 """Parse status log for failed tests. 722 723 This checks the current working directory and is intended only for use 724 by the run() method. 725 726 @return boolean 727 """ 728 path = os.getcwd() 729 730 # TODO(ayatane): Copied from tko/parse.py. Needs extensive refactor to 731 # make code reuse plausible. 732 job_keyval = tko_models.job.read_keyval(path) 733 status_version = job_keyval.get("status_version", 0) 734 735 # parse out the job 736 parser = parser_lib.parser(status_version) 737 job = parser.make_job(path) 738 status_log = os.path.join(path, "status.log") 739 if not os.path.exists(status_log): 740 status_log = os.path.join(path, "status") 741 if not os.path.exists(status_log): 742 logging.warning("! Unable to parse job, no status file") 743 return True 744 745 # parse the status logs 746 status_lines = open(status_log).readlines() 747 parser.start(job) 748 tests = parser.end(status_lines) 749 750 # parser.end can return the same object multiple times, so filter out 751 # dups 752 job.tests = [] 753 already_added = set() 754 for test in tests: 755 if test not in already_added: 756 already_added.add(test) 757 job.tests.append(test) 758 759 failed = False 760 for test in job.tests: 761 # The current job is still running and shouldn't count as failed. 762 # The parser will fail to parse the exit status of the job since it 763 # hasn't exited yet (this running right now is the job). 764 failed = failed or (test.status != 'GOOD' 765 and not _is_current_server_job(test)) 766 return failed 767 768 769 def _collect_crashes(self, namespace, collect_crashinfo): 770 """Collect crashes. 771 772 @param namespace: namespace dict. 773 @param collect_crashinfo: whether to collect crashinfo in addition to 774 dumps 775 """ 776 if collect_crashinfo: 777 # includes crashdumps 778 crash_control_file = CRASHINFO_CONTROL_FILE 779 else: 780 crash_control_file = CRASHDUMPS_CONTROL_FILE 781 self._execute_code(crash_control_file, namespace) 782 783 784 _USE_TEMP_DIR = object() 785 def run(self, install_before=False, install_after=False, 786 collect_crashdumps=True, namespace={}, control=None, 787 control_file_dir=None, verify_job_repo_url=False, 788 only_collect_crashinfo=False, skip_crash_collection=False, 789 job_labels='', use_packaging=True): 790 # for a normal job, make sure the uncollected logs file exists 791 # for a crashinfo-only run it should already exist, bail out otherwise 792 created_uncollected_logs = False 793 logging.info("I am PID %s", os.getpid()) 794 if self.resultdir and not os.path.exists(self._uncollected_log_file): 795 if only_collect_crashinfo: 796 # if this is a crashinfo-only run, and there were no existing 797 # uncollected logs, just bail out early 798 logging.info("No existing uncollected logs, " 799 "skipping crashinfo collection") 800 return 801 else: 802 log_file = open(self._uncollected_log_file, "w") 803 pickle.dump([], log_file) 804 log_file.close() 805 created_uncollected_logs = True 806 807 # use a copy so changes don't affect the original dictionary 808 namespace = namespace.copy() 809 machines = self.machines 810 if control is None: 811 if self.control is None: 812 control = '' 813 else: 814 control = self._load_control_file(self.control) 815 if control_file_dir is None: 816 control_file_dir = self.resultdir 817 818 self.aborted = False 819 namespace.update(self._make_namespace()) 820 namespace.update({ 821 'args': self.args, 822 'job_labels': job_labels, 823 'gtest_runner': site_gtest_runner.gtest_runner(), 824 }) 825 test_start_time = int(time.time()) 826 827 if self.resultdir: 828 os.chdir(self.resultdir) 829 # touch status.log so that the parser knows a job is running here 830 open(self.get_status_log_path(), 'a').close() 831 self.enable_external_logging() 832 833 collect_crashinfo = True 834 temp_control_file_dir = None 835 try: 836 try: 837 if not self.fast: 838 with metrics.SecondsTimer( 839 'chromeos/autotest/job/get_network_stats', 840 fields = {'stage': 'start'}): 841 namespace['network_stats_label'] = 'at-start' 842 self._execute_code(GET_NETWORK_STATS_CONTROL_FILE, 843 namespace) 844 845 if install_before and machines: 846 self._execute_code(INSTALL_CONTROL_FILE, namespace) 847 848 if only_collect_crashinfo: 849 return 850 851 # If the verify_job_repo_url option is set but we're unable 852 # to actually verify that the job_repo_url contains the autotest 853 # package, this job will fail. 854 if verify_job_repo_url: 855 self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE, 856 namespace) 857 else: 858 logging.warning('Not checking if job_repo_url contains ' 859 'autotest packages on %s', machines) 860 861 # determine the dir to write the control files to 862 cfd_specified = (control_file_dir 863 and control_file_dir is not self._USE_TEMP_DIR) 864 if cfd_specified: 865 temp_control_file_dir = None 866 else: 867 temp_control_file_dir = tempfile.mkdtemp( 868 suffix='temp_control_file_dir') 869 control_file_dir = temp_control_file_dir 870 server_control_file = os.path.join(control_file_dir, 871 self._control_filename) 872 client_control_file = os.path.join(control_file_dir, 873 CLIENT_CONTROL_FILENAME) 874 if self._client: 875 namespace['control'] = control 876 utils.open_write_close(client_control_file, control) 877 shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE, 878 server_control_file) 879 else: 880 utils.open_write_close(server_control_file, control) 881 882 logging.info("Processing control file") 883 namespace['use_packaging'] = use_packaging 884 self._execute_code(server_control_file, namespace) 885 logging.info("Finished processing control file") 886 887 # If no device error occured, no need to collect crashinfo. 888 collect_crashinfo = self.failed_with_device_error 889 except Exception as e: 890 try: 891 # Add num_tests_failed if any extra exceptions are raised 892 # outside _execute_code(). 893 self.num_tests_failed += 1 894 logging.exception( 895 'Exception escaped control file, job aborting:') 896 reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX, 897 ' ', str(e)) 898 self.record('INFO', None, None, str(e), 899 {'job_abort_reason': reason}) 900 except: 901 pass # don't let logging exceptions here interfere 902 raise 903 finally: 904 if temp_control_file_dir: 905 # Clean up temp directory used for copies of the control files 906 try: 907 shutil.rmtree(temp_control_file_dir) 908 except Exception as e: 909 logging.warning('Could not remove temp directory %s: %s', 910 temp_control_file_dir, e) 911 912 if machines and (collect_crashdumps or collect_crashinfo): 913 if skip_crash_collection or self.fast: 914 logging.info('Skipping crash dump/info collection ' 915 'as requested.') 916 else: 917 with metrics.SecondsTimer( 918 'chromeos/autotest/job/collect_crashinfo'): 919 namespace['test_start_time'] = test_start_time 920 # Remove crash files for passing tests. 921 # TODO(ayatane): Tests that create crash files should be 922 # reported. 923 namespace['has_failed_tests'] = self._has_failed_tests() 924 self._collect_crashes(namespace, collect_crashinfo) 925 self.disable_external_logging() 926 if self._uncollected_log_file and created_uncollected_logs: 927 os.remove(self._uncollected_log_file) 928 if install_after and machines: 929 self._execute_code(INSTALL_CONTROL_FILE, namespace) 930 931 if not self.fast: 932 with metrics.SecondsTimer( 933 'chromeos/autotest/job/get_network_stats', 934 fields = {'stage': 'end'}): 935 namespace['network_stats_label'] = 'at-end' 936 self._execute_code(GET_NETWORK_STATS_CONTROL_FILE, 937 namespace) 938 939 940 def run_test(self, url, *args, **dargs): 941 """ 942 Summon a test object and run it. 943 944 tag 945 tag to add to testname 946 url 947 url of the test to run 948 """ 949 if self._disable_sysinfo: 950 dargs['disable_sysinfo'] = True 951 952 group, testname = self.pkgmgr.get_package_name(url, 'test') 953 testname, subdir, tag = self._build_tagged_test_name(testname, dargs) 954 outputdir = self._make_test_outputdir(subdir) 955 956 def group_func(): 957 try: 958 test.runtest(self, url, tag, args, dargs) 959 except error.TestBaseException as e: 960 self.record(e.exit_status, subdir, testname, str(e)) 961 raise 962 except Exception as e: 963 info = str(e) + "\n" + traceback.format_exc() 964 self.record('FAIL', subdir, testname, info) 965 raise 966 else: 967 self.record('GOOD', subdir, testname, 'completed successfully') 968 969 try: 970 result = self._run_group(testname, subdir, group_func) 971 except error.TestBaseException as e: 972 return False 973 else: 974 return True 975 976 977 def _run_group(self, name, subdir, function, *args, **dargs): 978 """Underlying method for running something inside of a group.""" 979 result, exc_info = None, None 980 try: 981 self.record('START', subdir, name) 982 result = function(*args, **dargs) 983 except error.TestBaseException as e: 984 self.record("END %s" % e.exit_status, subdir, name) 985 raise 986 except Exception as e: 987 err_msg = str(e) + '\n' 988 err_msg += traceback.format_exc() 989 self.record('END ABORT', subdir, name, err_msg) 990 raise error.JobError(name + ' failed\n' + traceback.format_exc()) 991 else: 992 self.record('END GOOD', subdir, name) 993 994 return result 995 996 997 def run_group(self, function, *args, **dargs): 998 """\ 999 @param function: subroutine to run 1000 @returns: (result, exc_info). When the call succeeds, result contains 1001 the return value of |function| and exc_info is None. If 1002 |function| raises an exception, exc_info contains the tuple 1003 returned by sys.exc_info(), and result is None. 1004 """ 1005 1006 name = function.__name__ 1007 # Allow the tag for the group to be specified. 1008 tag = dargs.pop('tag', None) 1009 if tag: 1010 name = tag 1011 1012 try: 1013 result = self._run_group(name, None, function, *args, **dargs)[0] 1014 except error.TestBaseException: 1015 return None, sys.exc_info() 1016 return result, None 1017 1018 1019 def run_op(self, op, op_func, get_kernel_func): 1020 """\ 1021 A specialization of run_group meant specifically for handling 1022 management operation. Includes support for capturing the kernel version 1023 after the operation. 1024 1025 Args: 1026 op: name of the operation. 1027 op_func: a function that carries out the operation (reboot, suspend) 1028 get_kernel_func: a function that returns a string 1029 representing the kernel version. 1030 """ 1031 try: 1032 self.record('START', None, op) 1033 op_func() 1034 except Exception as e: 1035 err_msg = str(e) + '\n' + traceback.format_exc() 1036 self.record('END FAIL', None, op, err_msg) 1037 raise 1038 else: 1039 kernel = get_kernel_func() 1040 self.record('END GOOD', None, op, 1041 optional_fields={"kernel": kernel}) 1042 1043 1044 def run_control(self, path): 1045 """Execute a control file found at path (relative to the autotest 1046 path). Intended for executing a control file within a control file, 1047 not for running the top-level job control file.""" 1048 path = os.path.join(self.autodir, path) 1049 control_file = self._load_control_file(path) 1050 self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR) 1051 1052 1053 def add_sysinfo_command(self, command, logfile=None, on_every_test=False): 1054 self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile), 1055 on_every_test) 1056 1057 1058 def add_sysinfo_logfile(self, file, on_every_test=False): 1059 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test) 1060 1061 1062 def _add_sysinfo_loggable(self, loggable, on_every_test): 1063 if on_every_test: 1064 self.sysinfo.test_loggables.add(loggable) 1065 else: 1066 self.sysinfo.boot_loggables.add(loggable) 1067 1068 1069 def _read_warnings(self): 1070 """Poll all the warning loggers and extract any new warnings that have 1071 been logged. If the warnings belong to a category that is currently 1072 disabled, this method will discard them and they will no longer be 1073 retrievable. 1074 1075 Returns a list of (timestamp, message) tuples, where timestamp is an 1076 integer epoch timestamp.""" 1077 warnings = [] 1078 while True: 1079 # pull in a line of output from every logger that has 1080 # output ready to be read 1081 loggers, _, _ = select.select(self.warning_loggers, [], [], 0) 1082 closed_loggers = set() 1083 for logger in loggers: 1084 line = logger.readline() 1085 # record any broken pipes (aka line == empty) 1086 if len(line) == 0: 1087 closed_loggers.add(logger) 1088 continue 1089 # parse out the warning 1090 timestamp, msgtype, msg = line.split('\t', 2) 1091 timestamp = int(timestamp) 1092 # if the warning is valid, add it to the results 1093 if self.warning_manager.is_valid(timestamp, msgtype): 1094 warnings.append((timestamp, msg.strip())) 1095 1096 # stop listening to loggers that are closed 1097 self.warning_loggers -= closed_loggers 1098 1099 # stop if none of the loggers have any output left 1100 if not loggers: 1101 break 1102 1103 # sort into timestamp order 1104 warnings.sort() 1105 return warnings 1106 1107 1108 def _unique_subdirectory(self, base_subdirectory_name): 1109 """Compute a unique results subdirectory based on the given name. 1110 1111 Appends base_subdirectory_name with a number as necessary to find a 1112 directory name that doesn't already exist. 1113 """ 1114 subdirectory = base_subdirectory_name 1115 counter = 1 1116 while os.path.exists(os.path.join(self.resultdir, subdirectory)): 1117 subdirectory = base_subdirectory_name + '.' + str(counter) 1118 counter += 1 1119 return subdirectory 1120 1121 1122 def get_record_context(self): 1123 """Returns an object representing the current job.record context. 1124 1125 The object returned is an opaque object with a 0-arg restore method 1126 which can be called to restore the job.record context (i.e. indentation) 1127 to the current level. The intention is that it should be used when 1128 something external which generate job.record calls (e.g. an autotest 1129 client) can fail catastrophically and the server job record state 1130 needs to be reset to its original "known good" state. 1131 1132 @return: A context object with a 0-arg restore() method.""" 1133 return self._indenter.get_context() 1134 1135 1136 def record_summary(self, status_code, test_name, reason='', attributes=None, 1137 distinguishing_attributes=(), child_test_ids=None): 1138 """Record a summary test result. 1139 1140 @param status_code: status code string, see 1141 common_lib.log.is_valid_status() 1142 @param test_name: name of the test 1143 @param reason: (optional) string providing detailed reason for test 1144 outcome 1145 @param attributes: (optional) dict of string keyvals to associate with 1146 this result 1147 @param distinguishing_attributes: (optional) list of attribute names 1148 that should be used to distinguish identically-named test 1149 results. These attributes should be present in the attributes 1150 parameter. This is used to generate user-friendly subdirectory 1151 names. 1152 @param child_test_ids: (optional) list of test indices for test results 1153 used in generating this result. 1154 """ 1155 subdirectory_name_parts = [test_name] 1156 for attribute in distinguishing_attributes: 1157 assert attributes 1158 assert attribute in attributes, '%s not in %s' % (attribute, 1159 attributes) 1160 subdirectory_name_parts.append(attributes[attribute]) 1161 base_subdirectory_name = '.'.join(subdirectory_name_parts) 1162 1163 subdirectory = self._unique_subdirectory(base_subdirectory_name) 1164 subdirectory_path = os.path.join(self.resultdir, subdirectory) 1165 os.mkdir(subdirectory_path) 1166 1167 self.record(status_code, subdirectory, test_name, 1168 status=reason, optional_fields={'is_summary': True}) 1169 1170 if attributes: 1171 utils.write_keyval(subdirectory_path, attributes) 1172 1173 if child_test_ids: 1174 ids_string = ','.join(str(test_id) for test_id in child_test_ids) 1175 summary_data = {'child_test_ids': ids_string} 1176 utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'), 1177 summary_data) 1178 1179 1180 def disable_warnings(self, warning_type): 1181 self.warning_manager.disable_warnings(warning_type) 1182 self.record("INFO", None, None, 1183 "disabling %s warnings" % warning_type, 1184 {"warnings.disable": warning_type}) 1185 1186 1187 def enable_warnings(self, warning_type): 1188 self.warning_manager.enable_warnings(warning_type) 1189 self.record("INFO", None, None, 1190 "enabling %s warnings" % warning_type, 1191 {"warnings.enable": warning_type}) 1192 1193 1194 def get_status_log_path(self, subdir=None): 1195 """Return the path to the job status log. 1196 1197 @param subdir - Optional paramter indicating that you want the path 1198 to a subdirectory status log. 1199 1200 @returns The path where the status log should be. 1201 """ 1202 if self.resultdir: 1203 if subdir: 1204 return os.path.join(self.resultdir, subdir, "status.log") 1205 else: 1206 return os.path.join(self.resultdir, "status.log") 1207 else: 1208 return None 1209 1210 1211 def _update_uncollected_logs_list(self, update_func): 1212 """Updates the uncollected logs list in a multi-process safe manner. 1213 1214 @param update_func - a function that updates the list of uncollected 1215 logs. Should take one parameter, the list to be updated. 1216 """ 1217 # Skip log collection if file _uncollected_log_file does not exist. 1218 if not (self._uncollected_log_file and 1219 os.path.exists(self._uncollected_log_file)): 1220 return 1221 if self._uncollected_log_file: 1222 log_file = open(self._uncollected_log_file, "r+") 1223 fcntl.flock(log_file, fcntl.LOCK_EX) 1224 try: 1225 uncollected_logs = pickle.load(log_file) 1226 update_func(uncollected_logs) 1227 log_file.seek(0) 1228 log_file.truncate() 1229 pickle.dump(uncollected_logs, log_file) 1230 log_file.flush() 1231 finally: 1232 fcntl.flock(log_file, fcntl.LOCK_UN) 1233 log_file.close() 1234 1235 1236 def add_client_log(self, hostname, remote_path, local_path): 1237 """Adds a new set of client logs to the list of uncollected logs, 1238 to allow for future log recovery. 1239 1240 @param host - the hostname of the machine holding the logs 1241 @param remote_path - the directory on the remote machine holding logs 1242 @param local_path - the local directory to copy the logs into 1243 """ 1244 def update_func(logs_list): 1245 logs_list.append((hostname, remote_path, local_path)) 1246 self._update_uncollected_logs_list(update_func) 1247 1248 1249 def remove_client_log(self, hostname, remote_path, local_path): 1250 """Removes a set of client logs from the list of uncollected logs, 1251 to allow for future log recovery. 1252 1253 @param host - the hostname of the machine holding the logs 1254 @param remote_path - the directory on the remote machine holding logs 1255 @param local_path - the local directory to copy the logs into 1256 """ 1257 def update_func(logs_list): 1258 logs_list.remove((hostname, remote_path, local_path)) 1259 self._update_uncollected_logs_list(update_func) 1260 1261 1262 def get_client_logs(self): 1263 """Retrieves the list of uncollected logs, if it exists. 1264 1265 @returns A list of (host, remote_path, local_path) tuples. Returns 1266 an empty list if no uncollected logs file exists. 1267 """ 1268 log_exists = (self._uncollected_log_file and 1269 os.path.exists(self._uncollected_log_file)) 1270 if log_exists: 1271 return pickle.load(open(self._uncollected_log_file)) 1272 else: 1273 return [] 1274 1275 1276 def _fill_server_control_namespace(self, namespace, protect=True): 1277 """ 1278 Prepare a namespace to be used when executing server control files. 1279 1280 This sets up the control file API by importing modules and making them 1281 available under the appropriate names within namespace. 1282 1283 For use by _execute_code(). 1284 1285 Args: 1286 namespace: The namespace dictionary to fill in. 1287 protect: Boolean. If True (the default) any operation that would 1288 clobber an existing entry in namespace will cause an error. 1289 Raises: 1290 error.AutoservError: When a name would be clobbered by import. 1291 """ 1292 def _import_names(module_name, names=()): 1293 """ 1294 Import a module and assign named attributes into namespace. 1295 1296 Args: 1297 module_name: The string module name. 1298 names: A limiting list of names to import from module_name. If 1299 empty (the default), all names are imported from the module 1300 similar to a "from foo.bar import *" statement. 1301 Raises: 1302 error.AutoservError: When a name being imported would clobber 1303 a name already in namespace. 1304 """ 1305 module = __import__(module_name, {}, {}, names) 1306 1307 # No names supplied? Import * from the lowest level module. 1308 # (Ugh, why do I have to implement this part myself?) 1309 if not names: 1310 for submodule_name in module_name.split('.')[1:]: 1311 module = getattr(module, submodule_name) 1312 if hasattr(module, '__all__'): 1313 names = getattr(module, '__all__') 1314 else: 1315 names = dir(module) 1316 1317 # Install each name into namespace, checking to make sure it 1318 # doesn't override anything that already exists. 1319 for name in names: 1320 # Check for conflicts to help prevent future problems. 1321 if name in namespace and protect: 1322 if namespace[name] is not getattr(module, name): 1323 raise error.AutoservError('importing name ' 1324 '%s from %s %r would override %r' % 1325 (name, module_name, getattr(module, name), 1326 namespace[name])) 1327 else: 1328 # Encourage cleanliness and the use of __all__ for a 1329 # more concrete API with less surprises on '*' imports. 1330 warnings.warn('%s (%r) being imported from %s for use ' 1331 'in server control files is not the ' 1332 'first occurrence of that import.' % 1333 (name, namespace[name], module_name)) 1334 1335 namespace[name] = getattr(module, name) 1336 1337 1338 # This is the equivalent of prepending a bunch of import statements to 1339 # the front of the control script. 1340 namespace.update(os=os, sys=sys, logging=logging) 1341 _import_names('autotest_lib.server', 1342 ('hosts', 'autotest', 'standalone_profiler')) 1343 _import_names('autotest_lib.server.subcommand', 1344 ('parallel', 'parallel_simple', 'subcommand')) 1345 _import_names('autotest_lib.server.utils', 1346 ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine')) 1347 _import_names('autotest_lib.client.common_lib.error') 1348 _import_names('autotest_lib.client.common_lib.barrier', ('barrier',)) 1349 1350 # Inject ourself as the job object into other classes within the API. 1351 # (Yuck, this injection is a gross thing be part of a public API. -gps) 1352 # 1353 # XXX Autotest does not appear to use .job. Who does? 1354 namespace['autotest'].Autotest.job = self 1355 # server.hosts.base_classes.Host uses .job. 1356 namespace['hosts'].Host.job = self 1357 namespace['hosts'].TestBed.job = self 1358 namespace['hosts'].factory.ssh_user = self._ssh_user 1359 namespace['hosts'].factory.ssh_port = self._ssh_port 1360 namespace['hosts'].factory.ssh_pass = self._ssh_pass 1361 namespace['hosts'].factory.ssh_verbosity_flag = ( 1362 self._ssh_verbosity_flag) 1363 namespace['hosts'].factory.ssh_options = self._ssh_options 1364 1365 1366 def _execute_code(self, code_file, namespace, protect=True): 1367 """ 1368 Execute code using a copy of namespace as a server control script. 1369 1370 Unless protect_namespace is explicitly set to False, the dict will not 1371 be modified. 1372 1373 Args: 1374 code_file: The filename of the control file to execute. 1375 namespace: A dict containing names to make available during execution. 1376 protect: Boolean. If True (the default) a copy of the namespace dict 1377 is used during execution to prevent the code from modifying its 1378 contents outside of this function. If False the raw dict is 1379 passed in and modifications will be allowed. 1380 """ 1381 if protect: 1382 namespace = namespace.copy() 1383 self._fill_server_control_namespace(namespace, protect=protect) 1384 # TODO: Simplify and get rid of the special cases for only 1 machine. 1385 if len(self.machines) > 1: 1386 machines_text = '\n'.join(self.machines) + '\n' 1387 # Only rewrite the file if it does not match our machine list. 1388 try: 1389 machines_f = open(MACHINES_FILENAME, 'r') 1390 existing_machines_text = machines_f.read() 1391 machines_f.close() 1392 except EnvironmentError: 1393 existing_machines_text = None 1394 if machines_text != existing_machines_text: 1395 utils.open_write_close(MACHINES_FILENAME, machines_text) 1396 execfile(code_file, namespace, namespace) 1397 1398 1399 def _parse_status(self, new_line): 1400 if self.fast and not self._using_parser: 1401 logging.info('Parsing lines in fast mode') 1402 new_tests = self.parser.process_lines([new_line]) 1403 for test in new_tests: 1404 if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'): 1405 self.num_tests_failed += 1 1406 1407 if not self._using_parser: 1408 return 1409 1410 new_tests = self.parser.process_lines([new_line]) 1411 for test in new_tests: 1412 self.__insert_test(test) 1413 1414 1415 def __insert_test(self, test): 1416 """ 1417 An internal method to insert a new test result into the 1418 database. This method will not raise an exception, even if an 1419 error occurs during the insert, to avoid failing a test 1420 simply because of unexpected database issues.""" 1421 self.num_tests_run += 1 1422 if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'): 1423 self.num_tests_failed += 1 1424 try: 1425 self.results_db.insert_test(self.job_model, test) 1426 except Exception: 1427 msg = ("WARNING: An unexpected error occured while " 1428 "inserting test results into the database. " 1429 "Ignoring error.\n" + traceback.format_exc()) 1430 print >> sys.stderr, msg 1431 1432 1433 def preprocess_client_state(self): 1434 """ 1435 Produce a state file for initializing the state of a client job. 1436 1437 Creates a new client state file with all the current server state, as 1438 well as some pre-set client state. 1439 1440 @returns The path of the file the state was written into. 1441 """ 1442 # initialize the sysinfo state 1443 self._state.set('client', 'sysinfo', self.sysinfo.serialize()) 1444 1445 # dump the state out to a tempfile 1446 fd, file_path = tempfile.mkstemp(dir=self.tmpdir) 1447 os.close(fd) 1448 1449 # write_to_file doesn't need locking, we exclusively own file_path 1450 self._state.write_to_file(file_path) 1451 return file_path 1452 1453 1454 def postprocess_client_state(self, state_path): 1455 """ 1456 Update the state of this job with the state from a client job. 1457 1458 Updates the state of the server side of a job with the final state 1459 of a client job that was run. Updates the non-client-specific state, 1460 pulls in some specific bits from the client-specific state, and then 1461 discards the rest. Removes the state file afterwards 1462 1463 @param state_file A path to the state file from the client. 1464 """ 1465 # update the on-disk state 1466 try: 1467 self._state.read_from_file(state_path) 1468 os.remove(state_path) 1469 except OSError, e: 1470 # ignore file-not-found errors 1471 if e.errno != errno.ENOENT: 1472 raise 1473 else: 1474 logging.debug('Client state file %s not found', state_path) 1475 1476 # update the sysinfo state 1477 if self._state.has('client', 'sysinfo'): 1478 self.sysinfo.deserialize(self._state.get('client', 'sysinfo')) 1479 1480 # drop all the client-specific state 1481 self._state.discard_namespace('client') 1482 1483 1484 def clear_all_known_hosts(self): 1485 """Clears known hosts files for all AbstractSSHHosts.""" 1486 for host in self.hosts: 1487 if isinstance(host, abstract_ssh.AbstractSSHHost): 1488 host.clear_known_hosts() 1489 1490 1491 def close(self): 1492 """Closes this job's operation.""" 1493 1494 # Use shallow copy, because host.close() internally discards itself. 1495 for host in list(self.hosts): 1496 host.close() 1497 assert not self.hosts 1498 self._connection_pool.shutdown() 1499 1500 1501 def _get_job_data(self): 1502 """Add custom data to the job keyval info. 1503 1504 When multiple machines are used in a job, change the hostname to 1505 the platform of the first machine instead of machine1,machine2,... This 1506 makes the job reports easier to read and keeps the tko_machines table from 1507 growing too large. 1508 1509 Returns: 1510 keyval dictionary with new hostname value, or empty dictionary. 1511 """ 1512 job_data = {} 1513 # Only modify hostname on multimachine jobs. Assume all host have the same 1514 # platform. 1515 if len(self.machines) > 1: 1516 # Search through machines for first machine with a platform. 1517 for host in self.machines: 1518 keyval_path = os.path.join(self.resultdir, 'host_keyvals', host) 1519 keyvals = utils.read_keyval(keyval_path) 1520 host_plat = keyvals.get('platform', None) 1521 if not host_plat: 1522 continue 1523 job_data['hostname'] = host_plat 1524 break 1525 return job_data 1526 1527 1528class warning_manager(object): 1529 """Class for controlling warning logs. Manages the enabling and disabling 1530 of warnings.""" 1531 def __init__(self): 1532 # a map of warning types to a list of disabled time intervals 1533 self.disabled_warnings = {} 1534 1535 1536 def is_valid(self, timestamp, warning_type): 1537 """Indicates if a warning (based on the time it occured and its type) 1538 is a valid warning. A warning is considered "invalid" if this type of 1539 warning was marked as "disabled" at the time the warning occured.""" 1540 disabled_intervals = self.disabled_warnings.get(warning_type, []) 1541 for start, end in disabled_intervals: 1542 if timestamp >= start and (end is None or timestamp < end): 1543 return False 1544 return True 1545 1546 1547 def disable_warnings(self, warning_type, current_time_func=time.time): 1548 """As of now, disables all further warnings of this type.""" 1549 intervals = self.disabled_warnings.setdefault(warning_type, []) 1550 if not intervals or intervals[-1][1] is not None: 1551 intervals.append((int(current_time_func()), None)) 1552 1553 1554 def enable_warnings(self, warning_type, current_time_func=time.time): 1555 """As of now, enables all further warnings of this type.""" 1556 intervals = self.disabled_warnings.get(warning_type, []) 1557 if intervals and intervals[-1][1] is None: 1558 intervals[-1] = (intervals[-1][0], int(current_time_func())) 1559 1560 1561def _is_current_server_job(test): 1562 """Return True if parsed test is the currently running job. 1563 1564 @param test: test instance from tko parser. 1565 """ 1566 return test.testname == 'SERVER_JOB' 1567 1568 1569def _create_afe_host(hostname): 1570 """Create an afe_host object backed by the AFE. 1571 1572 @param hostname: Name of the host for which we want the Host object. 1573 @returns: An object of type frontend.AFE 1574 """ 1575 afe = frontend_wrappers.RetryingAFE(timeout_min=5, delay_sec=10) 1576 hosts = afe.get_hosts(hostname=hostname) 1577 if not hosts: 1578 raise error.AutoservError('No hosts named %s found' % hostname) 1579 1580 return hosts[0] 1581 1582 1583def _create_host_info_store(hostname, workdir): 1584 """Create a CachingHostInfo store backed by the AFE. 1585 1586 @param hostname: Name of the host for which we want the store. 1587 @param workdir: A directory where any on-disk files related to the machines 1588 may be created. 1589 @returns: An object of type shadowing_store.ShadowingStore 1590 """ 1591 primary_store = afe_store.AfeStore(hostname) 1592 try: 1593 primary_store.get(force_refresh=True) 1594 except host_info.StoreError: 1595 raise error.AutoservError('Could not obtain HostInfo for hostname %s' % 1596 hostname) 1597 backing_file_path = _file_store_unique_file_path(workdir) 1598 logging.info('Shadowing AFE store with a FileStore at %s', 1599 backing_file_path) 1600 shadow_store = file_store.FileStore(backing_file_path) 1601 return shadowing_store.ShadowingStore(primary_store, shadow_store) 1602 1603 1604def _file_store_unique_file_path(workdir): 1605 """Returns a unique filepath for the on-disk FileStore. 1606 1607 Also makes sure that the workdir exists. 1608 1609 @param: Top level working directory. 1610 """ 1611 store_dir = os.path.join(workdir, 'host_info_store') 1612 _make_dirs_if_needed(store_dir) 1613 file_path = os.path.join(store_dir, 'store_%s' % uuid.uuid4()) 1614 return file_path 1615 1616 1617def _make_dirs_if_needed(path): 1618 """os.makedirs, but ignores failure because the leaf directory exists""" 1619 try: 1620 os.makedirs(path) 1621 except OSError as e: 1622 if e.errno != errno.EEXIST: 1623 raise 1624