from django.db import models as dbmodels, connection from autotest_lib.frontend.afe import model_logic, readonly_connection _quote_name = connection.ops.quote_name class TempManager(model_logic.ExtendedManager): """A Temp Manager.""" _GROUP_COUNT_NAME = 'group_count' def _get_key_unless_is_function(self, field): if '(' in field: return field return self.get_key_on_this_table(field) def _get_field_names(self, fields, extra_select_fields={}): field_names = [] for field in fields: if field in extra_select_fields: field_names.append(extra_select_fields[field][0]) else: field_names.append(self._get_key_unless_is_function(field)) return field_names def _get_group_query_sql(self, query, group_by): compiler = query.query.get_compiler(using=query.db) sql, params = compiler.as_sql() # insert GROUP BY clause into query group_fields = self._get_field_names(group_by, query.query.extra_select) group_by_clause = ' GROUP BY ' + ', '.join(group_fields) group_by_position = sql.rfind('ORDER BY') if group_by_position == -1: group_by_position = len(sql) sql = (sql[:group_by_position] + group_by_clause + ' ' + sql[group_by_position:]) return sql, params def _get_column_names(self, cursor): """Gets the column names from the cursor description. This method exists so that it can be mocked in the unit test for sqlite3 compatibility. """ return [column_info[0] for column_info in cursor.description] def execute_group_query(self, query, group_by): """Performs the given query grouped by the specified fields. The given query's extra select fields are added. @param query: The query to perform. @param group_by: The fields by which to group. @return A list of dicts, where each dict corresponds to single row and contains a key for each grouped field as well as all of the extra select fields. """ sql, params = self._get_group_query_sql(query, group_by) cursor = readonly_connection.cursor() cursor.execute(sql, params) field_names = self._get_column_names(cursor) row_dicts = [dict(zip(field_names, row)) for row in cursor.fetchall()] return row_dicts def get_count_sql(self, query): """Get SQL to select a per-group count of unique matches for a query. @param query: The query to use. @return A tuple (field alias, field SQL). """ if query.query.distinct: pk_field = self.get_key_on_this_table() count_sql = 'COUNT(DISTINCT %s)' % pk_field else: count_sql = 'COUNT(1)' return self._GROUP_COUNT_NAME, count_sql def _get_num_groups_sql(self, query, group_by): group_fields = self._get_field_names(group_by, query.query.extra_select) query = query.order_by() # this can mess up the query and isn't needed compiler = query.query.get_compiler(using=query.db) sql, params = compiler.as_sql() from_ = sql[sql.find(' FROM'):] return ('SELECT DISTINCT %s %s' % (','.join(group_fields), from_), params) def _cursor_rowcount(self, cursor): """To be stubbed by tests""" return cursor.rowcount def get_num_groups(self, query, group_by): """Gets the number of distinct groups for a query. @param query: The query to use. @param group_by: The fields by which to group. @return The number of distinct groups for the given query grouped by the fields in group_by. """ sql, params = self._get_num_groups_sql(query, group_by) cursor = readonly_connection.cursor() cursor.execute(sql, params) return self._cursor_rowcount(cursor) class Machine(dbmodels.Model): """Models a machine.""" machine_idx = dbmodels.AutoField(primary_key=True) hostname = dbmodels.CharField(unique=True, max_length=255) machine_group = dbmodels.CharField(blank=True, max_length=240) owner = dbmodels.CharField(blank=True, max_length=240) class Meta: """Metadata for class Machine.""" db_table = 'tko_machines' class Kernel(dbmodels.Model): """Models a kernel.""" kernel_idx = dbmodels.AutoField(primary_key=True) kernel_hash = dbmodels.CharField(max_length=105, editable=False) base = dbmodels.CharField(max_length=90) printable = dbmodels.CharField(max_length=300) class Meta: """Metadata for class Kernel.""" db_table = 'tko_kernels' class Patch(dbmodels.Model): """Models a patch.""" kernel = dbmodels.ForeignKey(Kernel, db_column='kernel_idx') name = dbmodels.CharField(blank=True, max_length=240) url = dbmodels.CharField(blank=True, max_length=900) the_hash = dbmodels.CharField(blank=True, max_length=105, db_column='hash') class Meta: """Metadata for class Patch.""" db_table = 'tko_patches' class Status(dbmodels.Model): """Models a status.""" status_idx = dbmodels.AutoField(primary_key=True) word = dbmodels.CharField(max_length=30) class Meta: """Metadata for class Status.""" db_table = 'tko_status' class Job(dbmodels.Model, model_logic.ModelExtensions): """Models a job.""" job_idx = dbmodels.AutoField(primary_key=True) tag = dbmodels.CharField(unique=True, max_length=100) label = dbmodels.CharField(max_length=300) username = dbmodels.CharField(max_length=240) machine = dbmodels.ForeignKey(Machine, db_column='machine_idx') queued_time = dbmodels.DateTimeField(null=True, blank=True) started_time = dbmodels.DateTimeField(null=True, blank=True) finished_time = dbmodels.DateTimeField(null=True, blank=True) afe_job_id = dbmodels.IntegerField(null=True, default=None) objects = model_logic.ExtendedManager() class Meta: """Metadata for class Job.""" db_table = 'tko_jobs' class JobKeyval(dbmodels.Model): """Models a job keyval.""" job = dbmodels.ForeignKey(Job) key = dbmodels.CharField(max_length=90) value = dbmodels.CharField(blank=True, max_length=300) class Meta: """Metadata for class JobKeyval.""" db_table = 'tko_job_keyvals' class Test(dbmodels.Model, model_logic.ModelExtensions, model_logic.ModelWithAttributes): """Models a test.""" test_idx = dbmodels.AutoField(primary_key=True) job = dbmodels.ForeignKey(Job, db_column='job_idx') test = dbmodels.CharField(max_length=300) subdir = dbmodels.CharField(blank=True, max_length=300) kernel = dbmodels.ForeignKey(Kernel, db_column='kernel_idx') status = dbmodels.ForeignKey(Status, db_column='status') reason = dbmodels.CharField(blank=True, max_length=3072) machine = dbmodels.ForeignKey(Machine, db_column='machine_idx') finished_time = dbmodels.DateTimeField(null=True, blank=True) started_time = dbmodels.DateTimeField(null=True, blank=True) invalid = dbmodels.BooleanField(default=False) invalidates_test = dbmodels.ForeignKey( 'self', null=True, db_column='invalidates_test_idx', related_name='invalidates_test_set') objects = model_logic.ExtendedManager() def _get_attribute_model_and_args(self, attribute): return TestAttribute, dict(test=self, attribute=attribute, user_created=True) def set_attribute(self, attribute, value): # ensure non-user-created attributes remain immutable try: TestAttribute.objects.get(test=self, attribute=attribute, user_created=False) raise ValueError('Attribute %s already exists for test %s and is ' 'immutable' % (attribute, self.test_idx)) except TestAttribute.DoesNotExist: super(Test, self).set_attribute(attribute, value) class Meta: """Metadata for class Test.""" db_table = 'tko_tests' class TestAttribute(dbmodels.Model, model_logic.ModelExtensions): """Models a test attribute.""" test = dbmodels.ForeignKey(Test, db_column='test_idx') attribute = dbmodels.CharField(max_length=90) value = dbmodels.CharField(blank=True, max_length=300) user_created = dbmodels.BooleanField(default=False) objects = model_logic.ExtendedManager() class Meta: """Metadata for class TestAttribute.""" db_table = 'tko_test_attributes' class IterationAttribute(dbmodels.Model, model_logic.ModelExtensions): """Models an iteration attribute.""" # This isn't really a primary key, but it's necessary to appease Django # and is harmless as long as we're careful. test = dbmodels.ForeignKey(Test, db_column='test_idx', primary_key=True) iteration = dbmodels.IntegerField() attribute = dbmodels.CharField(max_length=90) value = dbmodels.CharField(blank=True, max_length=300) objects = model_logic.ExtendedManager() class Meta: """Metadata for class IterationAttribute.""" db_table = 'tko_iteration_attributes' class IterationResult(dbmodels.Model, model_logic.ModelExtensions): """Models an iteration result.""" # See comment on IterationAttribute regarding primary_key=True. test = dbmodels.ForeignKey(Test, db_column='test_idx', primary_key=True) iteration = dbmodels.IntegerField() attribute = dbmodels.CharField(max_length=256) value = dbmodels.FloatField(null=True, blank=True) objects = model_logic.ExtendedManager() class Meta: """Metadata for class IterationResult.""" db_table = 'tko_iteration_result' class TestLabel(dbmodels.Model, model_logic.ModelExtensions): """Models a test label.""" name = dbmodels.CharField(max_length=80, unique=True) description = dbmodels.TextField(blank=True) tests = dbmodels.ManyToManyField(Test, blank=True, db_table='tko_test_labels_tests') name_field = 'name' objects = model_logic.ExtendedManager() class Meta: """Metadata for class TestLabel.""" db_table = 'tko_test_labels' class SavedQuery(dbmodels.Model, model_logic.ModelExtensions): """Models a saved query.""" # TODO: change this to foreign key once DBs are merged. owner = dbmodels.CharField(max_length=80) name = dbmodels.CharField(max_length=100) url_token = dbmodels.TextField() class Meta: """Metadata for class SavedQuery.""" db_table = 'tko_saved_queries' # Views. class TestViewManager(TempManager): """A Test View Manager.""" def get_query_set(self): query = super(TestViewManager, self).get_query_set() # add extra fields to selects, using the SQL itself as the "alias" extra_select = dict((sql, sql) for sql in self.model.extra_fields.iterkeys()) return query.extra(select=extra_select) def _get_include_exclude_suffix(self, exclude): if exclude: return '_exclude' return '_include' def _add_attribute_join(self, query_set, join_condition, suffix=None, exclude=False): if suffix is None: suffix = self._get_include_exclude_suffix(exclude) return self.add_join(query_set, 'tko_test_attributes', join_key='test_idx', join_condition=join_condition, suffix=suffix, exclude=exclude) def _add_label_pivot_table_join(self, query_set, suffix, join_condition='', exclude=False, force_left_join=False): return self.add_join(query_set, 'tko_test_labels_tests', join_key='test_id', join_condition=join_condition, suffix=suffix, exclude=exclude, force_left_join=force_left_join) def _add_label_joins(self, query_set, suffix=''): query_set = self._add_label_pivot_table_join( query_set, suffix=suffix, force_left_join=True) # since we're not joining from the original table, we can't use # self.add_join() again second_join_alias = 'tko_test_labels' + suffix second_join_condition = ('%s.id = %s.testlabel_id' % (second_join_alias, 'tko_test_labels_tests' + suffix)) query_set.query.add_custom_join('tko_test_labels', second_join_condition, query_set.query.LOUTER, alias=second_join_alias) return query_set def _get_label_ids_from_names(self, label_names): label_ids = list( # listifying avoids a double query below TestLabel.objects.filter(name__in=label_names) .values_list('name', 'id')) if len(label_ids) < len(set(label_names)): raise ValueError('Not all labels found: %s' % ', '.join(label_names)) return dict(name_and_id for name_and_id in label_ids) def _include_or_exclude_labels(self, query_set, label_names, exclude=False): label_ids = self._get_label_ids_from_names(label_names).itervalues() suffix = self._get_include_exclude_suffix(exclude) condition = ('tko_test_labels_tests%s.testlabel_id IN (%s)' % (suffix, ','.join(str(label_id) for label_id in label_ids))) return self._add_label_pivot_table_join(query_set, join_condition=condition, suffix=suffix, exclude=exclude) def _add_custom_select(self, query_set, select_name, select_sql): return query_set.extra(select={select_name: select_sql}) def _add_select_value(self, query_set, alias): return self._add_custom_select(query_set, alias, _quote_name(alias) + '.value') def _add_select_ifnull(self, query_set, alias, non_null_value): select_sql = "IF(%s.id IS NOT NULL, '%s', NULL)" % (_quote_name(alias), non_null_value) return self._add_custom_select(query_set, alias, select_sql) def _join_test_label_column(self, query_set, label_name, label_id): alias = 'test_label_' + label_name label_query = TestLabel.objects.filter(name=label_name) query_set = Test.objects.join_custom_field(query_set, label_query, alias) query_set = self._add_select_ifnull(query_set, alias, label_name) return query_set def _join_test_label_columns(self, query_set, label_names): label_id_map = self._get_label_ids_from_names(label_names) for label_name in label_names: query_set = self._join_test_label_column(query_set, label_name, label_id_map[label_name]) return query_set def _join_test_attribute(self, query_set, attribute, alias=None, extra_join_condition=None): """ Join the given TestView QuerySet to TestAttribute. The resulting query has an additional column for the given attribute named "attribute_". """ if not alias: alias = 'test_attribute_' + attribute attribute_query = TestAttribute.objects.filter(attribute=attribute) if extra_join_condition: attribute_query = attribute_query.extra( where=[extra_join_condition]) query_set = Test.objects.join_custom_field(query_set, attribute_query, alias) query_set = self._add_select_value(query_set, alias) return query_set def _join_machine_label_columns(self, query_set, machine_label_names): for label_name in machine_label_names: alias = 'machine_label_' + label_name condition = "FIND_IN_SET('%s', %s)" % ( label_name, _quote_name(alias) + '.value') query_set = self._join_test_attribute( query_set, 'host-labels', alias=alias, extra_join_condition=condition) query_set = self._add_select_ifnull(query_set, alias, label_name) return query_set def _join_one_iteration_key(self, query_set, result_key, first_alias=None): alias = 'iteration_result_' + result_key iteration_query = IterationResult.objects.filter(attribute=result_key) if first_alias: # after the first join, we need to match up iteration indices, # otherwise each join will expand the query by the number of # iterations and we'll have extraneous rows iteration_query = iteration_query.extra( where=['%s.iteration = %s.iteration' % (_quote_name(alias), _quote_name(first_alias))]) query_set = Test.objects.join_custom_field(query_set, iteration_query, alias, left_join=False) # select the iteration value and index for this join query_set = self._add_select_value(query_set, alias) if not first_alias: # for first join, add iteration index select too query_set = self._add_custom_select( query_set, 'iteration_index', _quote_name(alias) + '.iteration') return query_set, alias def _join_iteration_results(self, test_view_query_set, result_keys): """Join the given TestView QuerySet to IterationResult for one result. The resulting query looks like a TestView query but has one row per iteration. Each row includes all the attributes of TestView, an attribute for each key in result_keys and an iteration_index attribute. We accomplish this by joining the TestView query to IterationResult once per result key. Each join is restricted on the result key (and on the test index, like all one-to-many joins). For the first join, this is the only restriction, so each TestView row expands to a row per iteration (per iteration that includes the key, of course). For each subsequent join, we also restrict the iteration index to match that of the initial join. This makes each subsequent join produce exactly one result row for each input row. (This assumes each iteration contains the same set of keys. Results are undefined if that's not true.) """ if not result_keys: return test_view_query_set query_set, first_alias = self._join_one_iteration_key( test_view_query_set, result_keys[0]) for result_key in result_keys[1:]: query_set, _ = self._join_one_iteration_key(query_set, result_key, first_alias=first_alias) return query_set def _join_job_keyvals(self, query_set, job_keyvals): for job_keyval in job_keyvals: alias = 'job_keyval_' + job_keyval keyval_query = JobKeyval.objects.filter(key=job_keyval) query_set = Job.objects.join_custom_field(query_set, keyval_query, alias) query_set = self._add_select_value(query_set, alias) return query_set def _join_iteration_attributes(self, query_set, iteration_attributes): for attribute in iteration_attributes: alias = 'iteration_attribute_' + attribute attribute_query = IterationAttribute.objects.filter( attribute=attribute) query_set = Test.objects.join_custom_field(query_set, attribute_query, alias) query_set = self._add_select_value(query_set, alias) return query_set def get_query_set_with_joins(self, filter_data): """Add joins for querying over test-related items. These parameters are supported going forward: * test_attribute_fields: list of attribute names. Each attribute will be available as a column attribute_.value. * test_label_fields: list of label names. Each label will be available as a column label_.id, non-null iff the label is present. * iteration_result_fields: list of iteration result names. Each result will be available as a column iteration_.value. Note that this changes the semantics to return iterations instead of tests -- if a test has multiple iterations, a row will be returned for each one. The iteration index is also available as iteration_.iteration. * machine_label_fields: list of machine label names. Each will be available as a column machine_label_.id, non-null iff the label is present on the machine used in the test. * job_keyval_fields: list of job keyval names. Each value will be available as a column job_keyval_.id, non-null iff the keyval is present in the AFE job. * iteration_attribute_fields: list of iteration attribute names. Each attribute will be available as a column iteration_attribute.id, non-null iff the attribute is present. These parameters are deprecated: * include_labels * exclude_labels * include_attributes_where * exclude_attributes_where Additionally, this method adds joins if the following strings are present in extra_where (this is also deprecated): * test_labels * test_attributes_host_labels @param filter_data: Data by which to filter. @return A QuerySet. """ query_set = self.get_query_set() test_attributes = filter_data.pop('test_attribute_fields', []) for attribute in test_attributes: query_set = self._join_test_attribute(query_set, attribute) test_labels = filter_data.pop('test_label_fields', []) query_set = self._join_test_label_columns(query_set, test_labels) machine_labels = filter_data.pop('machine_label_fields', []) query_set = self._join_machine_label_columns(query_set, machine_labels) iteration_keys = filter_data.pop('iteration_result_fields', []) query_set = self._join_iteration_results(query_set, iteration_keys) job_keyvals = filter_data.pop('job_keyval_fields', []) query_set = self._join_job_keyvals(query_set, job_keyvals) iteration_attributes = filter_data.pop('iteration_attribute_fields', []) query_set = self._join_iteration_attributes(query_set, iteration_attributes) # everything that follows is deprecated behavior joined = False extra_where = filter_data.get('extra_where', '') if 'tko_test_labels' in extra_where: query_set = self._add_label_joins(query_set) joined = True include_labels = filter_data.pop('include_labels', []) exclude_labels = filter_data.pop('exclude_labels', []) if include_labels: query_set = self._include_or_exclude_labels(query_set, include_labels) joined = True if exclude_labels: query_set = self._include_or_exclude_labels(query_set, exclude_labels, exclude=True) joined = True include_attributes_where = filter_data.pop('include_attributes_where', '') exclude_attributes_where = filter_data.pop('exclude_attributes_where', '') if include_attributes_where: query_set = self._add_attribute_join( query_set, join_condition=self.escape_user_sql(include_attributes_where)) joined = True if exclude_attributes_where: query_set = self._add_attribute_join( query_set, join_condition=self.escape_user_sql(exclude_attributes_where), exclude=True) joined = True if not joined: filter_data['no_distinct'] = True if 'tko_test_attributes_host_labels' in extra_where: query_set = self._add_attribute_join( query_set, suffix='_host_labels', join_condition='tko_test_attributes_host_labels.attribute = ' '"host-labels"') return query_set def query_test_ids(self, filter_data, apply_presentation=True): """Queries for test IDs. @param filter_data: Data by which to filter. @param apply_presentation: Whether or not to apply presentation parameters. @return A list of test IDs. """ query = self.model.query_objects(filter_data, apply_presentation=apply_presentation) dicts = query.values('test_idx') return [item['test_idx'] for item in dicts] def query_test_label_ids(self, filter_data): """Queries for test label IDs. @param filter_data: Data by which to filter. @return A list of test label IDs. """ query_set = self.model.query_objects(filter_data) query_set = self._add_label_joins(query_set, suffix='_list') rows = self._custom_select_query(query_set, ['tko_test_labels_list.id']) return [row[0] for row in rows if row[0] is not None] def escape_user_sql(self, sql): sql = super(TestViewManager, self).escape_user_sql(sql) return sql.replace('test_idx', self.get_key_on_this_table('test_idx')) class TestView(dbmodels.Model, model_logic.ModelExtensions): """Models a test view.""" extra_fields = { 'DATE(job_queued_time)': 'job queued day', 'DATE(test_finished_time)': 'test finished day', } group_fields = [ 'test_name', 'status', 'kernel', 'hostname', 'job_tag', 'job_name', 'platform', 'reason', 'job_owner', 'job_queued_time', 'DATE(job_queued_time)', 'test_started_time', 'test_finished_time', 'DATE(test_finished_time)', ] test_idx = dbmodels.IntegerField('test index', primary_key=True) job_idx = dbmodels.IntegerField('job index', null=True, blank=True) test_name = dbmodels.CharField(blank=True, max_length=90) subdir = dbmodels.CharField('subdirectory', blank=True, max_length=180) kernel_idx = dbmodels.IntegerField('kernel index') status_idx = dbmodels.IntegerField('status index') reason = dbmodels.CharField(blank=True, max_length=3072) machine_idx = dbmodels.IntegerField('host index') test_started_time = dbmodels.DateTimeField(null=True, blank=True) test_finished_time = dbmodels.DateTimeField(null=True, blank=True) job_tag = dbmodels.CharField(blank=True, max_length=300) job_name = dbmodels.CharField(blank=True, max_length=300) job_owner = dbmodels.CharField('owner', blank=True, max_length=240) job_queued_time = dbmodels.DateTimeField(null=True, blank=True) job_started_time = dbmodels.DateTimeField(null=True, blank=True) job_finished_time = dbmodels.DateTimeField(null=True, blank=True) afe_job_id = dbmodels.IntegerField(null=True) hostname = dbmodels.CharField(blank=True, max_length=300) platform = dbmodels.CharField(blank=True, max_length=240) machine_owner = dbmodels.CharField(blank=True, max_length=240) kernel_hash = dbmodels.CharField(blank=True, max_length=105) kernel_base = dbmodels.CharField(blank=True, max_length=90) kernel = dbmodels.CharField(blank=True, max_length=300) status = dbmodels.CharField(blank=True, max_length=30) invalid = dbmodels.BooleanField(blank=True) invalidates_test_idx = dbmodels.IntegerField(null=True, blank=True) objects = TestViewManager() def save(self): raise NotImplementedError('TestView is read-only') def delete(self): raise NotImplementedError('TestView is read-only') @classmethod def query_objects(cls, filter_data, initial_query=None, apply_presentation=True): if initial_query is None: initial_query = cls.objects.get_query_set_with_joins(filter_data) return super(TestView, cls).query_objects( filter_data, initial_query=initial_query, apply_presentation=apply_presentation) class Meta: """Metadata for class TestView.""" db_table = 'tko_test_view_2'