#!/usr/bin/python2
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unit tests for the verifier/repair definitions in `cros_repair`."""

import itertools
import mock
import unittest

import common
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import hosts
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.server.hosts import cros_firmware
from autotest_lib.server.hosts import cros_repair
from autotest_lib.server.hosts import repair_utils


# Expected verifier tables.  Each entry is a tuple of
# (verifier class, label, labels of the verifiers it depends on).
CROS_VERIFY_DAG = (
    (repair_utils.SshVerifier, 'ssh', ()),
    (cros_repair.ServoTypeVerifier, 'servo_type', ()),
    (cros_repair.DevModeVerifier, 'devmode', ('ssh',)),
    (cros_repair.HWIDVerifier, 'hwid', ('ssh',)),
    (cros_repair.ACPowerVerifier, 'power', ('ssh',)),
    (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh',)),
    (cros_repair.WritableVerifier, 'writable', ('ssh',)),
    (cros_repair.TPMStatusVerifier, 'tpm', ('ssh',)),
    (cros_repair.UpdateSuccessVerifier, 'good_au', ('ssh',)),
    (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh',)),
    (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh',)),
    (cros_repair.PythonVerifier, 'python', ('ssh',)),
    (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)),
    (cros_repair.KvmExistsVerifier, 'ec_reset', ('ssh',)),
    (cros_repair.StopStartUIVerifier, 'stop_start_ui', ('ssh',)),
)

# Expected repair tables.  Each entry is a tuple of
# (repair class, label, dependency verifier labels, trigger verifier labels).
CROS_REPAIR_ACTIONS = (
    (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)),
    (cros_repair.ServoSysRqRepair, 'sysrq', (), ('ssh',)),
    (cros_repair.ServoResetRepair, 'servoreset', (), ('ssh',)),
    (cros_firmware.FirmwareRepair,
     'firmware', (), ('ssh', 'fwstatus', 'good_au')),
    (cros_repair.CrosRebootRepair,
     'reboot', ('ssh',), ('devmode', 'writable',)),
    (cros_repair.ColdRebootRepair,
     'coldboot', ('ssh',), ('ec_reset',)),
    (cros_repair.AutoUpdateRepair,
     'au',
     ('ssh', 'writable', 'stop_start_ui', 'tpm', 'good_au', 'ext4'),
     ('power', 'rwfw', 'python', 'cros', 'ec_reset')),
    (cros_repair.PowerWashRepair,
     'powerwash',
     ('ssh', 'writable', 'stop_start_ui'),
     ('tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python', 'cros',
      'ec_reset')),
    (cros_repair.ServoInstallRepair,
     'usb',
     (),
     ('ssh', 'writable', 'stop_start_ui', 'tpm', 'good_au', 'ext4', 'power',
      'rwfw', 'python', 'cros', 'ec_reset')),
)

MOBLAB_VERIFY_DAG = (
    (repair_utils.SshVerifier, 'ssh', ()),
    (cros_repair.ACPowerVerifier, 'power', ('ssh',)),
    (cros_repair.PythonVerifier, 'python', ('ssh',)),
    (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)),
)

MOBLAB_REPAIR_ACTIONS = (
    (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)),
    (cros_repair.AutoUpdateRepair,
     'au', ('ssh',), ('power', 'python', 'cros',)),
)

JETSTREAM_VERIFY_DAG = (
    (repair_utils.SshVerifier, 'ssh', ()),
    (cros_repair.ServoTypeVerifier, 'servo_type', ()),
    (cros_repair.DevModeVerifier, 'devmode', ('ssh',)),
    (cros_repair.HWIDVerifier, 'hwid', ('ssh',)),
    (cros_repair.ACPowerVerifier, 'power', ('ssh',)),
    (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh',)),
    (cros_repair.WritableVerifier, 'writable', ('ssh',)),
    (cros_repair.TPMStatusVerifier, 'tpm', ('ssh',)),
    (cros_repair.UpdateSuccessVerifier, 'good_au', ('ssh',)),
    (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh',)),
    (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh',)),
    (cros_repair.PythonVerifier, 'python', ('ssh',)),
    (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)),
    (cros_repair.KvmExistsVerifier, 'ec_reset', ('ssh',)),
    (cros_repair.JetstreamTpmVerifier, 'jetstream_tpm', ('ssh',)),
    (cros_repair.JetstreamAttestationVerifier, 'jetstream_attestation',
     ('ssh',)),
    (cros_repair.JetstreamServicesVerifier, 'jetstream_services', ('ssh',)),
)

JETSTREAM_REPAIR_ACTIONS = (
    (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)),
    (cros_repair.ServoSysRqRepair, 'sysrq', (), ('ssh',)),
    (cros_repair.ServoResetRepair, 'servoreset', (), ('ssh',)),
    (cros_firmware.FirmwareRepair,
     'firmware', (), ('ssh', 'fwstatus', 'good_au')),
    (cros_repair.CrosRebootRepair,
     'reboot', ('ssh',), ('devmode', 'writable',)),
    (cros_repair.ColdRebootRepair,
     'coldboot', ('ssh',), ('ec_reset',)),
    (cros_repair.JetstreamTpmRepair,
     'jetstream_tpm_repair',
     ('ssh', 'writable', 'tpm', 'good_au', 'ext4'),
     ('power', 'rwfw', 'python', 'cros', 'jetstream_tpm',
      'jetstream_attestation')),
    (cros_repair.JetstreamServiceRepair,
     'jetstream_service_repair',
     ('ssh', 'writable', 'tpm', 'good_au', 'ext4', 'jetstream_tpm',
      'jetstream_attestation'),
     ('power', 'rwfw', 'python', 'cros', 'jetstream_tpm',
      'jetstream_attestation', 'jetstream_services')),
    (cros_repair.AutoUpdateRepair,
     'au',
     ('ssh', 'writable', 'tpm', 'good_au', 'ext4'),
     ('power', 'rwfw', 'python', 'cros', 'jetstream_tpm',
      'jetstream_attestation', 'jetstream_services')),
    (cros_repair.PowerWashRepair,
     'powerwash',
     ('ssh', 'writable'),
     ('tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python', 'cros',
      'jetstream_tpm', 'jetstream_attestation', 'jetstream_services')),
    (cros_repair.ServoInstallRepair,
     'usb',
     (),
     ('ssh', 'writable', 'tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python',
      'cros', 'jetstream_tpm', 'jetstream_attestation', 'jetstream_services')),
)

# Canned cryptohome status JSON.  The three fixtures differ only in the
# "can_load_srk", "can_load_srk_pubkey" and "owned" fields of "tpm".
CRYPTOHOME_STATUS_OWNED = """{
   "installattrs": {
      "first_install": true,
      "initialized": true,
      "invalid": false,
      "lockbox_index": 536870916,
      "lockbox_nvram_version": 2,
      "secure": true,
      "size": 0,
      "version": 1
   },
   "mounts": [ ],
   "tpm": {
      "being_owned": false,
      "can_connect": true,
      "can_decrypt": false,
      "can_encrypt": false,
      "can_load_srk": true,
      "can_load_srk_pubkey": true,
      "enabled": true,
      "has_context": true,
      "has_cryptohome_key": false,
      "has_key_handle": false,
      "last_error": 0,
      "owned": true
   }
}
"""

CRYPTOHOME_STATUS_NOT_OWNED = """{
   "installattrs": {
      "first_install": true,
      "initialized": true,
      "invalid": false,
      "lockbox_index": 536870916,
      "lockbox_nvram_version": 2,
      "secure": true,
      "size": 0,
      "version": 1
   },
   "mounts": [ ],
   "tpm": {
      "being_owned": false,
      "can_connect": true,
      "can_decrypt": false,
      "can_encrypt": false,
      "can_load_srk": false,
      "can_load_srk_pubkey": false,
      "enabled": true,
      "has_context": true,
      "has_cryptohome_key": false,
      "has_key_handle": false,
      "last_error": 0,
      "owned": false
   }
}
"""

CRYPTOHOME_STATUS_CANNOT_LOAD_SRK = """{
   "installattrs": {
      "first_install": true,
      "initialized": true,
      "invalid": false,
      "lockbox_index": 536870916,
      "lockbox_nvram_version": 2,
      "secure": true,
      "size": 0,
      "version": 1
   },
   "mounts": [ ],
   "tpm": {
      "being_owned": false,
      "can_connect": true,
      "can_decrypt": false,
      "can_encrypt": false,
      "can_load_srk": false,
      "can_load_srk_pubkey": false,
      "enabled": true,
      "has_context": true,
      "has_cryptohome_key": false,
      "has_key_handle": false,
      "last_error": 0,
      "owned": true
   }
}
"""

# Canned plain-text TPM status output used by the Jetstream TPM tests.
TPM_STATUS_READY = """
TPM Enabled: true
TPM Owned: true
TPM Being Owned: false
TPM Ready: true
TPM Password: 9eaee4da8b4c
"""

TPM_STATUS_NOT_READY = """
TPM Enabled: true
TPM Owned: false
TPM Being Owned: true
TPM Ready: false
TPM Password:
"""


class CrosRepairUnittests(unittest.TestCase):
    """Tests for the DAG/repair tables and TPM verifiers in `cros_repair`."""
    # pylint: disable=missing-docstring

    # Show full diffs when the (large) expected tables do not match.
    maxDiff = None

    @staticmethod
    def _host_with_stdout(stdout):
        """Return a mock host whose every `run()` result has this stdout."""
        fake_host = mock.Mock()
        fake_host.run.return_value.stdout = stdout
        return fake_host

    def _assert_verify_fails(self, verifier, host, expected_message):
        """Assert `verifier.verify(host)` raises with the expected message.

        The message is read with `str()` rather than the `.message`
        attribute, which has been deprecated since Python 2.6 and is
        absent from Python 3 exceptions.

        @param verifier          Verifier object under test.
        @param host              Mock host handed to `verify()`.
        @param expected_message  Exact text expected from the exception.
        """
        with self.assertRaises(hosts.AutoservVerifyError) as ctx:
            verifier.verify(host)
        self.assertEqual(expected_message, str(ctx.exception))

    def test_cros_repair(self):
        verify_dag = cros_repair._cros_verify_dag()
        self.assertTupleEqual(verify_dag, CROS_VERIFY_DAG)
        self.check_verify_dag(verify_dag)
        repair_actions = cros_repair._cros_repair_actions()
        self.assertTupleEqual(repair_actions, CROS_REPAIR_ACTIONS)
        self.check_repair_actions(verify_dag, repair_actions)

    def test_moblab_repair(self):
        verify_dag = cros_repair._moblab_verify_dag()
        self.assertTupleEqual(verify_dag, MOBLAB_VERIFY_DAG)
        self.check_verify_dag(verify_dag)
        repair_actions = cros_repair._moblab_repair_actions()
        self.assertTupleEqual(repair_actions, MOBLAB_REPAIR_ACTIONS)
        self.check_repair_actions(verify_dag, repair_actions)

    def test_jetstream_repair(self):
        verify_dag = cros_repair._jetstream_verify_dag()
        self.assertTupleEqual(verify_dag, JETSTREAM_VERIFY_DAG)
        self.check_verify_dag(verify_dag)
        repair_actions = cros_repair._jetstream_repair_actions()
        self.assertTupleEqual(repair_actions, JETSTREAM_REPAIR_ACTIONS)
        self.check_repair_actions(verify_dag, repair_actions)

    def check_verify_dag(self, verify_dag):
        """Checks that dependency labels are defined.

        @param verify_dag  Sequence of (class, label, dependencies) tuples.
        """
        known_labels = set(node[1] for node in verify_dag)
        for node in verify_dag:
            for dep in node[2]:
                self.assertIn(dep, known_labels)

    def check_repair_actions(self, verify_dag, repair_actions):
        """Checks that dependency and trigger labels are defined.

        @param verify_dag      Sequence of (class, label, dependencies)
                               tuples supplying the valid verifier labels.
        @param repair_actions  Sequence of (class, label, dependencies,
                               triggers) tuples to be checked.
        """
        verify_labels = set(node[1] for node in verify_dag)
        for action in repair_actions:
            deps = action[2]
            triggers = action[3]
            for label in itertools.chain(deps, triggers):
                self.assertIn(label, verify_labels)

    def test_get_cryptohome_status_owned(self):
        mock_host = self._host_with_stdout(CRYPTOHOME_STATUS_OWNED)
        status = cros_repair.CryptohomeStatus(mock_host)
        self.assertDictEqual({
            'being_owned': False,
            'can_connect': True,
            'can_decrypt': False,
            'can_encrypt': False,
            'can_load_srk': True,
            'can_load_srk_pubkey': True,
            'enabled': True,
            'has_context': True,
            'has_cryptohome_key': False,
            'has_key_handle': False,
            'last_error': 0,
            'owned': True,
        }, status['tpm'])
        self.assertTrue(status.tpm_enabled)
        self.assertTrue(status.tpm_owned)
        self.assertTrue(status.tpm_can_load_srk)
        self.assertTrue(status.tpm_can_load_srk_pubkey)

    def test_get_cryptohome_status_not_owned(self):
        mock_host = self._host_with_stdout(CRYPTOHOME_STATUS_NOT_OWNED)
        status = cros_repair.CryptohomeStatus(mock_host)
        self.assertDictEqual({
            'being_owned': False,
            'can_connect': True,
            'can_decrypt': False,
            'can_encrypt': False,
            'can_load_srk': False,
            'can_load_srk_pubkey': False,
            'enabled': True,
            'has_context': True,
            'has_cryptohome_key': False,
            'has_key_handle': False,
            'last_error': 0,
            'owned': False,
        }, status['tpm'])
        self.assertTrue(status.tpm_enabled)
        self.assertFalse(status.tpm_owned)
        self.assertFalse(status.tpm_can_load_srk)
        self.assertFalse(status.tpm_can_load_srk_pubkey)

    @mock.patch.object(cros_repair, '_is_virtual_machine')
    def test_tpm_status_verifier_owned(self, mock_is_virt):
        mock_is_virt.return_value = False
        mock_host = self._host_with_stdout(CRYPTOHOME_STATUS_OWNED)
        tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
        # Passing means verify() returns without raising.
        tpm_verifier.verify(mock_host)

    @mock.patch.object(cros_repair, '_is_virtual_machine')
    def test_tpm_status_verifier_not_owned(self, mock_is_virt):
        mock_is_virt.return_value = False
        mock_host = self._host_with_stdout(CRYPTOHOME_STATUS_NOT_OWNED)
        tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
        # Passing means verify() returns without raising.
        tpm_verifier.verify(mock_host)

    @mock.patch.object(cros_repair, '_is_virtual_machine')
    def test_tpm_status_verifier_cannot_load_srk_pubkey(self, mock_is_virt):
        mock_is_virt.return_value = False
        mock_host = self._host_with_stdout(CRYPTOHOME_STATUS_CANNOT_LOAD_SRK)
        tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
        self._assert_verify_fails(tpm_verifier, mock_host,
                                  'Cannot load the TPM SRK')

    def test_jetstream_tpm_owned(self):
        mock_host = mock.Mock()
        # Successive run() calls return these results in order.
        mock_host.run.side_effect = [
            mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED),
            mock.Mock(stdout=TPM_STATUS_READY),
        ]
        tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
        tpm_verifier.verify(mock_host)

    @mock.patch.object(retry.logging, 'warning')
    @mock.patch.object(retry.time, 'time')
    @mock.patch.object(retry.time, 'sleep')
    def test_jetstream_tpm_not_owned(self, mock_sleep, mock_time, mock_logging):
        # Each time() call reports 20 more than the last and sleep() is
        # mocked out -- presumably so the retry logic gives up quickly.
        mock_time.side_effect = itertools.count(0, 20)
        mock_host = self._host_with_stdout(CRYPTOHOME_STATUS_NOT_OWNED)
        tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
        self._assert_verify_fails(tpm_verifier, mock_host, 'TPM is not owned')

    @mock.patch.object(retry.logging, 'warning')
    @mock.patch.object(retry.time, 'time')
    @mock.patch.object(retry.time, 'sleep')
    def test_jetstream_tpm_not_ready(self, mock_sleep, mock_time, mock_logging):
        mock_time.side_effect = itertools.count(0, 20)
        mock_host = mock.Mock()
        # Cycle the two results so that repeated attempts keep seeing an
        # owned-but-not-ready TPM.
        mock_host.run.side_effect = itertools.cycle([
            mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED),
            mock.Mock(stdout=TPM_STATUS_NOT_READY),
        ])
        tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
        self._assert_verify_fails(tpm_verifier, mock_host, 'TPM is not ready')

    @mock.patch.object(retry.logging, 'warning')
    @mock.patch.object(retry.time, 'time')
    @mock.patch.object(retry.time, 'sleep')
    def test_jetstream_cryptohome_missing(self, mock_sleep, mock_time,
                                          mock_logging):
        mock_time.side_effect = itertools.count(0, 20)
        mock_host = mock.Mock()
        # Every run() call on the host raises.
        mock_host.run.side_effect = error.AutoservRunError('test', None)
        tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
        self._assert_verify_fails(tpm_verifier, mock_host,
                                  'Could not determine TPM status')


if __name__ == '__main__':
    unittest.main()