1# 2# Copyright 2015 Google Inc. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16"""Integration tests for uploading and downloading to GCS. 17 18These tests exercise most of the corner cases for upload/download of 19files in apitools, via GCS. There are no performance tests here yet. 20""" 21 22import json 23import os 24import random 25import string 26import unittest 27 28import six 29 30from apitools.base.py import transfer 31import storage 32 33_CLIENT = None 34 35 36def _GetClient(): 37 global _CLIENT # pylint: disable=global-statement 38 if _CLIENT is None: 39 _CLIENT = storage.StorageV1() 40 return _CLIENT 41 42 43class UploadsTest(unittest.TestCase): 44 _DEFAULT_BUCKET = 'apitools' 45 _TESTDATA_PREFIX = 'uploads' 46 47 def setUp(self): 48 self.__client = _GetClient() 49 self.__files = [] 50 self.__content = '' 51 self.__buffer = None 52 self.__upload = None 53 54 def tearDown(self): 55 self.__DeleteFiles() 56 57 def __ResetUpload(self, size, auto_transfer=True): 58 self.__content = ''.join( 59 random.choice(string.ascii_letters) for _ in range(size)) 60 self.__buffer = six.StringIO(self.__content) 61 self.__upload = storage.Upload.FromStream( 62 self.__buffer, 'text/plain', auto_transfer=auto_transfer) 63 64 def __DeleteFiles(self): 65 for filename in self.__files: 66 self.__DeleteFile(filename) 67 68 def __DeleteFile(self, filename): 69 object_name = os.path.join(self._TESTDATA_PREFIX, filename) 70 req = storage.StorageObjectsDeleteRequest( 71 bucket=self._DEFAULT_BUCKET, object=object_name) 72 self.__client.objects.Delete(req) 73 74 def __InsertRequest(self, filename): 75 object_name = os.path.join(self._TESTDATA_PREFIX, filename) 76 return storage.StorageObjectsInsertRequest( 77 name=object_name, bucket=self._DEFAULT_BUCKET) 78 79 def __GetRequest(self, filename): 80 object_name = os.path.join(self._TESTDATA_PREFIX, filename) 81 return storage.StorageObjectsGetRequest( 82 object=object_name, bucket=self._DEFAULT_BUCKET) 83 84 def __InsertFile(self, filename, request=None): 85 if request is None: 86 request = self.__InsertRequest(filename) 87 response = self.__client.objects.Insert(request, upload=self.__upload) 88 self.assertIsNotNone(response) 89 self.__files.append(filename) 90 return response 91 92 def testZeroBytes(self): 93 filename = 'zero_byte_file' 94 self.__ResetUpload(0) 95 response = self.__InsertFile(filename) 96 self.assertEqual(0, response.size) 97 98 def testSimpleUpload(self): 99 filename = 'fifteen_byte_file' 100 self.__ResetUpload(15) 101 response = self.__InsertFile(filename) 102 self.assertEqual(15, response.size) 103 104 def testMultipartUpload(self): 105 filename = 'fifteen_byte_file' 106 self.__ResetUpload(15) 107 request = self.__InsertRequest(filename) 108 request.object = storage.Object(contentLanguage='en') 109 response = self.__InsertFile(filename, request=request) 110 self.assertEqual(15, response.size) 111 self.assertEqual('en', response.contentLanguage) 112 113 def testAutoUpload(self): 114 filename = 'ten_meg_file' 115 size = 10 << 20 116 self.__ResetUpload(size) 117 request = self.__InsertRequest(filename) 118 response = self.__InsertFile(filename, request=request) 119 self.assertEqual(size, response.size) 120 121 def testStreamMedia(self): 122 filename = 'ten_meg_file' 123 size = 10 << 20 124 self.__ResetUpload(size, auto_transfer=False) 125 self.__upload.strategy = 'resumable' 126 self.__upload.total_size = size 127 request = self.__InsertRequest(filename) 128 initial_response = self.__client.objects.Insert( 129 request, upload=self.__upload) 130 self.assertIsNotNone(initial_response) 131 self.assertEqual(0, self.__buffer.tell()) 132 self.__upload.StreamMedia() 133 self.assertEqual(size, self.__buffer.tell()) 134 135 def testBreakAndResumeUpload(self): 136 filename = ('ten_meg_file_' + 137 ''.join(random.sample(string.ascii_letters, 5))) 138 size = 10 << 20 139 self.__ResetUpload(size, auto_transfer=False) 140 self.__upload.strategy = 'resumable' 141 self.__upload.total_size = size 142 # Start the upload 143 request = self.__InsertRequest(filename) 144 initial_response = self.__client.objects.Insert( 145 request, upload=self.__upload) 146 self.assertIsNotNone(initial_response) 147 self.assertEqual(0, self.__buffer.tell()) 148 # Pretend the process died, and resume with a new attempt at the 149 # same upload. 150 upload_data = json.dumps(self.__upload.serialization_data) 151 second_upload_attempt = transfer.Upload.FromData( 152 self.__buffer, upload_data, self.__upload.http) 153 second_upload_attempt._Upload__SendChunk(0) 154 self.assertEqual(second_upload_attempt.chunksize, self.__buffer.tell()) 155 # Simulate a third try, and stream from there. 156 final_upload_attempt = transfer.Upload.FromData( 157 self.__buffer, upload_data, self.__upload.http) 158 final_upload_attempt.StreamInChunks() 159 self.assertEqual(size, self.__buffer.tell()) 160 # Verify the upload 161 object_info = self.__client.objects.Get(self.__GetRequest(filename)) 162 self.assertEqual(size, object_info.size) 163 # Confirm that a new attempt successfully does nothing. 164 completed_upload_attempt = transfer.Upload.FromData( 165 self.__buffer, upload_data, self.__upload.http) 166 self.assertTrue(completed_upload_attempt.complete) 167 completed_upload_attempt.StreamInChunks() 168 # Verify the upload didn't pick up extra bytes. 169 object_info = self.__client.objects.Get(self.__GetRequest(filename)) 170 self.assertEqual(size, object_info.size) 171 # TODO(craigcitro): Add tests for callbacks (especially around 172 # finish callback). 173 174if __name__ == '__main__': 175 unittest.main() 176