• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import mock
2import unittest
3
4from autotest_lib.client.common_lib.cros.cfm.usb import usb_device
5from autotest_lib.client.common_lib.cros.cfm.usb import usb_device_spec
6from autotest_lib.server.cros.cfm.configurable_test import actions
7from autotest_lib.server.cros.cfm.configurable_test import action_context
8from autotest_lib.server.cros.cfm.configurable_test import scenario
9
10# Constants to use in case the actual values are irrelevant.
11USB_DEVICE_SPEC = usb_device_spec.UsbDeviceSpec(
12        'vid', 'pid', 'product', ['iface'])
13
14USB_DEVICE = usb_device.UsbDevice('v', 'p', 'prod', ['if'], 1, 2, 1)
15
16
17# Test, disable missing-docstring
18# pylint: disable=missing-docstring
19class TestActions(unittest.TestCase):
20    """
21    Tests for the available actions for configurable CFM tests to run.
22    """
23
24    def setUp(self):
25        self.host_mock = mock.MagicMock()
26        self.cfm_facade_mock = mock.MagicMock()
27        self.usb_device_collector_mock = mock.MagicMock()
28        self.usb_port_manager_mock = mock.MagicMock()
29        self.crash_detector_mock = mock.MagicMock()
30        self.metrics_collector_mock = mock.MagicMock()
31        self.context_with_mocks = action_context.ActionContext(
32                host=self.host_mock,
33                cfm_facade=self.cfm_facade_mock,
34                usb_device_collector=self.usb_device_collector_mock,
35                usb_port_manager=self.usb_port_manager_mock,
36                crash_detector=self.crash_detector_mock,
37                perf_metrics_collector=self.metrics_collector_mock)
38
39
40    def test_assert_file_does_not_contain_no_match(self):
41        action = actions.AssertFileDoesNotContain('/foo', ['EE', 'WW'])
42        context = action_context.ActionContext(
43                file_contents_collector=FakeCollector('abc\ndef'))
44        action.execute(context)
45
46    def test_assert_file_does_not_contain_match(self):
47        action = actions.AssertFileDoesNotContain('/foo', ['EE', 'WW'])
48        context = action_context.ActionContext(
49                file_contents_collector=FakeCollector('abc\naWWd'))
50        self.assertRaises(AssertionError, lambda: action.execute(context))
51
52    def test_assert_file_does_not_contain_regex_match(self):
53        action = actions.AssertFileDoesNotContain('/foo', ['EE', 'W{3}Q+'])
54        context = action_context.ActionContext(
55                file_contents_collector=FakeCollector('abc\naWWWQQd'))
56        self.assertRaises(AssertionError, lambda: action.execute(context))
57
58    def test_reboot_dut_no_restart(self):
59        action = actions.RebootDut()
60        action.execute(self.context_with_mocks)
61        self.host_mock.reboot.assert_called_once_with()
62        self.assertFalse(self.cfm_facade_mock.method_calls)
63
64    def test_reboot_dut_with_restart(self):
65        action = actions.RebootDut(restart_chrome_for_cfm=True)
66        action.execute(self.context_with_mocks)
67        self.host_mock.reboot.assert_called_once_with()
68        (self.cfm_facade_mock.restart_chrome_for_cfm
69                .assert_called_once_with())
70        (self.cfm_facade_mock.wait_for_meetings_telemetry_commands
71                .assert_called_once_with())
72
73    def test_assert_usb_device_collector(self):
74        spec = usb_device_spec.UsbDeviceSpec(
75                'vid', 'pid', 'product', ['iface'])
76        action = actions.AssertUsbDevices([spec], lambda x: True)
77        action.execute(self.context_with_mocks)
78
79    def test_assert_usb_device_collector_matching_predicate(self):
80        spec = usb_device_spec.UsbDeviceSpec(
81                'vid', 'pid', 'product', ['iface'])
82        device = usb_device.UsbDevice(
83                'v', 'p', 'prod', ['if'], 1, 2, 1)
84        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
85                return_value=[device])
86        action = actions.AssertUsbDevices(
87                [spec], lambda x: x[0].product_id == 'p')
88        action.execute(self.context_with_mocks)
89
90    def test_assert_usb_device_collector_non_matching_predicate(self):
91        spec = usb_device_spec.UsbDeviceSpec(
92                'vid', 'pid', 'product', ['iface'])
93        device = usb_device.UsbDevice(
94                'v', 'p', 'prod', ['if'], 1, 2, 1)
95        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
96                return_value=[device])
97        action = actions.AssertUsbDevices(
98                [spec], lambda x: x[0].product_id == 'r')
99        self.assertRaises(AssertionError, lambda: action.execute(
100                self.context_with_mocks))
101
102    def test_assert_usb_device_collector_default_predicate(self):
103        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
104                return_value=[USB_DEVICE])  # Default checks list is of size 1
105        action = actions.AssertUsbDevices([USB_DEVICE_SPEC])
106        action.execute(self.context_with_mocks)
107
108    def test_select_scenario_at_random(self):
109        dummy_action1 = DummyAction()
110        dummy_action2 = DummyAction()
111        scenarios = [scenario.Scenario(dummy_action1),
112                     scenario.Scenario(dummy_action2)]
113        action = actions.SelectScenarioAtRandom(scenarios, 10)
114        action.execute(self.context_with_mocks)
115        # Assert that our actions were executed the expected number of times.
116        total_executes = (dummy_action1.executed_times
117                          + dummy_action2.executed_times)
118        self.assertEqual(10, total_executes)
119
120    def test_select_scenario_at_random_str_contains_seed(self):
121        action = actions.SelectScenarioAtRandom([], 10, 123)
122        self.assertTrue('seed=123' in str(action))
123
124    def test_select_scenario_at_random_same_seed_same_actions(self):
125        scenario1_action1 = DummyAction()
126        scenario1_action2 = DummyAction()
127        scenarios1 = [scenario.Scenario(scenario1_action1),
128                     scenario.Scenario(scenario1_action2)]
129        scenario2_action1 = DummyAction()
130        scenario2_action2 = DummyAction()
131        scenarios2 = [scenario.Scenario(scenario2_action1),
132                     scenario.Scenario(scenario2_action2)]
133        action1 = actions.SelectScenarioAtRandom(scenarios1, 100, 0)
134        action2 = actions.SelectScenarioAtRandom(scenarios2, 100, 0)
135        action1.execute(self.context_with_mocks)
136        action2.execute(self.context_with_mocks)
137        self.assertEqual(scenario1_action1.executed_times,
138                         scenario2_action1.executed_times)
139        self.assertEqual(scenario1_action2.executed_times,
140                         scenario2_action2.executed_times)
141
142    def test_power_cycle_usb_port(self):
143        device = usb_device.UsbDevice(
144                'v', 'p', 'prod', ['if'], 1, 2, 1)
145        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
146                side_effect=[[device, device], [device], [device, device]])
147        action = actions.PowerCycleUsbPort(
148                [USB_DEVICE_SPEC], 0, lambda x: [x[0]])
149        action.execute(self.context_with_mocks)
150        self.usb_port_manager_mock.set_port_power.assert_has_calls(
151                [mock.call([(1, 2)], False), mock.call([(1, 2)], True)])
152
153    def test_power_cycle_usb_port_device_does_not_turn_off(self):
154        # Return the same device all the time - i.e., it does not turn off.
155        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
156                return_value=[USB_DEVICE])
157        action = actions.PowerCycleUsbPort([USB_DEVICE_SPEC], 0)
158        self.assertRaises(
159                actions.TimeoutError,
160                lambda: action.execute(self.context_with_mocks))
161
162    def test_power_cycle_usb_port_device_does_not_turn_on(self):
163        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
164                side_effect=[[USB_DEVICE, USB_DEVICE], [], [USB_DEVICE]])
165        action = actions.PowerCycleUsbPort([USB_DEVICE_SPEC], 0)
166        self.assertRaises(
167                actions.TimeoutError,
168                lambda: action.execute(self.context_with_mocks))
169
170    def test_retry_action_success_after_retry(self):
171        action = actions.RetryAssertAction(RaisesFirstTimeAction(), 3, 0)
172        action.execute(self.context_with_mocks)
173
174    def test_retry_action_fail_when_no_more_retries(self):
175        action = actions.RetryAssertAction(RaisesFirstTimeAction(), 1)
176        self.assertRaises(
177                AssertionError, lambda: action.execute(self.context_with_mocks))
178
179    def test_assert_no_new_crashes(self):
180        action = actions.AssertNoNewCrashes()
181        self.crash_detector_mock.get_new_crash_files = mock.Mock(
182                return_value=[])
183        action.do_execute(self.context_with_mocks)
184
185    def test_assert_no_new_crashes_crash_detected(self):
186        action = actions.AssertNoNewCrashes()
187        self.crash_detector_mock.get_new_crash_files = mock.Mock(
188                return_value=['/a/new/crash/file'])
189        self.assertRaises(
190                AssertionError,
191                lambda: action.do_execute(self.context_with_mocks))
192
193    def test_start_metrics_colllection(self):
194        action = actions.StartPerfMetricsCollection()
195        action.execute(self.context_with_mocks)
196        self.metrics_collector_mock.start.assert_called_once_with()
197
198    def test_stop_metrics_colllection(self):
199        action = actions.StopPerfMetricsCollection()
200        action.execute(self.context_with_mocks)
201        self.metrics_collector_mock.stop.assert_called_once_with()
202
203    def test_upload_metrics(self):
204        action = actions.UploadPerfMetrics()
205        action.execute(self.context_with_mocks)
206        self.metrics_collector_mock.upload_metrics.assert_called_once_with()
207
208
209
210class FakeCollector(object):
211    def __init__(self, contents):
212        self.contents = contents
213
214    def collect_file_contents(self, path):
215        return self.contents
216
217class DummyAction(actions.Action):
218    def __init__(self):
219        self.executed_times = 0
220
221    def do_execute(self, context):
222        self.executed_times += 1
223
224class RaisesFirstTimeAction(actions.Action):
225    def __init__(self):
226        self.executed = False
227
228    def do_execute(self, context):
229        if not self.executed:
230            self.executed = True
231            raise AssertionError()
232
233