#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import argparse
import httplib
import logging
import os
import random
import signal
import time
import urllib2

import common

from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe.json_rpc import proxy
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server import utils as server_utils
from chromite.lib import timeout_util
from django.db import transaction

try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    metrics = server_utils.metrics_mock
    ts_mon_config = server_utils.metrics_mock


"""
Autotest shard client

The shard client can be run as a standalone service. It periodically polls
the master in a heartbeat, retrieves new jobs and hosts, and inserts them
into the local database.

A shard is set up (by a human) and pointed to the global AFE (cautotest).
On the shard, this script periodically makes so-called heartbeat requests to
the global AFE, which will then complete the following actions:

1. Find the previously created (with atest) record for the shard. Shards are
   identified by their hostnames, specified in the shadow_config.
2. Take the records that were sent in the heartbeat and insert them into the
   global database.
   - This is to set the status of jobs to completed in the master database
     after they were run by a slave. This is necessary so one can just look
     at the master's AFE to see the statuses of all jobs. Otherwise one would
     have to check the tko tables or the individual slave AFEs.
3. Find labels that have been assigned to this shard.
4. Assign hosts that:
   - have the specified label
   - aren't leased
   - have an id which is not in the known_host_ids which were sent in the
     heartbeat request.
5. Assign jobs that:
   - depend on the specified label
   - haven't been assigned before
   - aren't started yet
   - aren't completed yet
   - have an id which is not in the known_job_ids which were sent in the
     heartbeat request.
6. Serialize the chosen jobs and hosts.
   - Find objects that the Host/Job objects depend on: Labels, AclGroups,
     Users, and many more. Details about this can be found around
     model_logic.serialize().
7. Send these objects to the slave.


On the client side, this will happen:
1. Deserialize the objects sent from the master and persist them to the local
   database.
2. monitor_db on the shard will pick up these jobs and schedule them on the
   available hosts (which were retrieved from a heartbeat).
3. Once a job is finished, its shard_id is set to NULL.
4. The shard_client will pick up all jobs where shard_id=NULL and will
   send them to the master in the request of the next heartbeat.
   - The master will persist them as described earlier.
   - The shard_id will be set back to the shard's id, so the record won't be
     uploaded again.

The heartbeat request will also contain the ids of incomplete jobs and the
ids of all hosts.
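
An illustrative request payload (shapes only; 'shard1' is a placeholder
hostname, and the exact fields are the ones built by _heartbeat_packet()
below):

    {'shard_hostname': 'shard1',
     'known_job_ids': [...],
     'known_host_ids': [...],
     'known_host_statuses': [...],
     'jobs': [<serialized jobs to upload>],
     'hqes': [<serialized host queue entries>]}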

Sending these ids keeps the master from sending objects the shard already
has. For more information on this and the alternatives that were considered,
see rpc_interface.shard_heartbeat.
"""


HEARTBEAT_AFE_ENDPOINT = 'shard_heartbeat'
_METRICS_PREFIX = 'chromeos/autotest/shard_client/heartbeat/'

RPC_TIMEOUT_MIN = 5
RPC_DELAY_SEC = 5

_heartbeat_client = None


class ShardClient(object):
    """Performs client side tasks of sharding, i.e. the heartbeat.

    This class contains the logic to do periodic heartbeats to a global AFE,
    to retrieve new jobs from it and to report completed jobs back.
    """

    def __init__(self, global_afe_hostname, shard_hostname, tick_pause_sec):
        self.afe = frontend_wrappers.RetryingAFE(server=global_afe_hostname,
                                                 timeout_min=RPC_TIMEOUT_MIN,
                                                 delay_sec=RPC_DELAY_SEC)
        self.hostname = shard_hostname
        self.tick_pause_sec = tick_pause_sec
        self._shutdown_requested = False
        self._shard = None


    def _deserialize_many(self, serialized_list, djmodel, message):
        """Deserialize a list of JSON-formatted objects into the database.

        Each object is deserialized into the local database using Django,
        inside its own transaction.

        @param serialized_list: A list of JSON-formatted data.
        @param djmodel: Django model type.
        @param message: A string to be used in a logging message.
        """
        for serialized in serialized_list:
            with transaction.commit_on_success():
                try:
                    djmodel.deserialize(serialized)
                except Exception as e:
                    logging.error('Deserializing a %s failed: %s, Error: %s',
                                  message, serialized, e)
                    metrics.Counter(
                        'chromeos/autotest/shard_client/deserialization_failed'
                        ).increment()


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/heartbeat_response_duration')
    def process_heartbeat_response(self, heartbeat_response):
        """Save objects returned by a heartbeat to the local database.

        This deserializes hosts and jobs, including their dependencies, and
        saves them to the local database.

        @param heartbeat_response: A dictionary with keys 'hosts', 'jobs',
                                   'suite_keyvals' and (optionally)
                                   'incorrect_host_ids', as returned by the
                                   `shard_heartbeat` rpc call.
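
        An illustrative response (shapes only; the exact contents are
        produced by the master's rpc_interface.shard_heartbeat):

            {'hosts': [<serialized Host dicts>],
             'jobs': [<serialized Job dicts>],
             'suite_keyvals': [<serialized JobKeyval dicts>],
             'incorrect_host_ids': [<ids of local hosts to delete>]}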
        """
        hosts_serialized = heartbeat_response['hosts']
        jobs_serialized = heartbeat_response['jobs']
        suite_keyvals_serialized = heartbeat_response['suite_keyvals']
        incorrect_host_ids = heartbeat_response.get('incorrect_host_ids', [])

        metrics.Gauge('chromeos/autotest/shard_client/hosts_received'
                      ).set(len(hosts_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/jobs_received'
                      ).set(len(jobs_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/suite_keyvals_received'
                      ).set(len(suite_keyvals_serialized))

        self._deserialize_many(hosts_serialized, models.Host, 'host')
        self._deserialize_many(jobs_serialized, models.Job, 'job')
        self._deserialize_many(suite_keyvals_serialized, models.JobKeyval,
                               'jobkeyval')

        host_ids = [h['id'] for h in hosts_serialized]
        logging.info('Heartbeat response contains hosts %s', host_ids)
        job_ids = [j['id'] for j in jobs_serialized]
        logging.info('Heartbeat response contains jobs %s', job_ids)
        parent_jobs_with_keyval = set([kv['job_id']
                                       for kv in suite_keyvals_serialized])
        logging.info('Heartbeat response contains suite_keyvals for jobs %s',
                     list(parent_jobs_with_keyval))
        if incorrect_host_ids:
            logging.info('Heartbeat response contains incorrect_host_ids %s '
                         'which will be deleted.', incorrect_host_ids)
            self._remove_incorrect_hosts(incorrect_host_ids)

        # If the master has just sent any jobs that we think have completed,
        # re-sync them with the master. This is especially useful when a
        # heartbeat or job is silently dropped, as the next heartbeat will
        # have a disagreement. Updating the shard_id to NULL will mark these
        # jobs for upload on the next heartbeat.
        job_models = models.Job.objects.filter(
                id__in=job_ids, hostqueueentry__complete=True)
        if job_models:
            job_models.update(shard=None)
            job_ids_repr = ', '.join([str(job.id) for job in job_models])
            logging.warn('The following completed jobs have had shard_id '
                         'reset to NULL so they will be uploaded to the '
                         'master again: %s', job_ids_repr)


    def _remove_incorrect_hosts(self, incorrect_host_ids=None):
        """Remove from the local database any hosts that should not exist.

        Entries of |incorrect_host_ids| that are absent from the database
        will be silently ignored.

        @param incorrect_host_ids: a list of ids (as integers) to remove.
        """
        if not incorrect_host_ids:
            return

        models.Host.objects.filter(id__in=incorrect_host_ids).delete()


    @property
    def shard(self):
        """Return this shard's own shard object, fetched from the database.

        A shard's object is fetched from the master with the first jobs. It
        will not exist before that time.

        @returns: The shard object if it already exists, otherwise None.
        """
        if self._shard is None:
            try:
                self._shard = models.Shard.smart_get(self.hostname)
            except models.Shard.DoesNotExist:
                # This might happen before any jobs are assigned to this
                # shard. This is okay because then there is nothing to
                # offload anyway.
                pass
        return self._shard


    def _get_jobs_to_upload(self):
        jobs = []
        # The scheduler sets shard to None upon completion of the job.
        # For more information on the shard field's semantics see
        # models.Job.shard. We need to be careful to wait for both the
        # shard_id and the complete bit here, or we will end up syncing
        # the job without ever setting the complete bit.
        job_ids = list(models.Job.objects.filter(
                shard=None,
                hostqueueentry__complete=True).values_list('pk', flat=True))

        for job_to_upload in models.Job.objects.filter(pk__in=job_ids).all():
            jobs.append(job_to_upload)
        return jobs


    def _mark_jobs_as_uploaded(self, job_ids):
        # self.shard might be None if no jobs were downloaded yet.
        # But then job_ids is empty, so this is harmless.
        # Even if there were jobs, in the worst case we would upload them
        # twice.
        models.Job.objects.filter(pk__in=job_ids).update(shard=self.shard)


    def _get_hqes_for_jobs(self, jobs):
        hqes = []
        for job in jobs:
            hqes.extend(job.hostqueueentry_set.all())
        return hqes


    def _get_known_jobs_and_hosts(self):
        """Returns lists of host and job info to send in a heartbeat.

        The host and job ids are ids of objects that are already present on
        the shard and therefore don't need to be sent again.

        For jobs, only incomplete jobs are sent, as the master won't send
        already completed jobs anyway. This helps keep the list of ids
        reasonably small.

        For hosts, the host status is sent to the master in addition to the
        host id, so the master can sync the host status.

        @returns: Tuple of three lists. The first one contains job ids, the
                  second one host ids, and the third one host statuses.
        """
        job_ids = list(models.Job.objects.filter(
                hostqueueentry__complete=False).values_list('id', flat=True))
        host_models = models.Host.objects.filter(invalid=0)
        host_ids = []
        host_statuses = []
        for h in host_models:
            host_ids.append(h.id)
            host_statuses.append(h.status)
        return job_ids, host_ids, host_statuses


    def _heartbeat_packet(self):
        """Construct the heartbeat packet.

        See rpc_interface for a more detailed description of the heartbeat.

        @return: A heartbeat packet.
        """
        known_job_ids, known_host_ids, known_host_statuses = (
                self._get_known_jobs_and_hosts())
        logging.info('Known jobs: %s', known_job_ids)

        job_objs = self._get_jobs_to_upload()
        hqes = [hqe.serialize(include_dependencies=False)
                for hqe in self._get_hqes_for_jobs(job_objs)]
        jobs = [job.serialize(include_dependencies=False) for job in job_objs]
        logging.info('Uploading jobs %s', [j['id'] for j in jobs])

        return {'shard_hostname': self.hostname,
                'known_job_ids': known_job_ids,
                'known_host_ids': known_host_ids,
                'known_host_statuses': known_host_statuses,
                'jobs': jobs, 'hqes': hqes}


    def _report_packet_metrics(self, packet):
        """Report stats about the outgoing packet to monarch."""
        metrics.Gauge(_METRICS_PREFIX + 'known_job_ids_count').set(
                len(packet['known_job_ids']))
        metrics.Gauge(_METRICS_PREFIX + 'jobs_upload_count').set(
                len(packet['jobs']))
        metrics.Gauge(_METRICS_PREFIX + 'known_host_ids_count').set(
                len(packet['known_host_ids']))


    def _heartbeat_failure(self, log_message, failure_type_str=''):
        logging.error("Heartbeat failed. %s", log_message)
        metrics.Counter('chromeos/autotest/shard_client/heartbeat_failure'
                        ).increment(fields={'failure_type': failure_type_str})


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/do_heatbeat_duration')
    def do_heartbeat(self):
        """Perform a heartbeat: retrieve new jobs.

        This function executes a `shard_heartbeat` RPC.
        It retrieves the response of this call and processes it by storing
        the returned objects in the local database.

        @returns: True if the heartbeat ran successfully, False otherwise.
        """

        logging.info("Performing heartbeat.")
        packet = self._heartbeat_packet()
        self._report_packet_metrics(packet)
        metrics.Gauge(_METRICS_PREFIX + 'request_size').set(
                len(str(packet)))

        try:
            response = self.afe.run(HEARTBEAT_AFE_ENDPOINT, **packet)
        except urllib2.HTTPError as e:
            self._heartbeat_failure('HTTPError %d: %s' % (e.code, e.reason),
                                    'HTTPError')
            return False
        except urllib2.URLError as e:
            self._heartbeat_failure('URLError: %s' % e.reason,
                                    'URLError')
            return False
        except httplib.HTTPException as e:
            self._heartbeat_failure('HTTPException: %s' % e,
                                    'HTTPException')
            return False
        except timeout_util.TimeoutError as e:
            self._heartbeat_failure('TimeoutError: %s' % e,
                                    'TimeoutError')
            return False
        except proxy.JSONRPCException as e:
            self._heartbeat_failure('JSONRPCException: %s' % e,
                                    'JSONRPCException')
            return False

        metrics.Gauge(_METRICS_PREFIX + 'response_size').set(
                len(str(response)))
        self._mark_jobs_as_uploaded([job['id'] for job in packet['jobs']])
        self.process_heartbeat_response(response)
        logging.info("Heartbeat completed.")
        return True


    def tick(self):
        """Performs all tasks the shard client needs to do periodically."""
        success = self.do_heartbeat()
        if success:
            metrics.Counter('chromeos/autotest/shard_client/tick').increment()


    def loop(self, lifetime_hours):
        """Calls tick() until shutdown() is called or the lifetime expires.

        @param lifetime_hours: (float) hours to loop for, or None to loop
                               until shutdown() is called.
        """
        loop_start_time = time.time()
        while self._continue_looping(lifetime_hours, loop_start_time):
            self.tick()
            # Sleep with +/- 10% fuzzing to avoid phase lock of shards.
            tick_fuzz = self.tick_pause_sec * 0.2 * (random.random() - 0.5)
            time.sleep(self.tick_pause_sec + tick_fuzz)


    def shutdown(self):
        """Stops the shard client after the current tick."""
        logging.info("Shutdown request received.")
        self._shutdown_requested = True


    def _continue_looping(self, lifetime_hours, loop_start_time):
        """Determines if we should continue with the next mainloop iteration.

        @param lifetime_hours: (float) number of hours to loop for. None
                               implies no deadline.
        @param loop_start_time: Time when we started looping.
        @returns: True if we should continue looping, False otherwise.
        """
        if self._shutdown_requested:
            return False

        if (lifetime_hours is None
                or time.time() - loop_start_time < lifetime_hours * 3600):
            return True
        logging.info('Process lifetime %0.3f hours exceeded. Shutting down.',
                     lifetime_hours)
        return False


def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler so we don't crash mid-tick."""
    _heartbeat_client.shutdown()


def _get_shard_hostname_and_ensure_running_on_shard():
    """Read the hostname of the local shard from the global configuration.

    Raise an exception if run from anywhere other than a shard.

    @raises error.HeartbeatOnlyAllowedInShardModeException if run from
            anywhere other than a shard.
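
    An illustrative shadow_config entry (the hostname value is a
    placeholder):

        [SHARD]
        shard_hostname: shard1.example.com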
    """
    hostname = global_config.global_config.get_config_value(
            'SHARD', 'shard_hostname', default=None)
    if not hostname:
        raise error.HeartbeatOnlyAllowedInShardModeException(
                'To run the shard client, shard_hostname must neither be '
                'None nor empty.')
    return hostname


def _get_tick_pause_sec():
    """Read the pause between two ticks from the global configuration."""
    return global_config.global_config.get_config_value(
            'SHARD', 'heartbeat_pause_sec', type=float)


def get_shard_client():
    """Instantiate a shard client instance.

    Configuration values will be read from the global configuration.

    @returns: A shard client instance.
    """
    global_afe_hostname = server_utils.get_global_afe_hostname()
    shard_hostname = _get_shard_hostname_and_ensure_running_on_shard()
    tick_pause_sec = _get_tick_pause_sec()
    return ShardClient(global_afe_hostname, shard_hostname, tick_pause_sec)


def main():
    parser = argparse.ArgumentParser(description='Shard client.')
    parser.add_argument(
            '--lifetime-hours',
            type=float,
            default=None,
            help='If provided, number of hours we should run for. '
                 'At the expiry of this time, the process will exit '
                 'gracefully.',
    )
    parser.add_argument(
            '--metrics-file',
            help='If provided, drop metrics to this local file instead of '
                 'reporting them to ts_mon.',
            type=str,
            default=None,
    )
    options = parser.parse_args()

    with ts_mon_config.SetupTsMonGlobalState(
            'shard_client',
            indirect=True,
            debug_file=options.metrics_file,
    ):
        try:
            metrics.Counter('chromeos/autotest/shard_client/start').increment()
            main_without_exception_handling(options)
        except Exception as e:
            metrics.Counter('chromeos/autotest/shard_client/uncaught_exception'
                            ).increment()
            message = 'Uncaught exception. Terminating shard_client.'
            email_manager.manager.log_stacktrace(message)
            logging.exception(message)
            raise
        finally:
            email_manager.manager.send_queued_emails()


def main_without_exception_handling(options):
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            None, timestamped_logfile_prefix='shard_client')

    logging.info("Setting signal handler.")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    logging.info("Starting shard client.")
    global _heartbeat_client
    _heartbeat_client = get_shard_client()
    _heartbeat_client.loop(options.lifetime_hours)


if __name__ == '__main__':
    main()
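

# Typical invocation (illustrative; the script name and values below are
# placeholders, and the flags are the ones defined in main() above):
#
#   ./shard_client.py --lifetime-hours 24 --metrics-file /tmp/shard_metrics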