# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from google.api_core.iam import _DICT_ACCESS_MSG, InvalidOperationException


class TestPolicy:
    """Unit tests for ``google.api_core.iam.Policy``.

    The class under test is imported lazily through ``_get_target_class`` and
    instantiated through ``_make_one`` so every test builds a fresh policy.
    """

    @staticmethod
    def _get_target_class():
        """Return the ``Policy`` class under test."""
        from google.api_core.iam import Policy

        return Policy

    def _make_one(self, *args, **kw):
        """Build a ``Policy``, forwarding all arguments to its constructor."""
        return self._get_target_class()(*args, **kw)

    def test_ctor_defaults(self):
        """A policy built without arguments has no etag, version or bindings."""
        empty = frozenset()
        policy = self._make_one()
        assert policy.etag is None
        assert policy.version is None
        assert policy.owners == empty
        assert policy.editors == empty
        assert policy.viewers == empty
        assert len(policy) == 0
        assert dict(policy) == {}

    def test_ctor_explicit(self):
        """Positional ``etag`` and ``version`` are stored on the policy."""
        VERSION = 1
        ETAG = "ETAG"
        empty = frozenset()
        policy = self._make_one(ETAG, VERSION)
        assert policy.etag == ETAG
        assert policy.version == VERSION
        assert policy.owners == empty
        assert policy.editors == empty
        assert policy.viewers == empty
        assert len(policy) == 0
        assert dict(policy) == {}

    def test___getitem___miss(self):
        """Reading a role that was never set yields an empty set."""
        policy = self._make_one()
        assert policy["nonesuch"] == set()

    def test__getitem___and_set(self):
        """Mutating the set returned by the getter is visible in the policy."""
        from google.api_core.iam import OWNER_ROLE

        policy = self._make_one()

        # get the policy using the getter and then modify it
        policy[OWNER_ROLE].add("user:phred@example.com")
        assert dict(policy) == {OWNER_ROLE: {"user:phred@example.com"}}

    def test___getitem___version3(self):
        """Item access on a version 3 policy raises."""
        policy = self._make_one("DEADBEEF", 3)
        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
            policy["role"]

    def test___getitem___with_conditions(self):
        """Item access raises when a binding carries a condition."""
        USER = "user:phred@example.com"
        CONDITION = {"expression": "2 > 1"}
        policy = self._make_one("DEADBEEF", 1)
        policy.bindings = [
            {"role": "role/reader", "members": [USER], "condition": CONDITION}
        ]
        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
            policy["role/reader"]

    def test___setitem__(self):
        """Assigning a member list to a role stores it as a set."""
        USER = "user:phred@example.com"
        PRINCIPALS = {USER}
        policy = self._make_one()
        policy["rolename"] = [USER]
        assert policy["rolename"] == PRINCIPALS
        assert len(policy) == 1
        assert dict(policy) == {"rolename": PRINCIPALS}

    def test__set_item__overwrite(self):
        """Re-assigning a role replaces its members and leaves others alone."""
        GROUP = "group:test@group.com"
        USER = "user:phred@example.com"
        ALL_USERS = "allUsers"
        MEMBERS = {ALL_USERS}
        GROUPS = {GROUP}
        policy = self._make_one()
        policy["first"] = [GROUP]
        policy["second"] = [USER]
        policy["second"] = [ALL_USERS]
        assert policy["second"] == MEMBERS
        assert len(policy) == 2
        assert dict(policy) == {"first": GROUPS, "second": MEMBERS}

    def test___setitem___version3(self):
        """Item assignment on a version 3 policy raises."""
        policy = self._make_one("DEADBEEF", 3)
        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
            policy["role/reader"] = ["user:phred@example.com"]

    def test___setitem___with_conditions(self):
        """Item assignment raises when a binding carries a condition."""
        USER = "user:phred@example.com"
        CONDITION = {"expression": "2 > 1"}
        policy = self._make_one("DEADBEEF", 1)
        policy.bindings = [
            {"role": "role/reader", "members": {USER}, "condition": CONDITION}
        ]
        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
            policy["role/reader"] = ["user:phred@example.com"]

    def test___delitem___hit(self):
        """Deleting an existing role removes only that role."""
        policy = self._make_one()
        policy.bindings = [
            {"role": "to/keep", "members": {"phred@example.com"}},
            {"role": "to/remove", "members": {"phred@example.com"}},
        ]
        del policy["to/remove"]
        assert len(policy) == 1
        assert dict(policy) == {"to/keep": {"phred@example.com"}}

    def test___delitem___miss(self):
        """Deleting an unknown role raises ``KeyError``."""
        policy = self._make_one()
        with pytest.raises(KeyError):
            del policy["nonesuch"]

    def test___delitem___version3(self):
        """Item deletion on a version 3 policy raises."""
        policy = self._make_one("DEADBEEF", 3)
        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
            del policy["role/reader"]

    def test___delitem___with_conditions(self):
        """Item deletion raises when a binding carries a condition."""
        USER = "user:phred@example.com"
        CONDITION = {"expression": "2 > 1"}
        policy = self._make_one("DEADBEEF", 1)
        policy.bindings = [
            {"role": "role/reader", "members": {USER}, "condition": CONDITION}
        ]
        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
            del policy["role/reader"]

    def test_bindings_property(self):
        """``bindings`` returns what was assigned to it."""
        USER = "user:phred@example.com"
        CONDITION = {"expression": "2 > 1"}
        policy = self._make_one()
        BINDINGS = [
            {"role": "role/reader", "members": {USER}, "condition": CONDITION}
        ]
        policy.bindings = BINDINGS
        assert policy.bindings == BINDINGS

    def test_owners_getter(self):
        """``owners`` exposes the members bound to ``OWNER_ROLE``."""
        from google.api_core.iam import OWNER_ROLE

        MEMBER = "user:phred@example.com"
        expected = frozenset([MEMBER])
        policy = self._make_one()
        policy[OWNER_ROLE] = [MEMBER]
        assert policy.owners == expected

    def test_owners_setter(self):
        """Assigning ``owners`` warns once and updates ``OWNER_ROLE``."""
        import warnings
        from google.api_core.iam import OWNER_ROLE

        MEMBER = "user:phred@example.com"
        expected = {MEMBER}
        policy = self._make_one()

        with warnings.catch_warnings(record=True) as warned:
            # Record the warning regardless of the ambient filter settings.
            warnings.simplefilter("always")
            policy.owners = [MEMBER]

        (warning,) = warned
        assert warning.category is DeprecationWarning
        assert policy[OWNER_ROLE] == expected

    def test_editors_getter(self):
        """``editors`` exposes the members bound to ``EDITOR_ROLE``."""
        from google.api_core.iam import EDITOR_ROLE

        MEMBER = "user:phred@example.com"
        expected = frozenset([MEMBER])
        policy = self._make_one()
        policy[EDITOR_ROLE] = [MEMBER]
        assert policy.editors == expected

    def test_editors_setter(self):
        """Assigning ``editors`` warns once and updates ``EDITOR_ROLE``."""
        import warnings
        from google.api_core.iam import EDITOR_ROLE

        MEMBER = "user:phred@example.com"
        expected = {MEMBER}
        policy = self._make_one()

        with warnings.catch_warnings(record=True) as warned:
            # Record the warning regardless of the ambient filter settings.
            warnings.simplefilter("always")
            policy.editors = [MEMBER]

        (warning,) = warned
        assert warning.category is DeprecationWarning
        assert policy[EDITOR_ROLE] == expected

    def test_viewers_getter(self):
        """``viewers`` exposes the members bound to ``VIEWER_ROLE``."""
        from google.api_core.iam import VIEWER_ROLE

        MEMBER = "user:phred@example.com"
        expected = frozenset([MEMBER])
        policy = self._make_one()
        policy[VIEWER_ROLE] = [MEMBER]
        assert policy.viewers == expected

    def test_viewers_setter(self):
        """Assigning ``viewers`` warns once and updates ``VIEWER_ROLE``."""
        import warnings
        from google.api_core.iam import VIEWER_ROLE

        MEMBER = "user:phred@example.com"
        expected = {MEMBER}
        policy = self._make_one()

        with warnings.catch_warnings(record=True) as warned:
            # Record the warning regardless of the ambient filter settings.
            warnings.simplefilter("always")
            policy.viewers = [MEMBER]

        (warning,) = warned
        assert warning.category is DeprecationWarning
        assert policy[VIEWER_ROLE] == expected

    def test_user(self):
        """``user`` prefixes the email with ``user:``."""
        EMAIL = "phred@example.com"
        MEMBER = "user:%s" % (EMAIL,)
        policy = self._make_one()
        assert policy.user(EMAIL) == MEMBER

    def test_service_account(self):
        """``service_account`` prefixes the email with ``serviceAccount:``."""
        EMAIL = "phred@example.com"
        MEMBER = "serviceAccount:%s" % (EMAIL,)
        policy = self._make_one()
        assert policy.service_account(EMAIL) == MEMBER

    def test_group(self):
        """``group`` prefixes the email with ``group:``."""
        EMAIL = "phred@example.com"
        MEMBER = "group:%s" % (EMAIL,)
        policy = self._make_one()
        assert policy.group(EMAIL) == MEMBER

    def test_domain(self):
        """``domain`` prefixes the domain with ``domain:``."""
        DOMAIN = "example.com"
        MEMBER = "domain:%s" % (DOMAIN,)
        policy = self._make_one()
        assert policy.domain(DOMAIN) == MEMBER

    def test_all_users(self):
        """``all_users`` returns the ``allUsers`` member string."""
        policy = self._make_one()
        assert policy.all_users() == "allUsers"

    def test_authenticated_users(self):
        """``authenticated_users`` returns ``allAuthenticatedUsers``."""
        policy = self._make_one()
        assert policy.authenticated_users() == "allAuthenticatedUsers"

    def test_from_api_repr_only_etag(self):
        """A resource with only an etag yields an otherwise empty policy."""
        empty = frozenset()
        RESOURCE = {"etag": "ACAB"}
        klass = self._get_target_class()
        policy = klass.from_api_repr(RESOURCE)
        assert policy.etag == "ACAB"
        assert policy.version is None
        assert policy.owners == empty
        assert policy.editors == empty
        assert policy.viewers == empty
        assert dict(policy) == {}

    def test_from_api_repr_complete(self):
        """A full resource populates etag, version, roles and bindings."""
        from google.api_core.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE

        OWNER1 = "group:cloud-logs@google.com"
        OWNER2 = "user:phred@example.com"
        EDITOR1 = "domain:google.com"
        EDITOR2 = "user:phred@example.com"
        VIEWER1 = "serviceAccount:1234-abcdef@service.example.com"
        VIEWER2 = "user:phred@example.com"
        RESOURCE = {
            "etag": "DEADBEEF",
            "version": 1,
            "bindings": [
                {"role": OWNER_ROLE, "members": [OWNER1, OWNER2]},
                {"role": EDITOR_ROLE, "members": [EDITOR1, EDITOR2]},
                {"role": VIEWER_ROLE, "members": [VIEWER1, VIEWER2]},
            ],
        }
        klass = self._get_target_class()
        policy = klass.from_api_repr(RESOURCE)
        assert policy.etag == "DEADBEEF"
        assert policy.version == 1
        # Compare with ``==``: ``assert value, other`` would treat ``other``
        # as the failure message and only check that ``value`` is truthy.
        assert policy.owners == frozenset([OWNER1, OWNER2])
        assert policy.editors == frozenset([EDITOR1, EDITOR2])
        assert policy.viewers == frozenset([VIEWER1, VIEWER2])
        assert dict(policy) == {
            OWNER_ROLE: {OWNER1, OWNER2},
            EDITOR_ROLE: {EDITOR1, EDITOR2},
            VIEWER_ROLE: {VIEWER1, VIEWER2},
        }
        assert policy.bindings == [
            {"role": OWNER_ROLE, "members": {OWNER1, OWNER2}},
            {"role": EDITOR_ROLE, "members": {EDITOR1, EDITOR2}},
            {"role": VIEWER_ROLE, "members": {VIEWER1, VIEWER2}},
        ]

    def test_from_api_repr_unknown_role(self):
        """Bindings for a role without a dedicated property are still kept."""
        USER = "user:phred@example.com"
        GROUP = "group:cloud-logs@google.com"
        RESOURCE = {
            "etag": "DEADBEEF",
            "version": 1,
            "bindings": [{"role": "unknown", "members": [USER, GROUP]}],
        }
        klass = self._get_target_class()
        policy = klass.from_api_repr(RESOURCE)
        assert policy.etag == "DEADBEEF"
        assert policy.version == 1
        # Real equality check; the comma form would only test truthiness.
        assert dict(policy) == {"unknown": {GROUP, USER}}

    def test_to_api_repr_defaults(self):
        """A default policy serializes to an empty mapping."""
        policy = self._make_one()
        assert policy.to_api_repr() == {}

    def test_to_api_repr_only_etag(self):
        """A policy with only an etag serializes just that etag."""
        policy = self._make_one("DEADBEEF")
        assert policy.to_api_repr() == {"etag": "DEADBEEF"}

    def test_to_api_repr_binding_wo_members(self):
        """A role with no members is omitted from the serialized form."""
        policy = self._make_one()
        policy["empty"] = []
        assert policy.to_api_repr() == {}

    def test_to_api_repr_binding_w_duplicates(self):
        """Duplicate members are collapsed in the serialized binding."""
        import warnings
        from google.api_core.iam import OWNER_ROLE

        OWNER = "group:cloud-logs@google.com"
        policy = self._make_one()
        # The warning from the deprecated setter is captured and discarded.
        with warnings.catch_warnings(record=True):
            policy.owners = [OWNER, OWNER]
        assert policy.to_api_repr() == {
            "bindings": [{"role": OWNER_ROLE, "members": [OWNER]}]
        }

    def test_to_api_repr_full(self):
        """Etag, version and all bindings (one conditional) are serialized."""
        import operator
        from google.api_core.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE

        OWNER1 = "group:cloud-logs@google.com"
        OWNER2 = "user:phred@example.com"
        EDITOR1 = "domain:google.com"
        EDITOR2 = "user:phred@example.com"
        VIEWER1 = "serviceAccount:1234-abcdef@service.example.com"
        VIEWER2 = "user:phred@example.com"
        CONDITION = {
            "title": "title",
            "description": "description",
            "expression": "true",
        }
        BINDINGS = [
            {"role": OWNER_ROLE, "members": [OWNER1, OWNER2]},
            {"role": EDITOR_ROLE, "members": [EDITOR1, EDITOR2]},
            {"role": VIEWER_ROLE, "members": [VIEWER1, VIEWER2]},
            {
                "role": VIEWER_ROLE,
                "members": [VIEWER1, VIEWER2],
                "condition": CONDITION,
            },
        ]
        policy = self._make_one("DEADBEEF", 1)
        policy.bindings = BINDINGS
        resource = policy.to_api_repr()
        assert resource["etag"] == "DEADBEEF"
        assert resource["version"] == 1
        # Both sides are sorted by role before comparing.
        key = operator.itemgetter("role")
        assert sorted(resource["bindings"], key=key) == sorted(BINDINGS, key=key)