#!/usr/bin/python

#pylint: disable=C0111

"""
Autotest scheduler
"""

import datetime
import functools
import logging
import optparse
import os
import signal
import sys
import time

import common
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import agent_task, drone_manager
from autotest_lib.scheduler import email_manager, host_scheduler
from autotest_lib.scheduler import luciferlib
from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
from autotest_lib.scheduler import postjob_task
from autotest_lib.scheduler import query_managers
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import scheduler_config
from autotest_lib.server import autoserv_utils
from autotest_lib.server import system_utils
from autotest_lib.server import utils as server_utils
from autotest_lib.site_utils import server_manager_utils

try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    metrics = utils.metrics_mock
    ts_mon_config = utils.metrics_mock


PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db_manager = None
_db = None
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise scheduler_lib.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option(
            '--metrics-file',
            help='If provided, drop metrics to this local file instead of '
                 'reporting to ts_mon',
            type=str,
            default=None,
    )
    parser.add_option(
            '--lifetime-hours',
            type=float,
            default=None,
            help='If provided, number of hours the scheduler should run for. '
                 'At the expiry of this time, the process will exit '
                 'gracefully.',
    )
    parser.add_option('--production',
                      help=('Indicate that the scheduler is running in a '
                            'production environment and may use a database '
                            'that is not hosted on localhost. If it is not '
                            'set, the scheduler will fail if the database is '
                            'not on localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)
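
    # For reference, a typical invocation looks like the sketch below. The
    # results path and option values are illustrative only; the option names
    # match the parser configured above:
    #
    #   monitor_db.py /usr/local/autotest/results \
    #       --lifetime-hours 24 --metrics-file /tmp/scheduler_metrics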

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    with ts_mon_config.SetupTsMonGlobalState('autotest_scheduler',
                                             indirect=True,
                                             debug_file=options.metrics_file):
        try:
            metrics.Counter('chromeos/autotest/scheduler/start').increment()
            process_start_time = time.time()
            initialize()
            dispatcher = Dispatcher()
            dispatcher.initialize(recover_hosts=options.recover_hosts)
            minimum_tick_sec = global_config.global_config.get_config_value(
                    scheduler_config.CONFIG_SECTION, 'minimum_tick_sec',
                    type=float)

            # TODO(crbug.com/837680): Force creating the current user.
            # This is a dirty hack to work around a race; see bug.
            models.User.current_user()

            while not _shutdown:
                if _lifetime_expired(options.lifetime_hours,
                                     process_start_time):
                    break

                start = time.time()
                dispatcher.tick()
                curr_tick_sec = time.time() - start
                if minimum_tick_sec > curr_tick_sec:
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
        except server_manager_utils.ServerActionError as e:
            # This error is expected when the server is not in primary status
            # for the scheduler role. Thus do not send email for it.
            logging.exception(e)
        except Exception:
            logging.exception('Uncaught exception, terminating monitor_db.')
            metrics.Counter('chromeos/autotest/scheduler/uncaught_exception'
                            ).increment()

    email_manager.manager.send_queued_emails()
    _drone_manager.shutdown()
    _db_manager.disconnect()


def handle_signal(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def _lifetime_expired(lifetime_hours, process_start_time):
    """Returns True if we have exceeded the process lifetime, False otherwise.

    Also sets the global _shutdown so that any background processes also take
    the cue to exit.
    """
    if lifetime_hours is None:
        return False
    if time.time() - process_start_time > lifetime_hours * 3600:
        logging.info('Process lifetime %0.3f hours exceeded. Shutting down.',
                     lifetime_hours)
        global _shutdown
        _shutdown = True
        return True
    return False


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    # If the server database is enabled, check that this server has the
    # `scheduler` role. If it does not, an exception is raised and the
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    drone_list = system_utils.get_drones()
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host',
            default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name and client -c or
            server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    command = autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose, in_lab=True)
    return command


def _calls_log_tick_msg(func):
    """Used to trace functions called by Dispatcher.tick."""
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        self._log_tick_msg('Starting %s' % func.__name__)
        return func(self, *args, **kwargs)

    return wrapper


class Dispatcher(object):


    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db, _drone_manager)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)
        self._inline_host_acquisition = (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'inline_host_acquisition', type=bool, default=True))
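
        # The tick_debug, extra_debugging and inline_host_acquisition knobs
        # above are read from the SCHEDULER section of the global config, as
        # are enable_scheduler and minimum_tick_sec in
        # main_without_exception_handling(). A minimal sketch of that section
        # (key names match the get_config_value() calls in this file; the
        # values shown are illustrative, not shipped defaults):
        #
        #   [SCHEDULER]
        #   enable_scheduler: True
        #   minimum_tick_sec: 30
        #   tick_debug: False
        #   extra_debugging: False
        #   inline_host_acquisition: True
        #   die_on_orphans: False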

        # If _inline_host_acquisition is set, the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if self._inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()
        # Execute all actions queued in the cleanup tasks. The scheduler tick
        # will run a refresh task first; if there is any action left in the
        # queue, refresh will raise an exception.
        _drone_manager.execute_actions()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    # TODO(pprabhu) Drop this metric once tick_times has been verified.
    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/scheduler/tick_durations/tick')
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins, so we can try to figure out where most of the tick
        time is spent.
        """
        with metrics.RuntimeBreakdownTimer(
                'chromeos/autotest/scheduler/tick_times') as breakdown_timer:
            self._log_tick_msg('New tick')
            system_utils.DroneCache.refresh()

            with breakdown_timer.Step('trigger_refresh'):
                self._log_tick_msg('Starting _drone_manager.trigger_refresh')
                _drone_manager.trigger_refresh()
            with breakdown_timer.Step('schedule_running_host_queue_entries'):
                self._schedule_running_host_queue_entries()
            with breakdown_timer.Step('schedule_special_tasks'):
                self._schedule_special_tasks()
            with breakdown_timer.Step('schedule_new_jobs'):
                self._schedule_new_jobs()
            with breakdown_timer.Step('gather_tick_metrics'):
                self._gather_tick_metrics()
            with breakdown_timer.Step('sync_refresh'):
                self._log_tick_msg('Starting _drone_manager.sync_refresh')
                _drone_manager.sync_refresh()
            if luciferlib.is_lucifer_enabled():
                with breakdown_timer.Step('send_to_lucifer'):
                    self._send_to_lucifer()
            # _run_cleanup must be called between drone_manager.sync_refresh
            # and drone_manager.execute_actions, as sync_refresh will clear
            # the calls queued in drones. Therefore, any action that calls
            # drone.queue_call to add calls to drone._calls should run after
            # the drone refresh has completed and before
            # drone_manager.execute_actions at the end of the tick.
            with breakdown_timer.Step('run_cleanup'):
                self._run_cleanup()
            with breakdown_timer.Step('find_aborting'):
                self._find_aborting()
            with breakdown_timer.Step('find_aborted_special_tasks'):
                self._find_aborted_special_tasks()
            with breakdown_timer.Step('handle_agents'):
                self._handle_agents()
            with breakdown_timer.Step('host_scheduler_tick'):
                self._log_tick_msg('Starting _host_scheduler.tick')
                self._host_scheduler.tick()
            with breakdown_timer.Step('drones_execute_actions'):
                self._log_tick_msg('Starting _drone_manager.execute_actions')
                _drone_manager.execute_actions()
            with breakdown_timer.Step('send_queued_emails'):
                self._log_tick_msg(
                        'Starting email_manager.manager.send_queued_emails')
                email_manager.manager.send_queued_emails()
            with breakdown_timer.Step('db_reset_queries'):
                self._log_tick_msg('Starting django.db.reset_queries')
                django.db.reset_queries()

            self._tick_count += 1
            metrics.Counter('chromeos/autotest/scheduler/tick').increment()


    @_calls_log_tick_msg
    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _gather_tick_metrics(self):
        """Gather metrics during tick, after all tasks have been scheduled."""
        metrics.Gauge(
                'chromeos/autotest/scheduler/agent_count'
        ).set(len(self._agents))


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)
            # If an ID has no more active agents associated with it, there is
            # no need to keep it in the dictionary. Otherwise, the scheduler
            # would keep an unnecessarily big dictionary until it is
            # restarted.
            if not agent_dict[object_id]:
                agent_dict.pop(object_id)


    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatcher's list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to:
        1. a dict against the queue_entry_ids given to it
        2. a dict against the host_ids given to it
        So theoretically, a host can have any number of agents associated
        with it, and each of them can have any special agent task, though in
        practice we never see > 1 agent/task per host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        if luciferlib.is_enabled_for('STARTING'):
            # TODO(crbug.com/810141): Transition code. After running at
            # STARTING for a while, these tasks should no longer exist.
            if (isinstance(agent_task, postjob_task.GatherLogsTask)
                # TODO(crbug.com/811877): Don't skip split HQE parsing.
                or (isinstance(agent_task, postjob_task.FinalReparseTask)
                    and not luciferlib.is_split_job(
                            agent_task.queue_entries[0].id))):
                return
            if isinstance(agent_task, AbstractQueueTask):
                # If Lucifer already owns the job, ignore the agent.
                if luciferlib.is_lucifer_owned_by_id(agent_task.job.id):
                    return
                # If the job isn't started yet, let Lucifer own it.
                if not agent_task.started:
                    return
                # Otherwise, this is a STARTING job that Autotest owned
                # before Lucifer was enabled for STARTING.
                # Allow the scheduler to recover the agent task normally.

        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqes in the specified states.

        Loosely this translates to taking an hqe in one of the specified
        states, say parsing, and getting an AgentTask for it, like the
        FinalReparseTask, through _get_agent_task_for_queue_entry. Each queue
        entry can only have one agent task at a time, but there might be
        multiple queue entries in the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks
        # (Verifying is handled through SpecialTasks, so is not
        # listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        hqe_count_by_status = {}
        for entry in queue_entries:
            try:
                hqe_count_by_status[entry.status] = (
                        hqe_count_by_status.get(entry.status, 0) + 1)
                if self.get_agents_for_entry(entry):
                    # already being handled
                    continue
                if entry in used_queue_entries:
                    # already picked up by a synchronous job
                    continue
                try:
                    agent_task = self._get_agent_task_for_queue_entry(entry)
                except scheduler_lib.SchedulerError:
                    # Probably being handled by lucifer crbug.com/809773
                    continue
                agent_tasks.append(agent_task)
                used_queue_entries.update(agent_task.queue_entries)
            except scheduler_lib.MalformedRecordError as e:
                logging.exception('Skipping agent task for a malformed hqe.')
                # TODO(akeshet): figure out a way to safely permanently
                # discard this errant HQE. It appears that calling
                # entry.abort() is not sufficient, as that already makes some
                # assumptions about record sanity that may be violated. See
                # crbug.com/739530 for context.
                m = 'chromeos/autotest/scheduler/skipped_malformed_hqe'
                metrics.Counter(m).increment()

        for status, count in hqe_count_by_status.iteritems():
            metrics.Gauge(
                    'chromeos/autotest/scheduler/active_host_queue_entries'
            ).set(count, fields={'status': status})

        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        agent_tasks = []
        for task in special_tasks:
            try:
                agent_tasks.append(self._get_agent_task_for_special_task(task))
            except scheduler_lib.MalformedRecordError as e:
                logging.exception('Skipping agent task for malformed special '
                                  'task.')
                m = 'chromeos/autotest/scheduler/skipped_malformed_special_task'
                metrics.Counter(m).increment()
        return agent_tasks


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)

        raise scheduler_lib.MalformedRecordError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = {models.HostQueueEntry.Status.PARSING}
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise scheduler_lib.MalformedRecordError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a
        SpecialAgentTask object contains an hqe, it's passed on to the special
        agent task, which creates a HostQueueEntry and saves it as its
        queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise scheduler_lib.MalformedRecordError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where="status = '%s'" % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        m = 'chromeos/autotest/errors/unrecovered_orphan_processes'
        metrics.Gauge(m).set(len(orphans))

        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        die_on_orphans = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            try:
                entry.on_pending()
            except scheduler_lib.MalformedRecordError as e:
                logging.exception(
                        'Skipping agent task for malformed special task.')
                m = 'chromeos/autotest/scheduler/skipped_malformed_special_task'
                metrics.Counter(m).increment()


    def _check_for_unrecovered_verifying_entries(self):
        # Verify is replaced by Reset.
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.RESETTING)
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY,
                              models.SpecialTask.Task.RESET),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                logging.error('Unrecovered Resetting host queue entry: %s. ',
                              str(queue_entry))
                # Essentially this host queue entry was set to be Verifying,
                # but no special task exists for the entry. This occurs if the
                # scheduler dies between changing the status and creating the
                # special task. By setting it to Queued, the job can restart
                # from the beginning and proceed correctly. This is much
                # preferable to monitor_db failing to launch.
                logging.info('Setting host status for %s to Ready',
                             str(queue_entry.host))
                # Let's at least run a cleanup/reset before reusing this DUT.
                queue_entry.host.update_field('dirty', 1)
                queue_entry.host.set_status(models.Host.Status.READY)
                logging.info('Setting status for HQE %s to Queued.',
                             str(queue_entry))
                queue_entry.set_status('Queued')


    @_calls_log_tick_msg
    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.

        Special tasks include PreJobTasks like verify, reset and cleanup.
        They are created through _schedule_new_jobs and associated with an
        hqe. This method translates SpecialTasks to the appropriate AgentTask
        and adds them to the dispatcher's agents list, so _handle_agents can
        execute them.
        """
        # When the host scheduler is responsible for acquisition we only want
        # to run tasks with leased hosts. All hqe tasks will already have
        # leased hosts, and we don't want to run frontend tasks till the host
        # scheduler has vetted the assignment. Note that this doesn't include
        # frontend tasks with hosts leased by other active hqes.
        for task in self._job_query_manager.get_prioritized_special_tasks(
                only_tasks_with_leased_hosts=not self._inline_host_acquisition):
            if self.host_has_agent(task.host):
                continue
            try:
                self.add_agent_task(self._get_agent_task_for_special_task(task))
            except scheduler_lib.MalformedRecordError:
                logging.exception('Skipping schedule for malformed '
                                  'special task.')
                m = 'chromeos/autotest/scheduler/skipped_schedule_special_task'
                metrics.Counter(m).increment()


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
                print_message=message)


    DEFAULT_REQUESTED_BY_USER_ID = 1


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND %s' % where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if host.shard_id is not None and not server_utils.is_shard():
                # I am the master and the host is owned by a shard; ignore it.
                continue
            if print_message:
                logging.error(print_message, host.hostname)
            try:
                user = models.User.objects.get(login='autotest_system')
            except models.User.DoesNotExist:
                user = models.User.objects.get(
                        id=self.DEFAULT_REQUESTED_BY_USER_ID)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.RESET,
                    host=models.Host.objects.get(id=host.id),
                    requested_by=user)


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
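        # Note: when inline host acquisition is disabled, only hostless
        # entries are requested here (only_hostless=True below); jobs that
        # need hosts are left to the standalone host scheduler process.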
        queue_entries = self._job_query_manager.get_pending_queue_entries(
                only_hostless=not self._inline_host_acquisition)
        if not queue_entries:
            return []
        return queue_entries


    def _schedule_hostless_job(self, queue_entry):
        """Schedule a hostless (suite) job.

        @param queue_entry: The queue_entry representing the hostless job.
        """
        if not luciferlib.is_enabled_for('STARTING'):
            self.add_agent_task(HostlessQueueTask(queue_entry))

        # Need to set execution_subdir before setting the status:
        # After a restart of the scheduler, agents will be restored for HQEs
        # in Starting, Running, Gathering, Parsing or Archiving. To do this,
        # the execution_subdir is needed. Therefore it must be set before
        # entering one of these states.
        # Otherwise, if the scheduler were interrupted between setting the
        # status and the execution_subdir, restoring agents after its restart
        # would fail.
        # Is there a way to get a status in one of these states without going
        # through this code? The following cases are possible:
        # - If it's aborted before being started:
        #   active bit will be 0, so there's nothing to parse, it will just be
        #   set to completed by _find_aborting. Critical statuses are skipped.
        # - If it's aborted or it fails after being started:
        #   It was started, so this code was executed.
        queue_entry.update_field('execution_subdir', 'hostless')
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_host_job(self, host, queue_entry):
        """Schedules a job on the given host.

        1. Assign the host to the hqe, if it isn't already assigned.
        2. Create a SpecialAgentTask for the hqe.
        3. Activate the hqe.

        @param queue_entry: The job to schedule.
        @param host: The host to schedule the job on.
        """
        if self.host_has_agent(host):
            host_agent_task = list(self._host_agents.get(host.id))[0].task
        else:
            self._host_scheduler.schedule_host_job(host, queue_entry)


    @_calls_log_tick_msg
    def _schedule_new_jobs(self):
        """
        Find any new HQEs and call schedule_pre_job_tasks for them.

        This involves setting the status of the HQE and creating a row in the
        db corresponding to the special task, through
        scheduler_models._queue_special_task. The new db row is then added as
        an agent to the dispatcher through _schedule_special_tasks and
        scheduled for execution on the drone through _handle_agents.
        """
        queue_entries = self._refresh_pending_queue_entries()

        key = 'scheduler.jobs_per_tick'
        new_hostless_jobs = 0
        new_jobs_with_hosts = 0
        new_jobs_need_hosts = 0
        host_jobs = []
        logging.debug('Processing %d queue_entries', len(queue_entries))

        for queue_entry in queue_entries:
            if queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
                new_hostless_jobs = new_hostless_jobs + 1
            else:
                host_jobs.append(queue_entry)
                new_jobs_need_hosts = new_jobs_need_hosts + 1

        metrics.Counter(
                'chromeos/autotest/scheduler/scheduled_jobs_hostless'
        ).increment_by(new_hostless_jobs)

        if not host_jobs:
            return

        if not self._inline_host_acquisition:
            # In this case, host_scheduler is responsible for scheduling
            # host_jobs. Scheduling the jobs ourselves can lead to DB
            # corruption, since host_scheduler assumes it is the single
            # process scheduling host jobs.
            metrics.Gauge(
                    'chromeos/autotest/errors/scheduler/unexpected_host_jobs'
            ).set(len(host_jobs))
            return

        jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
        for host_assignment in jobs_with_hosts:
            self._schedule_host_job(host_assignment.host, host_assignment.job)
            new_jobs_with_hosts = new_jobs_with_hosts + 1

        metrics.Counter(
                'chromeos/autotest/scheduler/scheduled_jobs_with_hosts'
        ).increment_by(new_jobs_with_hosts)


    @_calls_log_tick_msg
    def _send_to_lucifer(self):
        """
        Hand off ownership of a job to the lucifer component.
        """
        self._send_starting_to_lucifer()
        self._send_parsing_to_lucifer()


    # TODO(crbug.com/748234): This is temporary to enable toggling
    # lucifer rollouts with an option.
    def _send_starting_to_lucifer(self):
        Status = models.HostQueueEntry.Status
        queue_entries_qs = (models.HostQueueEntry.objects
                            .filter(status=Status.STARTING))
        for queue_entry in queue_entries_qs:
            if self.get_agents_for_entry(queue_entry):
                continue
            job = queue_entry.job
            if luciferlib.is_lucifer_owned(job):
                continue
            try:
                drone = luciferlib.spawn_starting_job_handler(
                        manager=_drone_manager,
                        job=job)
            except Exception:
                logging.exception('Error when sending job to Lucifer')
                models.HostQueueEntry.abort_host_queue_entries(
                        job.hostqueueentry_set.all())
            else:
                models.JobHandoff.objects.create(
                        job=job, drone=drone.hostname())


    # TODO(crbug.com/748234): This is temporary to enable toggling
    # lucifer rollouts with an option.
    def _send_parsing_to_lucifer(self):
        Status = models.HostQueueEntry.Status
        queue_entries_qs = (models.HostQueueEntry.objects
                            .filter(status=Status.PARSING))
        for queue_entry in queue_entries_qs:
            # If this HQE already has an agent, let monitor_db continue
            # owning it.
            if self.get_agents_for_entry(queue_entry):
                continue
            job = queue_entry.job
            if luciferlib.is_lucifer_owned(job):
                continue
            # TODO(crbug.com/811877): Ignore split HQEs.
            if luciferlib.is_split_job(queue_entry.id):
                continue
            task = postjob_task.PostJobTask(
                    [queue_entry], log_file_name='/dev/null')
            pidfile_id = task._autoserv_monitor.pidfile_id
            autoserv_exit = task._autoserv_monitor.exit_code()
            try:
                drone = luciferlib.spawn_parsing_job_handler(
                        manager=_drone_manager,
                        job=job,
                        autoserv_exit=autoserv_exit,
                        pidfile_id=pidfile_id)
                models.JobHandoff.objects.create(job=job,
                                                 drone=drone.hostname())
            except drone_manager.DroneManagerError as e:
                logging.warning(
                        'Failed to get a drone for job %s, skipping lucifer. '
                        'Error: %s', job.id, e)


    @_calls_log_tick_msg
    def _schedule_running_host_queue_entries(self):
        """
        Adds agents to the dispatcher.

        Any AgentTask, like the QueueTask, is wrapped in an Agent. The
        QueueTask for example, will have a job with a control file, and
        the agent will have methods that poll, abort and check if the queue
        task is finished. The dispatcher runs the agent_task, as well as
        other agents in its _agents member, through _handle_agents, by
        calling the Agent's tick().

        This method creates an agent for each HQE in one of the (starting,
        running, gathering, parsing) states, and adds it to the dispatcher so
        it is handled by _handle_agents.
        """
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    @_calls_log_tick_msg
    def _find_aborting(self):
        """
        Looks through the afe_host_queue_entries for an aborted entry.

        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an rpc from the afe to abort_host_queue_entries.
        """
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted=1 and complete=0'):
            if (luciferlib.is_enabled_for('STARTING')
                    and luciferlib.is_lucifer_owned_by_id(entry.job.id)):
                continue

            # If the job is running on a shard, let the shard handle aborting
            # it and sync back the right status.
            if entry.job.shard_id is not None and not server_utils.is_shard():
                # Due to crbug.com/894162, we abort jobs that are more than
                # one hour beyond their timeout on the master.
                create_on = time.mktime(entry.job.created_on.timetuple())
                wait_threshold = entry.job.timeout_mins * 60 + 3600
                abort_anyway = wait_threshold < time.time() - create_on
                if abort_anyway:
                    logging.info('Aborting %s on master because the job is '
                                 'more than 1 hour beyond its timeout.', entry)
                else:
                    logging.info('Waiting for shard %s to abort hqe %s',
                                 entry.job.shard_id, entry)
                    continue

            logging.info('Aborting %s', entry)

            # The task would have started off with both is_complete and
            # is_active = False. Aborted tasks are neither active nor
            # complete. For all currently active tasks this will happen
            # through the agent, but we need to manually update the special
            # tasks that haven't started yet, because they don't have agents.
            models.SpecialTask.objects.filter(
                    is_active=False,
                    queue_entry_id=entry.id).update(is_complete=True)

            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


    @_calls_log_tick_msg
    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.

        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them.
        """

        # The completed and active bits are very important when it comes
        # to scheduler correctness. The active bit is set through the prolog
        # of a special task, and reset through the cleanup method of the
        # SpecialAgentTask. The cleanup is called both through the abort and
        # the epilog. The complete bit is set in several places, and in
        # general a hanging job will have is_active=1 is_complete=0, while a
        # special task which completed will have is_active=0 is_complete=1.
        # To check for aborts we directly check the active bit, because the
        # complete bit is set in several places, including the epilog of
        # agent tasks.
        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
                                                          is_aborted=True)
        for task in aborted_tasks:
            # There are 2 ways to get the agent associated with a task:
            # through the host and through the hqe. A special task
            # always needs a host, but doesn't always need an hqe.
            for agent in self._host_agents.get(task.host.id, []):
                if isinstance(agent.task, agent_task.SpecialAgentTask):

                    # The epilog performs critical actions such as
                    # queueing the next SpecialTask, requeuing the
                    # hqe etc, however it doesn't actually kill the
                    # monitor process and set the 'done' bit. Epilogs
                    # assume that the job failed, and that the monitor
                    # process has already written an exit code. The
                    # done bit is a necessary condition for
                    # _handle_agents to schedule any more special
                    # tasks against the host, and it must be set
                    # in addition to is_active, is_complete and success.
                    agent.task.epilog()
                    agent.task.abort()


    def _can_start_agent(self, agent, have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        return True


    @_calls_log_tick_msg
    def _handle_agents(self):
        """
        Handles agents of the dispatcher.

        Appropriate Agents are added to the dispatcher through
        _schedule_running_host_queue_entries. These agents each
        have a task. This method runs each agent's task through
        agent.tick(), leading to:
            agent.start
                prolog -> AgentTask's prolog
                          For each queue entry:
                              sets host status/status to Running
                              sets started_on in afe_host_queue_entries
                run -> AgentTask's run
                       Creates a PidfileRunMonitor
                       Queues the autoserv command line for this AgentTask
                       via the drone manager. These commands are executed
                       through the drone manager's execute_actions.
                poll -> AgentTask's/BaseAgentTask's poll
                        checks the monitor's exit_code.
                        Executes the epilog if the task is finished.
                        Executes the AgentTask's _finish_task.
                        finish_task is usually responsible for setting the
                        status of the HQE/host, and updating its active and
                        complete fields.

            agent.is_done
                Removes the agent from the dispatcher's _agents queue.
                is_done checks the finished bit on the agent, which is
                set based on the Agent's task. During the agent's poll
                we check to see if the monitor process has exited in
                its finish method, and set the success member of the
                task based on this exit code.
        """
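        # Bookkeeping for process throttling (see _can_start_agent above):
        # once a single nonzero-process agent is refused because the drones
        # are at their process limit, have_reached_limit stays True and no
        # further nonzero-process agents are started this tick; zero-process
        # agents are always allowed to run.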
        num_started_this_tick = 0
        num_finished_this_tick = 0
        have_reached_limit = False
        # iterate over a copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                                        agent.queue_entry_ids))
            if not agent.started:
                if not self._can_start_agent(agent, have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_tick += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                num_finished_this_tick += agent.task.num_processes
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)

        metrics.Counter(
                'chromeos/autotest/scheduler/agent_processes_started'
        ).increment_by(num_started_this_tick)
        metrics.Counter(
                'chromeos/autotest/scheduler/agent_processes_finished'
        ).increment_by(num_finished_this_tick)
        num_agent_processes = _drone_manager.total_running_processes()
        metrics.Gauge(
                'chromeos/autotest/scheduler/agent_processes'
        ).set(num_agent_processes)
        logging.info('%d running processes. %d added this tick.',
                     num_agent_processes, num_started_this_tick)


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task. An agent wraps
    around an AgentTask mainly to associate the AgentTask with the queue_entry
    and host ids.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested. The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """
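
    # A minimal sketch of the task interface described above (hypothetical,
    # for illustration only; real tasks are AgentTask subclasses such as
    # QueueTask or the prejob/postjob tasks):
    #
    #   class _NoopTask(object):
    #       queue_entry_ids = ()
    #       host_ids = ()
    #       aborted = False
    #       success = True
    #
    #       def poll(self):
    #           pass
    #
    #       def is_done(self):
    #           return True
    #
    #       def abort(self):
    #           self.aborted = True
    #
    #   agent = Agent(_NoopTask())
    #   agent.tick()  # polls the task and marks the agent finished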


    def __init__(self, task):
        """
        @param task: An instance of an AgentTask.
        """
        self.task = task

        # This is filled in by Dispatcher.add_agent_task()
        self.dispatcher = None

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        self.started = True
        if not self.finished:
            self.task.poll()
            if self.task.is_done():
                self.finished = True


    def is_done(self):
        return self.finished


    def abort(self):
        if self.task:
            self.task.abort()
            if self.task.aborted:
                # tasks can choose to ignore aborts
                self.finished = True


class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
    """
    Common functionality for QueueTask and HostlessQueueTask
    """
    def __init__(self, queue_entries):
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _write_control_file(self, execution_path):
        control_path = _drone_manager.attach_file_to_execution(
                execution_path, self.job.control_file)
        return control_path


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _command_line(self):
        execution_path = self.queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        hostnames = ','.join(entry.host.hostname
                             for entry in self.queue_entries
                             if not entry.is_hostless())

        execution_tag = self.queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self.job, verbose=False)

        return params


    @property
    def num_processes(self):
        return len(self.queue_entries)


    @property
    def owner_username(self):
        return self.job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def prolog(self):
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = self.job.keyval_dict()
        keyval_dict[queued_key] = queued_time
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.set_started_on_now()


    def _write_lost_process_error_file(self):
        error_file_path = os.path.join(self._working_directory(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()


    def _write_status_comment(self, comment):
        _drone_manager.write_lines_to_file(
                os.path.join(self._working_directory(), 'status.log'),
                ['INFO\t----\t----\t' + comment],
                paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        # TODO(showard): this conditional is now obsolete, we just need to
        # leave it in temporarily for backwards compatibility over upgrades.
        # delete soon.
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
                aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(AbstractQueueTask, self).epilog()
        self._finish_task()


class QueueTask(AbstractQueueTask):
    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)
        self._enable_ssp_container = (
                global_config.global_config.get_config_value(
                        'AUTOSERV', 'enable_ssp_container', type=bool,
                        default=True))


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.host.set_status(models.Host.Status.RUNNING)
            queue_entry.host.update_field('dirty', 1)


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
            queue_entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        invocation = super(QueueTask, self)._command_line()
        # Check if server-side packaging is needed.
        if (self._enable_ssp_container and
            self.job.control_type == control_data.CONTROL_TYPE.SERVER and
            self.job.require_ssp != False):
            invocation += ['--require-ssp']
            keyval_dict = self.job.keyval_dict()
            test_source_build = keyval_dict.get('test_source_build', None)
            if test_source_build:
                invocation += ['--test_source_build', test_source_build]
        if self.job.parent_job_id:
            invocation += ['--parent_job_id', str(self.job.parent_job_id)]
        return invocation + ['--verify_job_repo_url']


class HostlessQueueTask(AbstractQueueTask):
    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()

        # When a job is added to the database, its initial status is always
        # Starting.
        # In a scheduler tick, the scheduler finds all jobs in Starting
        # status and checks whether any of them can be started. If the
        # scheduler hits some limit, e.g., max_hostless_jobs_per_drone, it
        # will leave these jobs in Starting status. Otherwise, the jobs'
        # status will be changed to Running, and an autoserv process will be
        # started in a drone for each of these jobs.
        # If the entry is still in Starting status, the process has not
        # started yet. Therefore, there is no need to parse and collect logs.
        # Without this check, an exception would be raised by the scheduler
        # because execution_subdir for this queue entry does not have a value
        # yet.
        hqe = self.queue_entries[0]
        if hqe.status != models.HostQueueEntry.Status.STARTING:
            hqe.set_status(models.HostQueueEntry.Status.PARSING)


if __name__ == '__main__':
    main()