1#!/usr/bin/python2 2# Copyright 2017 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6import itertools 7import mock 8import unittest 9 10import common 11from autotest_lib.client.common_lib import error 12from autotest_lib.client.common_lib import hosts 13from autotest_lib.client.common_lib.cros import retry 14from autotest_lib.server.hosts import cros_firmware 15from autotest_lib.server.hosts import cros_repair 16from autotest_lib.server.hosts import repair_utils 17 18 19CROS_VERIFY_DAG = ( 20 (repair_utils.PingVerifier, 'ping', ()), 21 (repair_utils.SshVerifier, 'ssh', ('ping', )), 22 (cros_repair.ServoUSBDriveVerifier, 'usb_drive', ()), 23 (cros_repair.DevDefaultBootVerifier, 'dev_default_boot', ('ssh', )), 24 (cros_repair.DevModeVerifier, 'devmode', ('ssh', )), 25 (cros_repair.EnrollmentStateVerifier, 'enrollment_state', ('ssh', )), 26 (cros_repair.HWIDVerifier, 'hwid', ('ssh', )), 27 (cros_repair.ACPowerVerifier, 'power', ('ssh', )), 28 (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh', )), 29 (cros_repair.WritableVerifier, 'writable', ('ssh', )), 30 (cros_repair.TPMStatusVerifier, 'tpm', ('ssh', )), 31 (cros_repair.UpdateSuccessVerifier, 'good_provision', ('ssh', )), 32 (cros_repair.FirmwareTpmVerifier, 'faft_tpm', ('ssh', )), 33 (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh', )), 34 (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh', )), 35 (cros_repair.PythonVerifier, 'python', ('ssh', )), 36 (repair_utils.LegacyHostVerifier, 'cros', ('ssh', )), 37 (cros_repair.CrosVerisionVerifier, 'cros_version_label', ('ssh', )), 38 (cros_repair.StopStartUIVerifier, 'stop_start_ui', ('ssh', )), 39 (cros_repair.DUTStorageVerifier, 'storage', ('ssh', )), 40) 41 42CROS_REPAIR_ACTIONS = ( 43 (repair_utils.RPMCycleRepair, 'rpm', (), ( 44 'ping', 45 'ssh', 46 'power', 47 )), 48 (cros_repair.ServoResetRepair, 'servoreset', (), ( 49 'ping', 50 'ssh', 51 'stop_start_ui', 52 'power', 53 )), 54 ( 55 cros_repair.ServoCr50RebootRepair, 56 'cr50_reset', 57 (), 58 ('ping', 'ssh', 'stop_start_ui', 'power'), 59 ), 60 (cros_repair.ServoSysRqRepair, 'sysrq', (), ( 61 'ping', 62 'ssh', 63 )), 64 (cros_repair.LabelCleanupRepair, 'label_cleanup', ('ssh', ), 65 ('cros_version_label', )), 66 (cros_firmware.FaftFirmwareRepair, 'faft_firmware_repair', (), 67 ('ping', 'ssh', 'fwstatus', 'good_provision')), 68 (cros_repair.DevDefaultBootRepair, 'set_default_boot', ('ssh', ), 69 ('dev_default_boot', )), 70 (cros_repair.CrosRebootRepair, 'reboot', ('ssh', ), ( 71 'devmode', 72 'writable', 73 )), 74 (cros_repair.EnrollmentCleanupRepair, 'cleanup_enrollment', ('ssh', ), 75 ('enrollment_state', )), 76 (cros_repair.ProvisionRepair, 'provision', 77 ('ping', 'ssh', 'writable', 'stop_start_ui', 'tpm', 'good_provision', 78 'ext4'), ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros', 79 'dev_default_boot')), 80 (cros_repair.PowerWashRepair, 'powerwash', ('ping', 'ssh', 'writable', 81 'stop_start_ui'), 82 ('tpm', 'good_provision', 'ext4', 'power', 'rwfw', 'fwstatus', 83 'python', 'hwid', 'cros', 'dev_default_boot')), 84 (cros_repair.ServoInstallRepair, 'usb', ('usb_drive', ), 85 ('ping', 'ssh', 'writable', 'stop_start_ui', 'tpm', 'good_provision', 86 'ext4', 'power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros', 87 'dev_default_boot', 'faft_tpm')), 88 (cros_firmware.GeneralFirmwareRepair, 'general_firmware', 89 ('usb_drive', ), ( 90 'ping', 91 'ssh', 92 )), 93) 94 95MOBLAB_VERIFY_DAG = ( 96 (repair_utils.SshVerifier, 'ssh', ()), 97 (cros_repair.ACPowerVerifier, 'power', ('ssh',)), 98 (cros_repair.PythonVerifier, 'python', ('ssh',)), 99 (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)), 100) 101 102MOBLAB_REPAIR_ACTIONS = ( 103 (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)), 104 (cros_repair.ProvisionRepair, 105 'provision', ('ssh',), ('power', 'python', 'cros',)), 106) 107 108JETSTREAM_VERIFY_DAG = ( 109 (repair_utils.PingVerifier, 'ping', ()), 110 (repair_utils.SshVerifier, 'ssh', ('ping', )), 111 (cros_repair.ServoUSBDriveVerifier, 'usb_drive', ()), 112 (cros_repair.DevDefaultBootVerifier, 'dev_default_boot', ('ssh', )), 113 (cros_repair.DevModeVerifier, 'devmode', ('ssh', )), 114 (cros_repair.EnrollmentStateVerifier, 'enrollment_state', ('ssh', )), 115 (cros_repair.HWIDVerifier, 'hwid', ('ssh', )), 116 (cros_repair.ACPowerVerifier, 'power', ('ssh', )), 117 (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh', )), 118 (cros_repair.WritableVerifier, 'writable', ('ssh', )), 119 (cros_repair.TPMStatusVerifier, 'tpm', ('ssh', )), 120 (cros_repair.UpdateSuccessVerifier, 'good_provision', ('ssh', )), 121 (cros_repair.FirmwareTpmVerifier, 'faft_tpm', ('ssh', )), 122 (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh', )), 123 (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh', )), 124 (cros_repair.PythonVerifier, 'python', ('ssh', )), 125 (repair_utils.LegacyHostVerifier, 'cros', ('ssh', )), 126 (cros_repair.CrosVerisionVerifier, 'cros_version_label', ('ssh', )), 127 (cros_repair.JetstreamTpmVerifier, 'jetstream_tpm', ('ssh', )), 128 (cros_repair.JetstreamAttestationVerifier, 'jetstream_attestation', 129 ('ssh', )), 130 (cros_repair.JetstreamServicesVerifier, 'jetstream_services', 131 ('ssh', )), 132) 133 134JETSTREAM_REPAIR_ACTIONS = ( 135 (repair_utils.RPMCycleRepair, 'rpm', (), ( 136 'ping', 137 'ssh', 138 'power', 139 )), 140 (cros_repair.ServoResetRepair, 'servoreset', (), ( 141 'ping', 142 'ssh', 143 )), 144 (cros_repair.ServoCr50RebootRepair, 'cr50_reset', (), ( 145 'ping', 146 'ssh', 147 )), 148 (cros_repair.ServoSysRqRepair, 'sysrq', (), ( 149 'ping', 150 'ssh', 151 )), 152 (cros_repair.LabelCleanupRepair, 'label_cleanup', ('ssh', ), 153 ('cros_version_label', )), 154 (cros_firmware.FaftFirmwareRepair, 'faft_firmware_repair', (), 155 ('ping', 'ssh', 'fwstatus', 'good_provision')), 156 (cros_repair.DevDefaultBootRepair, 'set_default_boot', ('ssh', ), 157 ('dev_default_boot', )), 158 (cros_repair.CrosRebootRepair, 'reboot', ('ssh', ), ( 159 'devmode', 160 'writable', 161 )), 162 (cros_repair.EnrollmentCleanupRepair, 'cleanup_enrollment', ('ssh', ), 163 ('enrollment_state', )), 164 (cros_repair.JetstreamTpmRepair, 'jetstream_tpm_repair', 165 ('ping', 'ssh', 'writable', 'tpm', 'good_provision', 'ext4'), 166 ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros', 167 'dev_default_boot', 'jetstream_tpm', 'jetstream_attestation')), 168 (cros_repair.JetstreamServiceRepair, 'jetstream_service_repair', 169 ('ping', 'ssh', 'writable', 'tpm', 'good_provision', 'ext4', 170 'jetstream_tpm', 'jetstream_attestation'), 171 ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros', 172 'dev_default_boot', 'jetstream_tpm', 'jetstream_attestation', 173 'jetstream_services')), 174 (cros_repair.ProvisionRepair, 'provision', 175 ('ping', 'ssh', 'writable', 'tpm', 'good_provision', 176 'ext4'), ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros', 177 'dev_default_boot', 'jetstream_tpm', 178 'jetstream_attestation', 'jetstream_services')), 179 (cros_repair.PowerWashRepair, 'powerwash', ('ping', 'ssh', 'writable'), 180 ('tpm', 'good_provision', 'ext4', 'power', 'rwfw', 'fwstatus', 181 'python', 'hwid', 'cros', 'dev_default_boot', 'jetstream_tpm', 182 'jetstream_attestation', 'jetstream_services')), 183 (cros_repair.ServoInstallRepair, 'usb', ('usb_drive', ), ( 184 'ping', 185 'ssh', 186 'writable', 187 'tpm', 188 'good_provision', 189 'ext4', 190 'power', 191 'rwfw', 192 'fwstatus', 193 'python', 194 'hwid', 195 'cros', 196 'dev_default_boot', 197 'jetstream_tpm', 198 'jetstream_attestation', 199 'jetstream_services', 200 'faft_tpm', 201 )), 202) 203 204CRYPTOHOME_STATUS_OWNED = """{ 205 "installattrs": { 206 "first_install": true, 207 "initialized": true, 208 "invalid": false, 209 "lockbox_index": 536870916, 210 "lockbox_nvram_version": 2, 211 "secure": true, 212 "size": 0, 213 "version": 1 214 }, 215 "mounts": [ ], 216 "tpm": { 217 "being_owned": false, 218 "can_connect": true, 219 "can_decrypt": false, 220 "can_encrypt": false, 221 "can_load_srk": true, 222 "can_load_srk_pubkey": true, 223 "enabled": true, 224 "has_context": true, 225 "has_cryptohome_key": false, 226 "has_key_handle": false, 227 "last_error": 0, 228 "owned": true 229 } 230} 231""" 232 233CRYPTOHOME_STATUS_NOT_OWNED = """{ 234 "installattrs": { 235 "first_install": true, 236 "initialized": true, 237 "invalid": false, 238 "lockbox_index": 536870916, 239 "lockbox_nvram_version": 2, 240 "secure": true, 241 "size": 0, 242 "version": 1 243 }, 244 "mounts": [ ], 245 "tpm": { 246 "being_owned": false, 247 "can_connect": true, 248 "can_decrypt": false, 249 "can_encrypt": false, 250 "can_load_srk": false, 251 "can_load_srk_pubkey": false, 252 "enabled": true, 253 "has_context": true, 254 "has_cryptohome_key": false, 255 "has_key_handle": false, 256 "last_error": 0, 257 "owned": false 258 } 259} 260""" 261 262CRYPTOHOME_STATUS_CANNOT_LOAD_SRK = """{ 263 "installattrs": { 264 "first_install": true, 265 "initialized": true, 266 "invalid": false, 267 "lockbox_index": 536870916, 268 "lockbox_nvram_version": 2, 269 "secure": true, 270 "size": 0, 271 "version": 1 272 }, 273 "mounts": [ ], 274 "tpm": { 275 "being_owned": false, 276 "can_connect": true, 277 "can_decrypt": false, 278 "can_encrypt": false, 279 "can_load_srk": false, 280 "can_load_srk_pubkey": false, 281 "enabled": true, 282 "has_context": true, 283 "has_cryptohome_key": false, 284 "has_key_handle": false, 285 "last_error": 0, 286 "owned": true 287 } 288} 289""" 290 291TPM_STATUS_READY = """ 292TPM Enabled: true 293TPM Owned: true 294TPM Being Owned: false 295TPM Ready: true 296TPM Password: 9eaee4da8b4c 297""" 298 299TPM_STATUS_NOT_READY = """ 300TPM Enabled: true 301TPM Owned: false 302TPM Being Owned: true 303TPM Ready: false 304TPM Password: 305""" 306 307 308class CrosRepairUnittests(unittest.TestCase): 309 # pylint: disable=missing-docstring 310 311 maxDiff = None 312 313 def test_cros_repair(self): 314 verify_dag = cros_repair._cros_verify_dag() 315 self.assertTupleEqual(verify_dag, CROS_VERIFY_DAG) 316 self.check_verify_dag(verify_dag) 317 repair_actions = cros_repair._cros_repair_actions() 318 self.assertTupleEqual(repair_actions, CROS_REPAIR_ACTIONS) 319 self.check_repair_actions(verify_dag, repair_actions) 320 321 def test_moblab_repair(self): 322 verify_dag = cros_repair._moblab_verify_dag() 323 self.assertTupleEqual(verify_dag, MOBLAB_VERIFY_DAG) 324 self.check_verify_dag(verify_dag) 325 repair_actions = cros_repair._moblab_repair_actions() 326 self.assertTupleEqual(repair_actions, MOBLAB_REPAIR_ACTIONS) 327 self.check_repair_actions(verify_dag, repair_actions) 328 329 def test_jetstream_repair(self): 330 verify_dag = cros_repair._jetstream_verify_dag() 331 self.assertTupleEqual(verify_dag, JETSTREAM_VERIFY_DAG) 332 self.check_verify_dag(verify_dag) 333 repair_actions = cros_repair._jetstream_repair_actions() 334 self.assertTupleEqual(repair_actions, JETSTREAM_REPAIR_ACTIONS) 335 self.check_repair_actions(verify_dag, repair_actions) 336 337 def check_verify_dag(self, verify_dag): 338 """Checks that dependency labels are defined.""" 339 labels = [n[1] for n in verify_dag] 340 for node in verify_dag: 341 for dep in node[2]: 342 self.assertIn(dep, labels) 343 344 def check_repair_actions(self, verify_dag, repair_actions): 345 """Checks that dependency and trigger labels are defined.""" 346 verify_labels = [n[1] for n in verify_dag] 347 for action in repair_actions: 348 deps = action[2] 349 triggers = action[3] 350 for label in deps + triggers: 351 self.assertIn(label, verify_labels) 352 353 def test_get_cryptohome_status_owned(self): 354 mock_host = mock.Mock() 355 mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED 356 status = cros_repair.CryptohomeStatus(mock_host) 357 self.assertDictEqual({ 358 'being_owned': False, 359 'can_connect': True, 360 'can_decrypt': False, 361 'can_encrypt': False, 362 'can_load_srk': True, 363 'can_load_srk_pubkey': True, 364 'enabled': True, 365 'has_context': True, 366 'has_cryptohome_key': False, 367 'has_key_handle': False, 368 'last_error': 0, 369 'owned': True, 370 }, status['tpm']) 371 self.assertTrue(status.tpm_enabled) 372 self.assertTrue(status.tpm_owned) 373 self.assertTrue(status.tpm_can_load_srk) 374 self.assertTrue(status.tpm_can_load_srk_pubkey) 375 376 def test_get_cryptohome_status_not_owned(self): 377 mock_host = mock.Mock() 378 mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED 379 status = cros_repair.CryptohomeStatus(mock_host) 380 self.assertDictEqual({ 381 'being_owned': False, 382 'can_connect': True, 383 'can_decrypt': False, 384 'can_encrypt': False, 385 'can_load_srk': False, 386 'can_load_srk_pubkey': False, 387 'enabled': True, 388 'has_context': True, 389 'has_cryptohome_key': False, 390 'has_key_handle': False, 391 'last_error': 0, 392 'owned': False, 393 }, status['tpm']) 394 self.assertTrue(status.tpm_enabled) 395 self.assertFalse(status.tpm_owned) 396 self.assertFalse(status.tpm_can_load_srk) 397 self.assertFalse(status.tpm_can_load_srk_pubkey) 398 399 @mock.patch.object(cros_repair, '_is_virtual_machine') 400 def test_tpm_status_verifier_owned(self, mock_is_virt): 401 mock_is_virt.return_value = False 402 mock_host = mock.Mock() 403 mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED 404 tpm_verifier = cros_repair.TPMStatusVerifier('test', []) 405 tpm_verifier.verify(mock_host) 406 407 @mock.patch.object(cros_repair, '_is_virtual_machine') 408 def test_tpm_status_verifier_not_owned(self, mock_is_virt): 409 mock_is_virt.return_value = False 410 mock_host = mock.Mock() 411 mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED 412 tpm_verifier = cros_repair.TPMStatusVerifier('test', []) 413 tpm_verifier.verify(mock_host) 414 415 @mock.patch.object(cros_repair, '_is_virtual_machine') 416 def test_tpm_status_verifier_cannot_load_srk_pubkey(self, mock_is_virt): 417 mock_is_virt.return_value = False 418 mock_host = mock.Mock() 419 mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_CANNOT_LOAD_SRK 420 tpm_verifier = cros_repair.TPMStatusVerifier('test', []) 421 with self.assertRaises(hosts.AutoservVerifyError) as ctx: 422 tpm_verifier.verify(mock_host) 423 self.assertEqual('Cannot load the TPM SRK', 424 ctx.exception.message) 425 426 def test_jetstream_tpm_owned(self): 427 mock_host = mock.Mock() 428 mock_host.run.side_effect = [ 429 mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED), 430 mock.Mock(stdout=TPM_STATUS_READY), 431 ] 432 tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) 433 tpm_verifier.verify(mock_host) 434 435 @mock.patch.object(retry.logging, 'warning') 436 @mock.patch.object(retry.time, 'time') 437 @mock.patch.object(retry.time, 'sleep') 438 def test_jetstream_tpm_not_owned(self, mock_sleep, mock_time, mock_logging): 439 mock_time.side_effect = itertools.count(0, 20) 440 mock_host = mock.Mock() 441 mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED 442 tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) 443 with self.assertRaises(hosts.AutoservVerifyError) as ctx: 444 tpm_verifier.verify(mock_host) 445 self.assertEqual('TPM is not owned', ctx.exception.message) 446 447 @mock.patch.object(retry.logging, 'warning') 448 @mock.patch.object(retry.time, 'time') 449 @mock.patch.object(retry.time, 'sleep') 450 def test_jetstream_tpm_not_ready(self, mock_sleep, mock_time, mock_logging): 451 mock_time.side_effect = itertools.count(0, 20) 452 mock_host = mock.Mock() 453 mock_host.run.side_effect = itertools.cycle([ 454 mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED), 455 mock.Mock(stdout=TPM_STATUS_NOT_READY), 456 ]) 457 tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) 458 with self.assertRaises(hosts.AutoservVerifyError) as ctx: 459 tpm_verifier.verify(mock_host) 460 self.assertEqual('TPM is not ready', ctx.exception.message) 461 462 @mock.patch.object(retry.logging, 'warning') 463 @mock.patch.object(retry.time, 'time') 464 @mock.patch.object(retry.time, 'sleep') 465 def test_jetstream_cryptohome_missing(self, mock_sleep, mock_time, 466 mock_logging): 467 mock_time.side_effect = itertools.count(0, 20) 468 mock_host = mock.Mock() 469 mock_host.run.side_effect = error.AutoservRunError('test', None) 470 tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) 471 with self.assertRaises(hosts.AutoservVerifyError) as ctx: 472 tpm_verifier.verify(mock_host) 473 self.assertEqual('Could not determine TPM status', 474 ctx.exception.message) 475 476 477if __name__ == '__main__': 478 unittest.main() 479