#!/usr/bin/env python2
# pylint: disable=missing-docstring

import datetime
import mox
import unittest

import common
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import priorities
from autotest_lib.client.common_lib.cros import dev_server
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from autotest_lib.frontend.afe import model_logic
from autotest_lib.frontend.afe import models
from autotest_lib.frontend.afe import rpc_interface
from autotest_lib.frontend.afe import rpc_utils
from autotest_lib.server import frontend
from autotest_lib.server import utils as server_utils
from autotest_lib.server.cros import provision
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.server.cros.dynamic_suite import control_file_getter
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers

CLIENT = control_data.CONTROL_TYPE_NAMES.CLIENT
SERVER = control_data.CONTROL_TYPE_NAMES.SERVER

_hqe_status = models.HostQueueEntry.Status


class RpcInterfaceTest(unittest.TestCase,
                       frontend_test_utils.FrontendTestMixin):
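    """Unit tests for the AFE rpc_interface module."""
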
    def setUp(self):
        self._frontend_common_setup()
        self.god = mock.mock_god()


    def tearDown(self):
        self.god.unstub_all()
        self._frontend_common_teardown()
        global_config.global_config.reset_config_values()


    def test_validation(self):
        # omit a required field
        self.assertRaises(model_logic.ValidationError, rpc_interface.add_label,
                          name=None)
        # violate uniqueness constraint
        self.assertRaises(model_logic.ValidationError, rpc_interface.add_host,
                          hostname='host1')


    def test_multiple_platforms(self):
        platform2 = models.Label.objects.create(name='platform2', platform=True)
        self.assertRaises(model_logic.ValidationError,
                          rpc_interface.label_add_hosts, id='platform2',
                          hosts=['host1', 'host2'])
        self.assertRaises(model_logic.ValidationError,
                          rpc_interface.host_add_labels,
                          id='host1', labels=['platform2'])
        # make sure the platform didn't get added
        platforms = rpc_interface.get_labels(
            host__hostname__in=['host1', 'host2'], platform=True)
        self.assertEquals(len(platforms), 1)
        self.assertEquals(platforms[0]['name'], 'myplatform')


    def _check_hostnames(self, hosts, expected_hostnames):
        self.assertEquals(set(host['hostname'] for host in hosts),
                          set(expected_hostnames))


    def test_ping_db(self):
        self.assertEquals(rpc_interface.ping_db(), [True])


    def test_get_hosts(self):
        hosts = rpc_interface.get_hosts()
        self._check_hostnames(hosts, [host.hostname for host in self.hosts])

        hosts = rpc_interface.get_hosts(hostname='host1')
        self._check_hostnames(hosts, ['host1'])
        host = hosts[0]
        self.assertEquals(sorted(host['labels']), ['label1', 'myplatform'])
        self.assertEquals(host['platform'], 'myplatform')
        self.assertEquals(host['acls'], ['my_acl'])
        self.assertEquals(host['attributes'], {})


    def test_get_hosts_multiple_labels(self):
        hosts = rpc_interface.get_hosts(
                multiple_labels=['myplatform', 'label1'])
        self._check_hostnames(hosts, ['host1'])


    def test_get_hosts_exclude_only_if_needed(self):
        self.hosts[0].labels.add(self.label3)

        hosts = rpc_interface.get_hosts(hostname__in=['host1', 'host2'],
                                        exclude_only_if_needed_labels=True)
        self._check_hostnames(hosts, ['host2'])


    def test_job_keyvals(self):
        keyval_dict = {'mykey': 'myvalue'}
        job_id = rpc_interface.create_job(name='test',
                                          priority=priorities.Priority.DEFAULT,
                                          control_file='foo',
                                          control_type=CLIENT,
                                          hosts=['host1'],
                                          keyvals=keyval_dict)
        jobs = rpc_interface.get_jobs(id=job_id)
        self.assertEquals(len(jobs), 1)
        self.assertEquals(jobs[0]['keyvals'], keyval_dict)


    def test_test_retry(self):
        job_id = rpc_interface.create_job(name='flake',
                                          priority=priorities.Priority.DEFAULT,
                                          control_file='foo',
                                          control_type=CLIENT,
                                          hosts=['host1'],
                                          test_retry=10)
        jobs = rpc_interface.get_jobs(id=job_id)
        self.assertEquals(len(jobs), 1)
        self.assertEquals(jobs[0]['test_retry'], 10)


    def test_get_jobs_summary(self):
        job = self._create_job(hosts=xrange(1, 4))
        entries = list(job.hostqueueentry_set.all())
        entries[1].status = _hqe_status.FAILED
        entries[1].save()
        entries[2].status = _hqe_status.FAILED
        entries[2].aborted = True
        entries[2].save()

        # Mock up tko_rpc_interface.get_status_counts.
        self.god.stub_function_to_return(rpc_interface.tko_rpc_interface,
                                         'get_status_counts',
                                         None)

        job_summaries = rpc_interface.get_jobs_summary(id=job.id)
        self.assertEquals(len(job_summaries), 1)
        summary = job_summaries[0]
        self.assertEquals(summary['status_counts'], {'Queued': 1,
                                                     'Failed': 2})


    def _check_job_ids(self, actual_job_dicts, expected_jobs):
        self.assertEquals(
                set(job_dict['id'] for job_dict in actual_job_dicts),
                set(job.id for job in expected_jobs))


    def test_get_jobs_status_filters(self):
        HqeStatus = models.HostQueueEntry.Status
        def create_two_host_job():
            return self._create_job(hosts=[1, 2])
        def set_hqe_statuses(job, first_status, second_status):
            entries = job.hostqueueentry_set.all()
            entries[0].update_object(status=first_status)
            entries[1].update_object(status=second_status)

        queued = create_two_host_job()

        queued_and_running = create_two_host_job()
        set_hqe_statuses(queued_and_running, HqeStatus.QUEUED,
                         HqeStatus.RUNNING)

        running_and_complete = create_two_host_job()
        set_hqe_statuses(running_and_complete, HqeStatus.RUNNING,
                         HqeStatus.COMPLETED)

        complete = create_two_host_job()
        set_hqe_statuses(complete, HqeStatus.COMPLETED, HqeStatus.COMPLETED)

        started_but_inactive = create_two_host_job()
        set_hqe_statuses(started_but_inactive, HqeStatus.QUEUED,
                         HqeStatus.COMPLETED)

        parsing = create_two_host_job()
        set_hqe_statuses(parsing, HqeStatus.PARSING, HqeStatus.PARSING)

        self._check_job_ids(rpc_interface.get_jobs(not_yet_run=True), [queued])
        self._check_job_ids(rpc_interface.get_jobs(running=True),
                      [queued_and_running, running_and_complete,
                       started_but_inactive, parsing])
        self._check_job_ids(rpc_interface.get_jobs(finished=True), [complete])


    def test_get_jobs_type_filters(self):
        self.assertRaises(AssertionError, rpc_interface.get_jobs,
                          suite=True, sub=True)
        self.assertRaises(AssertionError, rpc_interface.get_jobs,
                          suite=True, standalone=True)
        self.assertRaises(AssertionError, rpc_interface.get_jobs,
                          standalone=True, sub=True)

        parent_job = self._create_job(hosts=[1])
        child_jobs = self._create_job(hosts=[1, 2],
                                      parent_job_id=parent_job.id)
        standalone_job = self._create_job(hosts=[1])

        self._check_job_ids(rpc_interface.get_jobs(suite=True), [parent_job])
        self._check_job_ids(rpc_interface.get_jobs(sub=True), [child_jobs])
        self._check_job_ids(rpc_interface.get_jobs(standalone=True),
                            [standalone_job])


    def _create_job_helper(self, **kwargs):
        return rpc_interface.create_job(name='test',
                                        priority=priorities.Priority.DEFAULT,
                                        control_file='control file',
                                        control_type=SERVER, **kwargs)


    def test_one_time_hosts(self):
        job = self._create_job_helper(one_time_hosts=['testhost'])
        host = models.Host.objects.get(hostname='testhost')
        self.assertEquals(host.invalid, True)
        self.assertEquals(host.labels.count(), 0)
        self.assertEquals(host.aclgroup_set.count(), 0)


    def test_create_job_duplicate_hosts(self):
        self.assertRaises(model_logic.ValidationError, self._create_job_helper,
                          hosts=[1, 1])


    def test_create_unrunnable_metahost_job(self):
        self.assertRaises(error.NoEligibleHostException,
                          self._create_job_helper, meta_hosts=['unused'])


    def test_create_hostless_job(self):
        job_id = self._create_job_helper(hostless=True)
        job = models.Job.objects.get(pk=job_id)
        queue_entries = job.hostqueueentry_set.all()
        self.assertEquals(len(queue_entries), 1)
        self.assertEquals(queue_entries[0].host, None)
        self.assertEquals(queue_entries[0].meta_host, None)


    def _setup_special_tasks(self):
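        """Create two jobs and three verify tasks against hosts[0].

        task1 completed before job 1, task2 is active and attached to job 2's
        queue entry, and task3 has not run yet.
        """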
        host = self.hosts[0]

        job1 = self._create_job(hosts=[1])
        job2 = self._create_job(hosts=[1])

        entry1 = job1.hostqueueentry_set.all()[0]
        entry1.update_object(started_on=datetime.datetime(2009, 1, 2),
                             execution_subdir='host1')
        entry2 = job2.hostqueueentry_set.all()[0]
        entry2.update_object(started_on=datetime.datetime(2009, 1, 3),
                             execution_subdir='host1')

        self.task1 = models.SpecialTask.objects.create(
                host=host, task=models.SpecialTask.Task.VERIFY,
                time_started=datetime.datetime(2009, 1, 1), # ran before job 1
                is_complete=True, requested_by=models.User.current_user())
        self.task2 = models.SpecialTask.objects.create(
                host=host, task=models.SpecialTask.Task.VERIFY,
                queue_entry=entry2, # ran with job 2
                is_active=True, requested_by=models.User.current_user())
        self.task3 = models.SpecialTask.objects.create(
                host=host, task=models.SpecialTask.Task.VERIFY,
                requested_by=models.User.current_user()) # not yet run


    def test_get_special_tasks(self):
        self._setup_special_tasks()
        tasks = rpc_interface.get_special_tasks(host__hostname='host1',
                                                queue_entry__isnull=True)
        self.assertEquals(len(tasks), 2)
        self.assertEquals(tasks[0]['task'], models.SpecialTask.Task.VERIFY)
        self.assertEquals(tasks[0]['is_active'], False)
        self.assertEquals(tasks[0]['is_complete'], True)


    def test_get_latest_special_task(self):
        # a particular usage of get_special_tasks()
        self._setup_special_tasks()
        self.task2.time_started = datetime.datetime(2009, 1, 2)
        self.task2.save()

        tasks = rpc_interface.get_special_tasks(
                host__hostname='host1', task=models.SpecialTask.Task.VERIFY,
                time_started__isnull=False, sort_by=['-time_started'],
                query_limit=1)
        self.assertEquals(len(tasks), 1)
        self.assertEquals(tasks[0]['id'], 2)


    def _common_entry_check(self, entry_dict):
        self.assertEquals(entry_dict['host']['hostname'], 'host1')
        self.assertEquals(entry_dict['job']['id'], 2)


    def test_get_host_queue_entries_and_special_tasks(self):
        self._setup_special_tasks()

        host = self.hosts[0].id
        entries_and_tasks = (
                rpc_interface.get_host_queue_entries_and_special_tasks(host))

        paths = [entry['execution_path'] for entry in entries_and_tasks]
        self.assertEquals(paths, ['hosts/host1/3-verify',
                                  '2-autotest_system/host1',
                                  'hosts/host1/2-verify',
                                  '1-autotest_system/host1',
                                  'hosts/host1/1-verify'])

        verify2 = entries_and_tasks[2]
        self._common_entry_check(verify2)
        self.assertEquals(verify2['type'], 'Verify')
        self.assertEquals(verify2['status'], 'Running')
        self.assertEquals(verify2['execution_path'], 'hosts/host1/2-verify')

        entry2 = entries_and_tasks[1]
        self._common_entry_check(entry2)
        self.assertEquals(entry2['type'], 'Job')
        self.assertEquals(entry2['status'], 'Queued')
        self.assertEquals(entry2['started_on'], '2009-01-03 00:00:00')


    def _create_hqes_and_start_time_index_entries(self):
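        """Create three queued HQEs and matching start time index rows.

        Each HostQueueEntryStartTimes row maps an insert_time in January 2017
        to the highest HQE id that existed at that time.
        """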
        shard = models.Shard.objects.create(hostname='shard')
        job = self._create_job(shard=shard, control_file='foo')
        HqeStatus = models.HostQueueEntry.Status

        models.HostQueueEntry(
            id=1, job=job, started_on='2017-01-01',
            status=HqeStatus.QUEUED).save()
        models.HostQueueEntry(
            id=2, job=job, started_on='2017-01-02',
            status=HqeStatus.QUEUED).save()
        models.HostQueueEntry(
            id=3, job=job, started_on='2017-01-03',
            status=HqeStatus.QUEUED).save()

        models.HostQueueEntryStartTimes(
            insert_time='2017-01-03', highest_hqe_id=3).save()
        models.HostQueueEntryStartTimes(
            insert_time='2017-01-02', highest_hqe_id=2).save()
        models.HostQueueEntryStartTimes(
            insert_time='2017-01-01', highest_hqe_id=1).save()

    def test_get_host_queue_entries_by_insert_time(self):
        """Check the insert_time_after and insert_time_before constraints."""
        self._create_hqes_and_start_time_index_entries()
        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
            insert_time_after='2017-01-01')
        self.assertEquals(len(hqes), 3)

        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
            insert_time_after='2017-01-02')
        self.assertEquals(len(hqes), 2)

        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
            insert_time_after='2017-01-03')
        self.assertEquals(len(hqes), 1)

        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
            insert_time_before='2017-01-01')
        self.assertEquals(len(hqes), 1)

        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
            insert_time_before='2017-01-02')
        self.assertEquals(len(hqes), 2)

        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
            insert_time_before='2017-01-03')
        self.assertEquals(len(hqes), 3)


    def test_get_host_queue_entries_by_insert_time_with_missing_index_row(self):
        """Shows that the constraints are approximate.

        The query may return rows which are actually outside of the bounds
        given, if the index table does not have an entry for the specific time.
        """
        self._create_hqes_and_start_time_index_entries()
        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
            insert_time_before='2016-12-01')
        self.assertEquals(len(hqes), 1)

    def test_get_hqe_by_insert_time_with_before_and_after(self):
        self._create_hqes_and_start_time_index_entries()
        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
            insert_time_before='2017-01-02',
            insert_time_after='2017-01-02')
        self.assertEquals(len(hqes), 1)

    def test_get_hqe_by_insert_time_and_id_constraint(self):
        self._create_hqes_and_start_time_index_entries()
        # The time constraint is looser than the id constraint, so the id
        # constraint should be the effective one.
        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
            insert_time_before='2017-01-02',
            id__lte=1)
        self.assertEquals(len(hqes), 1)

        # Now make the time constraint tighter than the id constraint.
        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
            insert_time_before='2017-01-01',
            id__lte=42)
        self.assertEquals(len(hqes), 1)

    def test_view_invalid_host(self):
        # RPCs used by View Host page should work for invalid hosts
        self._create_job_helper(hosts=[1])
        host = self.hosts[0]
        host.delete()

        self.assertEquals(1, rpc_interface.get_num_hosts(hostname='host1',
                                                         valid_only=False))
        data = rpc_interface.get_hosts(hostname='host1', valid_only=False)
        self.assertEquals(1, len(data))

        self.assertEquals(1, rpc_interface.get_num_host_queue_entries(
                host__hostname='host1'))
        data = rpc_interface.get_host_queue_entries(host__hostname='host1')
        self.assertEquals(1, len(data))

        count = rpc_interface.get_num_host_queue_entries_and_special_tasks(
                host=host.id)
        self.assertEquals(1, count)
        data = rpc_interface.get_host_queue_entries_and_special_tasks(
                host=host.id)
        self.assertEquals(1, len(data))


    def test_reverify_hosts(self):
        hostname_list = rpc_interface.reverify_hosts(id__in=[1, 2])
        self.assertEquals(hostname_list, ['host1', 'host2'])
        tasks = rpc_interface.get_special_tasks()
        self.assertEquals(len(tasks), 2)
        self.assertEquals(set(task['host']['id'] for task in tasks),
                          set([1, 2]))

        task = tasks[0]
        self.assertEquals(task['task'], models.SpecialTask.Task.VERIFY)
        self.assertEquals(task['requested_by'], 'autotest_system')


    def test_repair_hosts(self):
        hostname_list = rpc_interface.repair_hosts(id__in=[1, 2])
        self.assertEquals(hostname_list, ['host1', 'host2'])
        tasks = rpc_interface.get_special_tasks()
        self.assertEquals(len(tasks), 2)
        self.assertEquals(set(task['host']['id'] for task in tasks),
                          set([1, 2]))

        task = tasks[0]
        self.assertEquals(task['task'], models.SpecialTask.Task.REPAIR)
        self.assertEquals(task['requested_by'], 'autotest_system')


    def _modify_host_helper(self, on_shard=False, host_on_shard=False):
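        """Lock a host through modify_host and verify RPC forwarding.

        @param on_shard: Issue the RPC as if running on a shard.
        @param host_on_shard: Assign the host to a shard before locking.
        """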
        shard_hostname = 'shard1'
        if on_shard:
            global_config.global_config.override_config_value(
                'SHARD', 'shard_hostname', shard_hostname)

        host = models.Host.objects.all()[0]
        if host_on_shard:
            shard = models.Shard.objects.create(hostname=shard_hostname)
            host.shard = shard
            host.save()

        self.assertFalse(host.locked)

        self.god.stub_class_method(frontend.AFE, 'run')

        if host_on_shard and not on_shard:
            mock_afe = self.god.create_mock_class_obj(
                    frontend_wrappers.RetryingAFE, 'MockAFE')
            self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)

            mock_afe2 = frontend_wrappers.RetryingAFE.expect_new(
                    server=shard_hostname, user=None)
            mock_afe2.run.expect_call('modify_host_local', id=host.id,
                    locked=True, lock_reason='_modify_host_helper lock',
                    lock_time=datetime.datetime(2015, 12, 15))
        elif on_shard:
            mock_afe = self.god.create_mock_class_obj(
                    frontend_wrappers.RetryingAFE, 'MockAFE')
            self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)

            mock_afe2 = frontend_wrappers.RetryingAFE.expect_new(
                    server=server_utils.get_global_afe_hostname(), user=None)
            mock_afe2.run.expect_call('modify_host', id=host.id,
                    locked=True, lock_reason='_modify_host_helper lock',
                    lock_time=datetime.datetime(2015, 12, 15))

        rpc_interface.modify_host(id=host.id, locked=True,
                                  lock_reason='_modify_host_helper lock',
                                  lock_time=datetime.datetime(2015, 12, 15))

        host = models.Host.objects.get(pk=host.id)
        if on_shard:
            # modify_host on a shard only routes the RPC to the master.
            self.assertFalse(host.locked)
        else:
            self.assertTrue(host.locked)
        self.god.check_playback()


    def test_modify_host_on_master_host_on_master(self):
        """Call modify_host on the master for a host on the master."""
        self._modify_host_helper()


    def test_modify_host_on_master_host_on_shard(self):
        """Call modify_host on the master for a host on a shard."""
        self._modify_host_helper(host_on_shard=True)


    def test_modify_host_on_shard(self):
        """Call modify_host on a shard for a host on that shard."""
        self._modify_host_helper(on_shard=True, host_on_shard=True)


    def test_modify_hosts_on_master_host_on_shard(self):
        """Ensure calls to modify_hosts are correctly forwarded to shards."""
        host1 = models.Host.objects.all()[0]
        host2 = models.Host.objects.all()[1]

        shard1 = models.Shard.objects.create(hostname='shard1')
        host1.shard = shard1
        host1.save()

        shard2 = models.Shard.objects.create(hostname='shard2')
        host2.shard = shard2
        host2.save()

        self.assertFalse(host1.locked)
        self.assertFalse(host2.locked)

        mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE,
                                                  'MockAFE')
        self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)

        # The status of a host might differ between master and shard.
        # Filters are always applied on the master, so the host on the shard
        # will be affected no matter what its status is.
        filters_to_use = {'status': 'Ready'}

        mock_afe2 = frontend_wrappers.RetryingAFE.expect_new(
                server='shard2', user=None)
        mock_afe2.run.expect_call(
            'modify_hosts_local',
            host_filter_data={'id__in': [shard1.id, shard2.id]},
            update_data={'locked': True,
                         'lock_reason': 'Testing forward to shard',
                         'lock_time' : datetime.datetime(2015, 12, 15) })

        mock_afe1 = frontend_wrappers.RetryingAFE.expect_new(
                server='shard1', user=None)
        mock_afe1.run.expect_call(
            'modify_hosts_local',
            host_filter_data={'id__in': [shard1.id, shard2.id]},
            update_data={'locked': True,
                         'lock_reason': 'Testing forward to shard',
                         'lock_time' : datetime.datetime(2015, 12, 15)})

        rpc_interface.modify_hosts(
                host_filter_data={'status': 'Ready'},
                update_data={'locked': True,
                             'lock_reason': 'Testing forward to shard',
                             'lock_time' : datetime.datetime(2015, 12, 15) })

        host1 = models.Host.objects.get(pk=host1.id)
        self.assertTrue(host1.locked)
        host2 = models.Host.objects.get(pk=host2.id)
        self.assertTrue(host2.locked)
        self.god.check_playback()


    def test_delete_host(self):
        """Ensure an RPC is made when deleting a host that is on a shard."""
        host1 = models.Host.objects.all()[0]
        shard1 = models.Shard.objects.create(hostname='shard1')
        host1.shard = shard1
        host1.save()
        host1_id = host1.id

        mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE,
                                                  'MockAFE')
        self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)

        mock_afe1 = frontend_wrappers.RetryingAFE.expect_new(
                server='shard1', user=None)
        mock_afe1.run.expect_call('delete_host', id=host1.id)

        rpc_interface.delete_host(id=host1.id)

        self.assertRaises(models.Host.DoesNotExist,
                          models.Host.smart_get, host1_id)

        self.god.check_playback()


    def test_modify_label(self):
        label1 = models.Label.objects.all()[0]
        self.assertEqual(label1.invalid, 0)

        host2 = models.Host.objects.all()[1]
        shard1 = models.Shard.objects.create(hostname='shard1')
        host2.shard = shard1
        host2.labels.add(label1)
        host2.save()

        mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE,
                                                  'MockAFE')
        self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)

        mock_afe1 = frontend_wrappers.RetryingAFE.expect_new(
                server='shard1', user=None)
        mock_afe1.run.expect_call('modify_label', id=label1.id, invalid=1)

        rpc_interface.modify_label(label1.id, invalid=1)

        self.assertEqual(models.Label.objects.all()[0].invalid, 1)
        self.god.check_playback()


    def test_delete_label(self):
        label1 = models.Label.objects.all()[0]

        host2 = models.Host.objects.all()[1]
        shard1 = models.Shard.objects.create(hostname='shard1')
        host2.shard = shard1
        host2.labels.add(label1)
        host2.save()

        mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE,
                                                  'MockAFE')
        self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)

        mock_afe1 = frontend_wrappers.RetryingAFE.expect_new(
                server='shard1', user=None)
        mock_afe1.run.expect_call('delete_label', id=label1.id)

        rpc_interface.delete_label(id=label1.id)

        self.assertRaises(models.Label.DoesNotExist,
                          models.Label.smart_get, label1.id)
        self.god.check_playback()


    def test_get_image_for_job_with_keyval_build(self):
        keyval_dict = {'build': 'cool-image'}
        job_id = rpc_interface.create_job(name='test',
                                          priority=priorities.Priority.DEFAULT,
                                          control_file='foo',
                                          control_type=CLIENT,
                                          hosts=['host1'],
                                          keyvals=keyval_dict)
        job = models.Job.objects.get(id=job_id)
        self.assertIsNotNone(job)
        image = rpc_interface._get_image_for_job(job, True)
        self.assertEquals('cool-image', image)


    def test_get_image_for_job_with_keyval_builds(self):
        keyval_dict = {'builds': {'cros-version': 'cool-image'}}
        job_id = rpc_interface.create_job(name='test',
                                          priority=priorities.Priority.DEFAULT,
                                          control_file='foo',
                                          control_type=CLIENT,
                                          hosts=['host1'],
                                          keyvals=keyval_dict)
        job = models.Job.objects.get(id=job_id)
        self.assertIsNotNone(job)
        image = rpc_interface._get_image_for_job(job, True)
        self.assertEquals('cool-image', image)


    def test_get_image_for_job_with_control_build(self):
        CONTROL_FILE = """build='cool-image'
        """
        job_id = rpc_interface.create_job(name='test',
                                          priority=priorities.Priority.DEFAULT,
                                          control_file='foo',
                                          control_type=CLIENT,
                                          hosts=['host1'])
        job = models.Job.objects.get(id=job_id)
        self.assertIsNotNone(job)
        job.control_file = CONTROL_FILE
        image = rpc_interface._get_image_for_job(job, True)
        self.assertEquals('cool-image', image)


    def test_get_image_for_job_with_control_builds(self):
        CONTROL_FILE = """builds={'cros-version': 'cool-image'}
        """
        job_id = rpc_interface.create_job(name='test',
                                          priority=priorities.Priority.DEFAULT,
                                          control_file='foo',
                                          control_type=CLIENT,
                                          hosts=['host1'])
        job = models.Job.objects.get(id=job_id)
        self.assertIsNotNone(job)
        job.control_file = CONTROL_FILE
        image = rpc_interface._get_image_for_job(job, True)
        self.assertEquals('cool-image', image)


class ExtraRpcInterfaceTest(mox.MoxTestBase,
                            frontend_test_utils.FrontendTestMixin):
    """Unit tests for functions originally in site_rpc_interface.py.

    @var _NAME: fake suite name.
    @var _BOARD: fake board to reimage.
    @var _BUILD: fake build with which to reimage.
    @var _PRIORITY: fake priority with which to reimage.
    """
    _NAME = 'name'
    _BOARD = 'link'
    _BUILD = 'link-release/R36-5812.0.0'
    _BUILDS = {provision.CROS_VERSION_PREFIX: _BUILD}
    _PRIORITY = priorities.Priority.DEFAULT
    _TIMEOUT = 24


    def setUp(self):
        super(ExtraRpcInterfaceTest, self).setUp()
        self._SUITE_NAME = rpc_interface.canonicalize_suite_name(
            self._NAME)
        self.dev_server = self.mox.CreateMock(dev_server.ImageServer)
        self._frontend_common_setup(fill_data=False)


    def tearDown(self):
        self._frontend_common_teardown()


    def _setupDevserver(self):
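        """Mock out the dev server and expect a resolve() call for the build."""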
        self.mox.StubOutClassWithMocks(dev_server, 'ImageServer')
        dev_server.resolve(self._BUILD).AndReturn(self.dev_server)


    def _mockDevServerGetter(self, get_control_file=True):
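        """Mock out the dev server and, optionally, the control file getter."""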
        self._setupDevserver()
        if get_control_file:
          self.getter = self.mox.CreateMock(
              control_file_getter.DevServerGetter)
          self.mox.StubOutWithMock(control_file_getter.DevServerGetter,
                                   'create')
          control_file_getter.DevServerGetter.create(
              mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(self.getter)


    def _mockRpcUtils(self, to_return, control_file_substring=''):
        """Fake out the autotest rpc_utils module with a mockable class.

        @param to_return: the value that rpc_utils.create_job_common() should
                          be mocked out to return.
        @param control_file_substring: A substring that is expected to appear
                                       in the control file output string that
                                       is passed to create_job_common.
                                       Default: ''
        """
        download_started_time = constants.DOWNLOAD_STARTED_TIME
        payload_finished_time = constants.PAYLOAD_FINISHED_TIME
        self.mox.StubOutWithMock(rpc_utils, 'create_job_common')
        rpc_utils.create_job_common(mox.And(mox.StrContains(self._NAME),
                                    mox.StrContains(self._BUILD)),
                            priority=self._PRIORITY,
                            timeout_mins=self._TIMEOUT*60,
                            max_runtime_mins=self._TIMEOUT*60,
                            control_type='Server',
                            control_file=mox.And(mox.StrContains(self._BOARD),
                                                 mox.StrContains(self._BUILD),
                                                 mox.StrContains(
                                                     control_file_substring)),
                            hostless=True,
                            keyvals=mox.And(mox.In(download_started_time),
                                            mox.In(payload_finished_time))
                            ).AndReturn(to_return)


    def testStageBuildFail(self):
        """Ensure that a failure to stage the desired build fails the RPC."""
        self._setupDevserver()

        self.dev_server.hostname = 'mox_url'
        self.dev_server.stage_artifacts(
                image=self._BUILD, artifacts=['test_suites']).AndRaise(
                dev_server.DevServerException())
        self.mox.ReplayAll()
        self.assertRaises(error.StageControlFileFailure,
                          rpc_interface.create_suite_job,
                          name=self._NAME,
                          board=self._BOARD,
                          builds=self._BUILDS,
                          pool=None)


    def testGetControlFileFail(self):
        """Ensure that a failure to get needed control file fails the RPC."""
        self._mockDevServerGetter()

        self.dev_server.hostname = 'mox_url'
        self.dev_server.stage_artifacts(
                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)

        self.getter.get_control_file_contents_by_name(
            self._SUITE_NAME).AndReturn(None)
        self.mox.ReplayAll()
        self.assertRaises(error.ControlFileEmpty,
                          rpc_interface.create_suite_job,
                          name=self._NAME,
                          board=self._BOARD,
                          builds=self._BUILDS,
                          pool=None)


    def testGetControlFileListFail(self):
        """Ensure that a failure to list control files fails the RPC."""
        self._mockDevServerGetter()

        self.dev_server.hostname = 'mox_url'
        self.dev_server.stage_artifacts(
                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)

        self.getter.get_control_file_contents_by_name(
            self._SUITE_NAME).AndRaise(error.NoControlFileList())
        self.mox.ReplayAll()
        self.assertRaises(error.NoControlFileList,
                          rpc_interface.create_suite_job,
                          name=self._NAME,
                          board=self._BOARD,
                          builds=self._BUILDS,
                          pool=None)


    def testBadNumArgument(self):
        """Ensure we handle bad values for the |num| argument."""
        self.assertRaises(error.SuiteArgumentException,
                          rpc_interface.create_suite_job,
                          name=self._NAME,
                          board=self._BOARD,
                          builds=self._BUILDS,
                          pool=None,
                          num='goo')
        self.assertRaises(error.SuiteArgumentException,
                          rpc_interface.create_suite_job,
                          name=self._NAME,
                          board=self._BOARD,
                          builds=self._BUILDS,
                          pool=None,
                          num=[])
        self.assertRaises(error.SuiteArgumentException,
                          rpc_interface.create_suite_job,
                          name=self._NAME,
                          board=self._BOARD,
                          builds=self._BUILDS,
                          pool=None,
                          num='5')


    def testCreateSuiteJobFail(self):
        """Ensure that failure to schedule the suite job fails the RPC."""
        self._mockDevServerGetter()

        self.dev_server.hostname = 'mox_url'
        self.dev_server.stage_artifacts(
                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)

        self.getter.get_control_file_contents_by_name(
            self._SUITE_NAME).AndReturn('f')

        self.dev_server.url().AndReturn('mox_url')
        self._mockRpcUtils(-1)
        self.mox.ReplayAll()
        self.assertEquals(
            rpc_interface.create_suite_job(name=self._NAME,
                                           board=self._BOARD,
                                           builds=self._BUILDS, pool=None),
            -1)


    def testCreateSuiteJobSuccess(self):
        """Ensures that success results in a successful RPC."""
        self._mockDevServerGetter()

        self.dev_server.hostname = 'mox_url'
        self.dev_server.stage_artifacts(
                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)

        self.getter.get_control_file_contents_by_name(
            self._SUITE_NAME).AndReturn('f')

        self.dev_server.url().AndReturn('mox_url')
        job_id = 5
        self._mockRpcUtils(job_id)
        self.mox.ReplayAll()
        self.assertEquals(
            rpc_interface.create_suite_job(name=self._NAME,
                                           board=self._BOARD,
                                           builds=self._BUILDS,
                                           pool=None),
            job_id)


    def testCreateSuiteJobNoHostCheckSuccess(self):
        """Ensures that success results in a successful RPC with
        check_hosts=False."""
        self._mockDevServerGetter()

        self.dev_server.hostname = 'mox_url'
        self.dev_server.stage_artifacts(
                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)

        self.getter.get_control_file_contents_by_name(
            self._SUITE_NAME).AndReturn('f')

        self.dev_server.url().AndReturn('mox_url')
        job_id = 5
        self._mockRpcUtils(job_id)
        self.mox.ReplayAll()
        self.assertEquals(
          rpc_interface.create_suite_job(name=self._NAME,
                                         board=self._BOARD,
                                         builds=self._BUILDS,
                                         pool=None, check_hosts=False),
          job_id)

    def testCreateSuiteIntegerNum(self):
        """Ensures an integer num argument is passed into the control file."""
        self._mockDevServerGetter()

        self.dev_server.hostname = 'mox_url'
        self.dev_server.stage_artifacts(
                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)

        self.getter.get_control_file_contents_by_name(
            self._SUITE_NAME).AndReturn('f')

        self.dev_server.url().AndReturn('mox_url')
        job_id = 5
        self._mockRpcUtils(job_id, control_file_substring='num=17')
        self.mox.ReplayAll()
        self.assertEquals(
            rpc_interface.create_suite_job(name=self._NAME,
                                           board=self._BOARD,
                                           builds=self._BUILDS,
                                           pool=None,
                                           check_hosts=False,
                                           num=17),
            job_id)


    def testCreateSuiteJobControlFileSupplied(self):
        """Ensure we can supply the control file to create_suite_job."""
        self._mockDevServerGetter(get_control_file=False)

        self.dev_server.hostname = 'mox_url'
        self.dev_server.stage_artifacts(
                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
        self.dev_server.url().AndReturn('mox_url')
        job_id = 5
        self._mockRpcUtils(job_id)
        self.mox.ReplayAll()
        self.assertEquals(
            rpc_interface.create_suite_job(name='%s/%s' % (self._NAME,
                                                           self._BUILD),
                                           board=None,
                                           builds=self._BUILDS,
                                           pool=None,
                                           control_file='CONTROL FILE'),
            job_id)


    def _get_records_for_sending_to_master(self):
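        """Return (job records, HQE records) as a shard would upload them."""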
        return [{'control_file': 'foo',
                 'control_type': 1,
                 'created_on': datetime.datetime(2014, 8, 21),
                 'drone_set': None,
                 'email_list': '',
                 'max_runtime_hrs': 72,
                 'max_runtime_mins': 1440,
                 'name': 'dummy',
                 'owner': 'autotest_system',
                 'parse_failed_repair': True,
                 'priority': 40,
                 'reboot_after': 0,
                 'reboot_before': 1,
                 'run_reset': True,
                 'run_verify': False,
                 'synch_count': 0,
                 'test_retry': 10,
                 'timeout': 24,
                 'timeout_mins': 1440,
                 'id': 1
                 }], [{
                    'aborted': False,
                    'active': False,
                    'complete': False,
                    'deleted': False,
                    'execution_subdir': '',
                    'finished_on': None,
                    'started_on': None,
                    'status': 'Queued',
                    'id': 1
                }]


    def _do_heartbeat_and_assert_response(self, shard_hostname='shard1',
                                          upload_jobs=(), upload_hqes=(),
                                          known_jobs=(), known_hosts=(),
                                          **kwargs):
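        """Call shard_heartbeat with the given payload and check its response.

        Additional keyword arguments are passed through to
        _assert_shard_heartbeat_response.
        """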
        known_job_ids = [job.id for job in known_jobs]
        known_host_ids = [host.id for host in known_hosts]
        known_host_statuses = [host.status for host in known_hosts]

        retval = rpc_interface.shard_heartbeat(
            shard_hostname=shard_hostname,
            jobs=upload_jobs, hqes=upload_hqes,
            known_job_ids=known_job_ids, known_host_ids=known_host_ids,
            known_host_statuses=known_host_statuses)

        self._assert_shard_heartbeat_response(shard_hostname, retval,
                                              **kwargs)

        return shard_hostname


    def _assert_shard_heartbeat_response(self, shard_hostname, retval, jobs=[],
                                         hosts=[], hqes=[],
                                         incorrect_host_ids=[]):
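        """Check the heartbeat response against the expected jobs, hosts,
        HQEs and incorrect host ids."""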

        retval_hosts, retval_jobs = retval['hosts'], retval['jobs']
        retval_incorrect_hosts = retval['incorrect_host_ids']

        expected_jobs = [
            (job.id, job.name, shard_hostname) for job in jobs]
        returned_jobs = [(job['id'], job['name'], job['shard']['hostname'])
                         for job in retval_jobs]
        self.assertEqual(returned_jobs, expected_jobs)

        expected_hosts = [(host.id, host.hostname) for host in hosts]
        returned_hosts = [(host['id'], host['hostname'])
                          for host in retval_hosts]
        self.assertEqual(returned_hosts, expected_hosts)

        retval_hqes = []
        for job in retval_jobs:
            retval_hqes += job['hostqueueentry_set']

        expected_hqes = [(hqe.id) for hqe in hqes]
        returned_hqes = [(hqe['id']) for hqe in retval_hqes]
        self.assertEqual(returned_hqes, expected_hqes)

        self.assertEqual(retval_incorrect_hosts, incorrect_host_ids)


    def _send_records_to_master_helper(
        self, jobs, hqes, shard_hostname='host1',
        exception_to_throw=error.UnallowedRecordsSentToMaster, aborted=False):
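        """Create a job assigned to shard 'host1' and upload the given records.

        If exception_to_throw is not None, assert that the heartbeat raises it;
        otherwise assert that the heartbeat succeeds.
        """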
        job_id = rpc_interface.create_job(
                name='dummy',
                priority=self._PRIORITY,
                control_file='foo',
                control_type=SERVER,
                test_retry=10, hostless=True)
        job = models.Job.objects.get(pk=job_id)
        shard = models.Shard.objects.create(hostname='host1')
        job.shard = shard
        job.save()

        if aborted:
            job.hostqueueentry_set.update(aborted=True)
            job.shard = None
            job.save()

        hqe = job.hostqueueentry_set.all()[0]
        if not exception_to_throw:
            self._do_heartbeat_and_assert_response(
                shard_hostname=shard_hostname,
                upload_jobs=jobs, upload_hqes=hqes)
        else:
            self.assertRaises(
                exception_to_throw,
                self._do_heartbeat_and_assert_response,
                shard_hostname=shard_hostname,
                upload_jobs=jobs, upload_hqes=hqes)


    def testSendingRecordsToMaster(self):
        """Send records to the master and ensure they are persisted."""
        jobs, hqes = self._get_records_for_sending_to_master()
        hqes[0]['status'] = 'Completed'
        self._send_records_to_master_helper(
            jobs=jobs, hqes=hqes, exception_to_throw=None)

        # Check the entry was actually written to db
        self.assertEqual(models.HostQueueEntry.objects.all()[0].status,
                         'Completed')


    def testSendingRecordsToMasterAbortedOnMaster(self):
        """Send records for a job aborted on the master; ensure they persist."""
        jobs, hqes = self._get_records_for_sending_to_master()
        hqes[0]['status'] = 'Completed'
        self._send_records_to_master_helper(
            jobs=jobs, hqes=hqes, exception_to_throw=None, aborted=True)

        # Check the entry was actually written to db
        self.assertEqual(models.HostQueueEntry.objects.all()[0].status,
                         'Completed')


    def testSendingRecordsToMasterJobAssignedToDifferentShard(self):
        """Ensure records belonging to another shard are silently rejected."""
        shard1 = models.Shard.objects.create(hostname='shard1')
        shard2 = models.Shard.objects.create(hostname='shard2')
        job1 = self._create_job(shard=shard1, control_file='foo1')
        job2 = self._create_job(shard=shard2, control_file='foo2')
        job1_id = job1.id
        job2_id = job2.id
        hqe1 = models.HostQueueEntry.objects.create(job=job1)
        hqe2 = models.HostQueueEntry.objects.create(job=job2)
        hqe1_id = hqe1.id
        hqe2_id = hqe2.id
        job1_record = job1.serialize(include_dependencies=False)
        job2_record = job2.serialize(include_dependencies=False)
        hqe1_record = hqe1.serialize(include_dependencies=False)
        hqe2_record = hqe2.serialize(include_dependencies=False)

        # Prepare a bogus job record update from the wrong shard. The update
        # should not throw an exception. Non-bogus jobs in the same update
        # should happily update.
        job1_record.update({'control_file': 'bar1'})
        job2_record.update({'control_file': 'bar2'})
        hqe1_record.update({'status': 'Aborted'})
        hqe2_record.update({'status': 'Aborted'})
        self._do_heartbeat_and_assert_response(
            shard_hostname='shard2', upload_jobs=[job1_record, job2_record],
            upload_hqes=[hqe1_record, hqe2_record])

        # Job and HQE record for wrong job should not be modified, because the
        # rpc came from the wrong shard. Job and HQE record for valid job are
        # modified.
        self.assertEqual(models.Job.objects.get(id=job1_id).control_file,
                         'foo1')
        self.assertEqual(models.Job.objects.get(id=job2_id).control_file,
                         'bar2')
        self.assertEqual(models.HostQueueEntry.objects.get(id=hqe1_id).status,
                         '')
        self.assertEqual(models.HostQueueEntry.objects.get(id=hqe2_id).status,
                         'Aborted')


    def testSendingRecordsToMasterNotExistingJob(self):
        """Ensure an update for a nonexistent job gets rejected."""
        jobs, hqes = self._get_records_for_sending_to_master()
        jobs[0]['id'] = 3

        self._send_records_to_master_helper(
            jobs=jobs, hqes=hqes)


    def _createShardAndHostWithLabel(self, shard_hostname='shard1',
                                     host_hostname='host1',
                                     label_name='board:lumpy'):
        """Create a label, host, shard, and assign host to shard."""
        label = models.Label.objects.create(name=label_name)

        shard = models.Shard.objects.create(hostname=shard_hostname)
        shard.labels.add(label)

        host = models.Host.objects.create(hostname=host_hostname, leased=False,
                                          shard=shard)
        host.labels.add(label)

        return shard, host, label


    def _createJobForLabel(self, label):
        job_id = rpc_interface.create_job(name='dummy', priority=self._PRIORITY,
                                          control_file='foo',
                                          control_type=CLIENT,
                                          meta_hosts=[label.name],
                                          dependencies=(label.name,))
        return models.Job.objects.get(id=job_id)


    def testShardHeartbeatFetchHostlessJob(self):
        """Create a hostless job and ensure it's not assigned to a shard."""
        shard1, host1, lumpy_label = self._createShardAndHostWithLabel(
            'shard1', 'host1', 'board:lumpy')

        label2 = models.Label.objects.create(name='bluetooth', platform=False)

        job1 = self._create_job(hostless=True)

        # Hostless jobs should be executed by the global scheduler.
        self._do_heartbeat_and_assert_response(hosts=[host1])


    def testShardHeartbeatIncorrectHosts(self):
        """Ensure hosts that don't belong to the shard are flagged as incorrect."""
        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()

        host2 = models.Host.objects.create(hostname='host2', leased=False)

        # host2 should not belong to shard1. Ensure that if shard1 thinks host2
        # is a known host, then it is returned as invalid.
        self._do_heartbeat_and_assert_response(known_hosts=[host1, host2],
                                               incorrect_host_ids=[host2.id])


    def testShardHeartbeatLabelRemovalRace(self):
        """Ensure correctness if label removed during heartbeat."""
        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()

        host2 = models.Host.objects.create(hostname='host2', leased=False)
        host2.labels.add(lumpy_label)
        self.assertEqual(host2.shard, None)

        # In the middle of the assign_to_shard call, remove lumpy_label from
        # shard1.
        self.mox.StubOutWithMock(models.Host, '_assign_to_shard_nothing_helper')
        def remove_label():
            rpc_interface.remove_board_from_shard(
                    shard1.hostname, lumpy_label.name)
        models.Host._assign_to_shard_nothing_helper().WithSideEffects(
            remove_label)
        self.mox.ReplayAll()

        self._do_heartbeat_and_assert_response(
            known_hosts=[host1], hosts=[], incorrect_host_ids=[host1.id])
        host2 = models.Host.smart_get(host2.id)
        self.assertEqual(host2.shard, None)


    def testShardLabelRemovalInvalid(self):
        """Ensure you cannot remove the wrong label from shard."""
        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
        stumpy_label = models.Label.objects.create(
                name='board:stumpy', platform=True)
        with self.assertRaises(error.RPCException):
            rpc_interface.remove_board_from_shard(
                    shard1.hostname, stumpy_label.name)


    def testShardHeartbeatLabelRemoval(self):
        """Ensure label removal from shard works."""
        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()

        self.assertEqual(host1.shard, shard1)
        self.assertItemsEqual(shard1.labels.all(), [lumpy_label])
        rpc_interface.remove_board_from_shard(
                shard1.hostname, lumpy_label.name)
        host1 = models.Host.smart_get(host1.id)
        shard1 = models.Shard.smart_get(shard1.id)
        self.assertEqual(host1.shard, None)
        self.assertItemsEqual(shard1.labels.all(), [])


    def testShardRetrieveJobs(self):
        """Create jobs and retrieve them."""
        # should never be returned by heartbeat
        leased_host = models.Host.objects.create(hostname='leased_host',
                                                 leased=True)

        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
        shard2, host2, grumpy_label = self._createShardAndHostWithLabel(
            'shard2', 'host2', 'board:grumpy')

        leased_host.labels.add(lumpy_label)

        job1 = self._createJobForLabel(lumpy_label)

        job2 = self._createJobForLabel(grumpy_label)

        job_completed = self._createJobForLabel(lumpy_label)
1285        # Job is already being run, so don't sync it
1286        job_completed.hostqueueentry_set.update(complete=True)
1287        job_completed.hostqueueentry_set.create(complete=False)
1288
1289        job_active = self._createJobForLabel(lumpy_label)
1290        # Job is already started, so don't sync it
1291        job_active.hostqueueentry_set.update(active=True)
1292        job_active.hostqueueentry_set.create(complete=False, active=False)
1293
1294        self._do_heartbeat_and_assert_response(
1295            jobs=[job1], hosts=[host1], hqes=job1.hostqueueentry_set.all())
1296
1297        self._do_heartbeat_and_assert_response(
1298            shard_hostname=shard2.hostname,
1299            jobs=[job2], hosts=[host2], hqes=job2.hostqueueentry_set.all())
1300
1301        host3 = models.Host.objects.create(hostname='host3', leased=False)
1302        host3.labels.add(lumpy_label)
1303
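        # job1 and host1 are already known to the shard, so only the newly
        # added host3 should be sent back.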
        self._do_heartbeat_and_assert_response(
            known_jobs=[job1], known_hosts=[host1], hosts=[host3])


    def testResendJobsAfterFailedHeartbeat(self):
        """Create jobs, retrieve them, fail on client, fetch them again."""
        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()

        job1 = self._createJobForLabel(lumpy_label)

        self._do_heartbeat_and_assert_response(
            jobs=[job1],
            hqes=job1.hostqueueentry_set.all(), hosts=[host1])

        # Simulate a heartbeat whose response was lost on the client by not
        # listing job1 in known_jobs; it should be resent.
        self._do_heartbeat_and_assert_response(
            known_hosts=[host1],
            jobs=[job1], hqes=job1.hostqueueentry_set.all(), hosts=[])

        # Now it worked, make sure it's not sent again
        self._do_heartbeat_and_assert_response(
            known_jobs=[job1], known_hosts=[host1])

        job1 = models.Job.objects.get(pk=job1.id)
        job1.hostqueueentry_set.all().update(complete=True)

        # Job is completed, make sure it's not sent again
        self._do_heartbeat_and_assert_response(
            known_hosts=[host1])

        job2 = self._createJobForLabel(lumpy_label)

        # job2 was created later, so it should be returned now.
        self._do_heartbeat_and_assert_response(
            known_hosts=[host1],
            jobs=[job2], hqes=job2.hostqueueentry_set.all())

        self._do_heartbeat_and_assert_response(
            known_jobs=[job2], known_hosts=[host1])

        job2 = models.Job.objects.get(pk=job2.pk)
        job2.hostqueueentry_set.update(aborted=True)
        # Setting a job to a complete status will set the shard_id to None in
        # scheduler_models. We have to emulate that here, because we use Django
        # models in tests.
        job2.shard = None
        job2.save()

        self._do_heartbeat_and_assert_response(
            known_jobs=[job2], known_hosts=[host1],
            jobs=[job2],
            hqes=job2.hostqueueentry_set.all())

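        # delete_shard should schedule a follow-up job at SUPER priority on the
        # host that is handed back to the master (see the assertions below).
        # Create the test that job presumably references and stub out the
        # control file read so the RPC can build it.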
        models.Test.objects.create(name='platform_BootPerfServer:shard',
                                   test_type=1)
        self.mox.StubOutWithMock(server_utils, 'read_file')
        server_utils.read_file(mox.IgnoreArg()).AndReturn('')
        self.mox.ReplayAll()
        rpc_interface.delete_shard(hostname=shard1.hostname)

        self.assertRaises(
            models.Shard.DoesNotExist, models.Shard.objects.get, pk=shard1.id)

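        # Reload the objects: deleting the shard should have detached its job,
        # label and host, and scheduled the follow-up job against host1.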
        job1 = models.Job.objects.get(pk=job1.id)
        lumpy_label = models.Label.objects.get(pk=lumpy_label.id)
        host1 = models.Host.objects.get(pk=host1.id)
        super_job = models.Job.objects.get(priority=priorities.Priority.SUPER)
        super_job_host = models.HostQueueEntry.objects.get(
                job_id=super_job.id)

        self.assertIsNone(job1.shard)
        self.assertEqual(len(lumpy_label.shard_set.all()), 0)
        self.assertIsNone(host1.shard)
        self.assertIsNotNone(super_job)
        self.assertEqual(super_job_host.host_id, host1.id)


    def testCreateListShard(self):
        """Create a shard and retrieve the list of all shards."""
        lumpy_label = models.Label.objects.create(name='board:lumpy',
                                                  platform=True)
        stumpy_label = models.Label.objects.create(name='board:stumpy',
                                                   platform=True)
        peppy_label = models.Label.objects.create(name='board:peppy',
                                                  platform=True)

        shard_id = rpc_interface.add_shard(
            hostname='host1', labels='board:lumpy,board:stumpy')
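        # Adding boards that are already on a shard should raise an
        # RPCException; reusing the shard hostname should fail validation.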
        self.assertRaises(error.RPCException,
                          rpc_interface.add_shard,
                          hostname='host1', labels='board:lumpy,board:stumpy')
        self.assertRaises(model_logic.ValidationError,
                          rpc_interface.add_shard,
                          hostname='host1', labels='board:peppy')
        shard = models.Shard.objects.get(pk=shard_id)
        self.assertEqual(shard.hostname, 'host1')
        self.assertEqual(shard.labels.values_list('pk')[0], (lumpy_label.id,))
        self.assertEqual(shard.labels.values_list('pk')[1], (stumpy_label.id,))

        self.assertEqual(rpc_interface.get_shards(),
                         [{'labels': ['board:lumpy', 'board:stumpy'],
                           'hostname': 'host1',
                           'id': 1}])


    def testAddBoardsToShard(self):
        """Add boards to a given shard."""
        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
        stumpy_label = models.Label.objects.create(name='board:stumpy',
                                                   platform=True)
        shard_id = rpc_interface.add_board_to_shard(
            hostname='shard1', labels='board:stumpy')
        # An exception should be raised when the board label does not exist.
        self.assertRaises(models.Label.DoesNotExist,
                          rpc_interface.add_board_to_shard,
                          hostname='shard1', labels='board:test')
        # An exception should be raised when the board is already on a shard.
        self.assertRaises(error.RPCException,
                          rpc_interface.add_board_to_shard,
                          hostname='shard1', labels='board:lumpy')
        shard = models.Shard.objects.get(pk=shard_id)
        self.assertEqual(shard.hostname, 'shard1')
        self.assertEqual(shard.labels.values_list('pk')[0], (lumpy_label.id,))
        self.assertEqual(shard.labels.values_list('pk')[1], (stumpy_label.id,))

        self.assertEqual(rpc_interface.get_shards(),
                         [{'labels': ['board:lumpy', 'board:stumpy'],
                           'hostname': 'shard1',
                           'id': 1}])


    def testResendHostsAfterFailedHeartbeat(self):
        """Check that the master accepts resent records after a failure."""
        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()

        # Send the host
        self._do_heartbeat_and_assert_response(hosts=[host1])

        # Send it again because the previous one didn't persist correctly
        self._do_heartbeat_and_assert_response(hosts=[host1])

        # Now it worked, make sure it isn't sent again
        self._do_heartbeat_and_assert_response(known_hosts=[host1])


if __name__ == '__main__':
    unittest.main()
