#!/usr/bin/python #pylint: disable-msg=C0111 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import collections import unittest import common from autotest_lib.client.common_lib import host_queue_entry_states from autotest_lib.frontend import setup_django_environment from autotest_lib.frontend.afe import frontend_test_utils from autotest_lib.frontend.afe import models from autotest_lib.frontend.afe import rdb_model_extensions from autotest_lib.scheduler import rdb from autotest_lib.scheduler import rdb_hosts from autotest_lib.scheduler import rdb_lib from autotest_lib.scheduler import rdb_requests from autotest_lib.scheduler import rdb_testing_utils from autotest_lib.server.cros import provision class AssignmentValidator(object): """Utility class to check that priority inversion doesn't happen. """ @staticmethod def check_acls_deps(host, request): """Check if a host and request match by comparing acls and deps. @param host: A dictionary representing attributes of the host. @param request: A request, as defined in rdb_requests. @return True if the deps/acls of the request match the host. """ # Unfortunately the hosts labels are labelnames, not ids. request_deps = set([l.name for l in models.Label.objects.filter(id__in=request.deps)]) return (set(host['labels']).intersection(request_deps) == request_deps and set(host['acls']).intersection(request.acls)) @staticmethod def find_matching_host_for_request(hosts, request): """Find a host from the given list of hosts, matching the request. @param hosts: A list of dictionaries representing host attributes. @param requetst: The unsatisfied request. @return: A host, if a matching host is found from the input list. """ if not hosts or not request: return None for host in hosts: if AssignmentValidator.check_acls_deps(host, request): return host @staticmethod def sort_requests(requests): """Sort the requests by priority. @param requests: Unordered requests. @return: A list of requests ordered by priority. """ return sorted(collections.Counter(requests).items(), key=lambda request: request[0].priority, reverse=True) @staticmethod def verify_priority(request_queue, result): requests = AssignmentValidator.sort_requests(request_queue) for request, count in requests: hosts = result.get(request) # The request was completely satisfied. if hosts and len(hosts) == count: continue # Go through all hosts given to lower priority requests and # make sure we couldn't have allocated one of them for this # unsatisfied higher priority request. lower_requests = requests[requests.index((request,count))+1:] for lower_request, count in lower_requests: if (lower_request.priority < request.priority and AssignmentValidator.find_matching_host_for_request( result.get(lower_request), request)): raise ValueError('Priority inversion occured between ' 'priorities %s and %s' % (request.priority, lower_request.priority)) @staticmethod def priority_checking_response_handler(request_manager): """Fake response handler wrapper for any request_manager. Check that higher priority requests get a response over lower priority requests, by re-validating all the hosts assigned to a lower priority request against the unsatisfied higher priority ones. @param request_manager: A request_manager as defined in rdb_lib. @raises ValueError: If priority inversion is detected. """ # Fist call the rdb to make its decisions, then sort the requests # by priority and make sure unsatisfied requests higher up in the list # could not have been satisfied by hosts assigned to requests lower # down in the list. result = request_manager.api_call(request_manager.request_queue) if not result: raise ValueError('Expected results but got none.') AssignmentValidator.verify_priority( request_manager.request_queue, result) for hosts in result.values(): for host in hosts: yield host class BaseRDBTest(rdb_testing_utils.AbstractBaseRDBTester, unittest.TestCase): _config_section = 'AUTOTEST_WEB' def testAcquireLeasedHostBasic(self): """Test that acquisition of a leased host doesn't happen. @raises AssertionError: If the one host that satisfies the request is acquired. """ job = self.create_job(deps=set(['a'])) host = self.db_helper.create_host('h1', deps=set(['a'])) host.leased = 1 host.save() queue_entries = self._dispatcher._refresh_pending_queue_entries() hosts = list(rdb_lib.acquire_hosts(queue_entries)) self.assertTrue(len(hosts) == 1 and hosts[0] is None) def testAcquireLeasedHostRace(self): """Test behaviour when hosts are leased just before acquisition. If a fraction of the hosts somehow get leased between finding and acquisition, the rdb should just return the remaining hosts for the request to use. @raises AssertionError: If both the requests get a host successfully, since one host gets leased before the final attempt to lease both. """ j1 = self.create_job(deps=set(['a'])) j2 = self.create_job(deps=set(['a'])) hosts = [self.db_helper.create_host('h1', deps=set(['a'])), self.db_helper.create_host('h2', deps=set(['a']))] @rdb_hosts.return_rdb_host def local_find_hosts(host_query_manger, deps, acls): """Return a predetermined list of hosts, one of which is leased.""" h1 = models.Host.objects.get(hostname='h1') h1.leased = 1 h1.save() h2 = models.Host.objects.get(hostname='h2') return [h1, h2] self.god.stub_with(rdb.AvailableHostQueryManager, 'find_hosts', local_find_hosts) queue_entries = self._dispatcher._refresh_pending_queue_entries() hosts = list(rdb_lib.acquire_hosts(queue_entries)) self.assertTrue(len(hosts) == 2 and None in hosts) self.check_hosts(iter(hosts)) def testHostReleaseStates(self): """Test that we will only release an unused host if it is in Ready. @raises AssertionError: If the host gets released in any other state. """ host = self.db_helper.create_host('h1', deps=set(['x'])) for state in rdb_model_extensions.AbstractHostModel.Status.names: host.status = state host.leased = 1 host.save() self._release_unused_hosts() host = models.Host.objects.get(hostname='h1') self.assertTrue(host.leased == (state != 'Ready')) def testHostReleseHQE(self): """Test that we will not release a ready host if it's being used. @raises AssertionError: If the host is released even though it has been assigned to an active hqe. """ # Create a host and lease it out in Ready. host = self.db_helper.create_host('h1', deps=set(['x'])) host.status = 'Ready' host.leased = 1 host.save() # Create a job and give its hqe the leased host. job = self.create_job(deps=set(['x'])) self.db_helper.add_host_to_job(host, job.id) hqe = models.HostQueueEntry.objects.get(job_id=job.id) # Activate the hqe by setting its state. hqe.status = host_queue_entry_states.ACTIVE_STATUSES[0] hqe.save() # Make sure the hqes host isn't released, even if its in ready. self._release_unused_hosts() host = models.Host.objects.get(hostname='h1') self.assertTrue(host.leased == 1) def testBasicDepsAcls(self): """Test a basic deps/acls request. Make sure that a basic request with deps and acls, finds a host from the ready pool that has matching labels and is in a matching aclgroups. @raises AssertionError: If the request doesn't find a host, since the we insert a matching host in the ready pool. """ deps = set(['a', 'b']) acls = set(['a', 'b']) self.db_helper.create_host('h1', deps=deps, acls=acls) job = self.create_job(user='autotest_system', deps=deps, acls=acls) queue_entries = self._dispatcher._refresh_pending_queue_entries() matching_host = rdb_lib.acquire_hosts(queue_entries).next() self.check_host_assignment(job.id, matching_host.id) self.assertTrue(matching_host.leased == 1) def testPreferredDeps(self): """Test that perferred deps is respected. If multiple hosts satisfied a job's deps, the one with preferred label will be assigned to the job. @raises AssertionError: If a host without a preferred label is assigned to the job instead of one with a preferred label. """ lumpy_deps = set(['board:lumpy']) stumpy_deps = set(['board:stumpy']) stumpy_deps_with_crosversion = set( ['board:stumpy', 'cros-version:lumpy-release/R41-6323.0.0']) acls = set(['a', 'b']) # Hosts lumpy1 and lumpy2 are created as a control group, # which ensures that if no preferred label is used, the host # with a smaller id will be chosen first. We need to make sure # stumpy2 was chosen because it has a cros-version label, but not # because of other randomness. self.db_helper.create_host('lumpy1', deps=lumpy_deps, acls=acls) self.db_helper.create_host('lumpy2', deps=lumpy_deps, acls=acls) self.db_helper.create_host('stumpy1', deps=stumpy_deps, acls=acls) self.db_helper.create_host( 'stumpy2', deps=stumpy_deps_with_crosversion , acls=acls) job_1 = self.create_job(user='autotest_system', deps=lumpy_deps, acls=acls) job_2 = self.create_job(user='autotest_system', deps=stumpy_deps_with_crosversion, acls=acls) queue_entries = self._dispatcher._refresh_pending_queue_entries() matching_hosts = list(rdb_lib.acquire_hosts(queue_entries)) assignment = {} import logging for job, host in zip(queue_entries, matching_hosts): self.check_host_assignment(job.id, host.id) assignment[job.id] = host.hostname self.assertEqual(assignment[job_1.id], 'lumpy1') self.assertEqual(assignment[job_2.id], 'stumpy2') def testBadDeps(self): """Test that we find no hosts when only acls match. @raises AssertionError: If the request finds a host, since the only host in the ready pool will not have matching deps. """ host_labels = set(['a']) job_deps = set(['b']) acls = set(['a', 'b']) self.db_helper.create_host('h1', deps=host_labels, acls=acls) job = self.create_job(user='autotest_system', deps=job_deps, acls=acls) queue_entries = self._dispatcher._refresh_pending_queue_entries() matching_host = rdb_lib.acquire_hosts(queue_entries).next() self.assert_(not matching_host) def testBadAcls(self): """Test that we find no hosts when only deps match. @raises AssertionError: If the request finds a host, since the only host in the ready pool will not have matching acls. """ deps = set(['a']) host_acls = set(['a']) job_acls = set(['b']) self.db_helper.create_host('h1', deps=deps, acls=host_acls) # Create the job as a new user who is only in the 'b' and 'Everyone' # aclgroups. Though there are several hosts in the Everyone group, the # 1 host that has the 'a' dep isn't. job = self.create_job(user='new_user', deps=deps, acls=job_acls) queue_entries = self._dispatcher._refresh_pending_queue_entries() matching_host = rdb_lib.acquire_hosts(queue_entries).next() self.assert_(not matching_host) def testBasicPriority(self): """Test that priority inversion doesn't happen. Schedule 2 jobs with the same deps, acls and user, but different priorities, and confirm that the higher priority request gets the host. This confirmation happens through the AssignmentValidator. @raises AssertionError: If the un important request gets host h1 instead of the important request. """ deps = set(['a', 'b']) acls = set(['a', 'b']) self.db_helper.create_host('h1', deps=deps, acls=acls) important_job = self.create_job(user='autotest_system', deps=deps, acls=acls, priority=2) un_important_job = self.create_job(user='autotest_system', deps=deps, acls=acls, priority=0) queue_entries = self._dispatcher._refresh_pending_queue_entries() self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response', AssignmentValidator.priority_checking_response_handler) self.check_hosts(rdb_lib.acquire_hosts(queue_entries)) def testPriorityLevels(self): """Test that priority inversion doesn't happen. Increases a job's priority and makes several requests for hosts, checking that priority inversion doesn't happen. @raises AssertionError: If the unimportant job gets h1 while it is still unimportant, or doesn't get h1 while after it becomes the most important job. """ deps = set(['a', 'b']) acls = set(['a', 'b']) self.db_helper.create_host('h1', deps=deps, acls=acls) # Create jobs that will bucket differently and confirm that jobs in an # earlier bucket get a host. first_job = self.create_job(user='autotest_system', deps=deps, acls=acls) important_job = self.create_job(user='autotest_system', deps=deps, acls=acls, priority=2) deps.pop() unimportant_job = self.create_job(user='someother_system', deps=deps, acls=acls, priority=1) queue_entries = self._dispatcher._refresh_pending_queue_entries() self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response', AssignmentValidator.priority_checking_response_handler) self.check_hosts(rdb_lib.acquire_hosts(queue_entries)) # Elevate the priority of the unimportant job, so we now have # 2 jobs at the same priority. self.db_helper.increment_priority(job_id=unimportant_job.id) queue_entries = self._dispatcher._refresh_pending_queue_entries() self._release_unused_hosts() self.check_hosts(rdb_lib.acquire_hosts(queue_entries)) # Prioritize the first job, and confirm that it gets the host over the # jobs that got it the last time. self.db_helper.increment_priority(job_id=unimportant_job.id) queue_entries = self._dispatcher._refresh_pending_queue_entries() self._release_unused_hosts() self.check_hosts(rdb_lib.acquire_hosts(queue_entries)) def testFrontendJobScheduling(self): """Test that basic frontend job scheduling. @raises AssertionError: If the received and requested host don't match, or the mis-matching host is returned instead. """ deps = set(['x', 'y']) acls = set(['a', 'b']) # Create 2 frontend jobs and only one matching host. matching_job = self.create_job(acls=acls, deps=deps) matching_host = self.db_helper.create_host('h1', acls=acls, deps=deps) mis_matching_job = self.create_job(acls=acls, deps=deps) mis_matching_host = self.db_helper.create_host( 'h2', acls=acls, deps=deps.pop()) self.db_helper.add_host_to_job(matching_host, matching_job.id) self.db_helper.add_host_to_job(mis_matching_host, mis_matching_job.id) # Check that only the matching host is returned, and that we get 'None' # for the second request. queue_entries = self._dispatcher._refresh_pending_queue_entries() hosts = list(rdb_lib.acquire_hosts(queue_entries)) self.assertTrue(len(hosts) == 2 and None in hosts) returned_host = [host for host in hosts if host].pop() self.assertTrue(matching_host.id == returned_host.id) def testFrontendJobPriority(self): """Test that frontend job scheduling doesn't ignore priorities. @raises ValueError: If the priorities of frontend jobs are ignored. """ board = 'x' high_priority = self.create_job(priority=2, deps=set([board])) low_priority = self.create_job(priority=1, deps=set([board])) host = self.db_helper.create_host('h1', deps=set([board])) self.db_helper.add_host_to_job(host, low_priority.id) self.db_helper.add_host_to_job(host, high_priority.id) queue_entries = self._dispatcher._refresh_pending_queue_entries() def local_response_handler(request_manager): """Confirms that a higher priority frontend job gets a host. @raises ValueError: If priority inversion happens and the job with priority 1 gets the host instead. """ result = request_manager.api_call(request_manager.request_queue) if not result: raise ValueError('Excepted the high priority request to ' 'get a host, but the result is empty.') for request, hosts in result.iteritems(): if request.priority == 1: raise ValueError('Priority of frontend job ignored.') if len(hosts) > 1: raise ValueError('Multiple hosts returned against one ' 'frontend job scheduling request.') yield hosts[0] self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response', local_response_handler) self.check_hosts(rdb_lib.acquire_hosts(queue_entries)) def testSuiteOrderedHostAcquisition(self): """Test that older suite jobs acquire hosts first. Make sure older suite jobs get hosts first, but not at the expense of higher priority jobs. @raises ValueError: If unexpected acquisitions occur, eg: suite_job_2 acquires the last 2 hosts instead of suite_job_1. isolated_important_job doesn't get any hosts. Any job acquires more hosts than necessary. """ board = 'x' # Create 2 suites such that the later suite has an ordering of deps # that places it ahead of the earlier suite, if parent_job_id is # ignored. suite_without_dep = self.create_suite(num=2, priority=0, board=board) suite_with_dep = self.create_suite(num=1, priority=0, board=board) self.db_helper.add_deps_to_job(suite_with_dep[0], dep_names=list('y')) # Create an important job that should be ahead of the first suite, # because priority trumps parent_job_id and time of creation. isolated_important_job = self.create_job(priority=3, deps=set([board])) # Create 3 hosts, all with the deps to satisfy the last suite. for i in range(0, 3): self.db_helper.create_host('h%s' % i, deps=set([board, 'y'])) queue_entries = self._dispatcher._refresh_pending_queue_entries() def local_response_handler(request_manager): """Reorder requests and check host acquisition. @raises ValueError: If unexpected/no acquisitions occur. """ if any([request for request in request_manager.request_queue if request.parent_job_id is None]): raise ValueError('Parent_job_id can never be None.') # This will result in the ordering: # [suite_2_1, suite_1_*, suite_1_*, isolated_important_job] # The priority scheduling order should be: # [isolated_important_job, suite_1_*, suite_1_*, suite_2_1] # Since: # a. the isolated_important_job is the most important. # b. suite_1 was created before suite_2, regardless of deps disorderly_queue = sorted(request_manager.request_queue, key=lambda r: -r.parent_job_id) request_manager.request_queue = disorderly_queue result = request_manager.api_call(request_manager.request_queue) if not result: raise ValueError('Expected results but got none.') # Verify that the isolated_important_job got a host, and that the # first suite got both remaining free hosts. for request, hosts in result.iteritems(): if request.parent_job_id == 0: if len(hosts) > 1: raise ValueError('First job acquired more hosts than ' 'necessary. Response map: %s' % result) continue if request.parent_job_id == 1: if len(hosts) < 2: raise ValueError('First suite job requests were not ' 'satisfied. Response_map: %s' % result) continue # The second suite job got hosts instead of one of # the others. Eitherway this is a failure. raise ValueError('Unexpected host acquisition ' 'Response map: %s' % result) yield None self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response', local_response_handler) list(rdb_lib.acquire_hosts(queue_entries)) def testConfigurations(self): """Test that configurations don't matter. @raises AssertionError: If the request doesn't find a host, this will happen if configurations are not stripped out. """ self.god.stub_with(provision.Cleanup, '_actions', {'action': 'fakeTest'}) job_labels = set(['action', 'a']) host_deps = set(['a']) db_host = self.db_helper.create_host('h1', deps=host_deps) self.create_job(user='autotest_system', deps=job_labels) queue_entries = self._dispatcher._refresh_pending_queue_entries() matching_host = rdb_lib.acquire_hosts(queue_entries).next() self.assert_(matching_host.id == db_host.id) class RDBMinDutTest( rdb_testing_utils.AbstractBaseRDBTester, unittest.TestCase): """Test AvailableHostRequestHandler""" _config_section = 'AUTOTEST_WEB' def min_dut_test_helper(self, num_hosts, suite_settings): """A helper function to test min_dut logic. @param num_hosts: Total number of hosts to create. @param suite_settings: A dictionary specify how suites would be created and verified. E.g. {'priority': 10, 'num_jobs': 3, 'min_duts':2, 'expected_aquired': 1} With this setting, will create a suite that has 3 child jobs, with priority 10 and min_duts 2. The suite is expected to get 1 dut. """ acls = set(['fake_acl']) hosts = [] for i in range (0, num_hosts): hosts.append(self.db_helper.create_host( 'h%d' % i, deps=set(['board:lumpy']), acls=acls)) suites = {} suite_min_duts = {} for setting in suite_settings: s = self.create_suite(num=setting['num_jobs'], priority=setting['priority'], board='board:lumpy', acls=acls) # Empty list will be used to store acquired hosts. suites[s['parent_job'].id] = (setting, []) suite_min_duts[s['parent_job'].id] = setting['min_duts'] queue_entries = self._dispatcher._refresh_pending_queue_entries() matching_hosts = rdb_lib.acquire_hosts(queue_entries, suite_min_duts) for host, queue_entry in zip(matching_hosts, queue_entries): if host: suites[queue_entry.job.parent_job_id][1].append(host) for setting, hosts in suites.itervalues(): self.assertEqual(len(hosts),setting['expected_aquired']) def testHighPriorityTakeAll(self): """Min duts not satisfied.""" num_hosts = 1 suite1 = {'priority':20, 'num_jobs': 3, 'min_duts': 2, 'expected_aquired': 1} suite2 = {'priority':10, 'num_jobs': 7, 'min_duts': 5, 'expected_aquired': 0} self.min_dut_test_helper(num_hosts, [suite1, suite2]) def testHighPriorityMinSatisfied(self): """High priority min duts satisfied.""" num_hosts = 4 suite1 = {'priority':20, 'num_jobs': 4, 'min_duts': 2, 'expected_aquired': 2} suite2 = {'priority':10, 'num_jobs': 7, 'min_duts': 5, 'expected_aquired': 2} self.min_dut_test_helper(num_hosts, [suite1, suite2]) def testAllPrioritiesMinSatisfied(self): """Min duts satisfied.""" num_hosts = 7 suite1 = {'priority':20, 'num_jobs': 4, 'min_duts': 2, 'expected_aquired': 2} suite2 = {'priority':10, 'num_jobs': 7, 'min_duts': 5, 'expected_aquired': 5} self.min_dut_test_helper(num_hosts, [suite1, suite2]) def testHighPrioritySatisfied(self): """Min duts satisfied, high priority suite satisfied.""" num_hosts = 10 suite1 = {'priority':20, 'num_jobs': 4, 'min_duts': 2, 'expected_aquired': 4} suite2 = {'priority':10, 'num_jobs': 7, 'min_duts': 5, 'expected_aquired': 6} self.min_dut_test_helper(num_hosts, [suite1, suite2]) def testEqualPriorityFirstSuiteMinSatisfied(self): """Equal priority, earlier suite got min duts.""" num_hosts = 4 suite1 = {'priority':20, 'num_jobs': 4, 'min_duts': 2, 'expected_aquired': 2} suite2 = {'priority':20, 'num_jobs': 7, 'min_duts': 5, 'expected_aquired': 2} self.min_dut_test_helper(num_hosts, [suite1, suite2]) def testEqualPriorityAllSuitesMinSatisfied(self): """Equal priority, all suites got min duts.""" num_hosts = 7 suite1 = {'priority':20, 'num_jobs': 4, 'min_duts': 2, 'expected_aquired': 2} suite2 = {'priority':20, 'num_jobs': 7, 'min_duts': 5, 'expected_aquired': 5} self.min_dut_test_helper(num_hosts, [suite1, suite2]) if __name__ == '__main__': unittest.main()