#!/usr/bin/env python
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import unittest
import mock
import common
from autotest_lib.server.cros.dynamic_suite import reporting
from autotest_lib.site_utils import diagnosis_utils


class DiagnosisUtilsTest(unittest.TestCase):
    """Tests for diagnosis_utils."""

    # Board and pool labels passed to every check_dut_availability call.
    _BOARD = 'test_board'
    _POOL = 'test_pool'

    def setUp(self):
        """Set up test.

        Creates a mocked AFE and stubs out reporting.Reporter so that no real
        reporter is constructed or invoked. The stubs are installed with
        mock.patch.object and undone through addCleanup, so the Reporter class
        is restored after each test instead of staying patched for the rest of
        the process.
        """
        self.afe_mock = mock.MagicMock()

        init_patcher = mock.patch.object(
                reporting.Reporter, '__init__', mock.Mock(return_value=None))
        init_patcher.start()
        self.addCleanup(init_patcher.stop)

        report_patcher = mock.patch.object(
                reporting.Reporter, 'report',
                mock.Mock(return_value=(None, 0)))
        report_patcher.start()
        self.addCleanup(report_patcher.stop)

    def _constructRPCHelper(self):
        """Method to construct RPCHelper backed by the mocked AFE."""
        return diagnosis_utils.RPCHelper(self.afe_mock)

    def _mockZeroHost(self):
        """Mock method to return zero host."""
        return ()

    def _mockTwoAvailableHosts(self):
        """Mock method to return two available hosts."""
        host_a = mock.MagicMock(status='Ready', locked=False)
        host_b = mock.MagicMock(status='Ready', locked=False)
        return (host_a, host_b)

    def _mockTwoFailedHosts(self):
        """Mock method to return two unavailable hosts."""
        host_a = mock.MagicMock(status='Repair Failed', locked=False)
        host_b = mock.MagicMock(status='Repairing', locked=False)
        return (host_a, host_b)

    def _checkDutAvailability(self, rpc_helper, minimum_duts,
                              skip_duts_check):
        """Call check_dut_availability for the test board and pool.

        @param rpc_helper: The diagnosis_utils.RPCHelper under test.
        @param minimum_duts: Value forwarded as minimum_duts.
        @param skip_duts_check: Value forwarded as skip_duts_check.
        """
        rpc_helper.check_dut_availability(self._BOARD, self._POOL,
                                          minimum_duts=minimum_duts,
                                          skip_duts_check=skip_duts_check)

    def testCheckDutAvailable(self):
        """Test check_dut_availability with different scenarios."""
        rpc_helper = self._constructRPCHelper()

        # Mock afe.get_hosts to return 0 host
        self.afe_mock.get_hosts.return_value = self._mockZeroHost()

        # When minimum_duts is 0, do not force available DUTs
        self._checkDutAvailability(rpc_helper, minimum_duts=0,
                                   skip_duts_check=False)

        # When skip_duts_check = False and minimum_duts > 0 and there's no
        # host, should raise BoardNotAvailableError
        with self.assertRaises(diagnosis_utils.BoardNotAvailableError):
            self._checkDutAvailability(rpc_helper, minimum_duts=1,
                                       skip_duts_check=False)

        # Mock afe.get_hosts to return 2 available hosts
        self.afe_mock.get_hosts.return_value = self._mockTwoAvailableHosts()

        # Set skip_duts_check to True, should not force checking available
        # DUTs although available DUTs are fewer than minimum_duts
        self._checkDutAvailability(rpc_helper, minimum_duts=4,
                                   skip_duts_check=True)

        # Set skip_duts_check to False and current available DUTs are fewer
        # than minimum_duts, should raise NotEnoughDutsError
        with self.assertRaises(diagnosis_utils.NotEnoughDutsError):
            self._checkDutAvailability(rpc_helper, minimum_duts=4,
                                       skip_duts_check=False)

        # When skip_duts_check is False and current available DUTs
        # satisfy minimum_duts, no errors
        self._checkDutAvailability(rpc_helper, minimum_duts=2,
                                   skip_duts_check=False)

        # Mock afe.get_hosts to return 2 failed hosts
        self.afe_mock.get_hosts.return_value = self._mockTwoFailedHosts()

        # When skip_duts_check is False and the two hosts are not available,
        # should raise NotEnoughDutsError
        with self.assertRaises(diagnosis_utils.NotEnoughDutsError):
            self._checkDutAvailability(rpc_helper, minimum_duts=2,
                                       skip_duts_check=False)


if __name__ == '__main__':
    unittest.main()