# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Utility functions for tests that generate, push and fetch policies.

As the python bindings for the protobufs used in policies are built as a part
of tests that use them, callers must pass in their location at call time.
"""

import dbus
import gobject
import os
import sys

import common
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import session_manager
from autotest_lib.client.cros import ownership


def install_protobufs(autodir, job):
    """Installs policy protobuf dependencies and set import path.

    After calling this, you can simply import any policy pb2.py file directly,
    e.g. import chrome_device_policy_pb2.

    @param autodir: Autotest directory (usually the caller's self.autodir).
    @param job: Job instance (usually the caller's self.job).
    """
    # TODO(crbug.com/807950): Change the installation process so that policy
    # proto imports can be moved to the top.
    dep = 'policy_protos'
    dep_dir = os.path.join(autodir, 'deps', dep)
    job.install_pkg(dep, 'dep', dep_dir)
    # Avoid growing sys.path with duplicates when called more than once.
    if dep_dir not in sys.path:
        sys.path.append(dep_dir)


def compare_policy_response(policy_response, owner=None, guests=None,
                            new_users=None, roaming=None, whitelist=None):
    """Check the contents of |policy_response| against given args.

    Deserializes |policy_response| into a PolicyFetchResponse protobuf,
    with an embedded (serialized) PolicyData protobuf that embeds a
    (serialized) ChromeDeviceSettingsProto, and checks to see if this
    protobuf turducken contains the information passed in.

    Arguments left at None are not checked. The boolean arguments are
    checked whenever they are not None, so passing False verifies that the
    setting is actually disabled.

    @param policy_response: string serialization of a PolicyFetchResponse
                            protobuf.
    @param owner: string representing the owner's name/account.
    @param guests: boolean indicating whether guests should be allowed.
    @param new_users: boolean indicating if user pods are on login screen.
    @param roaming: boolean indicating whether data roaming is enabled.
    @param whitelist: list of accounts that are allowed to log in. An empty
                      list is treated like None (no whitelist check).

    @return None. Mismatches are reported by the ownership.assert_* helpers
            (presumably by raising; their implementation is not in this
            file).
    """
    # Imported here because the pb2 modules only become importable after
    # install_protobufs() has run.
    import chrome_device_policy_pb2
    import device_management_backend_pb2

    # Outer layer: the fetch response wrapping the serialized policy data.
    response_proto = device_management_backend_pb2.PolicyFetchResponse()
    response_proto.ParseFromString(policy_response)
    ownership.assert_has_policy_data(response_proto)

    # Middle layer: the policy data wrapping the serialized device settings.
    data_proto = device_management_backend_pb2.PolicyData()
    data_proto.ParseFromString(response_proto.policy_data)
    ownership.assert_has_device_settings(data_proto)
    if owner:
        ownership.assert_username(data_proto, owner)

    # Inner layer: the device settings themselves.
    settings = chrome_device_policy_pb2.ChromeDeviceSettingsProto()
    settings.ParseFromString(data_proto.policy_value)
    # Compare against None rather than truthiness so that an explicit False
    # is verified instead of being silently skipped.
    if guests is not None:
        ownership.assert_guest_setting(settings, guests)
    if new_users is not None:
        ownership.assert_show_users(settings, new_users)
    if roaming is not None:
        ownership.assert_roaming(settings, roaming)
    if whitelist:
        # A whitelist implies that new users are not allowed.
        ownership.assert_new_users(settings, False)
        ownership.assert_users_on_whitelist(settings, whitelist)


def build_policy_data(owner=None, guests=None, new_users=None, roaming=None,
                      whitelist=None):
    """Generate and serialize a populated device policy protobuffer.

    Creates a PolicyData protobuf, with an embedded
    ChromeDeviceSettingsProto, containing the information passed in.

    Arguments left at None are not written into the protobuf. The boolean
    arguments are written whenever they are not None, so passing False
    explicitly stores a disabled setting.

    @param owner: string representing the owner's name/account.
    @param guests: boolean indicating whether guests should be allowed.
    @param new_users: boolean indicating if user pods are on login screen.
    @param roaming: boolean indicating whether data roaming is enabled.
    @param whitelist: list of accounts that are allowed to log in. An empty
                      list is treated like None (no whitelist is written).

    @return serialization of the PolicyData proto that we build.
    """
    # Imported here because the pb2 modules only become importable after
    # install_protobufs() has run.
    import chrome_device_policy_pb2
    import device_management_backend_pb2

    data_proto = device_management_backend_pb2.PolicyData()
    data_proto.policy_type = ownership.POLICY_TYPE
    if owner:
        data_proto.username = owner

    settings = chrome_device_policy_pb2.ChromeDeviceSettingsProto()
    # Compare against None rather than truthiness so that an explicit False
    # ends up in the generated policy.
    if guests is not None:
        settings.guest_mode_enabled.guest_mode_enabled = guests
    if new_users is not None:
        settings.show_user_names.show_user_names = new_users
    if roaming is not None:
        settings.data_roaming_enabled.data_roaming_enabled = roaming
    if whitelist:
        # A whitelist implies that new users are not allowed.
        settings.allow_new_users.allow_new_users = False
        settings.user_whitelist.user_whitelist.extend(whitelist)

    data_proto.policy_value = settings.SerializeToString()
    return data_proto.SerializeToString()


def generate_policy(key, pubkey, policy, old_key=None):
    """Generate and serialize a populated, signed device policy protobuffer.

    Creates a protobuf containing the device policy |policy|, signed with
    |key|. Also includes the public key |pubkey|, signed with |old_key|
    if provided. If not, |pubkey| is signed with |key|. The protobuf
    is serialized to a string and returned.

    @param key: new policy signing key.
    @param pubkey: new public key to be signed and embedded in generated
                   PolicyFetchResponse.
    @param policy: policy data to be embedded in generated PolicyFetchResponse.
    @param old_key: if provided, this implies the generated PolicyFetchResponse
                    is intended to represent a key rotation. pubkey will be
                    signed with this key before embedding.

    @return serialization of the PolicyFetchResponse proto that we build.
    """
    # Imported here because the pb2 modules only become importable after
    # install_protobufs() has run.
    import device_management_backend_pb2

    # Without a key rotation the new public key is signed with |key| itself.
    if old_key is None:
        old_key = key
    policy_proto = device_management_backend_pb2.PolicyFetchResponse()
    policy_proto.policy_data = policy
    policy_proto.policy_data_signature = ownership.sign(key, policy)
    policy_proto.new_public_key = pubkey
    policy_proto.new_public_key_signature = ownership.sign(old_key, pubkey)
    return policy_proto.SerializeToString()


def push_policy_and_verify(policy_string, sm):
    """Push a device policy to the session manager over DBus.

    The serialized device policy |policy_string| is sent to the session
    manager with the StorePolicyEx DBus call. Success of the store is
    validated by fetching the policy again and comparing.

    @param policy_string: serialized policy to push to the session manager.
    @param sm: a connected SessionManagerInterface.

    @raises error.TestFail if policy push failed.
    """
    # Start listening before storing so the resulting signal is not missed.
    listener = session_manager.OwnershipSignalListener(gobject.MainLoop())
    listener.listen_for_new_policy()
    descriptor = session_manager.make_device_policy_descriptor()
    sm.StorePolicyEx(descriptor,
                     dbus.ByteArray(policy_string), byte_arrays=True)
    listener.wait_for_signals(desc='Policy push.')

    # Read the policy back and make sure it is exactly what was pushed.
    retrieved_policy = sm.RetrievePolicyEx(descriptor, byte_arrays=True)
    if retrieved_policy != policy_string:
        raise error.TestFail('Policy should not be %s' % retrieved_policy)


def get_policy(sm):
    """Get a device policy from the session manager over DBus.

    Provided mainly for symmetry with push_policy_and_verify().

    @param sm: a connected SessionManagerInterface.

    @return Serialized PolicyFetchResponse.
    """
    return sm.RetrievePolicyEx(session_manager.make_device_policy_descriptor(),
                               byte_arrays=True)