1#!/usr/bin/python 2#pylint: disable-msg=C0111 3 4# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 5# Use of this source code is governed by a BSD-style license that can be 6# found in the LICENSE file. 7 8import mock 9import unittest 10 11import common 12from autotest_lib.frontend import setup_django_environment 13from autotest_lib.frontend.afe import frontend_test_utils 14from autotest_lib.frontend.afe import models 15from autotest_lib.server.cros.dynamic_suite import constants 16from autotest_lib.scheduler import host_scheduler 17from autotest_lib.scheduler import monitor_db 18from autotest_lib.scheduler import rdb 19from autotest_lib.scheduler import rdb_lib 20from autotest_lib.scheduler import rdb_testing_utils 21from autotest_lib.scheduler import scheduler_models 22 23 24class QueryManagerTests(rdb_testing_utils.AbstractBaseRDBTester, 25 unittest.TestCase): 26 """Verify scheduler behavior when pending jobs are already given hosts.""" 27 28 _config_section = 'AUTOTEST_WEB' 29 30 31 def testPendingQueueEntries(self): 32 """Test retrieval of pending queue entries.""" 33 job = self.create_job(deps=set(['a'])) 34 35 # Check that we don't pull the job we just created with only_hostless. 36 jobs_with_hosts = self.job_query_manager.get_pending_queue_entries( 37 only_hostless=True) 38 self.assertTrue(len(jobs_with_hosts) == 0) 39 40 # Check that only_hostless=False pulls new jobs, as always. 41 jobs_without_hosts = self.job_query_manager.get_pending_queue_entries( 42 only_hostless=False) 43 self.assertTrue(jobs_without_hosts[0].id == job.id and 44 jobs_without_hosts[0].host_id is None) 45 46 47 def testPendingQueueEntriesForShard(self): 48 """Test queue entries for shards aren't executed by master scheduler""" 49 job1 = self.create_job(deps=set(['a'])) 50 job2 = self.create_job(deps=set(['b'])) 51 shard = models.Shard.objects.create() 52 # Assign the job's label to a shard 53 shard.labels.add(job1.dependency_labels.all()[0]) 54 55 # Check that we only pull jobs which are not assigned to a shard. 56 jobs_with_hosts = self.job_query_manager.get_pending_queue_entries() 57 self.assertTrue(len(jobs_with_hosts) == 1) 58 self.assertEqual(jobs_with_hosts[0].id, job2.id) 59 60 61 def testHostQueries(self): 62 """Verify that the host query manager maintains its data structures.""" 63 # Create a job and use the host_query_managers internal datastructures 64 # to retrieve its job info. 65 job = self.create_job( 66 deps=rdb_testing_utils.DEFAULT_DEPS, 67 acls=rdb_testing_utils.DEFAULT_ACLS) 68 queue_entries = self._dispatcher._refresh_pending_queue_entries() 69 job_manager = rdb_lib.JobQueryManager(queue_entries) 70 job_info = job_manager.get_job_info(queue_entries[0]) 71 default_dep_ids = set([label.id for label in self.db_helper.get_labels( 72 name__in=rdb_testing_utils.DEFAULT_DEPS)]) 73 default_acl_ids = set([acl.id for acl in self.db_helper.get_acls( 74 name__in=rdb_testing_utils.DEFAULT_ACLS)]) 75 self.assertTrue(set(job_info['deps']) == default_dep_ids) 76 self.assertTrue(set(job_info['acls']) == default_acl_ids) 77 78 79 def testNewJobsWithHosts(self): 80 """Test that we handle inactive hqes with unleased hosts correctly.""" 81 # Create a job and assign it an unleased host, then check that the 82 # HQE becomes active and the host remains assigned to it. 83 job = self.create_job(deps=['a']) 84 host = self.db_helper.create_host('h1', deps=['a']) 85 self.db_helper.add_host_to_job(host, job.id) 86 87 queue_entries = self._dispatcher._refresh_pending_queue_entries() 88 self._dispatcher._schedule_new_jobs() 89 90 host = self.db_helper.get_host(hostname='h1')[0] 91 self.assertTrue(host.leased == True and 92 host.status == models.Host.Status.READY) 93 hqes = list(self.db_helper.get_hqes(host_id=host.id)) 94 self.assertTrue(len(hqes) == 1 and hqes[0].active and 95 hqes[0].status == models.HostQueueEntry.Status.QUEUED) 96 97 98 def testNewJobsWithInvalidHost(self): 99 """Test handling of inactive hqes assigned invalid, unleased hosts.""" 100 # Create a job and assign it an unleased host, then check that the 101 # HQE becomes DOES NOT become active, because we validate the 102 # assignment again. 103 job = self.create_job(deps=['a']) 104 host = self.db_helper.create_host('h1', deps=['b']) 105 self.db_helper.add_host_to_job(host, job.id) 106 107 queue_entries = self._dispatcher._refresh_pending_queue_entries() 108 self._dispatcher._schedule_new_jobs() 109 110 host = self.db_helper.get_host(hostname='h1')[0] 111 self.assertTrue(host.leased == False and 112 host.status == models.Host.Status.READY) 113 hqes = list(self.db_helper.get_hqes(host_id=host.id)) 114 self.assertTrue(len(hqes) == 1 and not hqes[0].active and 115 hqes[0].status == models.HostQueueEntry.Status.QUEUED) 116 117 118 def testNewJobsWithLeasedHost(self): 119 """Test handling of inactive hqes assigned leased hosts.""" 120 # Create a job and assign it a leased host, then check that the 121 # HQE does not become active through the scheduler, and that the 122 # host gets released. 123 job = self.create_job(deps=['a']) 124 host = self.db_helper.create_host('h1', deps=['b']) 125 self.db_helper.add_host_to_job(host, job.id) 126 host.leased = 1 127 host.save() 128 129 rdb.batch_acquire_hosts = mock.MagicMock() 130 queue_entries = self._dispatcher._refresh_pending_queue_entries() 131 self._dispatcher._schedule_new_jobs() 132 self.assertTrue(rdb.batch_acquire_hosts.call_count == 0) 133 host = self.db_helper.get_host(hostname='h1')[0] 134 self.assertTrue(host.leased == True and 135 host.status == models.Host.Status.READY) 136 hqes = list(self.db_helper.get_hqes(host_id=host.id)) 137 self.assertTrue(len(hqes) == 1 and not hqes[0].active and 138 hqes[0].status == models.HostQueueEntry.Status.QUEUED) 139 self.host_scheduler._release_hosts() 140 self.assertTrue(self.db_helper.get_host(hostname='h1')[0].leased == 0) 141 142 143 def testSpecialTaskOrdering(self): 144 """Test priority ordering of special tasks.""" 145 146 # Create 2 special tasks, one with and one without an hqe. 147 # Activate the hqe and make sure it gets scheduled before the other. 148 host = self.db_helper.create_host('h1', deps=['a']) 149 job1 = self.create_job(deps=['a']) 150 self.db_helper.add_host_to_job(host, job1.id) 151 task1 = self.db_helper.create_special_task(job1.id) 152 hqe = self.db_helper.get_hqes(job=job1.id)[0] 153 154 # This task has no queue entry. 155 task2 = self.db_helper.create_special_task(host_id=host.id) 156 157 # Since the hqe task isn't active we get both back. 158 tasks = self.job_query_manager.get_prioritized_special_tasks() 159 self.assertTrue(tasks[1].queue_entry_id is None and 160 tasks[0].queue_entry_id == hqe.id) 161 162 # Activate the hqe and make sure the frontned task isn't returned. 163 self.db_helper.update_hqe(hqe.id, active=True) 164 tasks = self.job_query_manager.get_prioritized_special_tasks() 165 self.assertTrue(tasks[0].id == task1.id) 166 167 168class HostSchedulerTests(rdb_testing_utils.AbstractBaseRDBTester, 169 unittest.TestCase): 170 """Verify scheduler behavior when pending jobs are already given hosts.""" 171 172 _config_section = 'AUTOTEST_WEB' 173 174 175 def setUp(self): 176 super(HostSchedulerTests, self).setUp() 177 self.host_scheduler = host_scheduler.HostScheduler() 178 179 180 def testSpecialTaskLocking(self): 181 """Test that frontend special tasks lock hosts.""" 182 # Create multiple tasks with hosts and make sure the hosts get locked. 183 host = self.db_helper.create_host('h') 184 host1 = self.db_helper.create_host('h1') 185 task = self.db_helper.create_special_task(host_id=host.id) 186 task1 = self.db_helper.create_special_task(host_id=host1.id) 187 self.host_scheduler._lease_hosts_of_frontend_tasks() 188 self.assertTrue(self.db_helper.get_host(hostname='h')[0].leased == 1 and 189 self.db_helper.get_host(hostname='h1')[0].leased == 1) 190 191 192 def testJobScheduling(self): 193 """Test new host acquisitions.""" 194 # Create a job that will find a host through the host scheduler, and 195 # make sure the hqe is activated, and a special task is created. 196 job = self.create_job(deps=set(['a'])) 197 host = self.db_helper.create_host('h1', deps=set(['a'])) 198 self.host_scheduler._schedule_jobs() 199 hqe = self.db_helper.get_hqes(job_id=job.id)[0] 200 self.assertTrue(hqe.active and hqe.host_id == host.id and 201 hqe.status == models.HostQueueEntry.Status.QUEUED) 202 task = self.db_helper.get_tasks(queue_entry_id=hqe.id)[0] 203 self.assertTrue(task.is_active == 0 and task.host_id == host.id) 204 205 206 def _check_agent_invariants(self, host, agent): 207 host_agents = list(self._dispatcher._host_agents[host.id]) 208 self.assertTrue(len(host_agents) == 1) 209 self.assertTrue(host_agents[0].task.task.id == agent.id) 210 return host_agents[0] 211 212 213 def testLeasedFrontendTaskHost(self): 214 """Check that we don't scheduler a special task on an unleased host.""" 215 # Create a special task without an hqe and make sure it isn't returned 216 # for scheduling till its host is leased. 217 host = self.db_helper.create_host('h1', deps=['a']) 218 task = self.db_helper.create_special_task(host_id=host.id) 219 220 tasks = self.job_query_manager.get_prioritized_special_tasks( 221 only_tasks_with_leased_hosts=True) 222 self.assertTrue(tasks == []) 223 tasks = self.job_query_manager.get_prioritized_special_tasks( 224 only_tasks_with_leased_hosts=False) 225 self.assertTrue(tasks[0].id == task.id) 226 self.host_scheduler._lease_hosts_of_frontend_tasks() 227 tasks = self.job_query_manager.get_prioritized_special_tasks( 228 only_tasks_with_leased_hosts=True) 229 self.assertTrue(tasks[0].id == task.id) 230 231 232 def testTickLockStep(self): 233 """Check that a frontend task and an hqe never run simultaneously.""" 234 235 self.god.stub_with(monitor_db, '_inline_host_acquisition', False) 236 237 # Create a frontend special task against a host. 238 host = self.db_helper.create_host('h1', deps=set(['a'])) 239 frontend_task = self.db_helper.create_special_task(host_id=host.id) 240 self._dispatcher._schedule_special_tasks() 241 # The frontend special task shouldn't get scheduled on the host till 242 # the host is leased. 243 self.assertFalse(self._dispatcher.host_has_agent(host)) 244 245 # Create a job for the same host and make the host scheduler lease the 246 # host out to that job. 247 job = self.create_job(deps=set(['a'])) 248 self.host_scheduler._schedule_jobs() 249 hqe = self.db_helper.get_hqes(job_id=job.id)[0] 250 tasks = self.job_query_manager.get_prioritized_special_tasks( 251 only_tasks_with_leased_hosts=True) 252 # We should not find the frontend special task, even though its host is 253 # now leased, because its leased by an active hqe. 254 self.assertTrue(len(tasks) == 1 and tasks[0].queue_entry_id == hqe.id) 255 self._dispatcher._schedule_special_tasks() 256 self.assertTrue(self._dispatcher.host_has_agent(host)) 257 258 # Deactivate the hqe task and make sure the frontend task gets the host. 259 task = tasks[0] 260 self._dispatcher.remove_agent(self._check_agent_invariants(host, task)) 261 task.is_complete = 1 262 task.is_active = 0 263 task.save() 264 self.db_helper.update_hqe(hqe.id, active=False) 265 self._dispatcher._schedule_special_tasks() 266 self.assertTrue(self._dispatcher.host_has_agent(host)) 267 self._check_agent_invariants(host, frontend_task) 268 269 # Make sure we don't release the host being used by the incomplete task. 270 self.host_scheduler._release_hosts() 271 host = self.db_helper.get_host(hostname='h1')[0] 272 self.assertTrue(host.leased == True) 273 274 275class SuiteRecorderTest(rdb_testing_utils.AbstractBaseRDBTester, 276 unittest.TestCase): 277 """Test the functionality of SuiteRecorder""" 278 279 _config_section = 'AUTOTEST_WEB' 280 281 def testGetSuiteHostAssignment(self): 282 """Test the initialization of SuiteRecord.""" 283 hosts = [] 284 num = 4 285 for i in range (0, num): 286 hosts.append(self.db_helper.create_host( 287 'h%d' % i, deps=set(['board:lumpy']))) 288 single_job = self.create_job(deps=set(['a'])) 289 jobs_1 = self.create_suite(num=2, board='board:lumpy') 290 jobs_2 = self.create_suite(num=2, board='board:lumpy') 291 # We have 4 hosts, 5 jobs, one job in the second suite won't 292 # get a host. 293 all_jobs = ([single_job] + 294 [jobs_1[k] for k in jobs_1 if k !='parent_job'] + 295 [jobs_2[k] for k in jobs_2 if k !='parent_job']) 296 for i in range(0, num): 297 self.db_helper.add_host_to_job(hosts[i], all_jobs[i].id, 298 activate=True) 299 r = host_scheduler.SuiteRecorder(self.job_query_manager) 300 self.assertEqual(r.suite_host_num, 301 {jobs_1['parent_job'].id:2, 302 jobs_2['parent_job'].id:1}) 303 self.assertEqual(r.hosts_to_suites, 304 {hosts[1].id: jobs_1['parent_job'].id, 305 hosts[2].id: jobs_1['parent_job'].id, 306 hosts[3].id: jobs_2['parent_job'].id}) 307 308 309 def verify_state(self, recorder, suite_host_num, hosts_to_suites): 310 """Verify the suite, host information held by SuiteRecorder. 311 312 @param recorder: A SuiteRecorder object. 313 @param suite_host_num: a dict, expected value of suite_host_num. 314 @param hosts_to_suites: a dict, expected value of hosts_to_suites. 315 """ 316 self.assertEqual(recorder.suite_host_num, suite_host_num) 317 self.assertEqual(recorder.hosts_to_suites, hosts_to_suites) 318 319 320 def assign_host_to_job(self, host, job, recorder=None): 321 """A helper function that adds a host to a job and record it. 322 323 @param host: A Host object. 324 @param job: A Job object. 325 @param recorder: A SuiteRecorder object to record the assignment. 326 327 @return a HostQueueEntry object that binds the host and job together. 328 """ 329 self.db_helper.add_host_to_job(host, job) 330 hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%s', 331 params=(job.id,))[0] 332 if recorder: 333 recorder.record_assignment(hqe) 334 return hqe 335 336 337 def testRecordAssignmentAndRelease(self): 338 """Test when a host is assigned to suite""" 339 r = host_scheduler.SuiteRecorder(self.job_query_manager) 340 self.verify_state(r, {}, {}) 341 host1 = self.db_helper.create_host('h1') 342 host2 = self.db_helper.create_host('h2') 343 jobs = self.create_suite(num=2) 344 hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%s', 345 params=(jobs[0].id,))[0] 346 # HQE got a host. 347 hqe = self.assign_host_to_job(host1, jobs[0], r) 348 self.verify_state(r, {jobs['parent_job'].id:1}, 349 {host1.id: jobs['parent_job'].id}) 350 # Tried to call record_assignment again, nothing should happen. 351 r.record_assignment(hqe) 352 self.verify_state(r, {jobs['parent_job'].id:1}, 353 {host1.id: jobs['parent_job'].id}) 354 # Second hqe got a host 355 self.assign_host_to_job(host2, jobs[1], r) 356 self.verify_state(r, {jobs['parent_job'].id:2}, 357 {host1.id: jobs['parent_job'].id, 358 host2.id: jobs['parent_job'].id}) 359 # Release host1 360 r.record_release([host1]) 361 self.verify_state(r, {jobs['parent_job'].id:1}, 362 {host2.id: jobs['parent_job'].id}) 363 # Release host2 364 r.record_release([host2]) 365 self.verify_state(r, {}, {}) 366 367 368 def testGetMinDuts(self): 369 """Test get min dut for suite.""" 370 host1 = self.db_helper.create_host('h1') 371 host2 = self.db_helper.create_host('h2') 372 host3 = self.db_helper.create_host('h3') 373 jobs = self.create_suite(num=3) 374 pid = jobs['parent_job'].id 375 # Set min_dut=1 for the suite as a job keyval. 376 keyval = models.JobKeyval( 377 job_id=pid, key=constants.SUITE_MIN_DUTS_KEY, value=2) 378 keyval.save() 379 r = host_scheduler.SuiteRecorder(self.job_query_manager) 380 # Not job has got any host, min dut to request should equal to what's 381 # specified in the job keyval. 382 self.assertEqual(r.get_min_duts([pid]), {pid: 2}) 383 self.assign_host_to_job(host1, jobs[0], r) 384 self.assertEqual(r.get_min_duts([pid]), {pid: 1}) 385 self.assign_host_to_job(host2, jobs[1], r) 386 self.assertEqual(r.get_min_duts([pid]), {pid: 0}) 387 self.assign_host_to_job(host3, jobs[2], r) 388 self.assertEqual(r.get_min_duts([pid]), {pid: 0}) 389 r.record_release([host1]) 390 self.assertEqual(r.get_min_duts([pid]), {pid: 0}) 391 r.record_release([host2]) 392 self.assertEqual(r.get_min_duts([pid]), {pid: 1}) 393 r.record_release([host3]) 394 self.assertEqual(r.get_min_duts([pid]), {pid: 2}) 395 396if __name__ == '__main__': 397 unittest.main() 398 399