1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging, os, shutil, tempfile 6 7import common, constants, cryptohome 8from autotest_lib.client.bin import utils 9from autotest_lib.client.common_lib import autotemp, error 10from autotest_lib.client.cros import cros_ui 11 12 13PK12UTIL = 'pk12util' 14CERTUTIL = 'certutil' 15OPENSSLP12 = 'openssl pkcs12' 16OPENSSLX509 = 'openssl x509' 17OPENSSLRSA = 'openssl rsa' 18OPENSSLREQ = 'openssl req' 19OPENSSLCRYPTO = 'openssl sha1' 20 21TESTUSER = 'ownership_test@chromium.org' 22TESTPASS = 'testme' 23 24 25class OwnershipError(error.TestError): 26 """Generic error for ownership-related failures.""" 27 pass 28 29 30class scoped_tempfile(object): 31 """A wrapper that provides scoped semantics for temporary files. 32 33 Providing a file path causes the scoped_tempfile to take ownership of the 34 file at the provided path. The file at the path will be deleted when this 35 object goes out of scope. If no path is provided, then a temporary file 36 object will be created for the lifetime of the scoped_tempfile 37 38 autotemp.tempfile objects don't seem to play nicely with being 39 used in system commands, so they can't be used for my purposes. 40 """ 41 42 tempdir = autotemp.tempdir(unique_id='ownership') 43 44 def __init__(self, name=None): 45 self.name = name 46 if not self.name: 47 self.fo = tempfile.TemporaryFile() 48 49 50 def __del__(self): 51 if self.name: 52 if os.path.exists(self.name): 53 os.unlink(self.name) 54 else: 55 self.fo.close() # Will destroy the underlying tempfile 56 57 58def system_output_on_fail(cmd): 59 """Run a |cmd|, capturing output and logging it only on error. 60 61 @param cmd: the command to run. 62 """ 63 output = None 64 try: 65 output = utils.system_output(cmd) 66 except: 67 logging.error(output) 68 raise 69 70 71def __unlink(filename): 72 """unlink a file, but log OSError and IOError instead of raising. 73 74 This allows unlinking files that don't exist safely. 75 76 @param filename: the file to attempt to unlink. 77 """ 78 try: 79 os.unlink(filename) 80 except (IOError, OSError) as error: 81 logging.info(error) 82 83 84def restart_ui_to_clear_ownership_files(): 85 """Remove on-disk state related to device ownership. 86 87 The UI must be stopped while we do this, or the session_manager will 88 write the policy and key files out again. 89 """ 90 cros_ui.stop(allow_fail=not cros_ui.is_up()) 91 clear_ownership_files_no_restart() 92 cros_ui.start() 93 94 95def clear_ownership_files_no_restart(): 96 """Remove on-disk state related to device ownership. 97 98 The UI must be stopped while we do this, or the session_manager will 99 write the policy and key files out again. 100 """ 101 if cros_ui.is_up(): 102 raise error.TestError("Tried to clear ownership with UI running.") 103 __unlink(constants.OWNER_KEY_FILE) 104 __unlink(constants.SIGNED_POLICY_FILE) 105 __unlink(os.path.join(constants.USER_DATA_DIR, 'Local State')) 106 107 108def fake_ownership(): 109 """Fake ownership by generating the necessary magic files.""" 110 # Determine the module directory. 111 dirname = os.path.dirname(__file__) 112 mock_certfile = os.path.join(dirname, constants.MOCK_OWNER_CERT) 113 mock_signedpolicyfile = os.path.join(dirname, 114 constants.MOCK_OWNER_POLICY) 115 utils.open_write_close(constants.OWNER_KEY_FILE, 116 cert_extract_pubkey_der(mock_certfile)) 117 shutil.copy(mock_signedpolicyfile, 118 constants.SIGNED_POLICY_FILE) 119 120 121POLICY_TYPE = 'google/chromeos/device' 122 123 124def assert_has_policy_data(response_proto): 125 """Assert that given protobuf has a policy_data field. 126 127 @param response_proto: a PolicyFetchResponse protobuf. 128 @raises OwnershipError on failure. 129 """ 130 if not response_proto.HasField("policy_data"): 131 raise OwnershipError('Malformatted response.') 132 133 134def assert_has_device_settings(data_proto): 135 """Assert that given protobuf is a policy with device settings in it. 136 137 @param data_proto: a PolicyData protobuf. 138 @raises OwnershipError if this isn't CrOS policy, or has no settings inside. 139 """ 140 if (not data_proto.HasField("policy_type") or 141 data_proto.policy_type != POLICY_TYPE or 142 not data_proto.HasField("policy_value")): 143 raise OwnershipError('Malformatted response.') 144 145 146def assert_username(data_proto, username): 147 """Assert that given protobuf is a policy associated with the given user. 148 149 @param data_proto: a PolicyData protobuf. 150 @param username: the username to check for 151 @raises OwnershipError if data_proto isn't associated with username 152 """ 153 if data_proto.username != username: 154 raise OwnershipError('Incorrect username.') 155 156 157def assert_guest_setting(settings, guests): 158 """Assert that given protobuf has given guest-related settings. 159 160 @param settings: a ChromeDeviceSettingsProto protobuf. 161 @param guests: boolean indicating whether guests are allowed to sign in. 162 @raises OwnershipError if settings doesn't enforce the provided setting. 163 """ 164 if not settings.HasField("guest_mode_enabled"): 165 raise OwnershipError('No guest mode setting protobuf.') 166 if not settings.guest_mode_enabled.HasField("guest_mode_enabled"): 167 raise OwnershipError('No guest mode setting.') 168 if settings.guest_mode_enabled.guest_mode_enabled != guests: 169 raise OwnershipError('Incorrect guest mode setting.') 170 171 172def assert_show_users(settings, show_users): 173 """Assert that given protobuf has given user-avatar-showing settings. 174 175 @param settings: a ChromeDeviceSettingsProto protobuf. 176 @param show_users: boolean indicating whether avatars are shown on sign in. 177 @raises OwnershipError if settings doesn't enforce the provided setting. 178 """ 179 if not settings.HasField("show_user_names"): 180 raise OwnershipError('No show users setting protobuf.') 181 if not settings.show_user_names.HasField("show_user_names"): 182 raise OwnershipError('No show users setting.') 183 if settings.show_user_names.show_user_names != show_users: 184 raise OwnershipError('Incorrect show users setting.') 185 186 187def assert_roaming(settings, roaming): 188 """Assert that given protobuf has given roaming settings. 189 190 @param settings: a ChromeDeviceSettingsProto protobuf. 191 @param roaming: boolean indicating whether roaming is allowed. 192 @raises OwnershipError if settings doesn't enforce the provided setting. 193 """ 194 if not settings.HasField("data_roaming_enabled"): 195 raise OwnershipError('No roaming setting protobuf.') 196 if not settings.data_roaming_enabled.HasField("data_roaming_enabled"): 197 raise OwnershipError('No roaming setting.') 198 if settings.data_roaming_enabled.data_roaming_enabled != roaming: 199 raise OwnershipError('Incorrect roaming setting.') 200 201 202def assert_new_users(settings, new_users): 203 """Assert that given protobuf has given new user settings. 204 205 @param settings: a ChromeDeviceSettingsProto protobuf. 206 @param new_users: boolean indicating whether adding users is allowed. 207 @raises OwnershipError if settings doesn't enforce the provided setting. 208 """ 209 if not settings.HasField("allow_new_users"): 210 raise OwnershipError('No allow new users setting protobuf.') 211 if not settings.allow_new_users.HasField("allow_new_users"): 212 raise OwnershipError('No allow new users setting.') 213 if settings.allow_new_users.allow_new_users != new_users: 214 raise OwnershipError('Incorrect allow new users setting.') 215 216 217def __user_nssdb(user): 218 """Returns the path to the NSSDB for the provided user. 219 220 @param user: the user whose NSSDB the caller wants. 221 @return: absolute path to user's NSSDB. 222 """ 223 return os.path.join(cryptohome.user_path(user), '.pki', 'nssdb') 224 225 226def use_known_ownerkeys(user): 227 """Sets the system up to use a well-known keypair for owner operations. 228 229 Assuming the appropriate cryptohome is already mounted, configures the 230 device to accept policies signed with the checked-in 'mock' owner key. 231 232 @param user: the user whose NSSDB should be populated with key material. 233 """ 234 dirname = os.path.dirname(__file__) 235 mock_keyfile = os.path.join(dirname, constants.MOCK_OWNER_KEY) 236 mock_certfile = os.path.join(dirname, constants.MOCK_OWNER_CERT) 237 push_to_nss(mock_keyfile, mock_certfile, __user_nssdb(user)) 238 utils.open_write_close(constants.OWNER_KEY_FILE, 239 cert_extract_pubkey_der(mock_certfile)) 240 241 242def known_privkey(): 243 """Returns the mock owner private key in PEM format. 244 245 @return: mock owner private key in PEM format. 246 """ 247 dirname = os.path.dirname(__file__) 248 return utils.read_file(os.path.join(dirname, constants.MOCK_OWNER_KEY)) 249 250 251def known_pubkey(): 252 """Returns the mock owner public key in DER format. 253 254 @return: mock owner public key in DER format. 255 """ 256 dirname = os.path.dirname(__file__) 257 return cert_extract_pubkey_der(os.path.join(dirname, 258 constants.MOCK_OWNER_CERT)) 259 260 261def pairgen(): 262 """Generate a self-signed cert and associated private key. 263 264 Generates a self-signed X509 certificate and the associated private key. 265 The key is 2048 bits. The generated material is stored in PEM format 266 and the paths to the two files are returned. 267 268 The caller is responsible for cleaning up these files. 269 270 @return: (/path/to/private_key, /path/to/self-signed_cert) 271 """ 272 keyfile = scoped_tempfile.tempdir.name + '/private.key' 273 certfile = scoped_tempfile.tempdir.name + '/cert.pem' 274 cmd = '%s -x509 -subj %s -newkey rsa:2048 -nodes -keyout %s -out %s' % ( 275 OPENSSLREQ, '/CN=me', keyfile, certfile) 276 system_output_on_fail(cmd) 277 return (keyfile, certfile) 278 279 280def pairgen_as_data(): 281 """Generates keypair, returns keys as data. 282 283 Generates a fresh owner keypair and then passes back the 284 PEM-encoded private key and the DER-encoded public key. 285 286 @return: (PEM-encoded private key, DER-encoded public key) 287 """ 288 (keypath, certpath) = pairgen() 289 keyfile = scoped_tempfile(keypath) 290 certfile = scoped_tempfile(certpath) 291 return (utils.read_file(keyfile.name), 292 cert_extract_pubkey_der(certfile.name)) 293 294 295def push_to_nss(keyfile, certfile, nssdb): 296 """Takes a pre-generated key pair and pushes them to an NSS DB. 297 298 Given paths to a private key and cert in PEM format, stores the pair 299 in the provided nssdb. 300 301 @param keyfile: path to PEM-formatted private key file. 302 @param certfile: path to PEM-formatted cert file for associated public key. 303 @param nssdb: path to NSSDB to be populated with the provided keys. 304 """ 305 for_push = scoped_tempfile(scoped_tempfile.tempdir.name + '/for_push.p12') 306 cmd = '%s -export -in %s -inkey %s -out %s ' % ( 307 OPENSSLP12, certfile, keyfile, for_push.name) 308 cmd += '-passin pass: -passout pass:' 309 system_output_on_fail(cmd) 310 cmd = '%s -d "sql:%s" -i %s -W ""' % (PK12UTIL, 311 nssdb, 312 for_push.name) 313 system_output_on_fail(cmd) 314 315 316def cert_extract_pubkey_der(pem): 317 """Given a PEM-formatted cert, extracts the public key in DER format. 318 319 Pass in an X509 certificate in PEM format, and you'll get back the 320 DER-formatted public key as a string. 321 322 @param pem: path to a PEM-formatted cert file. 323 @return: DER-encoded public key from cert, as a string. 324 """ 325 outfile = scoped_tempfile(scoped_tempfile.tempdir.name + '/pubkey.der') 326 cmd = '%s -in %s -pubkey -noout ' % (OPENSSLX509, pem) 327 cmd += '| %s -outform DER -pubin -out %s' % (OPENSSLRSA, 328 outfile.name) 329 system_output_on_fail(cmd) 330 der = utils.read_file(outfile.name) 331 return der 332 333 334def sign(pem_key, data): 335 """Signs |data| with key from |pem_key|, returns signature. 336 337 Using the PEM-formatted private key in |pem_key|, generates an 338 RSA-with-SHA1 signature over |data| and returns the signature in 339 a string. 340 341 @param pem_key: PEM-formatted private key, as a string. 342 @param data: data to be signed. 343 @return: signature as a string. 344 """ 345 sig = scoped_tempfile() 346 err = scoped_tempfile() 347 data_file = scoped_tempfile() 348 data_file.fo.write(data) 349 data_file.fo.seek(0) 350 351 pem_key_file = scoped_tempfile(scoped_tempfile.tempdir.name + '/pkey.pem') 352 utils.open_write_close(pem_key_file.name, pem_key) 353 354 cmd = '%s -sign %s' % (OPENSSLCRYPTO, pem_key_file.name) 355 try: 356 utils.run(cmd, 357 stdin=data_file.fo, 358 stdout_tee=sig.fo, 359 stderr_tee=err.fo) 360 except: 361 err.fo.seek(0) 362 logging.error(err.fo.read()) 363 raise 364 365 sig.fo.seek(0) 366 sig_data = sig.fo.read() 367 if not sig_data: 368 raise error.OwnershipError('Empty signature!') 369 return sig_data 370 371 372def get_user_policy_key_filename(username): 373 """Returns the path to the user policy key for the given username. 374 375 @param username: the user whose policy key we want the path to. 376 @return: absolute path to user's policy key file. 377 """ 378 return os.path.join(constants.USER_POLICY_DIR, 379 cryptohome.get_user_hash(username), 380 constants.USER_POLICY_KEY_FILENAME) 381