# pylint: disable=missing-docstring

"""Database model classes for the scheduler.

Contains model classes abstracting the various DB tables used by the scheduler.
These overlap the Django models in basic functionality, but were written before
the Django models existed and have not yet been phased out.  Some of them
(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
which would probably be ill-suited for inclusion in the general Django model
classes.

Globals:
_notify_email_statuses: list of HQE statuses.  Each time a single HQE reaches
        one of these statuses, an email will be sent to the job's email_list.
        Comes from global_config.
_base_url: URL to the local AFE server, used to construct URLs for emails.
_db: DatabaseConnection for this module.
_drone_manager: reference to global DroneManager instance.
"""

import base64
import datetime
import errno
import itertools
import logging
import re
import weakref

import google.protobuf.internal.well_known_types as types

from autotest_lib.client.common_lib import global_config, host_protections
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import drone_manager, email_manager
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_config
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server import afe_urls
from autotest_lib.server.cros import provision

try:
    from chromite.lib import metrics
    from chromite.lib import cloud_trace
except ImportError:
    metrics = utils.metrics_mock
    import mock
    cloud_trace = mock.Mock()


_notify_email_statuses = []
_base_url = None

_db = None
_drone_manager = None

RESPECT_STATIC_LABELS = global_config.global_config.get_config_value(
        'SKYLAB', 'respect_static_labels', type=bool, default=False)


def initialize():
    global _db
    _db = scheduler_lib.ConnectionManager().get_connection()

    notify_statuses_list = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        _base_url = afe_urls.ROOT_URL

    initialize_globals()

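# Illustration (editor's sketch, not part of the original module): with a
# notify_email_statuses config value of 'Failed, Completed;Aborted', the
# parsing in initialize() evaluates as
#     re.split(r'[\s,;:]', 'failed, completed;aborted')
#     => ['failed', '', 'completed', 'aborted']
# and empty strings are filtered out, leaving
# _notify_email_statuses == ['failed', 'completed', 'aborted'].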

def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def get_job_metadata(job):
    """Get a dictionary of the job information.

    The return value is a dictionary that includes job information like id,
    name and parent job information.  The value will be stored in the metadata
    database.

    @param job: A Job object.
    @return: A dictionary containing the job id, owner, name and parent job
            id.
    """
    if not job:
        logging.error('Job is None, no metadata returned.')
        return {}
    try:
        return {'job_id': job.id,
                'owner': job.owner,
                'job_name': job.name,
                'parent_job_id': job.parent_job_id}
    except AttributeError as e:
        logging.error('Job has missing attribute: %s', e)
        return {}


class DBError(Exception):
    """Raised by the DBObject constructor when its select fails."""


class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warning(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        fields = ', '.join(self._fields)
        sql = 'SELECT %s FROM %s WHERE ID=%%s' % (fields, self.__table)
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in-memory fields.  Fractional seconds are stripped from datetime
        values before comparison.

        @param row - A sequence of values corresponding to fields named in
                the class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(time_utils.TIME_FMT)
                row_value = row_value.strftime(time_utils.TIME_FMT)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences

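    # Illustration (editor's sketch): if an instance currently holds
    # self.status == 'Running' but a requery returns 'Completed' for that
    # column, _compare_fields_in_row reports
    #     {'status': ('Running', 'Completed')}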

    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        self._valid_fields.remove('id')


    def update_from_database(self):
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        if self.__new_record:
            keys = self._fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]

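    # Illustration (editor's sketch): saving a new record whose non-id
    # fields are job_id=5 and host_id=7 issues roughly
    #     INSERT INTO afe_ineligible_host_queues (job_id,host_id)
    #     VALUES ("5","7")
    # and then reads LAST_INSERT_ID() back into self.id.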

    def delete(self):
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch_rows(cls, where='', params=(), joins='', order_by=''):
        """
        Fetch the rows based on the given database query.

        @returns The rows fetched by the given query.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        fields = []
        for field in cls._fields:
            fields.append('%s.%s' % (cls._table_name, field))

        query = ('SELECT %(fields)s FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'fields' : ', '.join(fields),
                                             'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return rows

    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list with one class instance for each row fetched.
        """
        rows = cls.fetch_rows(where=where, params=params, joins=joins,
                              order_by=order_by)
        return [cls(id=row[0], row=row) for row in rows]
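
# Usage sketch (editor's illustration; 'afe_examples' is a hypothetical
# table): a DBObject subclass only declares its table name and column tuple,
# and inherits all of the query machinery above.
#
#     class Example(DBObject):
#         _table_name = 'afe_examples'
#         _fields = ('id', 'name')
#
#     examples = Example.fetch(where='name = %s', params=('foo',))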


class IneligibleHostQueue(DBObject):
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')


class AtomicGroup(DBObject):
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')


class Label(DBObject):
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)


class Host(DBObject):
    _table_name = 'afe_hosts'
    # TODO(ayatane): synch_id is not used, remove after fixing DB.
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
               'leased', 'shard_id', 'lock_reason')


    def set_status(self, status):
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def _get_labels_with_platform(self, non_static_rows, static_rows):
        """Helper function to fetch labels & platform for a host."""
        if not RESPECT_STATIC_LABELS:
            return non_static_rows

        combined_rows = []
        replaced_labels = _db.execute(
                'SELECT label_id FROM afe_replaced_labels')
        replaced_label_ids = {l[0] for l in replaced_labels}

        # We respect afe_labels more, which means:
        #   * if a non-static label has been replaced, we look up its
        #     replacement static label in afe_static_labels by label name.
        #   * if a non-static label has not been replaced, we keep it.
        #   * static labels without a corresponding non-static label are
        #     dropped.
        static_label_names = []
        for label_id, label_name, is_platform in non_static_rows:
            if label_id not in replaced_label_ids:
                combined_rows.append((label_id, label_name, is_platform))
            else:
                static_label_names.append(label_name)

        # Only keep static labels that have replaced non-static labels.
        for label_id, label_name, is_platform in static_rows:
            if label_name in static_label_names:
                combined_rows.append((label_id, label_name, is_platform))

        return combined_rows

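    # Illustration (editor's sketch, hypothetical label ids): given
    #     non_static_rows    = [(1, 'board:eve', 0), (2, 'pool:bvt', 0)]
    #     replaced_label_ids = {2}
    #     static_rows        = [(20, 'pool:bvt', 0)]
    # the merge keeps (1, 'board:eve', 0), drops the replaced row, and
    # substitutes its static counterpart, returning
    # [(1, 'board:eve', 0), (20, 'pool:bvt', 0)].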

    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        template = ('SELECT %(label_table)s.id, %(label_table)s.name, '
                    '%(label_table)s.platform FROM %(label_table)s INNER '
                    'JOIN %(host_label_table)s '
                    'ON %(label_table)s.id = %(host_label_table)s.%(column)s '
                    'WHERE %(host_label_table)s.host_id = %(host_id)s '
                    'ORDER BY %(label_table)s.name')
        static_query = template % {
                'host_label_table': 'afe_static_hosts_labels',
                'label_table': 'afe_static_labels',
                'column': 'staticlabel_id',
                'host_id': self.id
        }
        non_static_query = template % {
                'host_label_table': 'afe_hosts_labels',
                'label_table': 'afe_labels',
                'column': 'label_id',
                'host_id': self.id
        }
        non_static_rows = _db.execute(non_static_query)
        static_rows = _db.execute(static_query)

        rows = self._get_labels_with_platform(non_static_rows, static_rows)
        platform = None
        all_labels = []
        for _, label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels

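    # Illustration (editor's sketch, hypothetical labels): for a host whose
    # labels are 'eve' (a platform label) and 'pool:bvt', this returns
    #     ('eve', ['eve', 'pool:bvt'])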

    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If either hostname does not match this pattern, they are
        simply compared as lower case strings.

        Example of how hostnames will be sorted:

          alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfies most people's hostname sorting needs
        regardless of their exact naming schemes.  Nobody sane should have
        both a host10 and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # int() already ignores leading zeros ('09' parses as 9), and
            # unlike lstrip('0') it also handles an all-zero suffix ('host0').
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)

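    # Usage sketch (editor's illustration; Python 2 cmp-style sorting):
    #     hosts.sort(cmp=Host.cmp_for_sort)
    # orders host2 before host09 because ('host', 2) < ('host', 9), matching
    # the example ordering in the docstring above.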

class HostQueueEntry(DBObject):
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on', 'finished_on')

    _COMPLETION_COUNT_METRIC = metrics.Counter(
        'chromeos/autotest/scheduler/hqe_completion_count')

    def __init__(self, id=None, row=None, job_row=None, **kwargs):
        """
        @param id: ID field from afe_host_queue_entries table.
                   Either id or row should be specified for initialization.
        @param row: The DB row for a particular HostQueueEntry.
                    Either id or row should be specified for initialization.
        @param job_row: The DB row for the job of this HostQueueEntry.
        """
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id, row=job_row)

        if self.host_id:
            self.host = rdb_lib.get_hosts([self.host_id])[0]
            self.host.dbg_str = self.get_dbg_str()
            self.host.metadata = get_job_metadata(self.job)
        else:
            self.host = None


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list with one class instance for each row fetched.
        """
        # Override the original fetch method to pre-fetch the jobs from the DB
        # in order to prevent each HQE making separate DB queries.
        rows = cls.fetch_rows(where=where, params=params, joins=joins,
                              order_by=order_by)
        if len(rows) <= 1:
            return [cls(id=row[0], row=row) for row in rows]

        job_params = ', '.join([str(row[1]) for row in rows])
        job_rows = Job.fetch_rows(where='id IN (%s)' % (job_params))
        # Create a job_id to job_row mapping so each HQE can be matched to
        # its corresponding job.
        job_dict = {job_row[0]: job_row for job_row in job_rows}
        return [cls(id=row[0], row=row, job_row=job_dict.get(row[1]))
                for row in rows]


    def _view_job_url(self):
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label

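    # Illustration (editor's sketch): callers simply iterate the generator,
    # e.g. label_names = [label.name for label in entry.get_labels()];
    # duplicates are possible when the meta_host label is also a dependency.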

    def set_host(self, host):
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def block_host(self, host_id):
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        if self.host:
            return self.host.hostname
        return 'no host'


    def get_dbg_str(self):
        """Get a debug string to identify this host.

        @return: A string containing the hqe and job id.
        """
        try:
            return 'HQE: %s, for job: %s' % (self.id, self.job_id)
        except AttributeError as e:
            return 'HQE has not been initialized yet: %s' % e


    def __str__(self):
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return ("%s and host: %s has status:%s%s" %
                (self.get_dbg_str(), self._get_hostname(), self.status,
                 flags_str))


    def set_status(self, status):
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)

        self.update_field('active', active)

        # The ordering of these operations is important. Once we set the
        # complete bit this job will become indistinguishable from all
        # the other complete jobs, unless we first set shard_id to NULL
        # to signal to the shard_client that we need to upload it. However,
        # we can only set both of these after we've updated finished_on etc.
        # within _on_complete, or the job will get synced in an intermediate
        # state. This means that if someone sigkills the scheduler between
        # setting finished_on and complete, we will have inconsistent jobs.
        # This should be fine, because nothing critical checks finished_on,
        # and the scheduler should never be killed mid-tick.
        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        self.update_field('complete', complete)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')


    def _on_complete(self, status):
        metric_fields = {'status': status.lower()}
        if self.host:
            metric_fields['board'] = self.host.board or ''
            if len(self.host.pools) == 1:
                metric_fields['pool'] = self.host.pools[0]
            else:
                metric_fields['pool'] = 'MULTIPLE'
        else:
            metric_fields['board'] = 'NO_HOST'
            metric_fields['pool'] = 'NO_HOST'
        self._COMPLETION_COUNT_METRIC.increment(fields=metric_fields)
        if status != models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()
        if self.started_on:
            self.set_finished_on_now()
            self._log_trace()
        if self.job.shard_id is not None:
            # If shard_id is None, the job will be synced back to the master.
            self.job.update_field('shard_id', None)
        if not self.execution_subdir:
            return
        # Unregister any possible pidfiles associated with this queue entry.
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)

    def _log_trace(self):
        """Emits a Cloud Trace span for the HQE's duration."""
        if self.started_on and self.finished_on:
            span = cloud_trace.Span('HQE', spanId='0',
                                    traceId=hqe_trace_id(self.id))
            # TODO(phobbs) make a .SetStart() and .SetEnd() helper method
            span.startTime = types.Timestamp()
            span.startTime.FromDatetime(self.started_on)
            span.endTime = types.Timestamp()
            span.endTime.FromDatetime(self.finished_on)
            # TODO(phobbs) any LogSpan calls need to be wrapped in this for
            # safety during tests, so this should be caught within LogSpan.
            try:
                cloud_trace.LogSpan(span)
            except IOError as e:
                if e.errno == errno.ENOENT:
                    logging.warning('Error writing to cloud trace results '
                                    'directory: %s', e)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        self.update_field('finished_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        if not self.host:
            raise scheduler_lib.NoHostIdError(
                    'Failed to recover a job whose host_queue_entry_id=%r due'
                    ' to no host_id.'
                    % self.id)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are
        # getting stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in {Status.GATHERING, Status.PARSING}:
            # Do nothing; post-job tasks will finish, mark this entry with
            # status "Aborted" and take care of the host.
            return

        if self.status in {Status.STARTING, Status.PENDING, Status.RUNNING}:
            # If the HQE is in any of these statuses, it should not have any
            # unfinished agent before it can be aborted.
            agents = dispatcher.get_agents_for_entry(self)
            # Agents with finished tasks can be left behind. This is added to
            # handle the special case of aborting a hostless job in STARTING
            # status, in which the agent has only a HostlessQueueTask
            # associated. The finished HostlessQueueTask will be cleaned up in
            # the next tick, so it's safe to leave the agent there. Without
            # filtering out finished agents, HQE abort won't be able to
            # proceed.
            assert all([agent.is_done() for agent in agents])
            # If the HQE is still in STARTING status, it may not have been
            # assigned a host yet.
            if self.host:
                self.host.set_status(models.Host.Status.READY)
        elif (self.status == Status.VERIFYING or
              self.status == Status.RESETTING):
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())
        elif self.status == Status.PROVISIONING:
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.REPAIR,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)


    def execution_tag(self):
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(fix_query)
            raise AssertionError(('self.execution_subdir not found. '
                                  'See log for details.'))

        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        return self.execution_tag()


    def set_started_on_now(self):
        self.update_field('started_on', datetime.datetime.now())


    def set_finished_on_now(self):
        self.update_field('finished_on', datetime.datetime.now())


    def is_hostless(self):
        return (self.host_id is None
                and self.meta_host is None)


def hqe_trace_id(hqe_id):
    """Constructs the canonical trace id based on the HQE's id.

    Encodes 'HQE' in base16 and concatenates with the hex representation
    of the HQE's id.

    @param hqe_id: The HostQueueEntry's id.

    Returns:
        A trace id (in hex format)
    """
    return base64.b16encode('HQE') + hex(hqe_id)[2:]

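# Worked example (editor's note): base64.b16encode('HQE') == '485145', so
# hqe_trace_id(31) == '485145' + hex(31)[2:] == '4851451f'.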


class Job(DBObject):
    _table_name = 'afe_jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
               'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
               'test_retry', 'run_reset', 'timeout_mins', 'shard_id',
               'require_ssp')

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending()
    # on all status='Pending' atomic group HQEs in case a delay was running
    # when the scheduler was restarted and no more hosts ever successfully
    # exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None  # caches model instance of owner
        self.update_image_path = None  # path of OS image to install


    def model(self):
        return models.Job.objects.get(id=self.id)


    def owner_model(self):
        # work around the fact that the Job owner field is a string, not a
        # foreign key
        if not self._owner_model:
            self._owner_model = models.User.objects.get(login=self.owner)
        return self._owner_model


    def tag(self):
        return "%s-%s" % (self.id, self.owner)


    def get_execution_details(self):
        """
        Get test execution details for this job.

        @return: Dictionary with test execution details
        """
        def _find_test_jobs(rows):
            """
            Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
            Those are autotest 'internal job' tests, so they should not be
            counted when evaluating the test stats.

            @param rows: List of rows (matrix) with database results.
            """
            job_test_pattern = re.compile(r'SERVER|CLIENT_JOB\.\d')
            n_test_jobs = 0
            for r in rows:
                test_name = r[0]
                if job_test_pattern.match(test_name):
                    n_test_jobs += 1

            return n_test_jobs

        stats = {}

        rows = _db.execute("""
                SELECT t.test, s.word, t.reason
                FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
                WHERE t.job_idx = j.job_idx
                AND s.status_idx = t.status
                AND j.afe_job_id = %s
                ORDER BY t.reason
                """ % self.id)

        failed_rows = [r for r in rows if not r[1] == 'GOOD']

        n_test_jobs = _find_test_jobs(rows)
        n_test_jobs_failed = _find_test_jobs(failed_rows)

        total_executed = len(rows) - n_test_jobs
        total_failed = len(failed_rows) - n_test_jobs_failed

        if total_executed > 0:
            success_rate = 100 - ((total_failed / float(total_executed)) * 100)
        else:
            success_rate = 0

        stats['total_executed'] = total_executed
        stats['total_failed'] = total_failed
        stats['total_passed'] = total_executed - total_failed
        stats['success_rate'] = success_rate

        status_header = ("Test Name", "Status", "Reason")
        if failed_rows:
            stats['failed_rows'] = utils.matrix_to_string(failed_rows,
                                                          status_header)
        else:
            stats['failed_rows'] = ''

        time_row = _db.execute("""
                   SELECT started_time, finished_time
                   FROM tko_jobs
                   WHERE afe_job_id = %s
                   """ % self.id)

        if time_row:
            t_begin, t_end = time_row[0]
            try:
                delta = t_end - t_begin
                minutes, seconds = divmod(delta.seconds, 60)
                hours, minutes = divmod(minutes, 60)
                stats['execution_time'] = ("%02d:%02d:%02d" %
                                           (hours, minutes, seconds))
            # One of t_end or t_begin is None
            except TypeError:
                stats['execution_time'] = '(could not determine)'
        else:
            stats['execution_time'] = '(none)'

        return stats

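    # Worked example (editor's note): 10 user tests executed with 2 failures
    # gives success_rate == 100 - (2 / 10.0) * 100 == 80.0, reported alongside
    # total_passed == 8.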

    def keyval_dict(self):
        return self.model().keyval_dict()


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def is_ready(self):
        pending_count = self._pending_count()
        ready = (pending_count >= self.synch_count)

        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required',
                    self, pending_count, self.synch_count)

        return ready


    def num_machines(self, clause=None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='afe_host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_active=True):
        if include_active:
            statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES)
        else:
            statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Stops the job's inactive pre-job HQEs."""
        entries_to_stop = self._not_yet_run_entries(
            include_active=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def _next_group_name(self):
        """@returns a directory name to use for the next host group results."""
        group_name = ''
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)

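    # Illustration (editor's sketch): if the distinct execution_subdirs for
    # this job are 'group0' and 'group2', ids == [0, 2] and the next group
    # directory name is 'group3'.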

    def get_group_entries(self, queue_entry_from_group):
        """
        @param queue_entry_from_group: A HostQueueEntry instance to find other
                group entries on this job for.

        @returns A list of HostQueueEntry objects all executing this job as
                part of the same group as the one supplied (having the same
                execution_subdir).
        """
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _should_run_cleanup(self, queue_entry):
        if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
            return queue_entry.host.dirty
        return False


    def _should_run_verify(self, queue_entry):
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        # If RebootBefore is set to NEVER, then we won't run reset because
        # we can't cleanup, so we need to weaken a Reset into a Verify.
        weaker_reset = (self.run_reset and
                self.reboot_before == model_attributes.RebootBefore.NEVER)
        return self.run_verify or weaker_reset


    def _should_run_reset(self, queue_entry):
        can_verify = (queue_entry.host.protection !=
                         host_protections.Protection.DO_NOT_VERIFY)
        can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
        return (can_reboot and can_verify and (self.run_reset or
                (self._should_run_cleanup(queue_entry) and
                 self._should_run_verify(queue_entry))))

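    # Decision sketch (editor's summary of the predicates above): a RESET is
    # scheduled when the host is not protected with DO_NOT_VERIFY,
    # reboot_before is not NEVER, and either run_reset is set or the job
    # would otherwise need both CLEANUP and VERIFY.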

    def _should_run_provision(self, queue_entry):
        """
        Determine if the queue_entry needs to have a provision task run before
        it to provision queue_entry.host.

        @param queue_entry: The host queue entry in question.
        @returns: True if we should schedule a provision task, False otherwise.

        """
        # If we get to this point, it means that the scheduler has already
        # vetted that all the unprovisionable labels match, so we can just
        # find all labels on the job that aren't on the host to get the list
        # of what we need to provision.  (See the scheduling logic in
        # host_scheduler.py:is_host_eligable_for_job() where we discard all
        # actionable labels when assigning jobs to hosts.)
        job_labels = {x.name for x in queue_entry.get_labels()}
        # Skip provision if `skip_provision` is listed in the job labels.
        if provision.SKIP_PROVISION in job_labels:
            return False
        _, host_labels = queue_entry.host.platform_and_labels()
        # If there are any labels on the job that are not on the host and they
        # are labels that provisioning knows how to change, then that means
        # there is provisioning work to do.  If there's no provisioning work to
        # do, then obviously we have no reason to schedule a provision task!
        diff = job_labels - set(host_labels)
        if any([provision.Provision.acts_on(x) for x in diff]):
            return True
        return False


    def _queue_special_task(self, queue_entry, task):
        """
        Create a special task and associate it with a host queue entry.

        @param queue_entry: The queue entry this special task should be
                            associated with.
        @param task: One of the members of the enum models.SpecialTask.Task.
        @returns: None

        """
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=queue_entry.host_id),
                queue_entry=queue_entry, task=task)


    def schedule_pre_job_tasks(self, queue_entry):
        """
        Queue all of the special tasks that need to be run before a host
        queue entry may run.

        If no special tasks need to be scheduled, then |on_pending| will be
        called directly.

        @returns None

        """
        task_queued = False
        hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)

        if self._should_run_provision(queue_entry):
            self._queue_special_task(hqe_model,
                                     models.SpecialTask.Task.PROVISION)
            task_queued = True
        elif self._should_run_reset(queue_entry):
            self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
            task_queued = True
        else:
            if self._should_run_cleanup(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.CLEANUP)
                task_queued = True
            if self._should_run_verify(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.VERIFY)
                task_queued = True

        if not task_queued:
            queue_entry.on_pending()


    def _assign_new_group(self, queue_entries):
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].host.hostname
        else:
            group_subdir_name = self._next_group_name()
            logging.info('Running synchronous job %d hosts %s as %s',
                self.id, [entry.host.hostname for entry in queue_entries],
                group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A list of HostQueueEntry instances to be used to run this
                Job, or an empty list if too few pending entries are
                available.
        """
        chosen_entries = [include_queue_entry]
        num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        self._assign_new_group(chosen_entries)
        return chosen_entries


    def run_if_ready(self, queue_entry):
        """
        Run this job by kicking its HQEs into status='Starting' if enough
        hosts are ready for it to run.

        Cleans up by kicking HQEs into status='Stopped' if this Job is not
        ready to run.
        """
        if not self.is_ready():
            self.stop_if_necessary()
        else:
            self.run(queue_entry)


    def request_abort(self):
        """Request that this Job be aborted on the next scheduler cycle."""
        self.model().abort()


    def run(self, queue_entry):
        """
        @param queue_entry: The HostQueueEntry instance calling this method.
        """
        queue_entries = self._choose_group_to_run(queue_entry)
        if queue_entries:
            self._finish_run(queue_entries)


    def _finish_run(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def __str__(self):
        return '%s-%s' % (self.id, self.owner)