• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding: utf-8 -*-
2# Copyright 2014 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Tests for signurl command."""
16from datetime import timedelta
17import pkgutil
18
19import gslib.commands.signurl
20from gslib.commands.signurl import HAVE_OPENSSL
21from gslib.exception import CommandException
22import gslib.tests.testcase as testcase
23from gslib.tests.testcase.integration_testcase import SkipForS3
24from gslib.tests.util import ObjectToURI as suri
25from gslib.tests.util import unittest
26
27
28# pylint: disable=protected-access
29@unittest.skipUnless(HAVE_OPENSSL, 'signurl requires pyopenssl.')
30@SkipForS3('Signed URLs are only supported for gs:// URLs.')
31class TestSignUrl(testcase.GsUtilIntegrationTestCase):
32  """Integration tests for signurl command."""
33
34  def _GetKsFile(self):
35    if not hasattr(self, 'ks_file'):
36      # Dummy pkcs12 keystore generated with the command
37
38      # openssl req -new -passout pass:notasecret -batch \
39      # -x509 -keyout signed_url_test.key -out signed_url_test.pem \
40      # -subj '/CN=test.apps.googleusercontent.com'
41
42      # &&
43
44      # openssl pkcs12 -export -passin pass:notasecret \
45      # -passout pass:notasecret -inkey signed_url_test.key \
46      # -in signed_url_test.pem -out test.p12
47
48      # &&
49
50      # rm signed_url_test.key signed_url_test.pem
51      contents = pkgutil.get_data('gslib', 'tests/test_data/test.p12')
52      self.ks_file = self.CreateTempFile(contents=contents)
53    return self.ks_file
54
55  def testSignUrlOutput(self):
56    """Tests signurl output of a sample object."""
57
58    object_url = self.CreateObject(contents='z')
59    stdout = self.RunGsUtil(['signurl', '-p', 'notasecret',
60                             self._GetKsFile(), suri(object_url)],
61                            return_stdout=True)
62
63    self.assertIn(object_url.uri, stdout)
64    self.assertIn('test@developer.gserviceaccount.com', stdout)
65    self.assertIn('Expires=', stdout)
66    self.assertIn('\tGET\t', stdout)
67
68    stdout = self.RunGsUtil(['signurl', '-m', 'PUT', '-p',
69                             'notasecret', self._GetKsFile(),
70                             'gs://test/test.txt'], return_stdout=True)
71
72    self.assertIn('test@developer.gserviceaccount.com', stdout)
73    self.assertIn('Expires=', stdout)
74    self.assertIn('\tPUT\t', stdout)
75
76  def testSignUrlWithURLEncodeRequiredChars(self):
77    objs = ['gs://example.org/test 1', 'gs://example.org/test/test 2',
78            'gs://example.org/Аудиоарi хив']
79    expected_partial_urls = [
80        ('https://storage.googleapis.com/example.org/test%201?GoogleAccessId=te'
81         'st@developer.gserviceaccount.com'),
82        ('https://storage.googleapis.com/example.org/test/test%202?GoogleAccess'
83         'Id=test@developer.gserviceaccount.com'),
84        ('https://storage.googleapis.com/example.org/%D0%90%D1%83%D0%B4%D0%B8%D'
85         '0%BE%D0%B0%D1%80i%20%D1%85%D0%B8%D0%B2?GoogleAccessId=test@developer.'
86         'gserviceaccount.com')
87        ]
88
89    self.assertEquals(len(objs), len(expected_partial_urls))
90
91    cmd_args = ['signurl', '-p', 'notasecret', self._GetKsFile()]
92    cmd_args.extend(objs)
93
94    stdout = self.RunGsUtil(cmd_args, return_stdout=True)
95
96    lines = stdout.split('\n')
97    # Header, signed urls, trailing newline.
98    self.assertEquals(len(lines), len(objs) + 2)
99
100    # Strip the header line to make the indices line up.
101    lines = lines[1:]
102
103    for obj, line, partial_url in zip(objs, lines, expected_partial_urls):
104      self.assertIn(obj, line)
105      self.assertIn(partial_url, line)
106
107  def testSignUrlWithWildcard(self):
108    objs = ['test1', 'test2', 'test3']
109    bucket = self.CreateBucket()
110    obj_urls = []
111
112    for obj_name in objs:
113      obj_urls.append(self.CreateObject(bucket_uri=bucket,
114                                        object_name=obj_name, contents=''))
115
116    stdout = self.RunGsUtil(['signurl', '-p',
117                             'notasecret', self._GetKsFile(),
118                             suri(bucket) + '/*'], return_stdout=True)
119
120    # Header, 3 signed urls, trailing newline
121    self.assertEquals(len(stdout.split('\n')), 5)
122
123    for obj_url in obj_urls:
124      self.assertIn(suri(obj_url), stdout)
125
126  def testSignUrlOfNonObjectUrl(self):
127    """Tests the signurl output of a non-existent file."""
128    self.RunGsUtil(['signurl', self._GetKsFile(), 'gs://'],
129                   expected_status=1, stdin='notasecret')
130    self.RunGsUtil(['signurl', 'file://tmp/abc'], expected_status=1)
131
132
133@unittest.skipUnless(HAVE_OPENSSL, 'signurl requires pyopenssl.')
134class UnitTestSignUrl(testcase.GsUtilUnitTestCase):
135  """Unit tests for the signurl command."""
136
137  def setUp(self):
138    super(UnitTestSignUrl, self).setUp()
139    self.ks_contents = pkgutil.get_data('gslib', 'tests/test_data/test.p12')
140
141  def testDurationSpec(self):
142    tests = [('1h', timedelta(hours=1)),
143             ('2d', timedelta(days=2)),
144             ('5D', timedelta(days=5)),
145             ('35s', timedelta(seconds=35)),
146             ('1h', timedelta(hours=1)),
147             ('33', timedelta(hours=33)),
148             ('22m', timedelta(minutes=22)),
149             ('3.7', None),
150             ('27Z', None),
151            ]
152
153    for inp, expected in tests:
154      try:
155        td = gslib.commands.signurl._DurationToTimeDelta(inp)
156        self.assertEquals(td, expected)
157      except CommandException:
158        if expected is not None:
159          self.fail('{0} failed to parse')
160
161  def testSignPut(self):
162    """Tests the return value of the _GenSignedUrl function with \
163    a PUT method."""
164
165    expected = ('https://storage.googleapis.com/test/test.txt?'
166                'GoogleAccessId=test@developer.gserviceaccount.com'
167                '&Expires=1391816302&Signature=A6QbgTA8cXZCtjy2xCr401bdi0e'
168                '7zChTBQ6BX61L7AfytTGEQDMD%2BbvOQKjX7%2FsEh77cmzcSxOEKqTLUD'
169                'bbkPgPqW3j8sGPSRX9VM58bgj1vt9yU8cRKoegFHXAqsATx2G5rc%2FvEl'
170                'iFp9UWMfVj5TaukqlBAVuzZWlyx0aQa9tCKXRtC9YcxORxG41RfiowA2kd8'
171                'XBTQt4M9XTzpVyr5rVMzfr2LvtGf9UAJvlt8p6T6nThl2vy9%2FwBoPcMFa'
172                'OWQcGTagwjyKWDcI1vQPIFQLGftAcv3QnGZxZTtg8pZW%2FIxRJrBhfFfcA'
173                'c62hDKyaU2YssSMy%2FjUJynWx3TIiJjhg%3D%3D')
174
175    expiration = 1391816302
176    ks, client_id = (gslib.commands.signurl
177                     ._ReadKeystore(self.ks_contents, 'notasecret'))
178    signed_url = (gslib.commands.signurl
179                  ._GenSignedUrl(ks.get_privatekey(),
180                                 client_id, 'PUT', '',
181                                 '', expiration, 'test/test.txt'))
182    self.assertEquals(expected, signed_url)
183
184  def testSignurlPutContentype(self):
185    """Tests the return value of the _GenSignedUrl function with \
186    a PUT method and specified content type."""
187
188    expected = ('https://storage.googleapis.com/test/test.txt?'
189                'GoogleAccessId=test@developer.gserviceaccount.com&'
190                'Expires=1391816302&Signature=APn%2BCCVcQrfc1fKQXrs'
191                'PEZFj9%2FmASO%2BolR8xwgBY6PbWMkcCtrUVFBauP6t4NxqZO'
192                'UnbOFYTZYzul0RC57ZkEWJp3VcyDIHcn6usEE%2FTzUHhbDCDW'
193                'awAkZS7p8kO8IIACuJlF5s9xZmZzaEBtzF0%2BBOsGgBPBlg2y'
194                'zrhFB6cyyAwNiUgmhLQaVkdobnSwtI5QJkvXoIjJb6hhLiVbLC'
195                'rWdgSZVusjAKGlWCJsM%2B4TkCR%2Bi8AnrkECngcMHuJ9mYbS'
196                'XI1VfEmcnRVcfkKkJGZGctaDIWK%2FMTEmfYCW6USt3Zk2WowJ'
197                'SGuJHqEcFz0kyfAlkpmG%2Fl5E1FQROYqLN2kZQ%3D%3D')
198
199    expiration = 1391816302
200    ks, client_id = (gslib.commands.signurl
201                     ._ReadKeystore(self.ks_contents,
202                                    'notasecret'))
203    signed_url = (gslib.commands.signurl
204                  ._GenSignedUrl(ks.get_privatekey(),
205                                 client_id, 'PUT', '',
206                                 'text/plain', expiration,
207                                 'test/test.txt'))
208    self.assertEquals(expected, signed_url)
209
210  def testSignurlGet(self):
211    """Tests the return value of the _GenSignedUrl function with \
212    a GET method."""
213
214    expected = ('https://storage.googleapis.com/test/test.txt?'
215                'GoogleAccessId=test@developer.gserviceaccount.com&'
216                'Expires=0&Signature=TCZwe32cU%2BMksmLiSY9shHXQjLs1'
217                'F3y%2F%2F1M0UhiK4qsPRVNZVwI7YWvv2qa2Xa%2BVBBafboF0'
218                '1%2BWvx3ZG316pwpNIRR6y7jNnE0LvQmHE8afbm2VYCi%2B2JS'
219                'ZK2YZFJAyEek8si53jhYQEmaRq1zPfGbX84B2FJ8v4iI%2FTC1'
220                'I9OE5vHF0sWwIR9d73JDrFLjaync7QYFWRExdwvqlQX%2BPO3r'
221                'OG9Ns%2BcQFIN7npnsVjH28yNY9gBzXya8LYmNvUx6bWHWZMiu'
222                'fLwDZ0jejNeDZTOfQGRM%2B0vY7NslzaT06W1wo8P7McSkAZEl'
223                'DCbhR0Vo1fturPMwmAhi88f0qzRzywbg%3D%3D')
224
225    expiration = 0
226    ks, client_id = (gslib.commands.signurl
227                     ._ReadKeystore(self.ks_contents,
228                                    'notasecret'))
229    signed_url = (gslib.commands.signurl
230                  ._GenSignedUrl(ks.get_privatekey(),
231                                 client_id, 'GET', '',
232                                 '', expiration, 'test/test.txt'))
233    self.assertEquals(expected, signed_url)
234