1#!/usr/bin/python 2#pylint: disable-msg=C0111 3 4# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 5# Use of this source code is governed by a BSD-style license that can be 6# found in the LICENSE file. 7 8"""Host scheduler. 9 10If run as a standalone service, the host scheduler ensures the following: 11 1. Hosts will not be assigned to multiple hqes simultaneously. The process 12 of assignment in this case refers to the modification of the host_id 13 column of a row in the afe_host_queue_entries table, to reflect the host 14 id of a leased host matching the dependencies of the job. 15 2. Hosts that are not being used by active hqes or incomplete special tasks 16 will be released back to the available hosts pool, for acquisition by 17 subsequent hqes. 18In addition to these guarantees, the host scheduler also confirms that no 2 19active hqes/special tasks are assigned the same host, and sets the leased bit 20for hosts needed by frontend special tasks. The need for the latter is only 21apparent when viewed in the context of the job-scheduler (monitor_db), which 22runs special tasks only after their hosts have been leased. 23 24** Suport minimum duts requirement for suites (non-inline mode) ** 25 26Each suite can specify the minimum number of duts it requires by 27dropping a 'suite_min_duts' job keyval which defaults to 0. 28 29When suites are competing for duts, if any suite has not got minimum duts 30it requires, the host scheduler will try to meet the requirement first, 31even if other suite may have higher priority or earlier timestamp. Once 32all suites' minimum duts requirement have been fullfilled, the host 33scheduler will allocate the rest of duts based on job priority and suite job id. 34This is to prevent low priority suites from starving when sharing pool with 35high-priority suites. 36 37Note: 38 1. Prevent potential starvation: 39 We need to carefully choose |suite_min_duts| for both low and high 40 priority suites. If a high priority suite didn't specify it but a low 41 priority one does, the high priority suite can be starved! 42 2. Restart requirement: 43 Restart host scheduler if you manually released a host by setting 44 leased=0 in db. This is needed because host scheduler maintains internal 45 state of host assignment for suites. 46 3. Exchanging duts triggers provisioning: 47 TODO(fdeng): There is a chance two suites can exchange duts, 48 if the two suites are for different builds, the exchange 49 will trigger provisioning. This can be optimized by preferring getting 50 hosts with the same build. 51""" 52 53import argparse 54import collections 55import datetime 56import logging 57import os 58import signal 59import sys 60import time 61 62import common 63from autotest_lib.client.common_lib import utils 64from autotest_lib.frontend import setup_django_environment 65 66# This import needs to come earlier to avoid using autotest's version of 67# httplib2, which is out of date. 68try: 69 from chromite.lib import metrics 70 from chromite.lib import ts_mon_config 71except ImportError: 72 metrics = utils.metrics_mock 73 ts_mon_config = utils.metrics_mock 74 75from autotest_lib.client.common_lib import global_config 76from autotest_lib.scheduler import email_manager 77from autotest_lib.scheduler import query_managers 78from autotest_lib.scheduler import rdb_lib 79from autotest_lib.scheduler import rdb_utils 80from autotest_lib.scheduler import scheduler_lib 81from autotest_lib.scheduler import scheduler_models 82from autotest_lib.site_utils import server_manager_utils 83 84 85_db_manager = None 86_shutdown = False 87_tick_pause_sec = global_config.global_config.get_config_value( 88 'SCHEDULER', 'tick_pause_sec', type=int, default=5) 89_monitor_db_host_acquisition = global_config.global_config.get_config_value( 90 'SCHEDULER', 'inline_host_acquisition', type=bool, default=True) 91_METRICS_PREFIX = 'chromeos/autotest/host_scheduler' 92 93class SuiteRecorder(object): 94 """Recording the host assignment for suites. 95 96 The recorder holds two things: 97 * suite_host_num, records how many duts a suite is holding, 98 which is a map <suite_job_id -> num_of_hosts> 99 * hosts_to_suites, records which host is assigned to which 100 suite, it is a map <host_id -> suite_job_id> 101 The two datastructure got updated when a host is assigned to or released 102 by a job. 103 104 The reason to maintain hosts_to_suites is that, when a host is released, 105 we need to know which suite it was leased to. Querying the db for the 106 latest completed job that has run on a host is slow. Therefore, we go with 107 an alternative: keeping a <host id, suite job id> map 108 in memory (for 10K hosts, the map should take less than 1M memory on 109 64-bit machine with python 2.7) 110 111 """ 112 113 114 def __init__(self, job_query_manager): 115 """Initialize. 116 117 @param job_queue_manager: A JobQueueryManager object. 118 """ 119 self.job_query_manager = job_query_manager 120 self.suite_host_num, self.hosts_to_suites = ( 121 self.job_query_manager.get_suite_host_assignment()) 122 123 124 def record_assignment(self, queue_entry): 125 """Record that the hqe has got a host. 126 127 @param queue_entry: A scheduler_models.HostQueueEntry object which has 128 got a host. 129 """ 130 parent_id = queue_entry.job.parent_job_id 131 if not parent_id: 132 return 133 if self.hosts_to_suites.get(queue_entry.host_id, None) == parent_id: 134 logging.error('HQE (id: %d, parent_job_id: %d, host: %s) ' 135 'seems already recorded', queue_entry.id, 136 parent_id, queue_entry.host.hostname) 137 return 138 num_hosts = self.suite_host_num.get(parent_id, 0) 139 self.suite_host_num[parent_id] = num_hosts + 1 140 self.hosts_to_suites[queue_entry.host_id] = parent_id 141 logging.debug('Suite %d got host %s, currently holding %d hosts', 142 parent_id, queue_entry.host.hostname, 143 self.suite_host_num[parent_id]) 144 145 146 def record_release(self, hosts): 147 """Update the record with host releasing event. 148 149 @param hosts: A list of scheduler_models.Host objects. 150 """ 151 for host in hosts: 152 if host.id in self.hosts_to_suites: 153 parent_job_id = self.hosts_to_suites.pop(host.id) 154 count = self.suite_host_num[parent_job_id] - 1 155 if count == 0: 156 del self.suite_host_num[parent_job_id] 157 else: 158 self.suite_host_num[parent_job_id] = count 159 logging.debug( 160 'Suite %d releases host %s, currently holding %d hosts', 161 parent_job_id, host.hostname, count) 162 163 164 def get_min_duts(self, suite_job_ids): 165 """Figure out min duts to request. 166 167 Given a set ids of suite jobs, figure out minimum duts to request for 168 each suite. It is determined by two factors: min_duts specified 169 for each suite in its job keyvals, and how many duts a suite is 170 currently holding. 171 172 @param suite_job_ids: A set of suite job ids. 173 174 @returns: A dictionary, the key is suite_job_id, the value 175 is the minimum number of duts to request. 176 """ 177 suite_min_duts = self.job_query_manager.get_min_duts_of_suites( 178 suite_job_ids) 179 for parent_id in suite_job_ids: 180 min_duts = suite_min_duts.get(parent_id, 0) 181 cur_duts = self.suite_host_num.get(parent_id, 0) 182 suite_min_duts[parent_id] = max(0, min_duts - cur_duts) 183 logging.debug('Minimum duts to get for suites (suite_id: min_duts): %s', 184 suite_min_duts) 185 return suite_min_duts 186 187 188class BaseHostScheduler(object): 189 """Base class containing host acquisition logic. 190 191 This class contains all the core host acquisition logic needed by the 192 scheduler to run jobs on hosts. It is only capable of releasing hosts 193 back to the rdb through its tick, any other action must be instigated by 194 the job scheduler. 195 """ 196 197 198 host_assignment = collections.namedtuple('host_assignment', ['host', 'job']) 199 200 201 def __init__(self): 202 self.host_query_manager = query_managers.AFEHostQueryManager() 203 204 205 def _release_hosts(self): 206 """Release hosts to the RDB. 207 208 Release all hosts that are ready and are currently not being used by an 209 active hqe, and don't have a new special task scheduled against them. 210 211 @return a list of hosts that are released. 212 """ 213 release_hosts = self.host_query_manager.find_unused_healty_hosts() 214 release_hostnames = [host.hostname for host in release_hosts] 215 if release_hostnames: 216 self.host_query_manager.set_leased( 217 False, hostname__in=release_hostnames) 218 return release_hosts 219 220 221 @classmethod 222 def schedule_host_job(cls, host, queue_entry): 223 """Schedule a job on a host. 224 225 Scheduling a job involves: 226 1. Setting the active bit on the queue_entry. 227 2. Scheduling a special task on behalf of the queue_entry. 228 Performing these actions will lead the job scheduler through a chain of 229 events, culminating in running the test and collecting results from 230 the host. 231 232 @param host: The host against which to schedule the job. 233 @param queue_entry: The queue_entry to schedule. 234 """ 235 if queue_entry.host_id is None: 236 queue_entry.set_host(host) 237 elif host.id != queue_entry.host_id: 238 raise rdb_utils.RDBException('The rdb returned host: %s ' 239 'but the job:%s was already assigned a host: %s. ' % 240 (host.hostname, queue_entry.job_id, 241 queue_entry.host.hostname)) 242 queue_entry.update_field('active', True) 243 244 # TODO: crbug.com/373936. The host scheduler should only be assigning 245 # jobs to hosts, but the criterion we use to release hosts depends 246 # on it not being used by an active hqe. Since we're activating the 247 # hqe here, we also need to schedule its first prejob task. OTOH, 248 # we could converge to having the host scheduler manager all special 249 # tasks, since their only use today is to verify/cleanup/reset a host. 250 logging.info('Scheduling pre job tasks for entry: %s', queue_entry) 251 queue_entry.schedule_pre_job_tasks() 252 253 254 def acquire_hosts(self, host_jobs): 255 """Accquire hosts for given jobs. 256 257 This method sends jobs that need hosts to rdb. 258 Child class can override this method to pipe more args 259 to rdb. 260 261 @param host_jobs: A list of queue entries that either require hosts, 262 or require host assignment validation through the rdb. 263 264 @param return: A generator that yields an rdb_hosts.RDBClientHostWrapper 265 for each host acquired on behalf of a queue_entry, 266 or None if a host wasn't found. 267 """ 268 return rdb_lib.acquire_hosts(host_jobs) 269 270 271 def find_hosts_for_jobs(self, host_jobs): 272 """Find and verify hosts for a list of jobs. 273 274 @param host_jobs: A list of queue entries that either require hosts, 275 or require host assignment validation through the rdb. 276 @return: A generator of tuples of the form (host, queue_entry) for each 277 valid host-queue_entry assignment. 278 """ 279 hosts = self.acquire_hosts(host_jobs) 280 for host, job in zip(hosts, host_jobs): 281 if host: 282 yield self.host_assignment(host, job) 283 284 285 def tick(self): 286 """Schedule core host management activities.""" 287 self._release_hosts() 288 289 290class HostScheduler(BaseHostScheduler): 291 """A scheduler capable managing host acquisition for new jobs.""" 292 293 294 def __init__(self): 295 super(HostScheduler, self).__init__() 296 self.job_query_manager = query_managers.AFEJobQueryManager() 297 # Keeping track on how many hosts each suite is holding 298 # {suite_job_id: num_hosts} 299 self._suite_recorder = SuiteRecorder(self.job_query_manager) 300 301 302 def _record_host_assignment(self, host, queue_entry): 303 """Record that |host| is assigned to |queue_entry|. 304 305 Record: 306 1. How long it takes to assign a host to a job in metadata db. 307 2. Record host assignment of a suite. 308 309 @param host: A Host object. 310 @param queue_entry: A HostQueueEntry object. 311 """ 312 secs_in_queued = (datetime.datetime.now() - 313 queue_entry.job.created_on).total_seconds() 314 self._suite_recorder.record_assignment(queue_entry) 315 316 317 @metrics.SecondsTimerDecorator( 318 '%s/schedule_jobs_duration' % _METRICS_PREFIX) 319 def _schedule_jobs(self): 320 """Schedule new jobs against hosts.""" 321 322 new_jobs_with_hosts = 0 323 queue_entries = self.job_query_manager.get_pending_queue_entries( 324 only_hostless=False) 325 unverified_host_jobs = [job for job in queue_entries 326 if not job.is_hostless()] 327 if unverified_host_jobs: 328 for acquisition in self.find_hosts_for_jobs(unverified_host_jobs): 329 self.schedule_host_job(acquisition.host, acquisition.job) 330 self._record_host_assignment(acquisition.host, acquisition.job) 331 new_jobs_with_hosts += 1 332 metrics.Counter('%s/new_jobs_with_hosts' % _METRICS_PREFIX 333 ).increment_by(new_jobs_with_hosts) 334 335 num_jobs_without_hosts = (len(unverified_host_jobs) - 336 new_jobs_with_hosts) 337 metrics.Gauge('%s/current_jobs_without_hosts' % _METRICS_PREFIX 338 ).set(num_jobs_without_hosts) 339 340 metrics.Counter('%s/tick' % _METRICS_PREFIX).increment() 341 342 @metrics.SecondsTimerDecorator('%s/lease_hosts_duration' % _METRICS_PREFIX) 343 def _lease_hosts_of_frontend_tasks(self): 344 """Lease hosts of tasks scheduled through the frontend.""" 345 # We really don't need to get all the special tasks here, just the ones 346 # without hqes, but reusing the method used by the scheduler ensures 347 # we prioritize the same way. 348 lease_hostnames = [ 349 task.host.hostname for task in 350 self.job_query_manager.get_prioritized_special_tasks( 351 only_tasks_with_leased_hosts=False) 352 if task.queue_entry_id is None and not task.host.leased] 353 # Leasing a leased hosts here shouldn't be a problem: 354 # 1. The only way a host can be leased is if it's been assigned to 355 # an active hqe or another similar frontend task, but doing so will 356 # have already precluded it from the list of tasks returned by the 357 # job_query_manager. 358 # 2. The unleasing is done based on global conditions. Eg: Even if a 359 # task has already leased a host and we lease it again, the 360 # host scheduler won't release the host till both tasks are complete. 361 if lease_hostnames: 362 self.host_query_manager.set_leased( 363 True, hostname__in=lease_hostnames) 364 365 366 def acquire_hosts(self, host_jobs): 367 """Override acquire_hosts. 368 369 This method overrides the method in parent class. 370 It figures out a set of suites that |host_jobs| belong to; 371 and get min_duts requirement for each suite. 372 It pipes min_duts for each suite to rdb. 373 374 """ 375 parent_job_ids = set([q.job.parent_job_id 376 for q in host_jobs if q.job.parent_job_id]) 377 suite_min_duts = self._suite_recorder.get_min_duts(parent_job_ids) 378 return rdb_lib.acquire_hosts(host_jobs, suite_min_duts) 379 380 381 @metrics.SecondsTimerDecorator('%s/tick_time' % _METRICS_PREFIX) 382 def tick(self): 383 logging.info('Calling new tick.') 384 logging.info('Leasing hosts for frontend tasks.') 385 self._lease_hosts_of_frontend_tasks() 386 logging.info('Finding hosts for new jobs.') 387 self._schedule_jobs() 388 logging.info('Releasing unused hosts.') 389 released_hosts = self._release_hosts() 390 logging.info('Updating suite assignment with released hosts') 391 self._suite_recorder.record_release(released_hosts) 392 logging.info('Calling email_manager.') 393 email_manager.manager.send_queued_emails() 394 395 396class DummyHostScheduler(BaseHostScheduler): 397 """A dummy host scheduler that doesn't acquire or release hosts.""" 398 399 def __init__(self): 400 pass 401 402 403 def tick(self): 404 pass 405 406 407def handle_signal(signum, frame): 408 """Sigint handler so we don't crash mid-tick.""" 409 global _shutdown 410 _shutdown = True 411 logging.info("Shutdown request received.") 412 413 414def initialize(testing=False): 415 """Initialize the host scheduler.""" 416 if testing: 417 # Don't import testing utilities unless we're in testing mode, 418 # as the database imports have side effects. 419 from autotest_lib.scheduler import rdb_testing_utils 420 rdb_testing_utils.FileDatabaseHelper().initialize_database_for_testing( 421 db_file_path=rdb_testing_utils.FileDatabaseHelper.DB_FILE) 422 global _db_manager 423 _db_manager = scheduler_lib.ConnectionManager() 424 scheduler_lib.setup_logging( 425 os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None), 426 None, timestamped_logfile_prefix='host_scheduler') 427 logging.info("Setting signal handler") 428 signal.signal(signal.SIGINT, handle_signal) 429 signal.signal(signal.SIGTERM, handle_signal) 430 scheduler_models.initialize() 431 432 433def parse_arguments(argv): 434 """ 435 Parse command line arguments 436 437 @param argv: argument list to parse 438 @returns: parsed arguments. 439 """ 440 parser = argparse.ArgumentParser(description='Host scheduler.') 441 parser.add_argument('--testing', action='store_true', default=False, 442 help='Start the host scheduler in testing mode.') 443 parser.add_argument('--production', 444 help=('Indicate that scheduler is running in production' 445 ' environment and it can use database that is not' 446 ' hosted in localhost. If it is set to False, ' 447 'scheduler will fail if database is not in ' 448 'localhost.'), 449 action='store_true', default=False) 450 parser.add_argument( 451 '--lifetime-hours', 452 type=float, 453 default=None, 454 help='If provided, number of hours the scheduler should run for. ' 455 'At the expiry of this time, the process will exit ' 456 'gracefully.', 457 ) 458 parser.add_argument( 459 '--metrics-file', 460 help='If provided, drop metrics to this local file instead of ' 461 'reporting to ts_mon', 462 type=str, 463 default=None, 464 ) 465 options = parser.parse_args(argv) 466 467 return options 468 469 470def main(): 471 if _monitor_db_host_acquisition: 472 logging.info('Please set inline_host_acquisition=False in the shadow ' 473 'config before starting the host scheduler.') 474 sys.exit(0) 475 try: 476 options = parse_arguments(sys.argv[1:]) 477 scheduler_lib.check_production_settings(options) 478 479 # If server database is enabled, check if the server has role 480 # `host_scheduler`. If the server does not have host_scheduler role, 481 # exception will be raised and host scheduler will not continue to run. 482 if server_manager_utils.use_server_db(): 483 server_manager_utils.confirm_server_has_role(hostname='localhost', 484 role='host_scheduler') 485 486 initialize(options.testing) 487 488 with ts_mon_config.SetupTsMonGlobalState( 489 'autotest_host_scheduler', 490 indirect=True, 491 debug_file=options.metrics_file, 492 ): 493 metrics.Counter('%s/start' % _METRICS_PREFIX).increment() 494 process_start_time = time.time() 495 host_scheduler = HostScheduler() 496 minimum_tick_sec = global_config.global_config.get_config_value( 497 'SCHEDULER', 'host_scheduler_minimum_tick_sec', type=float) 498 while not _shutdown: 499 if _lifetime_expired(options.lifetime_hours, 500 process_start_time): 501 break 502 start = time.time() 503 host_scheduler.tick() 504 curr_tick_sec = time.time() - start 505 if (minimum_tick_sec > curr_tick_sec): 506 time.sleep(minimum_tick_sec - curr_tick_sec) 507 else: 508 time.sleep(0.0001) 509 logging.info('Shutdown request recieved. Bye! Bye!') 510 except server_manager_utils.ServerActionError: 511 # This error is expected when the server is not in primary status 512 # for host-scheduler role. Thus do not send email for it. 513 raise 514 except Exception: 515 metrics.Counter('%s/uncaught_exception' % _METRICS_PREFIX).increment() 516 raise 517 finally: 518 email_manager.manager.send_queued_emails() 519 if _db_manager: 520 _db_manager.disconnect() 521 522 523def _lifetime_expired(lifetime_hours, process_start_time): 524 """Returns True if we've expired the process lifetime, False otherwise. 525 526 Also sets the global _shutdown so that any background processes also take 527 the cue to exit. 528 """ 529 if lifetime_hours is None: 530 return False 531 if time.time() - process_start_time > lifetime_hours * 3600: 532 logging.info('Process lifetime %0.3f hours exceeded. Shutting down.', 533 lifetime_hours) 534 global _shutdown 535 _shutdown = True 536 return True 537 return False 538 539 540if __name__ == '__main__': 541 main() 542