1#!/usr/bin/python 2# Copyright 2017 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6import itertools 7import mock 8import unittest 9 10import common 11from autotest_lib.client.common_lib import error 12from autotest_lib.client.common_lib import hosts 13from autotest_lib.client.common_lib.cros import retry 14from autotest_lib.server.hosts import cros_firmware 15from autotest_lib.server.hosts import cros_repair 16from autotest_lib.server.hosts import repair_utils 17 18 19CROS_VERIFY_DAG = ( 20 (repair_utils.SshVerifier, 'ssh', ()), 21 (cros_repair.DevModeVerifier, 'devmode', ('ssh',)), 22 (cros_repair.HWIDVerifier, 'hwid', ('ssh',)), 23 (cros_repair.ACPowerVerifier, 'power', ('ssh',)), 24 (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh',)), 25 (cros_repair.WritableVerifier, 'writable', ('ssh',)), 26 (cros_repair.TPMStatusVerifier, 'tpm', ('ssh',)), 27 (cros_repair.UpdateSuccessVerifier, 'good_au', ('ssh',)), 28 (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh',)), 29 (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh',)), 30 (cros_repair.PythonVerifier, 'python', ('ssh',)), 31 (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)), 32 (cros_repair.KvmExistsVerifier, 'ec_reset', ('ssh',)), 33) 34 35CROS_REPAIR_ACTIONS = ( 36 (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)), 37 (cros_repair.ServoSysRqRepair, 'sysrq', (), ('ssh',)), 38 (cros_repair.ServoResetRepair, 'servoreset', (), ('ssh',)), 39 (cros_firmware.FirmwareRepair, 40 'firmware', (), ('ssh', 'fwstatus', 'good_au')), 41 (cros_repair.CrosRebootRepair, 42 'reboot', ('ssh',), ('devmode', 'writable',)), 43 (cros_repair.ColdRebootRepair, 44 'coldboot', ('ssh',), ('ec_reset',)), 45 (cros_repair.AutoUpdateRepair, 46 'au', 47 ('ssh', 'writable', 'tpm', 'good_au', 'ext4'), 48 ('power', 'rwfw', 'python', 'cros')), 49 (cros_repair.PowerWashRepair, 50 'powerwash', 51 ('ssh', 'writable'), 52 ('tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python', 'cros')), 53 (cros_repair.ServoInstallRepair, 54 'usb', 55 (), 56 ('ssh', 'writable', 'tpm', 'good_au', 'ext4', 'power', 'rwfw', 57 'python', 'cros')), 58) 59 60MOBLAB_VERIFY_DAG = ( 61 (repair_utils.SshVerifier, 'ssh', ()), 62 (cros_repair.ACPowerVerifier, 'power', ('ssh',)), 63 (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh',)), 64 (cros_repair.PythonVerifier, 'python', ('ssh',)), 65 (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)), 66) 67 68MOBLAB_REPAIR_ACTIONS = ( 69 (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)), 70 (cros_repair.AutoUpdateRepair, 71 'au', ('ssh',), ('power', 'rwfw', 'python', 'cros',)), 72) 73 74JETSTREAM_VERIFY_DAG = ( 75 (repair_utils.SshVerifier, 'ssh', ()), 76 (cros_repair.DevModeVerifier, 'devmode', ('ssh',)), 77 (cros_repair.HWIDVerifier, 'hwid', ('ssh',)), 78 (cros_repair.ACPowerVerifier, 'power', ('ssh',)), 79 (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh',)), 80 (cros_repair.WritableVerifier, 'writable', ('ssh',)), 81 (cros_repair.TPMStatusVerifier, 'tpm', ('ssh',)), 82 (cros_repair.UpdateSuccessVerifier, 'good_au', ('ssh',)), 83 (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh',)), 84 (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh',)), 85 (cros_repair.PythonVerifier, 'python', ('ssh',)), 86 (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)), 87 (cros_repair.KvmExistsVerifier, 'ec_reset', ('ssh',)), 88 (cros_repair.JetstreamTpmVerifier, 'jetstream_tpm', ('ssh',)), 89 (cros_repair.JetstreamAttestationVerifier, 'jetstream_attestation', 90 ('ssh',)), 91 (cros_repair.JetstreamServicesVerifier, 'jetstream_services', ('ssh',)), 92) 93 94JETSTREAM_REPAIR_ACTIONS = ( 95 (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)), 96 (cros_repair.ServoSysRqRepair, 'sysrq', (), ('ssh',)), 97 (cros_repair.ServoResetRepair, 'servoreset', (), ('ssh',)), 98 (cros_firmware.FirmwareRepair, 99 'firmware', (), ('ssh', 'fwstatus', 'good_au')), 100 (cros_repair.CrosRebootRepair, 101 'reboot', ('ssh',), ('devmode', 'writable',)), 102 (cros_repair.ColdRebootRepair, 103 'coldboot', ('ssh',), ('ec_reset',)), 104 (cros_repair.JetstreamTpmRepair, 105 'jetstream_tpm_repair', 106 ('ssh', 'writable', 'tpm', 'good_au', 'ext4'), 107 ('power', 'rwfw', 'python', 'cros', 'jetstream_tpm', 108 'jetstream_attestation')), 109 (cros_repair.JetstreamServiceRepair, 110 'jetstream_service_repair', 111 ('ssh', 'writable', 'tpm', 'good_au', 'ext4', 'jetstream_tpm', 112 'jetstream_attestation'), 113 ('power', 'rwfw', 'python', 'cros', 'jetstream_tpm', 114 'jetstream_attestation', 'jetstream_services')), 115 (cros_repair.AutoUpdateRepair, 116 'au', 117 ('ssh', 'writable', 'tpm', 'good_au', 'ext4'), 118 ('power', 'rwfw', 'python', 'cros', 'jetstream_tpm', 119 'jetstream_attestation', 'jetstream_services')), 120 (cros_repair.PowerWashRepair, 121 'powerwash', 122 ('ssh', 'writable'), 123 ('tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python', 'cros', 124 'jetstream_tpm', 'jetstream_attestation', 'jetstream_services')), 125 (cros_repair.ServoInstallRepair, 126 'usb', 127 (), 128 ('ssh', 'writable', 'tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python', 129 'cros', 'jetstream_tpm', 'jetstream_attestation', 'jetstream_services')), 130) 131 132CRYPTOHOME_STATUS_OWNED = """{ 133 "installattrs": { 134 "first_install": true, 135 "initialized": true, 136 "invalid": false, 137 "lockbox_index": 536870916, 138 "lockbox_nvram_version": 2, 139 "secure": true, 140 "size": 0, 141 "version": 1 142 }, 143 "mounts": [ ], 144 "tpm": { 145 "being_owned": false, 146 "can_connect": true, 147 "can_decrypt": false, 148 "can_encrypt": false, 149 "can_load_srk": true, 150 "can_load_srk_pubkey": true, 151 "enabled": true, 152 "has_context": true, 153 "has_cryptohome_key": false, 154 "has_key_handle": false, 155 "last_error": 0, 156 "owned": true 157 } 158} 159""" 160 161CRYPTOHOME_STATUS_NOT_OWNED = """{ 162 "installattrs": { 163 "first_install": true, 164 "initialized": true, 165 "invalid": false, 166 "lockbox_index": 536870916, 167 "lockbox_nvram_version": 2, 168 "secure": true, 169 "size": 0, 170 "version": 1 171 }, 172 "mounts": [ ], 173 "tpm": { 174 "being_owned": false, 175 "can_connect": true, 176 "can_decrypt": false, 177 "can_encrypt": false, 178 "can_load_srk": false, 179 "can_load_srk_pubkey": false, 180 "enabled": true, 181 "has_context": true, 182 "has_cryptohome_key": false, 183 "has_key_handle": false, 184 "last_error": 0, 185 "owned": false 186 } 187} 188""" 189 190CRYPTOHOME_STATUS_CANNOT_LOAD_SRK = """{ 191 "installattrs": { 192 "first_install": true, 193 "initialized": true, 194 "invalid": false, 195 "lockbox_index": 536870916, 196 "lockbox_nvram_version": 2, 197 "secure": true, 198 "size": 0, 199 "version": 1 200 }, 201 "mounts": [ ], 202 "tpm": { 203 "being_owned": false, 204 "can_connect": true, 205 "can_decrypt": false, 206 "can_encrypt": false, 207 "can_load_srk": false, 208 "can_load_srk_pubkey": false, 209 "enabled": true, 210 "has_context": true, 211 "has_cryptohome_key": false, 212 "has_key_handle": false, 213 "last_error": 0, 214 "owned": true 215 } 216} 217""" 218 219TPM_STATUS_READY = """ 220TPM Enabled: true 221TPM Owned: true 222TPM Being Owned: false 223TPM Ready: true 224TPM Password: 9eaee4da8b4c 225""" 226 227TPM_STATUS_NOT_READY = """ 228TPM Enabled: true 229TPM Owned: false 230TPM Being Owned: true 231TPM Ready: false 232TPM Password: 233""" 234 235 236class CrosRepairUnittests(unittest.TestCase): 237 # pylint: disable=missing-docstring 238 239 maxDiff = None 240 241 def test_cros_repair(self): 242 verify_dag = cros_repair._cros_verify_dag() 243 self.assertTupleEqual(verify_dag, CROS_VERIFY_DAG) 244 self.check_verify_dag(verify_dag) 245 repair_actions = cros_repair._cros_repair_actions() 246 self.assertTupleEqual(repair_actions, CROS_REPAIR_ACTIONS) 247 self.check_repair_actions(verify_dag, repair_actions) 248 249 def test_moblab_repair(self): 250 verify_dag = cros_repair._moblab_verify_dag() 251 self.assertTupleEqual(verify_dag, MOBLAB_VERIFY_DAG) 252 self.check_verify_dag(verify_dag) 253 repair_actions = cros_repair._moblab_repair_actions() 254 self.assertTupleEqual(repair_actions, MOBLAB_REPAIR_ACTIONS) 255 self.check_repair_actions(verify_dag, repair_actions) 256 257 def test_jetstream_repair(self): 258 verify_dag = cros_repair._jetstream_verify_dag() 259 self.assertTupleEqual(verify_dag, JETSTREAM_VERIFY_DAG) 260 self.check_verify_dag(verify_dag) 261 repair_actions = cros_repair._jetstream_repair_actions() 262 self.assertTupleEqual(repair_actions, JETSTREAM_REPAIR_ACTIONS) 263 self.check_repair_actions(verify_dag, repair_actions) 264 265 def check_verify_dag(self, verify_dag): 266 """Checks that dependency labels are defined.""" 267 labels = [n[1] for n in verify_dag] 268 for node in verify_dag: 269 for dep in node[2]: 270 self.assertIn(dep, labels) 271 272 def check_repair_actions(self, verify_dag, repair_actions): 273 """Checks that dependency and trigger labels are defined.""" 274 verify_labels = [n[1] for n in verify_dag] 275 for action in repair_actions: 276 deps = action[2] 277 triggers = action[3] 278 for label in deps + triggers: 279 self.assertIn(label, verify_labels) 280 281 def test_get_cryptohome_status_owned(self): 282 mock_host = mock.Mock() 283 mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED 284 status = cros_repair.CryptohomeStatus(mock_host) 285 self.assertDictEqual({ 286 'being_owned': False, 287 'can_connect': True, 288 'can_decrypt': False, 289 'can_encrypt': False, 290 'can_load_srk': True, 291 'can_load_srk_pubkey': True, 292 'enabled': True, 293 'has_context': True, 294 'has_cryptohome_key': False, 295 'has_key_handle': False, 296 'last_error': 0, 297 'owned': True, 298 }, status['tpm']) 299 self.assertTrue(status.tpm_enabled) 300 self.assertTrue(status.tpm_owned) 301 self.assertTrue(status.tpm_can_load_srk) 302 self.assertTrue(status.tpm_can_load_srk_pubkey) 303 304 def test_get_cryptohome_status_not_owned(self): 305 mock_host = mock.Mock() 306 mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED 307 status = cros_repair.CryptohomeStatus(mock_host) 308 self.assertDictEqual({ 309 'being_owned': False, 310 'can_connect': True, 311 'can_decrypt': False, 312 'can_encrypt': False, 313 'can_load_srk': False, 314 'can_load_srk_pubkey': False, 315 'enabled': True, 316 'has_context': True, 317 'has_cryptohome_key': False, 318 'has_key_handle': False, 319 'last_error': 0, 320 'owned': False, 321 }, status['tpm']) 322 self.assertTrue(status.tpm_enabled) 323 self.assertFalse(status.tpm_owned) 324 self.assertFalse(status.tpm_can_load_srk) 325 self.assertFalse(status.tpm_can_load_srk_pubkey) 326 327 @mock.patch.object(cros_repair, '_is_virtual_machine') 328 def test_tpm_status_verifier_owned(self, mock_is_virt): 329 mock_is_virt.return_value = False 330 mock_host = mock.Mock() 331 mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED 332 tpm_verifier = cros_repair.TPMStatusVerifier('test', []) 333 tpm_verifier.verify(mock_host) 334 335 @mock.patch.object(cros_repair, '_is_virtual_machine') 336 def test_tpm_status_verifier_not_owned(self, mock_is_virt): 337 mock_is_virt.return_value = False 338 mock_host = mock.Mock() 339 mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED 340 tpm_verifier = cros_repair.TPMStatusVerifier('test', []) 341 tpm_verifier.verify(mock_host) 342 343 @mock.patch.object(cros_repair, '_is_virtual_machine') 344 def test_tpm_status_verifier_cannot_load_srk_pubkey(self, mock_is_virt): 345 mock_is_virt.return_value = False 346 mock_host = mock.Mock() 347 mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_CANNOT_LOAD_SRK 348 tpm_verifier = cros_repair.TPMStatusVerifier('test', []) 349 with self.assertRaises(hosts.AutoservVerifyError) as ctx: 350 tpm_verifier.verify(mock_host) 351 self.assertEqual('Cannot load the TPM SRK', 352 ctx.exception.message) 353 354 def test_jetstream_tpm_owned(self): 355 mock_host = mock.Mock() 356 mock_host.run.side_effect = [ 357 mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED), 358 mock.Mock(stdout=TPM_STATUS_READY), 359 ] 360 tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) 361 tpm_verifier.verify(mock_host) 362 363 @mock.patch.object(retry.logging, 'warning') 364 @mock.patch.object(retry.time, 'time') 365 @mock.patch.object(retry.time, 'sleep') 366 def test_jetstream_tpm_not_owned(self, mock_sleep, mock_time, mock_logging): 367 mock_time.side_effect = itertools.count(0, 20) 368 mock_host = mock.Mock() 369 mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED 370 tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) 371 with self.assertRaises(hosts.AutoservVerifyError) as ctx: 372 tpm_verifier.verify(mock_host) 373 self.assertEqual('TPM is not owned', ctx.exception.message) 374 375 @mock.patch.object(retry.logging, 'warning') 376 @mock.patch.object(retry.time, 'time') 377 @mock.patch.object(retry.time, 'sleep') 378 def test_jetstream_tpm_not_ready(self, mock_sleep, mock_time, mock_logging): 379 mock_time.side_effect = itertools.count(0, 20) 380 mock_host = mock.Mock() 381 mock_host.run.side_effect = itertools.cycle([ 382 mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED), 383 mock.Mock(stdout=TPM_STATUS_NOT_READY), 384 ]) 385 tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) 386 with self.assertRaises(hosts.AutoservVerifyError) as ctx: 387 tpm_verifier.verify(mock_host) 388 self.assertEqual('TPM is not ready', ctx.exception.message) 389 390 @mock.patch.object(retry.logging, 'warning') 391 @mock.patch.object(retry.time, 'time') 392 @mock.patch.object(retry.time, 'sleep') 393 def test_jetstream_cryptohome_missing(self, mock_sleep, mock_time, 394 mock_logging): 395 mock_time.side_effect = itertools.count(0, 20) 396 mock_host = mock.Mock() 397 mock_host.run.side_effect = error.AutoservRunError('test', None) 398 tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) 399 with self.assertRaises(hosts.AutoservVerifyError) as ctx: 400 tpm_verifier.verify(mock_host) 401 self.assertEqual('Could not determine TPM status', 402 ctx.exception.message) 403 404 405if __name__ == '__main__': 406 unittest.main() 407