#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import argparse
import httplib
import logging
import os
import random
import signal
import time
import urllib2

import common

from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe.json_rpc import proxy
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server import utils as server_utils
from chromite.lib import timeout_util
from django.db import transaction

try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    # Fall back to no-op mocks so the client still runs when the chromite
    # monitoring libraries are unavailable.
    metrics = server_utils.metrics_mock
    ts_mon_config = server_utils.metrics_mock


"""
Autotest shard client

The shard client can be run as standalone service. It periodically polls the
master in a heartbeat, retrieves new jobs and hosts and inserts them into the
local database.

A shard is set up (by a human) and pointed to the global AFE (cautotest).
On the shard, this script periodically makes so called heartbeat requests to
the global AFE, which will then complete the following actions:

1. Find the previously created (with atest) record for the shard. Shards are
   identified by their hostnames, specified in the shadow_config.
2. Take the records that were sent in the heartbeat and insert them into the
   global database.
   - This is to set the status of jobs to completed in the master database
     after they were run by a slave. This is necessary so one can just look
     at the master's afe to see the statuses of all jobs. Otherwise one would
     have to check the tko tables or the individual slave AFEs.
3. Find labels that have been assigned to this shard.
4. Assign hosts that:
   - have the specified label
   - aren't leased
   - have an id which is not in the known_host_ids which were sent in the
     heartbeat request.
5. Assign jobs that:
   - depend on the specified label
   - haven't been assigned before
   - aren't started yet
   - aren't completed yet
   - have an id which is not in the jobs_known_ids which were sent in the
     heartbeat request.
6. Serialize the chosen jobs and hosts.
   - Find objects that the Host/Job objects depend on: Labels, AclGroups,
     Users, and many more. Details about this can be found around
     model_logic.serialize()
7. Send these objects to the slave.


On the client side, this will happen:
1. Deserialize the objects sent from the master and persist them to the local
   database.
2. monitor_db on the shard will pick up these jobs and schedule them on the
   available hosts (which were retrieved from a heartbeat).
3. Once a job is finished, its shard_id is set to NULL
4. The shard_client will pick up all jobs where shard_id=NULL and will
   send them to the master in the request of the next heartbeat.
   - The master will persist them as described earlier.
   - the shard_id will be set back to the shard's id, so the record won't be
     uploaded again.
   The heartbeat request will also contain the ids of incomplete jobs and the
   ids of all hosts. This is used to not send objects repeatedly. For more
   information on this and alternatives considered
   see rpc_interface.shard_heartbeat.
"""


HEARTBEAT_AFE_ENDPOINT = 'shard_heartbeat'

RPC_TIMEOUT_MIN = 5
RPC_DELAY_SEC = 5

# Module-level handle so the signal handler can reach the running client.
_heartbeat_client = None


class ShardClient(object):
    """Performs client side tasks of sharding, i.e. the heartbeat.

    This class contains the logic to do periodic heartbeats to a global AFE,
    to retrieve new jobs from it and to report completed jobs back.
    """

    def __init__(self, global_afe_hostname, shard_hostname, tick_pause_sec):
        self.afe = frontend_wrappers.RetryingAFE(server=global_afe_hostname,
                                                 timeout_min=RPC_TIMEOUT_MIN,
                                                 delay_sec=RPC_DELAY_SEC)
        self.hostname = shard_hostname
        self.tick_pause_sec = tick_pause_sec
        self._shutdown = False
        self._shard = None


    def _deserialize_many(self, serialized_list, djmodel, message):
        """Deserialize data in JSON format to database.

        Deserialize a list of JSON-formatted data to database using Django.

        @param serialized_list: A list of JSON-formatted data.
        @param djmodel: Django model type.
        @param message: A string to be used in a logging message.
        """
        for serialized in serialized_list:
            # Each object gets its own transaction so one bad record does
            # not roll back the rest of the batch.
            with transaction.commit_on_success():
                try:
                    djmodel.deserialize(serialized)
                except Exception as e:
                    logging.error('Deserializing a %s fails: %s, Error: %s',
                                  message, serialized, e)
                    metrics.Counter(
                        'chromeos/autotest/shard_client/deserialization_failed'
                        ).increment()


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/heartbeat_response_duration')
    def process_heartbeat_response(self, heartbeat_response):
        """Save objects returned by a heartbeat to the local database.

        This deserializes hosts and jobs including their dependencies and
        saves them to the local database.

        @param heartbeat_response: A dictionary with keys 'hosts' and 'jobs',
                                   as returned by the `shard_heartbeat` rpc
                                   call.
        """
        hosts_serialized = heartbeat_response['hosts']
        jobs_serialized = heartbeat_response['jobs']
        suite_keyvals_serialized = heartbeat_response['suite_keyvals']
        incorrect_host_ids = heartbeat_response.get('incorrect_host_ids', [])

        metrics.Gauge('chromeos/autotest/shard_client/hosts_received'
                      ).set(len(hosts_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/jobs_received'
                      ).set(len(jobs_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/suite_keyvals_received'
                      ).set(len(suite_keyvals_serialized))

        self._deserialize_many(hosts_serialized, models.Host, 'host')
        self._deserialize_many(jobs_serialized, models.Job, 'job')
        self._deserialize_many(suite_keyvals_serialized, models.JobKeyval,
                               'jobkeyval')

        host_ids = [h['id'] for h in hosts_serialized]
        logging.info('Heartbeat response contains hosts %s', host_ids)
        job_ids = [j['id'] for j in jobs_serialized]
        logging.info('Heartbeat response contains jobs %s', job_ids)
        parent_jobs_with_keyval = {kv['job_id']
                                   for kv in suite_keyvals_serialized}
        logging.info('Heartbeat response contains suite_keyvals_for jobs %s',
                     list(parent_jobs_with_keyval))
        if incorrect_host_ids:
            logging.info('Heartbeat response contains incorrect_host_ids %s '
                         'which will be deleted.', incorrect_host_ids)
            self._remove_incorrect_hosts(incorrect_host_ids)

        # If the master has just sent any jobs that we think have completed,
        # re-sync them with the master. This is especially useful when a
        # heartbeat or job is silently dropped, as the next heartbeat will
        # have a disagreement. Updating the shard_id to NULL will mark these
        # jobs for upload on the next heartbeat.
        job_models = models.Job.objects.filter(
                id__in=job_ids, hostqueueentry__complete=True)
        if job_models:
            job_models.update(shard=None)
            job_ids_repr = ', '.join([str(job.id) for job in job_models])
            logging.warning(
                    'Following completed jobs are reset shard_id to NULL '
                    'to be uploaded to master again: %s', job_ids_repr)


    def _remove_incorrect_hosts(self, incorrect_host_ids=None):
        """Remove from local database any hosts that should not exist.

        Entries of |incorrect_host_ids| that are absent from database will be
        silently ignored.

        @param incorrect_host_ids: a list of ids (as integers) to remove.
        """
        if not incorrect_host_ids:
            return

        models.Host.objects.filter(id__in=incorrect_host_ids).delete()


    @property
    def shard(self):
        """Return this shard's own shard object, fetched from the database.

        A shard's object is fetched from the master with the first jobs. It
        will not exist before that time.

        @returns: The shard object if it already exists, otherwise None
        """
        if self._shard is None:
            try:
                self._shard = models.Shard.smart_get(self.hostname)
            except models.Shard.DoesNotExist:
                # This might happen before any jobs are assigned to this
                # shard. This is okay because then there is nothing to
                # offload anyway.
                pass
        return self._shard


    def _get_jobs_to_upload(self):
        jobs = []
        # The scheduler sets shard to None upon completion of the job.
        # For more information on the shard field's semantic see
        # models.Job.shard. We need to be careful to wait for both the
        # shard_id and the complete bit here, or we will end up syncing
        # the job without ever setting the complete bit.
        job_ids = list(models.Job.objects.filter(
                shard=None,
                hostqueueentry__complete=True).values_list('pk', flat=True))

        for job_to_upload in models.Job.objects.filter(pk__in=job_ids).all():
            jobs.append(job_to_upload)
        return jobs


    def _mark_jobs_as_uploaded(self, job_ids):
        # self.shard might be None if no jobs were downloaded yet.
        # But then job_ids is empty, so this is harmless.
        # Even if there were jobs we'd in the worst case upload them twice.
        models.Job.objects.filter(pk__in=job_ids).update(shard=self.shard)


    def _get_hqes_for_jobs(self, jobs):
        hqes = []
        for job in jobs:
            hqes.extend(job.hostqueueentry_set.all())
        return hqes


    def _get_known_jobs_and_hosts(self):
        """Returns lists of host and job info to send in a heartbeat.

        The host and job ids are ids of objects that are already present on
        the shard and therefore don't need to be sent again.

        For jobs, only incomplete jobs are sent, as the master won't send
        already completed jobs anyway. This helps keeping the list of id's
        considerably small.

        For hosts, host status in addition to host id are sent to master
        to sync the host status.

        @returns: Tuple of three lists. The first one contains job ids, the
                  second one host ids, and the third one host statuses.
        """
        job_ids = list(models.Job.objects.filter(
                hostqueueentry__complete=False).values_list('id', flat=True))
        host_models = models.Host.objects.filter(invalid=0)
        host_ids = []
        host_statuses = []
        for h in host_models:
            host_ids.append(h.id)
            host_statuses.append(h.status)
        return job_ids, host_ids, host_statuses


    def _heartbeat_packet(self):
        """Construct the heartbeat packet.

        See rpc_interface for a more detailed description of the heartbeat.

        @return: A heartbeat packet.
        """
        known_job_ids, known_host_ids, known_host_statuses = (
                self._get_known_jobs_and_hosts())
        logging.info('Known jobs: %s', known_job_ids)

        job_objs = self._get_jobs_to_upload()
        hqes = [hqe.serialize(include_dependencies=False)
                for hqe in self._get_hqes_for_jobs(job_objs)]
        jobs = [job.serialize(include_dependencies=False) for job in job_objs]
        logging.info('Uploading jobs %s', [j['id'] for j in jobs])

        return {'shard_hostname': self.hostname,
                'known_job_ids': known_job_ids,
                'known_host_ids': known_host_ids,
                'known_host_statuses': known_host_statuses,
                'jobs': jobs, 'hqes': hqes}


    def _heartbeat_failure(self, log_message, failure_type_str=''):
        logging.error("Heartbeat failed. %s", log_message)
        metrics.Counter('chromeos/autotest/shard_client/heartbeat_failure'
                        ).increment(fields={'failure_type': failure_type_str})


    # NOTE: the metric name keeps the historical 'heatbeat' spelling so
    # existing dashboards/alerts keep working; do not "fix" it in isolation.
    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/do_heatbeat_duration')
    def do_heartbeat(self):
        """Perform a heartbeat: Retrieve new jobs.

        This function executes a `shard_heartbeat` RPC. It retrieves the
        response of this call and processes the response by storing the
        returned objects in the local database.
        """
        heartbeat_metrics_prefix = 'chromeos/autotest/shard_client/heartbeat/'

        logging.info("Performing heartbeat.")
        packet = self._heartbeat_packet()
        metrics.Gauge(heartbeat_metrics_prefix + 'request_size').set(
                len(str(packet)))

        try:
            response = self.afe.run(HEARTBEAT_AFE_ENDPOINT, **packet)
        except urllib2.HTTPError as e:
            self._heartbeat_failure('HTTPError %d: %s' % (e.code, e.reason),
                                    'HTTPError')
            return
        except urllib2.URLError as e:
            self._heartbeat_failure('URLError: %s' % e.reason,
                                    'URLError')
            return
        except httplib.HTTPException as e:
            self._heartbeat_failure('HTTPException: %s' % e,
                                    'HTTPException')
            return
        except timeout_util.TimeoutError as e:
            self._heartbeat_failure('TimeoutError: %s' % e,
                                    'TimeoutError')
            return
        except proxy.JSONRPCException as e:
            self._heartbeat_failure('JSONRPCException: %s' % e,
                                    'JSONRPCException')
            return

        metrics.Gauge(heartbeat_metrics_prefix + 'response_size').set(
                len(str(response)))
        self._mark_jobs_as_uploaded([job['id'] for job in packet['jobs']])
        self.process_heartbeat_response(response)
        logging.info("Heartbeat completed.")


    def tick(self):
        """Performs all tasks the shard clients needs to do periodically."""
        self.do_heartbeat()
        metrics.Counter('chromeos/autotest/shard_client/tick').increment()


    def loop(self):
        """Calls tick() until shutdown() is called."""
        while not self._shutdown:
            self.tick()
            # Sleep with +/- 10% fuzzing to avoid phaselock of shards.
            tick_fuzz = self.tick_pause_sec * 0.2 * (random.random() - 0.5)
            time.sleep(self.tick_pause_sec + tick_fuzz)


    def shutdown(self):
        """Stops the shard client after the current tick."""
        logging.info("Shutdown request received.")
        self._shutdown = True


def handle_signal(signum, frame):
    """Sigint handler so we don't crash mid-tick."""
    _heartbeat_client.shutdown()


def _get_shard_hostname_and_ensure_running_on_shard():
    """Read the hostname of the local shard from the global configuration.

    Raise an exception if run from elsewhere than a shard.

    @raises error.HeartbeatOnlyAllowedInShardModeException if run from
            elsewhere than from a shard.
    """
    hostname = global_config.global_config.get_config_value(
        'SHARD', 'shard_hostname', default=None)
    if not hostname:
        raise error.HeartbeatOnlyAllowedInShardModeException(
            'To run the shard client, shard_hostname must neither be None nor '
            'empty.')
    return hostname


def _get_tick_pause_sec():
    """Read pause to make between two ticks from the global configuration."""
    return global_config.global_config.get_config_value(
        'SHARD', 'heartbeat_pause_sec', type=float)


def get_shard_client():
    """Instantiate a shard client instance.

    Configuration values will be read from the global configuration.

    @returns A shard client instance.
    """
    global_afe_hostname = server_utils.get_global_afe_hostname()
    shard_hostname = _get_shard_hostname_and_ensure_running_on_shard()
    tick_pause_sec = _get_tick_pause_sec()
    return ShardClient(global_afe_hostname, shard_hostname, tick_pause_sec)


def main():
    ts_mon_config.SetupTsMonGlobalState('shard_client')

    try:
        metrics.Counter('chromeos/autotest/shard_client/start').increment()
        main_without_exception_handling()
    except Exception:
        metrics.Counter('chromeos/autotest/shard_client/uncaught_exception'
                        ).increment()
        message = 'Uncaught exception. Terminating shard_client.'
        email_manager.manager.log_stacktrace(message)
        logging.exception(message)
        raise
    finally:
        email_manager.manager.send_queued_emails()


def main_without_exception_handling():
    parser = argparse.ArgumentParser(description='Shard client.')
    # No options are defined yet; parse_args still rejects unknown arguments.
    parser.parse_args()

    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            None, timestamped_logfile_prefix='shard_client')

    logging.info("Setting signal handler.")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    logging.info("Starting shard client.")
    global _heartbeat_client
    _heartbeat_client = get_shard_client()
    _heartbeat_client.loop()


if __name__ == '__main__':
    main()