# pylint: disable=missing-docstring """\ Functions to expose over the RPC interface. For all modify* and delete* functions that ask for an 'id' parameter to identify the object to operate on, the id may be either * the database row ID * the name of the object (label name, hostname, user login, etc.) * a dictionary containing uniquely identifying field (this option should seldom be used) When specifying foreign key fields (i.e. adding hosts to a label, or adding users to an ACL group), the given value may be either the database row ID or the name of the object. All get* functions return lists of dictionaries. Each dictionary represents one object and maps field names to values. Some examples: modify_host(2, hostname='myhost') # modify hostname of host with database ID 2 modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2' modify_test('sleeptest', test_type='Client', params=', seconds=60') delete_acl_group(1) # delete by ID delete_acl_group('Everyone') # delete by name acl_group_add_users('Everyone', ['mbligh', 'showard']) get_jobs(owner='showard', status='Queued') See doctests/001_rpc_test.txt for (lots) more examples. """ __author__ = 'showard@google.com (Steve Howard)' import ast import collections import contextlib import datetime import logging import os import sys import warnings import six from autotest_lib.client.common_lib import (control_data, error, global_config, priorities) from autotest_lib.client.common_lib.cros import dev_server from autotest_lib.frontend.afe import control_file as control_file_lib from autotest_lib.frontend.afe import (model_attributes, model_logic, models, rpc_utils) from autotest_lib.frontend.tko import models as tko_models from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface from autotest_lib.server import frontend, utils from autotest_lib.server.cros import provision from autotest_lib.server.cros.dynamic_suite import (constants, control_file_getter, suite_common, tools) from autotest_lib.server.cros.dynamic_suite.suite import Suite from autotest_lib.server.lib import status_history from autotest_lib.site_utils import job_history, stable_version_utils from django.db import connection as db_connection from django.db import transaction from django.db.models import Count from django.db.utils import DatabaseError import common _CONFIG = global_config.global_config # Definition of LabHealthIndicator LabHealthIndicator = collections.namedtuple( 'LabHealthIndicator', [ 'if_lab_close', 'available_duts', 'devserver_health', 'upcoming_builds', ] ) RESPECT_STATIC_LABELS = global_config.global_config.get_config_value( 'SKYLAB', 'respect_static_labels', type=bool, default=False) RESPECT_STATIC_ATTRIBUTES = global_config.global_config.get_config_value( 'SKYLAB', 'respect_static_attributes', type=bool, default=False) # Relevant CrosDynamicSuiteExceptions are defined in client/common_lib/error.py. # labels def modify_label(id, **data): """Modify a label. @param id: id or name of a label. More often a label name. @param data: New data for a label. """ label_model = models.Label.smart_get(id) if label_model.is_replaced_by_static(): raise error.UnmodifiableLabelException( 'Failed to delete label "%s" because it is a static label. ' 'Use go/chromeos-skylab-inventory-tools to modify this ' 'label.' % label_model.name) label_model.update_object(data) # Main forwards the RPC to shards if not utils.is_shard(): rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False, id=id, **data) def delete_label(id): """Delete a label. @param id: id or name of a label. More often a label name. """ label_model = models.Label.smart_get(id) if label_model.is_replaced_by_static(): raise error.UnmodifiableLabelException( 'Failed to delete label "%s" because it is a static label. ' 'Use go/chromeos-skylab-inventory-tools to modify this ' 'label.' % label_model.name) # Hosts that have the label to be deleted. Save this info before # the label is deleted to use it later. hosts = [] for h in label_model.host_set.all(): hosts.append(models.Host.smart_get(h.id)) label_model.delete() # Main forwards the RPC to shards if not utils.is_shard(): rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id) def add_label(name, ignore_exception_if_exists=False, **kwargs): """Adds a new label of a given name. @param name: label name. @param ignore_exception_if_exists: If True and the exception was thrown due to the duplicated label name when adding a label, then suppress the exception. Default is False. @param kwargs: keyword args that store more info about a label other than the name. @return: int/long id of a new label. """ # models.Label.add_object() throws model_logic.ValidationError # when it is given a label name that already exists. # However, ValidationError can be thrown with different errors, # and those errors should be thrown up to the call chain. try: label = models.Label.add_object(name=name, **kwargs) except: exc_info = sys.exc_info() if ignore_exception_if_exists: label = rpc_utils.get_label(name) # If the exception is raised not because of duplicated # "name", then raise the original exception. if label is None: six.reraise(exc_info[0], exc_info[1], exc_info[2]) else: six.reraise(exc_info[0], exc_info[1], exc_info[2]) return label.id def add_label_to_hosts(id, hosts): """Adds a label of the given id to the given hosts only in local DB. @param id: id or name of a label. More often a label name. @param hosts: The hostnames of hosts that need the label. @raises models.Label.DoesNotExist: If the label with id doesn't exist. """ label = models.Label.smart_get(id) if label.is_replaced_by_static(): label = models.StaticLabel.smart_get(label.name) host_objs = models.Host.smart_get_bulk(hosts) if label.platform: models.Host.check_no_platform(host_objs) # Ensure a host has no more than one board label with it. if label.name.startswith('board:'): models.Host.check_board_labels_allowed(host_objs, [label.name]) label.host_set.add(*host_objs) def _create_label_everywhere(id, hosts): """ Yet another method to create labels. ALERT! This method should be run only on main not shards! DO NOT RUN THIS ON A SHARD!!! Deputies will hate you if you do!!! This method exists primarily to serve label_add_hosts() and host_add_labels(). Basically it pulls out the label check/add logic from label_add_hosts() into this nice method that not only creates the label but also tells the shards that service the hosts to also create the label. @param id: id or name of a label. More often a label name. @param hosts: A list of hostnames or ids. More often hostnames. """ try: label = models.Label.smart_get(id) except models.Label.DoesNotExist: # This matches the type checks in smart_get, which is a hack # in and off itself. The aim here is to create any non-existent # label, which we cannot do if the 'id' specified isn't a label name. if isinstance(id, six.string_types): label = models.Label.smart_get(add_label(id)) else: raise ValueError('Label id (%s) does not exist. Please specify ' 'the argument, id, as a string (label name).' % id) # Make sure the label exists on the shard with the same id # as it is on the main. # It is possible that the label is already in a shard because # we are adding a new label only to shards of hosts that the label # is going to be attached. # For example, we add a label L1 to a host in shard S1. # Main and S1 will have L1 but other shards won't. # Later, when we add the same label L1 to hosts in shards S1 and S2, # S1 already has the label but S2 doesn't. # S2 should have the new label without any problem. # We ignore exception in such a case. host_objs = models.Host.smart_get_bulk(hosts) rpc_utils.fanout_rpc( host_objs, 'add_label', include_hostnames=False, name=label.name, ignore_exception_if_exists=True, id=label.id, platform=label.platform) @rpc_utils.route_rpc_to_main def label_add_hosts(id, hosts): """Adds a label with the given id to the given hosts. This method should be run only on main not shards. The given label will be created if it doesn't exist, provided the `id` supplied is a label name not an int/long id. @param id: id or name of a label. More often a label name. @param hosts: A list of hostnames or ids. More often hostnames. @raises ValueError: If the id specified is an int/long (label id) while the label does not exist. """ # Create the label. _create_label_everywhere(id, hosts) # Add it to the main. add_label_to_hosts(id, hosts) # Add it to the shards. host_objs = models.Host.smart_get_bulk(hosts) rpc_utils.fanout_rpc(host_objs, 'add_label_to_hosts', id=id) def remove_label_from_hosts(id, hosts): """Removes a label of the given id from the given hosts only in local DB. @param id: id or name of a label. @param hosts: The hostnames of hosts that need to remove the label from. """ host_objs = models.Host.smart_get_bulk(hosts) label = models.Label.smart_get(id) if label.is_replaced_by_static(): raise error.UnmodifiableLabelException( 'Failed to remove label "%s" for hosts "%r" because it is a ' 'static label. Use go/chromeos-skylab-inventory-tools to ' 'modify this label.' % (label.name, hosts)) label.host_set.remove(*host_objs) @rpc_utils.route_rpc_to_main def label_remove_hosts(id, hosts): """Removes a label of the given id from the given hosts. This method should be run only on main not shards. @param id: id or name of a label. @param hosts: A list of hostnames or ids. More often hostnames. """ host_objs = models.Host.smart_get_bulk(hosts) remove_label_from_hosts(id, hosts) rpc_utils.fanout_rpc(host_objs, 'remove_label_from_hosts', id=id) def get_labels(exclude_filters=(), **filter_data): """\ @param exclude_filters: A sequence of dictionaries of filters. @returns A sequence of nested dictionaries of label information. """ labels = models.Label.query_objects(filter_data) for exclude_filter in exclude_filters: labels = labels.exclude(**exclude_filter) if not RESPECT_STATIC_LABELS: return rpc_utils.prepare_rows_as_nested_dicts(labels, ()) static_labels = models.StaticLabel.query_objects(filter_data) for exclude_filter in exclude_filters: static_labels = static_labels.exclude(**exclude_filter) non_static_lists = rpc_utils.prepare_rows_as_nested_dicts(labels, ()) static_lists = rpc_utils.prepare_rows_as_nested_dicts(static_labels, ()) label_ids = [label.id for label in labels] replaced = models.ReplacedLabel.objects.filter(label__id__in=label_ids) replaced_ids = {r.label_id for r in replaced} replaced_label_names = {l.name for l in labels if l.id in replaced_ids} return_lists = [] for non_static_label in non_static_lists: if non_static_label.get('id') not in replaced_ids: return_lists.append(non_static_label) for static_label in static_lists: if static_label.get('name') in replaced_label_names: return_lists.append(static_label) return return_lists # hosts def add_host(hostname, status=None, locked=None, lock_reason='', protection=None): if locked and not lock_reason: raise model_logic.ValidationError( {'locked': 'Please provide a reason for locking when adding host.'}) return models.Host.add_object(hostname=hostname, status=status, locked=locked, lock_reason=lock_reason, protection=protection).id @rpc_utils.route_rpc_to_main def modify_host(id, **kwargs): """Modify local attributes of a host. If this is called on the main, but the host is assigned to a shard, this will call `modify_host_local` RPC to the responsible shard. This means if a host is being locked using this function, this change will also propagate to shards. When this is called on a shard, the shard just routes the RPC to the main and does nothing. @param id: id of the host to modify. @param kwargs: key=value pairs of values to set on the host. """ rpc_utils.check_modify_host(kwargs) host = models.Host.smart_get(id) try: rpc_utils.check_modify_host_locking(host, kwargs) except model_logic.ValidationError as e: if not kwargs.get('force_modify_locking', False): raise logging.exception('The following exception will be ignored and lock ' 'modification will be enforced. %s', e) # This is required to make `lock_time` for a host be exactly same # between the main and a shard. if kwargs.get('locked', None) and 'lock_time' not in kwargs: kwargs['lock_time'] = datetime.datetime.now() # force_modifying_locking is not an internal field in database, remove. shard_kwargs = dict(kwargs) shard_kwargs.pop('force_modify_locking', None) rpc_utils.fanout_rpc([host], 'modify_host_local', include_hostnames=False, id=id, **shard_kwargs) # Update the local DB **after** RPC fanout is complete. # This guarantees that the main state is only updated if the shards were # correctly updated. # In case the shard update fails mid-flight and the main-shard desync, we # always consider the main state to be the source-of-truth, and any # (automated) corrective actions will revert the (partial) shard updates. host.update_object(kwargs) def modify_host_local(id, **kwargs): """Modify host attributes in local DB. @param id: Host id. @param kwargs: key=value pairs of values to set on the host. """ models.Host.smart_get(id).update_object(kwargs) @rpc_utils.route_rpc_to_main def modify_hosts(host_filter_data, update_data): """Modify local attributes of multiple hosts. If this is called on the main, but one of the hosts in that match the filters is assigned to a shard, this will call `modify_hosts_local` RPC to the responsible shard. When this is called on a shard, the shard just routes the RPC to the main and does nothing. The filters are always applied on the main, not on the shards. This means if the states of a host differ on the main and a shard, the state on the main will be used. I.e. this means: A host was synced to Shard 1. On Shard 1 the status of the host was set to 'Repair Failed'. - A call to modify_hosts with host_filter_data={'status': 'Ready'} will update the host (both on the shard and on the main), because the state of the host as the main knows it is still 'Ready'. - A call to modify_hosts with host_filter_data={'status': 'Repair failed' will not update the host, because the filter doesn't apply on the main. @param host_filter_data: Filters out which hosts to modify. @param update_data: A dictionary with the changes to make to the hosts. """ update_data = update_data.copy() rpc_utils.check_modify_host(update_data) hosts = models.Host.query_objects(host_filter_data) affected_shard_hostnames = set() affected_host_ids = [] # Check all hosts before changing data for exception safety. for host in hosts: try: rpc_utils.check_modify_host_locking(host, update_data) except model_logic.ValidationError as e: if not update_data.get('force_modify_locking', False): raise logging.exception('The following exception will be ignored and ' 'lock modification will be enforced. %s', e) if host.shard: affected_shard_hostnames.add(host.shard.hostname) affected_host_ids.append(host.id) # This is required to make `lock_time` for a host be exactly same # between the main and a shard. if update_data.get('locked', None) and 'lock_time' not in update_data: update_data['lock_time'] = datetime.datetime.now() for host in hosts: host.update_object(update_data) update_data.pop('force_modify_locking', None) # Caution: Changing the filter from the original here. See docstring. rpc_utils.run_rpc_on_multiple_hostnames( 'modify_hosts_local', affected_shard_hostnames, host_filter_data={'id__in': affected_host_ids}, update_data=update_data) def modify_hosts_local(host_filter_data, update_data): """Modify attributes of hosts in local DB. @param host_filter_data: Filters out which hosts to modify. @param update_data: A dictionary with the changes to make to the hosts. """ for host in models.Host.query_objects(host_filter_data): host.update_object(update_data) def add_labels_to_host(id, labels): """Adds labels to a given host only in local DB. @param id: id or hostname for a host. @param labels: ids or names for labels. """ label_objs = models.Label.smart_get_bulk(labels) if not RESPECT_STATIC_LABELS: models.Host.smart_get(id).labels.add(*label_objs) else: static_labels, non_static_labels = models.Host.classify_label_objects( label_objs) host = models.Host.smart_get(id) host.static_labels.add(*static_labels) host.labels.add(*non_static_labels) @rpc_utils.route_rpc_to_main def host_add_labels(id, labels): """Adds labels to a given host. @param id: id or hostname for a host. @param labels: ids or names for labels. @raises ValidationError: If adding more than one platform/board label. """ # Create the labels on the main/shards. for label in labels: _create_label_everywhere(label, [id]) label_objs = models.Label.smart_get_bulk(labels) platforms = [label.name for label in label_objs if label.platform] if len(platforms) > 1: raise model_logic.ValidationError( {'labels': ('Adding more than one platform: %s' % ', '.join(platforms))}) host_obj = models.Host.smart_get(id) if platforms: models.Host.check_no_platform([host_obj]) if any(label_name.startswith('board:') for label_name in labels): models.Host.check_board_labels_allowed([host_obj], labels) add_labels_to_host(id, labels) rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False, id=id, labels=labels) def remove_labels_from_host(id, labels): """Removes labels from a given host only in local DB. @param id: id or hostname for a host. @param labels: ids or names for labels. """ label_objs = models.Label.smart_get_bulk(labels) if not RESPECT_STATIC_LABELS: models.Host.smart_get(id).labels.remove(*label_objs) else: static_labels, non_static_labels = models.Host.classify_label_objects( label_objs) host = models.Host.smart_get(id) host.labels.remove(*non_static_labels) if static_labels: logging.info('Cannot remove labels "%r" for host "%r" due to they ' 'are static labels. Use ' 'go/chromeos-skylab-inventory-tools to modify these ' 'labels.', static_labels, id) @rpc_utils.route_rpc_to_main def host_remove_labels(id, labels): """Removes labels from a given host. @param id: id or hostname for a host. @param labels: ids or names for labels. """ remove_labels_from_host(id, labels) host_obj = models.Host.smart_get(id) rpc_utils.fanout_rpc([host_obj], 'remove_labels_from_host', False, id=id, labels=labels) def get_host_attribute(attribute, **host_filter_data): """ @param attribute: string name of attribute @param host_filter_data: filter data to apply to Hosts to choose hosts to act upon """ hosts = rpc_utils.get_host_query((), False, True, host_filter_data) hosts = list(hosts) models.Host.objects.populate_relationships(hosts, models.HostAttribute, 'attribute_list') host_attr_dicts = [] host_objs = [] for host_obj in hosts: for attr_obj in host_obj.attribute_list: if attr_obj.attribute == attribute: host_attr_dicts.append(attr_obj.get_object_dict()) host_objs.append(host_obj) if RESPECT_STATIC_ATTRIBUTES: for host_attr, host_obj in zip(host_attr_dicts, host_objs): static_attrs = models.StaticHostAttribute.query_objects( {'host_id': host_obj.id, 'attribute': attribute}) if len(static_attrs) > 0: host_attr['value'] = static_attrs[0].value return rpc_utils.prepare_for_serialization(host_attr_dicts) @rpc_utils.route_rpc_to_main def set_host_attribute(attribute, value, **host_filter_data): """Set an attribute on hosts. This RPC is a shim that forwards calls to main to be handled there. @param attribute: string name of attribute @param value: string, or None to delete an attribute @param host_filter_data: filter data to apply to Hosts to choose hosts to act upon """ assert not utils.is_shard() set_host_attribute_impl(attribute, value, **host_filter_data) def set_host_attribute_impl(attribute, value, **host_filter_data): """Set an attribute on hosts. *** DO NOT CALL THIS RPC from client code *** This RPC exists for main-shard communication only. Call set_host_attribute instead. @param attribute: string name of attribute @param value: string, or None to delete an attribute @param host_filter_data: filter data to apply to Hosts to choose hosts to act upon """ assert host_filter_data # disallow accidental actions on all hosts hosts = models.Host.query_objects(host_filter_data) models.AclGroup.check_for_acl_violation_hosts(hosts) for host in hosts: host.set_or_delete_attribute(attribute, value) # Main forwards this RPC to shards. if not utils.is_shard(): rpc_utils.fanout_rpc(hosts, 'set_host_attribute_impl', False, attribute=attribute, value=value, **host_filter_data) @rpc_utils.forward_single_host_rpc_to_shard def delete_host(id): models.Host.smart_get(id).delete() def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False, valid_only=True, include_current_job=False, **filter_data): """Get a list of dictionaries which contains the information of hosts. @param multiple_labels: match hosts in all of the labels given. Should be a list of label names. @param exclude_only_if_needed_labels: Deprecated. Raise error if it's True. @param include_current_job: Set to True to include ids of currently running job and special task. """ if exclude_only_if_needed_labels: raise error.RPCException('exclude_only_if_needed_labels is deprecated') hosts = rpc_utils.get_host_query(multiple_labels, exclude_only_if_needed_labels, valid_only, filter_data) hosts = list(hosts) models.Host.objects.populate_relationships(hosts, models.Label, 'label_list') models.Host.objects.populate_relationships(hosts, models.AclGroup, 'acl_list') models.Host.objects.populate_relationships(hosts, models.HostAttribute, 'attribute_list') models.Host.objects.populate_relationships(hosts, models.StaticHostAttribute, 'staticattribute_list') host_dicts = [] for host_obj in hosts: host_dict = host_obj.get_object_dict() host_dict['acls'] = [acl.name for acl in host_obj.acl_list] host_dict['attributes'] = dict((attribute.attribute, attribute.value) for attribute in host_obj.attribute_list) if RESPECT_STATIC_LABELS: label_list = [] # Only keep static labels which has a corresponding entries in # afe_labels. for label in host_obj.label_list: if label.is_replaced_by_static(): static_label = models.StaticLabel.smart_get(label.name) label_list.append(static_label) else: label_list.append(label) host_dict['labels'] = [label.name for label in label_list] host_dict['platform'] = rpc_utils.find_platform( host_obj.hostname, label_list) else: host_dict['labels'] = [label.name for label in host_obj.label_list] host_dict['platform'] = rpc_utils.find_platform( host_obj.hostname, host_obj.label_list) if RESPECT_STATIC_ATTRIBUTES: # Overwrite attribute with values in afe_static_host_attributes. for attr in host_obj.staticattribute_list: if attr.attribute in host_dict['attributes']: host_dict['attributes'][attr.attribute] = attr.value if include_current_job: host_dict['current_job'] = None host_dict['current_special_task'] = None entries = models.HostQueueEntry.objects.filter( host_id=host_dict['id'], active=True, complete=False) if entries: host_dict['current_job'] = ( entries[0].get_object_dict()['job']) tasks = models.SpecialTask.objects.filter( host_id=host_dict['id'], is_active=True, is_complete=False) if tasks: host_dict['current_special_task'] = ( '%d-%s' % (tasks[0].get_object_dict()['id'], tasks[0].get_object_dict()['task'].lower())) host_dicts.append(host_dict) return rpc_utils.prepare_for_serialization(host_dicts) def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False, valid_only=True, **filter_data): """ Same parameters as get_hosts(). @returns The number of matching hosts. """ if exclude_only_if_needed_labels: raise error.RPCException('exclude_only_if_needed_labels is deprecated') hosts = rpc_utils.get_host_query(multiple_labels, exclude_only_if_needed_labels, valid_only, filter_data) return len(hosts) # tests def get_tests(**filter_data): return rpc_utils.prepare_for_serialization( models.Test.list_objects(filter_data)) def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name): """Gets the counts of all passed and failed tests from the matching jobs. @param job_name_prefix: Name prefix of the jobs to get the summary from, e.g., 'butterfly-release/r40-6457.21.0/bvt-cq/'. Prefix matching is case insensitive. @param label_name: Label that must be set in the jobs, e.g., 'cros-version:butterfly-release/R40-6457.21.0'. @returns A summary of the counts of all the passed and failed tests. """ job_ids = list(models.Job.objects.filter( name__istartswith=job_name_prefix, dependency_labels__name=label_name).values_list( 'pk', flat=True)) summary = {'passed': 0, 'failed': 0} if not job_ids: return summary counts = (tko_models.TestView.objects.filter( afe_job_id__in=job_ids).exclude( test_name='SERVER_JOB').exclude( test_name__startswith='CLIENT_JOB').values( 'status').annotate( count=Count('status'))) for status in counts: if status['status'] == 'GOOD': summary['passed'] += status['count'] else: summary['failed'] += status['count'] return summary # profilers def add_profiler(name, description=None): return models.Profiler.add_object(name=name, description=description).id def modify_profiler(id, **data): models.Profiler.smart_get(id).update_object(data) def delete_profiler(id): models.Profiler.smart_get(id).delete() def get_profilers(**filter_data): return rpc_utils.prepare_for_serialization( models.Profiler.list_objects(filter_data)) # users def get_users(**filter_data): return rpc_utils.prepare_for_serialization( models.User.list_objects(filter_data)) # acl groups def add_acl_group(name, description=None): group = models.AclGroup.add_object(name=name, description=description) group.users.add(models.User.current_user()) return group.id def modify_acl_group(id, **data): group = models.AclGroup.smart_get(id) group.check_for_acl_violation_acl_group() group.update_object(data) group.add_current_user_if_empty() def acl_group_add_users(id, users): group = models.AclGroup.smart_get(id) group.check_for_acl_violation_acl_group() users = models.User.smart_get_bulk(users) group.users.add(*users) def acl_group_remove_users(id, users): group = models.AclGroup.smart_get(id) group.check_for_acl_violation_acl_group() users = models.User.smart_get_bulk(users) group.users.remove(*users) group.add_current_user_if_empty() def acl_group_add_hosts(id, hosts): group = models.AclGroup.smart_get(id) group.check_for_acl_violation_acl_group() hosts = models.Host.smart_get_bulk(hosts) group.hosts.add(*hosts) group.on_host_membership_change() def acl_group_remove_hosts(id, hosts): group = models.AclGroup.smart_get(id) group.check_for_acl_violation_acl_group() hosts = models.Host.smart_get_bulk(hosts) group.hosts.remove(*hosts) group.on_host_membership_change() def delete_acl_group(id): models.AclGroup.smart_get(id).delete() def get_acl_groups(**filter_data): acl_groups = models.AclGroup.list_objects(filter_data) for acl_group in acl_groups: acl_group_obj = models.AclGroup.objects.get(id=acl_group['id']) acl_group['users'] = [user.login for user in acl_group_obj.users.all()] acl_group['hosts'] = [host.hostname for host in acl_group_obj.hosts.all()] return rpc_utils.prepare_for_serialization(acl_groups) # jobs def generate_control_file(tests=(), profilers=(), client_control_file='', use_container=False, profile_only=None, db_tests=True, test_source_build=None): """ Generates a client-side control file to run tests. @param tests List of tests to run. See db_tests for more information. @param profilers List of profilers to activate during the job. @param client_control_file The contents of a client-side control file to run at the end of all tests. If this is supplied, all tests must be client side. TODO: in the future we should support server control files directly to wrap with a kernel. That'll require changing the parameter name and adding a boolean to indicate if it is a client or server control file. @param use_container unused argument today. TODO: Enable containers on the host during a client side test. @param profile_only A boolean that indicates what default profile_only mode to use in the control file. Passing None will generate a control file that does not explcitly set the default mode at all. @param db_tests: if True, the test object can be found in the database backing the test model. In this case, tests is a tuple of test IDs which are used to retrieve the test objects from the database. If False, tests is a tuple of test dictionaries stored client-side in the AFE. @param test_source_build: Build to be used to retrieve test code. Default to None. @returns a dict with the following keys: control_file: str, The control file text. is_server: bool, is the control file a server-side control file? synch_count: How many machines the job uses per autoserv execution. synch_count == 1 means the job is asynchronous. dependencies: A list of the names of labels on which the job depends. """ if not tests and not client_control_file: return dict(control_file='', is_server=False, synch_count=1, dependencies=[]) cf_info, test_objects, profiler_objects = ( rpc_utils.prepare_generate_control_file(tests, profilers, db_tests)) cf_info['control_file'] = control_file_lib.generate_control( tests=test_objects, profilers=profiler_objects, is_server=cf_info['is_server'], client_control_file=client_control_file, profile_only=profile_only, test_source_build=test_source_build) return cf_info def create_job_page_handler(name, priority, control_file, control_type, image=None, hostless=False, firmware_rw_build=None, firmware_ro_build=None, test_source_build=None, is_cloning=False, cheets_build=None, **kwargs): """\ Create and enqueue a job. @param name name of this job @param priority Integer priority of this job. Higher is more important. @param control_file String contents of the control file. @param control_type Type of control file, Client or Server. @param image: ChromeOS build to be installed in the dut. Default to None. @param firmware_rw_build: Firmware build to update RW firmware. Default to None, i.e., RW firmware will not be updated. @param firmware_ro_build: Firmware build to update RO firmware. Default to None, i.e., RO firmware will not be updated. @param test_source_build: Build to be used to retrieve test code. Default to None. @param is_cloning: True if creating a cloning job. @param cheets_build: ChromeOS Android build to be installed in the dut. Default to None. Cheets build will not be updated. @param kwargs extra args that will be required by create_suite_job or create_job. @returns The created Job id number. """ test_args = {} if kwargs.get('args'): # args' format is: ['disable_sysinfo=False', 'fast=True', ...] args = kwargs.get('args') for arg in args: k, v = arg.split('=')[0], arg.split('=')[1] test_args[k] = v if is_cloning: logging.info('Start to clone a new job') # When cloning a job, hosts and meta_hosts should not exist together, # which would cause host-scheduler to schedule two hqe jobs to one host # at the same time, and crash itself. Clear meta_hosts for this case. if kwargs.get('hosts') and kwargs.get('meta_hosts'): kwargs['meta_hosts'] = [] else: logging.info('Start to create a new job') control_file = rpc_utils.encode_ascii(control_file) if not control_file: raise model_logic.ValidationError({ 'control_file' : "Control file cannot be empty"}) if image and hostless: builds = {} builds[provision.CROS_VERSION_PREFIX] = image if cheets_build: builds[provision.CROS_ANDROID_VERSION_PREFIX] = cheets_build if firmware_rw_build: builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build if firmware_ro_build: builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build return create_suite_job( name=name, control_file=control_file, priority=priority, builds=builds, test_source_build=test_source_build, is_cloning=is_cloning, test_args=test_args, **kwargs) return create_job(name, priority, control_file, control_type, image=image, hostless=hostless, test_args=test_args, **kwargs) @rpc_utils.route_rpc_to_main def create_job( name, priority, control_file, control_type, hosts=(), meta_hosts=(), one_time_hosts=(), synch_count=None, is_template=False, timeout=None, timeout_mins=None, max_runtime_mins=None, run_verify=False, email_list='', dependencies=(), reboot_before=None, reboot_after=None, parse_failed_repair=None, hostless=False, keyvals=None, drone_set=None, image=None, parent_job_id=None, test_retry=0, run_reset=True, require_ssp=None, test_args=None, **kwargs): """\ Create and enqueue a job. @param name name of this job @param priority Integer priority of this job. Higher is more important. @param control_file String contents of the control file. @param control_type Type of control file, Client or Server. @param synch_count How many machines the job uses per autoserv execution. synch_count == 1 means the job is asynchronous. If an atomic group is given this value is treated as a minimum. @param is_template If true then create a template job. @param timeout Hours after this call returns until the job times out. @param timeout_mins Minutes after this call returns until the job times out. @param max_runtime_mins Minutes from job starting time until job times out @param run_verify Should the host be verified before running the test? @param email_list String containing emails to mail when the job is done @param dependencies List of label names on which this job depends @param reboot_before Never, If dirty, or Always @param reboot_after Never, If all tests passed, or Always @param parse_failed_repair if true, results of failed repairs launched by this job will be parsed as part of the job. @param hostless if true, create a hostless job @param keyvals dict of keyvals to associate with the job @param hosts List of hosts to run job on. @param meta_hosts List where each entry is a label name, and for each entry one host will be chosen from that label to run the job on. @param one_time_hosts List of hosts not in the database to run the job on. @param drone_set The name of the drone set to run this test on. @param image OS image to install before running job. @param parent_job_id id of a job considered to be parent of created job. @param test_retry DEPRECATED @param run_reset Should the host be reset before running the test? @param require_ssp Set to True to require server-side packaging to run the test. If it's set to None, drone will still try to run the server side with server-side packaging. If the autotest-server package doesn't exist for the build or image is not set, drone will run the test without server- side packaging. Default is None. @param test_args A dict of args passed to be injected into control file. @param kwargs extra keyword args. NOT USED. @returns The created Job id number. """ if test_args: control_file = tools.inject_vars(test_args, control_file) if image: dependencies += (provision.image_version_to_label(image),) return rpc_utils.create_job_common( name=name, priority=priority, control_type=control_type, control_file=control_file, hosts=hosts, meta_hosts=meta_hosts, one_time_hosts=one_time_hosts, synch_count=synch_count, is_template=is_template, timeout=timeout, timeout_mins=timeout_mins, max_runtime_mins=max_runtime_mins, run_verify=run_verify, email_list=email_list, dependencies=dependencies, reboot_before=reboot_before, reboot_after=reboot_after, parse_failed_repair=parse_failed_repair, hostless=hostless, keyvals=keyvals, drone_set=drone_set, parent_job_id=parent_job_id, run_reset=run_reset, require_ssp=require_ssp) def abort_host_queue_entries(**filter_data): """\ Abort a set of host queue entries. @return: A list of dictionaries, each contains information about an aborted HQE. """ query = models.HostQueueEntry.query_objects(filter_data) # Dont allow aborts on: # 1. Jobs that have already completed (whether or not they were aborted) # 2. Jobs that we have already been aborted (but may not have completed) query = query.filter(complete=False).filter(aborted=False) models.AclGroup.check_abort_permissions(query) host_queue_entries = list(query.select_related()) rpc_utils.check_abort_synchronous_jobs(host_queue_entries) models.HostQueueEntry.abort_host_queue_entries(host_queue_entries) hqe_info = [{'HostQueueEntry': hqe.id, 'Job': hqe.job_id, 'Job name': hqe.job.name} for hqe in host_queue_entries] return hqe_info def abort_special_tasks(**filter_data): """\ Abort the special task, or tasks, specified in the filter. """ query = models.SpecialTask.query_objects(filter_data) special_tasks = query.filter(is_active=True) for task in special_tasks: task.abort() def _call_special_tasks_on_hosts(task, hosts): """\ Schedules a set of hosts for a special task. @returns A list of hostnames that a special task was created for. """ models.AclGroup.check_for_acl_violation_hosts(hosts) shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts) if shard_host_map and not utils.is_shard(): raise ValueError('The following hosts are on shards, please ' 'follow the link to the shards and create jobs ' 'there instead. %s.' % shard_host_map) for host in hosts: models.SpecialTask.schedule_special_task(host, task) return list(sorted(host.hostname for host in hosts)) def _forward_special_tasks_on_hosts(task, rpc, **filter_data): """Forward special tasks to corresponding shards. For main, when special tasks are fired on hosts that are sharded, forward the RPC to corresponding shards. For shard, create special task records in local DB. @param task: Enum value of frontend.afe.models.SpecialTask.Task @param rpc: RPC name to forward. @param filter_data: Filter keywords to be used for DB query. @return: A list of hostnames that a special task was created for. """ hosts = models.Host.query_objects(filter_data) shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts) # Filter out hosts on a shard from those on the main, forward # rpcs to the shard with an additional hostname__in filter, and # create a local SpecialTask for each remaining host. if shard_host_map and not utils.is_shard(): hosts = [h for h in hosts if h.shard is None] for shard, hostnames in shard_host_map.iteritems(): # The main client of this module is the frontend website, and # it invokes it with an 'id' or an 'id__in' filter. Regardless, # the 'hostname' filter should narrow down the list of hosts on # each shard even though we supply all the ids in filter_data. # This method uses hostname instead of id because it fits better # with the overall architecture of redirection functions in # rpc_utils. shard_filter = filter_data.copy() shard_filter['hostname__in'] = hostnames rpc_utils.run_rpc_on_multiple_hostnames( rpc, [shard], **shard_filter) # There is a race condition here if someone assigns a shard to one of these # hosts before we create the task. The host will stay on the main if: # 1. The host is not Ready # 2. The host is Ready but has a task # But if the host is Ready and doesn't have a task yet, it will get sent # to the shard as we're creating a task here. # Given that we only rarely verify Ready hosts it isn't worth putting this # entire method in a transaction. The worst case scenario is that we have # a verify running on a Ready host while the shard is using it, if the # verify fails no subsequent tasks will be created against the host on the # main, and verifies are safe enough that this is OK. return _call_special_tasks_on_hosts(task, hosts) def reverify_hosts(**filter_data): """\ Schedules a set of hosts for verify. @returns A list of hostnames that a verify task was created for. """ return _forward_special_tasks_on_hosts( models.SpecialTask.Task.VERIFY, 'reverify_hosts', **filter_data) def repair_hosts(**filter_data): """\ Schedules a set of hosts for repair. @returns A list of hostnames that a repair task was created for. """ return _forward_special_tasks_on_hosts( models.SpecialTask.Task.REPAIR, 'repair_hosts', **filter_data) def get_jobs(not_yet_run=False, running=False, finished=False, suite=False, sub=False, standalone=False, **filter_data): """\ Extra status filter args for get_jobs: -not_yet_run: Include only jobs that have not yet started running. -running: Include only jobs that have start running but for which not all hosts have completed. -finished: Include only jobs for which all hosts have completed (or aborted). Extra type filter args for get_jobs: -suite: Include only jobs with child jobs. -sub: Include only jobs with a parent job. -standalone: Inlcude only jobs with no child or parent jobs. At most one of these three fields should be specified. """ extra_args = rpc_utils.extra_job_status_filters(not_yet_run, running, finished) filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args, suite, sub, standalone) job_dicts = [] jobs = list(models.Job.query_objects(filter_data)) models.Job.objects.populate_relationships(jobs, models.Label, 'dependencies') models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals') for job in jobs: job_dict = job.get_object_dict() job_dict['dependencies'] = ','.join(label.name for label in job.dependencies) job_dict['keyvals'] = dict((keyval.key, keyval.value) for keyval in job.keyvals) job_dicts.append(job_dict) return rpc_utils.prepare_for_serialization(job_dicts) def get_num_jobs(not_yet_run=False, running=False, finished=False, suite=False, sub=False, standalone=False, **filter_data): """\ See get_jobs() for documentation of extra filter parameters. """ extra_args = rpc_utils.extra_job_status_filters(not_yet_run, running, finished) filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args, suite, sub, standalone) return models.Job.query_count(filter_data) def get_jobs_summary(**filter_data): """\ Like get_jobs(), but adds 'status_counts' and 'result_counts' field. 'status_counts' filed is a dictionary mapping status strings to the number of hosts currently with that status, i.e. {'Queued' : 4, 'Running' : 2}. 'result_counts' field is piped to tko's rpc_interface and has the return format specified under get_group_counts. """ jobs = get_jobs(**filter_data) ids = [job['id'] for job in jobs] all_status_counts = models.Job.objects.get_status_counts(ids) for job in jobs: job['status_counts'] = all_status_counts[job['id']] job['result_counts'] = tko_rpc_interface.get_status_counts( ['afe_job_id', 'afe_job_id'], header_groups=[['afe_job_id'], ['afe_job_id']], **{'afe_job_id': job['id']}) return rpc_utils.prepare_for_serialization(jobs) def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None): """\ Retrieves all the information needed to clone a job. """ job = models.Job.objects.get(id=id) job_info = rpc_utils.get_job_info(job, preserve_metahosts, queue_entry_filter_data) host_dicts = [] for host in job_info['hosts']: host_dict = get_hosts(id=host.id)[0] other_labels = host_dict['labels'] if host_dict['platform']: other_labels.remove(host_dict['platform']) host_dict['other_labels'] = ', '.join(other_labels) host_dicts.append(host_dict) for host in job_info['one_time_hosts']: host_dict = dict(hostname=host.hostname, id=host.id, platform='(one-time host)', locked_text='') host_dicts.append(host_dict) # convert keys from Label objects to strings (names of labels) meta_host_counts = dict((meta_host.name, count) for meta_host, count in job_info['meta_host_counts'].iteritems()) info = dict(job=job.get_object_dict(), meta_host_counts=meta_host_counts, hosts=host_dicts) info['job']['dependencies'] = job_info['dependencies'] info['hostless'] = job_info['hostless'] info['drone_set'] = job.drone_set and job.drone_set.name image = _get_image_for_job(job, job_info['hostless']) if image: info['job']['image'] = image return rpc_utils.prepare_for_serialization(info) def _get_image_for_job(job, hostless): """Gets the image used for a job. Gets the image used for an AFE job from the job's keyvals 'build' or 'builds'. If that fails, and the job is a hostless job, tries to get the image from its control file attributes 'build' or 'builds'. TODO(ntang): Needs to handle FAFT with two builds for ro/rw. @param job An AFE job object. @param hostless Boolean indicating whether the job is hostless. @returns The image build used for the job. """ keyvals = job.keyval_dict() image = keyvals.get('build') if not image: value = keyvals.get('builds') builds = None if isinstance(value, dict): builds = value elif isinstance(value, six.string_types): builds = ast.literal_eval(value) if builds: image = builds.get('cros-version') if not image and hostless and job.control_file: try: control_obj = control_data.parse_control_string( job.control_file) if hasattr(control_obj, 'build'): image = getattr(control_obj, 'build') if not image and hasattr(control_obj, 'builds'): builds = getattr(control_obj, 'builds') image = builds.get('cros-version') except: logging.warning('Failed to parse control file for job: %s', job.name) return image def get_host_queue_entries_by_insert_time( insert_time_after=None, insert_time_before=None, **filter_data): """Like get_host_queue_entries, but using the insert index table. @param insert_time_after: A lower bound on insert_time @param insert_time_before: An upper bound on insert_time @returns A sequence of nested dictionaries of host and job information. """ assert insert_time_after is not None or insert_time_before is not None, \ ('Caller to get_host_queue_entries_by_insert_time must provide either' ' insert_time_after or insert_time_before.') # Get insert bounds on the index of the host queue entries. if insert_time_after: query = models.HostQueueEntryStartTimes.objects.filter( # Note: '-insert_time' means descending. We want the largest # insert time smaller than the insert time. insert_time__lte=insert_time_after).order_by('-insert_time') try: constraint = query[0].highest_hqe_id if 'id__gte' in filter_data: constraint = max(constraint, filter_data['id__gte']) filter_data['id__gte'] = constraint except IndexError: pass # Get end bounds. if insert_time_before: query = models.HostQueueEntryStartTimes.objects.filter( insert_time__gte=insert_time_before).order_by('insert_time') try: constraint = query[0].highest_hqe_id if 'id__lte' in filter_data: constraint = min(constraint, filter_data['id__lte']) filter_data['id__lte'] = constraint except IndexError: pass return rpc_utils.prepare_rows_as_nested_dicts( models.HostQueueEntry.query_objects(filter_data), ('host', 'job')) def get_host_queue_entries(start_time=None, end_time=None, **filter_data): """\ @returns A sequence of nested dictionaries of host and job information. """ filter_data = rpc_utils.inject_times_to_filter('started_on__gte', 'started_on__lte', start_time, end_time, **filter_data) return rpc_utils.prepare_rows_as_nested_dicts( models.HostQueueEntry.query_objects(filter_data), ('host', 'job')) def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data): """\ Get the number of host queue entries associated with this job. """ filter_data = rpc_utils.inject_times_to_filter('started_on__gte', 'started_on__lte', start_time, end_time, **filter_data) return models.HostQueueEntry.query_count(filter_data) def get_hqe_percentage_complete(**filter_data): """ Computes the fraction of host queue entries matching the given filter data that are complete. """ query = models.HostQueueEntry.query_objects(filter_data) complete_count = query.filter(complete=True).count() total_count = query.count() if total_count == 0: return 1 return float(complete_count) / total_count # special tasks def get_special_tasks(**filter_data): """Get special task entries from the local database. Query the special tasks table for tasks matching the given `filter_data`, and return a list of the results. No attempt is made to forward the call to shards; the buck will stop here. The caller is expected to know the target shard for such reasons as: * The caller is a service (such as gs_offloader) configured to operate on behalf of one specific shard, and no other. * The caller has a host as a parameter, and knows that this is the shard assigned to that host. @param filter_data Filter keywords to pass to the underlying database query. """ return rpc_utils.prepare_rows_as_nested_dicts( models.SpecialTask.query_objects(filter_data), ('host', 'queue_entry')) def get_host_special_tasks(host_id, **filter_data): """Get special task entries for a given host. Query the special tasks table for tasks that ran on the host given by `host_id` and matching the given `filter_data`. Return a list of the results. If the host is assigned to a shard, forward this call to that shard. @param host_id Id in the database of the target host. @param filter_data Filter keywords to pass to the underlying database query. """ # Retrieve host data even if the host is in an invalid state. host = models.Host.smart_get(host_id, False) if not host.shard: return get_special_tasks(host_id=host_id, **filter_data) else: # The return values from AFE methods are post-processed # objects that aren't JSON-serializable. So, we have to # call AFE.run() to get the raw, serializable output from # the shard. shard_afe = frontend.AFE(server=host.shard.hostname) return shard_afe.run('get_special_tasks', host_id=host_id, **filter_data) def get_num_special_tasks(**kwargs): """Get the number of special task entries from the local database. Query the special tasks table for tasks matching the given 'kwargs', and return the number of the results. No attempt is made to forward the call to shards; the buck will stop here. @param kwargs Filter keywords to pass to the underlying database query. """ return models.SpecialTask.query_count(kwargs) def get_host_num_special_tasks(host, **kwargs): """Get special task entries for a given host. Query the special tasks table for tasks that ran on the host given by 'host' and matching the given 'kwargs'. Return a list of the results. If the host is assigned to a shard, forward this call to that shard. @param host id or name of a host. More often a hostname. @param kwargs Filter keywords to pass to the underlying database query. """ # Retrieve host data even if the host is in an invalid state. host_model = models.Host.smart_get(host, False) if not host_model.shard: return get_num_special_tasks(host=host, **kwargs) else: shard_afe = frontend.AFE(server=host_model.shard.hostname) return shard_afe.run('get_num_special_tasks', host=host, **kwargs) def get_status_task(host_id, end_time): """Get the "status task" for a host from the local shard. Returns a single special task representing the given host's "status task". The status task is a completed special task that identifies whether the corresponding host was working or broken when it completed. A successful task indicates a working host; a failed task indicates broken. This call will not be forward to a shard; the receiving server must be the shard that owns the host. @param host_id Id in the database of the target host. @param end_time Time reference for the host's status. @return A single task; its status (successful or not) corresponds to the status of the host (working or broken) at the given time. If no task is found, return `None`. """ tasklist = rpc_utils.prepare_rows_as_nested_dicts( status_history.get_status_task(host_id, end_time), ('host', 'queue_entry')) return tasklist[0] if tasklist else None def get_host_status_task(host_id, end_time): """Get the "status task" for a host from its owning shard. Finds the given host's owning shard, and forwards to it a call to `get_status_task()` (see above). @param host_id Id in the database of the target host. @param end_time Time reference for the host's status. @return A single task; its status (successful or not) corresponds to the status of the host (working or broken) at the given time. If no task is found, return `None`. """ host = models.Host.smart_get(host_id) if not host.shard: return get_status_task(host_id, end_time) else: # The return values from AFE methods are post-processed # objects that aren't JSON-serializable. So, we have to # call AFE.run() to get the raw, serializable output from # the shard. shard_afe = frontend.AFE(server=host.shard.hostname) return shard_afe.run('get_status_task', host_id=host_id, end_time=end_time) def get_host_diagnosis_interval(host_id, end_time, success): """Find a "diagnosis interval" for a given host. A "diagnosis interval" identifies a start and end time where the host went from "working" to "broken", or vice versa. The interval's starting time is the starting time of the last status task with the old status; the end time is the finish time of the first status task with the new status. This routine finds the most recent diagnosis interval for the given host prior to `end_time`, with a starting status matching `success`. If `success` is true, the interval will start with a successful status task; if false the interval will start with a failed status task. @param host_id Id in the database of the target host. @param end_time Time reference for the diagnosis interval. @param success Whether the diagnosis interval should start with a successful or failed status task. @return A list of two strings. The first is the timestamp for the beginning of the interval; the second is the timestamp for the end. If the host has never changed state, the list is empty. """ host = models.Host.smart_get(host_id) if not host.shard or utils.is_shard(): return status_history.get_diagnosis_interval( host_id, end_time, success) else: shard_afe = frontend.AFE(server=host.shard.hostname) return shard_afe.get_host_diagnosis_interval( host_id, end_time, success) # support for host detail view def get_host_queue_entries_and_special_tasks(host, query_start=None, query_limit=None, start_time=None, end_time=None): """ @returns an interleaved list of HostQueueEntries and SpecialTasks, in approximate run order. each dict contains keys for type, host, job, status, started_on, execution_path, and ID. """ total_limit = None if query_limit is not None: total_limit = query_start + query_limit filter_data_common = {'host': host, 'query_limit': total_limit, 'sort_by': ['-id']} filter_data_special_tasks = rpc_utils.inject_times_to_filter( 'time_started__gte', 'time_started__lte', start_time, end_time, **filter_data_common) queue_entries = get_host_queue_entries( start_time, end_time, **filter_data_common) special_tasks = get_host_special_tasks(host, **filter_data_special_tasks) interleaved_entries = rpc_utils.interleave_entries(queue_entries, special_tasks) if query_start is not None: interleaved_entries = interleaved_entries[query_start:] if query_limit is not None: interleaved_entries = interleaved_entries[:query_limit] return rpc_utils.prepare_host_queue_entries_and_special_tasks( interleaved_entries, queue_entries) def get_num_host_queue_entries_and_special_tasks(host, start_time=None, end_time=None): filter_data_common = {'host': host} filter_data_queue_entries, filter_data_special_tasks = ( rpc_utils.inject_times_to_hqe_special_tasks_filters( filter_data_common, start_time, end_time)) return (models.HostQueueEntry.query_count(filter_data_queue_entries) + get_host_num_special_tasks(**filter_data_special_tasks)) # other def echo(data=""): """\ Returns a passed in string. For doing a basic test to see if RPC calls can successfully be made. """ return data def get_motd(): """\ Returns the message of the day as a string. """ return rpc_utils.get_motd() def get_static_data(): """\ Returns a dictionary containing a bunch of data that shouldn't change often and is otherwise inaccessible. This includes: priorities: List of job priority choices. default_priority: Default priority value for new jobs. users: Sorted list of all users. labels: Sorted list of labels not start with 'cros-version' and 'fw-version'. tests: Sorted list of all tests. profilers: Sorted list of all profilers. current_user: Logged-in username. host_statuses: Sorted list of possible Host statuses. job_statuses: Sorted list of possible HostQueueEntry statuses. job_timeout_default: The default job timeout length in minutes. parse_failed_repair_default: Default value for the parse_failed_repair job option. reboot_before_options: A list of valid RebootBefore string enums. reboot_after_options: A list of valid RebootAfter string enums. motd: Server's message of the day. status_dictionary: A mapping from one word job status names to a more informative description. """ default_drone_set_name = models.DroneSet.default_drone_set_name() drone_sets = ([default_drone_set_name] + sorted(drone_set.name for drone_set in models.DroneSet.objects.exclude( name=default_drone_set_name))) result = {} result['priorities'] = priorities.Priority.choices() result['default_priority'] = 'Default' result['max_schedulable_priority'] = priorities.Priority.DEFAULT result['users'] = get_users(sort_by=['login']) label_exclude_filters = [{'name__startswith': 'cros-version'}, {'name__startswith': 'fw-version'}, {'name__startswith': 'fwrw-version'}, {'name__startswith': 'fwro-version'}] result['labels'] = get_labels( label_exclude_filters, sort_by=['-platform', 'name']) result['tests'] = get_tests(sort_by=['name']) result['profilers'] = get_profilers(sort_by=['name']) result['current_user'] = rpc_utils.prepare_for_serialization( models.User.current_user().get_object_dict()) result['host_statuses'] = sorted(models.Host.Status.names) result['job_statuses'] = sorted(models.HostQueueEntry.Status.names) result['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS result['job_max_runtime_mins_default'] = ( models.Job.DEFAULT_MAX_RUNTIME_MINS) result['parse_failed_repair_default'] = bool( models.Job.DEFAULT_PARSE_FAILED_REPAIR) result['reboot_before_options'] = model_attributes.RebootBefore.names result['reboot_after_options'] = model_attributes.RebootAfter.names result['motd'] = rpc_utils.get_motd() result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled() result['drone_sets'] = drone_sets result['status_dictionary'] = {"Aborted": "Aborted", "Verifying": "Verifying Host", "Provisioning": "Provisioning Host", "Pending": "Waiting on other hosts", "Running": "Running autoserv", "Completed": "Autoserv completed", "Failed": "Failed to complete", "Queued": "Queued", "Starting": "Next in host's queue", "Stopped": "Other host(s) failed verify", "Parsing": "Awaiting parse of final results", "Gathering": "Gathering log files", "Waiting": "Waiting for scheduler action", "Archiving": "Archiving results", "Resetting": "Resetting hosts"} result['wmatrix_url'] = rpc_utils.get_wmatrix_url() result['stainless_url'] = rpc_utils.get_stainless_url() result['is_moblab'] = bool(utils.is_moblab()) return result def get_server_time(): return datetime.datetime.now().strftime("%Y-%m-%d %H:%M") def ping_db(): """Simple connection test to db""" try: db_connection.cursor() except DatabaseError: return [False] return [True] def get_hosts_by_attribute(attribute, value): """ Get the list of valid hosts that share the same host attribute value. @param attribute: String of the host attribute to check. @param value: String of the value that is shared between hosts. @returns List of hostnames that all have the same host attribute and value. """ rows = models.HostAttribute.query_objects({'attribute': attribute, 'value': value}) if RESPECT_STATIC_ATTRIBUTES: returned_hosts = set() # Add hosts: # * Non-valid # * Exist in afe_host_attribute with given attribute. # * Don't exist in afe_static_host_attribute OR exist in # afe_static_host_attribute with the same given value. for row in rows: if row.host.invalid != 0: continue static_hosts = models.StaticHostAttribute.query_objects( {'host_id': row.host.id, 'attribute': attribute}) values = [static_host.value for static_host in static_hosts] if len(values) == 0 or values[0] == value: returned_hosts.add(row.host.hostname) # Add hosts: # * Non-valid # * Exist in afe_static_host_attribute with given attribute # and value # * No need to check whether each static attribute has its # corresponding entry in afe_host_attribute since it is ensured # in inventory sync. static_rows = models.StaticHostAttribute.query_objects( {'attribute': attribute, 'value': value}) for row in static_rows: if row.host.invalid != 0: continue returned_hosts.add(row.host.hostname) return list(returned_hosts) else: return [row.host.hostname for row in rows if row.host.invalid == 0] def _get_control_file_by_suite(suite_name): """Get control file contents by suite name. @param suite_name: Suite name as string. @returns: Control file contents as string. """ getter = control_file_getter.FileSystemGetter( [_CONFIG.get_config_value('SCHEDULER', 'drone_installation_directory')]) return getter.get_control_file_contents_by_name(suite_name) @rpc_utils.route_rpc_to_main def create_suite_job( name='', board='', pool='', child_dependencies=(), control_file='', check_hosts=True, num=None, file_bugs=False, timeout=24, timeout_mins=None, priority=priorities.Priority.DEFAULT, suite_args=None, wait_for_results=True, job_retry=False, max_retries=None, max_runtime_mins=None, suite_min_duts=0, offload_failures_only=False, builds=None, test_source_build=None, run_prod_code=False, delay_minutes=0, is_cloning=False, job_keyvals=None, test_args=None, **kwargs): """ Create a job to run a test suite on the given device with the given image. When the timeout specified in the control file is reached, the job is guaranteed to have completed and results will be available. @param name: The test name if control_file is supplied, otherwise the name of the test suite to run, e.g. 'bvt'. @param board: the kind of device to run the tests on. @param builds: the builds to install e.g. {'cros-version:': 'x86-alex-release/R18-1655.0.0', 'fwrw-version:': 'x86-alex-firmware/R36-5771.50.0', 'fwro-version:': 'x86-alex-firmware/R36-5771.49.0'} If builds is given a value, it overrides argument build. @param test_source_build: Build that contains the server-side test code. @param pool: Specify the pool of machines to use for scheduling purposes. @param child_dependencies: (optional) list of additional dependency labels (strings) that will be added as dependency labels to child jobs. @param control_file: the control file of the job. @param check_hosts: require appropriate live hosts to exist in the lab. @param num: Specify the number of machines to schedule across (integer). Leave unspecified or use None to use default sharding factor. @param file_bugs: File a bug on each test failure in this suite. @param timeout: The max lifetime of this suite, in hours. @param timeout_mins: The max lifetime of this suite, in minutes. Takes priority over timeout. @param priority: Integer denoting priority. Higher is more important. @param suite_args: Optional arguments which will be parsed by the suite control file. Used by control.test_that_wrapper to determine which tests to run. @param wait_for_results: Set to False to run the suite job without waiting for test jobs to finish. Default is True. @param job_retry: Set to True to enable job-level retry. Default is False. @param max_retries: Integer, maximum job retries allowed at suite level. None for no max. @param max_runtime_mins: Maximum amount of time a job can be running in minutes. @param suite_min_duts: Integer. Scheduler will prioritize getting the minimum number of machines for the suite when it is competing with another suite that has a higher priority but already got minimum machines it needs. @param offload_failures_only: Only enable gs_offloading for failed jobs. @param run_prod_code: If True, the suite will run the test code that lives in prod aka the test code currently on the lab servers. If False, the control files and test code for this suite run will be retrieved from the build artifacts. @param delay_minutes: Delay the creation of test jobs for a given number of minutes. @param is_cloning: True if creating a cloning job. @param job_keyvals: A dict of job keyvals to be inject to control file. @param test_args: A dict of args passed all the way to each individual test that will be actually run. @param kwargs: extra keyword args. NOT USED. @raises ControlFileNotFound: if a unique suite control file doesn't exist. @raises NoControlFileList: if we can't list the control files at all. @raises StageControlFileFailure: If the dev server throws 500 while staging test_suites. @raises ControlFileEmpty: if the control file exists on the server, but can't be read. @return: the job ID of the suite; -1 on error. """ if num is not None: warnings.warn('num is deprecated for create_suite_job') del num if builds is None: builds = {} # Default test source build to CrOS build if it's not specified and # run_prod_code is set to False. if not run_prod_code: test_source_build = Suite.get_test_source_build( builds, test_source_build=test_source_build) sample_dut = rpc_utils.get_sample_dut(board, pool) suite_name = suite_common.canonicalize_suite_name(name) if run_prod_code: ds = dev_server.resolve(test_source_build, hostname=sample_dut) keyvals = {} else: ds, keyvals = suite_common.stage_build_artifacts( test_source_build, hostname=sample_dut) keyvals[constants.SUITE_MIN_DUTS_KEY] = suite_min_duts # Do not change this naming convention without updating # site_utils.parse_job_name. if run_prod_code: # If run_prod_code is True, test_source_build is not set, use the # first build in the builds list for the sutie job name. name = '%s-%s' % (builds.values()[0], suite_name) else: name = '%s-%s' % (test_source_build, suite_name) timeout_mins = timeout_mins or timeout * 60 max_runtime_mins = max_runtime_mins or timeout * 60 if not board: board = utils.ParseBuildName(builds[provision.CROS_VERSION_PREFIX])[0] if run_prod_code: control_file = _get_control_file_by_suite(suite_name) if not control_file: # No control file was supplied so look it up from the build artifacts. control_file = suite_common.get_control_file_by_build( test_source_build, ds, suite_name) # Prepend builds and board to the control file. if is_cloning: control_file = tools.remove_injection(control_file) if suite_args is None: suite_args = dict() inject_dict = { 'board': board, # `build` is needed for suites like AU to stage image inside suite # control file. 'build': test_source_build, 'builds': builds, 'check_hosts': check_hosts, 'pool': pool, 'child_dependencies': child_dependencies, 'file_bugs': file_bugs, 'timeout': timeout, 'timeout_mins': timeout_mins, 'devserver_url': ds.url(), 'priority': priority, 'wait_for_results': wait_for_results, 'job_retry': job_retry, 'max_retries': max_retries, 'max_runtime_mins': max_runtime_mins, 'offload_failures_only': offload_failures_only, 'test_source_build': test_source_build, 'run_prod_code': run_prod_code, 'delay_minutes': delay_minutes, 'job_keyvals': job_keyvals, 'test_args': test_args, } inject_dict.update(suite_args) control_file = tools.inject_vars(inject_dict, control_file) return rpc_utils.create_job_common(name, priority=priority, timeout_mins=timeout_mins, max_runtime_mins=max_runtime_mins, control_type='Server', control_file=control_file, hostless=True, keyvals=keyvals) def get_job_history(**filter_data): """Get history of the job, including the special tasks executed for the job @param filter_data: filter for the call, should at least include {'job_id': [job id]} @returns: JSON string of the job's history, including the information such as the hosts run the job and the special tasks executed before and after the job. """ job_id = filter_data['job_id'] job_info = job_history.get_job_info(job_id) return rpc_utils.prepare_for_serialization(job_info.get_history()) def shard_heartbeat(shard_hostname, jobs=(), hqes=(), known_job_ids=(), known_host_ids=(), known_host_statuses=()): """Receive updates for job statuses from shards and assign hosts and jobs. @param shard_hostname: Hostname of the calling shard @param jobs: Jobs in serialized form that should be updated with newer status from a shard. @param hqes: Hostqueueentries in serialized form that should be updated with newer status from a shard. Note that for every hostqueueentry the corresponding job must be in jobs. @param known_job_ids: List of ids of jobs the shard already has. @param known_host_ids: List of ids of hosts the shard already has. @param known_host_statuses: List of statuses of hosts the shard already has. @returns: Serialized representations of hosts, jobs, suite job keyvals and their dependencies to be inserted into a shard's database. """ # The following alternatives to sending host and job ids in every heartbeat # have been considered: # 1. Sending the highest known job and host ids. This would work for jobs: # Newer jobs always have larger ids. Also, if a job is not assigned to a # particular shard during a heartbeat, it never will be assigned to this # shard later. # This is not true for hosts though: A host that is leased won't be sent # to the shard now, but might be sent in a future heartbeat. This means # sometimes hosts should be transfered that have a lower id than the # maximum host id the shard knows. # 2. Send the number of jobs/hosts the shard knows to the main in each # heartbeat. Compare these to the number of records that already have # the shard_id set to this shard. In the normal case, they should match. # In case they don't, resend all entities of that type. # This would work well for hosts, because there aren't that many. # Resending all jobs is quite a big overhead though. # Also, this approach might run into edge cases when entities are # ever deleted. # 3. Mixtures of the above: Use 1 for jobs and 2 for hosts. # Using two different approaches isn't consistent and might cause # confusion. Also the issues with the case of deletions might still # occur. # # The overhead of sending all job and host ids in every heartbeat is low: # At peaks one board has about 1200 created but unfinished jobs. # See the numbers here: http://goo.gl/gQCGWH # Assuming that job id's have 6 digits and that json serialization takes a # comma and a space as overhead, the traffic per id sent is about 8 bytes. # If 5000 ids need to be sent, this means 40 kilobytes of traffic. # A NOT IN query with 5000 ids took about 30ms in tests made. # These numbers seem low enough to outweigh the disadvantages of the # solutions described above. shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname) rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes) assert len(known_host_ids) == len(known_host_statuses) for i in range(len(known_host_ids)): host_model = models.Host.objects.get(pk=known_host_ids[i]) if host_model.status != known_host_statuses[i]: host_model.status = known_host_statuses[i] host_model.save() hosts, jobs, suite_keyvals, inc_ids = rpc_utils.find_records_for_shard( shard_obj, known_job_ids=known_job_ids, known_host_ids=known_host_ids) return { 'hosts': [host.serialize() for host in hosts], 'jobs': [job.serialize() for job in jobs], 'suite_keyvals': [kv.serialize() for kv in suite_keyvals], 'incorrect_host_ids': [int(i) for i in inc_ids], } def get_shards(**filter_data): """Return a list of all shards. @returns A sequence of nested dictionaries of shard information. """ shards = models.Shard.query_objects(filter_data) serialized_shards = rpc_utils.prepare_rows_as_nested_dicts(shards, ()) for serialized, shard in zip(serialized_shards, shards): serialized['labels'] = [label.name for label in shard.labels.all()] return serialized_shards def _assign_board_to_shard_precheck(labels): """Verify whether board labels are valid to be added to a given shard. First check whether board label is in correct format. Second, check whether the board label exist. Third, check whether the board has already been assigned to shard. @param labels: Board labels separated by comma. @raises error.RPCException: If label provided doesn't start with `board:` or board has been added to shard already. @raises models.Label.DoesNotExist: If the label specified doesn't exist. @returns: A list of label models that ready to be added to shard. """ if not labels: # allow creation of label-less shards (labels='' would otherwise fail the # checks below) return [] labels = labels.split(',') label_models = [] for label in labels: # Check whether the board label is in correct format. if not label.startswith('board:'): raise error.RPCException('Sharding only supports `board:.*` label.') # Check whether the board label exist. If not, exception will be thrown # by smart_get function. label = models.Label.smart_get(label) # Check whether the board has been sharded already try: shard = models.Shard.objects.get(labels=label) raise error.RPCException( '%s is already on shard %s' % (label, shard.hostname)) except models.Shard.DoesNotExist: # board is not on any shard, so it's valid. label_models.append(label) return label_models def add_shard(hostname, labels): """Add a shard and start running jobs on it. @param hostname: The hostname of the shard to be added; needs to be unique. @param labels: Board labels separated by comma. Jobs of one of the labels will be assigned to the shard. @raises error.RPCException: If label provided doesn't start with `board:` or board has been added to shard already. @raises model_logic.ValidationError: If a shard with the given hostname already exist. @raises models.Label.DoesNotExist: If the label specified doesn't exist. @returns: The id of the added shard. """ labels = _assign_board_to_shard_precheck(labels) shard = models.Shard.add_object(hostname=hostname) for label in labels: shard.labels.add(label) return shard.id def add_board_to_shard(hostname, labels): """Add boards to a given shard @param hostname: The hostname of the shard to be changed. @param labels: Board labels separated by comma. @raises error.RPCException: If label provided doesn't start with `board:` or board has been added to shard already. @raises models.Label.DoesNotExist: If the label specified doesn't exist. @returns: The id of the changed shard. """ labels = _assign_board_to_shard_precheck(labels) shard = models.Shard.objects.get(hostname=hostname) for label in labels: shard.labels.add(label) return shard.id # Remove board RPCs are rare, so we can afford to make them a bit more # expensive (by performing in a transaction) in order to guarantee # atomicity. @transaction.commit_on_success def remove_board_from_shard(hostname, label): """Remove board from the given shard. @param hostname: The hostname of the shard to be changed. @param labels: Board label. @raises models.Label.DoesNotExist: If the label specified doesn't exist. @returns: The id of the changed shard. """ shard = models.Shard.objects.get(hostname=hostname) label = models.Label.smart_get(label) if label not in shard.labels.all(): raise error.RPCException( 'Cannot remove label from shard that does not belong to it.') shard.labels.remove(label) if label.is_replaced_by_static(): static_label = models.StaticLabel.smart_get(label.name) models.Host.objects.filter( static_labels__in=[static_label]).update(shard=None) else: models.Host.objects.filter(labels__in=[label]).update(shard=None) def delete_shard(hostname): """Delete a shard and reclaim all resources from it. This claims back all assigned hosts from the shard. """ shard = rpc_utils.retrieve_shard(shard_hostname=hostname) # Remove shard information. models.Host.objects.filter(shard=shard).update(shard=None) # Note: The original job-cleanup query was performed with django call # models.Job.objects.filter(shard=shard).update(shard=None) # # But that started becoming unreliable due to the large size of afe_jobs. # # We don't need atomicity here, so the new cleanup method is iterative, in # chunks of 100k jobs. QUERY = ('UPDATE afe_jobs SET shard_id = NULL WHERE shard_id = %s ' 'LIMIT 100000') try: with contextlib.closing(db_connection.cursor()) as cursor: clear_jobs = True assert shard.id is not None while clear_jobs: cursor.execute(QUERY % shard.id) clear_jobs = bool(cursor.fetchone()) # Unit tests use sqlite backend instead of MySQL. sqlite does not support # UPDATE ... LIMIT, so fall back to the old behavior. except DatabaseError as e: if 'syntax error' in str(e): models.Job.objects.filter(shard=shard).update(shard=None) else: raise shard.labels.clear() shard.delete() def get_servers(hostname=None, role=None, status=None): """Get a list of servers with matching role and status. @param hostname: FQDN of the server. @param role: Name of the server role, e.g., drone, scheduler. Default to None to match any role. @param status: Status of the server, e.g., primary, backup, repair_required. Default to None to match any server status. @raises error.RPCException: If server database is not used. @return: A list of server names for servers with matching role and status. """ raise DeprecationWarning("server_manager_utils has been removed.") @rpc_utils.route_rpc_to_main def get_stable_version(board=stable_version_utils.DEFAULT, android=False): """Get stable version for the given board. @param board: Name of the board. @param android: Unused legacy parameter. This is maintained for the sake of clients on old branches that still pass the parameter. TODO(jrbarnette) Remove this completely once R68 drops off stable. @return: Stable version of the given board. Return global configure value of CROS.stable_cros_version if stable_versinos table does not have entry of board DEFAULT. """ assert not android, 'get_stable_version no longer supports `android`.' return stable_version_utils.get(board=board) @rpc_utils.route_rpc_to_main def get_all_stable_versions(): """Get stable versions for all boards. @return: A dictionary of board:version. """ return stable_version_utils.get_all() @rpc_utils.route_rpc_to_main def set_stable_version(version, board=stable_version_utils.DEFAULT): """Modify stable version for the given board. @param version: The new value of stable version for given board. @param board: Name of the board, default to value `DEFAULT`. """ logging.warning("rpc_interface::set_stable_version: attempted to set stable version. setting the stable version is not permitted") return None @rpc_utils.route_rpc_to_main def delete_stable_version(board): """Modify stable version for the given board. Delete a stable version entry in afe_stable_versions table for a given board, so default stable version will be used. @param board: Name of the board. """ stable_version_utils.delete(board=board) def get_tests_by_build(build, ignore_invalid_tests=True): """Get the tests that are available for the specified build. @param build: unique name by which to refer to the image. @param ignore_invalid_tests: flag on if unparsable tests are ignored. @return: A sorted list of all tests that are in the build specified. """ # Collect the control files specified in this build cfile_getter = control_file_lib._initialize_control_file_getter(build) if suite_common.ENABLE_CONTROLS_IN_BATCH: control_file_info_list = cfile_getter.get_suite_info() control_file_list = control_file_info_list.keys() else: control_file_list = cfile_getter.get_control_file_list() test_objects = [] _id = 0 for control_file_path in control_file_list: # Read and parse the control file if suite_common.ENABLE_CONTROLS_IN_BATCH: control_file = control_file_info_list[control_file_path] else: control_file = cfile_getter.get_control_file_contents( control_file_path) try: control_obj = control_data.parse_control_string(control_file) except: logging.info('Failed to parse control file: %s', control_file_path) if not ignore_invalid_tests: raise # Extract the values needed for the AFE from the control_obj. # The keys list represents attributes in the control_obj that # are required by the AFE keys = ['author', 'doc', 'name', 'time', 'test_type', 'experimental', 'test_category', 'test_class', 'dependencies', 'run_verify', 'sync_count', 'job_retries', 'path'] test_object = {} for key in keys: test_object[key] = getattr(control_obj, key) if hasattr( control_obj, key) else '' # Unfortunately, the AFE expects different key-names for certain # values, these must be corrected to avoid the risk of tests # being omitted by the AFE. # The 'id' is an additional value used in the AFE. # The control_data parsing does not reference 'run_reset', but it # is also used in the AFE and defaults to True. test_object['id'] = _id test_object['run_reset'] = True test_object['description'] = test_object.get('doc', '') test_object['test_time'] = test_object.get('time', 0) # TODO(crbug.com/873716) DEPRECATED. Remove entirely. test_object['test_retry'] = 0 # Fix the test name to be consistent with the current presentation # of test names in the AFE. testpath, subname = os.path.split(control_file_path) testname = os.path.basename(testpath) subname = subname.split('.')[1:] if subname: testname = '%s:%s' % (testname, ':'.join(subname)) test_object['name'] = testname # Correct the test path as parse_control_string sets an empty string. test_object['path'] = control_file_path _id += 1 test_objects.append(test_object) test_objects = sorted(test_objects, key=lambda x: x.get('name')) return rpc_utils.prepare_for_serialization(test_objects) @rpc_utils.route_rpc_to_main def get_lab_health_indicators(board=None): """Get the healthy indicators for whole lab. The indicators now includes: 1. lab is closed or not. 2. Available DUTs list for a given board. 3. Devserver capacity. 4. When is the next major DUT utilization (e.g. CQ is coming in 3 minutes). @param board: if board is specified, a list of available DUTs will be returned for it. Otherwise, skip this indicator. @returns: A healthy indicator object including health info. """ return LabHealthIndicator(None, None, None, None)