#!/usr/bin/python # Copyright 2017 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import itertools import mock import unittest import common from autotest_lib.client.common_lib import error from autotest_lib.client.common_lib import hosts from autotest_lib.client.common_lib.cros import retry from autotest_lib.server.hosts import cros_firmware from autotest_lib.server.hosts import cros_repair from autotest_lib.server.hosts import repair_utils CROS_VERIFY_DAG = ( (repair_utils.SshVerifier, 'ssh', ()), (cros_repair.DevModeVerifier, 'devmode', ('ssh',)), (cros_repair.HWIDVerifier, 'hwid', ('ssh',)), (cros_repair.ACPowerVerifier, 'power', ('ssh',)), (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh',)), (cros_repair.WritableVerifier, 'writable', ('ssh',)), (cros_repair.TPMStatusVerifier, 'tpm', ('ssh',)), (cros_repair.UpdateSuccessVerifier, 'good_au', ('ssh',)), (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh',)), (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh',)), (cros_repair.PythonVerifier, 'python', ('ssh',)), (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)), (cros_repair.KvmExistsVerifier, 'ec_reset', ('ssh',)), ) CROS_REPAIR_ACTIONS = ( (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)), (cros_repair.ServoSysRqRepair, 'sysrq', (), ('ssh',)), (cros_repair.ServoResetRepair, 'servoreset', (), ('ssh',)), (cros_firmware.FirmwareRepair, 'firmware', (), ('ssh', 'fwstatus', 'good_au')), (cros_repair.CrosRebootRepair, 'reboot', ('ssh',), ('devmode', 'writable',)), (cros_repair.ColdRebootRepair, 'coldboot', ('ssh',), ('ec_reset',)), (cros_repair.AutoUpdateRepair, 'au', ('ssh', 'writable', 'tpm', 'good_au', 'ext4'), ('power', 'rwfw', 'python', 'cros')), (cros_repair.PowerWashRepair, 'powerwash', ('ssh', 'writable'), ('tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python', 'cros')), (cros_repair.ServoInstallRepair, 'usb', (), ('ssh', 'writable', 'tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python', 'cros')), ) MOBLAB_VERIFY_DAG = ( (repair_utils.SshVerifier, 'ssh', ()), (cros_repair.ACPowerVerifier, 'power', ('ssh',)), (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh',)), (cros_repair.PythonVerifier, 'python', ('ssh',)), (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)), ) MOBLAB_REPAIR_ACTIONS = ( (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)), (cros_repair.AutoUpdateRepair, 'au', ('ssh',), ('power', 'rwfw', 'python', 'cros',)), ) JETSTREAM_VERIFY_DAG = ( (repair_utils.SshVerifier, 'ssh', ()), (cros_repair.DevModeVerifier, 'devmode', ('ssh',)), (cros_repair.HWIDVerifier, 'hwid', ('ssh',)), (cros_repair.ACPowerVerifier, 'power', ('ssh',)), (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh',)), (cros_repair.WritableVerifier, 'writable', ('ssh',)), (cros_repair.TPMStatusVerifier, 'tpm', ('ssh',)), (cros_repair.UpdateSuccessVerifier, 'good_au', ('ssh',)), (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh',)), (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh',)), (cros_repair.PythonVerifier, 'python', ('ssh',)), (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)), (cros_repair.KvmExistsVerifier, 'ec_reset', ('ssh',)), (cros_repair.JetstreamTpmVerifier, 'jetstream_tpm', ('ssh',)), (cros_repair.JetstreamAttestationVerifier, 'jetstream_attestation', ('ssh',)), (cros_repair.JetstreamServicesVerifier, 'jetstream_services', ('ssh',)), ) JETSTREAM_REPAIR_ACTIONS = ( (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)), (cros_repair.ServoSysRqRepair, 'sysrq', (), ('ssh',)), (cros_repair.ServoResetRepair, 'servoreset', (), ('ssh',)), (cros_firmware.FirmwareRepair, 'firmware', (), ('ssh', 'fwstatus', 'good_au')), (cros_repair.CrosRebootRepair, 'reboot', ('ssh',), ('devmode', 'writable',)), (cros_repair.ColdRebootRepair, 'coldboot', ('ssh',), ('ec_reset',)), (cros_repair.JetstreamTpmRepair, 'jetstream_tpm_repair', ('ssh', 'writable', 'tpm', 'good_au', 'ext4'), ('power', 'rwfw', 'python', 'cros', 'jetstream_tpm', 'jetstream_attestation')), (cros_repair.JetstreamServiceRepair, 'jetstream_service_repair', ('ssh', 'writable', 'tpm', 'good_au', 'ext4', 'jetstream_tpm', 'jetstream_attestation'), ('power', 'rwfw', 'python', 'cros', 'jetstream_tpm', 'jetstream_attestation', 'jetstream_services')), (cros_repair.AutoUpdateRepair, 'au', ('ssh', 'writable', 'tpm', 'good_au', 'ext4'), ('power', 'rwfw', 'python', 'cros', 'jetstream_tpm', 'jetstream_attestation', 'jetstream_services')), (cros_repair.PowerWashRepair, 'powerwash', ('ssh', 'writable'), ('tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python', 'cros', 'jetstream_tpm', 'jetstream_attestation', 'jetstream_services')), (cros_repair.ServoInstallRepair, 'usb', (), ('ssh', 'writable', 'tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python', 'cros', 'jetstream_tpm', 'jetstream_attestation', 'jetstream_services')), ) CRYPTOHOME_STATUS_OWNED = """{ "installattrs": { "first_install": true, "initialized": true, "invalid": false, "lockbox_index": 536870916, "lockbox_nvram_version": 2, "secure": true, "size": 0, "version": 1 }, "mounts": [ ], "tpm": { "being_owned": false, "can_connect": true, "can_decrypt": false, "can_encrypt": false, "can_load_srk": true, "can_load_srk_pubkey": true, "enabled": true, "has_context": true, "has_cryptohome_key": false, "has_key_handle": false, "last_error": 0, "owned": true } } """ CRYPTOHOME_STATUS_NOT_OWNED = """{ "installattrs": { "first_install": true, "initialized": true, "invalid": false, "lockbox_index": 536870916, "lockbox_nvram_version": 2, "secure": true, "size": 0, "version": 1 }, "mounts": [ ], "tpm": { "being_owned": false, "can_connect": true, "can_decrypt": false, "can_encrypt": false, "can_load_srk": false, "can_load_srk_pubkey": false, "enabled": true, "has_context": true, "has_cryptohome_key": false, "has_key_handle": false, "last_error": 0, "owned": false } } """ CRYPTOHOME_STATUS_CANNOT_LOAD_SRK = """{ "installattrs": { "first_install": true, "initialized": true, "invalid": false, "lockbox_index": 536870916, "lockbox_nvram_version": 2, "secure": true, "size": 0, "version": 1 }, "mounts": [ ], "tpm": { "being_owned": false, "can_connect": true, "can_decrypt": false, "can_encrypt": false, "can_load_srk": false, "can_load_srk_pubkey": false, "enabled": true, "has_context": true, "has_cryptohome_key": false, "has_key_handle": false, "last_error": 0, "owned": true } } """ TPM_STATUS_READY = """ TPM Enabled: true TPM Owned: true TPM Being Owned: false TPM Ready: true TPM Password: 9eaee4da8b4c """ TPM_STATUS_NOT_READY = """ TPM Enabled: true TPM Owned: false TPM Being Owned: true TPM Ready: false TPM Password: """ class CrosRepairUnittests(unittest.TestCase): # pylint: disable=missing-docstring maxDiff = None def test_cros_repair(self): verify_dag = cros_repair._cros_verify_dag() self.assertTupleEqual(verify_dag, CROS_VERIFY_DAG) self.check_verify_dag(verify_dag) repair_actions = cros_repair._cros_repair_actions() self.assertTupleEqual(repair_actions, CROS_REPAIR_ACTIONS) self.check_repair_actions(verify_dag, repair_actions) def test_moblab_repair(self): verify_dag = cros_repair._moblab_verify_dag() self.assertTupleEqual(verify_dag, MOBLAB_VERIFY_DAG) self.check_verify_dag(verify_dag) repair_actions = cros_repair._moblab_repair_actions() self.assertTupleEqual(repair_actions, MOBLAB_REPAIR_ACTIONS) self.check_repair_actions(verify_dag, repair_actions) def test_jetstream_repair(self): verify_dag = cros_repair._jetstream_verify_dag() self.assertTupleEqual(verify_dag, JETSTREAM_VERIFY_DAG) self.check_verify_dag(verify_dag) repair_actions = cros_repair._jetstream_repair_actions() self.assertTupleEqual(repair_actions, JETSTREAM_REPAIR_ACTIONS) self.check_repair_actions(verify_dag, repair_actions) def check_verify_dag(self, verify_dag): """Checks that dependency labels are defined.""" labels = [n[1] for n in verify_dag] for node in verify_dag: for dep in node[2]: self.assertIn(dep, labels) def check_repair_actions(self, verify_dag, repair_actions): """Checks that dependency and trigger labels are defined.""" verify_labels = [n[1] for n in verify_dag] for action in repair_actions: deps = action[2] triggers = action[3] for label in deps + triggers: self.assertIn(label, verify_labels) def test_get_cryptohome_status_owned(self): mock_host = mock.Mock() mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED status = cros_repair.CryptohomeStatus(mock_host) self.assertDictEqual({ 'being_owned': False, 'can_connect': True, 'can_decrypt': False, 'can_encrypt': False, 'can_load_srk': True, 'can_load_srk_pubkey': True, 'enabled': True, 'has_context': True, 'has_cryptohome_key': False, 'has_key_handle': False, 'last_error': 0, 'owned': True, }, status['tpm']) self.assertTrue(status.tpm_enabled) self.assertTrue(status.tpm_owned) self.assertTrue(status.tpm_can_load_srk) self.assertTrue(status.tpm_can_load_srk_pubkey) def test_get_cryptohome_status_not_owned(self): mock_host = mock.Mock() mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED status = cros_repair.CryptohomeStatus(mock_host) self.assertDictEqual({ 'being_owned': False, 'can_connect': True, 'can_decrypt': False, 'can_encrypt': False, 'can_load_srk': False, 'can_load_srk_pubkey': False, 'enabled': True, 'has_context': True, 'has_cryptohome_key': False, 'has_key_handle': False, 'last_error': 0, 'owned': False, }, status['tpm']) self.assertTrue(status.tpm_enabled) self.assertFalse(status.tpm_owned) self.assertFalse(status.tpm_can_load_srk) self.assertFalse(status.tpm_can_load_srk_pubkey) @mock.patch.object(cros_repair, '_is_virtual_machine') def test_tpm_status_verifier_owned(self, mock_is_virt): mock_is_virt.return_value = False mock_host = mock.Mock() mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED tpm_verifier = cros_repair.TPMStatusVerifier('test', []) tpm_verifier.verify(mock_host) @mock.patch.object(cros_repair, '_is_virtual_machine') def test_tpm_status_verifier_not_owned(self, mock_is_virt): mock_is_virt.return_value = False mock_host = mock.Mock() mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED tpm_verifier = cros_repair.TPMStatusVerifier('test', []) tpm_verifier.verify(mock_host) @mock.patch.object(cros_repair, '_is_virtual_machine') def test_tpm_status_verifier_cannot_load_srk_pubkey(self, mock_is_virt): mock_is_virt.return_value = False mock_host = mock.Mock() mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_CANNOT_LOAD_SRK tpm_verifier = cros_repair.TPMStatusVerifier('test', []) with self.assertRaises(hosts.AutoservVerifyError) as ctx: tpm_verifier.verify(mock_host) self.assertEqual('Cannot load the TPM SRK', ctx.exception.message) def test_jetstream_tpm_owned(self): mock_host = mock.Mock() mock_host.run.side_effect = [ mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED), mock.Mock(stdout=TPM_STATUS_READY), ] tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) tpm_verifier.verify(mock_host) @mock.patch.object(retry.logging, 'warning') @mock.patch.object(retry.time, 'time') @mock.patch.object(retry.time, 'sleep') def test_jetstream_tpm_not_owned(self, mock_sleep, mock_time, mock_logging): mock_time.side_effect = itertools.count(0, 20) mock_host = mock.Mock() mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) with self.assertRaises(hosts.AutoservVerifyError) as ctx: tpm_verifier.verify(mock_host) self.assertEqual('TPM is not owned', ctx.exception.message) @mock.patch.object(retry.logging, 'warning') @mock.patch.object(retry.time, 'time') @mock.patch.object(retry.time, 'sleep') def test_jetstream_tpm_not_ready(self, mock_sleep, mock_time, mock_logging): mock_time.side_effect = itertools.count(0, 20) mock_host = mock.Mock() mock_host.run.side_effect = itertools.cycle([ mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED), mock.Mock(stdout=TPM_STATUS_NOT_READY), ]) tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) with self.assertRaises(hosts.AutoservVerifyError) as ctx: tpm_verifier.verify(mock_host) self.assertEqual('TPM is not ready', ctx.exception.message) @mock.patch.object(retry.logging, 'warning') @mock.patch.object(retry.time, 'time') @mock.patch.object(retry.time, 'sleep') def test_jetstream_cryptohome_missing(self, mock_sleep, mock_time, mock_logging): mock_time.side_effect = itertools.count(0, 20) mock_host = mock.Mock() mock_host.run.side_effect = error.AutoservRunError('test', None) tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) with self.assertRaises(hosts.AutoservVerifyError) as ctx: tpm_verifier.verify(mock_host) self.assertEqual('Could not determine TPM status', ctx.exception.message) if __name__ == '__main__': unittest.main()