# pylint: disable=missing-docstring import contextlib from datetime import datetime from datetime import timedelta import logging import os import django.core import six try: from django.db import models as dbmodels, connection except django.core.exceptions.ImproperlyConfigured: raise ImportError('Django database not yet configured. Import either ' 'setup_django_environment or ' 'setup_django_lite_environment from ' 'autotest_lib.frontend before any imports that ' 'depend on django models.') from xml.sax import saxutils import common from autotest_lib.frontend.afe import model_logic, model_attributes from autotest_lib.frontend.afe import rdb_model_extensions from autotest_lib.frontend import settings, thread_local from autotest_lib.client.common_lib import autotest_enum, error, host_protections from autotest_lib.client.common_lib import global_config from autotest_lib.client.common_lib import host_queue_entry_states from autotest_lib.client.common_lib import control_data, priorities, decorators from autotest_lib.server import utils as server_utils # job options and user preferences DEFAULT_REBOOT_BEFORE = model_attributes.RebootBefore.IF_DIRTY DEFAULT_REBOOT_AFTER = model_attributes.RebootBefore.NEVER RESPECT_STATIC_LABELS = global_config.global_config.get_config_value( 'SKYLAB', 'respect_static_labels', type=bool, default=False) RESPECT_STATIC_ATTRIBUTES = global_config.global_config.get_config_value( 'SKYLAB', 'respect_static_attributes', type=bool, default=False) class AclAccessViolation(Exception): """\ Raised when an operation is attempted with proper permissions as dictated by ACLs. """ class AtomicGroup(model_logic.ModelWithInvalid, dbmodels.Model): """\ An atomic group defines a collection of hosts which must only be scheduled all at once. Any host with a label having an atomic group will only be scheduled for a job at the same time as other hosts sharing that label. Required: name: A name for this atomic group, e.g. 'rack23' or 'funky_net'. 
max_number_of_machines: The maximum number of machines that will be scheduled at once when scheduling jobs to this atomic group. The job.synch_count is considered the minimum. Optional: description: Arbitrary text description of this group's purpose. """ name = dbmodels.CharField(max_length=255, unique=True) description = dbmodels.TextField(blank=True) # This magic value is the default to simplify the scheduler logic. # It must be "large". The common use of atomic groups is to want all # machines in the group to be used, limits on which subset used are # often chosen via dependency labels. # TODO(dennisjeffrey): Revisit this so we don't have to assume that # "infinity" is around 3.3 million. INFINITE_MACHINES = 333333333 max_number_of_machines = dbmodels.IntegerField(default=INFINITE_MACHINES) invalid = dbmodels.BooleanField(default=False, editable=settings.FULL_ADMIN) name_field = 'name' objects = model_logic.ModelWithInvalidManager() valid_objects = model_logic.ValidObjectsManager() def enqueue_job(self, job, is_template=False): """Enqueue a job on an associated atomic group of hosts. @param job: A job to enqueue. @param is_template: Whether the status should be "Template". """ queue_entry = HostQueueEntry.create(atomic_group=self, job=job, is_template=is_template) queue_entry.save() def clean_object(self): self.label_set.clear() class Meta: """Metadata for class AtomicGroup.""" db_table = 'afe_atomic_groups' def __unicode__(self): return unicode(self.name) class Label(model_logic.ModelWithInvalid, dbmodels.Model): """\ Required: name: label name Optional: kernel_config: URL/path to kernel config for jobs run on this label. platform: If True, this is a platform label (defaults to False). only_if_needed: If True, a Host with this label can only be used if that label is requested by the job/test (either as the meta_host or in the job_dependencies). atomic_group: The atomic group associated with this label. 
""" name = dbmodels.CharField(max_length=255, unique=True) kernel_config = dbmodels.CharField(max_length=255, blank=True) platform = dbmodels.BooleanField(default=False) invalid = dbmodels.BooleanField(default=False, editable=settings.FULL_ADMIN) only_if_needed = dbmodels.BooleanField(default=False) name_field = 'name' objects = model_logic.ModelWithInvalidManager() valid_objects = model_logic.ValidObjectsManager() atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True) def clean_object(self): self.host_set.clear() self.test_set.clear() def enqueue_job(self, job, is_template=False): """Enqueue a job on any host of this label. @param job: A job to enqueue. @param is_template: Whether the status should be "Template". """ queue_entry = HostQueueEntry.create(meta_host=self, job=job, is_template=is_template) queue_entry.save() class Meta: """Metadata for class Label.""" db_table = 'afe_labels' def __unicode__(self): return unicode(self.name) def is_replaced_by_static(self): """Detect whether a label is replaced by a static label. 'Static' means it can only be modified by skylab inventory tools. """ if RESPECT_STATIC_LABELS: replaced = ReplacedLabel.objects.filter(label__id=self.id) if len(replaced) > 0: return True return False class StaticLabel(model_logic.ModelWithInvalid, dbmodels.Model): """\ Required: name: label name Optional: kernel_config: URL/path to kernel config for jobs run on this label. platform: If True, this is a platform label (defaults to False). only_if_needed: Deprecated. This is always False. atomic_group: Deprecated. This is always NULL. 
""" name = dbmodels.CharField(max_length=255, unique=True) kernel_config = dbmodels.CharField(max_length=255, blank=True) platform = dbmodels.BooleanField(default=False) invalid = dbmodels.BooleanField(default=False, editable=settings.FULL_ADMIN) only_if_needed = dbmodels.BooleanField(default=False) name_field = 'name' objects = model_logic.ModelWithInvalidManager() valid_objects = model_logic.ValidObjectsManager() atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True) def clean_object(self): self.host_set.clear() self.test_set.clear() class Meta: """Metadata for class StaticLabel.""" db_table = 'afe_static_labels' def __unicode__(self): return unicode(self.name) class ReplacedLabel(dbmodels.Model, model_logic.ModelExtensions): """The tag to indicate Whether to replace labels with static labels.""" label = dbmodels.ForeignKey(Label) objects = model_logic.ExtendedManager() class Meta: """Metadata for class ReplacedLabel.""" db_table = 'afe_replaced_labels' def __unicode__(self): return unicode(self.label) class Shard(dbmodels.Model, model_logic.ModelExtensions): hostname = dbmodels.CharField(max_length=255, unique=True) name_field = 'hostname' labels = dbmodels.ManyToManyField(Label, blank=True, db_table='afe_shards_labels') class Meta: """Metadata for class ParameterizedJob.""" db_table = 'afe_shards' class Drone(dbmodels.Model, model_logic.ModelExtensions): """ A scheduler drone hostname: the drone's hostname """ hostname = dbmodels.CharField(max_length=255, unique=True) name_field = 'hostname' objects = model_logic.ExtendedManager() def save(self, *args, **kwargs): if not User.current_user().is_superuser(): raise Exception('Only superusers may edit drones') super(Drone, self).save(*args, **kwargs) def delete(self): if not User.current_user().is_superuser(): raise Exception('Only superusers may delete drones') super(Drone, self).delete() class Meta: """Metadata for class Drone.""" db_table = 'afe_drones' def __unicode__(self): return 
unicode(self.hostname) class DroneSet(dbmodels.Model, model_logic.ModelExtensions): """ A set of scheduler drones These will be used by the scheduler to decide what drones a job is allowed to run on. name: the drone set's name drones: the drones that are part of the set """ DRONE_SETS_ENABLED = global_config.global_config.get_config_value( 'SCHEDULER', 'drone_sets_enabled', type=bool, default=False) DEFAULT_DRONE_SET_NAME = global_config.global_config.get_config_value( 'SCHEDULER', 'default_drone_set_name', default=None) name = dbmodels.CharField(max_length=255, unique=True) drones = dbmodels.ManyToManyField(Drone, db_table='afe_drone_sets_drones') name_field = 'name' objects = model_logic.ExtendedManager() def save(self, *args, **kwargs): if not User.current_user().is_superuser(): raise Exception('Only superusers may edit drone sets') super(DroneSet, self).save(*args, **kwargs) def delete(self): if not User.current_user().is_superuser(): raise Exception('Only superusers may delete drone sets') super(DroneSet, self).delete() @classmethod def drone_sets_enabled(cls): """Returns whether drone sets are enabled. @param cls: Implicit class object. """ return cls.DRONE_SETS_ENABLED @classmethod def default_drone_set_name(cls): """Returns the default drone set name. @param cls: Implicit class object. """ return cls.DEFAULT_DRONE_SET_NAME @classmethod def get_default(cls): """Gets the default drone set name, compatible with Job.add_object. @param cls: Implicit class object. """ return cls.smart_get(cls.DEFAULT_DRONE_SET_NAME) @classmethod def resolve_name(cls, drone_set_name): """ Returns the name of one of these, if not None, in order of preference: 1) the drone set given, 2) the current user's default drone set, or 3) the global default drone set or returns None if drone sets are disabled @param cls: Implicit class object. @param drone_set_name: A drone set name. 
""" if not cls.drone_sets_enabled(): return None user = User.current_user() user_drone_set_name = user.drone_set and user.drone_set.name return drone_set_name or user_drone_set_name or cls.get_default().name def get_drone_hostnames(self): """ Gets the hostnames of all drones in this drone set """ return set(self.drones.all().values_list('hostname', flat=True)) class Meta: """Metadata for class DroneSet.""" db_table = 'afe_drone_sets' def __unicode__(self): return unicode(self.name) class User(dbmodels.Model, model_logic.ModelExtensions): """\ Required: login :user login name Optional: access_level: 0=User (default), 1=Admin, 100=Root """ ACCESS_ROOT = 100 ACCESS_ADMIN = 1 ACCESS_USER = 0 AUTOTEST_SYSTEM = 'autotest_system' login = dbmodels.CharField(max_length=255, unique=True) access_level = dbmodels.IntegerField(default=ACCESS_USER, blank=True) # user preferences reboot_before = dbmodels.SmallIntegerField( choices=model_attributes.RebootBefore.choices(), blank=True, default=DEFAULT_REBOOT_BEFORE) reboot_after = dbmodels.SmallIntegerField( choices=model_attributes.RebootAfter.choices(), blank=True, default=DEFAULT_REBOOT_AFTER) drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True) show_experimental = dbmodels.BooleanField(default=False) name_field = 'login' objects = model_logic.ExtendedManager() def save(self, *args, **kwargs): # is this a new object being saved for the first time? first_time = (self.id is None) user = thread_local.get_user() if user and not user.is_superuser() and user.login != self.login: raise AclAccessViolation("You cannot modify user " + self.login) super(User, self).save(*args, **kwargs) if first_time: everyone = AclGroup.objects.get(name='Everyone') everyone.users.add(self) def is_superuser(self): """Returns whether the user has superuser access.""" return self.access_level >= self.ACCESS_ROOT @classmethod def current_user(cls): """Returns the current user. @param cls: Implicit class object. 
""" user = thread_local.get_user() if user is None: user, _ = cls.objects.get_or_create(login=cls.AUTOTEST_SYSTEM) user.access_level = cls.ACCESS_ROOT user.save() return user @classmethod def get_record(cls, data): """Check the database for an identical record. Check for a record with matching id and login. If one exists, return it. If one does not exist there is a possibility that the following cases have happened: 1. Same id, different login We received: "1 chromeos-test" And we have: "1 debug-user" In this case we need to delete "1 debug_user" and insert "1 chromeos-test". 2. Same login, different id: We received: "1 chromeos-test" And we have: "2 chromeos-test" In this case we need to delete "2 chromeos-test" and insert "1 chromeos-test". As long as this method deletes bad records and raises the DoesNotExist exception the caller will handle creating the new record. @raises: DoesNotExist, if a record with the matching login and id does not exist. """ # Both the id and login should be uniqe but there are cases when # we might already have a user with the same login/id because # current_user will proactively create a user record if it doesn't # exist. Since we want to avoid conflict between the main and # shard, just delete any existing user records that don't match # what we're about to deserialize from the main. 
try: return cls.objects.get(login=data['login'], id=data['id']) except cls.DoesNotExist: cls.delete_matching_record(login=data['login']) cls.delete_matching_record(id=data['id']) raise class Meta: """Metadata for class User.""" db_table = 'afe_users' def __unicode__(self): return unicode(self.login) class Host(model_logic.ModelWithInvalid, rdb_model_extensions.AbstractHostModel, model_logic.ModelWithAttributes): """\ Required: hostname optional: locked: if true, host is locked and will not be queued Internal: From AbstractHostModel: status: string describing status of host invalid: true if the host has been deleted protection: indicates what can be done to this host during repair lock_time: DateTime at which the host was locked dirty: true if the host has been used without being rebooted Local: locked_by: user that locked the host, or null if the host is unlocked """ SERIALIZATION_LINKS_TO_FOLLOW = set(['aclgroup_set', 'hostattribute_set', 'labels', 'shard']) SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['invalid']) def custom_deserialize_relation(self, link, data): assert link == 'shard', 'Link %s should not be deserialized' % link self.shard = Shard.deserialize(data) # Note: Only specify foreign keys here, specify host columns in # rdb_model_extensions instead. 
Protection = host_protections.Protection labels = dbmodels.ManyToManyField(Label, blank=True, db_table='afe_hosts_labels') static_labels = dbmodels.ManyToManyField( StaticLabel, blank=True, db_table='afe_static_hosts_labels') locked_by = dbmodels.ForeignKey(User, null=True, blank=True, editable=False) name_field = 'hostname' objects = model_logic.ModelWithInvalidManager() valid_objects = model_logic.ValidObjectsManager() leased_objects = model_logic.LeasedHostManager() shard = dbmodels.ForeignKey(Shard, blank=True, null=True) def __init__(self, *args, **kwargs): super(Host, self).__init__(*args, **kwargs) self._record_attributes(['status']) @classmethod def classify_labels(cls, label_names): """Split labels to static & non-static. @label_names: a list of labels (string). @returns: a list of StaticLabel objects & a list of (non-static) Label objects. """ if not label_names: return [], [] labels = Label.objects.filter(name__in=label_names) if not RESPECT_STATIC_LABELS: return [], labels return cls.classify_label_objects(labels) @classmethod def classify_label_objects(cls, label_objects): if not RESPECT_STATIC_LABELS: return [], label_objects replaced_labels = ReplacedLabel.objects.filter(label__in=label_objects) replaced_ids = [l.label.id for l in replaced_labels] non_static_labels = [ l for l in label_objects if not l.id in replaced_ids] static_label_names = [ l.name for l in label_objects if l.id in replaced_ids] static_labels = StaticLabel.objects.filter(name__in=static_label_names) return static_labels, non_static_labels @classmethod def get_hosts_with_labels(cls, label_names, initial_query): """Get hosts by label filters. @param label_names: label (string) lists for fetching hosts. @param initial_query: a model_logic.QuerySet of Host object, e.g. Host.objects.all(), Host.valid_objects.all(). This initial_query cannot be a sliced QuerySet, e.g. 
Host.objects.all().filter(query_limit=10) """ if not label_names: return initial_query static_labels, non_static_labels = cls.classify_labels(label_names) if len(static_labels) + len(non_static_labels) != len(label_names): # Some labels don't exist in afe db, which means no hosts # should be matched. return set() for l in static_labels: initial_query = initial_query.filter(static_labels=l) for l in non_static_labels: initial_query = initial_query.filter(labels=l) return initial_query @classmethod def get_hosts_with_label_ids(cls, label_ids, initial_query): """Get hosts by label_id filters. @param label_ids: label id (int) lists for fetching hosts. @param initial_query: a list of Host object, e.g. [, , ...] """ labels = Label.objects.filter(id__in=label_ids) label_names = [l.name for l in labels] return cls.get_hosts_with_labels(label_names, initial_query) @staticmethod def create_one_time_host(hostname): """Creates a one-time host. @param hostname: The name for the host. """ query = Host.objects.filter(hostname=hostname) if query.count() == 0: host = Host(hostname=hostname, invalid=True) host.do_validate() else: host = query[0] if not host.invalid: raise model_logic.ValidationError({ 'hostname' : '%s already exists in the autotest DB. ' 'Select it rather than entering it as a one time ' 'host.' % hostname }) host.protection = host_protections.Protection.DO_NOT_REPAIR host.locked = False host.save() host.clean_object() return host @classmethod def _assign_to_shard_nothing_helper(cls): """Does nothing. This method is called in the middle of assign_to_shard, and does nothing. It exists to allow integration tests to simulate a race condition.""" @classmethod def assign_to_shard(cls, shard, known_ids): """Assigns hosts to a shard. For all labels that have been assigned to a shard, all hosts that have at least one of the shard's labels are assigned to the shard. Hosts that are assigned to the shard but aren't already present on the shard are returned. 
Any boards that are in |known_ids| but that do not belong to the shard are incorrect ids, which are also returned so that the shard can remove them locally. Board to shard mapping is many-to-one. Many different boards can be hosted in a shard. However, DUTs of a single board cannot be distributed into more than one shard. @param shard: The shard object to assign labels/hosts for. @param known_ids: List of all host-ids the shard already knows. This is used to figure out which hosts should be sent to the shard. If shard_ids were used instead, hosts would only be transferred once, even if the client failed persisting them. The number of hosts usually lies in O(100), so the overhead is acceptable. @returns a tuple of (hosts objects that should be sent to the shard, incorrect host ids that should not belong to] shard) """ # Disclaimer: concurrent heartbeats should theoretically not occur in # the current setup. As they may be introduced in the near future, # this comment will be left here. # Sending stuff twice is acceptable, but forgetting something isn't. # Detecting duplicates on the client is easy, but here it's harder. The # following options were considered: # - SELECT ... WHERE and then UPDATE ... WHERE: Update might update more # than select returned, as concurrently more hosts might have been # inserted # - UPDATE and then SELECT WHERE shard=shard: select always returns all # hosts for the shard, this is overhead # - SELECT and then UPDATE only selected without requerying afterwards: # returns the old state of the records. new_hosts = [] possible_new_host_ids = set(Host.objects.filter( labels__in=shard.labels.all(), leased=False ).exclude( id__in=known_ids, ).values_list('pk', flat=True)) # No-op in production, used to simulate race condition in tests. 
cls._assign_to_shard_nothing_helper() if possible_new_host_ids: Host.objects.filter( pk__in=possible_new_host_ids, labels__in=shard.labels.all(), leased=False ).update(shard=shard) new_hosts = list(Host.objects.filter( pk__in=possible_new_host_ids, shard=shard ).all()) invalid_host_ids = list(Host.objects.filter( id__in=known_ids ).exclude( shard=shard ).values_list('pk', flat=True)) return new_hosts, invalid_host_ids def resurrect_object(self, old_object): super(Host, self).resurrect_object(old_object) # invalid hosts can be in use by the scheduler (as one-time hosts), so # don't change the status self.status = old_object.status def clean_object(self): self.aclgroup_set.clear() self.labels.clear() self.static_labels.clear() def save(self, *args, **kwargs): # extra spaces in the hostname can be a sneaky source of errors self.hostname = self.hostname.strip() # is this a new object being saved for the first time? first_time = (self.id is None) if not first_time: AclGroup.check_for_acl_violation_hosts([self]) # If locked is changed, send its status and user made the change to # metaDB. Locks are important in host history because if a device is # locked then we don't really care what state it is in. 
if self.locked and not self.locked_by: self.locked_by = User.current_user() if not self.lock_time: self.lock_time = datetime.now() self.dirty = True elif not self.locked and self.locked_by: self.locked_by = None self.lock_time = None super(Host, self).save(*args, **kwargs) if first_time: everyone = AclGroup.objects.get(name='Everyone') everyone.hosts.add(self) # remove attributes that may have lingered from an old host and # should not be associated with a new host for host_attribute in self.hostattribute_set.all(): self.delete_attribute(host_attribute.attribute) self._check_for_updated_attributes() def delete(self): AclGroup.check_for_acl_violation_hosts([self]) logging.info('Preconditions for deleting host %s...', self.hostname) for queue_entry in self.hostqueueentry_set.all(): logging.info(' Deleting and aborting hqe %s...', queue_entry) queue_entry.deleted = True queue_entry.abort() logging.info(' ... done with hqe %s.', queue_entry) for host_attribute in self.hostattribute_set.all(): logging.info(' Deleting attribute %s...', host_attribute) self.delete_attribute(host_attribute.attribute) logging.info(' ... done with attribute %s.', host_attribute) logging.info('... preconditions done for host %s.', self.hostname) logging.info('Deleting host %s...', self.hostname) super(Host, self).delete() logging.info('... done.') def on_attribute_changed(self, attribute, old_value): assert attribute == 'status' logging.info('%s -> %s', self.hostname, self.status) def enqueue_job(self, job, is_template=False): """Enqueue a job on this host. @param job: A job to enqueue. @param is_template: Whther the status should be "Template". 
""" queue_entry = HostQueueEntry.create(host=self, job=job, is_template=is_template) # allow recovery of dead hosts from the frontend if not self.active_queue_entry() and self.is_dead(): self.status = Host.Status.READY self.save() queue_entry.save() block = IneligibleHostQueue(job=job, host=self) block.save() def platform(self): """The platform of the host.""" # TODO(showard): slighly hacky? platforms = self.labels.filter(platform=True) if len(platforms) == 0: return None return platforms[0] platform.short_description = 'Platform' @classmethod def check_no_platform(cls, hosts): """Verify the specified hosts have no associated platforms. @param cls: Implicit class object. @param hosts: The hosts to verify. @raises model_logic.ValidationError if any hosts already have a platform. """ Host.objects.populate_relationships(hosts, Label, 'label_list') Host.objects.populate_relationships(hosts, StaticLabel, 'staticlabel_list') errors = [] for host in hosts: platforms = [label.name for label in host.label_list if label.platform] if RESPECT_STATIC_LABELS: platforms += [label.name for label in host.staticlabel_list if label.platform] if platforms: # do a join, just in case this host has multiple platforms, # we'll be able to see it errors.append('Host %s already has a platform: %s' % ( host.hostname, ', '.join(platforms))) if errors: raise model_logic.ValidationError({'labels': '; '.join(errors)}) @classmethod def check_board_labels_allowed(cls, hosts, new_labels=[]): """Verify the specified hosts have valid board labels and the given new board labels can be added. @param cls: Implicit class object. @param hosts: The hosts to verify. @param new_labels: A list of labels to be added to the hosts. @raises model_logic.ValidationError if any host has invalid board labels or the given board labels cannot be added to the hsots. 
""" Host.objects.populate_relationships(hosts, Label, 'label_list') Host.objects.populate_relationships(hosts, StaticLabel, 'staticlabel_list') errors = [] for host in hosts: boards = [label.name for label in host.label_list if label.name.startswith('board:')] if RESPECT_STATIC_LABELS: boards += [label.name for label in host.staticlabel_list if label.name.startswith('board:')] new_boards = [name for name in new_labels if name.startswith('board:')] if len(boards) + len(new_boards) > 1: # do a join, just in case this host has multiple boards, # we'll be able to see it errors.append('Host %s already has board labels: %s' % ( host.hostname, ', '.join(boards))) if errors: raise model_logic.ValidationError({'labels': '; '.join(errors)}) def is_dead(self): """Returns whether the host is dead (has status repair failed).""" return self.status == Host.Status.REPAIR_FAILED def active_queue_entry(self): """Returns the active queue entry for this host, or None if none.""" active = list(self.hostqueueentry_set.filter(active=True)) if not active: return None assert len(active) == 1, ('More than one active entry for ' 'host ' + self.hostname) return active[0] def _get_attribute_model_and_args(self, attribute): return HostAttribute, dict(host=self, attribute=attribute) def _get_static_attribute_model_and_args(self, attribute): return StaticHostAttribute, dict(host=self, attribute=attribute) def _is_replaced_by_static_attribute(self, attribute): if RESPECT_STATIC_ATTRIBUTES: model, args = self._get_static_attribute_model_and_args(attribute) try: static_attr = model.objects.get(**args) return True except StaticHostAttribute.DoesNotExist: return False return False @classmethod def get_attribute_model(cls): """Return the attribute model. Override method in parent class. See ModelExtensions for details. @returns: The attribute model of Host. 
"""
        return HostAttribute


    class Meta:
        """Metadata for the Host class."""
        db_table = 'afe_hosts'


    def __unicode__(self):
        return unicode(self.hostname)


class HostAttribute(dbmodels.Model, model_logic.ModelExtensions):
    """Arbitrary keyvals associated with hosts.

    Each row stores one (attribute, value) string pair for a single Host.
    """
    # NOTE(review): these sets are presumably read by the (de)serialization
    # logic in model_logic.ModelExtensions — confirm there.
    SERIALIZATION_LINKS_TO_KEEP = set(['host'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value'])
    host = dbmodels.ForeignKey(Host)
    attribute = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(max_length=300)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for the HostAttribute class."""
        db_table = 'afe_host_attributes'


    @classmethod
    def get_record(cls, data):
        """Check the database for an identical record.

        Use host_id and attribute to search for an existing record.

        @param data: A dictionary providing at least the 'host_id' and
                'attribute' keys.

        @returns: The matching HostAttribute object.

        @raises: DoesNotExist, if no record found
        @raises: MultipleObjectsReturned if multiple records found.
        """
        # TODO(fdeng): We should use host_id and attribute together as
        #              a primary key in the db.
        return cls.objects.get(host_id=data['host_id'],
                               attribute=data['attribute'])


    @classmethod
    def deserialize(cls, data):
        """Override deserialize in parent class.

        Do not deserialize id as id is not kept consistent on main and shards.
        The 'id' key is popped from the caller's dictionary in place, so a
        non-empty |data| without an 'id' key raises KeyError.

        @param data: A dictionary of data to deserialize.

        @returns: A HostAttribute object.
        """
        if data:
            data.pop('id')
        return super(HostAttribute, cls).deserialize(data)


class StaticHostAttribute(dbmodels.Model, model_logic.ModelExtensions):
    """Static arbitrary keyvals associated with hosts.

    Same shape as HostAttribute, stored in a separate table.
    """
    # NOTE(review): presumably consumed by model_logic.ModelExtensions'
    # (de)serialization logic — confirm there.
    SERIALIZATION_LINKS_TO_KEEP = set(['host'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value'])
    host = dbmodels.ForeignKey(Host)
    attribute = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(max_length=300)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for the StaticHostAttribute class."""
        db_table = 'afe_static_host_attributes'


    @classmethod
    def get_record(cls, data):
        """Check the database for an identical record.

        Use host_id and attribute to search for an existing record.

        @param data: A dictionary providing at least the 'host_id' and
                'attribute' keys.

        @returns: The matching StaticHostAttribute object.

        @raises: DoesNotExist, if no record found
        @raises: MultipleObjectsReturned if multiple records found.
        """
        return cls.objects.get(host_id=data['host_id'],
                               attribute=data['attribute'])


    @classmethod
    def deserialize(cls, data):
        """Override deserialize in parent class.

        Do not deserialize id as id is not kept consistent on main and shards.
        The 'id' key is popped from the caller's dictionary in place, so a
        non-empty |data| without an 'id' key raises KeyError.

        @param data: A dictionary of data to deserialize.

        @returns: A StaticHostAttribute object.
        """
        if data:
            data.pop('id')
        return super(StaticHostAttribute, cls).deserialize(data)


class Test(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    author: author name
    description: description of the test
    name: test name
    time: short, medium, long
    test_class: This describes the class that the test belongs in.
    test_category: This describes the category for your tests
    test_type: Client or Server
    path: path to pass to run_test()
    sync_count:  is a number >=1 (1 being the default). If it's 1, then it's an
                 async job. If it's >1 it's sync job for that number of
                 machines i.e. if sync_count = 2 it is a sync job that
                 requires two machines.
    Optional:
    dependencies: What the test requires to run. Comma delimited list
    dependency_labels: many-to-many relationship with labels corresponding to
                       test dependencies.
    experimental: If this is set to True production servers will ignore the
                  test
    run_verify: Whether or not the scheduler should run the verify stage
    run_reset: Whether or not the scheduler should run the reset stage
    test_retry: Number of times to retry test if the test did not complete
                successfully.
(optional, default: 0)
    """
    TestTime = autotest_enum.AutotestEnum('SHORT', 'MEDIUM', 'LONG',
                                          start_value=1)

    name = dbmodels.CharField(max_length=255, unique=True)
    author = dbmodels.CharField(max_length=255)
    test_class = dbmodels.CharField(max_length=255)
    test_category = dbmodels.CharField(max_length=255)
    dependencies = dbmodels.CharField(max_length=255, blank=True)
    description = dbmodels.TextField(blank=True)
    experimental = dbmodels.BooleanField(default=True)
    run_verify = dbmodels.BooleanField(default=False)
    test_time = dbmodels.SmallIntegerField(choices=TestTime.choices(),
                                           default=TestTime.MEDIUM)
    test_type = dbmodels.SmallIntegerField(
        choices=control_data.CONTROL_TYPE.choices())
    sync_count = dbmodels.IntegerField(default=1)
    path = dbmodels.CharField(max_length=255, unique=True)
    test_retry = dbmodels.IntegerField(blank=True, default=0)
    run_reset = dbmodels.BooleanField(default=True)

    dependency_labels = (
        dbmodels.ManyToManyField(Label, blank=True,
                                 db_table='afe_autotests_dependency_labels'))
    name_field = 'name'
    objects = model_logic.ExtendedManager()


    def admin_description(self):
        """Returns the test description with XML special characters escaped.

        @returns: self.description passed through saxutils.escape().
        """
        escaped_description = saxutils.escape(self.description)
        return '%s' % escaped_description
    # NOTE(review): allow_tags / short_description look like Django admin
    # display hints for this method — confirm against the Django version used.
    admin_description.allow_tags = True
    admin_description.short_description = 'Description'


    class Meta:
        """Metadata for class Test."""
        db_table = 'afe_autotests'


    def __unicode__(self):
        return unicode(self.name)


class TestParameter(dbmodels.Model):
    """
    A declared parameter of a test
    """
    test = dbmodels.ForeignKey(Test)
    name = dbmodels.CharField(max_length=255)

    class Meta:
        """Metadata for class TestParameter."""
        db_table = 'afe_test_parameters'
        # A given parameter name may be declared only once per test.
        unique_together = ('test', 'name')

    def __unicode__(self):
        return u'%s (%s)' % (self.name, self.test.name)


class Profiler(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    name: profiler name
    test_type: Client or Server

    Optional:
    description: arbitrary text description
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.TextField(blank=True)

    name_field = 'name'
    objects = model_logic.ExtendedManager()


    class Meta:
        """Metadata for class Profiler."""
        db_table = 'afe_profilers'


    def __unicode__(self):
        return unicode(self.name)


class AclGroup(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    name: name of ACL group

    Optional:
    description: arbitrary description of group
    """

    SERIALIZATION_LINKS_TO_FOLLOW = set(['users'])

    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.CharField(max_length=255, blank=True)
    users = dbmodels.ManyToManyField(User, blank=False,
                                     db_table='afe_acl_groups_users')
    hosts = dbmodels.ManyToManyField(Host, blank=True,
                                     db_table='afe_acl_groups_hosts')

    name_field = 'name'
    objects = model_logic.ExtendedManager()

    @staticmethod
    def check_for_acl_violation_hosts(hosts):
        """Verify the current user has access to the specified hosts.

        Superusers bypass the check entirely. Only valid Host instances are
        checked; any other object in |hosts| is skipped.

        @param hosts: The hosts to verify against.
        @raises AclAccessViolation if the current user doesn't have access
            to a host.
        """
        user = User.current_user()
        if user.is_superuser():
            return
        # IDs of every host reachable through an ACL group the user is in.
        accessible_host_ids = set(
            host.id for host in Host.objects.filter(aclgroup__users=user))
        for host in hosts:
            # Check if the user has access to this host,
            # but only if it is not a metahost or a one-time-host.
            no_access = (isinstance(host, Host)
                         and not host.invalid
                         and int(host.id) not in accessible_host_ids)
            if no_access:
                raise AclAccessViolation("%s does not have access to %s" %
                                         (str(user), str(host)))


    @staticmethod
    def check_abort_permissions(queue_entries):
        """Look for queue entries that aren't abortable by the current user.

        An entry is not abortable if:
           * the job isn't owned by this user, and
           * the machine isn't ACL-accessible, or
           * the machine is in the "Everyone" ACL

        @param queue_entries: The queue entries to check.
        @raises AclAccessViolation if a queue entry is not abortable by the
            current user.
""" user = User.current_user() if user.is_superuser(): return not_owned = queue_entries.exclude(job__owner=user.login) # I do this using ID sets instead of just Django filters because # filtering on M2M dbmodels is broken in Django 0.96. It's better in # 1.0. # TODO: Use Django filters, now that we're using 1.0. accessible_ids = set( entry.id for entry in not_owned.filter(host__aclgroup__users__login=user.login)) public_ids = set(entry.id for entry in not_owned.filter(host__aclgroup__name='Everyone')) cannot_abort = [entry for entry in not_owned.select_related() if entry.id not in accessible_ids or entry.id in public_ids] if len(cannot_abort) == 0: return entry_names = ', '.join('%s-%s/%s' % (entry.job.id, entry.job.owner, entry.host_or_metahost_name()) for entry in cannot_abort) raise AclAccessViolation('You cannot abort the following job entries: ' + entry_names) def check_for_acl_violation_acl_group(self): """Verifies the current user has acces to this ACL group. @raises AclAccessViolation if the current user doesn't have access to this ACL group. """ user = User.current_user() if user.is_superuser(): return if self.name == 'Everyone': raise AclAccessViolation("You cannot modify 'Everyone'!") if not user in self.users.all(): raise AclAccessViolation("You do not have access to %s" % self.name) @staticmethod def on_host_membership_change(): """Invoked when host membership changes.""" everyone = AclGroup.objects.get(name='Everyone') # find hosts that aren't in any ACL group and add them to Everyone # TODO(showard): this is a bit of a hack, since the fact that this query # works is kind of a coincidence of Django internals. This trick # doesn't work in general (on all foreign key relationships). I'll # replace it with a better technique when the need arises. 
orphaned_hosts = Host.valid_objects.filter(aclgroup__id__isnull=True) everyone.hosts.add(*orphaned_hosts.distinct()) # find hosts in both Everyone and another ACL group, and remove them # from Everyone hosts_in_everyone = Host.valid_objects.filter(aclgroup__name='Everyone') acled_hosts = set() for host in hosts_in_everyone: # Has an ACL group other than Everyone if host.aclgroup_set.count() > 1: acled_hosts.add(host) everyone.hosts.remove(*acled_hosts) def delete(self): if (self.name == 'Everyone'): raise AclAccessViolation("You cannot delete 'Everyone'!") self.check_for_acl_violation_acl_group() super(AclGroup, self).delete() self.on_host_membership_change() def add_current_user_if_empty(self): """Adds the current user if the set of users is empty.""" if not self.users.count(): self.users.add(User.current_user()) def perform_after_save(self, change): """Called after a save. @param change: Whether there was a change. """ if not change: self.users.add(User.current_user()) self.add_current_user_if_empty() self.on_host_membership_change() def save(self, *args, **kwargs): change = bool(self.id) if change: # Check the original object for an ACL violation AclGroup.objects.get(id=self.id).check_for_acl_violation_acl_group() super(AclGroup, self).save(*args, **kwargs) self.perform_after_save(change) class Meta: """Metadata for class AclGroup.""" db_table = 'afe_acl_groups' def __unicode__(self): return unicode(self.name) class ParameterizedJob(dbmodels.Model): """ Auxiliary configuration for a parameterized job. This class is obsolete, and ought to be dead. Due to a series of unfortunate events, it can't be deleted: * In `class Job` we're required to keep a reference to this class for the sake of the scheduler unit tests. * The existence of the reference in `Job` means that certain methods here will get called from the `get_jobs` RPC. So, the definitions below seem to be the minimum stub we can support unless/until we change the database schema. 
""" @classmethod def smart_get(cls, id_or_name, *args, **kwargs): """For compatibility with Job.add_object. @param cls: Implicit class object. @param id_or_name: The ID or name to get. @param args: Non-keyword arguments. @param kwargs: Keyword arguments. """ return cls.objects.get(pk=id_or_name) def job(self): """Returns the job if it exists, or else None.""" jobs = self.job_set.all() assert jobs.count() <= 1 return jobs and jobs[0] or None class Meta: """Metadata for class ParameterizedJob.""" db_table = 'afe_parameterized_jobs' def __unicode__(self): return u'%s (parameterized) - %s' % (self.test.name, self.job()) class JobManager(model_logic.ExtendedManager): 'Custom manager to provide efficient status counts querying.' def get_status_counts(self, job_ids): """Returns a dict mapping the given job IDs to their status count dicts. @param job_ids: A list of job IDs. """ if not job_ids: return {} id_list = '(%s)' % ','.join(str(job_id) for job_id in job_ids) cursor = connection.cursor() cursor.execute(""" SELECT job_id, status, aborted, complete, COUNT(*) FROM afe_host_queue_entries WHERE job_id IN %s GROUP BY job_id, status, aborted, complete """ % id_list) all_job_counts = dict((job_id, {}) for job_id in job_ids) for job_id, status, aborted, complete, count in cursor.fetchall(): job_dict = all_job_counts[job_id] full_status = HostQueueEntry.compute_full_status(status, aborted, complete) job_dict.setdefault(full_status, 0) job_dict[full_status] += count return all_job_counts class Job(dbmodels.Model, model_logic.ModelExtensions): """\ owner: username of job owner name: job name (does not have to be unique) priority: Integer priority value. Higher is more important. 
control_file: contents of control file
    control_type: Client or Server
    created_on: date of job creation
    submitted_on: date of job submission
    synch_count: how many hosts should be used per autoserv execution
    run_verify: Whether or not to run the verify phase
    run_reset: Whether or not to run the reset phase
    timeout: DEPRECATED - hours from queuing time until job times out
    timeout_mins: minutes from job queuing time until the job times out
    max_runtime_hrs: DEPRECATED - hours from job starting time until job
                     times out
    max_runtime_mins: minutes from job starting time until job times out
    email_list: list of people to email on completion delimited by any of:
                white space, ',', ':', ';'
    dependency_labels: many-to-many relationship with labels corresponding to
                       job dependencies
    reboot_before: Never, If dirty, or Always
    reboot_after: Never, If all tests passed, or Always
    parse_failed_repair: if True, a failed repair launched by this job will
                         have its results parsed as part of the job.
    drone_set: The set of drones to run this job on
    parent_job: Parent job (optional)
    test_retry: Number of times to retry test if the test did not complete
                successfully. (optional, default: 0)
    require_ssp: Require server-side packaging unless require_ssp is set to
                 False. (optional, default: None)
    """

    # TODO: Investigate, if jobkeyval_set is really needed.
    # dynamic_suite will write them into an attached file for the drone, but
    # it doesn't seem like they are actually used. If they aren't used, remove
    # jobkeyval_set here.
    SERIALIZATION_LINKS_TO_FOLLOW = set(['dependency_labels',
                                         'hostqueueentry_set',
                                         'jobkeyval_set',
                                         'shard'])

    # SQL fragment excluding non-aborted jobs whose ids are in %(known_ids)s.
    EXCLUDE_KNOWN_JOBS_CLAUSE = ''' AND NOT (afe_host_queue_entries.aborted = 0 AND afe_jobs.id IN (%(known_ids)s)) '''

    # SQL fragment keeping only jobs created after %(cutoff)s.
    EXCLUDE_OLD_JOBS_CLAUSE = 'AND (afe_jobs.created_on > "%(cutoff)s")'

    # Ids of jobs having an HQE that is neither complete nor active and whose
    # dependency labels or HQE meta_host match a label of shard %(shard_id)s.
    SQL_SHARD_JOBS = ''' SELECT DISTINCT(afe_jobs.id) FROM afe_jobs INNER JOIN afe_host_queue_entries ON (afe_jobs.id = afe_host_queue_entries.job_id) LEFT OUTER JOIN afe_jobs_dependency_labels ON (afe_jobs.id = afe_jobs_dependency_labels.job_id) JOIN afe_shards_labels ON (afe_shards_labels.label_id = afe_jobs_dependency_labels.label_id OR afe_shards_labels.label_id = afe_host_queue_entries.meta_host) WHERE (afe_shards_labels.shard_id = %(shard_id)s AND afe_host_queue_entries.complete != 1 AND afe_host_queue_entries.active != 1 %(exclude_known_jobs)s %(exclude_old_jobs)s) '''

    # Jobs can be created with assigned hosts and have no dependency
    # labels nor meta_host.
    # We are looking for:
    #     - a job whose hqe's meta_host is null
    #     - a job whose hqe has a host
    #     - one of the host's labels matches the shard's label.
    # Non-aborted known jobs, completed jobs, active jobs, jobs
    # without hqe are excluded as we do with SQL_SHARD_JOBS.
    SQL_SHARD_JOBS_WITH_HOSTS = ''' SELECT DISTINCT(afe_jobs.id) FROM afe_jobs INNER JOIN afe_host_queue_entries ON (afe_jobs.id = afe_host_queue_entries.job_id) LEFT OUTER JOIN %(host_label_table)s ON (afe_host_queue_entries.host_id = %(host_label_table)s.host_id) WHERE (%(host_label_table)s.%(host_label_column)s IN %(label_ids)s AND afe_host_queue_entries.complete != 1 AND afe_host_queue_entries.active != 1 AND afe_host_queue_entries.meta_host IS NULL AND afe_host_queue_entries.host_id IS NOT NULL %(exclude_known_jobs)s %(exclude_old_jobs)s) '''

    # Even if we had filters about complete, active and aborted
    # bits in the above two SQLs, there is a chance that
    # the result may still contain a job with an hqe with 'complete=1'
    # or 'active=1'.
    # This happens when a job has two (or more) hqes and at least
    # one hqe has different bits than others.
    # We use a second sql to ensure we exclude all un-desired jobs.
    SQL_JOBS_TO_EXCLUDE = ''' SELECT afe_jobs.id FROM afe_jobs INNER JOIN afe_host_queue_entries ON (afe_jobs.id = afe_host_queue_entries.job_id) WHERE (afe_jobs.id in (%(candidates)s) AND (afe_host_queue_entries.complete=1 OR afe_host_queue_entries.active=1)) '''

    def _deserialize_relation(self, link, data):
        """Deserializes a related-record list for this job.

        HQE and keyval records get this job's id stamped onto them (mutating
        |data| in place) before the parent implementation handles them.
        """
        if link in ['hostqueueentry_set', 'jobkeyval_set']:
            for obj in data:
                obj['job_id'] = self.id

        super(Job, self)._deserialize_relation(link, data)


    def custom_deserialize_relation(self, link, data):
        """Deserializes the 'shard' link; any other link is an error."""
        assert link == 'shard', 'Link %s should not be deserialized' % link
        self.shard = Shard.deserialize(data)


    # If the job got aborted on the main after the client fetched it
    # no shard_id will be set. The shard might still push updates though,
    # as the job might complete before the abort bit syncs to the shard.
    # Alternative considered: The main scheduler could be changed to not
    # set aborted jobs to completed that are sharded out. But that would
    # require database queries and seemed more complicated to implement.
    # This seems safe to do, as there won't be updates pushed from the wrong
    # shards: shards should be powered off and wiped when they are removed
    # from the main.
    def _check_update_from_shard(self, shard, updated_serialized):
        """Rejects updates for this job coming from a shard it isn't on.

        @raises error.IgnorableUnallowedRecordsSentToMain if the job is
            assigned to a different shard than |shard|.
        """
        if self.shard_id and self.shard_id != shard.id:
            raise error.IgnorableUnallowedRecordsSentToMain(
                'Job id=%s is assigned to shard (%s). Cannot update it with %s '
                'from shard %s.' % (self.id, self.shard_id, updated_serialized,
                                    shard.id))

    RebootBefore = model_attributes.RebootBefore
    RebootAfter = model_attributes.RebootAfter
    # TIMEOUT is deprecated.
    # NOTE(review): the integer defaults below are read without type=int, so
    # a value set in the config file may come back as a string — confirm
    # global_config.get_config_value's behavior.
    DEFAULT_TIMEOUT = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_timeout_default', default=24)
    DEFAULT_TIMEOUT_MINS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_timeout_mins_default', default=24*60)
    # MAX_RUNTIME_HRS is deprecated. Will be removed after switch to mins is
    # completed.
    DEFAULT_MAX_RUNTIME_HRS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_max_runtime_hrs_default', default=72)
    DEFAULT_MAX_RUNTIME_MINS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_max_runtime_mins_default', default=72*60)
    DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool,
        default=False)
    # When True, _readonly_job_query_context routes shard job queries to the
    # 'readonly' database alias.
    FETCH_READONLY_JOBS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB','readonly_heartbeat', type=bool, default=False)
    # TODO(ayatane): Deprecated, not removed due to difficulty untangling imports
    SKIP_JOBS_CREATED_BEFORE = 0

    owner = dbmodels.CharField(max_length=255)
    name = dbmodels.CharField(max_length=255)
    priority = dbmodels.SmallIntegerField(default=priorities.Priority.DEFAULT)
    control_file = dbmodels.TextField(null=True, blank=True)
    control_type = dbmodels.SmallIntegerField(
        choices=control_data.CONTROL_TYPE.choices(),
        blank=True, # to allow 0
        default=control_data.CONTROL_TYPE.CLIENT)
    created_on = dbmodels.DateTimeField()
    synch_count = dbmodels.IntegerField(blank=True, default=0)
    timeout = dbmodels.IntegerField(default=DEFAULT_TIMEOUT)
    run_verify = dbmodels.BooleanField(default=False)
    email_list = dbmodels.CharField(max_length=250, blank=True)
    dependency_labels = (
            dbmodels.ManyToManyField(Label, blank=True,
                                     db_table='afe_jobs_dependency_labels'))
    reboot_before = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootBefore.choices(), blank=True,
        default=DEFAULT_REBOOT_BEFORE)
    reboot_after = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootAfter.choices(), blank=True,
        default=DEFAULT_REBOOT_AFTER)
    parse_failed_repair = dbmodels.BooleanField(
        default=DEFAULT_PARSE_FAILED_REPAIR)
    # max_runtime_hrs is deprecated. Will be removed after switch to mins is
    # completed.
    max_runtime_hrs = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_HRS)
    max_runtime_mins = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_MINS)
    drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True)

    # TODO(jrbarnette)  We have to keep `parameterized_job` around or it
    # breaks the scheduler_models unit tests (and fixing the unit tests
    # will break the scheduler, so don't do that).
    #
    # The ultimate fix is to delete the column from the database table
    # at which point, you _must_ delete this.  Until you're ready to do
    # that, DON'T MUCK WITH IT.
    parameterized_job = dbmodels.ForeignKey(ParameterizedJob, null=True,
                                            blank=True)

    parent_job = dbmodels.ForeignKey('self', blank=True, null=True)

    test_retry = dbmodels.IntegerField(blank=True, default=0)

    run_reset = dbmodels.BooleanField(default=True)

    timeout_mins = dbmodels.IntegerField(default=DEFAULT_TIMEOUT_MINS)

    # If this is None on the main, a shard should be found.
    # If this is None on a shard, it should be synced back to the main
    shard = dbmodels.ForeignKey(Shard, blank=True, null=True)

    # If this is None, server-side packaging will be used for server side test.
    require_ssp = dbmodels.NullBooleanField(default=None, blank=True, null=True)

    # custom manager
    objects = JobManager()


    @decorators.cached_property
    def labels(self):
        """All the labels of this job"""
        # We need to convert dependency_labels to a list, because all() gives us
        # back an iterator, and storing/caching an iterator means we'd only be
        # able to read from it once.
        return list(self.dependency_labels.all())


    def is_server_job(self):
        """Returns whether this job is of type server."""
        return self.control_type == control_data.CONTROL_TYPE.SERVER


    @classmethod
    def create(cls, owner, options, hosts):
        """Creates a job.

        The job is created by taking some information (the listed args) and
        filling in the rest of the necessary information.

        |options| is modified in place: missing reboot_before / reboot_after
        values are filled from the current user's preferences, and
        timeout_mins is derived from the deprecated hours-based 'timeout'
        when only the latter is given.

        @param cls: Implicit class object.
        @param owner: The owner for the job.
        @param options: An options object (used as a dict of job options).
        @param hosts: The hosts to use.

        @returns: The newly created Job.

        @raises AclAccessViolation if the current user lacks access to any of
            |hosts|.
        """
        AclGroup.check_for_acl_violation_hosts(hosts)

        control_file = options.get('control_file')

        user = User.current_user()
        if options.get('reboot_before') is None:
            options['reboot_before'] = user.get_reboot_before_display()
        if options.get('reboot_after') is None:
            options['reboot_after'] = user.get_reboot_after_display()

        drone_set = DroneSet.resolve_name(options.get('drone_set'))

        if options.get('timeout_mins') is None and options.get('timeout'):
            options['timeout_mins'] = options['timeout'] * 60

        job = cls.add_object(
            owner=owner,
            name=options['name'],
            priority=options['priority'],
            control_file=control_file,
            control_type=options['control_type'],
            synch_count=options.get('synch_count'),
            # timeout needs to be deleted in the future.
            timeout=options.get('timeout'),
            timeout_mins=options.get('timeout_mins'),
            max_runtime_mins=options.get('max_runtime_mins'),
            run_verify=options.get('run_verify'),
            email_list=options.get('email_list'),
            reboot_before=options.get('reboot_before'),
            reboot_after=options.get('reboot_after'),
            parse_failed_repair=options.get('parse_failed_repair'),
            created_on=datetime.now(),
            drone_set=drone_set,
            parent_job=options.get('parent_job_id'),
            test_retry=options.get('test_retry'),
            run_reset=options.get('run_reset'),
            require_ssp=options.get('require_ssp'))

        job.dependency_labels = options['dependencies']

        if options.get('keyvals'):
            for key, value in six.iteritems(options['keyvals']):
                # None (or NULL) is not acceptable by DB, so change it to an
                # empty string in case.
                JobKeyval.objects.create(job=job, key=key,
                                         value='' if value is None else value)

        return job


    @classmethod
    def assign_to_shard(cls, shard, known_ids):
        """Assigns unassigned jobs to a shard.
For all labels that have been assigned to this shard, all jobs that have this label are assigned to this shard. @param shard: The shard to assign jobs to. @param known_ids: List of all ids of incomplete jobs the shard already knows about. @returns The job objects that should be sent to the shard. """ with cls._readonly_job_query_context(): job_ids = cls._get_new_jobs_for_shard(shard, known_ids) if not job_ids: return [] cls._assign_jobs_to_shard(job_ids, shard) return cls._jobs_with_ids(job_ids) @classmethod @contextlib.contextmanager def _readonly_job_query_context(cls): #TODO: Get rid of this kludge if/when we update Django to >=1.7 #correct usage would be .raw(..., using='readonly') old_db = Job.objects._db try: if cls.FETCH_READONLY_JOBS: Job.objects._db = 'readonly' yield finally: Job.objects._db = old_db @classmethod def _assign_jobs_to_shard(cls, job_ids, shard): Job.objects.filter(pk__in=job_ids).update(shard=shard) @classmethod def _jobs_with_ids(cls, job_ids): return list(Job.objects.filter(pk__in=job_ids).all()) @classmethod def _get_new_jobs_for_shard(cls, shard, known_ids): job_ids = cls._get_jobs_without_hosts(shard, known_ids) job_ids |= cls._get_jobs_with_hosts(shard, known_ids) if job_ids: job_ids -= cls._filter_finished_jobs(job_ids) return job_ids @classmethod def _filter_finished_jobs(cls, job_ids): query = Job.objects.raw( cls.SQL_JOBS_TO_EXCLUDE % {'candidates': ','.join([str(i) for i in job_ids])}) return set([j.id for j in query]) @classmethod def _get_jobs_without_hosts(cls, shard, known_ids): raw_sql = cls.SQL_SHARD_JOBS % { 'exclude_known_jobs': cls._exclude_known_jobs_clause(known_ids), 'exclude_old_jobs': cls._exclude_old_jobs_clause(), 'shard_id': shard.id } return set([j.id for j in Job.objects.raw(raw_sql)]) @classmethod def _get_jobs_with_hosts(cls, shard, known_ids): job_ids = set([]) static_labels, non_static_labels = Host.classify_label_objects( shard.labels.all()) if static_labels: label_ids = [str(l.id) for l in static_labels] 
query = Job.objects.raw(cls.SQL_SHARD_JOBS_WITH_HOSTS % { 'exclude_known_jobs': cls._exclude_known_jobs_clause(known_ids), 'exclude_old_jobs': cls._exclude_old_jobs_clause(), 'host_label_table': 'afe_static_hosts_labels', 'host_label_column': 'staticlabel_id', 'label_ids': '(%s)' % ','.join(label_ids)}) job_ids |= set([j.id for j in query]) if non_static_labels: label_ids = [str(l.id) for l in non_static_labels] query = Job.objects.raw(cls.SQL_SHARD_JOBS_WITH_HOSTS % { 'exclude_known_jobs': cls._exclude_known_jobs_clause(known_ids), 'exclude_old_jobs': cls._exclude_old_jobs_clause(), 'host_label_table': 'afe_hosts_labels', 'host_label_column': 'label_id', 'label_ids': '(%s)' % ','.join(label_ids)}) job_ids |= set([j.id for j in query]) return job_ids @classmethod def _exclude_known_jobs_clause(cls, known_ids): if not known_ids: return '' return (cls.EXCLUDE_KNOWN_JOBS_CLAUSE % {'known_ids': ','.join([str(i) for i in known_ids])}) @classmethod def _exclude_old_jobs_clause(cls): """Filter queried jobs to be created within a few hours in the past. With this clause, any jobs older than a configurable number of hours are skipped in the jobs query. The job creation window affects the overall query performance. Longer creation windows require a range query over more Job table rows using the created_on column index. c.f. http://crbug.com/966872#c35 """ if cls.SKIP_JOBS_CREATED_BEFORE <= 0: return '' cutoff = datetime.now()- timedelta(hours=cls.SKIP_JOBS_CREATED_BEFORE) return (cls.EXCLUDE_OLD_JOBS_CLAUSE % {'cutoff': cutoff.strftime('%Y-%m-%d %H:%M:%S')}) def queue(self, hosts, is_template=False): """Enqueue a job on the given hosts. @param hosts: The hosts to use. @param is_template: Whether the status should be "Template". 
""" if not hosts: # hostless job entry = HostQueueEntry.create(job=self, is_template=is_template) entry.save() return for host in hosts: host.enqueue_job(self, is_template=is_template) def user(self): """Gets the user of this job, or None if it doesn't exist.""" try: return User.objects.get(login=self.owner) except self.DoesNotExist: return None def abort(self): """Aborts this job.""" for queue_entry in self.hostqueueentry_set.all(): queue_entry.abort() def tag(self): """Returns a string tag for this job.""" return server_utils.get_job_tag(self.id, self.owner) def keyval_dict(self): """Returns all keyvals for this job as a dictionary.""" return dict((keyval.key, keyval.value) for keyval in self.jobkeyval_set.all()) @classmethod def get_attribute_model(cls): """Return the attribute model. Override method in parent class. This class is called when deserializing the one-to-many relationship betwen Job and JobKeyval. On deserialization, we will try to clear any existing job keyvals associated with a job to avoid any inconsistency. Though Job doesn't implement ModelWithAttribute, we still treat it as an attribute model for this purpose. @returns: The attribute model of Job. """ return JobKeyval class Meta: """Metadata for class Job.""" db_table = 'afe_jobs' def __unicode__(self): return u'%s (%s-%s)' % (self.name, self.id, self.owner) class JobHandoff(dbmodels.Model, model_logic.ModelExtensions): """Jobs that have been handed off to lucifer.""" job = dbmodels.OneToOneField(Job, on_delete=dbmodels.CASCADE, primary_key=True) created = dbmodels.DateTimeField(auto_now_add=True) completed = dbmodels.BooleanField(default=False) drone = dbmodels.CharField( max_length=128, null=True, help_text=''' The hostname of the drone the job is running on and whose job_aborter should be responsible for aborting the job if the job process dies. NULL means any drone's job_aborter has free reign to abort the job. 
''') class Meta: """Metadata for class Job.""" db_table = 'afe_job_handoffs' class JobKeyval(dbmodels.Model, model_logic.ModelExtensions): """Keyvals associated with jobs""" SERIALIZATION_LINKS_TO_KEEP = set(['job']) SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value']) job = dbmodels.ForeignKey(Job) key = dbmodels.CharField(max_length=90) value = dbmodels.CharField(max_length=300) objects = model_logic.ExtendedManager() @classmethod def get_record(cls, data): """Check the database for an identical record. Use job_id and key to search for a existing record. @raises: DoesNotExist, if no record found @raises: MultipleObjectsReturned if multiple records found. """ # TODO(fdeng): We should use job_id and key together as # a primary key in the db. return cls.objects.get(job_id=data['job_id'], key=data['key']) @classmethod def deserialize(cls, data): """Override deserialize in parent class. Do not deserialize id as id is not kept consistent on main and shards. @param data: A dictionary of data to deserialize. @returns: A JobKeyval object. 
""" if data: data.pop('id') return super(JobKeyval, cls).deserialize(data) class Meta: """Metadata for class JobKeyval.""" db_table = 'afe_job_keyvals' class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions): """Represents an ineligible host queue.""" job = dbmodels.ForeignKey(Job) host = dbmodels.ForeignKey(Host) objects = model_logic.ExtendedManager() class Meta: """Metadata for class IneligibleHostQueue.""" db_table = 'afe_ineligible_host_queues' class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions): """Represents a host queue entry.""" SERIALIZATION_LINKS_TO_FOLLOW = set(['meta_host']) SERIALIZATION_LINKS_TO_KEEP = set(['host']) SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['aborted']) def custom_deserialize_relation(self, link, data): assert link == 'meta_host' self.meta_host = Label.deserialize(data) def _check_update_from_shard(self, shard, updated_serialized, job_ids_sent): if self.job_id not in job_ids_sent: raise error.IgnorableUnallowedRecordsSentToMain( 'Sent HostQueueEntry without corresponding ' 'job entry: %s' % updated_serialized) Status = host_queue_entry_states.Status ACTIVE_STATUSES = host_queue_entry_states.ACTIVE_STATUSES COMPLETE_STATUSES = host_queue_entry_states.COMPLETE_STATUSES PRE_JOB_STATUSES = host_queue_entry_states.PRE_JOB_STATUSES IDLE_PRE_JOB_STATUSES = host_queue_entry_states.IDLE_PRE_JOB_STATUSES job = dbmodels.ForeignKey(Job) host = dbmodels.ForeignKey(Host, blank=True, null=True) status = dbmodels.CharField(max_length=255) meta_host = dbmodels.ForeignKey(Label, blank=True, null=True, db_column='meta_host') active = dbmodels.BooleanField(default=False) complete = dbmodels.BooleanField(default=False) deleted = dbmodels.BooleanField(default=False) execution_subdir = dbmodels.CharField(max_length=255, blank=True, default='') # If atomic_group is set, this is a virtual HostQueueEntry that will # be expanded into many actual hosts within the group at schedule time. 
atomic_group = dbmodels.ForeignKey(AtomicGroup, blank=True, null=True) aborted = dbmodels.BooleanField(default=False) started_on = dbmodels.DateTimeField(null=True, blank=True) finished_on = dbmodels.DateTimeField(null=True, blank=True) objects = model_logic.ExtendedManager() def __init__(self, *args, **kwargs): super(HostQueueEntry, self).__init__(*args, **kwargs) self._record_attributes(['status']) @classmethod def create(cls, job, host=None, meta_host=None, is_template=False): """Creates a new host queue entry. @param cls: Implicit class object. @param job: The associated job. @param host: The associated host. @param meta_host: The associated meta host. @param is_template: Whether the status should be "Template". """ if is_template: status = cls.Status.TEMPLATE else: status = cls.Status.QUEUED return cls(job=job, host=host, meta_host=meta_host, status=status) def save(self, *args, **kwargs): self._set_active_and_complete() super(HostQueueEntry, self).save(*args, **kwargs) self._check_for_updated_attributes() def execution_path(self): """ Path to this entry's results (relative to the base results directory). """ return server_utils.get_hqe_exec_path(self.job.tag(), self.execution_subdir) def host_or_metahost_name(self): """Returns the first non-None name found in priority order. The priority order checked is: (1) host name; (2) meta host name """ if self.host: return self.host.hostname else: assert self.meta_host return self.meta_host.name def _set_active_and_complete(self): if self.status in self.ACTIVE_STATUSES: self.active, self.complete = True, False elif self.status in self.COMPLETE_STATUSES: self.active, self.complete = False, True else: self.active, self.complete = False, False def on_attribute_changed(self, attribute, old_value): assert attribute == 'status' logging.info('%s/%d (%d) -> %s', self.host, self.job.id, self.id, self.status) def is_meta_host_entry(self): 'True if this is a entry has a meta_host instead of a host.' 
return self.host is None and self.meta_host is not None # This code is shared between rpc_interface and models.HostQueueEntry. # Sadly due to circular imports between the 2 (crbug.com/230100) making it # a class method was the best way to refactor it. Attempting to put it in # rpc_utils or a new utils module failed as that would require us to import # models.py but to call it from here we would have to import the utils.py # thus creating a cycle. @classmethod def abort_host_queue_entries(cls, host_queue_entries): """Aborts a collection of host_queue_entries. Abort these host queue entry and all host queue entries of jobs created by them. @param host_queue_entries: List of host queue entries we want to abort. """ # This isn't completely immune to race conditions since it's not atomic, # but it should be safe given the scheduler's behavior. # TODO(milleral): crbug.com/230100 # The |abort_host_queue_entries| rpc does nearly exactly this, # however, trying to re-use the code generates some horrible # circular import error. I'd be nice to refactor things around # sometime so the code could be reused. # Fixpoint algorithm to find the whole tree of HQEs to abort to # minimize the total number of database queries: children = set() new_children = set(host_queue_entries) while new_children: children.update(new_children) new_child_ids = [hqe.job_id for hqe in new_children] new_children = HostQueueEntry.objects.filter( job__parent_job__in=new_child_ids, complete=False, aborted=False).all() # To handle circular parental relationships new_children = set(new_children) - children # Associate a user with the host queue entries that we're about # to abort so that we can look up who to blame for the aborts. child_ids = [hqe.id for hqe in children] # Get a list of hqe ids that already exists, so we can exclude them when # we do bulk_create later to avoid IntegrityError. existing_hqe_ids = set(AbortedHostQueueEntry.objects. filter(queue_entry_id__in=child_ids). 
values_list('queue_entry_id', flat=True)) now = datetime.now() user = User.current_user() aborted_hqes = [AbortedHostQueueEntry(queue_entry=hqe, aborted_by=user, aborted_on=now) for hqe in children if hqe.id not in existing_hqe_ids] AbortedHostQueueEntry.objects.bulk_create(aborted_hqes) # Bulk update all of the HQEs to set the abort bit. HostQueueEntry.objects.filter(id__in=child_ids).update(aborted=True) def abort(self): """ Aborts this host queue entry. Abort this host queue entry and all host queue entries of jobs created by this one. """ if not self.complete and not self.aborted: HostQueueEntry.abort_host_queue_entries([self]) @classmethod def compute_full_status(cls, status, aborted, complete): """Returns a modified status msg if the host queue entry was aborted. @param cls: Implicit class object. @param status: The original status message. @param aborted: Whether the host queue entry was aborted. @param complete: Whether the host queue entry was completed. """ if aborted and not complete: return 'Aborted (%s)' % status return status def full_status(self): """Returns the full status of this host queue entry, as a string.""" return self.compute_full_status(self.status, self.aborted, self.complete) def _postprocess_object_dict(self, object_dict): object_dict['full_status'] = self.full_status() class Meta: """Metadata for class HostQueueEntry.""" db_table = 'afe_host_queue_entries' def __unicode__(self): hostname = None if self.host: hostname = self.host.hostname return u"%s/%d (%d)" % (hostname, self.job.id, self.id) class HostQueueEntryStartTimes(dbmodels.Model): """An auxilary table to HostQueueEntry to index by start time.""" insert_time = dbmodels.DateTimeField() highest_hqe_id = dbmodels.IntegerField() class Meta: """Metadata for class HostQueueEntryStartTimes.""" db_table = 'afe_host_queue_entry_start_times' class AbortedHostQueueEntry(dbmodels.Model, model_logic.ModelExtensions): """Represents an aborted host queue entry.""" queue_entry = 
dbmodels.OneToOneField(HostQueueEntry, primary_key=True) aborted_by = dbmodels.ForeignKey(User) aborted_on = dbmodels.DateTimeField() objects = model_logic.ExtendedManager() def save(self, *args, **kwargs): self.aborted_on = datetime.now() super(AbortedHostQueueEntry, self).save(*args, **kwargs) class Meta: """Metadata for class AbortedHostQueueEntry.""" db_table = 'afe_aborted_host_queue_entries' class SpecialTask(dbmodels.Model, model_logic.ModelExtensions): """\ Tasks to run on hosts at the next time they are in the Ready state. Use this for high-priority tasks, such as forced repair or forced reinstall. host: host to run this task on task: special task to run time_requested: date and time the request for this task was made is_active: task is currently running is_complete: task has finished running is_aborted: task was aborted time_started: date and time the task started time_finished: date and time the task finished queue_entry: Host queue entry waiting on this task (or None, if task was not started in preparation of a job) """ Task = autotest_enum.AutotestEnum('Verify', 'Cleanup', 'Repair', 'Reset', 'Provision', string_values=True) host = dbmodels.ForeignKey(Host, blank=False, null=False) task = dbmodels.CharField(max_length=64, choices=Task.choices(), blank=False, null=False) requested_by = dbmodels.ForeignKey(User) time_requested = dbmodels.DateTimeField(auto_now_add=True, blank=False, null=False) is_active = dbmodels.BooleanField(default=False, blank=False, null=False) is_complete = dbmodels.BooleanField(default=False, blank=False, null=False) is_aborted = dbmodels.BooleanField(default=False, blank=False, null=False) time_started = dbmodels.DateTimeField(null=True, blank=True) queue_entry = dbmodels.ForeignKey(HostQueueEntry, blank=True, null=True) success = dbmodels.BooleanField(default=False, blank=False, null=False) time_finished = dbmodels.DateTimeField(null=True, blank=True) objects = model_logic.ExtendedManager() def save(self, **kwargs): if 
self.queue_entry: self.requested_by = User.objects.get( login=self.queue_entry.job.owner) super(SpecialTask, self).save(**kwargs) def execution_path(self): """Returns the execution path for a special task.""" return server_utils.get_special_task_exec_path( self.host.hostname, self.id, self.task, self.time_requested) # property to emulate HostQueueEntry.status @property def status(self): """Returns a host queue entry status appropriate for a speical task.""" return server_utils.get_special_task_status( self.is_complete, self.success, self.is_active) # property to emulate HostQueueEntry.started_on @property def started_on(self): """Returns the time at which this special task started.""" return self.time_started @classmethod def schedule_special_task(cls, host, task): """Schedules a special task on a host if not already scheduled. @param cls: Implicit class object. @param host: The host to use. @param task: The task to schedule. """ existing_tasks = SpecialTask.objects.filter(host__id=host.id, task=task, is_active=False, is_complete=False) if existing_tasks: return existing_tasks[0] special_task = SpecialTask(host=host, task=task, requested_by=User.current_user()) special_task.save() return special_task def abort(self): """ Abort this special task.""" self.is_aborted = True self.save() def activate(self): """ Sets a task as active and sets the time started to the current time. """ logging.info('Starting: %s', self) self.is_active = True self.time_started = datetime.now() self.save() def finish(self, success): """Sets a task as completed. @param success: Whether or not the task was successful. 
""" logging.info('Finished: %s', self) self.is_active = False self.is_complete = True self.success = success if self.time_started: self.time_finished = datetime.now() self.save() class Meta: """Metadata for class SpecialTask.""" db_table = 'afe_special_tasks' def __unicode__(self): result = u'Special Task %s (host %s, task %s, time %s)' % ( self.id, self.host, self.task, self.time_requested) if self.is_complete: result += u' (completed)' elif self.is_active: result += u' (active)' return result class StableVersion(dbmodels.Model, model_logic.ModelExtensions): board = dbmodels.CharField(max_length=255, unique=True) version = dbmodels.CharField(max_length=255) class Meta: """Metadata for class StableVersion.""" db_table = 'afe_stable_versions' def save(self, *args, **kwargs): if os.getenv("OVERRIDE_STABLE_VERSION_BAN"): super(StableVersion, self).save(*args, **kwargs) else: raise RuntimeError("the ability to save StableVersions has been intentionally removed") # pylint:disable=undefined-variable def delete(self): if os.getenv("OVERRIDE_STABLE_VERSION_BAN"): super(StableVersion, self).delete(*args, **kwargs) else: raise RuntimeError("the ability to delete StableVersions has been intentionally removed")