1#!/usr/bin/python 2# 3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7import datetime, unittest 8 9import mox 10 11import common 12 13# We want to import setup_django_lite_environment first so that the database 14# is setup correctly. 15from autotest_lib.frontend import setup_django_lite_environment 16from autotest_lib.client.common_lib import utils 17from autotest_lib.frontend.afe import models, rpc_interface 18from django import test 19from autotest_lib.server.cros import repair_utils 20 21 22# See complete_failures_functional_tests.py for why we need this. 23class MockDatetime(datetime.datetime): 24 """Used to mock out parts of datetime.datetime.""" 25 pass 26 27 28# We use a mock rpc client object so that we instead directly use the server. 29class MockAFE(): 30 """Used to mock out the rpc client.""" 31 def run(self, func, **kwargs): 32 """ 33 The fake run call directly contacts the server. 34 35 @param func: The name of the remote function that is being called. 36 37 @param kwargs: The arguments to the remotely called function. 38 """ 39 return utils.strip_unicode(getattr(rpc_interface, func)(**kwargs)) 40 41 42class FindProblemTestTests(mox.MoxTestBase, test.TestCase): 43 """Test that we properly find the last ran job.""" 44 45 46 def setUp(self): 47 super(FindProblemTestTests, self).setUp() 48 self.mox.StubOutWithMock(MockDatetime, 'today') 49 50 self.datetime = datetime.datetime 51 datetime.datetime = MockDatetime 52 self._orig_cutoff = repair_utils._CUTOFF_AFTER_TIMEOUT_MINS 53 self._orig_timeout = repair_utils._DEFAULT_TEST_TIMEOUT_MINS 54 55 56 def tearDown(self): 57 repair_utils._DEFAULT_TEST_TIMEOUT_MINS = self._orig_timeout 58 repair_utils._CUTOFF_AFTER_TIMEOUT_MINS = self._orig_cutoff 59 datetime.datetime = self.datetime 60 super(FindProblemTestTests, self).tearDown() 61 62 63 def test_should_get_most_recent_job(self): 64 """Test that, for a given host, we get the last job ran on that host.""" 65 66 host = models.Host(hostname='host') 67 host.save() 68 69 old_job = models.Job(owner='me', name='old_job', 70 created_on=datetime.datetime(2012, 1, 1)) 71 old_job.save() 72 old_host_queue_entry = models.HostQueueEntry( 73 job=old_job, host=host, status='test', 74 started_on=datetime.datetime(2012, 1, 1, 1)) 75 old_host_queue_entry.save() 76 77 new_job = models.Job(owner='me', name='new_job', 78 created_on=datetime.datetime(2012, 1, 1)) 79 new_job.save() 80 new_host_queue_entry = models.HostQueueEntry( 81 job=new_job, host=host, status='test', 82 started_on=datetime.datetime(2012, 1, 1, 2)) 83 new_host_queue_entry.save() 84 85 mock_rpc = MockAFE() 86 datetime.datetime.today().AndReturn(datetime.datetime(2012,1,1)) 87 repair_utils._DEFAULT_TEST_TIMEOUT_MINS = 1440 88 89 repair_utils._CUTOFF_AFTER_TIMEOUT_MINS = 60 90 91 self.mox.ReplayAll() 92 result = repair_utils._find_problem_test('host', mock_rpc) 93 94 self.assertEqual(result['job']['name'], 'new_job') 95 96 97 def test_should_get_job_for_specified_host_only(self): 98 """Test that we only get a job that is for the given host.""" 99 100 correct_job = models.Job(owner='me', name='correct_job', 101 created_on=datetime.datetime(2012, 1, 1)) 102 correct_job.save() 103 correct_host = models.Host(hostname='correct_host') 104 correct_host.save() 105 correct_host_queue_entry = models.HostQueueEntry( 106 job=correct_job, host=correct_host, status='test', 107 started_on=datetime.datetime(2012, 1, 1, 1)) 108 correct_host_queue_entry.save() 109 110 wrong_job = models.Job(owner='me', name='wrong_job', 111 created_on=datetime.datetime(2012, 1, 1)) 112 wrong_job.save() 113 wrong_host = models.Host(hostname='wrong_host') 114 wrong_host.save() 115 wrong_host_queue_entry = models.HostQueueEntry( 116 job=wrong_job, host=wrong_host, status='test', 117 started_on=datetime.datetime(2012, 1, 1, 2)) 118 wrong_host_queue_entry.save() 119 120 mock_rpc = MockAFE() 121 datetime.datetime.today().AndReturn(datetime.datetime(2012,1,1)) 122 repair_utils._DEFAULT_TEST_TIMEOUT_MINS = 1440 123 124 repair_utils._CUTOFF_AFTER_TIMEOUT_MINS = 60 125 126 self.mox.ReplayAll() 127 result = repair_utils._find_problem_test('correct_host', mock_rpc) 128 129 self.assertEqual(result['job']['name'], 'correct_job') 130 131 132 def test_return_jobs_ran_soon_after_max_job_runtime(self): 133 """Test that we get jobs that are just past the max runtime.""" 134 135 host = models.Host(hostname='host') 136 host.save() 137 138 new_job = models.Job(owner='me', name='new_job', 139 created_on=datetime.datetime(2012, 1, 1, 0, 0)) 140 new_job.save() 141 new_host_queue_entry = models.HostQueueEntry( 142 job=new_job, host=host, status='test', 143 started_on=datetime.datetime(2012, 1, 1, 2)) 144 new_host_queue_entry.save() 145 146 mock_rpc = MockAFE() 147 datetime.datetime.today().AndReturn(datetime.datetime(2012, 1, 2, 0, 148 30)) 149 repair_utils._DEFAULT_TEST_TIMEOUT_MINS = 1440 150 151 repair_utils._CUTOFF_AFTER_TIMEOUT_MINS = 60 152 153 self.mox.ReplayAll() 154 result = repair_utils._find_problem_test('host', mock_rpc) 155 156 self.assertEqual(result['job']['name'], 'new_job') 157 158 159if __name__ == '__main__': 160 unittest.main() 161