• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
5"""Event handlers."""
6
7from __future__ import absolute_import
8from __future__ import division
9from __future__ import print_function
10
11import datetime
12import logging
13
14from lucifer import autotest
15from lucifer import jobx
16
17logger = logging.getLogger(__name__)
18
19
class EventHandler(object):
    """Event handling dispatcher.

    Event handlers are implemented as methods named _handle_<event value>.

    Each handler method must handle its exceptions accordingly.  If an
    exception escapes, the job dies on the spot.

    Instances have one public attribute completed.  completed is set to
    True once the final COMPLETED event is received and the handler
    finishes.
    """

    def __init__(self, metrics, job, autoserv_exit):
        """Initialize instance.

        @param metrics: Metrics instance
        @param job: frontend.afe.models.Job instance to own
        @param autoserv_exit: autoserv exit status
        """
        self.completed = False
        self._metrics = metrics
        self._job = job
        # TODO(crbug.com/748234): autoserv not implemented yet.
        self._autoserv_exit = autoserv_exit

    def __call__(self, event, msg):
        """Dispatch an event to the matching _handle_<event value> method.

        @param event: event object; event.name is used for logging and
                      event.value selects the handler method
        @param msg: message passed through to the handler
        @raises NotImplementedError: if no handler method exists for the
                                     event
        """
        logger.debug('Received event %r with message %r', event.name, msg)
        method_name = '_handle_%s' % event.value
        try:
            handler = getattr(self, method_name)
        except AttributeError:
            # Exception constructors do not %-format their arguments the
            # way logging calls do, so format the message explicitly.
            raise NotImplementedError(
                    '%s is not implemented for handling %s'
                    % (method_name, event.name))
        handler(msg)

    def _handle_starting(self, msg):
        """Handle the starting event (currently a no-op)."""
        # TODO(crbug.com/748234): No event update needed yet.
        pass

    def _handle_gathering(self, msg):
        """Handle the gathering event (currently a no-op)."""
        # TODO(crbug.com/794779): monitor_db leaves HQEs in GATHERING
        pass

    def _handle_x_tests_done(self, msg):
        """Taken from GatherLogsTask.epilog.

        Schedules post-job host handling: each host of the job either
        gets a cleanup task or is marked ready, and, when the job did not
        run reset and did not succeed, each host also gets a reset task.

        @param msg: string of the form '<autoserv_exit>,<failures>', both
                    parseable as integers
        @raises ValueError: if msg does not hold exactly two
                            comma-separated integers
        """
        autoserv_exit, failures = (int(x) for x in msg.split(','))
        logger.debug('Got autoserv_exit=%d, failures=%d',
                     autoserv_exit, failures)
        success = (autoserv_exit == 0 and failures == 0)
        reset_after_failure = not self._job.run_reset and not success
        hqes = self._job.hostqueueentry_set.all().prefetch_related('host')
        if self._should_reboot_duts(autoserv_exit, failures,
                                    reset_after_failure):
            logger.debug('Creating cleanup jobs for hosts')
            for entry in hqes:
                self._handle_host_needs_cleanup(entry.host.hostname)
        else:
            logger.debug('Not creating cleanup jobs for hosts')
            for entry in hqes:
                self._handle_host_ready(entry.host.hostname)
        if not reset_after_failure:
            logger.debug('Skipping reset because reset_after_failure is False')
            return
        logger.debug('Creating reset jobs for hosts')
        self._metrics.send_reset_after_failure(autoserv_exit, failures)
        for entry in hqes:
            self._handle_host_needs_reset(entry.host.hostname)

    def _handle_parsing(self, _msg):
        """Set every HQE of the job to the PARSING status."""
        models = autotest.load('frontend.afe.models')
        self._job.hostqueueentry_set.all().update(
                status=models.HostQueueEntry.Status.PARSING)

    def _handle_completed(self, _msg):
        """Finalize the job's HQEs and mark this handler as completed.

        Every HQE of the job gets the final status.  Unless that status is
        ABORTED, HQEs that have not run yet are stopped as well.
        """
        models = autotest.load('frontend.afe.models')
        final_status = self._final_status()
        for hqe in self._job.hostqueueentry_set.all():
            self._set_completed_status(hqe, final_status)
        # Compare by value; identity comparison of status values only
        # works by accident of object sharing.
        if final_status != models.HostQueueEntry.Status.ABORTED:
            _stop_prejob_hqes(self._job)
        if self._job.shard_id is not None:
            # If shard_id is None, the job will be synced back to the master
            self._job.shard_id = None
            self._job.save()
        self.completed = True

    def _handle_host_ready(self, msg):
        """Set the status of the host named msg to READY.

        @param msg: hostname
        """
        models = autotest.load('frontend.afe.models')
        (models.Host.objects.filter(hostname=msg)
         .update(status=models.Host.Status.READY))

    def _handle_host_needs_cleanup(self, msg):
        """Create a CLEANUP special task for the host named msg.

        @param msg: hostname
        """
        models = autotest.load('frontend.afe.models')
        self._create_special_task(msg, models.SpecialTask.Task.CLEANUP)

    def _handle_host_needs_reset(self, msg):
        """Create a RESET special task for the host named msg.

        @param msg: hostname
        """
        models = autotest.load('frontend.afe.models')
        self._create_special_task(msg, models.SpecialTask.Task.RESET)

    def _create_special_task(self, hostname, task):
        """Create a special task for a host, requested by the job owner.

        @param hostname: hostname of the host to create the task for
        @param task: models.SpecialTask.Task value
        """
        models = autotest.load('frontend.afe.models')
        host = models.Host.objects.get(hostname=hostname)
        models.SpecialTask.objects.create(
                host_id=host.id,
                task=task,
                requested_by=models.User.objects.get(login=self._job.owner))

    def _should_reboot_duts(self, autoserv_exit, failures, reset_after_failure):
        """Return whether the job's hosts should get cleanup tasks.

        @param autoserv_exit: autoserv exit status reported with the event
        @param failures: number of failures reported with the event
        @param reset_after_failure: whether reset tasks will be created
                                    for the hosts
        @returns: True if the final status is ABORTED, if reboot_after is
                  ALWAYS, or if reboot_after is IF_ALL_TESTS_PASSED and
                  there were no failures and a zero exit status.
                  Otherwise True only when there were failures and no
                  reset will be done.
        """
        models = autotest.load('frontend.afe.models')
        reboot_after = self._job.reboot_after
        if self._final_status() == models.HostQueueEntry.Status.ABORTED:
            logger.debug('Should reboot because reboot_after=ABORTED')
            return True
        elif reboot_after == models.Job.RebootAfter.ALWAYS:
            logger.debug('Should reboot because reboot_after=ALWAYS')
            return True
        elif (reboot_after == models.Job.RebootAfter.IF_ALL_TESTS_PASSED
              and autoserv_exit == 0 and failures == 0):
            logger.debug('Should reboot because'
                         ' reboot_after=IF_ALL_TESTS_PASSED')
            return True
        else:
            return failures > 0 and not reset_after_failure

    def _final_status(self):
        """Return the final HQE status for the job.

        @returns: ABORTED if jobx.is_aborted() reports the job as aborted,
                  COMPLETED if the autoserv exit status given at
                  construction is 0, otherwise FAILED.
        """
        models = autotest.load('frontend.afe.models')
        Status = models.HostQueueEntry.Status
        if jobx.is_aborted(self._job):
            return Status.ABORTED
        if self._autoserv_exit == 0:
            return Status.COMPLETED
        return Status.FAILED

    def _set_completed_status(self, hqe, status):
        """Set completed status of HQE.

        This is a cleaned up version of the one in scheduler_models to work
        with Django models.

        @param hqe: HostQueueEntry instance to update and save
        @param status: final status to store on the HQE
        """
        hqe.status = status
        hqe.active = False
        hqe.complete = True
        # Only HQEs that actually started get a finish time.
        if hqe.started_on:
            hqe.finished_on = datetime.datetime.now()
        hqe.save()
        self._metrics.send_hqe_completion(hqe)
        self._metrics.send_hqe_duration(hqe)
169
170
class Metrics(object):
    """Class for sending job metrics."""

    def __init__(self):
        """Create the counters used by the send_* methods."""
        metrics = autotest.chromite_load('metrics')
        self._hqe_completion_metric = metrics.Counter(
                'chromeos/autotest/scheduler/hqe_completion_count')
        self._reset_after_failure_metric = metrics.Counter(
                'chromeos/autotest/scheduler/postjob_tasks/'
                'reset_after_failure')

    def send_hqe_completion(self, hqe):
        """Send ts_mon metrics for HQE completion.

        The counter is incremented with the lowercased HQE status plus the
        board and pool of the HQE's host.  Board and pool are 'NO_HOST'
        when the HQE has no host, and '' when the host lacks the label.

        @param hqe: HostQueueEntry instance
        """
        fields = {
                'status': hqe.status.lower(),
                'board': 'NO_HOST',
                'pool': 'NO_HOST',
        }
        if hqe.host:
            labellib = autotest.load('utils.labellib')
            labels = labellib.LabelsMapping.from_host(hqe.host)
            fields['board'] = labels.get('board', '')
            fields['pool'] = labels.get('pool', '')
        self._hqe_completion_metric.increment(fields=fields)

    def send_hqe_duration(self, hqe):
        """Send CloudTrace metrics for HQE duration.

        Nothing is sent unless the HQE has both a start and a finish time.

        @param hqe: HostQueueEntry instance
        """
        if not (hqe.started_on and hqe.finished_on):
            return
        scheduler_models = autotest.load('scheduler.scheduler_models')
        cloud_trace = autotest.chromite_load('cloud_trace')
        types = autotest.deps_load('google.protobuf.internal.well_known_types')
        hqe_trace_id = scheduler_models.hqe_trace_id

        span = cloud_trace.Span(
                'HQE', spanId='0', traceId=hqe_trace_id(hqe.id))
        span.startTime = types.Timestamp()
        span.startTime.FromDatetime(hqe.started_on)
        span.endTime = types.Timestamp()
        span.endTime.FromDatetime(hqe.finished_on)
        cloud_trace.LogSpan(span)

    def send_reset_after_failure(self, autoserv_exit, failures):
        """Send reset_after_failure metric.

        @param autoserv_exit: autoserv exit status
        @param failures: number of failures
        """
        self._reset_after_failure_metric.increment(
                fields={'autoserv_process_success': autoserv_exit == 0,
                        # Yes, this is a boolean
                        'num_tests_failed': failures > 0})
221
222
def _stop_prejob_hqes(job):
    """Stop pending HQEs for a job (for synch_count).

    Does nothing when the number of not-yet-run HQEs equals the job's
    synch_count.  Otherwise every idle pre-job HQE is set to STOPPED, and
    the host of each HQE that was PENDING is set back to READY first.

    @param job: job whose HQEs should be stopped
    """
    models = autotest.load('frontend.afe.models')
    HQEStatus = models.HostQueueEntry.Status
    HostStatus = models.Host.Status
    not_yet_run = _get_prejob_hqes(job)
    # NOTE(review): this is an exact-equality check, so a count above
    # synch_count also falls through to stopping -- confirm that is
    # intended.
    if not_yet_run.count() == job.synch_count:
        return
    entries_to_stop = _get_prejob_hqes(job, include_active=False)
    for hqe in entries_to_stop:
        if hqe.status == HQEStatus.PENDING:
            hqe.host.status = HostStatus.READY
            hqe.host.save()
        hqe.status = HQEStatus.STOPPED
        hqe.save()
238
239
def _get_prejob_hqes(job, include_active=True):
    """Return a queryset of not run HQEs for the job (for synch_count).

    @param job: job whose HostQueueEntry rows are selected
    @param include_active: if True, select on
                           HostQueueEntry.PRE_JOB_STATUSES; otherwise on
                           HostQueueEntry.IDLE_PRE_JOB_STATUSES
    @returns: result of HostQueueEntry.objects.filter() for the job and
              the chosen statuses
    """
    models = autotest.load('frontend.afe.models')
    if include_active:
        statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES)
    else:
        statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES)
    return models.HostQueueEntry.objects.filter(
            job=job, status__in=statuses)
249