• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding: utf-8 -*-
2#
3# Copyright 2015 Google Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Tests for transfer.py."""
18import string
19import unittest
20
21import httplib2
22import json
23import mock
24import six
25from six.moves import http_client
26
27from apitools.base.py import base_api
28from apitools.base.py import exceptions
29from apitools.base.py import gzip
30from apitools.base.py import http_wrapper
31from apitools.base.py import transfer
32
33
class TransferTest(unittest.TestCase):
    """Tests for download range handling and upload request encoding."""

    def assertRangeAndContentRangeCompatible(self, request, response):
        """Assert that a response's Content-Range matches the requested Range.

        Args:
          request: object whose `headers` mapping is expected to hold a
              'range' entry of the form 'bytes=<range>'.
          response: object whose `info` mapping is expected to hold a
              'content-range' entry of the form 'bytes <range>/<total>'.

        The check passes when the requested range is a prefix of the
        returned range, so an open-ended request such as 'bytes=0-' is
        compatible with a response of 'bytes 0-25/26'.
        """
        request_prefix = 'bytes='
        self.assertIn('range', request.headers)
        self.assertTrue(request.headers['range'].startswith(request_prefix))
        request_range = request.headers['range'][len(request_prefix):]

        response_prefix = 'bytes '
        self.assertIn('content-range', response.info)
        response_header = response.info['content-range']
        self.assertTrue(response_header.startswith(response_prefix))
        # Drop the '/<total>' suffix so that only the byte range is compared.
        response_range = (
            response_header[len(response_prefix):].partition('/')[0])

        msg = ('Request range ({0}) not a prefix of '
               'response_range ({1})').format(
                   request_range, response_range)
        self.assertTrue(response_range.startswith(request_range), msg=msg)

    def testComputeEndByte(self):
        """The end byte is limited to one chunk even if `end` is larger."""
        total_size = 100
        chunksize = 10
        download = transfer.Download.FromStream(
            six.StringIO(), chunksize=chunksize, total_size=total_size)
        self.assertEqual(chunksize - 1,
                         download._Download__ComputeEndByte(0, end=50))

    def testComputeEndByteReturnNone(self):
        """Without a total size, end or chunking the end byte is None."""
        download = transfer.Download.FromStream(six.StringIO())
        self.assertIsNone(
            download._Download__ComputeEndByte(0, use_chunks=False))

    def testComputeEndByteNoChunks(self):
        """Without chunking the end byte is the last byte of the stream."""
        total_size = 100
        download = transfer.Download.FromStream(
            six.StringIO(), chunksize=10, total_size=total_size)
        for end in (None, 1000):
            self.assertEqual(
                total_size - 1,
                download._Download__ComputeEndByte(0, end=end,
                                                   use_chunks=False),
                msg='Failed on end={0}'.format(end))

    def testComputeEndByteNoTotal(self):
        """Without a total size the end byte is start + chunksize - 1."""
        download = transfer.Download.FromStream(six.StringIO())
        default_chunksize = download.chunksize
        for chunksize in (100, default_chunksize):
            download.chunksize = chunksize
            for start in (0, 10):
                self.assertEqual(
                    download.chunksize + start - 1,
                    download._Download__ComputeEndByte(start),
                    msg='Failed on start={0}, chunksize={1}'.format(
                        start, chunksize))

    def testComputeEndByteSmallTotal(self):
        """With the default chunksize the end byte is capped at total - 1."""
        total_size = 100
        download = transfer.Download.FromStream(six.StringIO(),
                                                total_size=total_size)
        for start in (0, 10):
            self.assertEqual(total_size - 1,
                             download._Download__ComputeEndByte(start),
                             msg='Failed on start={0}'.format(start))

    def testDownloadThenStream(self):
        """Streaming after a complete download requests the bytes past it."""
        bytes_http = object()
        http = object()
        download_stream = six.StringIO()
        download = transfer.Download.FromStream(download_stream,
                                                total_size=26)
        download.bytes_http = bytes_http
        base_url = 'https://part.one/'
        # First phase: the initial download is served in a single request.
        with mock.patch.object(http_wrapper, 'MakeRequest',
                               autospec=True) as make_request:
            make_request.return_value = http_wrapper.Response(
                info={
                    'content-range': 'bytes 0-25/26',
                    'status': http_client.OK,
                },
                content=string.ascii_lowercase,
                request_url=base_url,
            )
            request = http_wrapper.Request(url='https://part.one/')
            download.InitializeDownload(request, http=http)
            self.assertEqual(1, make_request.call_count)
            received_request = make_request.call_args[0][1]
            self.assertEqual(base_url, received_request.url)
            self.assertRangeAndContentRangeCompatible(
                received_request, make_request.return_value)

        # Second phase: a further StreamInChunks call asks for the bytes
        # after the 26 already received; the mocked server answers with
        # "requested range not satisfiable".
        with mock.patch.object(http_wrapper, 'MakeRequest',
                               autospec=True) as make_request:
            make_request.return_value = http_wrapper.Response(
                info={
                    'status': http_client.REQUESTED_RANGE_NOT_SATISFIABLE,
                },
                content='error',
                request_url=base_url,
            )
            download.StreamInChunks()
            self.assertEqual(1, make_request.call_count)
            received_request = make_request.call_args[0][1]
            self.assertEqual('bytes=26-', received_request.headers['range'])

    def testGetRange(self):
        """GetRange issues one request whose Range matches the response."""
        for (start_byte, end_byte) in [(0, 25), (5, 15), (0, 0), (25, 25)]:
            # Identify the failing table entry, as the other looped
            # tests in this class do.
            case_msg = 'Failed on range {0}-{1}'.format(start_byte, end_byte)
            bytes_http = object()
            http = object()
            download_stream = six.StringIO()
            download = transfer.Download.FromStream(download_stream,
                                                    total_size=26,
                                                    auto_transfer=False)
            download.bytes_http = bytes_http
            base_url = 'https://part.one/'
            with mock.patch.object(http_wrapper, 'MakeRequest',
                                   autospec=True) as make_request:
                make_request.return_value = http_wrapper.Response(
                    info={
                        'content-range': 'bytes %d-%d/26' %
                                         (start_byte, end_byte),
                        'status': http_client.OK,
                    },
                    content=string.ascii_lowercase[start_byte:end_byte + 1],
                    request_url=base_url,
                )
                request = http_wrapper.Request(url='https://part.one/')
                download.InitializeDownload(request, http=http)
                download.GetRange(start_byte, end_byte)
                self.assertEqual(1, make_request.call_count, msg=case_msg)
                received_request = make_request.call_args[0][1]
                self.assertEqual(base_url, received_request.url,
                                 msg=case_msg)
                self.assertRangeAndContentRangeCompatible(
                    received_request, make_request.return_value)

    def testNonChunkedDownload(self):
        """A download served in one response lands in the stream intact."""
        bytes_http = object()
        http = object()
        download_stream = six.StringIO()
        download = transfer.Download.FromStream(download_stream, total_size=52)
        download.bytes_http = bytes_http
        base_url = 'https://part.one/'

        with mock.patch.object(http_wrapper, 'MakeRequest',
                               autospec=True) as make_request:
            make_request.return_value = http_wrapper.Response(
                info={
                    'content-range': 'bytes 0-51/52',
                    'status': http_client.OK,
                },
                content=string.ascii_lowercase * 2,
                request_url=base_url,
            )
            request = http_wrapper.Request(url='https://part.one/')
            download.InitializeDownload(request, http=http)
            self.assertEqual(1, make_request.call_count)
            received_request = make_request.call_args[0][1]
            self.assertEqual(base_url, received_request.url)
            self.assertRangeAndContentRangeCompatible(
                received_request, make_request.return_value)
            download_stream.seek(0)
            self.assertEqual(string.ascii_lowercase * 2,
                             download_stream.getvalue())

    def testChunkedDownload(self):
        """A two-chunk download follows content-location between chunks."""
        bytes_http = object()
        http = object()
        download_stream = six.StringIO()
        download = transfer.Download.FromStream(
            download_stream, chunksize=26, total_size=52)
        download.bytes_http = bytes_http

        # Setting autospec on a mock with an iterable side_effect is
        # currently broken (http://bugs.python.org/issue17826), so
        # instead we write a little function.
        def _ReturnBytes(unused_http, http_request,
                         *unused_args, **unused_kwds):
            url = http_request.url
            if url == 'https://part.one/':
                return http_wrapper.Response(
                    info={
                        'content-location': 'https://part.two/',
                        'content-range': 'bytes 0-25/52',
                        'status': http_client.PARTIAL_CONTENT,
                    },
                    content=string.ascii_lowercase,
                    request_url='https://part.one/',
                )
            elif url == 'https://part.two/':
                return http_wrapper.Response(
                    info={
                        'content-range': 'bytes 26-51/52',
                        'status': http_client.OK,
                    },
                    content=string.ascii_uppercase,
                    request_url='https://part.two/',
                )
            else:
                self.fail('Unknown URL requested: %s' % url)

        with mock.patch.object(http_wrapper, 'MakeRequest',
                               autospec=True) as make_request:
            make_request.side_effect = _ReturnBytes
            request = http_wrapper.Request(url='https://part.one/')
            download.InitializeDownload(request, http=http)
            self.assertEqual(2, make_request.call_count)
            # Replay each recorded call through the fake server to rebuild
            # the response that was paired with that request.
            for call in make_request.call_args_list:
                self.assertRangeAndContentRangeCompatible(
                    call[0][1], _ReturnBytes(*call[0]))
            download_stream.seek(0)
            self.assertEqual(string.ascii_lowercase + string.ascii_uppercase,
                             download_stream.getvalue())

    def testFinalizesTransferUrlIfClientPresent(self):
        """Tests download's enforcement of client custom endpoints."""
        mock_client = mock.Mock()
        fake_json_data = json.dumps({
            'auto_transfer': False,
            'progress': 0,
            'total_size': 0,
            'url': 'url',
        })
        transfer.Download.FromData(six.BytesIO(), fake_json_data,
                                   client=mock_client)
        mock_client.FinalizeTransferUrl.assert_called_once_with('url')

    def testMultipartEncoding(self):
        """Upload contents reach the request body unmodified."""
        # This is really a table test for various issues we've seen in
        # the past; see notes below for particular histories.

        test_cases = [
            # Python's mime module by default encodes lines that start
            # with "From " as ">From ", which we need to make sure we
            # don't run afoul of when sending content that isn't
            # intended to be so encoded. This test calls out that we
            # get this right. We test for both the multipart and
            # non-multipart case.
            'line one\nFrom \nline two',

            # We had originally used a `six.StringIO` to hold the http
            # request body in the case of a multipart upload; for
            # bytes being uploaded in Python3, however, this causes
            # issues like this:
            # https://github.com/GoogleCloudPlatform/gcloud-python/issues/1760
            # We test below to ensure that we don't end up mangling
            # the body before sending.
            u'name,main_ingredient\nRäksmörgås,Räkor\nBaguette,Bröd',
        ]

        for upload_contents in test_cases:
            # Identify the failing table entry, as the other looped
            # tests in this class do.
            case_msg = 'Failed on upload_contents={0!r}'.format(
                upload_contents)
            multipart_body = '{"body_field_one": 7}'
            upload_bytes = upload_contents.encode('ascii', 'backslashreplace')
            upload_config = base_api.ApiUploadInfo(
                accept=['*/*'],
                max_size=None,
                resumable_multipart=True,
                resumable_path=u'/resumable/upload',
                simple_multipart=True,
                simple_path=u'/upload',
            )
            url_builder = base_api._UrlBuilder('http://www.uploads.com')

            # Test multipart: having a body argument in http_request forces
            # multipart here.
            upload = transfer.Upload.FromStream(
                six.BytesIO(upload_bytes),
                'text/plain',
                total_size=len(upload_bytes))
            http_request = http_wrapper.Request(
                'http://www.uploads.com',
                headers={'content-type': 'text/plain'},
                body=multipart_body)
            upload.ConfigureRequest(upload_config, http_request, url_builder)
            self.assertEqual(
                'multipart', url_builder.query_params['uploadType'],
                msg=case_msg)
            # Take the section after the second '--' delimiter and drop
            # its first line before comparing against the uploaded bytes.
            rewritten_upload_contents = b'\n'.join(
                http_request.body.split(b'--')[2].splitlines()[1:])
            self.assertTrue(
                rewritten_upload_contents.endswith(upload_bytes),
                msg=case_msg)

            # Test non-multipart (aka media): no body argument means this is
            # sent as media.
            upload = transfer.Upload.FromStream(
                six.BytesIO(upload_bytes),
                'text/plain',
                total_size=len(upload_bytes))
            http_request = http_wrapper.Request(
                'http://www.uploads.com',
                headers={'content-type': 'text/plain'})
            upload.ConfigureRequest(upload_config, http_request, url_builder)
            self.assertEqual(url_builder.query_params['uploadType'], 'media',
                             msg=case_msg)
            rewritten_upload_contents = http_request.body
            self.assertTrue(
                rewritten_upload_contents.endswith(upload_bytes),
                msg=case_msg)
329
class UploadTest(unittest.TestCase):
    """Tests for transfer.Upload: compression, chunking and retries."""

    def setUp(self):
        """Build the sample data, request and canned responses."""
        # Sample highly compressible data.
        self.sample_data = b'abc' * 200
        # Stream of the sample data.
        self.sample_stream = six.BytesIO(self.sample_data)
        # Sample url_builder.
        self.url_builder = base_api._UrlBuilder('http://www.uploads.com')
        # Sample request.
        self.request = http_wrapper.Request(
            'http://www.uploads.com',
            headers={'content-type': 'text/plain'})
        # Sample successful response.
        self.response = http_wrapper.Response(
            info={'status': http_client.OK,
                  'location': 'http://www.uploads.com'},
            content='',
            request_url='http://www.uploads.com',)
        # Sample failure response.
        self.fail_response = http_wrapper.Response(
            info={'status': http_client.SERVICE_UNAVAILABLE,
                  'location': 'http://www.uploads.com'},
            content='',
            request_url='http://www.uploads.com',)

    def testStreamInChunksCompressed(self):
        """Test that StreamInChunks will handle compression correctly."""
        # Create and configure the upload object.
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            gzip_encoded=True)
        upload.strategy = transfer.RESUMABLE_UPLOAD
        # Set the chunk size so the entire stream is uploaded.
        upload.chunksize = len(self.sample_data)
        # Mock the upload to return the sample response.
        with mock.patch.object(transfer.Upload,
                               '_Upload__SendMediaRequest') as mock_result, \
                mock.patch.object(http_wrapper,
                                  'MakeRequest') as make_request:
            mock_result.return_value = self.response
            make_request.return_value = self.response

            # Initialization.
            upload.InitializeUpload(self.request, 'http')
            upload.StreamInChunks()
            # Ensure the mock was called. This must come before the
            # recorded call is unpacked so that a missing call is reported
            # as a test failure rather than as an IndexError.
            self.assertTrue(mock_result.called)
            # Get the uploaded request and end position of the stream.
            (request, _), _ = mock_result.call_args_list[0]
            # Ensure the correct content encoding was set.
            self.assertEqual(request.headers['Content-Encoding'], 'gzip')
            # Ensure the stream was compressed.
            self.assertLess(len(request.body), len(self.sample_data))

    def testStreamMediaCompressedFail(self):
        """Test that non-chunked uploads raise an exception.

        Ensure uploads with the compressed and resumable flags set called from
        StreamMedia raise an exception. Those uploads are unsupported.
        """
        # Create the upload object.
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            auto_transfer=True,
            gzip_encoded=True)
        upload.strategy = transfer.RESUMABLE_UPLOAD
        # Mock the upload to return the sample response.
        with mock.patch.object(http_wrapper,
                               'MakeRequest') as make_request:
            make_request.return_value = self.response

            # Initialization.
            upload.InitializeUpload(self.request, 'http')
            # Ensure stream media raises an exception when the upload is
            # compressed. Compression is not supported on non-chunked uploads.
            with self.assertRaises(exceptions.InvalidUserInputError):
                upload.StreamMedia()

    def testAutoTransferCompressed(self):
        """Test that automatic transfers are compressed.

        Ensure uploads with the compressed, resumable, and automatic transfer
        flags set call StreamInChunks. StreamInChunks is tested in an earlier
        test.
        """
        # Create the upload object.
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            gzip_encoded=True)
        upload.strategy = transfer.RESUMABLE_UPLOAD
        # Mock the upload to return the sample response.
        with mock.patch.object(transfer.Upload,
                               'StreamInChunks') as mock_result, \
                mock.patch.object(http_wrapper,
                                  'MakeRequest') as make_request:
            mock_result.return_value = self.response
            make_request.return_value = self.response

            # Initialization.
            upload.InitializeUpload(self.request, 'http')
            # Ensure the mock was called.
            self.assertTrue(mock_result.called)

    def testMultipartCompressed(self):
        """Test that multipart uploads are compressed."""
        # Create the multipart configuration.
        upload_config = base_api.ApiUploadInfo(
            accept=['*/*'],
            max_size=None,
            simple_multipart=True,
            simple_path=u'/upload',)
        # Create the upload object.
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            gzip_encoded=True)
        # Set a body to trigger multipart configuration.
        self.request.body = '{"body_field_one": 7}'
        # Configure the request.
        upload.ConfigureRequest(upload_config, self.request, self.url_builder)
        # Ensure the request is a multipart request now.
        self.assertEqual(
            self.url_builder.query_params['uploadType'], 'multipart')
        # Ensure the request is gzip encoded.
        self.assertEqual(self.request.headers['Content-Encoding'], 'gzip')
        # Ensure data is compressed
        self.assertLess(len(self.request.body), len(self.sample_data))
        # Ensure uncompressed data includes the sample data.
        with gzip.GzipFile(fileobj=six.BytesIO(self.request.body)) as f:
            original = f.read()
            self.assertIn(self.sample_data, original)

    def testMediaCompressed(self):
        """Test that media uploads are compressed."""
        # Create the media configuration.
        upload_config = base_api.ApiUploadInfo(
            accept=['*/*'],
            max_size=None,
            simple_multipart=True,
            simple_path=u'/upload',)
        # Create the upload object.
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            gzip_encoded=True)
        # Configure the request.
        upload.ConfigureRequest(upload_config, self.request, self.url_builder)
        # Ensure the request is a media request now.
        self.assertEqual(self.url_builder.query_params['uploadType'], 'media')
        # Ensure the request is gzip encoded.
        self.assertEqual(self.request.headers['Content-Encoding'], 'gzip')
        # Ensure data is compressed
        self.assertLess(len(self.request.body), len(self.sample_data))
        # Ensure uncompressed data includes the sample data.
        with gzip.GzipFile(fileobj=six.BytesIO(self.request.body)) as f:
            original = f.read()
            self.assertIn(self.sample_data, original)

    def HttpRequestSideEffect(self, responses=None):
        """Build a side effect that replays canned responses in order.

        Args:
          responses: iterable of objects with `info` and `content`
              attributes. None (the default) is treated as no responses.

        Returns:
          A callable taking `uri` and keyword arguments. Each call checks
          that the 'content-length' header equals the length of the body
          (reading the body first when it has a `read` method) and then
          returns the next `(info, content)` pair.
        """
        # Work on a private list of pairs so that popping below does not
        # shrink the list the caller passed in.
        responses = [(response.info, response.content)
                     for response in responses or ()]

        def _side_effect(uri, **kwargs):  # pylint: disable=unused-argument
            body = kwargs['body']
            read_func = getattr(body, 'read', None)
            if read_func:
                # If the body is a stream, consume the stream.
                body = read_func()
            self.assertEqual(int(kwargs['headers']['content-length']),
                             len(body))
            return responses.pop(0)
        return _side_effect

    def testRetryRequestChunks(self):
        """Test that StreamInChunks will retry correctly."""
        refresh_response = http_wrapper.Response(
            info={'status': http_wrapper.RESUME_INCOMPLETE,
                  'location': 'http://www.uploads.com'},
            content='',
            request_url='http://www.uploads.com',)

        # Create and configure the upload object.
        bytes_http = httplib2.Http()
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            http=bytes_http)

        upload.strategy = transfer.RESUMABLE_UPLOAD
        # Set the chunk size so the entire stream is uploaded.
        upload.chunksize = len(self.sample_data)
        # Patch the transport so each request gets the next canned response.
        with mock.patch.object(bytes_http,
                               'request') as make_request:
            # This side effect also checks the request body.
            responses = [
                self.response,  # Initial request in InitializeUpload().
                self.fail_response,  # 503 status code from server.
                refresh_response,  # Refresh upload progress.
                self.response,  # Successful request.
            ]
            make_request.side_effect = self.HttpRequestSideEffect(responses)

            # Initialization.
            upload.InitializeUpload(self.request, bytes_http)
            upload.StreamInChunks()

            # Ensure the mock was called the correct number of times.
            self.assertEqual(make_request.call_count, len(responses))

    def testStreamInChunks(self):
        """Test StreamInChunks."""
        resume_incomplete_responses = [http_wrapper.Response(
            info={'status': http_wrapper.RESUME_INCOMPLETE,
                  'location': 'http://www.uploads.com',
                  'range': '0-{0}'.format(end)},
            content='',
            request_url='http://www.uploads.com',) for end in [199, 399, 599]]
        responses = [
            self.response  # Initial request in InitializeUpload().
        ] + resume_incomplete_responses + [
            self.response,  # Successful request.
        ]
        # Create and configure the upload object.
        bytes_http = httplib2.Http()
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            http=bytes_http)

        upload.strategy = transfer.RESUMABLE_UPLOAD
        # Use a chunk size smaller than the 600-byte sample so the upload
        # needs several requests.
        upload.chunksize = 200
        # Patch the transport so each request gets the next canned response.
        with mock.patch.object(bytes_http,
                               'request') as make_request:
            # This side effect also checks the request body.
            make_request.side_effect = self.HttpRequestSideEffect(responses)

            # Initialization.
            upload.InitializeUpload(self.request, bytes_http)
            upload.StreamInChunks()

            # Ensure the mock was called the correct number of times.
            self.assertEqual(make_request.call_count, len(responses))

    @mock.patch.object(transfer.Upload, 'RefreshResumableUploadState',
                       new=mock.Mock())
    def testFinalizesTransferUrlIfClientPresent(self):
        """Tests upload's enforcement of client custom endpoints."""
        mock_client = mock.Mock()
        mock_http = mock.Mock()
        fake_json_data = json.dumps({
            'auto_transfer': False,
            'mime_type': '',
            'total_size': 0,
            'url': 'url',
        })
        transfer.Upload.FromData(self.sample_stream, fake_json_data, mock_http,
                                 client=mock_client)
        mock_client.FinalizeTransferUrl.assert_called_once_with('url')
609