import mock
import unittest

from autotest_lib.client.common_lib.cros.cfm.usb import usb_device
from autotest_lib.client.common_lib.cros.cfm.usb import usb_device_spec
from autotest_lib.server.cros.cfm.configurable_test import actions
from autotest_lib.server.cros.cfm.configurable_test import action_context
from autotest_lib.server.cros.cfm.configurable_test import scenario

# Constants to use in case the actual values are irrelevant.
USB_DEVICE_SPEC = usb_device_spec.UsbDeviceSpec(
        'vid', 'pid', 'product', ['iface'])

USB_DEVICE = usb_device.UsbDevice('v', 'p', 'prod', ['if'], 1, 2, 1)


# Test, disable missing-docstring
# pylint: disable=missing-docstring
class TestActions(unittest.TestCase):
    """
    Tests for the available actions for configurable CFM tests to run.

    Most tests execute an action against an ActionContext whose
    collaborators are MagicMocks (see setUp) and then inspect the calls
    recorded on those mocks.
    """

    def setUp(self):
        """Creates one mock per collaborator and a context wrapping them."""
        self.host_mock = mock.MagicMock()
        self.cfm_facade_mock = mock.MagicMock()
        self.usb_device_collector_mock = mock.MagicMock()
        self.usb_port_manager_mock = mock.MagicMock()
        self.crash_detector_mock = mock.MagicMock()
        self.metrics_collector_mock = mock.MagicMock()
        self.context_with_mocks = action_context.ActionContext(
                host=self.host_mock,
                cfm_facade=self.cfm_facade_mock,
                usb_device_collector=self.usb_device_collector_mock,
                usb_port_manager=self.usb_port_manager_mock,
                crash_detector=self.crash_detector_mock,
                perf_metrics_collector=self.metrics_collector_mock)

    def test_assert_file_does_not_contain_no_match(self):
        """Executing must not raise when no pattern occurs in the file."""
        action = actions.AssertFileDoesNotContain('/foo', ['EE', 'WW'])
        context = action_context.ActionContext(
                file_contents_collector=FakeCollector('abc\ndef'))
        action.execute(context)

    def test_assert_file_does_not_contain_match(self):
        """A literal occurrence of a pattern must raise AssertionError."""
        action = actions.AssertFileDoesNotContain('/foo', ['EE', 'WW'])
        context = action_context.ActionContext(
                file_contents_collector=FakeCollector('abc\naWWd'))
        with self.assertRaises(AssertionError):
            action.execute(context)

    def test_assert_file_does_not_contain_regex_match(self):
        """A pattern using regex syntax must raise when it matches."""
        action = actions.AssertFileDoesNotContain('/foo', ['EE', 'W{3}Q+'])
        context = action_context.ActionContext(
                file_contents_collector=FakeCollector('abc\naWWWQQd'))
        with self.assertRaises(AssertionError):
            action.execute(context)

    def test_reboot_dut_no_restart(self):
        """A default RebootDut reboots the host and leaves the facade alone."""
        action = actions.RebootDut()
        action.execute(self.context_with_mocks)
        self.host_mock.reboot.assert_called_once_with()
        # No method at all may have been invoked on the CFM facade.
        self.assertFalse(self.cfm_facade_mock.method_calls)

    def test_reboot_dut_with_restart(self):
        """With restart_chrome_for_cfm=True the facade is also exercised."""
        action = actions.RebootDut(restart_chrome_for_cfm=True)
        action.execute(self.context_with_mocks)
        self.host_mock.reboot.assert_called_once_with()
        (self.cfm_facade_mock.restart_chrome_for_cfm
                .assert_called_once_with())
        (self.cfm_facade_mock.wait_for_meetings_telemetry_commands
                .assert_called_once_with())

    def test_assert_usb_device_collector(self):
        """An always-true predicate must let the action pass."""
        spec = usb_device_spec.UsbDeviceSpec(
                'vid', 'pid', 'product', ['iface'])
        action = actions.AssertUsbDevices([spec], lambda x: True)
        action.execute(self.context_with_mocks)

    def test_assert_usb_device_collector_matching_predicate(self):
        """A predicate that holds for the collected devices must pass."""
        spec = usb_device_spec.UsbDeviceSpec(
                'vid', 'pid', 'product', ['iface'])
        device = usb_device.UsbDevice(
                'v', 'p', 'prod', ['if'], 1, 2, 1)
        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
                return_value=[device])
        action = actions.AssertUsbDevices(
                [spec], lambda x: x[0].product_id == 'p')
        action.execute(self.context_with_mocks)

    def test_assert_usb_device_collector_non_matching_predicate(self):
        """A predicate that fails for the collected devices must raise."""
        spec = usb_device_spec.UsbDeviceSpec(
                'vid', 'pid', 'product', ['iface'])
        device = usb_device.UsbDevice(
                'v', 'p', 'prod', ['if'], 1, 2, 1)
        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
                return_value=[device])
        action = actions.AssertUsbDevices(
                [spec], lambda x: x[0].product_id == 'r')
        with self.assertRaises(AssertionError):
            action.execute(self.context_with_mocks)

    def test_assert_usb_device_collector_default_predicate(self):
        """Omitting the predicate must pass when one device is collected."""
        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
                return_value=[USB_DEVICE])  # Default checks list is of size 1
        action = actions.AssertUsbDevices([USB_DEVICE_SPEC])
        action.execute(self.context_with_mocks)

    def test_select_scenario_at_random(self):
        """The two scenarios together must be executed 10 times in total."""
        dummy_action1 = DummyAction()
        dummy_action2 = DummyAction()
        scenarios = [scenario.Scenario(dummy_action1),
                     scenario.Scenario(dummy_action2)]
        # NOTE(review): 10 is presumably the number of runs - the assertion
        # below expects exactly that many executions overall.
        action = actions.SelectScenarioAtRandom(scenarios, 10)
        action.execute(self.context_with_mocks)
        # Assert that our actions were executed the expected number of times.
        total_executes = (dummy_action1.executed_times
                          + dummy_action2.executed_times)
        self.assertEqual(10, total_executes)

    def test_select_scenario_at_random_str_contains_seed(self):
        """str() of the action must contain the text 'seed=123'."""
        action = actions.SelectScenarioAtRandom([], 10, 123)
        # assertIn reports both operands on failure, unlike assertTrue(a in b).
        self.assertIn('seed=123', str(action))

    def test_select_scenario_at_random_same_seed_same_actions(self):
        """Two actions built with identical arguments must behave alike."""
        scenario1_action1 = DummyAction()
        scenario1_action2 = DummyAction()
        scenarios1 = [scenario.Scenario(scenario1_action1),
                      scenario.Scenario(scenario1_action2)]
        scenario2_action1 = DummyAction()
        scenario2_action2 = DummyAction()
        scenarios2 = [scenario.Scenario(scenario2_action1),
                      scenario.Scenario(scenario2_action2)]
        # Both selectors get the same trailing arguments (100, 0); going by
        # the test name the last one is the random seed.
        action1 = actions.SelectScenarioAtRandom(scenarios1, 100, 0)
        action2 = actions.SelectScenarioAtRandom(scenarios2, 100, 0)
        action1.execute(self.context_with_mocks)
        action2.execute(self.context_with_mocks)
        # The per-position execution counts must be identical across the
        # two independent runs.
        self.assertEqual(scenario1_action1.executed_times,
                         scenario2_action1.executed_times)
        self.assertEqual(scenario1_action2.executed_times,
                         scenario2_action2.executed_times)

    def test_power_cycle_usb_port(self):
        """Port power must be switched off and then on for the device."""
        device = usb_device.UsbDevice(
                'v', 'p', 'prod', ['if'], 1, 2, 1)
        # Successive collector calls report two devices, then one, then two
        # again - presumably modelling a device that disappears while its
        # port is off and reappears afterwards.
        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
                side_effect=[[device, device], [device], [device, device]])
        # The third argument narrows the collected devices down to the first.
        action = actions.PowerCycleUsbPort(
                [USB_DEVICE_SPEC], 0, lambda x: [x[0]])
        action.execute(self.context_with_mocks)
        self.usb_port_manager_mock.set_port_power.assert_has_calls(
                [mock.call([(1, 2)], False), mock.call([(1, 2)], True)])

    def test_power_cycle_usb_port_device_does_not_turn_off(self):
        """A device that never goes away must lead to a TimeoutError."""
        # Return the same device all the time - i.e., it does not turn off.
        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
                return_value=[USB_DEVICE])
        action = actions.PowerCycleUsbPort([USB_DEVICE_SPEC], 0)
        with self.assertRaises(actions.TimeoutError):
            action.execute(self.context_with_mocks)

    def test_power_cycle_usb_port_device_does_not_turn_on(self):
        """A device count that does not recover must raise TimeoutError."""
        # Two devices first, then none, then only one.
        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
                side_effect=[[USB_DEVICE, USB_DEVICE], [], [USB_DEVICE]])
        action = actions.PowerCycleUsbPort([USB_DEVICE_SPEC], 0)
        with self.assertRaises(actions.TimeoutError):
            action.execute(self.context_with_mocks)

    def test_retry_action_success_after_retry(self):
        """An action failing only on its first run must pass when retried."""
        # NOTE(review): (3, 0) is presumably (number of tries, delay) -
        # confirm against RetryAssertAction.
        action = actions.RetryAssertAction(RaisesFirstTimeAction(), 3, 0)
        action.execute(self.context_with_mocks)

    def test_retry_action_fail_when_no_more_retries(self):
        """With the second argument set to 1 the first failure propagates."""
        action = actions.RetryAssertAction(RaisesFirstTimeAction(), 1)
        with self.assertRaises(AssertionError):
            action.execute(self.context_with_mocks)

    def test_assert_no_new_crashes(self):
        """An empty list of new crash files must not raise."""
        action = actions.AssertNoNewCrashes()
        self.crash_detector_mock.get_new_crash_files = mock.Mock(
                return_value=[])
        action.do_execute(self.context_with_mocks)

    def test_assert_no_new_crashes_crash_detected(self):
        """A reported new crash file must raise AssertionError."""
        action = actions.AssertNoNewCrashes()
        self.crash_detector_mock.get_new_crash_files = mock.Mock(
                return_value=['/a/new/crash/file'])
        with self.assertRaises(AssertionError):
            action.do_execute(self.context_with_mocks)

    def test_start_metrics_colllection(self):
        """Executing the action must call start() on the collector once."""
        action = actions.StartPerfMetricsCollection()
        action.execute(self.context_with_mocks)
        self.metrics_collector_mock.start.assert_called_once_with()

    def test_stop_metrics_colllection(self):
        """Executing the action must call stop() on the collector once."""
        action = actions.StopPerfMetricsCollection()
        action.execute(self.context_with_mocks)
        self.metrics_collector_mock.stop.assert_called_once_with()

    def test_upload_metrics(self):
        """Executing the action must call upload_metrics() once."""
        action = actions.UploadPerfMetrics()
        action.execute(self.context_with_mocks)
        self.metrics_collector_mock.upload_metrics.assert_called_once_with()


class FakeCollector(object):
    """File contents collector stub that returns canned contents."""

    def __init__(self, contents):
        """
        @param contents: The string to hand back for every requested path.
        """
        self.contents = contents

    def collect_file_contents(self, path):
        """Returns the canned contents; the path argument is ignored."""
        return self.contents


class DummyAction(actions.Action):
    """Action that only counts how many times it has been executed."""

    def __init__(self):
        self.executed_times = 0

    def do_execute(self, context):
        """Increments the execution counter; the context is not used."""
        self.executed_times += 1


class RaisesFirstTimeAction(actions.Action):
    """Action that raises AssertionError on its first execution only."""

    def __init__(self):
        self.executed = False

    def do_execute(self, context):
        """Raises AssertionError the first time, then does nothing."""
        if not self.executed:
            self.executed = True
            raise AssertionError()