#!/usr/bin/python2
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import itertools
import mock
import unittest

import common
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import hosts
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.server.hosts import cros_firmware
from autotest_lib.server.hosts import cros_repair
from autotest_lib.server.hosts import repair_utils


# Expected value of cros_repair._cros_verify_dag() (see test_cros_repair).
# Each entry is (verifier class, label, labels this verifier depends on);
# every dependency label must itself appear as a label in this table.
CROS_VERIFY_DAG = (
    (repair_utils.SshVerifier, 'ssh', ()),
    (cros_repair.ServoTypeVerifier, 'servo_type', ()),
    (cros_repair.DevModeVerifier, 'devmode', ('ssh',)),
    (cros_repair.HWIDVerifier, 'hwid', ('ssh',)),
    (cros_repair.ACPowerVerifier, 'power', ('ssh',)),
    (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh',)),
    (cros_repair.WritableVerifier, 'writable', ('ssh',)),
    (cros_repair.TPMStatusVerifier, 'tpm', ('ssh',)),
    (cros_repair.UpdateSuccessVerifier, 'good_au', ('ssh',)),
    (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh',)),
    (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh',)),
    (cros_repair.PythonVerifier, 'python', ('ssh',)),
    (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)),
    (cros_repair.KvmExistsVerifier, 'ec_reset', ('ssh',)),
    (cros_repair.StopStartUIVerifier, 'stop_start_ui', ('ssh',)),
)

# Expected value of cros_repair._cros_repair_actions() (see
# test_cros_repair).  Each entry is
# (repair class, tag, dependency labels, trigger labels); the labels in the
# last two fields must all be labels defined in CROS_VERIFY_DAG.
CROS_REPAIR_ACTIONS = (
    (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)),
    (cros_repair.ServoSysRqRepair, 'sysrq', (), ('ssh',)),
    (cros_repair.ServoResetRepair, 'servoreset', (), ('ssh',)),
    (cros_firmware.FirmwareRepair,
     'firmware', (), ('ssh', 'fwstatus', 'good_au')),
    (cros_repair.CrosRebootRepair,
     'reboot', ('ssh',), ('devmode', 'writable',)),
    (cros_repair.ColdRebootRepair,
     'coldboot', ('ssh',), ('ec_reset',)),
    (cros_repair.AutoUpdateRepair,
     'au',
     ('ssh', 'writable', 'stop_start_ui', 'tpm', 'good_au', 'ext4'),
     ('power', 'rwfw', 'python', 'cros', 'ec_reset')),
    (cros_repair.PowerWashRepair,
     'powerwash',
     ('ssh', 'writable', 'stop_start_ui'),
     ('tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python', 'cros',
      'ec_reset')),
    (cros_repair.ServoInstallRepair,
     'usb',
     (),
     ('ssh', 'writable', 'stop_start_ui', 'tpm', 'good_au', 'ext4', 'power',
      'rwfw', 'python', 'cros', 'ec_reset')),
)

# Expected value of cros_repair._moblab_verify_dag() (see
# test_moblab_repair).  Each entry is
# (verifier class, label, labels this verifier depends on).
MOBLAB_VERIFY_DAG = (
    (repair_utils.SshVerifier, 'ssh', ()),
    (cros_repair.ACPowerVerifier, 'power', ('ssh',)),
    (cros_repair.PythonVerifier, 'python', ('ssh',)),
    (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)),
)

# Expected value of cros_repair._moblab_repair_actions().  Each entry is
# (repair class, tag, dependency labels, trigger labels); the labels must
# all be defined in MOBLAB_VERIFY_DAG.
MOBLAB_REPAIR_ACTIONS = (
    (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)),
    (cros_repair.AutoUpdateRepair,
     'au', ('ssh',), ('power', 'python', 'cros',)),
)

# Expected value of cros_repair._jetstream_verify_dag() (see
# test_jetstream_repair).  Each entry is
# (verifier class, label, labels this verifier depends on).
JETSTREAM_VERIFY_DAG = (
    (repair_utils.SshVerifier, 'ssh', ()),
    (cros_repair.ServoTypeVerifier, 'servo_type', ()),
    (cros_repair.DevModeVerifier, 'devmode', ('ssh',)),
    (cros_repair.HWIDVerifier, 'hwid', ('ssh',)),
    (cros_repair.ACPowerVerifier, 'power', ('ssh',)),
    (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh',)),
    (cros_repair.WritableVerifier, 'writable', ('ssh',)),
    (cros_repair.TPMStatusVerifier, 'tpm', ('ssh',)),
    (cros_repair.UpdateSuccessVerifier, 'good_au', ('ssh',)),
    (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh',)),
    (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh',)),
    (cros_repair.PythonVerifier, 'python', ('ssh',)),
    (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)),
    (cros_repair.KvmExistsVerifier, 'ec_reset', ('ssh',)),
    (cros_repair.JetstreamTpmVerifier, 'jetstream_tpm', ('ssh',)),
    (cros_repair.JetstreamAttestationVerifier, 'jetstream_attestation',
     ('ssh',)),
    (cros_repair.JetstreamServicesVerifier, 'jetstream_services', ('ssh',)),
)

# Expected value of cros_repair._jetstream_repair_actions().  Each entry is
# (repair class, tag, dependency labels, trigger labels); the labels must
# all be defined in JETSTREAM_VERIFY_DAG.
JETSTREAM_REPAIR_ACTIONS = (
    (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)),
    (cros_repair.ServoSysRqRepair, 'sysrq', (), ('ssh',)),
    (cros_repair.ServoResetRepair, 'servoreset', (), ('ssh',)),
    (cros_firmware.FirmwareRepair,
     'firmware', (), ('ssh', 'fwstatus', 'good_au')),
    (cros_repair.CrosRebootRepair,
     'reboot', ('ssh',), ('devmode', 'writable',)),
    (cros_repair.ColdRebootRepair,
     'coldboot', ('ssh',), ('ec_reset',)),
    (cros_repair.JetstreamTpmRepair,
     'jetstream_tpm_repair',
     ('ssh', 'writable', 'tpm', 'good_au', 'ext4'),
     ('power', 'rwfw', 'python', 'cros', 'jetstream_tpm',
      'jetstream_attestation')),
    (cros_repair.JetstreamServiceRepair,
     'jetstream_service_repair',
     ('ssh', 'writable', 'tpm', 'good_au', 'ext4', 'jetstream_tpm',
      'jetstream_attestation'),
     ('power', 'rwfw', 'python', 'cros', 'jetstream_tpm',
      'jetstream_attestation', 'jetstream_services')),
    (cros_repair.AutoUpdateRepair,
     'au',
     ('ssh', 'writable', 'tpm', 'good_au', 'ext4'),
     ('power', 'rwfw', 'python', 'cros', 'jetstream_tpm',
      'jetstream_attestation', 'jetstream_services')),
    (cros_repair.PowerWashRepair,
     'powerwash',
     ('ssh', 'writable'),
     ('tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python', 'cros',
      'jetstream_tpm', 'jetstream_attestation', 'jetstream_services')),
    (cros_repair.ServoInstallRepair,
     'usb',
     (),
     ('ssh', 'writable', 'tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python',
      'cros', 'jetstream_tpm', 'jetstream_attestation', 'jetstream_services')),
)

# Canned command output handed to cros_repair.CryptohomeStatus through a
# mocked host.run().  The three fixtures are identical JSON documents except
# for the "tpm" fields noted on each one.

# TPM owned and the SRK loads: "owned", "can_load_srk" and
# "can_load_srk_pubkey" are all true.
CRYPTOHOME_STATUS_OWNED = """{
   "installattrs": {
      "first_install": true,
      "initialized": true,
      "invalid": false,
      "lockbox_index": 536870916,
      "lockbox_nvram_version": 2,
      "secure": true,
      "size": 0,
      "version": 1
   },
   "mounts": [  ],
   "tpm": {
      "being_owned": false,
      "can_connect": true,
      "can_decrypt": false,
      "can_encrypt": false,
      "can_load_srk": true,
      "can_load_srk_pubkey": true,
      "enabled": true,
      "has_context": true,
      "has_cryptohome_key": false,
      "has_key_handle": false,
      "last_error": 0,
      "owned": true
   }
}
"""

# TPM not owned: "owned", "can_load_srk" and "can_load_srk_pubkey" are all
# false.
CRYPTOHOME_STATUS_NOT_OWNED = """{
   "installattrs": {
      "first_install": true,
      "initialized": true,
      "invalid": false,
      "lockbox_index": 536870916,
      "lockbox_nvram_version": 2,
      "secure": true,
      "size": 0,
      "version": 1
   },
   "mounts": [  ],
   "tpm": {
      "being_owned": false,
      "can_connect": true,
      "can_decrypt": false,
      "can_encrypt": false,
      "can_load_srk": false,
      "can_load_srk_pubkey": false,
      "enabled": true,
      "has_context": true,
      "has_cryptohome_key": false,
      "has_key_handle": false,
      "last_error": 0,
      "owned": false
   }
}
"""

# TPM owned but the SRK cannot be loaded: "owned" is true while
# "can_load_srk" and "can_load_srk_pubkey" are false.
CRYPTOHOME_STATUS_CANNOT_LOAD_SRK = """{
   "installattrs": {
      "first_install": true,
      "initialized": true,
      "invalid": false,
      "lockbox_index": 536870916,
      "lockbox_nvram_version": 2,
      "secure": true,
      "size": 0,
      "version": 1
   },
   "mounts": [  ],
   "tpm": {
      "being_owned": false,
      "can_connect": true,
      "can_decrypt": false,
      "can_encrypt": false,
      "can_load_srk": false,
      "can_load_srk_pubkey": false,
      "enabled": true,
      "has_context": true,
      "has_cryptohome_key": false,
      "has_key_handle": false,
      "last_error": 0,
      "owned": true
   }
}
"""

# Canned "key: value" TPM status text.  The JetstreamTpmVerifier tests feed
# it to the verifier as the output of the mocked host.run() call that follows
# the cryptohome status query.

# TPM owned and ready.
TPM_STATUS_READY = """
TPM Enabled: true
TPM Owned: true
TPM Being Owned: false
TPM Ready: true
TPM Password: 9eaee4da8b4c
"""

# Ownership still in progress: not yet owned, not ready, no password.
TPM_STATUS_NOT_READY = """
TPM Enabled: true
TPM Owned: false
TPM Being Owned: true
TPM Ready: false
TPM Password:
"""


class CrosRepairUnittests(unittest.TestCase):
    """Unit tests for the repair tables and TPM verifiers in cros_repair.

    Three groups of tests live here:
      * the verifier DAG and repair action tables built by the private
        `_*_verify_dag()` / `_*_repair_actions()` functions, compared with
        the expected tables defined at module level;
      * `CryptohomeStatus` parsing and `TPMStatusVerifier`;
      * `JetstreamTpmVerifier`, run with the retry module's clock, sleep
        and warning logger patched out.

    Individual tests are described with comments rather than docstrings so
    that the names shown by the test runner stay unchanged.
    """
    # pylint: disable=missing-docstring

    # Report complete diffs when one of the large table comparisons fails.
    maxDiff = None

    @staticmethod
    def _host_with_stdout(stdout):
        """Returns a mock host whose every run() call yields `stdout`.

        @param stdout: Text to expose as the `stdout` attribute of the value
                returned by `run()`.
        """
        mock_host = mock.Mock()
        mock_host.run.return_value.stdout = stdout
        return mock_host

    def _check_strategy(self, make_verify_dag, expected_dag,
                        make_repair_actions, expected_actions):
        """Checks one host type's generated tables against expectations.

        The verifier DAG is built and fully checked before the repair
        actions are built, so a broken DAG is reported first.

        @param make_verify_dag: Zero-argument callable returning the
                verifier DAG under test.
        @param expected_dag: Tuple the verifier DAG must equal.
        @param make_repair_actions: Zero-argument callable returning the
                repair actions under test.
        @param expected_actions: Tuple the repair actions must equal.
        """
        verify_dag = make_verify_dag()
        self.assertTupleEqual(verify_dag, expected_dag)
        self.check_verify_dag(verify_dag)
        repair_actions = make_repair_actions()
        self.assertTupleEqual(repair_actions, expected_actions)
        self.check_repair_actions(verify_dag, repair_actions)

    def test_cros_repair(self):
        self._check_strategy(cros_repair._cros_verify_dag,
                             CROS_VERIFY_DAG,
                             cros_repair._cros_repair_actions,
                             CROS_REPAIR_ACTIONS)

    def test_moblab_repair(self):
        self._check_strategy(cros_repair._moblab_verify_dag,
                             MOBLAB_VERIFY_DAG,
                             cros_repair._moblab_repair_actions,
                             MOBLAB_REPAIR_ACTIONS)

    def test_jetstream_repair(self):
        self._check_strategy(cros_repair._jetstream_verify_dag,
                             JETSTREAM_VERIFY_DAG,
                             cros_repair._jetstream_repair_actions,
                             JETSTREAM_REPAIR_ACTIONS)

    def check_verify_dag(self, verify_dag):
        """Checks that dependency labels are defined.

        @param verify_dag: Sequence of
                (verifier class, label, dependency labels) entries.
        """
        labels = [node[1] for node in verify_dag]
        for node in verify_dag:
            for dep in node[2]:
                self.assertIn(dep, labels)

    def check_repair_actions(self, verify_dag, repair_actions):
        """Checks that dependency and trigger labels are defined.

        @param verify_dag: Sequence of verifier entries; element 1 of each
                entry is the label that repair actions may refer to.
        @param repair_actions: Sequence of repair entries; elements 2 and 3
                of each entry are its dependency and trigger labels.
        """
        verify_labels = [node[1] for node in verify_dag]
        for action in repair_actions:
            deps = action[2]
            triggers = action[3]
            for label in deps + triggers:
                self.assertIn(label, verify_labels)

    def test_get_cryptohome_status_owned(self):
        # The "tpm" section must be exposed unchanged through item access,
        # and the convenience properties must mirror its fields.
        mock_host = self._host_with_stdout(CRYPTOHOME_STATUS_OWNED)
        status = cros_repair.CryptohomeStatus(mock_host)
        expected_tpm = {
            'being_owned': False,
            'can_connect': True,
            'can_decrypt': False,
            'can_encrypt': False,
            'can_load_srk': True,
            'can_load_srk_pubkey': True,
            'enabled': True,
            'has_context': True,
            'has_cryptohome_key': False,
            'has_key_handle': False,
            'last_error': 0,
            'owned': True,
        }
        self.assertDictEqual(expected_tpm, status['tpm'])
        self.assertTrue(status.tpm_enabled)
        self.assertTrue(status.tpm_owned)
        self.assertTrue(status.tpm_can_load_srk)
        self.assertTrue(status.tpm_can_load_srk_pubkey)

    def test_get_cryptohome_status_not_owned(self):
        # Same checks as the owned case, for a TPM that is enabled but not
        # owned and whose SRK cannot be loaded.
        mock_host = self._host_with_stdout(CRYPTOHOME_STATUS_NOT_OWNED)
        status = cros_repair.CryptohomeStatus(mock_host)
        expected_tpm = {
            'being_owned': False,
            'can_connect': True,
            'can_decrypt': False,
            'can_encrypt': False,
            'can_load_srk': False,
            'can_load_srk_pubkey': False,
            'enabled': True,
            'has_context': True,
            'has_cryptohome_key': False,
            'has_key_handle': False,
            'last_error': 0,
            'owned': False,
        }
        self.assertDictEqual(expected_tpm, status['tpm'])
        self.assertTrue(status.tpm_enabled)
        self.assertFalse(status.tpm_owned)
        self.assertFalse(status.tpm_can_load_srk)
        self.assertFalse(status.tpm_can_load_srk_pubkey)

    @mock.patch.object(cros_repair, '_is_virtual_machine')
    def test_tpm_status_verifier_owned(self, mock_is_virt):
        # With an owned TPM whose SRK loads, verify() must not raise.
        mock_is_virt.return_value = False
        mock_host = self._host_with_stdout(CRYPTOHOME_STATUS_OWNED)
        tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
        tpm_verifier.verify(mock_host)

    @mock.patch.object(cros_repair, '_is_virtual_machine')
    def test_tpm_status_verifier_not_owned(self, mock_is_virt):
        # An unowned TPM is accepted as well: verify() must not raise.
        mock_is_virt.return_value = False
        mock_host = self._host_with_stdout(CRYPTOHOME_STATUS_NOT_OWNED)
        tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
        tpm_verifier.verify(mock_host)

    @mock.patch.object(cros_repair, '_is_virtual_machine')
    def test_tpm_status_verifier_cannot_load_srk_pubkey(self, mock_is_virt):
        # An owned TPM whose SRK cannot be loaded must fail verification.
        mock_is_virt.return_value = False
        mock_host = self._host_with_stdout(CRYPTOHOME_STATUS_CANNOT_LOAD_SRK)
        tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
        with self.assertRaises(hosts.AutoservVerifyError) as ctx:
            tpm_verifier.verify(mock_host)
        # NOTE(review): `.message` exists on exceptions only in Python 2,
        # which is what this file's shebang targets.
        self.assertEqual('Cannot load the TPM SRK',
                         ctx.exception.message)

    def test_jetstream_tpm_owned(self):
        # Exactly two run() results are queued: the cryptohome status, then
        # the TPM status.  A third call would exhaust the side_effect list.
        mock_host = mock.Mock()
        mock_host.run.side_effect = [
            mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED),
            mock.Mock(stdout=TPM_STATUS_READY),
        ]
        tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
        tpm_verifier.verify(mock_host)

    # In the three tests below, patch decorators are applied bottom-up, so
    # the mocks arrive in the order (sleep, time, logging.warning).

    @mock.patch.object(retry.logging, 'warning')
    @mock.patch.object(retry.time, 'time')
    @mock.patch.object(retry.time, 'sleep')
    def test_jetstream_tpm_not_owned(self, mock_sleep, mock_time, mock_logging):
        # Each time.time() call seen by retry returns 20 more than the
        # previous one -- presumably so the retry deadline is reached
        # without any real waiting.
        mock_time.side_effect = itertools.count(0, 20)
        mock_host = self._host_with_stdout(CRYPTOHOME_STATUS_NOT_OWNED)
        tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
        with self.assertRaises(hosts.AutoservVerifyError) as ctx:
            tpm_verifier.verify(mock_host)
        self.assertEqual('TPM is not owned', ctx.exception.message)

    @mock.patch.object(retry.logging, 'warning')
    @mock.patch.object(retry.time, 'time')
    @mock.patch.object(retry.time, 'sleep')
    def test_jetstream_tpm_not_ready(self, mock_sleep, mock_time, mock_logging):
        mock_time.side_effect = itertools.count(0, 20)
        mock_host = mock.Mock()
        # Successive run() calls alternate forever between the cryptohome
        # status and the "not ready" TPM status, so every attempt sees the
        # same pair of outputs.
        mock_host.run.side_effect = itertools.cycle([
            mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED),
            mock.Mock(stdout=TPM_STATUS_NOT_READY),
        ])
        tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
        with self.assertRaises(hosts.AutoservVerifyError) as ctx:
            tpm_verifier.verify(mock_host)
        self.assertEqual('TPM is not ready', ctx.exception.message)

    @mock.patch.object(retry.logging, 'warning')
    @mock.patch.object(retry.time, 'time')
    @mock.patch.object(retry.time, 'sleep')
    def test_jetstream_cryptohome_missing(self, mock_sleep, mock_time,
                                          mock_logging):
        mock_time.side_effect = itertools.count(0, 20)
        mock_host = mock.Mock()
        # Every run() call raises, as if the status command could not be
        # executed on the host.
        mock_host.run.side_effect = error.AutoservRunError('test', None)
        tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
        with self.assertRaises(hosts.AutoservVerifyError) as ctx:
            tpm_verifier.verify(mock_host)
        self.assertEqual('Could not determine TPM status',
                         ctx.exception.message)


# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()