1#!/usr/bin/env python2 2# pylint: disable=missing-docstring 3 4import datetime 5import mox 6import unittest 7 8import common 9from autotest_lib.client.common_lib import control_data 10from autotest_lib.client.common_lib import error 11from autotest_lib.client.common_lib import global_config 12from autotest_lib.client.common_lib import priorities 13from autotest_lib.client.common_lib.cros import dev_server 14from autotest_lib.client.common_lib.test_utils import mock 15from autotest_lib.frontend import setup_django_environment 16from autotest_lib.frontend.afe import frontend_test_utils 17from autotest_lib.frontend.afe import model_logic 18from autotest_lib.frontend.afe import models 19from autotest_lib.frontend.afe import rpc_interface 20from autotest_lib.frontend.afe import rpc_utils 21from autotest_lib.server import frontend 22from autotest_lib.server import utils as server_utils 23from autotest_lib.server.cros import provision 24from autotest_lib.server.cros.dynamic_suite import constants 25from autotest_lib.server.cros.dynamic_suite import control_file_getter 26from autotest_lib.server.cros.dynamic_suite import frontend_wrappers 27 28CLIENT = control_data.CONTROL_TYPE_NAMES.CLIENT 29SERVER = control_data.CONTROL_TYPE_NAMES.SERVER 30 31_hqe_status = models.HostQueueEntry.Status 32 33 34class RpcInterfaceTest(unittest.TestCase, 35 frontend_test_utils.FrontendTestMixin): 36 def setUp(self): 37 self._frontend_common_setup() 38 self.god = mock.mock_god() 39 40 41 def tearDown(self): 42 self.god.unstub_all() 43 self._frontend_common_teardown() 44 global_config.global_config.reset_config_values() 45 46 47 def test_validation(self): 48 # omit a required field 49 self.assertRaises(model_logic.ValidationError, rpc_interface.add_label, 50 name=None) 51 # violate uniqueness constraint 52 self.assertRaises(model_logic.ValidationError, rpc_interface.add_host, 53 hostname='host1') 54 55 56 def test_multiple_platforms(self): 57 platform2 = models.Label.objects.create(name='platform2', platform=True) 58 self.assertRaises(model_logic.ValidationError, 59 rpc_interface. label_add_hosts, id='platform2', 60 hosts=['host1', 'host2']) 61 self.assertRaises(model_logic.ValidationError, 62 rpc_interface.host_add_labels, 63 id='host1', labels=['platform2']) 64 # make sure the platform didn't get added 65 platforms = rpc_interface.get_labels( 66 host__hostname__in=['host1', 'host2'], platform=True) 67 self.assertEquals(len(platforms), 1) 68 self.assertEquals(platforms[0]['name'], 'myplatform') 69 70 71 def _check_hostnames(self, hosts, expected_hostnames): 72 self.assertEquals(set(host['hostname'] for host in hosts), 73 set(expected_hostnames)) 74 75 76 def test_ping_db(self): 77 self.assertEquals(rpc_interface.ping_db(), [True]) 78 79 80 def test_get_hosts(self): 81 hosts = rpc_interface.get_hosts() 82 self._check_hostnames(hosts, [host.hostname for host in self.hosts]) 83 84 hosts = rpc_interface.get_hosts(hostname='host1') 85 self._check_hostnames(hosts, ['host1']) 86 host = hosts[0] 87 self.assertEquals(sorted(host['labels']), ['label1', 'myplatform']) 88 self.assertEquals(host['platform'], 'myplatform') 89 self.assertEquals(host['acls'], ['my_acl']) 90 self.assertEquals(host['attributes'], {}) 91 92 93 def test_get_hosts_multiple_labels(self): 94 hosts = rpc_interface.get_hosts( 95 multiple_labels=['myplatform', 'label1']) 96 self._check_hostnames(hosts, ['host1']) 97 98 99 def test_get_hosts_exclude_only_if_needed(self): 100 self.hosts[0].labels.add(self.label3) 101 102 hosts = rpc_interface.get_hosts(hostname__in=['host1', 'host2'], 103 exclude_only_if_needed_labels=True) 104 self._check_hostnames(hosts, ['host2']) 105 106 107 def test_job_keyvals(self): 108 keyval_dict = {'mykey': 'myvalue'} 109 job_id = rpc_interface.create_job(name='test', 110 priority=priorities.Priority.DEFAULT, 111 control_file='foo', 112 control_type=CLIENT, 113 hosts=['host1'], 114 keyvals=keyval_dict) 115 jobs = rpc_interface.get_jobs(id=job_id) 116 self.assertEquals(len(jobs), 1) 117 self.assertEquals(jobs[0]['keyvals'], keyval_dict) 118 119 120 def test_test_retry(self): 121 job_id = rpc_interface.create_job(name='flake', 122 priority=priorities.Priority.DEFAULT, 123 control_file='foo', 124 control_type=CLIENT, 125 hosts=['host1'], 126 test_retry=10) 127 jobs = rpc_interface.get_jobs(id=job_id) 128 self.assertEquals(len(jobs), 1) 129 self.assertEquals(jobs[0]['test_retry'], 10) 130 131 132 def test_get_jobs_summary(self): 133 job = self._create_job(hosts=xrange(1, 4)) 134 entries = list(job.hostqueueentry_set.all()) 135 entries[1].status = _hqe_status.FAILED 136 entries[1].save() 137 entries[2].status = _hqe_status.FAILED 138 entries[2].aborted = True 139 entries[2].save() 140 141 # Mock up tko_rpc_interface.get_status_counts. 142 self.god.stub_function_to_return(rpc_interface.tko_rpc_interface, 143 'get_status_counts', 144 None) 145 146 job_summaries = rpc_interface.get_jobs_summary(id=job.id) 147 self.assertEquals(len(job_summaries), 1) 148 summary = job_summaries[0] 149 self.assertEquals(summary['status_counts'], {'Queued': 1, 150 'Failed': 2}) 151 152 153 def _check_job_ids(self, actual_job_dicts, expected_jobs): 154 self.assertEquals( 155 set(job_dict['id'] for job_dict in actual_job_dicts), 156 set(job.id for job in expected_jobs)) 157 158 159 def test_get_jobs_status_filters(self): 160 HqeStatus = models.HostQueueEntry.Status 161 def create_two_host_job(): 162 return self._create_job(hosts=[1, 2]) 163 def set_hqe_statuses(job, first_status, second_status): 164 entries = job.hostqueueentry_set.all() 165 entries[0].update_object(status=first_status) 166 entries[1].update_object(status=second_status) 167 168 queued = create_two_host_job() 169 170 queued_and_running = create_two_host_job() 171 set_hqe_statuses(queued_and_running, HqeStatus.QUEUED, 172 HqeStatus.RUNNING) 173 174 running_and_complete = create_two_host_job() 175 set_hqe_statuses(running_and_complete, HqeStatus.RUNNING, 176 HqeStatus.COMPLETED) 177 178 complete = create_two_host_job() 179 set_hqe_statuses(complete, HqeStatus.COMPLETED, HqeStatus.COMPLETED) 180 181 started_but_inactive = create_two_host_job() 182 set_hqe_statuses(started_but_inactive, HqeStatus.QUEUED, 183 HqeStatus.COMPLETED) 184 185 parsing = create_two_host_job() 186 set_hqe_statuses(parsing, HqeStatus.PARSING, HqeStatus.PARSING) 187 188 self._check_job_ids(rpc_interface.get_jobs(not_yet_run=True), [queued]) 189 self._check_job_ids(rpc_interface.get_jobs(running=True), 190 [queued_and_running, running_and_complete, 191 started_but_inactive, parsing]) 192 self._check_job_ids(rpc_interface.get_jobs(finished=True), [complete]) 193 194 195 def test_get_jobs_type_filters(self): 196 self.assertRaises(AssertionError, rpc_interface.get_jobs, 197 suite=True, sub=True) 198 self.assertRaises(AssertionError, rpc_interface.get_jobs, 199 suite=True, standalone=True) 200 self.assertRaises(AssertionError, rpc_interface.get_jobs, 201 standalone=True, sub=True) 202 203 parent_job = self._create_job(hosts=[1]) 204 child_jobs = self._create_job(hosts=[1, 2], 205 parent_job_id=parent_job.id) 206 standalone_job = self._create_job(hosts=[1]) 207 208 self._check_job_ids(rpc_interface.get_jobs(suite=True), [parent_job]) 209 self._check_job_ids(rpc_interface.get_jobs(sub=True), [child_jobs]) 210 self._check_job_ids(rpc_interface.get_jobs(standalone=True), 211 [standalone_job]) 212 213 214 def _create_job_helper(self, **kwargs): 215 return rpc_interface.create_job(name='test', 216 priority=priorities.Priority.DEFAULT, 217 control_file='control file', 218 control_type=SERVER, **kwargs) 219 220 221 def test_one_time_hosts(self): 222 job = self._create_job_helper(one_time_hosts=['testhost']) 223 host = models.Host.objects.get(hostname='testhost') 224 self.assertEquals(host.invalid, True) 225 self.assertEquals(host.labels.count(), 0) 226 self.assertEquals(host.aclgroup_set.count(), 0) 227 228 229 def test_create_job_duplicate_hosts(self): 230 self.assertRaises(model_logic.ValidationError, self._create_job_helper, 231 hosts=[1, 1]) 232 233 234 def test_create_unrunnable_metahost_job(self): 235 self.assertRaises(error.NoEligibleHostException, 236 self._create_job_helper, meta_hosts=['unused']) 237 238 239 def test_create_hostless_job(self): 240 job_id = self._create_job_helper(hostless=True) 241 job = models.Job.objects.get(pk=job_id) 242 queue_entries = job.hostqueueentry_set.all() 243 self.assertEquals(len(queue_entries), 1) 244 self.assertEquals(queue_entries[0].host, None) 245 self.assertEquals(queue_entries[0].meta_host, None) 246 247 248 def _setup_special_tasks(self): 249 host = self.hosts[0] 250 251 job1 = self._create_job(hosts=[1]) 252 job2 = self._create_job(hosts=[1]) 253 254 entry1 = job1.hostqueueentry_set.all()[0] 255 entry1.update_object(started_on=datetime.datetime(2009, 1, 2), 256 execution_subdir='host1') 257 entry2 = job2.hostqueueentry_set.all()[0] 258 entry2.update_object(started_on=datetime.datetime(2009, 1, 3), 259 execution_subdir='host1') 260 261 self.task1 = models.SpecialTask.objects.create( 262 host=host, task=models.SpecialTask.Task.VERIFY, 263 time_started=datetime.datetime(2009, 1, 1), # ran before job 1 264 is_complete=True, requested_by=models.User.current_user()) 265 self.task2 = models.SpecialTask.objects.create( 266 host=host, task=models.SpecialTask.Task.VERIFY, 267 queue_entry=entry2, # ran with job 2 268 is_active=True, requested_by=models.User.current_user()) 269 self.task3 = models.SpecialTask.objects.create( 270 host=host, task=models.SpecialTask.Task.VERIFY, 271 requested_by=models.User.current_user()) # not yet run 272 273 274 def test_get_special_tasks(self): 275 self._setup_special_tasks() 276 tasks = rpc_interface.get_special_tasks(host__hostname='host1', 277 queue_entry__isnull=True) 278 self.assertEquals(len(tasks), 2) 279 self.assertEquals(tasks[0]['task'], models.SpecialTask.Task.VERIFY) 280 self.assertEquals(tasks[0]['is_active'], False) 281 self.assertEquals(tasks[0]['is_complete'], True) 282 283 284 def test_get_latest_special_task(self): 285 # a particular usage of get_special_tasks() 286 self._setup_special_tasks() 287 self.task2.time_started = datetime.datetime(2009, 1, 2) 288 self.task2.save() 289 290 tasks = rpc_interface.get_special_tasks( 291 host__hostname='host1', task=models.SpecialTask.Task.VERIFY, 292 time_started__isnull=False, sort_by=['-time_started'], 293 query_limit=1) 294 self.assertEquals(len(tasks), 1) 295 self.assertEquals(tasks[0]['id'], 2) 296 297 298 def _common_entry_check(self, entry_dict): 299 self.assertEquals(entry_dict['host']['hostname'], 'host1') 300 self.assertEquals(entry_dict['job']['id'], 2) 301 302 303 def test_get_host_queue_entries_and_special_tasks(self): 304 self._setup_special_tasks() 305 306 host = self.hosts[0].id 307 entries_and_tasks = ( 308 rpc_interface.get_host_queue_entries_and_special_tasks(host)) 309 310 paths = [entry['execution_path'] for entry in entries_and_tasks] 311 self.assertEquals(paths, ['hosts/host1/3-verify', 312 '2-autotest_system/host1', 313 'hosts/host1/2-verify', 314 '1-autotest_system/host1', 315 'hosts/host1/1-verify']) 316 317 verify2 = entries_and_tasks[2] 318 self._common_entry_check(verify2) 319 self.assertEquals(verify2['type'], 'Verify') 320 self.assertEquals(verify2['status'], 'Running') 321 self.assertEquals(verify2['execution_path'], 'hosts/host1/2-verify') 322 323 entry2 = entries_and_tasks[1] 324 self._common_entry_check(entry2) 325 self.assertEquals(entry2['type'], 'Job') 326 self.assertEquals(entry2['status'], 'Queued') 327 self.assertEquals(entry2['started_on'], '2009-01-03 00:00:00') 328 329 330 def _create_hqes_and_start_time_index_entries(self): 331 shard = models.Shard.objects.create(hostname='shard') 332 job = self._create_job(shard=shard, control_file='foo') 333 HqeStatus = models.HostQueueEntry.Status 334 335 models.HostQueueEntry( 336 id=1, job=job, started_on='2017-01-01', 337 status=HqeStatus.QUEUED).save() 338 models.HostQueueEntry( 339 id=2, job=job, started_on='2017-01-02', 340 status=HqeStatus.QUEUED).save() 341 models.HostQueueEntry( 342 id=3, job=job, started_on='2017-01-03', 343 status=HqeStatus.QUEUED).save() 344 345 models.HostQueueEntryStartTimes( 346 insert_time='2017-01-03', highest_hqe_id=3).save() 347 models.HostQueueEntryStartTimes( 348 insert_time='2017-01-02', highest_hqe_id=2).save() 349 models.HostQueueEntryStartTimes( 350 insert_time='2017-01-01', highest_hqe_id=1).save() 351 352 def test_get_host_queue_entries_by_insert_time(self): 353 """Check the insert_time_after and insert_time_before constraints.""" 354 self._create_hqes_and_start_time_index_entries() 355 hqes = rpc_interface.get_host_queue_entries_by_insert_time( 356 insert_time_after='2017-01-01') 357 self.assertEquals(len(hqes), 3) 358 359 hqes = rpc_interface.get_host_queue_entries_by_insert_time( 360 insert_time_after='2017-01-02') 361 self.assertEquals(len(hqes), 2) 362 363 hqes = rpc_interface.get_host_queue_entries_by_insert_time( 364 insert_time_after='2017-01-03') 365 self.assertEquals(len(hqes), 1) 366 367 hqes = rpc_interface.get_host_queue_entries_by_insert_time( 368 insert_time_before='2017-01-01') 369 self.assertEquals(len(hqes), 1) 370 371 hqes = rpc_interface.get_host_queue_entries_by_insert_time( 372 insert_time_before='2017-01-02') 373 self.assertEquals(len(hqes), 2) 374 375 hqes = rpc_interface.get_host_queue_entries_by_insert_time( 376 insert_time_before='2017-01-03') 377 self.assertEquals(len(hqes), 3) 378 379 380 def test_get_host_queue_entries_by_insert_time_with_missing_index_row(self): 381 """Shows that the constraints are approximate. 382 383 The query may return rows which are actually outside of the bounds 384 given, if the index table does not have an entry for the specific time. 385 """ 386 self._create_hqes_and_start_time_index_entries() 387 hqes = rpc_interface.get_host_queue_entries_by_insert_time( 388 insert_time_before='2016-12-01') 389 self.assertEquals(len(hqes), 1) 390 391 def test_get_hqe_by_insert_time_with_before_and_after(self): 392 self._create_hqes_and_start_time_index_entries() 393 hqes = rpc_interface.get_host_queue_entries_by_insert_time( 394 insert_time_before='2017-01-02', 395 insert_time_after='2017-01-02') 396 self.assertEquals(len(hqes), 1) 397 398 def test_get_hqe_by_insert_time_and_id_constraint(self): 399 self._create_hqes_and_start_time_index_entries() 400 # The time constraint is looser than the id constraint, so the time 401 # constraint should take precedence. 402 hqes = rpc_interface.get_host_queue_entries_by_insert_time( 403 insert_time_before='2017-01-02', 404 id__lte=1) 405 self.assertEquals(len(hqes), 1) 406 407 # Now make the time constraint tighter than the id constraint. 408 hqes = rpc_interface.get_host_queue_entries_by_insert_time( 409 insert_time_before='2017-01-01', 410 id__lte=42) 411 self.assertEquals(len(hqes), 1) 412 413 def test_view_invalid_host(self): 414 # RPCs used by View Host page should work for invalid hosts 415 self._create_job_helper(hosts=[1]) 416 host = self.hosts[0] 417 host.delete() 418 419 self.assertEquals(1, rpc_interface.get_num_hosts(hostname='host1', 420 valid_only=False)) 421 data = rpc_interface.get_hosts(hostname='host1', valid_only=False) 422 self.assertEquals(1, len(data)) 423 424 self.assertEquals(1, rpc_interface.get_num_host_queue_entries( 425 host__hostname='host1')) 426 data = rpc_interface.get_host_queue_entries(host__hostname='host1') 427 self.assertEquals(1, len(data)) 428 429 count = rpc_interface.get_num_host_queue_entries_and_special_tasks( 430 host=host.id) 431 self.assertEquals(1, count) 432 data = rpc_interface.get_host_queue_entries_and_special_tasks( 433 host=host.id) 434 self.assertEquals(1, len(data)) 435 436 437 def test_reverify_hosts(self): 438 hostname_list = rpc_interface.reverify_hosts(id__in=[1, 2]) 439 self.assertEquals(hostname_list, ['host1', 'host2']) 440 tasks = rpc_interface.get_special_tasks() 441 self.assertEquals(len(tasks), 2) 442 self.assertEquals(set(task['host']['id'] for task in tasks), 443 set([1, 2])) 444 445 task = tasks[0] 446 self.assertEquals(task['task'], models.SpecialTask.Task.VERIFY) 447 self.assertEquals(task['requested_by'], 'autotest_system') 448 449 450 def test_repair_hosts(self): 451 hostname_list = rpc_interface.repair_hosts(id__in=[1, 2]) 452 self.assertEquals(hostname_list, ['host1', 'host2']) 453 tasks = rpc_interface.get_special_tasks() 454 self.assertEquals(len(tasks), 2) 455 self.assertEquals(set(task['host']['id'] for task in tasks), 456 set([1, 2])) 457 458 task = tasks[0] 459 self.assertEquals(task['task'], models.SpecialTask.Task.REPAIR) 460 self.assertEquals(task['requested_by'], 'autotest_system') 461 462 463 def _modify_host_helper(self, on_shard=False, host_on_shard=False): 464 shard_hostname = 'shard1' 465 if on_shard: 466 global_config.global_config.override_config_value( 467 'SHARD', 'shard_hostname', shard_hostname) 468 469 host = models.Host.objects.all()[0] 470 if host_on_shard: 471 shard = models.Shard.objects.create(hostname=shard_hostname) 472 host.shard = shard 473 host.save() 474 475 self.assertFalse(host.locked) 476 477 self.god.stub_class_method(frontend.AFE, 'run') 478 479 if host_on_shard and not on_shard: 480 mock_afe = self.god.create_mock_class_obj( 481 frontend_wrappers.RetryingAFE, 'MockAFE') 482 self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe) 483 484 mock_afe2 = frontend_wrappers.RetryingAFE.expect_new( 485 server=shard_hostname, user=None) 486 mock_afe2.run.expect_call('modify_host_local', id=host.id, 487 locked=True, lock_reason='_modify_host_helper lock', 488 lock_time=datetime.datetime(2015, 12, 15)) 489 elif on_shard: 490 mock_afe = self.god.create_mock_class_obj( 491 frontend_wrappers.RetryingAFE, 'MockAFE') 492 self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe) 493 494 mock_afe2 = frontend_wrappers.RetryingAFE.expect_new( 495 server=server_utils.get_global_afe_hostname(), user=None) 496 mock_afe2.run.expect_call('modify_host', id=host.id, 497 locked=True, lock_reason='_modify_host_helper lock', 498 lock_time=datetime.datetime(2015, 12, 15)) 499 500 rpc_interface.modify_host(id=host.id, locked=True, 501 lock_reason='_modify_host_helper lock', 502 lock_time=datetime.datetime(2015, 12, 15)) 503 504 host = models.Host.objects.get(pk=host.id) 505 if on_shard: 506 # modify_host on shard does nothing but routing the RPC to master. 507 self.assertFalse(host.locked) 508 else: 509 self.assertTrue(host.locked) 510 self.god.check_playback() 511 512 513 def test_modify_host_on_master_host_on_master(self): 514 """Call modify_host to master for host in master.""" 515 self._modify_host_helper() 516 517 518 def test_modify_host_on_master_host_on_shard(self): 519 """Call modify_host to master for host in shard.""" 520 self._modify_host_helper(host_on_shard=True) 521 522 523 def test_modify_host_on_shard(self): 524 """Call modify_host to shard for host in shard.""" 525 self._modify_host_helper(on_shard=True, host_on_shard=True) 526 527 528 def test_modify_hosts_on_master_host_on_shard(self): 529 """Ensure calls to modify_hosts are correctly forwarded to shards.""" 530 host1 = models.Host.objects.all()[0] 531 host2 = models.Host.objects.all()[1] 532 533 shard1 = models.Shard.objects.create(hostname='shard1') 534 host1.shard = shard1 535 host1.save() 536 537 shard2 = models.Shard.objects.create(hostname='shard2') 538 host2.shard = shard2 539 host2.save() 540 541 self.assertFalse(host1.locked) 542 self.assertFalse(host2.locked) 543 544 mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE, 545 'MockAFE') 546 self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe) 547 548 # The statuses of one host might differ on master and shard. 549 # Filters are always applied on the master. So the host on the shard 550 # will be affected no matter what his status is. 551 filters_to_use = {'status': 'Ready'} 552 553 mock_afe2 = frontend_wrappers.RetryingAFE.expect_new( 554 server='shard2', user=None) 555 mock_afe2.run.expect_call( 556 'modify_hosts_local', 557 host_filter_data={'id__in': [shard1.id, shard2.id]}, 558 update_data={'locked': True, 559 'lock_reason': 'Testing forward to shard', 560 'lock_time' : datetime.datetime(2015, 12, 15) }) 561 562 mock_afe1 = frontend_wrappers.RetryingAFE.expect_new( 563 server='shard1', user=None) 564 mock_afe1.run.expect_call( 565 'modify_hosts_local', 566 host_filter_data={'id__in': [shard1.id, shard2.id]}, 567 update_data={'locked': True, 568 'lock_reason': 'Testing forward to shard', 569 'lock_time' : datetime.datetime(2015, 12, 15)}) 570 571 rpc_interface.modify_hosts( 572 host_filter_data={'status': 'Ready'}, 573 update_data={'locked': True, 574 'lock_reason': 'Testing forward to shard', 575 'lock_time' : datetime.datetime(2015, 12, 15) }) 576 577 host1 = models.Host.objects.get(pk=host1.id) 578 self.assertTrue(host1.locked) 579 host2 = models.Host.objects.get(pk=host2.id) 580 self.assertTrue(host2.locked) 581 self.god.check_playback() 582 583 584 def test_delete_host(self): 585 """Ensure an RPC is made on delete a host, if it is on a shard.""" 586 host1 = models.Host.objects.all()[0] 587 shard1 = models.Shard.objects.create(hostname='shard1') 588 host1.shard = shard1 589 host1.save() 590 host1_id = host1.id 591 592 mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE, 593 'MockAFE') 594 self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe) 595 596 mock_afe1 = frontend_wrappers.RetryingAFE.expect_new( 597 server='shard1', user=None) 598 mock_afe1.run.expect_call('delete_host', id=host1.id) 599 600 rpc_interface.delete_host(id=host1.id) 601 602 self.assertRaises(models.Host.DoesNotExist, 603 models.Host.smart_get, host1_id) 604 605 self.god.check_playback() 606 607 608 def test_modify_label(self): 609 label1 = models.Label.objects.all()[0] 610 self.assertEqual(label1.invalid, 0) 611 612 host2 = models.Host.objects.all()[1] 613 shard1 = models.Shard.objects.create(hostname='shard1') 614 host2.shard = shard1 615 host2.labels.add(label1) 616 host2.save() 617 618 mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE, 619 'MockAFE') 620 self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe) 621 622 mock_afe1 = frontend_wrappers.RetryingAFE.expect_new( 623 server='shard1', user=None) 624 mock_afe1.run.expect_call('modify_label', id=label1.id, invalid=1) 625 626 rpc_interface.modify_label(label1.id, invalid=1) 627 628 self.assertEqual(models.Label.objects.all()[0].invalid, 1) 629 self.god.check_playback() 630 631 632 def test_delete_label(self): 633 label1 = models.Label.objects.all()[0] 634 635 host2 = models.Host.objects.all()[1] 636 shard1 = models.Shard.objects.create(hostname='shard1') 637 host2.shard = shard1 638 host2.labels.add(label1) 639 host2.save() 640 641 mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE, 642 'MockAFE') 643 self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe) 644 645 mock_afe1 = frontend_wrappers.RetryingAFE.expect_new( 646 server='shard1', user=None) 647 mock_afe1.run.expect_call('delete_label', id=label1.id) 648 649 rpc_interface.delete_label(id=label1.id) 650 651 self.assertRaises(models.Label.DoesNotExist, 652 models.Label.smart_get, label1.id) 653 self.god.check_playback() 654 655 656 def test_get_image_for_job_with_keyval_build(self): 657 keyval_dict = {'build': 'cool-image'} 658 job_id = rpc_interface.create_job(name='test', 659 priority=priorities.Priority.DEFAULT, 660 control_file='foo', 661 control_type=CLIENT, 662 hosts=['host1'], 663 keyvals=keyval_dict) 664 job = models.Job.objects.get(id=job_id) 665 self.assertIsNotNone(job) 666 image = rpc_interface._get_image_for_job(job, True) 667 self.assertEquals('cool-image', image) 668 669 670 def test_get_image_for_job_with_keyval_builds(self): 671 keyval_dict = {'builds': {'cros-version': 'cool-image'}} 672 job_id = rpc_interface.create_job(name='test', 673 priority=priorities.Priority.DEFAULT, 674 control_file='foo', 675 control_type=CLIENT, 676 hosts=['host1'], 677 keyvals=keyval_dict) 678 job = models.Job.objects.get(id=job_id) 679 self.assertIsNotNone(job) 680 image = rpc_interface._get_image_for_job(job, True) 681 self.assertEquals('cool-image', image) 682 683 684 def test_get_image_for_job_with_control_build(self): 685 CONTROL_FILE = """build='cool-image' 686 """ 687 job_id = rpc_interface.create_job(name='test', 688 priority=priorities.Priority.DEFAULT, 689 control_file='foo', 690 control_type=CLIENT, 691 hosts=['host1']) 692 job = models.Job.objects.get(id=job_id) 693 self.assertIsNotNone(job) 694 job.control_file = CONTROL_FILE 695 image = rpc_interface._get_image_for_job(job, True) 696 self.assertEquals('cool-image', image) 697 698 699 def test_get_image_for_job_with_control_builds(self): 700 CONTROL_FILE = """builds={'cros-version': 'cool-image'} 701 """ 702 job_id = rpc_interface.create_job(name='test', 703 priority=priorities.Priority.DEFAULT, 704 control_file='foo', 705 control_type=CLIENT, 706 hosts=['host1']) 707 job = models.Job.objects.get(id=job_id) 708 self.assertIsNotNone(job) 709 job.control_file = CONTROL_FILE 710 image = rpc_interface._get_image_for_job(job, True) 711 self.assertEquals('cool-image', image) 712 713 714class ExtraRpcInterfaceTest(mox.MoxTestBase, 715 frontend_test_utils.FrontendTestMixin): 716 """Unit tests for functions originally in site_rpc_interface.py. 717 718 @var _NAME: fake suite name. 719 @var _BOARD: fake board to reimage. 720 @var _BUILD: fake build with which to reimage. 721 @var _PRIORITY: fake priority with which to reimage. 722 """ 723 _NAME = 'name' 724 _BOARD = 'link' 725 _BUILD = 'link-release/R36-5812.0.0' 726 _BUILDS = {provision.CROS_VERSION_PREFIX: _BUILD} 727 _PRIORITY = priorities.Priority.DEFAULT 728 _TIMEOUT = 24 729 730 731 def setUp(self): 732 super(ExtraRpcInterfaceTest, self).setUp() 733 self._SUITE_NAME = rpc_interface.canonicalize_suite_name( 734 self._NAME) 735 self.dev_server = self.mox.CreateMock(dev_server.ImageServer) 736 self._frontend_common_setup(fill_data=False) 737 738 739 def tearDown(self): 740 self._frontend_common_teardown() 741 742 743 def _setupDevserver(self): 744 self.mox.StubOutClassWithMocks(dev_server, 'ImageServer') 745 dev_server.resolve(self._BUILD).AndReturn(self.dev_server) 746 747 748 def _mockDevServerGetter(self, get_control_file=True): 749 self._setupDevserver() 750 if get_control_file: 751 self.getter = self.mox.CreateMock( 752 control_file_getter.DevServerGetter) 753 self.mox.StubOutWithMock(control_file_getter.DevServerGetter, 754 'create') 755 control_file_getter.DevServerGetter.create( 756 mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(self.getter) 757 758 759 def _mockRpcUtils(self, to_return, control_file_substring=''): 760 """Fake out the autotest rpc_utils module with a mockable class. 761 762 @param to_return: the value that rpc_utils.create_job_common() should 763 be mocked out to return. 764 @param control_file_substring: A substring that is expected to appear 765 in the control file output string that 766 is passed to create_job_common. 767 Default: '' 768 """ 769 download_started_time = constants.DOWNLOAD_STARTED_TIME 770 payload_finished_time = constants.PAYLOAD_FINISHED_TIME 771 self.mox.StubOutWithMock(rpc_utils, 'create_job_common') 772 rpc_utils.create_job_common(mox.And(mox.StrContains(self._NAME), 773 mox.StrContains(self._BUILD)), 774 priority=self._PRIORITY, 775 timeout_mins=self._TIMEOUT*60, 776 max_runtime_mins=self._TIMEOUT*60, 777 control_type='Server', 778 control_file=mox.And(mox.StrContains(self._BOARD), 779 mox.StrContains(self._BUILD), 780 mox.StrContains( 781 control_file_substring)), 782 hostless=True, 783 keyvals=mox.And(mox.In(download_started_time), 784 mox.In(payload_finished_time)) 785 ).AndReturn(to_return) 786 787 788 def testStageBuildFail(self): 789 """Ensure that a failure to stage the desired build fails the RPC.""" 790 self._setupDevserver() 791 792 self.dev_server.hostname = 'mox_url' 793 self.dev_server.stage_artifacts( 794 image=self._BUILD, artifacts=['test_suites']).AndRaise( 795 dev_server.DevServerException()) 796 self.mox.ReplayAll() 797 self.assertRaises(error.StageControlFileFailure, 798 rpc_interface.create_suite_job, 799 name=self._NAME, 800 board=self._BOARD, 801 builds=self._BUILDS, 802 pool=None) 803 804 805 def testGetControlFileFail(self): 806 """Ensure that a failure to get needed control file fails the RPC.""" 807 self._mockDevServerGetter() 808 809 self.dev_server.hostname = 'mox_url' 810 self.dev_server.stage_artifacts( 811 image=self._BUILD, artifacts=['test_suites']).AndReturn(True) 812 813 self.getter.get_control_file_contents_by_name( 814 self._SUITE_NAME).AndReturn(None) 815 self.mox.ReplayAll() 816 self.assertRaises(error.ControlFileEmpty, 817 rpc_interface.create_suite_job, 818 name=self._NAME, 819 board=self._BOARD, 820 builds=self._BUILDS, 821 pool=None) 822 823 824 def testGetControlFileListFail(self): 825 """Ensure that a failure to get needed control file fails the RPC.""" 826 self._mockDevServerGetter() 827 828 self.dev_server.hostname = 'mox_url' 829 self.dev_server.stage_artifacts( 830 image=self._BUILD, artifacts=['test_suites']).AndReturn(True) 831 832 self.getter.get_control_file_contents_by_name( 833 self._SUITE_NAME).AndRaise(error.NoControlFileList()) 834 self.mox.ReplayAll() 835 self.assertRaises(error.NoControlFileList, 836 rpc_interface.create_suite_job, 837 name=self._NAME, 838 board=self._BOARD, 839 builds=self._BUILDS, 840 pool=None) 841 842 843 def testBadNumArgument(self): 844 """Ensure we handle bad values for the |num| argument.""" 845 self.assertRaises(error.SuiteArgumentException, 846 rpc_interface.create_suite_job, 847 name=self._NAME, 848 board=self._BOARD, 849 builds=self._BUILDS, 850 pool=None, 851 num='goo') 852 self.assertRaises(error.SuiteArgumentException, 853 rpc_interface.create_suite_job, 854 name=self._NAME, 855 board=self._BOARD, 856 builds=self._BUILDS, 857 pool=None, 858 num=[]) 859 self.assertRaises(error.SuiteArgumentException, 860 rpc_interface.create_suite_job, 861 name=self._NAME, 862 board=self._BOARD, 863 builds=self._BUILDS, 864 pool=None, 865 num='5') 866 867 868 869 def testCreateSuiteJobFail(self): 870 """Ensure that failure to schedule the suite job fails the RPC.""" 871 self._mockDevServerGetter() 872 873 self.dev_server.hostname = 'mox_url' 874 self.dev_server.stage_artifacts( 875 image=self._BUILD, artifacts=['test_suites']).AndReturn(True) 876 877 self.getter.get_control_file_contents_by_name( 878 self._SUITE_NAME).AndReturn('f') 879 880 self.dev_server.url().AndReturn('mox_url') 881 self._mockRpcUtils(-1) 882 self.mox.ReplayAll() 883 self.assertEquals( 884 rpc_interface.create_suite_job(name=self._NAME, 885 board=self._BOARD, 886 builds=self._BUILDS, pool=None), 887 -1) 888 889 890 def testCreateSuiteJobSuccess(self): 891 """Ensures that success results in a successful RPC.""" 892 self._mockDevServerGetter() 893 894 self.dev_server.hostname = 'mox_url' 895 self.dev_server.stage_artifacts( 896 image=self._BUILD, artifacts=['test_suites']).AndReturn(True) 897 898 self.getter.get_control_file_contents_by_name( 899 self._SUITE_NAME).AndReturn('f') 900 901 self.dev_server.url().AndReturn('mox_url') 902 job_id = 5 903 self._mockRpcUtils(job_id) 904 self.mox.ReplayAll() 905 self.assertEquals( 906 rpc_interface.create_suite_job(name=self._NAME, 907 board=self._BOARD, 908 builds=self._BUILDS, 909 pool=None), 910 job_id) 911 912 913 def testCreateSuiteJobNoHostCheckSuccess(self): 914 """Ensures that success results in a successful RPC.""" 915 self._mockDevServerGetter() 916 917 self.dev_server.hostname = 'mox_url' 918 self.dev_server.stage_artifacts( 919 image=self._BUILD, artifacts=['test_suites']).AndReturn(True) 920 921 self.getter.get_control_file_contents_by_name( 922 self._SUITE_NAME).AndReturn('f') 923 924 self.dev_server.url().AndReturn('mox_url') 925 job_id = 5 926 self._mockRpcUtils(job_id) 927 self.mox.ReplayAll() 928 self.assertEquals( 929 rpc_interface.create_suite_job(name=self._NAME, 930 board=self._BOARD, 931 builds=self._BUILDS, 932 pool=None, check_hosts=False), 933 job_id) 934 935 def testCreateSuiteIntegerNum(self): 936 """Ensures that success results in a successful RPC.""" 937 self._mockDevServerGetter() 938 939 self.dev_server.hostname = 'mox_url' 940 self.dev_server.stage_artifacts( 941 image=self._BUILD, artifacts=['test_suites']).AndReturn(True) 942 943 self.getter.get_control_file_contents_by_name( 944 self._SUITE_NAME).AndReturn('f') 945 946 self.dev_server.url().AndReturn('mox_url') 947 job_id = 5 948 self._mockRpcUtils(job_id, control_file_substring='num=17') 949 self.mox.ReplayAll() 950 self.assertEquals( 951 rpc_interface.create_suite_job(name=self._NAME, 952 board=self._BOARD, 953 builds=self._BUILDS, 954 pool=None, 955 check_hosts=False, 956 num=17), 957 job_id) 958 959 960 def testCreateSuiteJobControlFileSupplied(self): 961 """Ensure we can supply the control file to create_suite_job.""" 962 self._mockDevServerGetter(get_control_file=False) 963 964 self.dev_server.hostname = 'mox_url' 965 self.dev_server.stage_artifacts( 966 image=self._BUILD, artifacts=['test_suites']).AndReturn(True) 967 self.dev_server.url().AndReturn('mox_url') 968 job_id = 5 969 self._mockRpcUtils(job_id) 970 self.mox.ReplayAll() 971 self.assertEquals( 972 rpc_interface.create_suite_job(name='%s/%s' % (self._NAME, 973 self._BUILD), 974 board=None, 975 builds=self._BUILDS, 976 pool=None, 977 control_file='CONTROL FILE'), 978 job_id) 979 980 981 def _get_records_for_sending_to_master(self): 982 return [{'control_file': 'foo', 983 'control_type': 1, 984 'created_on': datetime.datetime(2014, 8, 21), 985 'drone_set': None, 986 'email_list': '', 987 'max_runtime_hrs': 72, 988 'max_runtime_mins': 1440, 989 'name': 'dummy', 990 'owner': 'autotest_system', 991 'parse_failed_repair': True, 992 'priority': 40, 993 'reboot_after': 0, 994 'reboot_before': 1, 995 'run_reset': True, 996 'run_verify': False, 997 'synch_count': 0, 998 'test_retry': 10, 999 'timeout': 24, 1000 'timeout_mins': 1440, 1001 'id': 1 1002 }], [{ 1003 'aborted': False, 1004 'active': False, 1005 'complete': False, 1006 'deleted': False, 1007 'execution_subdir': '', 1008 'finished_on': None, 1009 'started_on': None, 1010 'status': 'Queued', 1011 'id': 1 1012 }] 1013 1014 1015 def _do_heartbeat_and_assert_response(self, shard_hostname='shard1', 1016 upload_jobs=(), upload_hqes=(), 1017 known_jobs=(), known_hosts=(), 1018 **kwargs): 1019 known_job_ids = [job.id for job in known_jobs] 1020 known_host_ids = [host.id for host in known_hosts] 1021 known_host_statuses = [host.status for host in known_hosts] 1022 1023 retval = rpc_interface.shard_heartbeat( 1024 shard_hostname=shard_hostname, 1025 jobs=upload_jobs, hqes=upload_hqes, 1026 known_job_ids=known_job_ids, known_host_ids=known_host_ids, 1027 known_host_statuses=known_host_statuses) 1028 1029 self._assert_shard_heartbeat_response(shard_hostname, retval, 1030 **kwargs) 1031 1032 return shard_hostname 1033 1034 1035 def _assert_shard_heartbeat_response(self, shard_hostname, retval, jobs=[], 1036 hosts=[], hqes=[], 1037 incorrect_host_ids=[]): 1038 1039 retval_hosts, retval_jobs = retval['hosts'], retval['jobs'] 1040 retval_incorrect_hosts = retval['incorrect_host_ids'] 1041 1042 expected_jobs = [ 1043 (job.id, job.name, shard_hostname) for job in jobs] 1044 returned_jobs = [(job['id'], job['name'], job['shard']['hostname']) 1045 for job in retval_jobs] 1046 self.assertEqual(returned_jobs, expected_jobs) 1047 1048 expected_hosts = [(host.id, host.hostname) for host in hosts] 1049 returned_hosts = [(host['id'], host['hostname']) 1050 for host in retval_hosts] 1051 self.assertEqual(returned_hosts, expected_hosts) 1052 1053 retval_hqes = [] 1054 for job in retval_jobs: 1055 retval_hqes += job['hostqueueentry_set'] 1056 1057 expected_hqes = [(hqe.id) for hqe in hqes] 1058 returned_hqes = [(hqe['id']) for hqe in retval_hqes] 1059 self.assertEqual(returned_hqes, expected_hqes) 1060 1061 self.assertEqual(retval_incorrect_hosts, incorrect_host_ids) 1062 1063 1064 def _send_records_to_master_helper( 1065 self, jobs, hqes, shard_hostname='host1', 1066 exception_to_throw=error.UnallowedRecordsSentToMaster, aborted=False): 1067 job_id = rpc_interface.create_job( 1068 name='dummy', 1069 priority=self._PRIORITY, 1070 control_file='foo', 1071 control_type=SERVER, 1072 test_retry=10, hostless=True) 1073 job = models.Job.objects.get(pk=job_id) 1074 shard = models.Shard.objects.create(hostname='host1') 1075 job.shard = shard 1076 job.save() 1077 1078 if aborted: 1079 job.hostqueueentry_set.update(aborted=True) 1080 job.shard = None 1081 job.save() 1082 1083 hqe = job.hostqueueentry_set.all()[0] 1084 if not exception_to_throw: 1085 self._do_heartbeat_and_assert_response( 1086 shard_hostname=shard_hostname, 1087 upload_jobs=jobs, upload_hqes=hqes) 1088 else: 1089 self.assertRaises( 1090 exception_to_throw, 1091 self._do_heartbeat_and_assert_response, 1092 shard_hostname=shard_hostname, 1093 upload_jobs=jobs, upload_hqes=hqes) 1094 1095 1096 def testSendingRecordsToMaster(self): 1097 """Send records to the master and ensure they are persisted.""" 1098 jobs, hqes = self._get_records_for_sending_to_master() 1099 hqes[0]['status'] = 'Completed' 1100 self._send_records_to_master_helper( 1101 jobs=jobs, hqes=hqes, exception_to_throw=None) 1102 1103 # Check the entry was actually written to db 1104 self.assertEqual(models.HostQueueEntry.objects.all()[0].status, 1105 'Completed') 1106 1107 1108 def testSendingRecordsToMasterAbortedOnMaster(self): 1109 """Send records to the master and ensure they are persisted.""" 1110 jobs, hqes = self._get_records_for_sending_to_master() 1111 hqes[0]['status'] = 'Completed' 1112 self._send_records_to_master_helper( 1113 jobs=jobs, hqes=hqes, exception_to_throw=None, aborted=True) 1114 1115 # Check the entry was actually written to db 1116 self.assertEqual(models.HostQueueEntry.objects.all()[0].status, 1117 'Completed') 1118 1119 1120 def testSendingRecordsToMasterJobAssignedToDifferentShard(self): 1121 """Ensure records belonging to different shard are silently rejected.""" 1122 shard1 = models.Shard.objects.create(hostname='shard1') 1123 shard2 = models.Shard.objects.create(hostname='shard2') 1124 job1 = self._create_job(shard=shard1, control_file='foo1') 1125 job2 = self._create_job(shard=shard2, control_file='foo2') 1126 job1_id = job1.id 1127 job2_id = job2.id 1128 hqe1 = models.HostQueueEntry.objects.create(job=job1) 1129 hqe2 = models.HostQueueEntry.objects.create(job=job2) 1130 hqe1_id = hqe1.id 1131 hqe2_id = hqe2.id 1132 job1_record = job1.serialize(include_dependencies=False) 1133 job2_record = job2.serialize(include_dependencies=False) 1134 hqe1_record = hqe1.serialize(include_dependencies=False) 1135 hqe2_record = hqe2.serialize(include_dependencies=False) 1136 1137 # Prepare a bogus job record update from the wrong shard. The update 1138 # should not throw an exception. Non-bogus jobs in the same update 1139 # should happily update. 1140 job1_record.update({'control_file': 'bar1'}) 1141 job2_record.update({'control_file': 'bar2'}) 1142 hqe1_record.update({'status': 'Aborted'}) 1143 hqe2_record.update({'status': 'Aborted'}) 1144 self._do_heartbeat_and_assert_response( 1145 shard_hostname='shard2', upload_jobs=[job1_record, job2_record], 1146 upload_hqes=[hqe1_record, hqe2_record]) 1147 1148 # Job and HQE record for wrong job should not be modified, because the 1149 # rpc came from the wrong shard. Job and HQE record for valid job are 1150 # modified. 1151 self.assertEqual(models.Job.objects.get(id=job1_id).control_file, 1152 'foo1') 1153 self.assertEqual(models.Job.objects.get(id=job2_id).control_file, 1154 'bar2') 1155 self.assertEqual(models.HostQueueEntry.objects.get(id=hqe1_id).status, 1156 '') 1157 self.assertEqual(models.HostQueueEntry.objects.get(id=hqe2_id).status, 1158 'Aborted') 1159 1160 1161 def testSendingRecordsToMasterNotExistingJob(self): 1162 """Ensure update for non existing job gets rejected.""" 1163 jobs, hqes = self._get_records_for_sending_to_master() 1164 jobs[0]['id'] = 3 1165 1166 self._send_records_to_master_helper( 1167 jobs=jobs, hqes=hqes) 1168 1169 1170 def _createShardAndHostWithLabel(self, shard_hostname='shard1', 1171 host_hostname='host1', 1172 label_name='board:lumpy'): 1173 """Create a label, host, shard, and assign host to shard.""" 1174 label = models.Label.objects.create(name=label_name) 1175 1176 shard = models.Shard.objects.create(hostname=shard_hostname) 1177 shard.labels.add(label) 1178 1179 host = models.Host.objects.create(hostname=host_hostname, leased=False, 1180 shard=shard) 1181 host.labels.add(label) 1182 1183 return shard, host, label 1184 1185 1186 def _createJobForLabel(self, label): 1187 job_id = rpc_interface.create_job(name='dummy', priority=self._PRIORITY, 1188 control_file='foo', 1189 control_type=CLIENT, 1190 meta_hosts=[label.name], 1191 dependencies=(label.name,)) 1192 return models.Job.objects.get(id=job_id) 1193 1194 1195 def testShardHeartbeatFetchHostlessJob(self): 1196 """Create a hostless job and ensure it's not assigned to a shard.""" 1197 shard1, host1, lumpy_label = self._createShardAndHostWithLabel( 1198 'shard1', 'host1', 'board:lumpy') 1199 1200 label2 = models.Label.objects.create(name='bluetooth', platform=False) 1201 1202 job1 = self._create_job(hostless=True) 1203 1204 # Hostless jobs should be executed by the global scheduler. 1205 self._do_heartbeat_and_assert_response(hosts=[host1]) 1206 1207 1208 def testShardHeartbeatIncorrectHosts(self): 1209 """Ensure that hosts that don't belong to shard are determined.""" 1210 shard1, host1, lumpy_label = self._createShardAndHostWithLabel() 1211 1212 host2 = models.Host.objects.create(hostname='host2', leased=False) 1213 1214 # host2 should not belong to shard1. Ensure that if shard1 thinks host2 1215 # is a known host, then it is returned as invalid. 1216 self._do_heartbeat_and_assert_response(known_hosts=[host1, host2], 1217 incorrect_host_ids=[host2.id]) 1218 1219 1220 def testShardHeartbeatLabelRemovalRace(self): 1221 """Ensure correctness if label removed during heartbeat.""" 1222 shard1, host1, lumpy_label = self._createShardAndHostWithLabel() 1223 1224 host2 = models.Host.objects.create(hostname='host2', leased=False) 1225 host2.labels.add(lumpy_label) 1226 self.assertEqual(host2.shard, None) 1227 1228 # In the middle of the assign_to_shard call, remove lumpy_label from 1229 # shard1. 1230 self.mox.StubOutWithMock(models.Host, '_assign_to_shard_nothing_helper') 1231 def remove_label(): 1232 rpc_interface.remove_board_from_shard( 1233 shard1.hostname, lumpy_label.name) 1234 models.Host._assign_to_shard_nothing_helper().WithSideEffects( 1235 remove_label) 1236 self.mox.ReplayAll() 1237 1238 self._do_heartbeat_and_assert_response( 1239 known_hosts=[host1], hosts=[], incorrect_host_ids=[host1.id]) 1240 host2 = models.Host.smart_get(host2.id) 1241 self.assertEqual(host2.shard, None) 1242 1243 1244 def testShardLabelRemovalInvalid(self): 1245 """Ensure you cannot remove the wrong label from shard.""" 1246 shard1, host1, lumpy_label = self._createShardAndHostWithLabel() 1247 stumpy_label = models.Label.objects.create( 1248 name='board:stumpy', platform=True) 1249 with self.assertRaises(error.RPCException): 1250 rpc_interface.remove_board_from_shard( 1251 shard1.hostname, stumpy_label.name) 1252 1253 1254 def testShardHeartbeatLabelRemoval(self): 1255 """Ensure label removal from shard works.""" 1256 shard1, host1, lumpy_label = self._createShardAndHostWithLabel() 1257 1258 self.assertEqual(host1.shard, shard1) 1259 self.assertItemsEqual(shard1.labels.all(), [lumpy_label]) 1260 rpc_interface.remove_board_from_shard( 1261 shard1.hostname, lumpy_label.name) 1262 host1 = models.Host.smart_get(host1.id) 1263 shard1 = models.Shard.smart_get(shard1.id) 1264 self.assertEqual(host1.shard, None) 1265 self.assertItemsEqual(shard1.labels.all(), []) 1266 1267 1268 def testShardRetrieveJobs(self): 1269 """Create jobs and retrieve them.""" 1270 # should never be returned by heartbeat 1271 leased_host = models.Host.objects.create(hostname='leased_host', 1272 leased=True) 1273 1274 shard1, host1, lumpy_label = self._createShardAndHostWithLabel() 1275 shard2, host2, grumpy_label = self._createShardAndHostWithLabel( 1276 'shard2', 'host2', 'board:grumpy') 1277 1278 leased_host.labels.add(lumpy_label) 1279 1280 job1 = self._createJobForLabel(lumpy_label) 1281 1282 job2 = self._createJobForLabel(grumpy_label) 1283 1284 job_completed = self._createJobForLabel(lumpy_label) 1285 # Job is already being run, so don't sync it 1286 job_completed.hostqueueentry_set.update(complete=True) 1287 job_completed.hostqueueentry_set.create(complete=False) 1288 1289 job_active = self._createJobForLabel(lumpy_label) 1290 # Job is already started, so don't sync it 1291 job_active.hostqueueentry_set.update(active=True) 1292 job_active.hostqueueentry_set.create(complete=False, active=False) 1293 1294 self._do_heartbeat_and_assert_response( 1295 jobs=[job1], hosts=[host1], hqes=job1.hostqueueentry_set.all()) 1296 1297 self._do_heartbeat_and_assert_response( 1298 shard_hostname=shard2.hostname, 1299 jobs=[job2], hosts=[host2], hqes=job2.hostqueueentry_set.all()) 1300 1301 host3 = models.Host.objects.create(hostname='host3', leased=False) 1302 host3.labels.add(lumpy_label) 1303 1304 self._do_heartbeat_and_assert_response( 1305 known_jobs=[job1], known_hosts=[host1], hosts=[host3]) 1306 1307 1308 def testResendJobsAfterFailedHeartbeat(self): 1309 """Create jobs, retrieve them, fail on client, fetch them again.""" 1310 shard1, host1, lumpy_label = self._createShardAndHostWithLabel() 1311 1312 job1 = self._createJobForLabel(lumpy_label) 1313 1314 self._do_heartbeat_and_assert_response( 1315 jobs=[job1], 1316 hqes=job1.hostqueueentry_set.all(), hosts=[host1]) 1317 1318 # Make sure it's resubmitted by sending last_job=None again 1319 self._do_heartbeat_and_assert_response( 1320 known_hosts=[host1], 1321 jobs=[job1], hqes=job1.hostqueueentry_set.all(), hosts=[]) 1322 1323 # Now it worked, make sure it's not sent again 1324 self._do_heartbeat_and_assert_response( 1325 known_jobs=[job1], known_hosts=[host1]) 1326 1327 job1 = models.Job.objects.get(pk=job1.id) 1328 job1.hostqueueentry_set.all().update(complete=True) 1329 1330 # Job is completed, make sure it's not sent again 1331 self._do_heartbeat_and_assert_response( 1332 known_hosts=[host1]) 1333 1334 job2 = self._createJobForLabel(lumpy_label) 1335 1336 # job2's creation was later, it should be returned now. 1337 self._do_heartbeat_and_assert_response( 1338 known_hosts=[host1], 1339 jobs=[job2], hqes=job2.hostqueueentry_set.all()) 1340 1341 self._do_heartbeat_and_assert_response( 1342 known_jobs=[job2], known_hosts=[host1]) 1343 1344 job2 = models.Job.objects.get(pk=job2.pk) 1345 job2.hostqueueentry_set.update(aborted=True) 1346 # Setting a job to a complete status will set the shard_id to None in 1347 # scheduler_models. We have to emulate that here, because we use Django 1348 # models in tests. 1349 job2.shard = None 1350 job2.save() 1351 1352 self._do_heartbeat_and_assert_response( 1353 known_jobs=[job2], known_hosts=[host1], 1354 jobs=[job2], 1355 hqes=job2.hostqueueentry_set.all()) 1356 1357 models.Test.objects.create(name='platform_BootPerfServer:shard', 1358 test_type=1) 1359 self.mox.StubOutWithMock(server_utils, 'read_file') 1360 server_utils.read_file(mox.IgnoreArg()).AndReturn('') 1361 self.mox.ReplayAll() 1362 rpc_interface.delete_shard(hostname=shard1.hostname) 1363 1364 self.assertRaises( 1365 models.Shard.DoesNotExist, models.Shard.objects.get, pk=shard1.id) 1366 1367 job1 = models.Job.objects.get(pk=job1.id) 1368 lumpy_label = models.Label.objects.get(pk=lumpy_label.id) 1369 host1 = models.Host.objects.get(pk=host1.id) 1370 super_job = models.Job.objects.get(priority=priorities.Priority.SUPER) 1371 super_job_host = models.HostQueueEntry.objects.get( 1372 job_id=super_job.id) 1373 1374 self.assertIsNone(job1.shard) 1375 self.assertEqual(len(lumpy_label.shard_set.all()), 0) 1376 self.assertIsNone(host1.shard) 1377 self.assertIsNotNone(super_job) 1378 self.assertEqual(super_job_host.host_id, host1.id) 1379 1380 1381 def testCreateListShard(self): 1382 """Retrieve a list of all shards.""" 1383 lumpy_label = models.Label.objects.create(name='board:lumpy', 1384 platform=True) 1385 stumpy_label = models.Label.objects.create(name='board:stumpy', 1386 platform=True) 1387 peppy_label = models.Label.objects.create(name='board:peppy', 1388 platform=True) 1389 1390 shard_id = rpc_interface.add_shard( 1391 hostname='host1', labels='board:lumpy,board:stumpy') 1392 self.assertRaises(error.RPCException, 1393 rpc_interface.add_shard, 1394 hostname='host1', labels='board:lumpy,board:stumpy') 1395 self.assertRaises(model_logic.ValidationError, 1396 rpc_interface.add_shard, 1397 hostname='host1', labels='board:peppy') 1398 shard = models.Shard.objects.get(pk=shard_id) 1399 self.assertEqual(shard.hostname, 'host1') 1400 self.assertEqual(shard.labels.values_list('pk')[0], (lumpy_label.id,)) 1401 self.assertEqual(shard.labels.values_list('pk')[1], (stumpy_label.id,)) 1402 1403 self.assertEqual(rpc_interface.get_shards(), 1404 [{'labels': ['board:lumpy','board:stumpy'], 1405 'hostname': 'host1', 1406 'id': 1}]) 1407 1408 1409 def testAddBoardsToShard(self): 1410 """Add boards to a given shard.""" 1411 shard1, host1, lumpy_label = self._createShardAndHostWithLabel() 1412 stumpy_label = models.Label.objects.create(name='board:stumpy', 1413 platform=True) 1414 shard_id = rpc_interface.add_board_to_shard( 1415 hostname='shard1', labels='board:stumpy') 1416 # Test whether raise exception when board label does not exist. 1417 self.assertRaises(models.Label.DoesNotExist, 1418 rpc_interface.add_board_to_shard, 1419 hostname='shard1', labels='board:test') 1420 # Test whether raise exception when board already sharded. 1421 self.assertRaises(error.RPCException, 1422 rpc_interface.add_board_to_shard, 1423 hostname='shard1', labels='board:lumpy') 1424 shard = models.Shard.objects.get(pk=shard_id) 1425 self.assertEqual(shard.hostname, 'shard1') 1426 self.assertEqual(shard.labels.values_list('pk')[0], (lumpy_label.id,)) 1427 self.assertEqual(shard.labels.values_list('pk')[1], (stumpy_label.id,)) 1428 1429 self.assertEqual(rpc_interface.get_shards(), 1430 [{'labels': ['board:lumpy','board:stumpy'], 1431 'hostname': 'shard1', 1432 'id': 1}]) 1433 1434 1435 def testResendHostsAfterFailedHeartbeat(self): 1436 """Check that master accepts resending updated records after failure.""" 1437 shard1, host1, lumpy_label = self._createShardAndHostWithLabel() 1438 1439 # Send the host 1440 self._do_heartbeat_and_assert_response(hosts=[host1]) 1441 1442 # Send it again because previous one didn't persist correctly 1443 self._do_heartbeat_and_assert_response(hosts=[host1]) 1444 1445 # Now it worked, make sure it isn't sent again 1446 self._do_heartbeat_and_assert_response(known_hosts=[host1]) 1447 1448 1449if __name__ == '__main__': 1450 unittest.main() 1451