#!/usr/bin/python #pylint: disable-msg=C0111 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import unittest import common from autotest_lib.client.common_lib import global_config from autotest_lib.frontend import setup_django_environment from autotest_lib.frontend.afe import frontend_test_utils from autotest_lib.frontend.afe import models from autotest_lib.scheduler import rdb_testing_utils from autotest_lib.scheduler import scheduler_models from autotest_lib.scheduler.shard import shard_client class ShardClientIntegrationTest(rdb_testing_utils.AbstractBaseRDBTester, unittest.TestCase): """Integration tests for the shard_client.""" def setup_global_config(self): """Mock out global_config for shard client creation.""" global_config.global_config.override_config_value( 'SHARD', 'is_slave_shard', 'True') global_config.global_config.override_config_value( 'SHARD', 'shard_hostname', 'host1') def initialize_shard_client(self): self.setup_global_config() return shard_client.get_shard_client() def testCompleteStatusBasic(self): """Test that complete jobs are uploaded properly.""" client = self.initialize_shard_client() job = self.create_job(deps=set(['a']), shard_hostname=client.hostname) scheduler_models.initialize() hqe = scheduler_models.HostQueueEntry.fetch( where='job_id = %s' % job.id)[0] # This should set both the shard_id and the complete bit. hqe.set_status('Completed') # Only incomplete jobs should be in known ids. job_ids, host_ids, _ = client._get_known_jobs_and_hosts() assert(job_ids == []) # Jobs that have successfully gone through a set_status should # be ready for upload. jobs = client._get_jobs_to_upload() assert(job.id in [j.id for j in jobs]) def testOnlyShardId(self): """Test that setting only the shardid prevents the job from upload.""" client = self.initialize_shard_client() job = self.create_job(deps=set(['a']), shard_hostname=client.hostname) scheduler_models.initialize() hqe = scheduler_models.HostQueueEntry.fetch( where='job_id = %s' % job.id)[0] def _local_update_field(hqe, field_name, value): """Turns update_field on the complete field into a no-op.""" if field_name == 'complete': return models.HostQueueEntry.objects.filter(id=hqe.id).update( **{field_name: value}) setattr(hqe, field_name, value) self.god.stub_with(scheduler_models.HostQueueEntry, 'update_field', _local_update_field) # This should only update the shard_id. hqe.set_status('Completed') # Retrieve the hqe along an independent code path so we're assured of # freshness, then make sure it has shard=None and an unset complete bit. modified_hqe = self.db_helper.get_hqes(job_id=job.id)[0] assert(modified_hqe.id == hqe.id and modified_hqe.complete == 0 and modified_hqe.job.shard == None) # Make sure the job with a shard but without complete is still # in known_ids. job_ids, host_ids, _ = client._get_known_jobs_and_hosts() assert(set(job_ids) == set([job.id])) # Make sure the job with a shard but without complete is not # in uploaded jobs. jobs = client._get_jobs_to_upload() assert(jobs == []) def testHostSerialization(self): """Test simple host serialization.""" client = self.initialize_shard_client() host = self.db_helper.create_host(name='test_host') serialized_host = host.serialize() models.Host.objects.all().delete() models.Host.deserialize(serialized_host) models.Host.objects.get(hostname='test_host') def testUserExists(self): """Test user related race conditions.""" client = self.initialize_shard_client() user = self.db_helper.create_user(name='test_user') serialized_user = user.serialize() # Master sends a user with the same login but different id serialized_user['id'] = '3' models.User.deserialize(serialized_user) models.User.objects.get(id=3, login='test_user') # Master sends a user with the same id, different login serialized_user['login'] = 'fake_user' models.User.deserialize(serialized_user) models.User.objects.get(id=3, login='fake_user') # Master sends a new user user = self.db_helper.create_user(name='new_user') serialized_user = user.serialize() models.User.objects.all().delete() models.User.deserialize(serialized_user) models.User.objects.get(login='new_user') if __name__ == '__main__': unittest.main()