1# Copyright 2017 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Event handlers.""" 6 7from __future__ import absolute_import 8from __future__ import division 9from __future__ import print_function 10 11import datetime 12import logging 13 14from lucifer import autotest 15from lucifer import jobx 16 17logger = logging.getLogger(__name__) 18 19 20class EventHandler(object): 21 """Event handling dispatcher. 22 23 Event handlers are implemented as methods named _handle_<event value>. 24 25 Each handler method must handle its exceptions accordingly. If an 26 exception escapes, the job dies on the spot. 27 28 Instances have one public attribute completed. completed is set to 29 True once the final COMPLETED event is received and the handler 30 finishes. 31 """ 32 33 def __init__(self, metrics, job, autoserv_exit): 34 """Initialize instance. 35 36 @param metrics: Metrics instance 37 @param job: frontend.afe.models.Job instance to own 38 @param hqes: list of HostQueueEntry instances for the job 39 @param autoserv_exit: autoserv exit status 40 """ 41 self.completed = False 42 self._metrics = metrics 43 self._job = job 44 # TODO(crbug.com/748234): autoserv not implemented yet. 45 self._autoserv_exit = autoserv_exit 46 47 def __call__(self, event, msg): 48 logger.debug('Received event %r with message %r', event.name, msg) 49 method_name = '_handle_%s' % event.value 50 try: 51 handler = getattr(self, method_name) 52 except AttributeError: 53 raise NotImplementedError('%s is not implemented for handling %s', 54 method_name, event.name) 55 handler(msg) 56 57 def _handle_starting(self, msg): 58 # TODO(crbug.com/748234): No event update needed yet. 59 pass 60 61 def _handle_gathering(self, msg): 62 # TODO(crbug.com/794779): monitor_db leaves HQEs in GATHERING 63 pass 64 65 def _handle_x_tests_done(self, msg): 66 """Taken from GatherLogsTask.epilog.""" 67 autoserv_exit, failures = (int(x) for x in msg.split(',')) 68 logger.debug('Got autoserv_exit=%d, failures=%d', 69 autoserv_exit, failures) 70 success = (autoserv_exit == 0 and failures == 0) 71 reset_after_failure = not self._job.run_reset and not success 72 hqes = self._job.hostqueueentry_set.all().prefetch_related('host') 73 if self._should_reboot_duts(autoserv_exit, failures, 74 reset_after_failure): 75 logger.debug('Creating cleanup jobs for hosts') 76 for entry in hqes: 77 self._handle_host_needs_cleanup(entry.host.hostname) 78 else: 79 logger.debug('Not creating cleanup jobs for hosts') 80 for entry in hqes: 81 self._handle_host_ready(entry.host.hostname) 82 if not reset_after_failure: 83 logger.debug('Skipping reset because reset_after_failure is False') 84 return 85 logger.debug('Creating reset jobs for hosts') 86 self._metrics.send_reset_after_failure(autoserv_exit, failures) 87 for entry in hqes: 88 self._handle_host_needs_reset(entry.host.hostname) 89 90 def _handle_parsing(self, _msg): 91 models = autotest.load('frontend.afe.models') 92 self._job.hostqueueentry_set.all().update( 93 status=models.HostQueueEntry.Status.PARSING) 94 95 def _handle_completed(self, _msg): 96 models = autotest.load('frontend.afe.models') 97 final_status = self._final_status() 98 for hqe in self._job.hostqueueentry_set.all(): 99 self._set_completed_status(hqe, final_status) 100 if final_status is not models.HostQueueEntry.Status.ABORTED: 101 _stop_prejob_hqes(self._job) 102 if self._job.shard_id is not None: 103 # If shard_id is None, the job will be synced back to the master 104 self._job.shard_id = None 105 self._job.save() 106 self.completed = True 107 108 def _handle_host_ready(self, msg): 109 models = autotest.load('frontend.afe.models') 110 (models.Host.objects.filter(hostname=msg) 111 .update(status=models.Host.Status.READY)) 112 113 def _handle_host_needs_cleanup(self, msg): 114 models = autotest.load('frontend.afe.models') 115 host = models.Host.objects.get(hostname=msg) 116 models.SpecialTask.objects.create( 117 host_id=host.id, 118 task=models.SpecialTask.Task.CLEANUP, 119 requested_by=models.User.objects.get(login=self._job.owner)) 120 121 def _handle_host_needs_reset(self, msg): 122 models = autotest.load('frontend.afe.models') 123 host = models.Host.objects.get(hostname=msg) 124 models.SpecialTask.objects.create( 125 host_id=host.id, 126 task=models.SpecialTask.Task.RESET, 127 requested_by=models.User.objects.get(login=self._job.owner)) 128 129 def _should_reboot_duts(self, autoserv_exit, failures, reset_after_failure): 130 models = autotest.load('frontend.afe.models') 131 reboot_after = self._job.reboot_after 132 if self._final_status() == models.HostQueueEntry.Status.ABORTED: 133 logger.debug('Should reboot because reboot_after=ABORTED') 134 return True 135 elif reboot_after == models.Job.RebootAfter.ALWAYS: 136 logger.debug('Should reboot because reboot_after=ALWAYS') 137 return True 138 elif (reboot_after == models.Job.RebootAfter.IF_ALL_TESTS_PASSED 139 and autoserv_exit == 0 and failures == 0): 140 logger.debug('Should reboot because' 141 ' reboot_after=IF_ALL_TESTS_PASSED') 142 return True 143 else: 144 return failures > 0 and not reset_after_failure 145 146 def _final_status(self): 147 models = autotest.load('frontend.afe.models') 148 Status = models.HostQueueEntry.Status 149 if jobx.is_aborted(self._job): 150 return Status.ABORTED 151 if self._autoserv_exit == 0: 152 return Status.COMPLETED 153 return Status.FAILED 154 155 def _set_completed_status(self, hqe, status): 156 """Set completed status of HQE. 157 158 This is a cleaned up version of the one in scheduler_models to work 159 with Django models. 160 """ 161 hqe.status = status 162 hqe.active = False 163 hqe.complete = True 164 if hqe.started_on: 165 hqe.finished_on = datetime.datetime.now() 166 hqe.save() 167 self._metrics.send_hqe_completion(hqe) 168 self._metrics.send_hqe_duration(hqe) 169 170 171class Metrics(object): 172 173 """Class for sending job metrics.""" 174 175 def __init__(self): 176 # Metrics 177 metrics = autotest.chromite_load('metrics') 178 self._hqe_completion_metric = metrics.Counter( 179 'chromeos/autotest/scheduler/hqe_completion_count') 180 self._reset_after_failure_metric = metrics.Counter( 181 'chromeos/autotest/scheduler/postjob_tasks/' 182 'reset_after_failure') 183 184 def send_hqe_completion(self, hqe): 185 """Send ts_mon metrics for HQE completion.""" 186 fields = { 187 'status': hqe.status.lower(), 188 'board': 'NO_HOST', 189 'pool': 'NO_HOST', 190 } 191 if hqe.host: 192 labellib = autotest.load('utils.labellib') 193 labels = labellib.LabelsMapping.from_host(hqe.host) 194 fields['board'] = labels.get('board', '') 195 fields['pool'] = labels.get('pool', '') 196 self._hqe_completion_metric.increment(fields=fields) 197 198 def send_hqe_duration(self, hqe): 199 """Send CloudTrace metrics for HQE duration.""" 200 if not (hqe.started_on and hqe.finished_on): 201 return 202 scheduler_models = autotest.load('scheduler.scheduler_models') 203 cloud_trace = autotest.chromite_load('cloud_trace') 204 types = autotest.deps_load('google.protobuf.internal.well_known_types') 205 hqe_trace_id = scheduler_models.hqe_trace_id 206 207 span = cloud_trace.Span( 208 'HQE', spanId='0', traceId=hqe_trace_id(hqe.id)) 209 span.startTime = types.Timestamp() 210 span.startTime.FromDatetime(hqe.started_on) 211 span.endTime = types.Timestamp() 212 span.endTime.FromDatetime(hqe.finished_on) 213 cloud_trace.LogSpan(span) 214 215 def send_reset_after_failure(self, autoserv_exit, failures): 216 """Send reset_after_failure metric.""" 217 self._reset_after_failure_metric.increment( 218 fields={'autoserv_process_success': autoserv_exit == 0, 219 # Yes, this is a boolean 220 'num_tests_failed': failures > 0}) 221 222 223def _stop_prejob_hqes(job): 224 """Stop pending HQEs for a job (for synch_count).""" 225 models = autotest.load('frontend.afe.models') 226 HQEStatus = models.HostQueueEntry.Status 227 HostStatus = models.Host.Status 228 not_yet_run = _get_prejob_hqes(job) 229 if not_yet_run.count() == job.synch_count: 230 return 231 entries_to_stop = _get_prejob_hqes(job, include_active=False) 232 for hqe in entries_to_stop: 233 if hqe.status == HQEStatus.PENDING: 234 hqe.host.status = HostStatus.READY 235 hqe.host.save() 236 hqe.status = HQEStatus.STOPPED 237 hqe.save() 238 239 240def _get_prejob_hqes(job, include_active=True): 241 """Return a queryset of not run HQEs for the job (for synch_count).""" 242 models = autotest.load('frontend.afe.models') 243 if include_active: 244 statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES) 245 else: 246 statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES) 247 return models.HostQueueEntry.objects.filter( 248 job=job, status__in=statuses) 249