• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding: utf-8 -*-
2# Copyright 2013 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Integration tests for the defacl command."""
16
17from __future__ import absolute_import
18
19import re
20
21from gslib.cs_api_map import ApiSelector
22import gslib.tests.testcase as case
23from gslib.tests.testcase.integration_testcase import SkipForS3
24from gslib.tests.util import ObjectToURI as suri
25
26PUBLIC_READ_JSON_ACL_TEXT = '"entity":"allUsers","role":"READER"'
27
28
@SkipForS3('S3 does not support default object ACLs.')
class TestDefacl(case.GsUtilIntegrationTestCase):
  """Integration tests for the defacl command."""

  # Command prefixes are class attributes so that a subclass can rerun every
  # test against a different spelling of the commands by overriding them.
  _defacl_ch_prefix = ['defacl', 'ch']
  _defacl_get_prefix = ['defacl', 'get']
  _defacl_set_prefix = ['defacl', 'set']

  def _MakeScopeRegex(self, role, entity_type, email_address):
    """Builds a regex that looks for an ACL entry in defacl get output.

    Args:
      role: Text expected in the "role" field (e.g. 'OWNER').
      entity_type: Prefix expected in the "entity" field (e.g. 'group').
      email_address: Address expected after the entity prefix. It is matched
          literally; regex metacharacters in it (such as '.') are escaped.

    Returns:
      A regex compiled with re.DOTALL. Note that the '.*' between the entity
      and the role is greedy and may span several JSON objects, so the pattern
      does not prove that both fields belong to the same ACL entry.
    """
    template_regex = (r'\{.*"entity":\s*"%s-%s".*"role":\s*"%s".*\}' %
                      (entity_type, re.escape(email_address), role))
    return re.compile(template_regex, flags=re.DOTALL)

  def _GetDefacl(self, bucket):
    """Returns the stdout of the defacl get command for the given bucket."""
    return self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)],
                          return_stdout=True)

  def testChangeDefaultAcl(self):
    """Tests defacl ch."""
    bucket = self.CreateBucket()

    owner_regex = self._MakeScopeRegex(
        'OWNER', 'group', self.GROUP_TEST_ADDRESS)
    reader_regex = self._MakeScopeRegex(
        'READER', 'group', self.GROUP_TEST_ADDRESS)
    self.assertNotRegexpMatches(self._GetDefacl(bucket), owner_regex)

    # Granting FC should show up as an OWNER entry for the group.
    self.RunGsUtil(self._defacl_ch_prefix +
                   ['-g', self.GROUP_TEST_ADDRESS+':FC', suri(bucket)])
    self.assertRegexpMatches(self._GetDefacl(bucket), owner_regex)

    # Granting READ should show up as a READER entry for the group.
    self.RunGsUtil(self._defacl_ch_prefix +
                   ['-g', self.GROUP_TEST_ADDRESS+':READ', suri(bucket)])
    self.assertRegexpMatches(self._GetDefacl(bucket), reader_regex)

    # WRITE must be rejected with a non-zero exit status.
    stderr = self.RunGsUtil(self._defacl_ch_prefix +
                            ['-g', self.GROUP_TEST_ADDRESS+':WRITE',
                             suri(bucket)],
                            return_stderr=True, expected_status=1)
    self.assertIn('WRITER cannot be set as a default object ACL', stderr)

  def testChangeDefaultAclEmpty(self):
    """Tests adding and removing an entry from an empty default object ACL."""
    bucket = self.CreateBucket()

    # First, clear out the default object ACL on the bucket.
    self.RunGsUtil(self._defacl_set_prefix + ['private', suri(bucket)])
    empty_regex = r'\[\]\s*'
    self.assertRegexpMatches(self._GetDefacl(bucket), empty_regex)

    group_regex = self._MakeScopeRegex(
        'READER', 'group', self.GROUP_TEST_ADDRESS)
    self.RunGsUtil(self._defacl_ch_prefix +
                   ['-g', self.GROUP_TEST_ADDRESS+':READ', suri(bucket)])
    self.assertRegexpMatches(self._GetDefacl(bucket), group_regex)

    if self.test_api == ApiSelector.JSON:
      # TODO: Enable when JSON service respects creating a private (no entries)
      # default object ACL via PATCH. For now, only supported in XML.
      return

    # After adding and removing a group, the default object ACL should be empty.
    self.RunGsUtil(self._defacl_ch_prefix +
                   ['-d', self.GROUP_TEST_ADDRESS, suri(bucket)])
    self.assertRegexpMatches(self._GetDefacl(bucket), empty_regex)

  def testChangeMultipleBuckets(self):
    """Tests defacl ch on multiple buckets."""
    bucket1 = self.CreateBucket()
    bucket2 = self.CreateBucket()

    test_regex = self._MakeScopeRegex(
        'READER', 'group', self.GROUP_TEST_ADDRESS)
    self.assertNotRegexpMatches(self._GetDefacl(bucket1), test_regex)
    self.assertNotRegexpMatches(self._GetDefacl(bucket2), test_regex)

    # A single ch invocation is given both bucket URIs.
    self.RunGsUtil(self._defacl_ch_prefix +
                   ['-g', self.GROUP_TEST_ADDRESS+':READ',
                    suri(bucket1), suri(bucket2)])
    self.assertRegexpMatches(self._GetDefacl(bucket1), test_regex)
    self.assertRegexpMatches(self._GetDefacl(bucket2), test_regex)

  def testChangeMultipleAcls(self):
    """Tests defacl ch with multiple ACL entries."""
    bucket = self.CreateBucket()

    test_regex_group = self._MakeScopeRegex(
        'READER', 'group', self.GROUP_TEST_ADDRESS)
    test_regex_user = self._MakeScopeRegex(
        'OWNER', 'user', self.USER_TEST_ADDRESS)
    json_text = self._GetDefacl(bucket)
    self.assertNotRegexpMatches(json_text, test_regex_group)
    self.assertNotRegexpMatches(json_text, test_regex_user)

    # A single ch invocation carries both the group and the user change.
    self.RunGsUtil(self._defacl_ch_prefix +
                   ['-g', self.GROUP_TEST_ADDRESS+':READ',
                    '-u', self.USER_TEST_ADDRESS+':fc', suri(bucket)])
    json_text = self._GetDefacl(bucket)
    self.assertRegexpMatches(json_text, test_regex_group)
    self.assertRegexpMatches(json_text, test_regex_user)

  def testEmptyDefAcl(self):
    """Tests that defacl get prints '[]' for an empty ACL and ch then works."""
    bucket = self.CreateBucket()
    self.RunGsUtil(self._defacl_set_prefix + ['private', suri(bucket)])
    stdout = self._GetDefacl(bucket)
    self.assertEqual(stdout.rstrip(), '[]')
    # No output check here: the ch command only needs to run successfully
    # against the emptied default object ACL.
    self.RunGsUtil(self._defacl_ch_prefix +
                   ['-u', self.USER_TEST_ADDRESS+':fc', suri(bucket)])

  def testDeletePermissionsWithCh(self):
    """Tests removing permissions with defacl ch."""
    bucket = self.CreateBucket()

    test_regex = self._MakeScopeRegex(
        'OWNER', 'user', self.USER_TEST_ADDRESS)
    self.assertNotRegexpMatches(self._GetDefacl(bucket), test_regex)

    self.RunGsUtil(self._defacl_ch_prefix +
                   ['-u', self.USER_TEST_ADDRESS+':fc', suri(bucket)])
    self.assertRegexpMatches(self._GetDefacl(bucket), test_regex)

    # Deleting the user's entry should make the OWNER grant disappear again.
    self.RunGsUtil(self._defacl_ch_prefix +
                   ['-d', self.USER_TEST_ADDRESS, suri(bucket)])
    self.assertNotRegexpMatches(self._GetDefacl(bucket), test_regex)

  def testTooFewArgumentsFails(self):
    """Tests calling defacl with insufficient number of arguments."""
    incomplete_commands = (
        # No arguments for get, but valid subcommand.
        self._defacl_get_prefix,
        # No arguments for set, but valid subcommand.
        self._defacl_set_prefix,
        # No arguments for ch, but valid subcommand.
        self._defacl_ch_prefix,
        # Neither arguments nor subcommand.
        ['defacl'],
    )
    for command in incomplete_commands:
      stderr = self.RunGsUtil(command, return_stderr=True, expected_status=1)
      self.assertIn('command requires at least', stderr)
200
201
class TestDefaclOldAlias(TestDefacl):
  """Reruns the TestDefacl tests through the old single-word command names."""

  # Each override replaces the two-word 'defacl <subcommand>' prefix used by
  # the parent class with the corresponding one-word command.
  _defacl_get_prefix = ['getdefacl']
  _defacl_set_prefix = ['setdefacl']
  _defacl_ch_prefix = ['chdefacl']
206