• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import pytest
16
17from google.api_core.iam import _DICT_ACCESS_MSG, InvalidOperationException
18
19
class TestPolicy:
    """Unit tests for ``google.api_core.iam.Policy``.

    The tests cover construction, mapping-style access (including the cases
    in which that access must be refused), the legacy ``owners`` /
    ``editors`` / ``viewers`` accessors, the member-string helpers, and the
    conversion to and from the API resource representation.
    """

    @staticmethod
    def _get_target_class():
        """Return the class under test; the import happens on each call."""
        from google.api_core.iam import Policy

        return Policy

    def _make_one(self, *args, **kw):
        """Instantiate the class under test with the given arguments."""
        klass = self._get_target_class()
        return klass(*args, **kw)

    @staticmethod
    def _set_legacy_attr(policy, name, members):
        """Assign ``members`` to ``policy.<name>`` and return the one warning raised.

        Args:
            policy: The policy object whose attribute is assigned.
            name: Name of the attribute to assign (e.g. ``"owners"``).
            members: The value assigned to the attribute.

        Returns:
            The single recorded warning.

        Raises:
            ValueError: If the assignment did not emit exactly one warning.
        """
        import warnings

        with warnings.catch_warnings(record=True) as caught:
            # Record every warning regardless of filters that are already
            # installed in the process; otherwise the warning under test
            # could be suppressed before it is ever recorded.
            warnings.simplefilter("always")
            setattr(policy, name, members)

        (warning,) = caught
        return warning

    def test_ctor_defaults(self):
        """A policy built without arguments has no etag, version or bindings."""
        policy = self._make_one()
        nobody = frozenset()
        assert policy.etag is None
        assert policy.version is None
        assert policy.owners == nobody
        assert policy.editors == nobody
        assert policy.viewers == nobody
        assert len(policy) == 0
        assert dict(policy) == {}

    def test_ctor_explicit(self):
        """Positional etag and version are stored; bindings start out empty."""
        etag = "ETAG"
        version = 1
        policy = self._make_one(etag, version)
        nobody = frozenset()
        assert policy.etag == etag
        assert policy.version == version
        assert policy.owners == nobody
        assert policy.editors == nobody
        assert policy.viewers == nobody
        assert len(policy) == 0
        assert dict(policy) == {}

    def test___getitem___miss(self):
        """Looking up an unknown role yields an empty set."""
        policy = self._make_one()
        assert policy["nonesuch"] == set()

    def test__getitem___and_set(self):
        """Mutating the set returned by a lookup is reflected in the policy."""
        from google.api_core.iam import OWNER_ROLE

        member = "user:phred@example.com"
        policy = self._make_one()

        # Fetch the role's members through the getter, then modify that
        # returned set in place; the policy must expose the new member.
        policy[OWNER_ROLE].add(member)
        assert dict(policy) == {OWNER_ROLE: {member}}

    def test___getitem___version3(self):
        """Item lookup on a version 3 policy is rejected."""
        policy = self._make_one("DEADBEEF", 3)
        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
            policy["role"]

    def test___getitem___with_conditions(self):
        """Item lookup is rejected when a binding carries a condition."""
        member = "user:phred@example.com"
        condition = {"expression": "2 > 1"}
        policy = self._make_one("DEADBEEF", 1)
        policy.bindings = [
            {"role": "role/reader", "members": [member], "condition": condition}
        ]
        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
            policy["role/reader"]

    def test___setitem__(self):
        """Assigning a list of members to a role stores them as a set."""
        member = "user:phred@example.com"
        expected = {member}
        policy = self._make_one()
        policy["rolename"] = [member]
        assert policy["rolename"] == expected
        assert len(policy) == 1
        assert dict(policy) == {"rolename": expected}

    def test__set_item__overwrite(self):
        """Re-assigning a role replaces its members and leaves other roles alone."""
        group = "group:test@group.com"
        user = "user:phred@example.com"
        everyone = "allUsers"
        policy = self._make_one()
        policy["first"] = [group]
        policy["second"] = [user]
        policy["second"] = [everyone]
        assert policy["second"] == {everyone}
        assert len(policy) == 2
        assert dict(policy) == {"first": {group}, "second": {everyone}}

    def test___setitem___version3(self):
        """Item assignment on a version 3 policy is rejected."""
        policy = self._make_one("DEADBEEF", 3)
        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
            policy["role/reader"] = ["user:phred@example.com"]

    def test___setitem___with_conditions(self):
        """Item assignment is rejected when a binding carries a condition."""
        member = "user:phred@example.com"
        condition = {"expression": "2 > 1"}
        policy = self._make_one("DEADBEEF", 1)
        policy.bindings = [
            {"role": "role/reader", "members": {member}, "condition": condition}
        ]
        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
            policy["role/reader"] = ["user:phred@example.com"]

    def test___delitem___hit(self):
        """Deleting an existing role removes only that role's binding."""
        policy = self._make_one()
        policy.bindings = [
            {"role": "to/keep", "members": {"phred@example.com"}},
            {"role": "to/remove", "members": {"phred@example.com"}},
        ]
        del policy["to/remove"]
        assert len(policy) == 1
        assert dict(policy) == {"to/keep": {"phred@example.com"}}

    def test___delitem___miss(self):
        """Deleting an unknown role raises ``KeyError``."""
        policy = self._make_one()
        with pytest.raises(KeyError):
            del policy["nonesuch"]

    def test___delitem___version3(self):
        """Item deletion on a version 3 policy is rejected."""
        policy = self._make_one("DEADBEEF", 3)
        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
            del policy["role/reader"]

    def test___delitem___with_conditions(self):
        """Item deletion is rejected when a binding carries a condition."""
        member = "user:phred@example.com"
        condition = {"expression": "2 > 1"}
        policy = self._make_one("DEADBEEF", 1)
        policy.bindings = [
            {"role": "role/reader", "members": {member}, "condition": condition}
        ]
        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
            del policy["role/reader"]

    def test_bindings_property(self):
        """The ``bindings`` property returns what was assigned to it."""
        member = "user:phred@example.com"
        condition = {"expression": "2 > 1"}
        bindings = [
            {"role": "role/reader", "members": {member}, "condition": condition}
        ]
        policy = self._make_one()
        policy.bindings = bindings
        assert policy.bindings == bindings

    def test_owners_getter(self):
        """``owners`` exposes the owner role's members as a frozenset."""
        from google.api_core.iam import OWNER_ROLE

        member = "user:phred@example.com"
        policy = self._make_one()
        policy[OWNER_ROLE] = [member]
        assert policy.owners == frozenset([member])

    def test_owners_setter(self):
        """Assigning ``owners`` emits a DeprecationWarning and sets the owner role."""
        from google.api_core.iam import OWNER_ROLE

        member = "user:phred@example.com"
        policy = self._make_one()

        warning = self._set_legacy_attr(policy, "owners", [member])

        assert warning.category is DeprecationWarning
        assert policy[OWNER_ROLE] == {member}

    def test_editors_getter(self):
        """``editors`` exposes the editor role's members as a frozenset."""
        from google.api_core.iam import EDITOR_ROLE

        member = "user:phred@example.com"
        policy = self._make_one()
        policy[EDITOR_ROLE] = [member]
        assert policy.editors == frozenset([member])

    def test_editors_setter(self):
        """Assigning ``editors`` emits a DeprecationWarning and sets the editor role."""
        from google.api_core.iam import EDITOR_ROLE

        member = "user:phred@example.com"
        policy = self._make_one()

        warning = self._set_legacy_attr(policy, "editors", [member])

        assert warning.category is DeprecationWarning
        assert policy[EDITOR_ROLE] == {member}

    def test_viewers_getter(self):
        """``viewers`` exposes the viewer role's members as a frozenset."""
        from google.api_core.iam import VIEWER_ROLE

        member = "user:phred@example.com"
        policy = self._make_one()
        policy[VIEWER_ROLE] = [member]
        assert policy.viewers == frozenset([member])

    def test_viewers_setter(self):
        """Assigning ``viewers`` emits a DeprecationWarning and sets the viewer role."""
        from google.api_core.iam import VIEWER_ROLE

        member = "user:phred@example.com"
        policy = self._make_one()

        warning = self._set_legacy_attr(policy, "viewers", [member])

        assert warning.category is DeprecationWarning
        assert policy[VIEWER_ROLE] == {member}

    def test_user(self):
        """``user`` prefixes the e-mail address with ``user:``."""
        email = "phred@example.com"
        policy = self._make_one()
        assert policy.user(email) == "user:" + email

    def test_service_account(self):
        """``service_account`` prefixes the e-mail address with ``serviceAccount:``."""
        email = "phred@example.com"
        policy = self._make_one()
        assert policy.service_account(email) == "serviceAccount:" + email

    def test_group(self):
        """``group`` prefixes the e-mail address with ``group:``."""
        email = "phred@example.com"
        policy = self._make_one()
        assert policy.group(email) == "group:" + email

    def test_domain(self):
        """``domain`` prefixes the domain name with ``domain:``."""
        domain = "example.com"
        policy = self._make_one()
        assert policy.domain(domain) == "domain:" + domain

    def test_all_users(self):
        """``all_users`` returns the ``allUsers`` member string."""
        policy = self._make_one()
        assert policy.all_users() == "allUsers"

    def test_authenticated_users(self):
        """``authenticated_users`` returns the ``allAuthenticatedUsers`` member string."""
        policy = self._make_one()
        assert policy.authenticated_users() == "allAuthenticatedUsers"

    def test_from_api_repr_only_etag(self):
        """A resource holding only an etag yields an otherwise empty policy."""
        resource = {"etag": "ACAB"}
        policy = self._get_target_class().from_api_repr(resource)
        nobody = frozenset()
        assert policy.etag == "ACAB"
        assert policy.version is None
        assert policy.owners == nobody
        assert policy.editors == nobody
        assert policy.viewers == nobody
        assert dict(policy) == {}

    def test_from_api_repr_complete(self):
        """A full resource populates etag, version, legacy roles and bindings."""
        from google.api_core.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE

        owner1 = "group:cloud-logs@google.com"
        owner2 = "user:phred@example.com"
        editor1 = "domain:google.com"
        editor2 = "user:phred@example.com"
        viewer1 = "serviceAccount:1234-abcdef@service.example.com"
        viewer2 = "user:phred@example.com"
        resource = {
            "etag": "DEADBEEF",
            "version": 1,
            "bindings": [
                {"role": OWNER_ROLE, "members": [owner1, owner2]},
                {"role": EDITOR_ROLE, "members": [editor1, editor2]},
                {"role": VIEWER_ROLE, "members": [viewer1, viewer2]},
            ],
        }
        policy = self._get_target_class().from_api_repr(resource)
        assert policy.etag == "DEADBEEF"
        assert policy.version == 1
        # These must be real equality checks: ``assert value, other`` would
        # only test that ``value`` is truthy and treat ``other`` as the
        # failure message.
        assert policy.owners == frozenset([owner1, owner2])
        assert policy.editors == frozenset([editor1, editor2])
        assert policy.viewers == frozenset([viewer1, viewer2])
        assert dict(policy) == {
            OWNER_ROLE: {owner1, owner2},
            EDITOR_ROLE: {editor1, editor2},
            VIEWER_ROLE: {viewer1, viewer2},
        }
        assert policy.bindings == [
            {"role": OWNER_ROLE, "members": {owner1, owner2}},
            {"role": EDITOR_ROLE, "members": {editor1, editor2}},
            {"role": VIEWER_ROLE, "members": {viewer1, viewer2}},
        ]

    def test_from_api_repr_unknown_role(self):
        """A binding for a role outside the legacy set is kept as-is."""
        user = "user:phred@example.com"
        group = "group:cloud-logs@google.com"
        resource = {
            "etag": "DEADBEEF",
            "version": 1,
            "bindings": [{"role": "unknown", "members": [user, group]}],
        }
        policy = self._get_target_class().from_api_repr(resource)
        assert policy.etag == "DEADBEEF"
        assert policy.version == 1
        # Compare with ``==``; the ``assert value, other`` form would pass
        # for any non-empty mapping.
        assert dict(policy) == {"unknown": {user, group}}

    def test_to_api_repr_defaults(self):
        """A default policy serializes to an empty resource."""
        policy = self._make_one()
        assert policy.to_api_repr() == {}

    def test_to_api_repr_only_etag(self):
        """A policy with only an etag serializes to just that etag."""
        policy = self._make_one("DEADBEEF")
        assert policy.to_api_repr() == {"etag": "DEADBEEF"}

    def test_to_api_repr_binding_wo_members(self):
        """A role with no members is left out of the serialized resource."""
        policy = self._make_one()
        policy["empty"] = []
        assert policy.to_api_repr() == {}

    def test_to_api_repr_binding_w_duplicates(self):
        """Duplicate members collapse to a single entry when serialized."""
        import warnings
        from google.api_core.iam import OWNER_ROLE

        owner = "group:cloud-logs@google.com"
        policy = self._make_one()
        # The warnings raised by the assignment are captured and discarded;
        # this test is only about the serialized output.
        with warnings.catch_warnings(record=True):
            policy.owners = [owner, owner]
        assert policy.to_api_repr() == {
            "bindings": [{"role": OWNER_ROLE, "members": [owner]}]
        }

    def test_to_api_repr_full(self):
        """Etag, version and all bindings (conditional ones too) are serialized."""
        import operator
        from google.api_core.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE

        owner1 = "group:cloud-logs@google.com"
        owner2 = "user:phred@example.com"
        editor1 = "domain:google.com"
        editor2 = "user:phred@example.com"
        viewer1 = "serviceAccount:1234-abcdef@service.example.com"
        viewer2 = "user:phred@example.com"
        condition = {
            "title": "title",
            "description": "description",
            "expression": "true",
        }
        bindings = [
            {"role": OWNER_ROLE, "members": [owner1, owner2]},
            {"role": EDITOR_ROLE, "members": [editor1, editor2]},
            {"role": VIEWER_ROLE, "members": [viewer1, viewer2]},
            {
                "role": VIEWER_ROLE,
                "members": [viewer1, viewer2],
                "condition": condition,
            },
        ]
        policy = self._make_one("DEADBEEF", 1)
        policy.bindings = bindings
        resource = policy.to_api_repr()
        assert resource["etag"] == "DEADBEEF"
        assert resource["version"] == 1
        # Binding order in the output is not part of what is checked, so
        # both sides are sorted by role before comparing.
        by_role = operator.itemgetter("role")
        assert sorted(resource["bindings"], key=by_role) == sorted(
            bindings, key=by_role
        )
383