# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import copy
import json

from autotest_lib.client.cros.enterprise.policy import Policy as Policy
from autotest_lib.client.cros.enterprise.device_policy_lookup import DEVICE_POLICY_DICT

CHROMEPOLICIES = 'chromePolicies'
DEVICELOCALACCOUNT = 'deviceLocalAccountPolicies'
EXTENSIONPOLICIES = 'extensionPolicies'


class AllPolicies(object):
    """
    Container for a set of chrome, extension and local policies.

    Policies are stored as Policy objects in per-type buckets
    (self.chrome, self.local, self.extension_configured_data and
    self.extension_displayed_values). When constructed with
    isConfiguredPolicies=True the object also owns a fake DM server JSON
    blob which updateDMJson() fills from those buckets.

    """

    def __init__(self, isConfiguredPolicies=False):
        """
        @param isConfiguredPolicies: bool, True if this object represents
            the policies being SET for a test. When True, a fresh fake DM
            server JSON blob is created.

        """
        self.extension_configured_data = {}
        # Because Extensions have a different "displayed" value than the
        # configured, the extension_displayed_values will be used when the
        # policy group is "configured" to represent what the "displayed"
        # policy values SHOULD be.
        self.extension_displayed_values = {}
        self.local = {}
        self.chrome = {}

        self.policy_dict = {CHROMEPOLICIES: {},
                            EXTENSIONPOLICIES: {},
                            DEVICELOCALACCOUNT: {}}
        self.isConfiguredPolicies = isConfiguredPolicies
        if self.isConfiguredPolicies:
            self.createNewFakeDMServerJson()

    def createNewFakeDMServerJson(self):
        """Creates a fresh DM blob that will be used by the Fake DM server."""

        self._DMJSON = {
            'managed_users': ['*'],
            'policy_user': None,
            'current_key_index': 0,
            'invalidation_source': 16,
            'invalidation_name': 'test_policy',
            'google/chromeos/user': {'mandatory': {}, 'recommended': {}},
            'google/chromeos/device': {},
            'google/chrome/extension': {}
        }
        # Shortcuts into the blob; these alias the nested dicts, so updating
        # them updates self._DMJSON in place.
        self._DM_MANDATORY = self._DMJSON['google/chromeos/user']['mandatory']
        self._DM_RECOMMENDED = (
            self._DMJSON['google/chromeos/user']['recommended'])
        self._DM_DEVICE = self._DMJSON['google/chromeos/device']
        self._DM_EXTENSION = self._DMJSON['google/chrome/extension']

    def get_policy_as_dict(self, visual=False):
        """
        Returns the policies as a dictionary.

        @param visual: bool, if True (and displayed extension values have
            been set) the displayed extension values are used instead of
            the configured ones.

        @returns: self.policy_dict, refreshed from the current policies.

        """
        self._update_policy_dict(visual)
        return self.policy_dict

    def set_extension_policy(self, policies, visual=False):
        """
        Sets the extension policies

        @param policies: Dict formatted as follows:
            {'extID': {pol1: v1}, extid2: {p2:v2}}

        @param visual: bool, If the extension policy provided is what should be
            displayed via the api/chrome page. If False, then the 'policies'
            should be the actual value that will be provided to the DMServer.

        """
        # If the policy is configured (ie this policy group object represents
        # the policies being SET for testing) add the 'policy_group' value.
        policy_group = 'extension' if self.isConfiguredPolicies else None
        # Work on a copy so the caller's dict is never shared with the
        # stored Policy objects.
        extension_policies = copy.deepcopy(policies)

        for extension_ID, extension_policy in extension_policies.items():

            # Adding the extension ID key into the extension dict. Any
            # policies previously stored for this extension are discarded.
            if visual:
                self.extension_displayed_values[extension_ID] = {}
                key = 'displayed_ext_values'
            else:
                self.extension_configured_data[extension_ID] = {}
                key = 'ext_values'
            self.set_policy(key, extension_policy, policy_group, extension_ID)

    def set_policy(self,
                   policy_type,
                   policies,
                   group=None,
                   extension_key=None):
        """
        Create the policy objects, and set them in the corresponding group.

        @param policy_type: str of the policy type. Must be:
            'chrome', 'ext_values', 'displayed_ext_values', or 'local'.
        @param policies: dict of policy values.
        @param group: str, group key for the Policy object setter.
        @param extension_key: optional, key for extensionID.

        @raises ValueError: if policy_type is not one of the known types.

        """
        policy_group = self._get_policy_group(policy_type, extension_key)
        for name, value in policies.items():
            policy_group[name] = self._create_pol_obj(name, value, group)

    def _get_policy_group(self, policy_type, extension_key=None):
        """
        Simple lookup to put the policies in the correct bucket.

        @param policy_type: str, one of 'chrome', 'ext_values',
            'displayed_ext_values', or 'local'.
        @param extension_key: extension ID; only used for the two extension
            policy types, where it must already be a key of the bucket.

        @returns: the dict that policies of this type are stored in.

        @raises ValueError: if policy_type is not one of the known types.

        """
        if policy_type == 'chrome':
            return self.chrome
        if policy_type == 'ext_values':
            return self.extension_configured_data[extension_key]
        if policy_type == 'displayed_ext_values':
            return self.extension_displayed_values[extension_key]
        if policy_type == 'local':
            return self.local
        # An unknown type previously surfaced as an UnboundLocalError;
        # fail with a message that names the offending value instead.
        raise ValueError('Unknown policy_type: %r' % (policy_type,))

    def updateDMJson(self):
        """
        Update the ._DMJSON with the values currently set in
        self.chrome and self.extension_configured_data.

        The DM blob attributes only exist after
        createNewFakeDMServerJson() has run (done by __init__ when
        isConfiguredPolicies is True).

        """

        self._populateChromeData()
        self._populateExtensionData()

    def _populateChromeData(self):
        """Update the DM_JSON's chrome values."""
        for policy_name, policy_object in self.chrome.items():
            if policy_object.scope == 'machine':
                # Device policies are keyed by their DM name, not the
                # chrome policy name.
                dm_name = DEVICE_POLICY_DICT[policy_name]
                self._DM_DEVICE.update(
                    self._clean_pol({dm_name: policy_object.value}))

            elif policy_object.level == 'recommended':
                self._DM_RECOMMENDED.update(
                    self._clean_pol({policy_name: policy_object.value}))

            elif (policy_object.level == 'mandatory' and
                  policy_object.scope == 'user'):
                self._DM_MANDATORY.update(
                    self._clean_pol({policy_name: policy_object.value}))

    def _populateExtensionData(self):
        """Updates the DM_JSON's extension values."""
        for extension, ext_pol in self.extension_configured_data.items():
            extension_policies = {}
            for polname, polItem in ext_pol.items():
                extension_policies[polname] = polItem.value
            self._DM_EXTENSION.update({extension: extension_policies})

    def _clean_pol(self, policies):
        """
        Cleans the policies to be set on the fake DM server.

        @param policies: dict of {policy name: value}.

        @returns: new dict without the None-valued policies, and with the
            remaining values passed through _jsonify.

        """
        cleaned = {}
        for policy, value in policies.items():
            if value is None:
                continue
            cleaned[policy] = self._jsonify(policy, value)
        return cleaned

    def _jsonify(self, policy, value):
        """
        Jsonify policy if its a dict or list that is not kiosk policy.

        @param policy: str, name of the policy.
        @param value: value of the policy.

        @returns: a JSON string for dicts and for non-empty lists whose
            first item is a dict; otherwise the value unchanged.

        """
        if isinstance(value, dict):
            return json.dumps(value)
        # Kiosk Policy, aka "account", is the only policy not formatted.
        is_kiosk = policy == 'device_local_accounts.account'
        if isinstance(value, list) and not is_kiosk:
            if value and isinstance(value[0], dict):
                return json.dumps(value)
        return value

    def _create_pol_obj(self, name, data, group=None):
        """
        Create a policy object from Policy.Policy().

        @param name: str, name of the policy
        @param data: data value of the policy
        @param group: optional, group of the policy.

        @returns: Policy object, representing the policy args provided.
        """
        policy_obj = Policy()
        policy_obj.name = name
        if policy_obj.is_formatted_value(data):
            policy_obj.set_policy_from_dict(data)
        else:
            policy_obj.value = data
        policy_obj.group = group
        return policy_obj

    def _update_policy_dict(self, secondary_ext_policies):
        """
        Update the local .policy_dict with the most current values.

        @param secondary_ext_policies: bool, passed to _select_ext_group to
            pick which extension bucket is reported.

        """
        for policy in self.chrome:
            self.policy_dict[CHROMEPOLICIES].update(
                self.chrome[policy].get_policy_as_dict())

        ext_item = self._select_ext_group(secondary_ext_policies)

        for ext_name, ext_group in ext_item.items():
            ext_dict = {ext_name: {}}
            for policy in ext_group:
                pol_as_dict = ext_group[policy].get_policy_as_dict()

                ext_dict[ext_name].update(pol_as_dict)
            self.policy_dict[EXTENSIONPOLICIES].update(ext_dict)
        for policy in self.local:
            self.policy_dict[DEVICELOCALACCOUNT].update(
                self.local[policy].get_policy_as_dict())

    def _select_ext_group(self, secondary_ext_policies):
        """Determine which extension group to use for the configured dictionary
        formatting. If the secondary_ext_policies flag has been set, and
        the self.extension_displayed_values is not empty, use
        self.extension_displayed_values,
        else: use the original configured

        @param secondary_ext_policies: bool

        @returns: the selected extension policy dict.

        """
        if secondary_ext_policies and self.extension_displayed_values:
            return self.extension_displayed_values
        else:
            return self.extension_configured_data

    def __ne__(self, other):
        return not self.__eq__(other)

    def __eq__(self, other):
        """
        Override the == to check a policy group object vs another.

        Will return False if:
            A policy from self is missing in other,
                when the policy is not None.
            An Extension from self is missing in other.
            If the policy values in self are not equal to the other
                (less obfuscation).

        Else: True
        """
        # Prefer the "displayed" extension values when they were provided;
        # they are compared against the other object's configured data.
        own_ext = self.extension_configured_data
        if self.extension_displayed_values:
            own_ext = self.extension_displayed_values
        other_ext = other.extension_configured_data
        for ext_name, ext_group in own_ext.items():
            if ext_name not in other_ext:
                return False
            if not self._check(ext_group, other_ext[ext_name]):
                return False
        return (self._check(self.chrome, other.chrome) and
                self._check(self.local, other.local))

    def _check(self, policy_group, other_policy_group):
        """
        Check if the policy_group is ==.

        Will return False if:
            policy is missing from other policy object
            policy objects != (per the Policy object __eq__ override)
        Will return True if:
            There are no policies
            If no other conditions are violated

        Policies whose value is None are ignored.

        @param policy_group: dict of {policy name: Policy object}.
        @param other_policy_group: dict of {policy name: Policy object} to
            compare against.

        """
        if not policy_group:  # No object
            return True
        for policy_name, policy_obj in policy_group.items():
            if policy_obj.value is None:
                # Skip only this policy. Returning here would leave the
                # remaining policies unchecked and make the result depend
                # on dict iteration order.
                continue
            if policy_name not in other_policy_group:
                return False
            if policy_obj != other_policy_group[policy_name]:
                return False
        return True