#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
Autotest shard client

The shard client can be run as standalone service. It periodically polls the
master in a heartbeat, retrieves new jobs and hosts and inserts them into the
local database.

A shard is set up (by a human) and pointed to the global AFE (cautotest).
On the shard, this script periodically makes so called heartbeat requests to the
global AFE, which will then complete the following actions:

1. Find the previously created (with atest) record for the shard. Shards are
   identified by their hostnames, specified in the shadow_config.
2. Take the records that were sent in the heartbeat and insert them into the
   global database.
   - This is to set the status of jobs to completed in the master database after
     they were run by a slave. This is necessary so one can just look at the
     master's afe to see the statuses of all jobs. Otherwise one would have to
     check the tko tables or the individual slave AFEs.
3. Find labels that have been assigned to this shard.
4. Assign hosts that:
   - have the specified label
   - aren't leased
   - have an id which is not in the known_host_ids which were sent in the
     heartbeat request.
5. Assign jobs that:
   - depend on the specified label
   - haven't been assigned before
   - aren't started yet
   - aren't completed yet
   - have an id which is not in the jobs_known_ids which were sent in the
     heartbeat request.
6. Serialize the chosen jobs and hosts.
   - Find objects that the Host/Job objects depend on: Labels, AclGroups, Users,
     and many more. Details about this can be found around
     model_logic.serialize()
7. Send these objects to the slave.


On the client side, this will happen:
1. Deserialize the objects sent from the master and persist them to the local
   database.
2. monitor_db on the shard will pick up these jobs and schedule them on the
   available hosts (which were retrieved from a heartbeat).
3. Once a job is finished, its shard_id is set to NULL
4. The shard_client will pick up all jobs where shard_id=NULL and will
   send them to the master in the request of the next heartbeat.
   - The master will persist them as described earlier.
   - the shard_id will be set back to the shard's id, so the record won't be
     uploaded again.
   The heartbeat request will also contain the ids of incomplete jobs and the
   ids of all hosts. This is used to not send objects repeatedly. For more
   information on this and alternatives considered
   see site_rpc_interface.shard_heartbeat.
"""

import argparse
import httplib
import logging
import os
import signal
import socket
import time
import urllib2

import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server import utils as server_utils
from chromite.lib import timeout_util
from django.db import transaction


# Name of the RPC on the global AFE that implements the heartbeat.
HEARTBEAT_AFE_ENDPOINT = 'shard_heartbeat'

# Retry settings handed to frontend_wrappers.RetryingAFE.
RPC_TIMEOUT_MIN = 5
RPC_DELAY_SEC = 5

STATS_KEY = 'shard_client.%s' % socket.gethostname()
timer = autotest_stats.Timer(STATS_KEY)
# The running ShardClient; set by main_without_exception_handling() so that
# handle_signal() can reach it.
_heartbeat_client = None


class ShardClient(object):
    """Performs client side tasks of sharding, i.e. the heartbeat.

    This class contains the logic to do periodic heartbeats to a global AFE,
    to retrieve new jobs from it and to report completed jobs back.
    """

    def __init__(self, global_afe_hostname, shard_hostname, tick_pause_sec):
        """Create the RPC proxy and remember the shard's settings.

        @param global_afe_hostname: Server passed to RetryingAFE; heartbeats
                                    are sent there.
        @param shard_hostname: Hostname of this shard. It is sent in every
                               heartbeat and used to look up the local Shard
                               record.
        @param tick_pause_sec: Seconds to sleep between two ticks.
        """
        self.afe = frontend_wrappers.RetryingAFE(server=global_afe_hostname,
                                                 timeout_min=RPC_TIMEOUT_MIN,
                                                 delay_sec=RPC_DELAY_SEC)
        self.hostname = shard_hostname
        self.tick_pause_sec = tick_pause_sec
        self._shutdown = False
        self._shard = None


    def _deserialize_many(self, serialized_list, djmodel, message):
        """Deserialize data in JSON format to database.

        Deserialize a list of JSON-formatted data to database using Django.
        Every entry is handled in its own transaction. A failing entry is
        logged and counted, and the remaining entries are still processed.

        @param serialized_list: A list of JSON-formatted data.
        @param djmodel: Django model type.
        @param message: A string to be used in a logging message.
        """
        for serialized in serialized_list:
            # The try has to enclose the transaction block: if the exception
            # were swallowed inside of it, commit_on_success would see a clean
            # exit and commit whatever part of the object was already written.
            try:
                with transaction.commit_on_success():
                    djmodel.deserialize(serialized)
            except Exception as e:
                logging.error('Deserializing a %s fails: %s, Error: %s',
                              message, serialized, e)
                autotest_stats.Counter(STATS_KEY).increment(
                        'deserialization_failures')


    @timer.decorate
    def process_heartbeat_response(self, heartbeat_response):
        """Save objects returned by a heartbeat to the local database.

        This deserializes hosts and jobs including their dependencies and saves
        them to the local database.

        @param heartbeat_response: A dictionary with keys 'hosts' and 'jobs',
                                   and optionally 'suite_keyvals', as returned
                                   by the `shard_heartbeat` rpc call.
        """
        hosts_serialized = heartbeat_response['hosts']
        jobs_serialized = heartbeat_response['jobs']
        # Tolerate responses that carry no suite keyvals at all.
        suite_keyvals_serialized = heartbeat_response.get('suite_keyvals', [])

        autotest_stats.Gauge(STATS_KEY).send(
                'hosts_received', len(hosts_serialized))
        autotest_stats.Gauge(STATS_KEY).send(
                'jobs_received', len(jobs_serialized))
        autotest_stats.Gauge(STATS_KEY).send(
                'suite_keyvals_received', len(suite_keyvals_serialized))

        self._deserialize_many(hosts_serialized, models.Host, 'host')
        self._deserialize_many(jobs_serialized, models.Job, 'job')
        self._deserialize_many(suite_keyvals_serialized, models.JobKeyval,
                               'jobkeyval')

        host_ids = [h['id'] for h in hosts_serialized]
        logging.info('Heartbeat response contains hosts %s', host_ids)
        job_ids = [j['id'] for j in jobs_serialized]
        logging.info('Heartbeat response contains jobs %s', job_ids)
        parent_jobs_with_keyval = set(kv['job_id']
                                      for kv in suite_keyvals_serialized)
        logging.info('Heartbeat response contains suite_keyvals_for jobs %s',
                     list(parent_jobs_with_keyval))

        # If the master has just sent any jobs that we think have completed,
        # re-sync them with the master. This is especially useful when a
        # heartbeat or job is silently dropped, as the next heartbeat will
        # have a disagreement. Updating the shard_id to NULL will mark these
        # jobs for upload on the next heartbeat.
        # The join on hostqueueentry can return a job once per entry, so the
        # ids are collected once and de-duplicated before they are used.
        completed_job_ids = sorted(set(
                models.Job.objects.filter(
                        id__in=job_ids,
                        hostqueueentry__complete=True,
                ).values_list('id', flat=True)))
        if completed_job_ids:
            models.Job.objects.filter(
                    id__in=completed_job_ids).update(shard=None)
            job_ids_repr = ', '.join(str(i) for i in completed_job_ids)
            logging.warning(
                    'Following completed jobs are reset shard_id to NULL '
                    'to be uploaded to master again: %s', job_ids_repr)


    @property
    def shard(self):
        """Return this shard's own shard object, fetched from the database.

        A shard's object is fetched from the master with the first jobs. It will
        not exist before that time. Once found, the object is cached on the
        instance.

        @returns: The shard object if it already exists, otherwise None
        """
        if self._shard is None:
            try:
                self._shard = models.Shard.smart_get(self.hostname)
            except models.Shard.DoesNotExist:
                # This might happen before any jobs are assigned to this shard.
                # This is okay because then there is nothing to offload anyway.
                pass
        return self._shard


    def _get_jobs_to_upload(self):
        """Return the finished jobs that still have to be sent to the master.

        @returns: A list of Job objects with shard=None and a completed host
                  queue entry.
        """
        # The scheduler sets shard to None upon completion of the job.
        # For more information on the shard field's semantic see
        # models.Job.shard. We need to be careful to wait for both the
        # shard_id and the complete bit here, or we will end up syncing
        # the job without ever setting the complete bit.
        job_ids = list(models.Job.objects.filter(
                shard=None,
                hostqueueentry__complete=True).values_list('pk', flat=True))

        # Fetching by pk in a second step yields each job exactly once, even
        # if the join above matched it several times.
        return list(models.Job.objects.filter(pk__in=job_ids))


    def _mark_jobs_as_uploaded(self, job_ids):
        """Set the shard of the given jobs back to this shard.

        @param job_ids: Ids of the jobs that were sent in the last heartbeat.
        """
        if not job_ids:
            # Nothing was uploaded, so there is no need to look up the shard.
            return
        # self.shard might be None if no jobs were downloaded yet.
        # Even if there were jobs we'd in the worst case upload them twice.
        models.Job.objects.filter(pk__in=job_ids).update(shard=self.shard)


    def _get_hqes_for_jobs(self, jobs):
        """Collect the host queue entries of all given jobs.

        @param jobs: An iterable of Job objects.

        @returns: A flat list with the host queue entries of every job.
        """
        return [hqe for job in jobs for hqe in job.hostqueueentry_set.all()]


    def _get_known_jobs_and_hosts(self):
        """Returns lists of host and job info to send in a heartbeat.

        The host and job ids are ids of objects that are already present on the
        shard and therefore don't need to be sent again.

        For jobs, only incomplete jobs are sent, as the master won't send
        already completed jobs anyway. This helps keeping the list of id's
        considerably small.

        For hosts, host status in addition to host id are sent to master
        to sync the host status.

        @returns: Tuple of three lists. The first one contains job ids, the
                  second one host ids, and the third one host statuses.
        """
        # A job with several incomplete entries is matched once per entry;
        # send every id only once.
        job_ids = sorted(set(models.Job.objects.filter(
                hostqueueentry__complete=False).values_list('id', flat=True)))
        # Materialize once so ids and statuses stem from the same query and
        # stay aligned by position.
        hosts = list(models.Host.objects.filter(invalid=0))
        host_ids = [h.id for h in hosts]
        host_statuses = [h.status for h in hosts]
        return job_ids, host_ids, host_statuses


    def _heartbeat_packet(self):
        """Construct the heartbeat packet.

        See site_rpc_interface for a more detailed description of the heartbeat.

        @return: A heartbeat packet.
        """
        known_job_ids, known_host_ids, known_host_statuses = (
                self._get_known_jobs_and_hosts())
        logging.info('Known jobs: %s', known_job_ids)

        job_objs = self._get_jobs_to_upload()
        hqes = [hqe.serialize(include_dependencies=False)
                for hqe in self._get_hqes_for_jobs(job_objs)]
        jobs = [job.serialize(include_dependencies=False) for job in job_objs]
        logging.info('Uploading jobs %s', [j['id'] for j in jobs])

        return {'shard_hostname': self.hostname,
                'known_job_ids': known_job_ids,
                'known_host_ids': known_host_ids,
                'known_host_statuses': known_host_statuses,
                'jobs': jobs, 'hqes': hqes}


    def _heartbeat_failure(self, log_message):
        """Log a failed heartbeat and count it.

        @param log_message: Description of the failure, appended to the log
                            line.
        """
        logging.error("Heartbeat failed. %s", log_message)
        autotest_stats.Counter(STATS_KEY).increment('heartbeat_failures')


    @timer.decorate
    def do_heartbeat(self):
        """Perform a heartbeat: Retrieve new jobs.

        This function executes a `shard_heartbeat` RPC. It retrieves the
        response of this call and processes the response by storing the returned
        objects in the local database.

        HTTP, URL and timeout errors of the RPC are logged and counted; the
        heartbeat is then abandoned without marking any job as uploaded.
        """
        logging.info("Performing heartbeat.")
        packet = self._heartbeat_packet()
        autotest_stats.Gauge(STATS_KEY).send(
                'heartbeat.request_size', len(str(packet)))

        try:
            response = self.afe.run(HEARTBEAT_AFE_ENDPOINT, **packet)
        except urllib2.HTTPError as e:
            # HTTPError derives from URLError and has to be handled first.
            self._heartbeat_failure("HTTPError %d: %s" % (e.code, e.reason))
            return
        except urllib2.URLError as e:
            self._heartbeat_failure("URLError: %s" % e.reason)
            return
        except httplib.HTTPException as e:
            self._heartbeat_failure("HTTPException: %s" % e)
            return
        except timeout_util.TimeoutError as e:
            self._heartbeat_failure("TimeoutError: %s" % e)
            return

        autotest_stats.Gauge(STATS_KEY).send(
                'heartbeat.response_size', len(str(response)))
        # Only reached when the RPC went through, so the jobs in the packet
        # have been handed to the master.
        self._mark_jobs_as_uploaded([job['id'] for job in packet['jobs']])
        self.process_heartbeat_response(response)
        logging.info("Heartbeat completed.")


    def tick(self):
        """Performs all tasks the shard clients needs to do periodically."""
        self.do_heartbeat()


    def loop(self):
        """Calls tick() until shutdown() is called."""
        while not self._shutdown:
            self.tick()
            if self._shutdown:
                # Shutdown was requested during the tick; don't wait another
                # full pause before leaving.
                break
            time.sleep(self.tick_pause_sec)


    def shutdown(self):
        """Stops the shard client after the current tick."""
        logging.info("Shutdown request received.")
        self._shutdown = True


def handle_signal(signum, frame):
    """Sigint handler so we don't crash mid-tick.

    @param signum: Number of the received signal.
    @param frame: Current stack frame; unused.
    """
    if _heartbeat_client is None:
        # No client exists yet, so there is no tick that could be interrupted.
        logging.warning('Received signal %s before the shard client was '
                        'created; ignoring it.', signum)
        return
    _heartbeat_client.shutdown()


def _get_shard_hostname_and_ensure_running_on_shard():
    """Read the hostname of the local shard from the global configuration.

    Raise an exception if run from elsewhere than a shard.

    @returns: The configured shard hostname.

    @raises error.HeartbeatOnlyAllowedInShardModeException if run from
            elsewhere than from a shard.
    """
    hostname = global_config.global_config.get_config_value(
            'SHARD', 'shard_hostname', default=None)
    if not hostname:
        raise error.HeartbeatOnlyAllowedInShardModeException(
                'To run the shard client, shard_hostname must neither be None '
                'nor empty.')
    return hostname


def _get_tick_pause_sec():
    """Read pause to make between two ticks from the global configuration."""
    return global_config.global_config.get_config_value(
            'SHARD', 'heartbeat_pause_sec', type=float)


def get_shard_client():
    """Instantiate a shard client instance.

    Configuration values will be read from the global configuration.

    @returns A shard client instance.
    """
    global_afe_hostname = server_utils.get_global_afe_hostname()
    shard_hostname = _get_shard_hostname_and_ensure_running_on_shard()
    tick_pause_sec = _get_tick_pause_sec()
    return ShardClient(global_afe_hostname, shard_hostname, tick_pause_sec)


def main():
    """Run the shard client and report uncaught exceptions.

    An uncaught exception is mailed, logged and counted, then re-raised.
    Queued emails are sent in any case before returning.
    """
    try:
        autotest_stats.Counter(STATS_KEY).increment('starts')
        main_without_exception_handling()
    except Exception:
        message = 'Uncaught exception. Terminating shard_client.'
        email_manager.manager.log_stacktrace(message)
        logging.exception(message)
        autotest_stats.Counter(STATS_KEY).increment('uncaught_exceptions')
        raise
    finally:
        email_manager.manager.send_queued_emails()


def main_without_exception_handling():
    """Parse arguments, set up logging and signals, and run the client loop."""
    parser = argparse.ArgumentParser(description='Shard client.')
    # No options are defined; parsing still provides --help and rejects
    # unexpected arguments.
    parser.parse_args()

    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            None, timestamped_logfile_prefix='shard_client')

    # Build the client before the handlers are installed, so that a signal
    # never reaches handle_signal() while there is no client to shut down.
    global _heartbeat_client
    _heartbeat_client = get_shard_client()

    logging.info("Setting signal handler.")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    logging.info("Starting shard client.")
    _heartbeat_client.loop()


if __name__ == '__main__':
    main()