# -*- coding: utf-8 -*- # Copyright 2013 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Base test case class for unit and integration tests.""" from __future__ import absolute_import from functools import wraps import os.path import random import shutil import tempfile import boto import gslib.tests.util as util from gslib.tests.util import unittest from gslib.util import UTF8 MAX_BUCKET_LENGTH = 63 def NotParallelizable(func): """Wrapper function for cases that are not parallelizable.""" @wraps(func) def ParallelAnnotatedFunc(*args, **kwargs): return func(*args, **kwargs) ParallelAnnotatedFunc.is_parallelizable = False return ParallelAnnotatedFunc def RequiresIsolation(func): """Wrapper function for cases that require running in a separate process.""" @wraps(func) def RequiresIsolationFunc(*args, **kwargs): return func(*args, **kwargs) RequiresIsolationFunc.requires_isolation = True return RequiresIsolationFunc class GsUtilTestCase(unittest.TestCase): """Base test case class for unit and integration tests.""" def setUp(self): if util.RUN_S3_TESTS: self.test_api = 'XML' self.default_provider = 's3' self.provider_custom_meta = 'amz' else: self.test_api = boto.config.get('GSUtil', 'prefer_api', 'JSON').upper() self.default_provider = 'gs' self.provider_custom_meta = 'goog' self.tempdirs = [] def tearDown(self): while self.tempdirs: tmpdir = self.tempdirs.pop() shutil.rmtree(tmpdir, ignore_errors=True) def assertNumLines(self, text, numlines): self.assertEqual(text.count('\n'), numlines) def GetTestMethodName(self): if isinstance(self._testMethodName, unicode): return self._testMethodName.encode(UTF8) return self._testMethodName def MakeRandomTestString(self): """Creates a random string of hex characters 8 characters long.""" return '%08x' % random.randrange(256**4) def MakeTempName(self, kind, prefix=''): """Creates a temporary name that is most-likely unique. Args: kind: A string indicating what kind of test name this is. prefix: Prefix string to be used in the temporary name. Returns: The temporary name. """ name = '%sgsutil-test-%s-%s' % (prefix, self.GetTestMethodName(), kind) name = name[:MAX_BUCKET_LENGTH-9] name = '%s-%s' % (name, self.MakeRandomTestString()) return name def CreateTempDir(self, test_files=0): """Creates a temporary directory on disk. The directory and all of its contents will be deleted after the test. Args: test_files: The number of test files to place in the directory or a list of test file names. Returns: The path to the new temporary directory. """ tmpdir = tempfile.mkdtemp(prefix=self.MakeTempName('directory')) self.tempdirs.append(tmpdir) try: iter(test_files) except TypeError: test_files = [self.MakeTempName('file') for _ in range(test_files)] for i, name in enumerate(test_files): self.CreateTempFile(tmpdir=tmpdir, file_name=name, contents='test %d' % i) return tmpdir def CreateTempFile(self, tmpdir=None, contents=None, file_name=None): """Creates a temporary file on disk. Args: tmpdir: The temporary directory to place the file in. If not specified, a new temporary directory is created. contents: The contents to write to the file. If not specified, a test string is constructed and written to the file. file_name: The name to use for the file. If not specified, a temporary test file name is constructed. This can also be a tuple, where ('dir', 'foo') means to create a file named 'foo' inside a subdirectory named 'dir'. Returns: The path to the new temporary file. """ tmpdir = tmpdir or self.CreateTempDir() file_name = file_name or self.MakeTempName('file') if isinstance(file_name, basestring): fpath = os.path.join(tmpdir, file_name) else: fpath = os.path.join(tmpdir, *file_name) if not os.path.isdir(os.path.dirname(fpath)): os.makedirs(os.path.dirname(fpath)) with open(fpath, 'wb') as f: contents = (contents if contents is not None else self.MakeTempName('contents')) f.write(contents) return fpath