# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
This is the unittest file for policy_manager.py.
If you modify that file, you should be at minimum re-running this file.

Add and correct tests as changes are made to policy_manager.py.

To run the tests, use the following command from your DEV box (outside_chroot):

src/third_party/autotest/files/utils$ python unittest_suite.py \
autotest_lib.client.cros.enterprise.policy_manager_unittest --debug

"""
import json
import unittest
from unittest.mock import patch

from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.enterprise import policy_group
from autotest_lib.client.cros.enterprise import policy_manager

# Dotted path of the helper that is patched out in the tests below so that
# no real device/API is queried.
FX_NAME = '_get_pol_from_api'
PATCH_BASE = 'autotest_lib.client.cros.enterprise.enterprise_policy_utils'
PATCH_PATH = '{}.{}'.format(PATCH_BASE, FX_NAME)


class TestPolicyManager(unittest.TestCase):
    """Unit tests for policy_manager.Policy_Manager."""

    def configPolicyManager(self, username=None):
        """
        Create a fresh Policy_Manager and the shared expected policy dict.

        Sets self.policy_manager (with a placeholder autotest_ext),
        disables diff truncation, and stores in self.expected the policy
        dictionary the tests compare against / feed to the patched API.

        @param username: Username handed to the Policy_Manager constructor.

        """
        self.policy_manager = policy_manager.Policy_Manager(username)
        self.policy_manager.autotest_ext = 'Demo'
        # Show complete diffs when the large dict comparisons fail.
        self.maxDiff = None
        self.expected = {
            'deviceLocalAccountPolicies': {},
            'extensionPolicies': {
                'ExtID1': {
                    'Extension Policy 1': {
                        'scope': 'user',
                        'level': 'mandatory',
                        'value': 'EP1',
                        'source': 'cloud'}}},
            'chromePolicies': {
                'Suggested Policy 1': {
                    'scope': 'user',
                    'level': 'recommended',
                    'value': 'Suggested 1',
                    'source': 'cloud'},
                'Test Policy1': {
                    'scope': 'user',
                    'level': 'mandatory',
                    'value': 'Policy1',
                    'source': 'cloud'},
                'Device Policy1': {
                    'scope': 'machine',
                    'level': 'mandatory',
                    'value': 'Device 1',
                    'source': 'cloud'}}}

    def test_Defaults(self):
        """Test a new manager holds AllPolicies objects by default."""
        self.configPolicyManager()
        self.assertIsInstance(self.policy_manager._configured,
                              policy_group.AllPolicies)
        self.assertIsInstance(self.policy_manager._obtained,
                              policy_group.AllPolicies)

    def test_configure_policies_and_get_configured_as_dict(self):
        """Test configured policies are reported back in dict form."""
        self.configPolicyManager()
        self.policy_manager.configure_policies(
            user={'Test Policy1': 'Policy1'},
            device={'Device Policy1': 'Device 1'},
            suggested_user={'Suggested Policy 1': 'Suggested 1'},
            extension={'ExtID1': {'Extension Policy 1': 'EP1'}})
        configured = self.policy_manager.get_configured_policies_as_dict()
        self.assertEqual(configured, self.expected)

    @patch(PATCH_PATH)
    def test_obtain_policies_from_device_and_get_obtained_policies_as_dict(
            self, get_pol_mock):
        """Test policies returned by the (patched) API are reported back."""
        self.configPolicyManager()
        get_pol_mock.return_value = self.expected
        self.policy_manager.obtain_policies_from_device()
        received = self.policy_manager.get_obtained_policies_as_dict()
        self.assertEqual(received, self.expected)

    def test_changing_policy(self):
        """Test setting a policy to False, then changing the value to True."""
        self.policy_manager = policy_manager.Policy_Manager()

        self.policy_manager.configure_policies(user={'TP1': False})
        configured = self.policy_manager.get_configured_policies_as_dict()
        self.assertEqual(configured['chromePolicies']['TP1']['value'], False)

        # Now set the policy to True
        self.policy_manager.configure_policies(user={'TP1': True}, new=False)
        configured = self.policy_manager.get_configured_policies_as_dict()
        self.assertEqual(configured['chromePolicies']['TP1']['value'], True)

    def test_remove_policy(self):
        """
        Will test remove_policy and the private methods:
            _removeChromePolicy
            _removeExtensionPolicy

        """
        # Setup the policy Manager
        self.policy_manager = policy_manager.Policy_Manager()
        self.policy_manager.configure_policies(
            user={'Test Policy1': 'Policy1', 'Test Policy2': 'Policy2'})

        # Remove one policy. Verify it is gone, but the other is there.
        self.policy_manager.remove_policy('Test Policy2', 'user')
        self.assertNotIn('Test Policy2',
                         self.policy_manager._configured.chrome)
        self.assertIn('Test Policy1', self.policy_manager._configured.chrome)

        # Add an extension Policy. Verify it is there, then remove and verify.
        self.policy_manager.configure_policies(
            extension={'ID1': {'P1': 'V1'}}, new=False)

        # The extension data is re-read from the manager for every check
        # rather than cached in a local.
        self.assertIn(
            'ID1',
            self.policy_manager._configured.extension_configured_data)
        self.assertIn(
            'P1',
            self.policy_manager._configured.extension_configured_data['ID1'])
        self.policy_manager.remove_policy('P1', 'extension', extID='ID1')
        self.assertNotIn(
            'P1',
            self.policy_manager._configured.extension_configured_data['ID1'])

        # Attempt to remove non-existent policies. Verify an error is raised.
        # The message checks sit after each `with` block: a statement placed
        # after the raising call inside the block would never execute.
        with self.assertRaises(error.TestError) as context:
            self.policy_manager.remove_policy('Test Policy2', 'user')
        self.assertEqual(str(context.exception),
                         'Policy Test Policy2 missing from chrome policies.')

        # Attempt to remove an extension policy without an ID and non-existent
        # policy in the valid extension
        with self.assertRaises(error.TestError) as context:
            self.policy_manager.remove_policy('P1', 'extension')
        self.assertEqual(str(context.exception),
                         'Cannot delete extension policy without extension ID')

        with self.assertRaises(error.TestError) as context:
            self.policy_manager.remove_policy('P2', 'extension', 'ID1')
        self.assertEqual(str(context.exception),
                         'Policy P2 missing from extension policies.')

    @patch(PATCH_PATH)
    def test_verify_policies(self, get_pol_mock):
        """Test the verify_policies method."""
        self.configPolicyManager()
        self.policy_manager.configure_policies(
            user={'Test Policy1': 'Policy1'},
            device={'Device Policy1': 'Device 1'},
            suggested_user={'Suggested Policy 1': 'Suggested 1'},
            extension={'ExtID1': {'Extension Policy 1': 'EP1'}})
        get_pol_mock.return_value = self.expected
        self.policy_manager.obtain_policies_from_device()
        self.policy_manager.verify_policies()

        # Add another policy to the configured, and test verify_policies fails.
        self.policy_manager.configure_policies(
            extension={'ID1': {'P1': 'V1'}}, new=False)
        with self.assertRaises(error.TestError) as context:
            self.policy_manager.verify_policies()
        self.assertEqual(
            str(context.exception),
            'Configured policies did not match policies received from DUT.')

    @patch(PATCH_PATH)
    def test_verify_special_extension(self, get_pol_mock):
        """Test the configure_extension_visual_policy and the verify_policies
        methods."""
        self.configPolicyManager()
        self.policy_manager.configure_policies(
            user={'Test Policy1': 'Policy1'},
            device={'Device Policy1': 'Device 1'},
            suggested_user={'Suggested Policy 1': 'Suggested 1'},
            extension={'ExtID1': {'Afile': 'Adirectory'}})
        get_pol_mock.return_value = self.expected
        self.policy_manager.obtain_policies_from_device()

        # The configured extension policy differs from what the patched API
        # reports, so verification must fail at this point.
        with self.assertRaises(error.TestError) as context:
            self.policy_manager.verify_policies()
        self.assertEqual(
            str(context.exception),
            'Configured policies did not match policies received from DUT.')

        # Add the "visual" policy (aka how the policy should be reported).
        self.policy_manager.configure_extension_visual_policy(
            {'ExtID1': {'Extension Policy 1': 'EP1'}})
        self.policy_manager.verify_policies()

        # Test the configured shows actual policy value by default
        configured_policies = (
            self.policy_manager.get_configured_policies_as_dict())
        expected_extension = {
            'ExtID1': {
                'Afile': {
                    'scope': 'user',
                    'level': 'mandatory',
                    'value': 'Adirectory',
                    'source': 'cloud'}}}
        self.assertEqual(expected_extension,
                         configured_policies['extensionPolicies'])

        # Finally, test the configured can also get the 'visual' policy.
        visual_configured_policies = (
            self.policy_manager.get_configured_policies_as_dict(True))
        expected_extension = {
            'ExtID1': {
                'Extension Policy 1': {
                    'scope': 'user',
                    'level': 'mandatory',
                    'value': 'EP1',
                    'source': 'cloud'}}}
        self.assertEqual(expected_extension,
                         visual_configured_policies['extensionPolicies'])

    @patch(PATCH_PATH)
    def test_get_policy_value_from_DUT(self, get_pol_mock):
        """Test obtaining a single policy value from the DUT:
            Getting a policy prior to obtaining (returns None)
            Getting a normal chrome policy, using the Refresh flag
            Getting an ExtensionPolicy
        """
        self.configPolicyManager()
        policy_value = self.policy_manager.get_policy_value_from_DUT(
            'Test Policy1')
        self.assertIsNone(policy_value)

        get_pol_mock.return_value = self.expected
        policy_value = self.policy_manager.get_policy_value_from_DUT(
            'Test Policy1', refresh=True)
        self.assertEqual(policy_value, 'Policy1')

        extension_policy_value = self.policy_manager.get_policy_value_from_DUT(
            'Extension Policy 1', 'ExtID1', False)
        self.assertEqual(extension_policy_value, 'EP1')

    def test_DMServerConfig(self):
        """
        Test the getDMConfig method via the following:
            Test the default DM json
            Test the DM Json when policies are configured.

        """
        self.configPolicyManager('Test_UserName')
        base_config = json.loads(self.policy_manager.getDMConfig())
        default_dm = {
            'invalidation_name': 'test_policy',
            'invalidation_source': 16,
            'google/chromeos/device': {},
            'current_key_index': 0,
            'google/chrome/extension': {},
            'managed_users': ['*'],
            'google/chromeos/user': {'recommended': {}, 'mandatory': {}},
            'policy_user': 'Test_UserName'}
        self.assertEqual(base_config, default_dm)

        self.policy_manager.configure_policies(
            user={'Test Policy1': 'Policy1'},
            device={'SystemTimezone': 'Device 1'},
            suggested_user={'Suggested Policy 1': 'Suggested 1'},
            extension={'ExtID1': {'Extension Policy 1': 'EP1'}})

        expected_dm_json = {
            "invalidation_name": "test_policy",
            "invalidation_source": 16,
            "google/chromeos/device":
                {"system_timezone.timezone": "Device 1"},
            "current_key_index": 0,
            "google/chrome/extension":
                {"ExtID1": {"Extension Policy 1": "EP1"}},
            "managed_users": ["*"],
            "google/chromeos/user":
                {"recommended": {"Suggested Policy 1": "Suggested 1"},
                 "mandatory": {"Test Policy1": "Policy1"}},
            "policy_user": "Test_UserName"}
        dm_with_policies = json.loads(self.policy_manager.getDMConfig())
        self.assertEqual(dm_with_policies, expected_dm_json)

    def test_getCloudDpc(self):
        """
        Test getCloudDpc and the following private methods:
            _arc_certs
            _add_shared_arc_policy
            _add_shared_policies
            _add_arc_certs

        """
        self.configPolicyManager()
        self.policy_manager.configure_policies(
            user={'Test Policy1': 'Policy1',
                  'ArcCertificatesSyncMode': 'TestValue1',
                  'OpenNetworkConfiguration': 'Apolicy',
                  'VideoCaptureAllowed': True,
                  'ArcPolicy':
                      {'applications': 'SomeApplication',
                       'OtherPolicy': 'OtherValue'}
                  })
        expected = {'applications': 'SomeApplication',
                    'cameraDisabled': True,
                    'caCerts': 'Apolicy'}
        cloud_dpc_policies = self.policy_manager.getCloudDpc()
        self.assertEqual(expected, cloud_dpc_policies)


if __name__ == '__main__':
    unittest.main()