• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2019 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4import json
5import unittest
6from unittest.mock import patch
7
8from autotest_lib.client.common_lib import error
9from autotest_lib.client.cros.enterprise import policy_group
10from autotest_lib.client.cros.enterprise import policy_manager
11
12"""
13This is the unittest file for policy_manager.py.
14If you modify that file, you should be at minimum re-running this file.
15
Add and correct tests as changes are made to the policy_manager file.
17
18To run the tests, use the following command from your DEV box (outside_chroot):
19
20src/third_party/autotest/files/utils$ python unittest_suite.py \
21autotest_lib.client.cros.enterprise.policy_manager_unittest --debug
22
23"""
# Dotted import path of the policy-fetching helper; the tests in this file
# hand it to unittest.mock.patch so no real policy fetch is attempted.
PATCH_BASE = 'autotest_lib.client.cros.enterprise.enterprise_policy_utils'
FX_NAME = '_get_pol_from_api'
PATCH_PATH = '.'.join((PATCH_BASE, FX_NAME))
27
28
class TestPolicyManager(unittest.TestCase):
    """Unit tests for policy_manager.Policy_Manager."""

    def configPolicyManager(self, username=None):
        """Create the Policy_Manager under test and the shared expected dict.

        This is a plain helper rather than a setUp() hook: each test calls it
        explicitly, which lets a test pass a username when it needs one.

        Sets:
            self.policy_manager: a fresh Policy_Manager built with `username`,
                with its autotest_ext attribute replaced by the string 'Demo'.
            self.expected: the policy dictionary the tests compare against
                (and feed to the mocked policy getter).
        """
        self.policy_manager = policy_manager.Policy_Manager(username)
        # NOTE(review): 'Demo' looks like a placeholder so that no real
        # autotest extension is required - confirm against policy_manager.
        self.policy_manager.autotest_ext = 'Demo'
        # Show complete diffs when a dictionary comparison fails.
        self.maxDiff = None
        self.expected = {
            'deviceLocalAccountPolicies': {},
            'extensionPolicies': {
                'ExtID1': {
                    'Extension Policy 1': {
                        'scope': 'user',
                        'level': 'mandatory',
                        'value': 'EP1',
                        'source': 'cloud'}}},
            'chromePolicies': {
                'Suggested Policy 1': {
                    'scope': 'user',
                    'level': 'recommended',
                    'value': 'Suggested 1',
                    'source': 'cloud'},
                'Test Policy1': {
                    'scope': 'user',
                    'level': 'mandatory',
                    'value': 'Policy1',
                    'source': 'cloud'},
                'Device Policy1': {
                    'scope': 'machine',
                    'level': 'mandatory',
                    'value': 'Device 1',
                    'source': 'cloud'}}}

    def test_Defaults(self):
        """A new manager holds AllPolicies objects for both policy sets."""
        self.configPolicyManager()
        self.assertIsInstance(self.policy_manager._configured,
                              policy_group.AllPolicies)
        self.assertIsInstance(self.policy_manager._obtained,
                              policy_group.AllPolicies)

    def test_configure_policies_and_get_configured_as_dict(self):
        """Configured policies are reported back in the expected dict form."""
        self.configPolicyManager()
        self.policy_manager.configure_policies(
            user={'Test Policy1': 'Policy1'},
            device={'Device Policy1': 'Device 1'},
            suggested_user={'Suggested Policy 1': 'Suggested 1'},
            extension={'ExtID1': {'Extension Policy 1': 'EP1'}})
        configured = self.policy_manager.get_configured_policies_as_dict()
        self.assertEqual(configured, self.expected)

    @patch(PATCH_PATH)
    def test_obtain_policies_from_device_and_get_obtained_policies_as_dict(
            self, get_pol_mock):
        """Policies returned by the (mocked) getter are reported unchanged."""
        self.configPolicyManager()
        get_pol_mock.return_value = self.expected
        self.policy_manager.obtain_policies_from_device()
        received = self.policy_manager.get_obtained_policies_as_dict()
        self.assertEqual(received, self.expected)

    def test_changing_policy(self):
        """Test setting a policy to False, then changing the value to True."""
        self.policy_manager = policy_manager.Policy_Manager()

        self.policy_manager.configure_policies(user={'TP1': False})
        configured = self.policy_manager.get_configured_policies_as_dict()
        self.assertEqual(configured['chromePolicies']['TP1']['value'], False)

        # Now set the policy to True (new=False is passed on this second
        # call) and check that the reported value follows.
        self.policy_manager.configure_policies(user={'TP1': True}, new=False)
        configured = self.policy_manager.get_configured_policies_as_dict()
        self.assertEqual(configured['chromePolicies']['TP1']['value'], True)

    def test_remove_policy(self):
        """
        Will test remove_policy and the private methods:
            _removeChromePolicy
            _removeExtensionPolicy

        """
        # Setup the policy Manager
        self.policy_manager = policy_manager.Policy_Manager()
        self.policy_manager.configure_policies(
            user={'Test Policy1': 'Policy1', 'Test Policy2': 'Policy2'})

        # Remove one policy. Verify it is gone, but the other is there.
        self.policy_manager.remove_policy('Test Policy2', 'user')
        self.assertNotIn('Test Policy2',
                         self.policy_manager._configured.chrome)
        self.assertIn('Test Policy1', self.policy_manager._configured.chrome)

        # Add an extension Policy. Verify it is there, then remove and verify.
        self.policy_manager.configure_policies(extension={'ID1': {'P1': 'V1'}},
                                               new=False)

        # The attribute is re-read for every assertion on purpose, so the
        # checks always see the manager's current state.
        self.assertIn(
            'ID1', self.policy_manager._configured.extension_configured_data)
        self.assertIn(
            'P1',
            self.policy_manager._configured.extension_configured_data['ID1'])
        self.policy_manager.remove_policy('P1', 'extension', extID='ID1')
        self.assertNotIn(
            'P1',
            self.policy_manager._configured.extension_configured_data['ID1'])

        # Attempt to remove nonexistent policies. Verify an error is raised
        with self.assertRaises(error.TestError) as context:
            self.policy_manager.remove_policy('Test Policy2', 'user')
        self.assertEqual(str(context.exception),
                         'Policy Test Policy2 missing from chrome policies.')

        # Attempt to remove an extension policy without an ID and a
        # nonexistent policy in the valid extension
        with self.assertRaises(error.TestError) as context:
            self.policy_manager.remove_policy('P1', 'extension')
        self.assertEqual(str(context.exception),
                         'Cannot delete extension policy without extension ID')

        with self.assertRaises(error.TestError) as context:
            self.policy_manager.remove_policy('P2', 'extension', 'ID1')
        self.assertEqual(str(context.exception),
                         'Policy P2 missing from extension policies.')

    @patch(PATCH_PATH)
    def test_verify_policies(self, get_pol_mock):
        """Test the verify_policies method."""
        self.configPolicyManager()
        self.policy_manager.configure_policies(
            user={'Test Policy1': 'Policy1'},
            device={'Device Policy1': 'Device 1'},
            suggested_user={'Suggested Policy 1': 'Suggested 1'},
            extension={'ExtID1': {'Extension Policy 1': 'EP1'}})
        get_pol_mock.return_value = self.expected
        self.policy_manager.obtain_policies_from_device()
        # Configured and obtained policies match, so this must not raise.
        self.policy_manager.verify_policies()

        # Add another policy to the configured, and test verify_policies fails.
        self.policy_manager.configure_policies(extension={'ID1': {'P1': 'V1'}},
                                               new=False)
        with self.assertRaises(error.TestError) as context:
            self.policy_manager.verify_policies()
        self.assertEqual(
            str(context.exception),
            'Configured policies did not match policies received from DUT.')

    @patch(PATCH_PATH)
    def test_verify_special_extension(self, get_pol_mock):
        """Test the configure_extension_visual_policy and the verify_policies
        methods."""
        self.configPolicyManager()
        self.policy_manager.configure_policies(
            user={'Test Policy1': 'Policy1'},
            device={'Device Policy1': 'Device 1'},
            suggested_user={'Suggested Policy 1': 'Suggested 1'},
            extension={'ExtID1': {'Afile': 'Adirectory'}})
        get_pol_mock.return_value = self.expected
        self.policy_manager.obtain_policies_from_device()
        # The configured extension policy differs from what the mocked getter
        # reports, so verification must fail at this point.
        with self.assertRaises(error.TestError) as context:
            self.policy_manager.verify_policies()
        self.assertEqual(
            str(context.exception),
            'Configured policies did not match policies received from DUT.')

        # Add the "visual" policy (aka how the policy should be reported).
        self.policy_manager.configure_extension_visual_policy(
            {'ExtID1': {'Extension Policy 1': 'EP1'}})
        self.policy_manager.verify_policies()

        # Test the configured shows actual policy value by default
        configured_policies = (self.policy_manager.
                               get_configured_policies_as_dict())
        expected_extension = {
            'ExtID1': {
                'Afile': {
                    'scope': 'user',
                    'level': 'mandatory',
                    'value': 'Adirectory',
                    'source': 'cloud'}}}
        self.assertEqual(expected_extension,
                         configured_policies['extensionPolicies'])

        # Finally, test the configured can also get the 'visual' policy.
        visual_configured_policies = (self.policy_manager.
                                      get_configured_policies_as_dict(True))
        expected_extension = {
            'ExtID1': {
                'Extension Policy 1': {
                    'scope': 'user',
                    'level': 'mandatory',
                    'value': 'EP1',
                    'source': 'cloud'}}}
        self.assertEqual(expected_extension,
                         visual_configured_policies['extensionPolicies'])

    @patch(PATCH_PATH)
    def test_get_policy_value_from_DUT(self, get_pol_mock):
        """Test obtaining a single policy value from the DUT:
            Getting a policy prior to obtaining (returns None)
            Getting a normal chrome policy, using the Refresh flag
            Getting an ExtensionPolicy
            """
        self.configPolicyManager()
        policy_value = self.policy_manager.get_policy_value_from_DUT(
            'Test Policy1')
        self.assertIsNone(policy_value)
        get_pol_mock.return_value = self.expected
        policy_value = self.policy_manager.get_policy_value_from_DUT(
            'Test Policy1', refresh=True)
        self.assertEqual(policy_value, 'Policy1')
        extension_policy_value = self.policy_manager.get_policy_value_from_DUT(
            'Extension Policy 1', 'ExtID1', False)
        self.assertEqual(extension_policy_value, 'EP1')

    def test_DMServerConfig(self):
        """
        Test the getDMConfig method via the following:
            Test the default DM json
            Test the DM Json when policies are configured.

        """
        self.configPolicyManager('Test_UserName')
        base_config = json.loads(self.policy_manager.getDMConfig())
        default_dm = (
            {'invalidation_name': 'test_policy',
             'invalidation_source': 16,
             'google/chromeos/device': {},
             'current_key_index': 0,
             'google/chrome/extension': {},
             'managed_users': ['*'],
             'google/chromeos/user': {'recommended': {}, 'mandatory': {}},
             'policy_user': 'Test_UserName'})
        self.assertEqual(base_config, default_dm)

        self.policy_manager.configure_policies(
            user={'Test Policy1': 'Policy1'},
            device={'SystemTimezone': 'Device 1'},
            suggested_user={'Suggested Policy 1': 'Suggested 1'},
            extension={'ExtID1': {'Extension Policy 1': 'EP1'}})

        expected_DMJson = (
            {"invalidation_name": "test_policy",
             "invalidation_source": 16,
             "google/chromeos/device":
                {"system_timezone.timezone": "Device 1"},
             "current_key_index": 0,
             "google/chrome/extension":
                {"ExtID1": {"Extension Policy 1": "EP1"}},
             "managed_users": ["*"],
             "google/chromeos/user":
                {"recommended": {"Suggested Policy 1": "Suggested 1"},
                 "mandatory": {"Test Policy1": "Policy1"}},
             "policy_user": "Test_UserName"})
        Dm_with_policies = json.loads(self.policy_manager.getDMConfig())
        self.assertEqual(Dm_with_policies, expected_DMJson)

    def test_getCloudDpc(self):
        """
        Test getCloudDpc and the following private methods:
            _arc_certs
            _add_shared_arc_policy
            _add_shared_policies
            _add_arc_certs

        """
        self.configPolicyManager()
        self.policy_manager.configure_policies(
            user={'Test Policy1': 'Policy1',
                  'ArcCertificatesSyncMode': 'TestValue1',
                  'OpenNetworkConfiguration': 'Apolicy',
                  'VideoCaptureAllowed': True,
                  'ArcPolicy':
                    {'applications': 'SomeApplication',
                     'OtherPolicy': 'OtherValue'}
                  })
        expected = {'applications': 'SomeApplication',
                    'cameraDisabled': True,
                    'caCerts': 'Apolicy'}
        cloudDpcPolicies = self.policy_manager.getCloudDpc()
        self.assertEqual(expected, cloudDpcPolicies)
296
297
# Allow this file to be executed directly as a script to run its tests.
if __name__ == "__main__":
    unittest.main()
300