• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2019 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import copy
6import json
7
8from autotest_lib.client.cros.enterprise.policy import Policy as Policy
9from autotest_lib.client.cros.enterprise.device_policy_lookup import DEVICE_POLICY_DICT
10
11CHROMEPOLICIES = 'chromePolicies'
12DEVICELOCALACCOUNT = 'deviceLocalAccountPolicies'
13EXTENSIONPOLICIES = 'extensionPolicies'
14
15
16class AllPolicies(object):
17
18    def __init__(self, isConfiguredPolicies=False):
19        self.extension_configured_data = {}
20        # Because Extensions have a different "displayed" value than the
21        # configured, the extension_displayed_values will be used when the
22        # policy group is "configured" to represent what the "displayed" policy
23        # values SHOULD be.
24        self.extension_displayed_values = {}
25        self.local = {}
26        self.chrome = {}
27
28        self.policy_dict = {CHROMEPOLICIES: {},
29                            EXTENSIONPOLICIES: {},
30                            DEVICELOCALACCOUNT: {}}
31        self.isConfiguredPolicies = isConfiguredPolicies
32        if self.isConfiguredPolicies:
33            self.createNewFakeDMServerJson()
34
35    def createNewFakeDMServerJson(self):
36        """Creates a fresh DM blob that will be used by the Fake DM server."""
37
38        self._DMJSON = {
39            'managed_users': ['*'],
40            'policy_user': None,
41            'current_key_index': 0,
42            'invalidation_source': 16,
43            'invalidation_name': 'test_policy',
44            'google/chromeos/user': {'mandatory': {}, 'recommended': {}},
45            'google/chromeos/device': {},
46            'google/chrome/extension': {}
47        }
48        self._DM_MANDATORY = self._DMJSON['google/chromeos/user']['mandatory']
49        self._DM_RECOMMENDED = (
50            self._DMJSON['google/chromeos/user']['recommended'])
51        self._DM_DEVICE = self._DMJSON['google/chromeos/device']
52        self._DM_EXTENSION = self._DMJSON['google/chrome/extension']
53
54    def get_policy_as_dict(self, visual=False):
55        """Returns the policies as a dictionary."""
56        self._update_policy_dict(visual)
57        return self.policy_dict
58
59    def set_extension_policy(self, policies, visual=False):
60        """
61        Sets the extension policies
62
63        @param policies: Dict formatted as follows:
64            {'extID': {pol1: v1}, extid2: {p2:v2}}
65
66        @param visual: bool, If the extension policy provided is what should be
67            displayed via the api/chrome page. If False, then the 'policies'
68            should be the actual value that will be provided to the DMServer.
69
70        """
71        # If the policy is configured (ie this policy group object represents)
72        # the policies being SET for testing) add the 'policy_group' value.
73        policy_group = 'extension' if self.isConfiguredPolicies else None
74        extension_policies = copy.deepcopy(policies)
75
76        for extension_ID, extension_policy in extension_policies.items():
77
78            # Adding the extension ID key into the extension dict.
79            if visual:
80                self.extension_displayed_values[extension_ID] = {}
81                key = 'displayed_ext_values'
82            else:
83                self.extension_configured_data[extension_ID] = {}
84                key = 'ext_values'
85            self.set_policy(key, extension_policy, policy_group, extension_ID)
86
87    def set_policy(self,
88                   policy_type,
89                   policies,
90                   group=None,
91                   extension_key=None):
92        """
93        Create and the policy object, and set it in the corresponding group.
94
95        @param policy_type: str of the policy type. Must be:
96            'chrome', 'ext_values', 'displayed_ext_values', or 'local'.
97        @param policies: dict of policy values.
98        @param group: str, group key for the Policy object setter.
99        @param extension_key: optional, key for exentsionID.
100
101        """
102        policy_group = self._get_policy_group(policy_type, extension_key)
103        for name, value in policies.items():
104            policy_group[name] = self._create_pol_obj(name, value, group)
105
106    def _get_policy_group(self, policy_type, extension_key=None):
107        """Simple lookup to put the policies in the correct bucket."""
108        if policy_type == 'chrome':
109            policy_group = self.chrome
110        elif policy_type == 'ext_values':
111            policy_group = self.extension_configured_data[extension_key]
112        elif policy_type == 'displayed_ext_values':
113            policy_group = self.extension_displayed_values[extension_key]
114        elif policy_type == 'local':
115            policy_group = self.local
116        return policy_group
117
118    def updateDMJson(self):
119        """
120        Update the ._DM_JSON with the values currently set in
121        self.chrome, self.extension_configured_data, and self.local.
122
123        """
124
125        self._populateChromeData()
126        self._populateExtensionData()
127
128    def _populateChromeData(self):
129        """Update the DM_JSON's chrome values."""
130        for policy_name, policy_object in self.chrome.items():
131            if policy_object.scope == 'machine':
132                dm_name = DEVICE_POLICY_DICT[policy_name]
133                self._DM_DEVICE.update(
134                    self._clean_pol({dm_name: policy_object.value}))
135
136            elif policy_object.level == 'recommended':
137                self._DM_RECOMMENDED.update(
138                    self._clean_pol({policy_name: policy_object.value}))
139
140            elif (policy_object.level == 'mandatory' and
141                    policy_object.scope == 'user'):
142                self._DM_MANDATORY.update(
143                    self._clean_pol({policy_name: policy_object.value}))
144
145    def _populateExtensionData(self):
146        """Updates the DM_JSON's extension values."""
147        for extension, ext_pol in self.extension_configured_data.items():
148            extension_policies = {}
149            for polname, polItem in ext_pol.items():
150                extension_policies[polname] = polItem.value
151            self._DM_EXTENSION.update({extension: extension_policies})
152
153    def _clean_pol(self, policies):
154        """Cleans the policies to be set on the fake DM server."""
155        cleaned = {}
156        for policy, value in policies.items():
157            if value is None:
158                continue
159            cleaned[policy] = self._jsonify(policy, value)
160        return cleaned
161
162    def _jsonify(self, policy, value):
163        """Jsonify policy if its a dict or list that is not kiosk policy."""
164        if isinstance(value, dict):
165            return json.dumps(value)
166        # Kiosk Policy, aka "account", is the only policy not formatted.
167        elif (
168                isinstance(value, list) and
169                (policy != 'device_local_accounts.account')):
170            if value and isinstance(value[0], dict):
171                return json.dumps(value)
172        return value
173
174    def _create_pol_obj(self, name, data, group=None):
175        """
176        Create a policy object from Policy.Policy().
177
178        @param name: str, name of the policy
179        @param data: data value of the policy
180        @param group: optional, group of the policy.
181
182        @returns: Policy object, reperesenting the policy args provided.
183        """
184        policy_obj = Policy()
185        policy_obj.name = name
186        if policy_obj.is_formatted_value(data):
187            policy_obj.set_policy_from_dict(data)
188        else:
189            policy_obj.value = data
190        policy_obj.group = group
191        return policy_obj
192
193    def _update_policy_dict(self, secondary_ext_policies):
194        """Update the local .policy_dict with the most current values."""
195        for policy in self.chrome:
196            self.policy_dict[CHROMEPOLICIES].update(
197                self.chrome[policy].get_policy_as_dict())
198
199        ext_item = self._select_ext_group(secondary_ext_policies)
200
201        for ext_name, ext_group in ext_item.items():
202            ext_dict = {ext_name: {}}
203            for policy in ext_group:
204                pol_as_dict = ext_group[policy].get_policy_as_dict()
205
206                ext_dict[ext_name].update(pol_as_dict)
207            self.policy_dict[EXTENSIONPOLICIES].update(ext_dict)
208        for policy in self.local:
209            self.policy_dict[DEVICELOCALACCOUNT].update(
210                self.local[policy].get_policy_as_dict())
211
212    def _select_ext_group(self, secondary_ext_policies):
213        """Determine which extension group to use for the configured dictionary
214        formatting. If the secondary_ext_policies flag has been set, and
215        the self.extension_displayed_values is not None, use
216        self.extension_displayed_values,
217        else: use the original configured
218
219        @param secondary_ext_policies: bool
220
221        """
222        if secondary_ext_policies and self.extension_displayed_values:
223            return self.extension_displayed_values
224        else:
225            return self.extension_configured_data
226
227    def __ne__(self, other):
228        return not self.__eq__(other)
229
230    def __eq__(self, other):
231        """
232        Override the == to check a policy group object vs another.
233
234        Will return False if:
235            A policy is missing from self is missing in other,
236                when the policy is not None.
237            An Extension from self is missing in other.
238            If the policy valus in self are are not equal to the other
239                (less obfuscation).
240
241        Else: True
242        """
243        own_ext = self.extension_configured_data
244        if self.extension_displayed_values:
245            own_ext = self.extension_displayed_values
246        for ext_name, ext_group in own_ext.items():
247            if ext_name not in other.extension_configured_data:
248                return False
249            if not self._check(own_ext[ext_name],
250                               other.extension_configured_data[ext_name]):
251                return False
252        if (
253                not self._check(self.chrome, other.chrome) or
254                not self._check(self.local, other.local)):
255            return False
256        return True
257
258    def _check(self, policy_group, other_policy_group):
259        """
260        Check if the policy_group is ==.
261
262        Will return False if:
263            policy is missing from other policy object
264            policy objects != (per the Policy object __eq__ override)
265        Will return True if:
266            There is no policies
267            if the policy value is None
268            If no other conditions are violated
269
270        """
271        if not policy_group:  # No object
272            return True
273        for policy_name, policy_group in policy_group.items():
274            if policy_group.value is None:
275                return True
276            if policy_name not in other_policy_group:
277                return False
278            if policy_group != other_policy_group[policy_name]:
279                return False
280        return True
281