#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import argparse
import datetime
import httplib
import logging
import os
import random
import signal
import time
import urllib2

import common

from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe.json_rpc import proxy
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server import utils as server_utils
from chromite.lib import timeout_util
from django.core.exceptions import MultipleObjectsReturned
from django.db import transaction

try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
    from infra_libs import ts_mon
except ImportError:
    # Fall back to the mock implementations when the monitoring libraries are
    # not installed, so the shard client still runs (metrics become no-ops).
    metrics = server_utils.metrics_mock
    ts_mon_config = server_utils.metrics_mock


"""
Autotest shard client

The shard client can be run as standalone service. It periodically polls the
master in a heartbeat, retrieves new jobs and hosts and inserts them into the
local database.

A shard is set up (by a human) and pointed to the global AFE (cautotest).
On the shard, this script periodically makes so called heartbeat requests to the
global AFE, which will then complete the following actions:

1. Find the previously created (with atest) record for the shard. Shards are
   identified by their hostnames, specified in the shadow_config.
2. Take the records that were sent in the heartbeat and insert them into the
   global database.
   - This is to set the status of jobs to completed in the master database after
     they were run by a slave. This is necessary so one can just look at the
     master's afe to see the statuses of all jobs. Otherwise one would have to
     check the tko tables or the individual slave AFEs.
3. Find labels that have been assigned to this shard.
4. Assign hosts that:
   - have the specified label
   - aren't leased
   - have an id which is not in the known_host_ids which were sent in the
     heartbeat request.
5. Assign jobs that:
   - depend on the specified label
   - haven't been assigned before
   - aren't started yet
   - aren't completed yet
   - have an id which is not in the jobs_known_ids which were sent in the
     heartbeat request.
6. Serialize the chosen jobs and hosts.
   - Find objects that the Host/Job objects depend on: Labels, AclGroups, Users,
     and many more. Details about this can be found around
     model_logic.serialize()
7. Send these objects to the slave.


On the client side, this will happen:
1. Deserialize the objects sent from the master and persist them to the local
   database.
2. monitor_db on the shard will pick up these jobs and schedule them on the
   available hosts (which were retrieved from a heartbeat).
3. Once a job is finished, its shard_id is set to NULL
4. The shard_client will pick up all jobs where shard_id=NULL and will
   send them to the master in the request of the next heartbeat.
   - The master will persist them as described earlier.
   - the shard_id will be set back to the shard's id, so the record won't be
     uploaded again.
   The heartbeat request will also contain the ids of incomplete jobs and the
   ids of all hosts. This is used to not send objects repeatedly. For more
   information on this and alternatives considered
   see rpc_interface.shard_heartbeat.
"""


# Name of the RPC called on the global AFE for each heartbeat.
HEARTBEAT_AFE_ENDPOINT = 'shard_heartbeat'
_METRICS_PREFIX = 'chromeos/autotest/shard_client/heartbeat/'

RPC_TIMEOUT_MIN = 30
RPC_DELAY_SEC = 5

# The maximum number of jobs to attempt to upload in a single heartbeat.
MAX_UPLOAD_JOBS = 1000

# Module-level handle used by the signal handler to request a shutdown of the
# running client; set in main_without_exception_handling().
_heartbeat_client = None


class ShardClient(object):
    """Performs client side tasks of sharding, i.e. the heartbeat.

    This class contains the logic to do periodic heartbeats to a global AFE,
    to retrieve new jobs from it and to report completed jobs back.
    """

    def __init__(self, global_afe_hostname, shard_hostname, tick_pause_sec):
        self.afe = frontend_wrappers.RetryingAFE(server=global_afe_hostname,
                                                 timeout_min=RPC_TIMEOUT_MIN,
                                                 delay_sec=RPC_DELAY_SEC)
        self.hostname = shard_hostname
        self.tick_pause_sec = tick_pause_sec
        self._shutdown_requested = False
        # Lazily-fetched cache for the `shard` property; stays None until the
        # master has assigned work to this shard at least once.
        self._shard = None


    def _deserialize_many(self, serialized_list, djmodel, message):
        """Deserialize data in JSON format to database.

        Deserialize a list of JSON-formatted data to database using Django.

        Failures on individual entries are logged and counted but do not abort
        the rest of the list; each entry is committed in its own transaction.

        @param serialized_list: A list of JSON-formatted data or python dict
                                literals.
        @param djmodel: Django model type.
        @param message: A string to be used in a logging message.
        """
        logging.info('Deserializing %s %ss', len(serialized_list), message)
        i = 0
        for serialized in serialized_list:
            i += 1
            if i % 100 == 1:
                logging.info('Progress: at entry %s', i)
            with transaction.commit_on_success():
                try:
                    djmodel.deserialize(serialized)
                except Exception as e:
                    logging.error('Deserializing a %s fails: %s, Error: %s',
                                  message, serialized, e)
                    metrics.Counter(
                        'chromeos/autotest/shard_client/deserialization_failed'
                        ).increment()
        logging.info('Done deserializing %ss', message)


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/heartbeat_response_duration')
    def process_heartbeat_response(self, heartbeat_response):
        """Save objects returned by a heartbeat to the local database.

        This deseralizes hosts and jobs including their dependencies and saves
        them to the local database.

        @param heartbeat_response: A dictionary with keys 'hosts' and 'jobs',
                                   as returned by the `shard_heartbeat` rpc
                                   call.
        """
        hosts_serialized = heartbeat_response['hosts']
        jobs_serialized = heartbeat_response['jobs']
        suite_keyvals_serialized = heartbeat_response['suite_keyvals']
        incorrect_host_ids = heartbeat_response.get('incorrect_host_ids', [])

        metrics.Gauge('chromeos/autotest/shard_client/hosts_received'
                      ).set(len(hosts_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/jobs_received'
                      ).set(len(jobs_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/suite_keyvals_received'
                      ).set(len(suite_keyvals_serialized))

        self._deserialize_many(hosts_serialized, models.Host, 'host')
        self._deserialize_many(jobs_serialized, models.Job, 'job')
        self._deserialize_many(suite_keyvals_serialized, models.JobKeyval,
                               'jobkeyval')

        host_ids = [h['id'] for h in hosts_serialized]
        logging.info('Heartbeat response contains hosts %s', host_ids)
        job_ids = [j['id'] for j in jobs_serialized]
        logging.info('Heartbeat response contains jobs %s', job_ids)
        parent_jobs_with_keyval = set([kv['job_id']
                                       for kv in suite_keyvals_serialized])
        logging.info('Heartbeat response contains suite_keyvals_for jobs %s',
                     list(parent_jobs_with_keyval))
        if incorrect_host_ids:
            logging.info('Heartbeat response contains incorrect_host_ids %s '
                         'which will be deleted.', incorrect_host_ids)
            self._remove_incorrect_hosts(incorrect_host_ids)

        # If the master has just sent any jobs that we think have completed,
        # re-sync them with the master. This is especially useful when a
        # heartbeat or job is silently dropped, as the next heartbeat will
        # have a disagreement. Updating the shard_id to NULL will mark these
        # jobs for upload on the next heartbeat.
        job_models = models.Job.objects.filter(
                id__in=job_ids, hostqueueentry__complete=True)
        if job_models:
            job_models.update(shard=None)
            job_ids_repr = ', '.join([str(job.id) for job in job_models])
            logging.warn('Following completed jobs are reset shard_id to NULL '
                         'to be uploaded to master again: %s', job_ids_repr)


    def _remove_incorrect_hosts(self, incorrect_host_ids=None):
        """Remove from local database any hosts that should not exist.

        Entries of |incorrect_host_ids| that are absent from database will be
        silently ignored.

        @param incorrect_host_ids: a list of ids (as integers) to remove.
        """
        if not incorrect_host_ids:
            return

        try:
            models.Host.objects.filter(id__in=incorrect_host_ids).delete()
        except MultipleObjectsReturned as e:
            # NOTE(review): bulk QuerySet.delete() is not expected to raise
            # MultipleObjectsReturned; this guard looks defensive — confirm
            # whether it can actually trigger here.
            logging.exception('Failed to remove incorrect hosts %s',
                              incorrect_host_ids)


    @property
    def shard(self):
        """Return this shard's own shard object, fetched from the database.

        A shard's object is fetched from the master with the first jobs. It will
        not exist before that time.

        @returns: The shard object if it already exists, otherwise None
        """
        if self._shard is None:
            try:
                self._shard = models.Shard.smart_get(self.hostname)
            except models.Shard.DoesNotExist:
                # This might happen before any jobs are assigned to this shard.
                # This is okay because then there is nothing to offload anyway.
                pass
        return self._shard


    def _get_jobs_to_upload(self):
        """Return the list of Job objects that are ready to be uploaded.

        @returns: A list of models.Job instances whose shard field is NULL and
                  whose host queue entries are all complete.
        """
        jobs = []
        # The scheduler sets shard to None upon completion of the job.
        # For more information on the shard field's semantic see
        # models.Job.shard. We need to be careful to wait for both the
        # shard_id and the complete bit here, or we will end up syncing
        # the job without ever setting the complete bit.
        job_ids = list(models.Job.objects.filter(
            shard=None,
            hostqueueentry__complete=True).values_list('pk', flat=True))

        for job_to_upload in models.Job.objects.filter(pk__in=job_ids).all():
            jobs.append(job_to_upload)
        return jobs


    def _mark_jobs_as_uploaded(self, job_ids):
        """Set the shard field of the given jobs back to this shard.

        This prevents the jobs from being picked up again by
        _get_jobs_to_upload on the next heartbeat.

        @param job_ids: Iterable of primary keys of jobs that were uploaded.
        """
        # self.shard might be None if no jobs were downloaded yet.
        # But then job_ids is empty, so this is harmless.
        # Even if there were jobs we'd in the worst case upload them twice.
        models.Job.objects.filter(pk__in=job_ids).update(shard=self.shard)


    def _get_hqes_for_jobs(self, jobs):
        """Collect all host queue entries belonging to the given jobs.

        @param jobs: Iterable of models.Job instances.
        @returns: A flat list of the jobs' HostQueueEntry objects.
        """
        hqes = []
        for job in jobs:
            hqes.extend(job.hostqueueentry_set.all())
        return hqes


    def _get_known_jobs_and_hosts(self):
        """Returns lists of host and job info to send in a heartbeat.

        The host and job ids are ids of objects that are already present on the
        shard and therefore don't need to be sent again.

        For jobs, only incomplete jobs are sent, as the master won't send
        already completed jobs anyway. This helps keeping the list of id's
        considerably small.

        For hosts, host status in addition to host id are sent to master
        to sync the host status.

        @returns: Tuple of three lists. The first one contains job ids, the
                  second one host ids, and the third one host statuses.
        """
        jobs = models.Job.objects.filter(hostqueueentry__complete=False)
        job_ids = list(jobs.values_list('id', flat=True))
        self._report_job_time_distribution(jobs)

        host_models = models.Host.objects.filter(invalid=0)
        host_ids = []
        host_statuses = []
        for h in host_models:
            host_ids.append(h.id)
            host_statuses.append(h.status)
        return job_ids, host_ids, host_statuses


    def _heartbeat_packet(self):
        """Construct the heartbeat packet.

        See rpc_interface for a more detailed description of the heartbeat.

        @return: A heartbeat packet.
        """
        known_job_ids, known_host_ids, known_host_statuses = (
                self._get_known_jobs_and_hosts())
        max_print = 100
        logging.info('Known jobs (first %s): %s', max_print,
                     known_job_ids[:max_print])
        logging.info('Total known jobs: %s', len(known_job_ids))

        job_objs = self._get_jobs_to_upload()
        hqes = [hqe.serialize(include_dependencies=False)
                for hqe in self._get_hqes_for_jobs(job_objs)]

        jobs = [job.serialize(include_dependencies=False) for job in job_objs]
        # Cap the upload size so a backlog of completed jobs cannot produce an
        # oversized request; the remainder is uploaded on later heartbeats.
        if len(jobs) > MAX_UPLOAD_JOBS:
            logging.info('Throttling number of jobs to upload from %s to %s.',
                         len(jobs), MAX_UPLOAD_JOBS)
            jobs = jobs[:MAX_UPLOAD_JOBS]
        logging.info('Uploading jobs %s', [j['id'] for j in jobs])

        return {'shard_hostname': self.hostname,
                'known_job_ids': known_job_ids,
                'known_host_ids': known_host_ids,
                'known_host_statuses': known_host_statuses,
                'jobs': jobs,
                'hqes': hqes}


    def _report_job_time_distribution(self, jobs):
        """Report distribution of job durations to monarch."""
        jobs_time_distribution = metrics.Distribution(
                _METRICS_PREFIX + 'known_jobs_durations')
        now = datetime.datetime.now()

        # The type expected by the .set(...) of a distribution is a
        # distribution.
        dist = ts_mon.Distribution(ts_mon.GeometricBucketer())
        for job in jobs:
            duration = int(
                    max(0, (now - job.created_on).total_seconds()))
            dist.add(duration)
        jobs_time_distribution.set(dist)

    def _report_packet_metrics(self, packet):
        """Report stats about outgoing packet to monarch."""
        metrics.Gauge(_METRICS_PREFIX + 'known_job_ids_count').set(
                len(packet['known_job_ids']))
        metrics.Gauge(_METRICS_PREFIX + 'jobs_upload_count').set(
                len(packet['jobs']))
        metrics.Gauge(_METRICS_PREFIX + 'known_host_ids_count').set(
                len(packet['known_host_ids']))


    def _heartbeat_failure(self, log_message, failure_type_str=''):
        """Log a failed heartbeat and bump the failure counter metric.

        @param log_message: Human-readable description of the failure.
        @param failure_type_str: Short label recorded as the metric's
                                 'failure_type' field (e.g. 'HTTPError').
        """
        logging.error("Heartbeat failed. %s", log_message)
        metrics.Counter('chromeos/autotest/shard_client/heartbeat_failure'
                        ).increment(fields={'failure_type': failure_type_str})


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/do_heatbeat_duration')
    def do_heartbeat(self):
        """Perform a heartbeat: Retrieve new jobs.

        This function executes a `shard_heartbeat` RPC. It retrieves the
        response of this call and processes the response by storing the returned
        objects in the local database.

        Returns: True if the heartbeat ran successfully, False otherwise.
        """

        logging.info("Performing heartbeat.")
        packet = self._heartbeat_packet()
        self._report_packet_metrics(packet)
        metrics.Gauge(_METRICS_PREFIX + 'request_size').set(
            len(str(packet)))

        try:
            response = self.afe.run(HEARTBEAT_AFE_ENDPOINT, **packet)
            logging.info('Finished heartbeat upload.')
        except urllib2.HTTPError as e:
            self._heartbeat_failure('HTTPError %d: %s' % (e.code, e.reason),
                                    'HTTPError')
            return False
        except urllib2.URLError as e:
            self._heartbeat_failure('URLError: %s' % e.reason,
                                    'URLError')
            return False
        except httplib.HTTPException as e:
            self._heartbeat_failure('HTTPException: %s' % e,
                                    'HTTPException')
            return False
        except timeout_util.TimeoutError as e:
            self._heartbeat_failure('TimeoutError: %s' % e,
                                    'TimeoutError')
            return False
        except proxy.JSONRPCException as e:
            self._heartbeat_failure('JSONRPCException: %s' % e,
                                    'JSONRPCException')
            return False

        metrics.Gauge(_METRICS_PREFIX + 'response_size').set(
            len(str(response)))
        logging.info('Marking jobs as uploaded.')
        self._mark_jobs_as_uploaded([job['id'] for job in packet['jobs']])
        logging.info('Processing heartbeat response.')
        self.process_heartbeat_response(response)
        logging.info("Heartbeat completed.")
        return True


    def tick(self):
        """Performs all tasks the shard client needs to do periodically."""
        success = self.do_heartbeat()
        if success:
            metrics.Counter('chromeos/autotest/shard_client/tick').increment()


    def loop(self, lifetime_hours):
        """Calls tick() until shutdown() is called or lifetime expires.

        @param lifetime_hours: (int) hours to loop for.
        """
        loop_start_time = time.time()
        while self._continue_looping(lifetime_hours, loop_start_time):
            self.tick()
            # Sleep with +/- 10% fuzzing to avoid phaselock of shards.
            tick_fuzz = self.tick_pause_sec * 0.2 * (random.random() - 0.5)
            time.sleep(self.tick_pause_sec + tick_fuzz)


    def shutdown(self):
        """Stops the shard client after the current tick."""
        logging.info("Shutdown request received.")
        self._shutdown_requested = True


    def _continue_looping(self, lifetime_hours, loop_start_time):
        """Determines if we should continue with the next mainloop iteration.

        @param lifetime_hours: (float) number of hours to loop for. None
                implies no deadline.
        @param loop_start_time: Time when we started looping.
        @returns True if we should continue looping, False otherwise.
        """
        if self._shutdown_requested:
            return False

        if (lifetime_hours is None
                or time.time() - loop_start_time < lifetime_hours * 3600):
            return True
        logging.info('Process lifetime %0.3f hours exceeded. Shutting down.',
                     lifetime_hours)
        return False


def handle_signal(signum, frame):
    """Sigint handler so we don't crash mid-tick."""
    _heartbeat_client.shutdown()


def _get_shard_hostname_and_ensure_running_on_shard():
    """Read the hostname of the local shard from the global configuration.

    Raise an exception if run from elsewhere than a shard.

    @raises error.HeartbeatOnlyAllowedInShardModeException if run from
            elsewhere than from a shard.
    """
    hostname = global_config.global_config.get_config_value(
        'SHARD', 'shard_hostname', default=None)
    if not hostname:
        raise error.HeartbeatOnlyAllowedInShardModeException(
            'To run the shard client, shard_hostname must neither be None nor '
            'empty.')
    return hostname


def _get_tick_pause_sec():
    """Read pause to make between two ticks from the global configuration."""
    return global_config.global_config.get_config_value(
        'SHARD', 'heartbeat_pause_sec', type=float)


def get_shard_client():
    """Instantiate a shard client instance.

    Configuration values will be read from the global configuration.

    @returns A shard client instance.
    """
    global_afe_hostname = server_utils.get_global_afe_hostname()
    shard_hostname = _get_shard_hostname_and_ensure_running_on_shard()
    tick_pause_sec = _get_tick_pause_sec()
    return ShardClient(global_afe_hostname, shard_hostname, tick_pause_sec)


def main():
    """Parse arguments, set up metrics reporting, and run the shard client.

    Increments start/uncaught-exception counters and makes sure queued
    notification emails are flushed even when the client dies.
    """
    parser = argparse.ArgumentParser(description='Shard client.')
    parser.add_argument(
            '--lifetime-hours',
            type=float,
            default=None,
            help='If provided, number of hours we should run for. '
                 'At the expiry of this time, the process will exit '
                 'gracefully.',
    )
    parser.add_argument(
            '--metrics-file',
            help='If provided, drop metrics to this local file instead of '
                 'reporting to ts_mon',
            type=str,
            default=None,
    )
    options = parser.parse_args()

    with ts_mon_config.SetupTsMonGlobalState(
            'shard_client',
            indirect=True,
            debug_file=options.metrics_file,
    ):
        try:
            metrics.Counter('chromeos/autotest/shard_client/start').increment()
            main_without_exception_handling(options)
        except Exception as e:
            metrics.Counter('chromeos/autotest/shard_client/uncaught_exception'
                            ).increment()
            message = 'Uncaught exception. Terminating shard_client.'
            email_manager.manager.log_stacktrace(message)
            logging.exception(message)
            raise
        finally:
            email_manager.manager.send_queued_emails()


def main_without_exception_handling(options):
    """Set up logging and signal handlers, then run the client loop.

    @param options: Parsed argparse namespace with lifetime_hours and
                    metrics_file attributes.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            None, timestamped_logfile_prefix='shard_client')

    logging.info("Setting signal handler.")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    logging.info("Starting shard client.")
    global _heartbeat_client
    _heartbeat_client = get_shard_client()
    _heartbeat_client.loop(options.lifetime_hours)


if __name__ == '__main__':
    main()