1# -*- coding: utf-8 -*- 2# 3# Copyright 2015 Google Inc. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17"""Tests for transfer.py.""" 18import string 19import unittest 20 21import httplib2 22import json 23import mock 24import six 25from six.moves import http_client 26 27from apitools.base.py import base_api 28from apitools.base.py import exceptions 29from apitools.base.py import gzip 30from apitools.base.py import http_wrapper 31from apitools.base.py import transfer 32 33 34class TransferTest(unittest.TestCase): 35 36 def assertRangeAndContentRangeCompatible(self, request, response): 37 request_prefix = 'bytes=' 38 self.assertIn('range', request.headers) 39 self.assertTrue(request.headers['range'].startswith(request_prefix)) 40 request_range = request.headers['range'][len(request_prefix):] 41 42 response_prefix = 'bytes ' 43 self.assertIn('content-range', response.info) 44 response_header = response.info['content-range'] 45 self.assertTrue(response_header.startswith(response_prefix)) 46 response_range = ( 47 response_header[len(response_prefix):].partition('/')[0]) 48 49 msg = ('Request range ({0}) not a prefix of ' 50 'response_range ({1})').format( 51 request_range, response_range) 52 self.assertTrue(response_range.startswith(request_range), msg=msg) 53 54 def testComputeEndByte(self): 55 total_size = 100 56 chunksize = 10 57 download = transfer.Download.FromStream( 58 six.StringIO(), chunksize=chunksize, total_size=total_size) 59 self.assertEqual(chunksize - 1, 60 download._Download__ComputeEndByte(0, end=50)) 61 62 def testComputeEndByteReturnNone(self): 63 download = transfer.Download.FromStream(six.StringIO()) 64 self.assertIsNone( 65 download._Download__ComputeEndByte(0, use_chunks=False)) 66 67 def testComputeEndByteNoChunks(self): 68 total_size = 100 69 download = transfer.Download.FromStream( 70 six.StringIO(), chunksize=10, total_size=total_size) 71 for end in (None, 1000): 72 self.assertEqual( 73 total_size - 1, 74 download._Download__ComputeEndByte(0, end=end, 75 use_chunks=False), 76 msg='Failed on end={0}'.format(end)) 77 78 def testComputeEndByteNoTotal(self): 79 download = transfer.Download.FromStream(six.StringIO()) 80 default_chunksize = download.chunksize 81 for chunksize in (100, default_chunksize): 82 download.chunksize = chunksize 83 for start in (0, 10): 84 self.assertEqual( 85 download.chunksize + start - 1, 86 download._Download__ComputeEndByte(start), 87 msg='Failed on start={0}, chunksize={1}'.format( 88 start, chunksize)) 89 90 def testComputeEndByteSmallTotal(self): 91 total_size = 100 92 download = transfer.Download.FromStream(six.StringIO(), 93 total_size=total_size) 94 for start in (0, 10): 95 self.assertEqual(total_size - 1, 96 download._Download__ComputeEndByte(start), 97 msg='Failed on start={0}'.format(start)) 98 99 def testDownloadThenStream(self): 100 bytes_http = object() 101 http = object() 102 download_stream = six.StringIO() 103 download = transfer.Download.FromStream(download_stream, 104 total_size=26) 105 download.bytes_http = bytes_http 106 base_url = 'https://part.one/' 107 with mock.patch.object(http_wrapper, 'MakeRequest', 108 autospec=True) as make_request: 109 make_request.return_value = http_wrapper.Response( 110 info={ 111 'content-range': 'bytes 0-25/26', 112 'status': http_client.OK, 113 }, 114 content=string.ascii_lowercase, 115 request_url=base_url, 116 ) 117 request = http_wrapper.Request(url='https://part.one/') 118 download.InitializeDownload(request, http=http) 119 self.assertEqual(1, make_request.call_count) 120 received_request = make_request.call_args[0][1] 121 self.assertEqual(base_url, received_request.url) 122 self.assertRangeAndContentRangeCompatible( 123 received_request, make_request.return_value) 124 125 with mock.patch.object(http_wrapper, 'MakeRequest', 126 autospec=True) as make_request: 127 make_request.return_value = http_wrapper.Response( 128 info={ 129 'status': http_client.REQUESTED_RANGE_NOT_SATISFIABLE, 130 }, 131 content='error', 132 request_url=base_url, 133 ) 134 download.StreamInChunks() 135 self.assertEqual(1, make_request.call_count) 136 received_request = make_request.call_args[0][1] 137 self.assertEqual('bytes=26-', received_request.headers['range']) 138 139 def testGetRange(self): 140 for (start_byte, end_byte) in [(0, 25), (5, 15), (0, 0), (25, 25)]: 141 bytes_http = object() 142 http = object() 143 download_stream = six.StringIO() 144 download = transfer.Download.FromStream(download_stream, 145 total_size=26, 146 auto_transfer=False) 147 download.bytes_http = bytes_http 148 base_url = 'https://part.one/' 149 with mock.patch.object(http_wrapper, 'MakeRequest', 150 autospec=True) as make_request: 151 make_request.return_value = http_wrapper.Response( 152 info={ 153 'content-range': 'bytes %d-%d/26' % 154 (start_byte, end_byte), 155 'status': http_client.OK, 156 }, 157 content=string.ascii_lowercase[start_byte:end_byte + 1], 158 request_url=base_url, 159 ) 160 request = http_wrapper.Request(url='https://part.one/') 161 download.InitializeDownload(request, http=http) 162 download.GetRange(start_byte, end_byte) 163 self.assertEqual(1, make_request.call_count) 164 received_request = make_request.call_args[0][1] 165 self.assertEqual(base_url, received_request.url) 166 self.assertRangeAndContentRangeCompatible( 167 received_request, make_request.return_value) 168 169 def testNonChunkedDownload(self): 170 bytes_http = object() 171 http = object() 172 download_stream = six.StringIO() 173 download = transfer.Download.FromStream(download_stream, total_size=52) 174 download.bytes_http = bytes_http 175 base_url = 'https://part.one/' 176 177 with mock.patch.object(http_wrapper, 'MakeRequest', 178 autospec=True) as make_request: 179 make_request.return_value = http_wrapper.Response( 180 info={ 181 'content-range': 'bytes 0-51/52', 182 'status': http_client.OK, 183 }, 184 content=string.ascii_lowercase * 2, 185 request_url=base_url, 186 ) 187 request = http_wrapper.Request(url='https://part.one/') 188 download.InitializeDownload(request, http=http) 189 self.assertEqual(1, make_request.call_count) 190 received_request = make_request.call_args[0][1] 191 self.assertEqual(base_url, received_request.url) 192 self.assertRangeAndContentRangeCompatible( 193 received_request, make_request.return_value) 194 download_stream.seek(0) 195 self.assertEqual(string.ascii_lowercase * 2, 196 download_stream.getvalue()) 197 198 def testChunkedDownload(self): 199 bytes_http = object() 200 http = object() 201 download_stream = six.StringIO() 202 download = transfer.Download.FromStream( 203 download_stream, chunksize=26, total_size=52) 204 download.bytes_http = bytes_http 205 206 # Setting autospec on a mock with an iterable side_effect is 207 # currently broken (http://bugs.python.org/issue17826), so 208 # instead we write a little function. 209 def _ReturnBytes(unused_http, http_request, 210 *unused_args, **unused_kwds): 211 url = http_request.url 212 if url == 'https://part.one/': 213 return http_wrapper.Response( 214 info={ 215 'content-location': 'https://part.two/', 216 'content-range': 'bytes 0-25/52', 217 'status': http_client.PARTIAL_CONTENT, 218 }, 219 content=string.ascii_lowercase, 220 request_url='https://part.one/', 221 ) 222 elif url == 'https://part.two/': 223 return http_wrapper.Response( 224 info={ 225 'content-range': 'bytes 26-51/52', 226 'status': http_client.OK, 227 }, 228 content=string.ascii_uppercase, 229 request_url='https://part.two/', 230 ) 231 else: 232 self.fail('Unknown URL requested: %s' % url) 233 234 with mock.patch.object(http_wrapper, 'MakeRequest', 235 autospec=True) as make_request: 236 make_request.side_effect = _ReturnBytes 237 request = http_wrapper.Request(url='https://part.one/') 238 download.InitializeDownload(request, http=http) 239 self.assertEqual(2, make_request.call_count) 240 for call in make_request.call_args_list: 241 self.assertRangeAndContentRangeCompatible( 242 call[0][1], _ReturnBytes(*call[0])) 243 download_stream.seek(0) 244 self.assertEqual(string.ascii_lowercase + string.ascii_uppercase, 245 download_stream.getvalue()) 246 247 # @mock.patch.object(transfer.Upload, 'RefreshResumableUploadState', 248 # new=mock.Mock()) 249 def testFinalizesTransferUrlIfClientPresent(self): 250 """Tests download's enforcement of client custom endpoints.""" 251 mock_client = mock.Mock() 252 fake_json_data = json.dumps({ 253 'auto_transfer': False, 254 'progress': 0, 255 'total_size': 0, 256 'url': 'url', 257 }) 258 transfer.Download.FromData(six.BytesIO(), fake_json_data, 259 client=mock_client) 260 mock_client.FinalizeTransferUrl.assert_called_once_with('url') 261 262 def testMultipartEncoding(self): 263 # This is really a table test for various issues we've seen in 264 # the past; see notes below for particular histories. 265 266 test_cases = [ 267 # Python's mime module by default encodes lines that start 268 # with "From " as ">From ", which we need to make sure we 269 # don't run afoul of when sending content that isn't 270 # intended to be so encoded. This test calls out that we 271 # get this right. We test for both the multipart and 272 # non-multipart case. 273 'line one\nFrom \nline two', 274 275 # We had originally used a `six.StringIO` to hold the http 276 # request body in the case of a multipart upload; for 277 # bytes being uploaded in Python3, however, this causes 278 # issues like this: 279 # https://github.com/GoogleCloudPlatform/gcloud-python/issues/1760 280 # We test below to ensure that we don't end up mangling 281 # the body before sending. 282 u'name,main_ingredient\nRäksmörgås,Räkor\nBaguette,Bröd', 283 ] 284 285 for upload_contents in test_cases: 286 multipart_body = '{"body_field_one": 7}' 287 upload_bytes = upload_contents.encode('ascii', 'backslashreplace') 288 upload_config = base_api.ApiUploadInfo( 289 accept=['*/*'], 290 max_size=None, 291 resumable_multipart=True, 292 resumable_path=u'/resumable/upload', 293 simple_multipart=True, 294 simple_path=u'/upload', 295 ) 296 url_builder = base_api._UrlBuilder('http://www.uploads.com') 297 298 # Test multipart: having a body argument in http_request forces 299 # multipart here. 300 upload = transfer.Upload.FromStream( 301 six.BytesIO(upload_bytes), 302 'text/plain', 303 total_size=len(upload_bytes)) 304 http_request = http_wrapper.Request( 305 'http://www.uploads.com', 306 headers={'content-type': 'text/plain'}, 307 body=multipart_body) 308 upload.ConfigureRequest(upload_config, http_request, url_builder) 309 self.assertEqual( 310 'multipart', url_builder.query_params['uploadType']) 311 rewritten_upload_contents = b'\n'.join( 312 http_request.body.split(b'--')[2].splitlines()[1:]) 313 self.assertTrue(rewritten_upload_contents.endswith(upload_bytes)) 314 315 # Test non-multipart (aka media): no body argument means this is 316 # sent as media. 317 upload = transfer.Upload.FromStream( 318 six.BytesIO(upload_bytes), 319 'text/plain', 320 total_size=len(upload_bytes)) 321 http_request = http_wrapper.Request( 322 'http://www.uploads.com', 323 headers={'content-type': 'text/plain'}) 324 upload.ConfigureRequest(upload_config, http_request, url_builder) 325 self.assertEqual(url_builder.query_params['uploadType'], 'media') 326 rewritten_upload_contents = http_request.body 327 self.assertTrue(rewritten_upload_contents.endswith(upload_bytes)) 328 329 330class UploadTest(unittest.TestCase): 331 332 def setUp(self): 333 # Sample highly compressible data. 334 self.sample_data = b'abc' * 200 335 # Stream of the sample data. 336 self.sample_stream = six.BytesIO(self.sample_data) 337 # Sample url_builder. 338 self.url_builder = base_api._UrlBuilder('http://www.uploads.com') 339 # Sample request. 340 self.request = http_wrapper.Request( 341 'http://www.uploads.com', 342 headers={'content-type': 'text/plain'}) 343 # Sample successful response. 344 self.response = http_wrapper.Response( 345 info={'status': http_client.OK, 346 'location': 'http://www.uploads.com'}, 347 content='', 348 request_url='http://www.uploads.com',) 349 # Sample failure response. 350 self.fail_response = http_wrapper.Response( 351 info={'status': http_client.SERVICE_UNAVAILABLE, 352 'location': 'http://www.uploads.com'}, 353 content='', 354 request_url='http://www.uploads.com',) 355 356 def testStreamInChunksCompressed(self): 357 """Test that StreamInChunks will handle compression correctly.""" 358 # Create and configure the upload object. 359 upload = transfer.Upload( 360 stream=self.sample_stream, 361 mime_type='text/plain', 362 total_size=len(self.sample_data), 363 close_stream=False, 364 gzip_encoded=True) 365 upload.strategy = transfer.RESUMABLE_UPLOAD 366 # Set the chunk size so the entire stream is uploaded. 367 upload.chunksize = len(self.sample_data) 368 # Mock the upload to return the sample response. 369 with mock.patch.object(transfer.Upload, 370 '_Upload__SendMediaRequest') as mock_result, \ 371 mock.patch.object(http_wrapper, 372 'MakeRequest') as make_request: 373 mock_result.return_value = self.response 374 make_request.return_value = self.response 375 376 # Initialization. 377 upload.InitializeUpload(self.request, 'http') 378 upload.StreamInChunks() 379 # Get the uploaded request and end position of the stream. 380 (request, _), _ = mock_result.call_args_list[0] 381 # Ensure the mock was called. 382 self.assertTrue(mock_result.called) 383 # Ensure the correct content encoding was set. 384 self.assertEqual(request.headers['Content-Encoding'], 'gzip') 385 # Ensure the stream was compresed. 386 self.assertLess(len(request.body), len(self.sample_data)) 387 388 def testStreamMediaCompressedFail(self): 389 """Test that non-chunked uploads raise an exception. 390 391 Ensure uploads with the compressed and resumable flags set called from 392 StreamMedia raise an exception. Those uploads are unsupported. 393 """ 394 # Create the upload object. 395 upload = transfer.Upload( 396 stream=self.sample_stream, 397 mime_type='text/plain', 398 total_size=len(self.sample_data), 399 close_stream=False, 400 auto_transfer=True, 401 gzip_encoded=True) 402 upload.strategy = transfer.RESUMABLE_UPLOAD 403 # Mock the upload to return the sample response. 404 with mock.patch.object(http_wrapper, 405 'MakeRequest') as make_request: 406 make_request.return_value = self.response 407 408 # Initialization. 409 upload.InitializeUpload(self.request, 'http') 410 # Ensure stream media raises an exception when the upload is 411 # compressed. Compression is not supported on non-chunked uploads. 412 with self.assertRaises(exceptions.InvalidUserInputError): 413 upload.StreamMedia() 414 415 def testAutoTransferCompressed(self): 416 """Test that automatic transfers are compressed. 417 418 Ensure uploads with the compressed, resumable, and automatic transfer 419 flags set call StreamInChunks. StreamInChunks is tested in an earlier 420 test. 421 """ 422 # Create the upload object. 423 upload = transfer.Upload( 424 stream=self.sample_stream, 425 mime_type='text/plain', 426 total_size=len(self.sample_data), 427 close_stream=False, 428 gzip_encoded=True) 429 upload.strategy = transfer.RESUMABLE_UPLOAD 430 # Mock the upload to return the sample response. 431 with mock.patch.object(transfer.Upload, 432 'StreamInChunks') as mock_result, \ 433 mock.patch.object(http_wrapper, 434 'MakeRequest') as make_request: 435 mock_result.return_value = self.response 436 make_request.return_value = self.response 437 438 # Initialization. 439 upload.InitializeUpload(self.request, 'http') 440 # Ensure the mock was called. 441 self.assertTrue(mock_result.called) 442 443 def testMultipartCompressed(self): 444 """Test that multipart uploads are compressed.""" 445 # Create the multipart configuration. 446 upload_config = base_api.ApiUploadInfo( 447 accept=['*/*'], 448 max_size=None, 449 simple_multipart=True, 450 simple_path=u'/upload',) 451 # Create the upload object. 452 upload = transfer.Upload( 453 stream=self.sample_stream, 454 mime_type='text/plain', 455 total_size=len(self.sample_data), 456 close_stream=False, 457 gzip_encoded=True) 458 # Set a body to trigger multipart configuration. 459 self.request.body = '{"body_field_one": 7}' 460 # Configure the request. 461 upload.ConfigureRequest(upload_config, self.request, self.url_builder) 462 # Ensure the request is a multipart request now. 463 self.assertEqual( 464 self.url_builder.query_params['uploadType'], 'multipart') 465 # Ensure the request is gzip encoded. 466 self.assertEqual(self.request.headers['Content-Encoding'], 'gzip') 467 # Ensure data is compressed 468 self.assertLess(len(self.request.body), len(self.sample_data)) 469 # Ensure uncompressed data includes the sample data. 470 with gzip.GzipFile(fileobj=six.BytesIO(self.request.body)) as f: 471 original = f.read() 472 self.assertTrue(self.sample_data in original) 473 474 def testMediaCompressed(self): 475 """Test that media uploads are compressed.""" 476 # Create the media configuration. 477 upload_config = base_api.ApiUploadInfo( 478 accept=['*/*'], 479 max_size=None, 480 simple_multipart=True, 481 simple_path=u'/upload',) 482 # Create the upload object. 483 upload = transfer.Upload( 484 stream=self.sample_stream, 485 mime_type='text/plain', 486 total_size=len(self.sample_data), 487 close_stream=False, 488 gzip_encoded=True) 489 # Configure the request. 490 upload.ConfigureRequest(upload_config, self.request, self.url_builder) 491 # Ensure the request is a media request now. 492 self.assertEqual(self.url_builder.query_params['uploadType'], 'media') 493 # Ensure the request is gzip encoded. 494 self.assertEqual(self.request.headers['Content-Encoding'], 'gzip') 495 # Ensure data is compressed 496 self.assertLess(len(self.request.body), len(self.sample_data)) 497 # Ensure uncompressed data includes the sample data. 498 with gzip.GzipFile(fileobj=six.BytesIO(self.request.body)) as f: 499 original = f.read() 500 self.assertTrue(self.sample_data in original) 501 502 def HttpRequestSideEffect(self, responses=None): 503 responses = [(response.info, response.content) 504 for response in responses] 505 506 def _side_effect(uri, **kwargs): # pylint: disable=unused-argument 507 body = kwargs['body'] 508 read_func = getattr(body, 'read', None) 509 if read_func: 510 # If the body is a stream, consume the stream. 511 body = read_func() 512 self.assertEqual(int(kwargs['headers']['content-length']), 513 len(body)) 514 return responses.pop(0) 515 return _side_effect 516 517 def testRetryRequestChunks(self): 518 """Test that StreamInChunks will retry correctly.""" 519 refresh_response = http_wrapper.Response( 520 info={'status': http_wrapper.RESUME_INCOMPLETE, 521 'location': 'http://www.uploads.com'}, 522 content='', 523 request_url='http://www.uploads.com',) 524 525 # Create and configure the upload object. 526 bytes_http = httplib2.Http() 527 upload = transfer.Upload( 528 stream=self.sample_stream, 529 mime_type='text/plain', 530 total_size=len(self.sample_data), 531 close_stream=False, 532 http=bytes_http) 533 534 upload.strategy = transfer.RESUMABLE_UPLOAD 535 # Set the chunk size so the entire stream is uploaded. 536 upload.chunksize = len(self.sample_data) 537 # Mock the upload to return the sample response. 538 with mock.patch.object(bytes_http, 539 'request') as make_request: 540 # This side effect also checks the request body. 541 responses = [ 542 self.response, # Initial request in InitializeUpload(). 543 self.fail_response, # 503 status code from server. 544 refresh_response, # Refresh upload progress. 545 self.response, # Successful request. 546 ] 547 make_request.side_effect = self.HttpRequestSideEffect(responses) 548 549 # Initialization. 550 upload.InitializeUpload(self.request, bytes_http) 551 upload.StreamInChunks() 552 553 # Ensure the mock was called the correct number of times. 554 self.assertEquals(make_request.call_count, len(responses)) 555 556 def testStreamInChunks(self): 557 """Test StreamInChunks.""" 558 resume_incomplete_responses = [http_wrapper.Response( 559 info={'status': http_wrapper.RESUME_INCOMPLETE, 560 'location': 'http://www.uploads.com', 561 'range': '0-{}'.format(end)}, 562 content='', 563 request_url='http://www.uploads.com',) for end in [199, 399, 599]] 564 responses = [ 565 self.response # Initial request in InitializeUpload(). 566 ] + resume_incomplete_responses + [ 567 self.response, # Successful request. 568 ] 569 # Create and configure the upload object. 570 bytes_http = httplib2.Http() 571 upload = transfer.Upload( 572 stream=self.sample_stream, 573 mime_type='text/plain', 574 total_size=len(self.sample_data), 575 close_stream=False, 576 http=bytes_http) 577 578 upload.strategy = transfer.RESUMABLE_UPLOAD 579 # Set the chunk size so the entire stream is uploaded. 580 upload.chunksize = 200 581 # Mock the upload to return the sample response. 582 with mock.patch.object(bytes_http, 583 'request') as make_request: 584 # This side effect also checks the request body. 585 make_request.side_effect = self.HttpRequestSideEffect(responses) 586 587 # Initialization. 588 upload.InitializeUpload(self.request, bytes_http) 589 upload.StreamInChunks() 590 591 # Ensure the mock was called the correct number of times. 592 self.assertEquals(make_request.call_count, len(responses)) 593 594 @mock.patch.object(transfer.Upload, 'RefreshResumableUploadState', 595 new=mock.Mock()) 596 def testFinalizesTransferUrlIfClientPresent(self): 597 """Tests upload's enforcement of client custom endpoints.""" 598 mock_client = mock.Mock() 599 mock_http = mock.Mock() 600 fake_json_data = json.dumps({ 601 'auto_transfer': False, 602 'mime_type': '', 603 'total_size': 0, 604 'url': 'url', 605 }) 606 transfer.Upload.FromData(self.sample_stream, fake_json_data, mock_http, 607 client=mock_client) 608 mock_client.FinalizeTransferUrl.assert_called_once_with('url') 609