# pylint: disable-msg=C0111

# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
The main job wrapper for the server side.

This is the core infrastructure. Derived from the client side job.py

Copyright Martin J. Bligh, Andy Whitcroft 2007
"""

import errno
import fcntl
import getpass
import itertools
import logging
import multiprocessing
import os
import pickle
import platform
import re
import select
import shutil
import sys
import tempfile
import time
import traceback
import warnings

from autotest_lib.client.bin import sysinfo
from autotest_lib.client.common_lib import base_job
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import logging_manager
from autotest_lib.client.common_lib import packages
from autotest_lib.client.common_lib import utils
from autotest_lib.server import profilers
from autotest_lib.server import site_gtest_runner
from autotest_lib.server import site_server_job_utils
from autotest_lib.server import subcommand
from autotest_lib.server import test
from autotest_lib.server import utils as server_utils
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server.hosts import abstract_ssh
from autotest_lib.server.hosts import afe_store
from autotest_lib.server.hosts import factory as host_factory
from autotest_lib.server.hosts import host_info
from autotest_lib.server.hosts import ssh_multiplex
from autotest_lib.tko import db as tko_db
from autotest_lib.tko import models as tko_models
from autotest_lib.tko import status_lib
from autotest_lib.tko import parser_lib
from autotest_lib.tko import utils as tko_utils


INCREMENTAL_TKO_PARSING = global_config.global_config.get_config_value(
        'autoserv', 'incremental_tko_parsing', type=bool, default=False)

def _control_segment_path(name):
    """Get the pathname of the named control segment file."""
    server_dir = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(server_dir, "control_segments", name)


CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'

CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
INSTALL_CONTROL_FILE = _control_segment_path('install')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')
VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')
PROVISION_CONTROL_FILE = _control_segment_path('provision')
VERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url')
RESET_CONTROL_FILE = _control_segment_path('reset')
GET_NETWORK_STATS_CONTROL_FILE = _control_segment_path('get_network_stats')


def get_machine_dicts(machine_names, in_lab, host_attributes=None,
                      connection_pool=None):
    """Converts a list of machine names to a list of dicts.

    @param machine_names: A list of machine names.
    @param in_lab: A boolean indicating whether we're running in the lab.
    @param host_attributes: Optional list of host attributes to add for each
            host.
    @param connection_pool: ssh_multiplex.ConnectionPool instance to share
            master connections across control scripts.
    @returns: A list of dicts. Each dict has the following keys:
            'hostname': Name of the machine originally in machine_names (str).
            'afe_host': A frontend.Host object for the machine, or a stub if
                    in_lab is False.
            'host_info_store': A host_info.CachingHostInfoStore object to obtain
                    host information. A stub if in_lab is False.
            'connection_pool': ssh_multiplex.ConnectionPool instance to share
                    master ssh connection across control scripts.
    """
    machine_dict_list = []
    for machine in machine_names:
        # See autoserv_parser.parse_args. Only one of in_lab or host_attributes
        # can be provided.
        if not in_lab:
            afe_host = server_utils.EmptyAFEHost()
            host_info_store = host_info.InMemoryHostInfoStore()
            if host_attributes is not None:
                afe_host.attributes.update(host_attributes)
                info = host_info.HostInfo(attributes=host_attributes)
                host_info_store.commit(info)
        elif host_attributes:
            raise error.AutoservError(
                    'in_lab and host_attributes are mutually exclusive. '
                    'Obtained in_lab:%s, host_attributes:%s'
                    % (in_lab, host_attributes))
        else:
            afe_host = _create_afe_host(machine)
            host_info_store = _create_host_info_store(machine)

        machine_dict_list.append({
                'hostname' : machine,
                'afe_host' : afe_host,
                'host_info_store': host_info_store,
                'connection_pool': connection_pool,
        })

    return machine_dict_list


class status_indenter(base_job.status_indenter):
    """Provide a simple integer-backed status indenter."""
    def __init__(self):
        self._indent = 0


    @property
    def indent(self):
        return self._indent


    def increment(self):
        self._indent += 1


    def decrement(self):
        self._indent -= 1


    def get_context(self):
        """Returns a context object for use by job.get_record_context."""
        class context(object):
            def __init__(self, indenter, indent):
                self._indenter = indenter
                self._indent = indent
            def restore(self):
                self._indenter._indent = self._indent
        return context(self, self._indent)


class server_job_record_hook(object):
    """The job.record hook for server job. Used to inject WARN messages from
    the console or vlm whenever new logs are written, and to echo any logs
    to INFO level logging. Implemented as a class so that it can use state to
    block recursive calls, so that the hook can call job.record itself to
    log WARN messages.

    Depends on job._read_warnings and job._logger.
    """
    def __init__(self, job):
        self._job = job
        self._being_called = False


    def __call__(self, entry):
        """A wrapper around the 'real' record hook, the _hook method, which
        prevents recursion. This isn't making any effort to be threadsafe,
        the intent is to outright block infinite recursion via a
        job.record->_hook->job.record->_hook->job.record... chain."""
        if self._being_called:
            return
        self._being_called = True
        try:
            self._hook(self._job, entry)
        finally:
            self._being_called = False


    @staticmethod
    def _hook(job, entry):
        """The core hook, which can safely call job.record."""
        entries = []
        # poll all our warning loggers for new warnings
        for timestamp, msg in job._read_warnings():
            warning_entry = base_job.status_log_entry(
                'WARN', None, None, msg, {}, timestamp=timestamp)
            entries.append(warning_entry)
            job.record_entry(warning_entry)
        # echo rendered versions of all the status logs to info
        entries.append(entry)
        for entry in entries:
            rendered_entry = job._logger.render_entry(entry)
            logging.info(rendered_entry)
            job._parse_status(rendered_entry)


class server_job(base_job.base_job):
    """The server-side concrete implementation of base_job.

    Optional properties provided by this implementation:
        serverdir

        num_tests_run
        num_tests_failed

        warning_manager
        warning_loggers
    """

    _STATUS_VERSION = 1

    # TODO crbug.com/285395 eliminate ssh_verbosity_flag
    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user=host_factory.DEFAULT_SSH_USER,
                 ssh_port=host_factory.DEFAULT_SSH_PORT,
                 ssh_pass=host_factory.DEFAULT_SSH_PASS,
                 ssh_verbosity_flag=host_factory.DEFAULT_SSH_VERBOSITY,
                 ssh_options=host_factory.DEFAULT_SSH_OPTIONS,
                 test_retry=0, group_name='',
                 tag='', disable_sysinfo=False,
                 control_filename=SERVER_CONTROL_FILENAME,
                 parent_job_id=None, host_attributes=None, in_lab=False):
        """
        Create a server side job object.

        @param control: The pathname of the control file.
        @param args: Passed to the control file.
        @param resultdir: Where to throw the results.
        @param label: Description of the job.
        @param user: Username for the job (email address).
        @param client: True if this is a client-side control file.
        @param parse_job: string, if supplied it is the job execution tag
                with which the results will be passed to the TKO parser.
        @param ssh_user: The SSH username. [root]
        @param ssh_port: The SSH port number. [22]
        @param ssh_pass: The SSH passphrase, if needed.
        @param ssh_verbosity_flag: The SSH verbosity flag, '-v', '-vv',
                '-vvv', or an empty string if not needed.
        @param ssh_options: A string giving additional options that will be
                included in ssh commands.
        @param test_retry: The number of times to retry a test if the test did
                not complete successfully.
        @param group_name: If supplied, this will be written out as
                host_group_name in the keyvals file for the parser.
        @param tag: The job execution tag from the scheduler. [optional]
        @param disable_sysinfo: Whether we should disable the sysinfo step of
                tests for a modest shortening of test time. [optional]
        @param control_filename: The filename where the server control file
                should be written in the results directory.
        @param parent_job_id: Job ID of the parent job. Defaults to None if
                the job does not have a parent job.
        @param host_attributes: Dict of host attributes passed into autoserv
                via the command line. If specified here, these attributes
                will apply to all machines.
        @param in_lab: Boolean that indicates if this is running in the lab
                environment.
        """
        super(server_job, self).__init__(resultdir=resultdir,
                                         test_retry=test_retry)
        self.test_retry = test_retry
        self.control = control
        self._uncollected_log_file = os.path.join(self.resultdir,
                                                  'uncollected_logs')
        debugdir = os.path.join(self.resultdir, 'debug')
        if not os.path.exists(debugdir):
            os.mkdir(debugdir)

        if user:
            self.user = user
        else:
            self.user = getpass.getuser()

        self.args = args
        self.label = label
        self.machines = machines
        self._client = client
        self.warning_loggers = set()
        self.warning_manager = warning_manager()
        self._ssh_user = ssh_user
        self._ssh_port = ssh_port
        self._ssh_pass = ssh_pass
        self._ssh_verbosity_flag = ssh_verbosity_flag
        self._ssh_options = ssh_options
        self.tag = tag
        self.hosts = set()
        self.drop_caches = False
        self.drop_caches_between_iterations = False
        self._control_filename = control_filename
        self._disable_sysinfo = disable_sysinfo

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        subcommand.logging_manager_object = self.logging

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self.profilers = profilers.profilers(self)

        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'drone' : platform.node(),
                    'status_version' : str(self._STATUS_VERSION),
                    'job_started' : str(int(time.time()))}
        # Save parent job id to keyvals, so parser can retrieve the info and
        # write to tko_jobs record.
        if parent_job_id:
            job_data['parent_job_id'] = parent_job_id
        if group_name:
            job_data['host_group_name'] = group_name

        # only write these keyvals out on the first job in a resultdir
        if 'job_started' not in utils.read_keyval(self.resultdir):
            job_data.update(self._get_job_data())
            utils.write_keyval(self.resultdir, job_data)

        self._parse_job = parse_job
        self._using_parser = (INCREMENTAL_TKO_PARSING and self._parse_job
                              and len(machines) <= 1)
        self.pkgmgr = packages.PackageManager(
                self.autodir, run_function_dargs={'timeout':600})
        self.num_tests_run = 0
        self.num_tests_failed = 0

        self._register_subcommand_hooks()

        # set up the status logger
        self._indenter = status_indenter()
        self._logger = base_job.status_logger(
                self, self._indenter, 'status.log', 'status.log',
                record_hook=server_job_record_hook(self))

        # Initialize a flag to indicate DUT failure during the test, e.g.,
        # unexpected reboot.
        self.failed_with_device_error = False

        self._connection_pool = ssh_multiplex.ConnectionPool()

        self.parent_job_id = parent_job_id
        self.in_lab = in_lab
        self.machine_dict_list = get_machine_dicts(
                self.machines, self.in_lab, host_attributes,
                self._connection_pool)

        # TODO(jrbarnette) The harness attribute is only relevant to
        # client jobs, but it's required to be present, or we will fail
        # server job unit tests. Yes, really.
        #
        # TODO(jrbarnette) The utility of the 'harness' attribute even
        # to client jobs is suspect. Probably, we should remove it.
        self.harness = None

        if control:
            self.max_result_size_KB = control_data.parse_control(
                    control, raise_warnings=False).max_result_size_KB
        else:
            # Set the maximum result size to be the default specified in
            # global config, if the job has no control file associated.
            self.max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB


    @classmethod
    def _find_base_directories(cls):
        """
        Determine locations of autodir, clientdir and serverdir. Assumes
        that this file is located within serverdir and uses __file__ along
        with relative paths to resolve the location.
        """
        serverdir = os.path.abspath(os.path.dirname(__file__))
        autodir = os.path.normpath(os.path.join(serverdir, '..'))
        clientdir = os.path.join(autodir, 'client')
        return autodir, clientdir, serverdir


    def _find_resultdir(self, resultdir, *args, **dargs):
        """
        Determine the location of resultdir. For server jobs we expect one to
        always be explicitly passed in to __init__, so just return that.
        """
        if resultdir:
            return os.path.normpath(resultdir)
        else:
            return None


    def _get_status_logger(self):
        """Return a reference to the status logger."""
        return self._logger


    @staticmethod
    def _load_control_file(path):
        f = open(path)
        try:
            control_file = f.read()
        finally:
            f.close()
        return re.sub('\r', '', control_file)


    def _register_subcommand_hooks(self):
        """
        Register some hooks into the subcommand modules that allow us
        to properly clean up self.hosts created in forked subprocesses.
        """
        def on_fork(cmd):
            self._existing_hosts_on_fork = set(self.hosts)
        def on_join(cmd):
            new_hosts = self.hosts - self._existing_hosts_on_fork
            for host in new_hosts:
                host.close()
        subcommand.subcommand.register_fork_hook(on_fork)
        subcommand.subcommand.register_join_hook(on_join)


    def init_parser(self):
        """
        Start the continuous parsing of self.resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary.
        """
        if not self._using_parser:
            return
        # redirect parser debugging to .parse.log
        parse_log = os.path.join(self.resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = parser_lib.parser(self._STATUS_VERSION)
        self.job_model = self.parser.make_job(self.resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self._parse_job)
        if job_idx is None:
            self.results_db.insert_job(self._parse_job, self.job_model,
                                       self.parent_job_id)
        else:
            machine_idx = self.results_db.lookup_machine(self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx


    def cleanup_parser(self):
        """
        This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db)
        """
        if not self._using_parser:
            return
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self._using_parser = False

    # TODO crbug.com/285395 add a kwargs parameter.
    def _make_namespace(self):
        """Create a namespace dictionary to be passed along to control file.

        Creates a namespace argument populated with standard values:
        machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag,
        and ssh_options.
        """
        namespace = {'machines' : self.machine_dict_list,
                     'job' : self,
                     'ssh_user' : self._ssh_user,
                     'ssh_port' : self._ssh_port,
                     'ssh_pass' : self._ssh_pass,
                     'ssh_verbosity_flag' : self._ssh_verbosity_flag,
                     'ssh_options' : self._ssh_options}
        return namespace


    def cleanup(self, labels):
        """Cleanup machines.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to cleanup')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False)


    def verify(self, labels):
        """Verify machines are all ssh-able.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)


    def reset(self, labels):
        """Reset machines by first cleaning up and then verifying each machine.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to reset.')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(RESET_CONTROL_FILE, namespace, protect=False)


    def repair(self, labels):
        """Repair machines.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)


    def provision(self, labels):
        """
        Provision all hosts to match |labels|.

        @param labels: A comma separated string of labels to provision the
                       host to.

        """
        control = self._load_control_file(PROVISION_CONTROL_FILE)
        self.run(control=control, job_labels=labels)


    def precheck(self):
        """
        Perform any additional checks in derived classes.
        """
        pass


    def enable_external_logging(self):
        """
        Start or restart external logging mechanism.
        """
        pass


    def disable_external_logging(self):
        """
        Pause or stop external logging mechanism.
        """
        pass


    def use_external_logging(self):
        """
        Return True if external logging should be used.
        """
        return False


    def _make_parallel_wrapper(self, function, machines, log):
        """Wrap function as appropriate for calling by parallel_simple."""
        # machines could be a list of dictionaries, e.g.,
        # [{'host_attributes': {}, 'hostname': '100.96.51.226'}]
        # The dictionary is generated in server_job.__init__, refer to
        # variable machine_dict_list, then passed in with namespace, see method
        # server_job._make_namespace.
        # To compare the machines to self.machines, which is a list of machine
        # hostnames, we need to convert machines back to a list of hostnames.
        # Note that the order of hostnames doesn't matter, as is_forking will
        # be True if there is more than one machine.
        if (machines and isinstance(machines, list)
            and isinstance(machines[0], dict)):
            machines = [m['hostname'] for m in machines]
        is_forking = not (len(machines) == 1 and self.machines == machines)
        if self._parse_job and is_forking and log:
            def wrapper(machine):
                hostname = server_utils.get_hostname_from_machine(machine)
                self._parse_job += "/" + hostname
                self._using_parser = INCREMENTAL_TKO_PARSING
                self.machines = [machine]
                self.push_execution_context(hostname)
                os.chdir(self.resultdir)
                utils.write_keyval(self.resultdir, {"hostname": hostname})
                self.init_parser()
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                hostname = server_utils.get_hostname_from_machine(machine)
                self.push_execution_context(hostname)
                os.chdir(self.resultdir)
                machine_data = {'hostname' : hostname,
                                'status_version' : str(self._STATUS_VERSION)}
                utils.write_keyval(self.resultdir, machine_data)
                result = function(machine)
                return result
        else:
            wrapper = function
        return wrapper


    def parallel_simple(self, function, machines, log=True, timeout=None,
                        return_results=False):
        """
        Run 'function' using parallel_simple, with an extra wrapper to handle
        the necessary setup for continuous parsing, if possible. If continuous
        parsing is already properly initialized then this should just work.

        @param function: A callable to run in parallel given each machine.
        @param machines: A list of machine names to be passed one per subcommand
                invocation of function.
        @param log: If True, output will be written to a subdirectory named
                after each machine.
        @param timeout: Seconds after which the function call should timeout.
        @param return_results: If True, instead of an AutoservError being raised
                on any error, a list of the results|exceptions from the function
                called on each arg is returned. [default: False]

        @raises error.AutotestError: If any of the functions failed.
        """
        wrapper = self._make_parallel_wrapper(function, machines, log)
        return subcommand.parallel_simple(wrapper, machines,
                                          log=log, timeout=timeout,
                                          return_results=return_results)


    def parallel_on_machines(self, function, machines, timeout=None):
        """
        @param function: Called in parallel with one machine as its argument.
        @param machines: A list of machines to call function(machine) on.
        @param timeout: Seconds after which the function call should timeout.

        @returns A list of machines on which function(machine) returned
                without raising an exception.
        """
        results = self.parallel_simple(function, machines, timeout=timeout,
                                       return_results=True)
        success_machines = []
        for result, machine in itertools.izip(results, machines):
            if not isinstance(result, Exception):
                success_machines.append(machine)
        return success_machines


    def distribute_across_machines(self, tests, machines,
                                   continuous_parsing=False):
        """Run each test in tests once using machines.

        Instead of running each test on each machine like parallel_on_machines,
        run each test once across all machines. Put another way, the total
        number of tests run by parallel_on_machines is len(tests) *
        len(machines). The number of tests run by distribute_across_machines is
        len(tests).

        Args:
            tests: List of tests to run.
            machines: List of machines to use.
            continuous_parsing: Bool, if true parse job while running.
        """
        # The Queue is thread safe, but since a machine may have to search
        # through the queue to find a valid test the lock provides exclusive
        # queue access for more than just the get call.
        test_queue = multiprocessing.JoinableQueue()
        test_queue_lock = multiprocessing.Lock()

        unique_machine_attributes = []
        sub_commands = []
        work_dir = self.resultdir

        for machine in machines:
            if 'group' in self.resultdir:
                work_dir = os.path.join(self.resultdir, machine)

            mw = site_server_job_utils.machine_worker(self,
                                                      machine,
                                                      work_dir,
                                                      test_queue,
                                                      test_queue_lock,
                                                      continuous_parsing)

            # Create the subcommand instance to run this machine worker.
            sub_commands.append(subcommand.subcommand(mw.run,
                                                      [],
                                                      work_dir))

            # To (potentially) speed up searching for valid tests create a list
            # of unique attribute sets present in the machines for this job. If
            # sets were hashable we could just use a dictionary for fast
            # verification. This at least reduces the search space from the
            # number of machines to the number of unique machines.
            if mw.attribute_set not in unique_machine_attributes:
                unique_machine_attributes.append(mw.attribute_set)

        # Only queue tests which are valid on at least one machine. Record
        # skipped tests in the status.log file using record_skipped_test().
        for test_entry in tests:
            # Check if it's an old style test entry.
            if len(test_entry) > 2 and not isinstance(test_entry[2], dict):
                test_attribs = {'include': test_entry[2]}
                if len(test_entry) > 3:
                    test_attribs['exclude'] = test_entry[3]
                if len(test_entry) > 4:
                    test_attribs['attributes'] = test_entry[4]

                test_entry = list(test_entry[:2])
                test_entry.append(test_attribs)

            ti = site_server_job_utils.test_item(*test_entry)
            machine_found = False
            for ma in unique_machine_attributes:
                if ti.validate(ma):
                    test_queue.put(ti)
                    machine_found = True
                    break
            if not machine_found:
                self.record_skipped_test(ti)

        # Run valid tests and wait for completion.
        subcommand.parallel(sub_commands)


    def record_skipped_test(self, skipped_test, message=None):
        """Insert a failure record into status.log for this test."""
        msg = message
        if msg is None:
            msg = 'No valid machines found for test %s.' % skipped_test
        logging.info(msg)
        self.record('START', None, skipped_test.test_name)
        self.record('INFO', None, skipped_test.test_name, msg)
        self.record('END TEST_NA', None, skipped_test.test_name, msg)


    def _has_failed_tests(self):
        """Parse status log for failed tests.

        This checks the current working directory and is intended only for use
        by the run() method.

        @return boolean
        """
        path = os.getcwd()

        # TODO(ayatane): Copied from tko/parse.py. Needs extensive refactor to
        # make code reuse plausible.
        job_keyval = tko_models.job.read_keyval(path)
        status_version = job_keyval.get("status_version", 0)

        # parse out the job
        parser = parser_lib.parser(status_version)
        job = parser.make_job(path)
        status_log = os.path.join(path, "status.log")
        if not os.path.exists(status_log):
            status_log = os.path.join(path, "status")
        if not os.path.exists(status_log):
            logging.warning("! Unable to parse job, no status file")
            return True

        # parse the status logs
        status_lines = open(status_log).readlines()
        parser.start(job)
        tests = parser.end(status_lines)

        # parser.end can return the same object multiple times, so filter out
        # dups
        job.tests = []
        already_added = set()
        for test in tests:
            if test not in already_added:
                already_added.add(test)
                job.tests.append(test)

        failed = False
        for test in job.tests:
            # The current job is still running and shouldn't count as failed.
            # The parser will fail to parse the exit status of the job since it
            # hasn't exited yet (this running right now is the job).
            failed = failed or (test.status != 'GOOD'
                                and not _is_current_server_job(test))
        return failed


    def _collect_crashes(self, namespace, collect_crashinfo):
        """Collect crashes.

        @param namespace: namespace dict.
        @param collect_crashinfo: whether to collect crashinfo in addition to
                dumps
        """
        if collect_crashinfo:
            # includes crashdumps
            crash_control_file = CRASHINFO_CONTROL_FILE
        else:
            crash_control_file = CRASHDUMPS_CONTROL_FILE
        self._execute_code(crash_control_file, namespace)


    _USE_TEMP_DIR = object()
    def run(self, install_before=False, install_after=False,
            collect_crashdumps=True, namespace={}, control=None,
            control_file_dir=None, verify_job_repo_url=False,
            only_collect_crashinfo=False, skip_crash_collection=False,
            job_labels='', use_packaging=True):
        # for a normal job, make sure the uncollected logs file exists
        # for a crashinfo-only run it should already exist, bail out otherwise
        created_uncollected_logs = False
        logging.info("I am PID %s", os.getpid())
        if self.resultdir and not os.path.exists(self._uncollected_log_file):
            if only_collect_crashinfo:
                # if this is a crashinfo-only run, and there were no existing
                # uncollected logs, just bail out early
                logging.info("No existing uncollected logs, "
                             "skipping crashinfo collection")
                return
            else:
                log_file = open(self._uncollected_log_file, "w")
                pickle.dump([], log_file)
                log_file.close()
                created_uncollected_logs = True

        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines
        if control is None:
            if self.control is None:
                control = ''
            else:
                control = self._load_control_file(self.control)
        if control_file_dir is None:
            control_file_dir = self.resultdir

        self.aborted = False
        namespace.update(self._make_namespace())
        namespace.update({
                'args': self.args,
                'job_labels': job_labels,
                'gtest_runner': site_gtest_runner.gtest_runner(),
        })
        test_start_time = int(time.time())

        if self.resultdir:
            os.chdir(self.resultdir)
            # touch status.log so that the parser knows a job is running here
            open(self.get_status_log_path(), 'a').close()
            self.enable_external_logging()

        collect_crashinfo = True
        temp_control_file_dir = None
        try:
            try:
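                # Take an 'at-start' network stats snapshot; the matching
                # 'at-end' snapshot is taken in the finally block below.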
                namespace['network_stats_label'] = 'at-start'
                self._execute_code(GET_NETWORK_STATS_CONTROL_FILE, namespace)
                if install_before and machines:
                    self._execute_code(INSTALL_CONTROL_FILE, namespace)

                if only_collect_crashinfo:
                    return

                # If the verify_job_repo_url option is set but we're unable
                # to actually verify that the job_repo_url contains the autotest
                # package, this job will fail.
                if verify_job_repo_url:
                    self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE,
                                       namespace)
                else:
                    logging.warning('Not checking if job_repo_url contains '
                                    'autotest packages on %s', machines)

                # determine the dir to write the control files to
                cfd_specified = (control_file_dir
                                 and control_file_dir is not self._USE_TEMP_DIR)
                if cfd_specified:
                    temp_control_file_dir = None
                else:
                    temp_control_file_dir = tempfile.mkdtemp(
                            suffix='temp_control_file_dir')
                    control_file_dir = temp_control_file_dir
                server_control_file = os.path.join(control_file_dir,
                                                   self._control_filename)
                client_control_file = os.path.join(control_file_dir,
                                                   CLIENT_CONTROL_FILENAME)
                if self._client:
                    namespace['control'] = control
                    utils.open_write_close(client_control_file, control)
                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
                                    server_control_file)
                else:
                    utils.open_write_close(server_control_file, control)

                logging.info("Processing control file")
                namespace['use_packaging'] = use_packaging
                self._execute_code(server_control_file, namespace)
                logging.info("Finished processing control file")

                # If no device error occurred, no need to collect crashinfo.
                collect_crashinfo = self.failed_with_device_error
            except Exception as e:
                try:
                    logging.exception(
                            'Exception escaped control file, job aborting:')
                    reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX,
                                    ' ', str(e))
                    self.record('INFO', None, None, str(e),
                                {'job_abort_reason': reason})
                except:
                    pass # don't let logging exceptions here interfere
                raise
        finally:
            if temp_control_file_dir:
                # Clean up temp directory used for copies of the control files
                try:
                    shutil.rmtree(temp_control_file_dir)
                except Exception as e:
                    logging.warning('Could not remove temp directory %s: %s',
                                    temp_control_file_dir, e)

            if machines and (collect_crashdumps or collect_crashinfo):
                if skip_crash_collection:
                    logging.info('Skipping crash dump/info collection '
                                 'as requested.')
                else:
                    namespace['test_start_time'] = test_start_time
                    # Remove crash files for passing tests.
                    # TODO(ayatane): Tests that create crash files should be
                    # reported.
                    namespace['has_failed_tests'] = self._has_failed_tests()
                    self._collect_crashes(namespace, collect_crashinfo)
            self.disable_external_logging()
            if self._uncollected_log_file and created_uncollected_logs:
                os.remove(self._uncollected_log_file)
            if install_after and machines:
                self._execute_code(INSTALL_CONTROL_FILE, namespace)
            namespace['network_stats_label'] = 'at-end'
            self._execute_code(GET_NETWORK_STATS_CONTROL_FILE, namespace)


    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        tag
                tag to add to testname
        url
                url of the test to run
        """
        if self._disable_sysinfo:
            dargs['disable_sysinfo'] = True

        group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        outputdir = self._make_test_outputdir(subdir)

        def group_func():
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException as e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception as e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        try:
            result = self._run_group(testname, subdir, group_func)
        except error.TestBaseException as e:
            return False
        else:
            return True


    def _run_group(self, name, subdir, function, *args, **dargs):
        """Underlying method for running something inside of a group."""
        result, exc_info = None, None
        try:
            self.record('START', subdir, name)
            result = function(*args, **dargs)
        except error.TestBaseException as e:
            self.record("END %s" % e.exit_status, subdir, name)
            raise
        except Exception as e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result


    def run_group(self, function, *args, **dargs):
        """\
        @param function: subroutine to run
        @returns: (result, exc_info). When the call succeeds, result contains
                the return value of |function| and exc_info is None. If
                |function| raises an exception, exc_info contains the tuple
                returned by sys.exc_info(), and result is None.
        """

        name = function.__name__
        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        try:
            result = self._run_group(name, None, function, *args, **dargs)[0]
        except error.TestBaseException:
            return None, sys.exc_info()
        return result, None


    def run_op(self, op, op_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        management operations. Includes support for capturing the kernel
        version after the operation.

        Args:
            op: name of the operation.
            op_func: a function that carries out the operation (reboot, suspend)
            get_kernel_func: a function that returns a string
                    representing the kernel version.
        """
        try:
            self.record('START', None, op)
            op_func()
        except Exception as e:
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, op, err_msg)
            raise
        else:
            kernel = get_kernel_func()
            self.record('END GOOD', None, op,
                        optional_fields={"kernel": kernel})


    def run_control(self, path):
        """Execute a control file found at path (relative to the autotest
        path). Intended for executing a control file within a control file,
        not for running the top-level job control file."""
        path = os.path.join(self.autodir, path)
        control_file = self._load_control_file(path)
        self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)


    def _read_warnings(self):
        """Poll all the warning loggers and extract any new warnings that have
        been logged. If the warnings belong to a category that is currently
        disabled, this method will discard them and they will no longer be
        retrievable.

        Returns a list of (timestamp, message) tuples, where timestamp is an
        integer epoch timestamp."""
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # parse out the warning
                timestamp, msgtype, msg = line.split('\t', 2)
                timestamp = int(timestamp)
                # if the warning is valid, add it to the results
                if self.warning_manager.is_valid(timestamp, msgtype):
                    warnings.append((timestamp, msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings


    def _unique_subdirectory(self, base_subdirectory_name):
        """Compute a unique results subdirectory based on the given name.

        Appends base_subdirectory_name with a number as necessary to find a
        directory name that doesn't already exist.
        """
        subdirectory = base_subdirectory_name
        counter = 1
        while os.path.exists(os.path.join(self.resultdir, subdirectory)):
            subdirectory = base_subdirectory_name + '.' + str(counter)
            counter += 1
        return subdirectory


    def get_record_context(self):
        """Returns an object representing the current job.record context.

        The object returned is an opaque object with a 0-arg restore method
        which can be called to restore the job.record context (i.e. indentation)
        to the current level. The intention is that it should be used when
        something external which generates job.record calls (e.g. an autotest
        client) can fail catastrophically and the server job record state
        needs to be reset to its original "known good" state.

        @return: A context object with a 0-arg restore() method."""
        return self._indenter.get_context()


    def record_summary(self, status_code, test_name, reason='', attributes=None,
                       distinguishing_attributes=(), child_test_ids=None):
        """Record a summary test result.

        @param status_code: status code string, see
                common_lib.log.is_valid_status()
        @param test_name: name of the test
        @param reason: (optional) string providing detailed reason for test
                outcome
        @param attributes: (optional) dict of string keyvals to associate with
                this result
        @param distinguishing_attributes: (optional) list of attribute names
                that should be used to distinguish identically-named test
                results. These attributes should be present in the attributes
                parameter. This is used to generate user-friendly subdirectory
                names.
        @param child_test_ids: (optional) list of test indices for test results
                used in generating this result.
        """
        subdirectory_name_parts = [test_name]
        for attribute in distinguishing_attributes:
            assert attributes
            assert attribute in attributes, '%s not in %s' % (attribute,
                                                              attributes)
            subdirectory_name_parts.append(attributes[attribute])
        base_subdirectory_name = '.'.join(subdirectory_name_parts)

        subdirectory = self._unique_subdirectory(base_subdirectory_name)
        subdirectory_path = os.path.join(self.resultdir, subdirectory)
        os.mkdir(subdirectory_path)

        self.record(status_code, subdirectory, test_name,
                    status=reason, optional_fields={'is_summary': True})

        if attributes:
            utils.write_keyval(subdirectory_path, attributes)

        if child_test_ids:
            ids_string = ','.join(str(test_id) for test_id in child_test_ids)
            summary_data = {'child_test_ids': ids_string}
            utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),
                               summary_data)


    def disable_warnings(self, warning_type):
        self.warning_manager.disable_warnings(warning_type)
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})


    def enable_warnings(self, warning_type):
        self.warning_manager.enable_warnings(warning_type)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})


    def get_status_log_path(self, subdir=None):
        """Return the path to the job status log.

        @param subdir - Optional parameter indicating that you want the path
                to a subdirectory status log.

        @returns The path where the status log should be.
        """
        if self.resultdir:
            if subdir:
                return os.path.join(self.resultdir, subdir, "status.log")
            else:
                return os.path.join(self.resultdir, "status.log")
        else:
            return None


    def _update_uncollected_logs_list(self, update_func):
        """Updates the uncollected logs list in a multi-process safe manner.

        @param update_func - a function that updates the list of uncollected
                logs. Should take one parameter, the list to be updated.
        """
        # Skip log collection if file _uncollected_log_file does not exist.
        if not (self._uncollected_log_file and
                os.path.exists(self._uncollected_log_file)):
            return
        if self._uncollected_log_file:
            log_file = open(self._uncollected_log_file, "r+")
            fcntl.flock(log_file, fcntl.LOCK_EX)
            try:
                uncollected_logs = pickle.load(log_file)
                update_func(uncollected_logs)
                log_file.seek(0)
                log_file.truncate()
                pickle.dump(uncollected_logs, log_file)
                log_file.flush()
            finally:
                fcntl.flock(log_file, fcntl.LOCK_UN)
                log_file.close()


    def add_client_log(self, hostname, remote_path, local_path):
        """Adds a new set of client logs to the list of uncollected logs,
        to allow for future log recovery.

        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.append((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)


    def remove_client_log(self, hostname, remote_path, local_path):
        """Removes a set of client logs from the list of uncollected logs,
        to allow for future log recovery.

        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.remove((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)


    def get_client_logs(self):
        """Retrieves the list of uncollected logs, if it exists.

        @returns A list of (host, remote_path, local_path) tuples. Returns
                an empty list if no uncollected logs file exists.
        """
        log_exists = (self._uncollected_log_file and
                      os.path.exists(self._uncollected_log_file))
        if log_exists:
            return pickle.load(open(self._uncollected_log_file))
        else:
            return []


    def _fill_server_control_namespace(self, namespace, protect=True):
        """
        Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
            namespace: The namespace dictionary to fill in.
            protect: Boolean. If True (the default) any operation that would
                    clobber an existing entry in namespace will cause an error.
        Raises:
            error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """
            Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name. If
                        empty (the default), all names are imported from the
                        module similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                        a name already in namespace.
            """
            module = __import__(module_name, {}, {}, names)

            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with fewer surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrence of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys, logging=logging)
        _import_names('autotest_lib.server',
                      ('hosts', 'autotest', 'standalone_profiler'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing to be part of a public API. -gps)
        #
        # XXX Base & SiteAutotest do not appear to use .job.  Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self
        namespace['hosts'].TestBed.job = self
        namespace['hosts'].factory.ssh_user = self._ssh_user
        namespace['hosts'].factory.ssh_port = self._ssh_port
        namespace['hosts'].factory.ssh_pass = self._ssh_pass
        namespace['hosts'].factory.ssh_verbosity_flag = (
                self._ssh_verbosity_flag)
        namespace['hosts'].factory.ssh_options = self._ssh_options


    def _execute_code(self, code_file, namespace, protect=True):
        """
        Execute code using a copy of namespace as a server control script.

        Unless protect is explicitly set to False, the dict will not
        be modified.

        Args:
            code_file: The filename of the control file to execute.
            namespace: A dict containing names to make available during
                    execution.
            protect: Boolean. If True (the default) a copy of the namespace
                    dict is used during execution to prevent the code from
                    modifying its contents outside of this function. If False
                    the raw dict is passed in and modifications will be
                    allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines) > 1:
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        execfile(code_file, namespace, namespace)


    def _parse_status(self, new_line):
        if not self._using_parser:
            return
        new_tests = self.parser.process_lines([new_line])
        for test in new_tests:
            self.__insert_test(test)


    def __insert_test(self, test):
        """
        An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        self.num_tests_run += 1
        if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
            self.num_tests_failed += 1
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            msg = ("WARNING: An unexpected error occurred while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg


    def preprocess_client_state(self):
        """
        Produce a state file for initializing the state of a client job.

        Creates a new client state file with all the current server state, as
        well as some pre-set client state.

        @returns The path of the file the state was written into.
        """
        # initialize the sysinfo state
        self._state.set('client', 'sysinfo', self.sysinfo.serialize())

        # dump the state out to a tempfile
        fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
        os.close(fd)

        # write_to_file doesn't need locking, we exclusively own file_path
        self._state.write_to_file(file_path)
        return file_path


    def postprocess_client_state(self, state_path):
        """
        Update the state of this job with the state from a client job.

        Updates the state of the server side of a job with the final state
        of a client job that was run. Updates the non-client-specific state,
        pulls in some specific bits from the client-specific state, and then
        discards the rest. Removes the state file afterwards.

        @param state_path: A path to the state file from the client.
        """
        # update the on-disk state
        try:
            self._state.read_from_file(state_path)
            os.remove(state_path)
        except OSError as e:
            # ignore file-not-found errors
            if e.errno != errno.ENOENT:
                raise
            else:
                logging.debug('Client state file %s not found', state_path)

        # update the sysinfo state
        if self._state.has('client', 'sysinfo'):
            self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))

        # drop all the client-specific state
        self._state.discard_namespace('client')


    def clear_all_known_hosts(self):
        """Clears known hosts files for all AbstractSSHHosts."""
        for host in self.hosts:
            if isinstance(host, abstract_ssh.AbstractSSHHost):
                host.clear_known_hosts()


    def close(self):
        """Closes this job's operation."""

        # Use shallow copy, because host.close() internally discards itself.
        for host in list(self.hosts):
            host.close()
        assert not self.hosts
        self._connection_pool.shutdown()


    def _get_job_data(self):
        """Add custom data to the job keyval info.

        When multiple machines are used in a job, change the hostname to
        the platform of the first machine instead of machine1,machine2,... This
        makes the job reports easier to read and keeps the tko_machines table
        from growing too large.

        Returns:
            keyval dictionary with new hostname value, or empty dictionary.
        """
        job_data = {}
        # Only modify hostname on multimachine jobs. Assume all hosts have the
        # same platform.
        if len(self.machines) > 1:
            # Search through machines for first machine with a platform.
            for host in self.machines:
                keyval_path = os.path.join(self.resultdir, 'host_keyvals', host)
                keyvals = utils.read_keyval(keyval_path)
                host_plat = keyvals.get('platform', None)
                if not host_plat:
                    continue
                job_data['hostname'] = host_plat
                break
        return job_data


class warning_manager(object):
    """Class for controlling warning logs. Manages the enabling and disabling
    of warnings."""
    def __init__(self):
        # a map of warning types to a list of disabled time intervals
        self.disabled_warnings = {}


    def is_valid(self, timestamp, warning_type):
        """Indicates if a warning (based on the time it occurred and its type)
        is a valid warning. A warning is considered "invalid" if this type of
        warning was marked as "disabled" at the time the warning occurred."""
        disabled_intervals = self.disabled_warnings.get(warning_type, [])
        for start, end in disabled_intervals:
            if timestamp >= start and (end is None or timestamp < end):
                return False
        return True


    def disable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, disables all further warnings of this type."""
        intervals = self.disabled_warnings.setdefault(warning_type, [])
        if not intervals or intervals[-1][1] is not None:
            intervals.append((int(current_time_func()), None))


    def enable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, enables all further warnings of this type."""
        intervals = self.disabled_warnings.get(warning_type, [])
        if intervals and intervals[-1][1] is None:
            intervals[-1] = (intervals[-1][0], int(current_time_func()))


def _is_current_server_job(test):
    """Return True if parsed test is the currently running job.

    @param test: test instance from tko parser.
    """
    return test.testname == 'SERVER_JOB'


def _create_afe_host(hostname):
    """Create an afe_host object backed by the AFE.

    @param hostname: Name of the host for which we want the Host object.
    @returns: A frontend.Host object for the named host.
    """
    afe = frontend_wrappers.RetryingAFE(timeout_min=5, delay_sec=10)
    hosts = afe.get_hosts(hostname=hostname)
    if not hosts:
        raise error.AutoservError('No hosts named %s found' % hostname)

    return hosts[0]


def _create_host_info_store(hostname):
    """Create a real or stub afe_store.AfeStore object.

    @param hostname: Name of the host for which we want the store.
    @returns: An object of type afe_store.AfeStore
    """
    host_info_store = afe_store.AfeStore(hostname)
    try:
        host_info_store.get(force_refresh=True)
    except host_info.StoreError:
        raise error.AutoservError('Could not obtain HostInfo for hostname %s' %
                                  hostname)
    return host_info_store
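

# Illustrative usage sketch added for documentation purposes only; it is not
# part of the autotest API surface and runs only when this file is executed
# directly. It exercises the warning_manager class defined above; the
# 'NETWORK' warning type and the timestamps are arbitrary example values.
if __name__ == '__main__':
    _manager = warning_manager()
    # Disable 'NETWORK' warnings starting at (fake) epoch time 100.
    _manager.disable_warnings('NETWORK', current_time_func=lambda: 100)
    assert not _manager.is_valid(150, 'NETWORK')  # inside disabled interval
    # Re-enable them at time 200; warnings logged after that are valid again.
    _manager.enable_warnings('NETWORK', current_time_func=lambda: 200)
    assert _manager.is_valid(250, 'NETWORK')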