• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python2
2# Copyright 2017 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import itertools
7import mock
8import unittest
9
10import common
11from autotest_lib.client.common_lib import error
12from autotest_lib.client.common_lib import hosts
13from autotest_lib.client.common_lib.cros import retry
14from autotest_lib.server.hosts import cros_firmware
15from autotest_lib.server.hosts import cros_repair
16from autotest_lib.server.hosts import repair_utils
17
18
# Expected value of cros_repair._cros_verify_dag(); test_cros_repair compares
# the two with assertTupleEqual, so both the entries and their order matter.
# Each entry is (verifier, label, dependency labels).  check_verify_dag()
# reads index 1 as the entry's label and index 2 as the labels it depends on,
# and requires every dependency to be the label of some entry in this table.
CROS_VERIFY_DAG = (
        (repair_utils.PingVerifier, 'ping', ()),
        (repair_utils.SshVerifier, 'ssh', ('ping', )),
        (cros_repair.ServoUSBDriveVerifier, 'usb_drive', ()),
        (cros_repair.DevDefaultBootVerifier, 'dev_default_boot', ('ssh', )),
        (cros_repair.DevModeVerifier, 'devmode', ('ssh', )),
        (cros_repair.EnrollmentStateVerifier, 'enrollment_state', ('ssh', )),
        (cros_repair.HWIDVerifier, 'hwid', ('ssh', )),
        (cros_repair.ACPowerVerifier, 'power', ('ssh', )),
        (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh', )),
        (cros_repair.WritableVerifier, 'writable', ('ssh', )),
        (cros_repair.TPMStatusVerifier, 'tpm', ('ssh', )),
        (cros_repair.UpdateSuccessVerifier, 'good_provision', ('ssh', )),
        (cros_repair.FirmwareTpmVerifier, 'faft_tpm', ('ssh', )),
        (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh', )),
        (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh', )),
        (cros_repair.PythonVerifier, 'python', ('ssh', )),
        (repair_utils.LegacyHostVerifier, 'cros', ('ssh', )),
        (cros_repair.CrosVerisionVerifier, 'cros_version_label', ('ssh', )),
        (cros_repair.StopStartUIVerifier, 'stop_start_ui', ('ssh', )),
        (cros_repair.DUTStorageVerifier, 'storage', ('ssh', )),
)
41
# Expected value of cros_repair._cros_repair_actions(); test_cros_repair
# compares the two with assertTupleEqual, so entries and order both matter.
# Each entry is (repair action, label, dependency labels, trigger labels).
# check_repair_actions() reads index 2 as dependencies and index 3 as
# triggers, and requires every label in either tuple to be a label defined
# in the verify DAG it is checked against (CROS_VERIFY_DAG's value).
CROS_REPAIR_ACTIONS = (
        (repair_utils.RPMCycleRepair, 'rpm', (), (
                'ping',
                'ssh',
                'power',
        )),
        (cros_repair.ServoResetRepair, 'servoreset', (), (
                'ping',
                'ssh',
                'stop_start_ui',
                'power',
        )),
        (
                cros_repair.ServoCr50RebootRepair,
                'cr50_reset',
                (),
                ('ping', 'ssh', 'stop_start_ui', 'power'),
        ),
        (cros_repair.ServoSysRqRepair, 'sysrq', (), (
                'ping',
                'ssh',
        )),
        (cros_repair.LabelCleanupRepair, 'label_cleanup', ('ssh', ),
         ('cros_version_label', )),
        (cros_firmware.FaftFirmwareRepair, 'faft_firmware_repair', (),
         ('ping', 'ssh', 'fwstatus', 'good_provision')),
        (cros_repair.DevDefaultBootRepair, 'set_default_boot', ('ssh', ),
         ('dev_default_boot', )),
        (cros_repair.CrosRebootRepair, 'reboot', ('ssh', ), (
                'devmode',
                'writable',
        )),
        (cros_repair.EnrollmentCleanupRepair, 'cleanup_enrollment', ('ssh', ),
         ('enrollment_state', )),
        (cros_repair.ProvisionRepair, 'provision',
         ('ping', 'ssh', 'writable', 'stop_start_ui', 'tpm', 'good_provision',
          'ext4'), ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros',
                    'dev_default_boot')),
        (cros_repair.PowerWashRepair, 'powerwash', ('ping', 'ssh', 'writable',
                                                    'stop_start_ui'),
         ('tpm', 'good_provision', 'ext4', 'power', 'rwfw', 'fwstatus',
          'python', 'hwid', 'cros', 'dev_default_boot')),
        (cros_repair.ServoInstallRepair, 'usb', ('usb_drive', ),
         ('ping', 'ssh', 'writable', 'stop_start_ui', 'tpm', 'good_provision',
          'ext4', 'power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros',
          'dev_default_boot', 'faft_tpm')),
        (cros_firmware.GeneralFirmwareRepair, 'general_firmware',
         ('usb_drive', ), (
                 'ping',
                 'ssh',
         )),
)
94
# Expected value of cros_repair._moblab_verify_dag(), compared with
# assertTupleEqual in test_moblab_repair.
# Each entry is (verifier, label, dependency labels); here 'ssh' has no
# dependencies and every other entry depends only on 'ssh'.
MOBLAB_VERIFY_DAG = (
    (repair_utils.SshVerifier, 'ssh', ()),
    (cros_repair.ACPowerVerifier, 'power', ('ssh',)),
    (cros_repair.PythonVerifier, 'python', ('ssh',)),
    (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)),
)
101
# Expected value of cros_repair._moblab_repair_actions(), compared with
# assertTupleEqual in test_moblab_repair.
# Each entry is (repair action, label, dependency labels, trigger labels);
# check_repair_actions() requires all of those labels to be defined in the
# moblab verify DAG.
MOBLAB_REPAIR_ACTIONS = (
    (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)),
    (cros_repair.ProvisionRepair,
     'provision', ('ssh',), ('power', 'python', 'cros',)),
)
107
# Expected value of cros_repair._jetstream_verify_dag(), compared with
# assertTupleEqual in test_jetstream_repair.
# Each entry is (verifier, label, dependency labels).  Relative to
# CROS_VERIFY_DAG this table has no 'stop_start_ui' or 'storage' entries and
# ends with three jetstream_* verifiers.
JETSTREAM_VERIFY_DAG = (
        (repair_utils.PingVerifier, 'ping', ()),
        (repair_utils.SshVerifier, 'ssh', ('ping', )),
        (cros_repair.ServoUSBDriveVerifier, 'usb_drive', ()),
        (cros_repair.DevDefaultBootVerifier, 'dev_default_boot', ('ssh', )),
        (cros_repair.DevModeVerifier, 'devmode', ('ssh', )),
        (cros_repair.EnrollmentStateVerifier, 'enrollment_state', ('ssh', )),
        (cros_repair.HWIDVerifier, 'hwid', ('ssh', )),
        (cros_repair.ACPowerVerifier, 'power', ('ssh', )),
        (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh', )),
        (cros_repair.WritableVerifier, 'writable', ('ssh', )),
        (cros_repair.TPMStatusVerifier, 'tpm', ('ssh', )),
        (cros_repair.UpdateSuccessVerifier, 'good_provision', ('ssh', )),
        (cros_repair.FirmwareTpmVerifier, 'faft_tpm', ('ssh', )),
        (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh', )),
        (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh', )),
        (cros_repair.PythonVerifier, 'python', ('ssh', )),
        (repair_utils.LegacyHostVerifier, 'cros', ('ssh', )),
        (cros_repair.CrosVerisionVerifier, 'cros_version_label', ('ssh', )),
        (cros_repair.JetstreamTpmVerifier, 'jetstream_tpm', ('ssh', )),
        (cros_repair.JetstreamAttestationVerifier, 'jetstream_attestation',
         ('ssh', )),
        (cros_repair.JetstreamServicesVerifier, 'jetstream_services',
         ('ssh', )),
)
133
# Expected value of cros_repair._jetstream_repair_actions(), compared with
# assertTupleEqual in test_jetstream_repair.
# Each entry is (repair action, label, dependency labels, trigger labels);
# check_repair_actions() requires all of those labels to be defined in the
# jetstream verify DAG.
JETSTREAM_REPAIR_ACTIONS = (
        (repair_utils.RPMCycleRepair, 'rpm', (), (
                'ping',
                'ssh',
                'power',
        )),
        (cros_repair.ServoResetRepair, 'servoreset', (), (
                'ping',
                'ssh',
        )),
        (cros_repair.ServoCr50RebootRepair, 'cr50_reset', (), (
                'ping',
                'ssh',
        )),
        (cros_repair.ServoSysRqRepair, 'sysrq', (), (
                'ping',
                'ssh',
        )),
        (cros_repair.LabelCleanupRepair, 'label_cleanup', ('ssh', ),
         ('cros_version_label', )),
        (cros_firmware.FaftFirmwareRepair, 'faft_firmware_repair', (),
         ('ping', 'ssh', 'fwstatus', 'good_provision')),
        (cros_repair.DevDefaultBootRepair, 'set_default_boot', ('ssh', ),
         ('dev_default_boot', )),
        (cros_repair.CrosRebootRepair, 'reboot', ('ssh', ), (
                'devmode',
                'writable',
        )),
        (cros_repair.EnrollmentCleanupRepair, 'cleanup_enrollment', ('ssh', ),
         ('enrollment_state', )),
        (cros_repair.JetstreamTpmRepair, 'jetstream_tpm_repair',
         ('ping', 'ssh', 'writable', 'tpm', 'good_provision', 'ext4'),
         ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros',
          'dev_default_boot', 'jetstream_tpm', 'jetstream_attestation')),
        (cros_repair.JetstreamServiceRepair, 'jetstream_service_repair',
         ('ping', 'ssh', 'writable', 'tpm', 'good_provision', 'ext4',
          'jetstream_tpm', 'jetstream_attestation'),
         ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros',
          'dev_default_boot', 'jetstream_tpm', 'jetstream_attestation',
          'jetstream_services')),
        (cros_repair.ProvisionRepair, 'provision',
         ('ping', 'ssh', 'writable', 'tpm', 'good_provision',
          'ext4'), ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros',
                    'dev_default_boot', 'jetstream_tpm',
                    'jetstream_attestation', 'jetstream_services')),
        (cros_repair.PowerWashRepair, 'powerwash', ('ping', 'ssh', 'writable'),
         ('tpm', 'good_provision', 'ext4', 'power', 'rwfw', 'fwstatus',
          'python', 'hwid', 'cros', 'dev_default_boot', 'jetstream_tpm',
          'jetstream_attestation', 'jetstream_services')),
        (cros_repair.ServoInstallRepair, 'usb', ('usb_drive', ), (
                'ping',
                'ssh',
                'writable',
                'tpm',
                'good_provision',
                'ext4',
                'power',
                'rwfw',
                'fwstatus',
                'python',
                'hwid',
                'cros',
                'dev_default_boot',
                'jetstream_tpm',
                'jetstream_attestation',
                'jetstream_services',
                'faft_tpm',
        )),
)
203
# Canned JSON returned as the mocked host.run(...).stdout and parsed by
# cros_repair.CryptohomeStatus.  Its "tpm" section reports an enabled, owned
# TPM with can_load_srk and can_load_srk_pubkey both true.  The tests expect
# TPMStatusVerifier.verify() to pass on this input.
CRYPTOHOME_STATUS_OWNED = """{
   "installattrs": {
      "first_install": true,
      "initialized": true,
      "invalid": false,
      "lockbox_index": 536870916,
      "lockbox_nvram_version": 2,
      "secure": true,
      "size": 0,
      "version": 1
   },
   "mounts": [  ],
   "tpm": {
      "being_owned": false,
      "can_connect": true,
      "can_decrypt": false,
      "can_encrypt": false,
      "can_load_srk": true,
      "can_load_srk_pubkey": true,
      "enabled": true,
      "has_context": true,
      "has_cryptohome_key": false,
      "has_key_handle": false,
      "last_error": 0,
      "owned": true
   }
}
"""
232
# Same fixture shape as CRYPTOHOME_STATUS_OWNED, but the "tpm" section has
# owned, can_load_srk and can_load_srk_pubkey all false.  The tests expect
# TPMStatusVerifier.verify() to pass on this input, and
# JetstreamTpmVerifier.verify() to fail with 'TPM is not owned'.
CRYPTOHOME_STATUS_NOT_OWNED = """{
   "installattrs": {
      "first_install": true,
      "initialized": true,
      "invalid": false,
      "lockbox_index": 536870916,
      "lockbox_nvram_version": 2,
      "secure": true,
      "size": 0,
      "version": 1
   },
   "mounts": [  ],
   "tpm": {
      "being_owned": false,
      "can_connect": true,
      "can_decrypt": false,
      "can_encrypt": false,
      "can_load_srk": false,
      "can_load_srk_pubkey": false,
      "enabled": true,
      "has_context": true,
      "has_cryptohome_key": false,
      "has_key_handle": false,
      "last_error": 0,
      "owned": false
   }
}
"""
261
# Fixture for an owned TPM ("owned": true) whose can_load_srk and
# can_load_srk_pubkey are both false.  The tests expect
# TPMStatusVerifier.verify() to raise AutoservVerifyError with the message
# 'Cannot load the TPM SRK' on this input.
CRYPTOHOME_STATUS_CANNOT_LOAD_SRK = """{
   "installattrs": {
      "first_install": true,
      "initialized": true,
      "invalid": false,
      "lockbox_index": 536870916,
      "lockbox_nvram_version": 2,
      "secure": true,
      "size": 0,
      "version": 1
   },
   "mounts": [  ],
   "tpm": {
      "being_owned": false,
      "can_connect": true,
      "can_decrypt": false,
      "can_encrypt": false,
      "can_load_srk": false,
      "can_load_srk_pubkey": false,
      "enabled": true,
      "has_context": true,
      "has_cryptohome_key": false,
      "has_key_handle": false,
      "last_error": 0,
      "owned": true
   }
}
"""
290
# Canned "key: value" text served as the second mocked host.run() reply in
# test_jetstream_tpm_owned, after CRYPTOHOME_STATUS_OWNED.  It reports
# "TPM Ready: true"; JetstreamTpmVerifier.verify() is expected to pass.
TPM_STATUS_READY = """
TPM Enabled: true
TPM Owned: true
TPM Being Owned: false
TPM Ready: true
TPM Password: 9eaee4da8b4c
"""
298
# Counterpart of TPM_STATUS_READY reporting "TPM Ready: false" (and an empty
# password).  test_jetstream_tpm_not_ready serves it after
# CRYPTOHOME_STATUS_OWNED and expects JetstreamTpmVerifier.verify() to fail
# with 'TPM is not ready'.
TPM_STATUS_NOT_READY = """
TPM Enabled: true
TPM Owned: false
TPM Being Owned: true
TPM Ready: false
TPM Password:
"""
306
307
class CrosRepairUnittests(unittest.TestCase):
    """Tests for the cros_repair strategy tables and TPM status checks.

    Test methods are annotated with comments rather than docstrings so that
    the descriptions reported by the unittest runner stay the method names.
    """
    # pylint: disable=missing-docstring

    maxDiff = None

    def test_cros_repair(self):
        # The CrOS tables must equal the expected constants and only
        # reference labels that exist.
        dag = cros_repair._cros_verify_dag()
        self.assertTupleEqual(dag, CROS_VERIFY_DAG)
        self.check_verify_dag(dag)
        actions = cros_repair._cros_repair_actions()
        self.assertTupleEqual(actions, CROS_REPAIR_ACTIONS)
        self.check_repair_actions(dag, actions)

    def test_moblab_repair(self):
        # Same checks as test_cros_repair, for the moblab tables.
        dag = cros_repair._moblab_verify_dag()
        self.assertTupleEqual(dag, MOBLAB_VERIFY_DAG)
        self.check_verify_dag(dag)
        actions = cros_repair._moblab_repair_actions()
        self.assertTupleEqual(actions, MOBLAB_REPAIR_ACTIONS)
        self.check_repair_actions(dag, actions)

    def test_jetstream_repair(self):
        # Same checks as test_cros_repair, for the jetstream tables.
        dag = cros_repair._jetstream_verify_dag()
        self.assertTupleEqual(dag, JETSTREAM_VERIFY_DAG)
        self.check_verify_dag(dag)
        actions = cros_repair._jetstream_repair_actions()
        self.assertTupleEqual(actions, JETSTREAM_REPAIR_ACTIONS)
        self.check_repair_actions(dag, actions)

    def check_verify_dag(self, verify_dag):
        """Asserts that every dependency label names an entry of the DAG.

        @param verify_dag: Sequence of entries whose index 1 is the entry's
                           label and index 2 its dependency labels.
        """
        known_labels = [entry[1] for entry in verify_dag]
        referenced = itertools.chain.from_iterable(
                entry[2] for entry in verify_dag)
        for dependency in referenced:
            self.assertIn(dependency, known_labels)

    def check_repair_actions(self, verify_dag, repair_actions):
        """Asserts that repair actions only reference defined verify labels.

        @param verify_dag: Sequence of entries whose index 1 is a label.
        @param repair_actions: Sequence of entries whose index 2 holds the
                               dependency labels and index 3 the trigger
                               labels.
        """
        known_labels = [entry[1] for entry in verify_dag]
        for action in repair_actions:
            referenced = action[2] + action[3]
            for name in referenced:
                self.assertIn(name, known_labels)

    def test_get_cryptohome_status_owned(self):
        # Parsing the "owned" fixture exposes the tpm dict and properties.
        host = mock.Mock()
        host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED
        status = cros_repair.CryptohomeStatus(host)
        expected_tpm = {
            'being_owned': False,
            'can_connect': True,
            'can_decrypt': False,
            'can_encrypt': False,
            'can_load_srk': True,
            'can_load_srk_pubkey': True,
            'enabled': True,
            'has_context': True,
            'has_cryptohome_key': False,
            'has_key_handle': False,
            'last_error': 0,
            'owned': True,
        }
        self.assertDictEqual(expected_tpm, status['tpm'])
        self.assertTrue(status.tpm_enabled)
        self.assertTrue(status.tpm_owned)
        self.assertTrue(status.tpm_can_load_srk)
        self.assertTrue(status.tpm_can_load_srk_pubkey)

    def test_get_cryptohome_status_not_owned(self):
        # Parsing the "not owned" fixture reports an enabled, unowned TPM.
        host = mock.Mock()
        host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED
        status = cros_repair.CryptohomeStatus(host)
        expected_tpm = {
            'being_owned': False,
            'can_connect': True,
            'can_decrypt': False,
            'can_encrypt': False,
            'can_load_srk': False,
            'can_load_srk_pubkey': False,
            'enabled': True,
            'has_context': True,
            'has_cryptohome_key': False,
            'has_key_handle': False,
            'last_error': 0,
            'owned': False,
        }
        self.assertDictEqual(expected_tpm, status['tpm'])
        self.assertTrue(status.tpm_enabled)
        self.assertFalse(status.tpm_owned)
        self.assertFalse(status.tpm_can_load_srk)
        self.assertFalse(status.tpm_can_load_srk_pubkey)

    @mock.patch.object(cros_repair, '_is_virtual_machine')
    def test_tpm_status_verifier_owned(self, mock_is_virt):
        # An owned TPM passes verification (verify() must not raise).
        mock_is_virt.return_value = False
        host = mock.Mock()
        host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED
        verifier = cros_repair.TPMStatusVerifier('test', [])
        verifier.verify(host)

    @mock.patch.object(cros_repair, '_is_virtual_machine')
    def test_tpm_status_verifier_not_owned(self, mock_is_virt):
        # An unowned TPM also passes verification (verify() must not raise).
        mock_is_virt.return_value = False
        host = mock.Mock()
        host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED
        verifier = cros_repair.TPMStatusVerifier('test', [])
        verifier.verify(host)

    @mock.patch.object(cros_repair, '_is_virtual_machine')
    def test_tpm_status_verifier_cannot_load_srk_pubkey(self, mock_is_virt):
        # An owned TPM that cannot load the SRK fails verification.
        mock_is_virt.return_value = False
        host = mock.Mock()
        host.run.return_value.stdout = CRYPTOHOME_STATUS_CANNOT_LOAD_SRK
        verifier = cros_repair.TPMStatusVerifier('test', [])
        with self.assertRaises(hosts.AutoservVerifyError) as ctx:
            verifier.verify(host)
        raised = ctx.exception
        self.assertEqual('Cannot load the TPM SRK', raised.message)

    def test_jetstream_tpm_owned(self):
        # Two successive host.run() replies: cryptohome status, then TPM
        # status.  verify() must not raise.
        host = mock.Mock()
        cryptohome_reply = mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED)
        tpm_reply = mock.Mock(stdout=TPM_STATUS_READY)
        host.run.side_effect = [cryptohome_reply, tpm_reply]
        verifier = cros_repair.JetstreamTpmVerifier('test', [])
        verifier.verify(host)

    @mock.patch.object(retry.logging, 'warning')
    @mock.patch.object(retry.time, 'time')
    @mock.patch.object(retry.time, 'sleep')
    def test_jetstream_tpm_not_owned(self, mock_sleep, mock_time, mock_logging):
        # The retry module's clock is faked: each time() call returns a
        # value 20 larger than the previous one, and sleep() is a mock.
        mock_time.side_effect = itertools.count(0, 20)
        host = mock.Mock()
        host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED
        verifier = cros_repair.JetstreamTpmVerifier('test', [])
        with self.assertRaises(hosts.AutoservVerifyError) as ctx:
            verifier.verify(host)
        raised = ctx.exception
        self.assertEqual('TPM is not owned', raised.message)

    @mock.patch.object(retry.logging, 'warning')
    @mock.patch.object(retry.time, 'time')
    @mock.patch.object(retry.time, 'sleep')
    def test_jetstream_tpm_not_ready(self, mock_sleep, mock_time, mock_logging):
        # host.run() alternates forever between an "owned" cryptohome reply
        # and a "not ready" TPM reply.
        mock_time.side_effect = itertools.count(0, 20)
        host = mock.Mock()
        cryptohome_reply = mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED)
        tpm_reply = mock.Mock(stdout=TPM_STATUS_NOT_READY)
        host.run.side_effect = itertools.cycle([cryptohome_reply, tpm_reply])
        verifier = cros_repair.JetstreamTpmVerifier('test', [])
        with self.assertRaises(hosts.AutoservVerifyError) as ctx:
            verifier.verify(host)
        raised = ctx.exception
        self.assertEqual('TPM is not ready', raised.message)

    @mock.patch.object(retry.logging, 'warning')
    @mock.patch.object(retry.time, 'time')
    @mock.patch.object(retry.time, 'sleep')
    def test_jetstream_cryptohome_missing(self, mock_sleep, mock_time,
                                          mock_logging):
        # Every host.run() call raises AutoservRunError.
        mock_time.side_effect = itertools.count(0, 20)
        host = mock.Mock()
        host.run.side_effect = error.AutoservRunError('test', None)
        verifier = cros_repair.JetstreamTpmVerifier('test', [])
        with self.assertRaises(hosts.AutoservVerifyError) as ctx:
            verifier.verify(host)
        raised = ctx.exception
        self.assertEqual('Could not determine TPM status', raised.message)
475
476
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
479